1
0
mirror of https://github.com/oceanprotocol/ocean.js.git synced 2024-11-26 20:39:05 +01:00

Merge pull request #143 from oceanprotocol/feature/compute-test

Feature/compute test
This commit is contained in:
Ahmed Ali 2020-07-13 12:33:17 +02:00 committed by GitHub
commit 1e3d3eca4c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 520 additions and 79 deletions

View File

@ -19,7 +19,7 @@ before_script:
- git clone https://github.com/oceanprotocol/barge
- cd barge
- git checkout v3
- export PROVIDER_VERSION=alex
- export PROVIDER_VERSION=phase2
- bash -x start_ocean.sh --no-dashboard 2>&1 > start_ocean.log &
- cd ..
- sleep 300

View File

@ -33,7 +33,7 @@ export interface ServiceComputeAttributes extends ServiceCommonAttributes {
main: {
creator: string
datePublished: string
price: string
cost: string
timeout: number
provider?: ServiceComputeProvider
name: string
@ -57,7 +57,7 @@ export interface ServiceComputeProvider {
supportedServers: {
serverId: string
serverType: string
price: string
cost: string
cpu: string
gpu: string
memory: string
@ -74,13 +74,11 @@ export interface ServiceMetadata extends ServiceCommon {
export interface ServiceAccess extends ServiceCommon {
type: 'access'
templateId?: string
attributes: ServiceAccessAttributes
}
export interface ServiceCompute extends ServiceCommon {
type: 'compute'
templateId?: string
attributes: ServiceComputeAttributes
}

View File

@ -347,6 +347,20 @@ export class Assets extends Instantiable {
return service
}
public async getServiceByIndex(
did: string,
serviceIndex: number
): Promise<ServiceCommon> {
const services: ServiceCommon[] = (await this.resolve(did)).service
let service
services.forEach((serv) => {
if (serv.index === serviceIndex) {
service = serv
}
})
return service
}
public async createAccessServiceAttributes(
creator: Account,
dtCost: number,
@ -372,12 +386,19 @@ export class Assets extends Instantiable {
public async order(
did: string,
serviceType: string,
consumerAddress: string
consumerAddress: string,
serviceIndex: number = -1
): Promise<string> {
if (serviceIndex === -1) {
const service = await this.getServiceByType(did, serviceType)
serviceIndex = service.index
} else {
const service = await this.getServiceByIndex(did, serviceIndex)
serviceType = service.type
}
return await this.ocean.provider.initialize(
did,
service.index,
serviceIndex,
serviceType,
consumerAddress
)

View File

@ -1,10 +1,16 @@
import { Instantiable, InstantiableConfig } from '../Instantiable.abstract'
import { DDO } from '../ddo/DDO'
import { MetadataAlgorithm } from '../ddo/interfaces/MetadataAlgorithm'
import { Service, ServiceComputePrivacy, ServiceCompute } from '../ddo/interfaces/Service'
import Account from './Account'
import { ServiceComputePrivacy, ServiceCompute } from '../ddo/interfaces/Service'
import { SubscribablePromise } from '../utils'
import { Instantiable, InstantiableConfig } from '../Instantiable.abstract'
import { Output } from './interfaces/ComputeOutput'
import { ComputeJob } from './interfaces/ComputeJob'
export enum OrderProgressStep {
TransferDataToken
}
export const ComputeJobStatus = Object.freeze({
Started: 10,
ConfiguringVolumes: 20,
@ -24,8 +30,8 @@ export const ComputeJobStatus = Object.freeze({
*/
export class Compute extends Instantiable {
/**
* Returns the instance of OceanCompute.
* @return {Promise<OceanCompute>}
* Returns the instance of Compute.
* @return {Promise<Assets>}
*/
public static async getInstance(config: InstantiableConfig): Promise<Compute> {
const instance = new Compute()
@ -34,43 +40,29 @@ export class Compute extends Instantiable {
return instance
}
/**
* Starts an order of a compute service that is defined in an asset's services.
* @param {Account} consumerAccount The account of the consumer ordering the service.
* @param {string} datasetDid The DID of the dataset asset (of type `dataset`) to run the algorithm on.
* @param {string} algorithmDid The DID of the algorithm asset (of type `algorithm`) to run on the asset.
* @param {MetaData} algorithmMeta Metadata about the algorithm being run if `algorithm` is being used. This is ignored when `algorithmDid` is specified.
* @return {Promise<string>} Returns a compute job ID.
*
* Note: algorithmDid and algorithmMeta are optional, but if they are not passed,
* you can end up in the situation that you are ordering and paying for your agreement,
* but brizo will not allow the compute, due to privacy settings of the ddo
*/
public order(
consumerAccount: Account,
datasetDid: string,
algorithmDid?: string,
algorithmMeta?: MetadataAlgorithm,
provider?: string
): Promise<any> {
return Promise.resolve('')
}
/**
* Start the execution of a compute job.
* @param {Account} consumerAccount The account of the consumer ordering the service.
* @param {string} did Decentralized identifer for the asset
* @param {string} txId
* @param {string} tokenAddress
* @param {Account} consumerAccount The account of the consumer ordering the service.
* @param {string} algorithmDid The DID of the algorithm asset (of type `algorithm`) to run on the asset.
* @param {MetaData} algorithmMeta Metadata about the algorithm being run if `algorithm` is being used. This is ignored when `algorithmDid` is specified.
* @param {Output} output Define algorithm output publishing. Publishing the result of a compute job is turned off by default.
* @return {Promise<ComputeJob>} Returns compute job ID under status.jobId
*/
public async start(
consumerAccount: Account,
did: string,
txId: string,
tokenAddress: string,
consumerAccount: Account,
algorithmDid?: string,
algorithmMeta?: MetadataAlgorithm,
output?: Output
output?: Output,
serviceIndex?: string,
serviceType?: string,
algorithmTransferTxId?: string,
algorithmDataToken?: string
): Promise<ComputeJob> {
output = this.checkOutput(consumerAccount, output)
if (did) {
@ -81,7 +73,13 @@ export class Compute extends Instantiable {
algorithmDid,
algorithmMeta,
undefined,
output
output,
txId,
serviceIndex,
serviceType,
tokenAddress,
algorithmTransferTxId,
algorithmDataToken
)
return computeJobsList[0] as ComputeJob
} else return null
@ -135,23 +133,6 @@ export class Compute extends Instantiable {
return computeJobsList[0] as ComputeJob
}
/**
* Ends a running compute job and starts it again.
* @param {Account} consumerAccount The account of the consumer ordering the service.
* @param {string} did Decentralized identifier.
* @param {string} jobId The ID of the compute job to be stopped
* @return {Promise<ComputeJob>} Returns the new status of a job
*/
public async restart(
consumerAccount: Account,
did: string,
jobId: string
): Promise<ComputeJob> {
await this.stop(consumerAccount, did, jobId)
const result = await this.start(consumerAccount, did, jobId)
return result
}
/**
* Returns information about the status of all compute jobs, or a single compute job.
* @param {Account} consumerAccount The account of the consumer ordering the service.
@ -200,31 +181,85 @@ export class Compute extends Instantiable {
return computeJobsList[0] as ComputeJob
}
public async createComputeServiceAttributes(
public createServerAttributes(
serverId: string,
serverType: string,
cost: string,
cpu: string,
gpu: string,
memory: string,
disk: string,
maxExecutionTime: number
): object {
return {
serverId,
serverType,
cost,
cpu,
gpu,
memory,
disk,
maxExecutionTime
}
}
public createContainerAttributes(
image: string,
tag: string,
checksum: string
): object {
return { image, tag, checksum }
}
public createClusterAttributes(type: string, url: string): object {
return { type, url }
}
public createProviderAttributes(
type: string,
description: string,
cluster: object,
containers: object[],
servers: object[]
): object {
return {
type,
description,
environment: {
cluster: cluster,
supportedServers: containers,
supportedContainers: servers
}
}
}
public createComputeService(
consumerAccount: Account,
price: string,
cost: string,
datePublished: string,
providerAttributes: object,
computePrivacy?: ServiceComputePrivacy,
timeout?: number
): Promise<ServiceCompute> {
): ServiceCompute {
const name = 'dataAssetComputingService'
if (!timeout) timeout = 3600
// TODO
const service = {
type: 'compute',
index: 3,
serviceEndpoint: this.ocean.provider.getComputeEndpoint(),
attributes: {
main: {
name,
creator: consumerAccount.getId(),
datePublished,
price,
privacy: {},
cost,
timeout: timeout,
name
provider: providerAttributes,
privacy: {}
}
}
}
if (computePrivacy) service.attributes.main.privacy = computePrivacy
return service as ServiceCompute
}
@ -256,4 +291,61 @@ export class Compute extends Instantiable {
owner: output.owner || consumerAccount.getId()
}
}
/**
* Starts an order of a compute service that is defined in an asset's services.
* @param {String} consumerAccount The account of the consumer ordering the service.
* @param {string} datasetDid The DID of the dataset asset (of type `dataset`) to run the algorithm on.
* @param {string} serviceIndex The Service index
* @param {string} algorithmDid The DID of the algorithm asset (of type `algorithm`) to run on the asset.
* @param {MetaData} algorithmMeta Metadata about the algorithm being run if `algorithm` is being used. This is ignored when `algorithmDid` is specified.
* @return {Promise<string>} Returns the transaction details
*
* Note: algorithmDid and algorithmMeta are optional, but if they are not passed,
* you can end up in the situation that you are ordering and paying for your compute job,
* but provider will not allow the compute, due to privacy settings of the ddo
*/
public order(
consumerAccount: string,
datasetDid: string,
serviceIndex: number,
algorithmDid?: string,
algorithmMeta?: MetadataAlgorithm
): SubscribablePromise<OrderProgressStep, string> {
return new SubscribablePromise(async (observer) => {
const ddo: DDO = await this.ocean.assets.resolve(datasetDid)
// const service: Service = ddo.findServiceByType('compute')
const service: Service = ddo.findServiceById(serviceIndex)
if (!service) return null
if (service.type !== 'compute') return null
if (algorithmMeta) {
// check if raw algo is allowed
if (service.attributes.main.privacy)
if (!service.attributes.main.privacy.allowRawAlgorithm) {
console.error('This service does not allow Raw Algo')
return null
}
}
if (algorithmDid) {
// check if did is in trusted list
if (service.attributes.main.privacy)
if (service.attributes.main.privacy.trustedAlgorithms)
if (service.attributes.main.privacy.trustedAlgorithms.length > 0)
if (
!service.attributes.main.privacy.trustedAlgorithms.includes(
algorithmDid
)
) {
console.error('This service does not allow this Algo')
return null
}
}
const order = await this.ocean.assets.order(
datasetDid,
service.type,
consumerAccount
)
return order
})
}
}

View File

@ -17,6 +17,7 @@ import {
Instantiable,
generateIntantiableConfigFromConfig
} from '../Instantiable.abstract'
import { Compute } from './Compute'
/**
* Main interface for Ocean Protocol.
@ -47,7 +48,7 @@ export class Ocean extends Instantiable {
instance.accounts = await Accounts.getInstance(instanceConfig)
// instance.auth = await Auth.getInstance(instanceConfig)
instance.assets = await Assets.getInstance(instanceConfig)
// instance.compute = await Compute.getInstance(instanceConfig)
instance.compute = await Compute.getInstance(instanceConfig)
instance.datatokens = new DataTokens(
instanceConfig.config.factoryAddress,
instanceConfig.config.factoryABI,
@ -105,9 +106,8 @@ export class Ocean extends Instantiable {
/**
* Ocean compute submodule
* @type {Compute}
public compute: Compute
*/
public compute: Compute
/**
* Ocean secretStore submodule

View File

@ -60,7 +60,7 @@ export class Provider extends Instantiable {
this.getEncryptEndpoint(),
decodeURI(JSON.stringify(args))
)
return await response.text()
return (await response.json()).encryptedDocument
} catch (e) {
this.logger.error(e)
throw new Error('HTTP request failed')
@ -87,7 +87,6 @@ export class Provider extends Instantiable {
initializeUrl += `&serviceType=${serviceType}`
initializeUrl += `&dataToken=${DDO.dataToken}`
initializeUrl += `&consumerAddress=${consumerAddress}`
try {
const response = await this.ocean.utils.fetch.get(initializeUrl)
return await response.text()
@ -145,10 +144,11 @@ export class Provider extends Instantiable {
txId?: string,
serviceIndex?: string,
serviceType?: string,
tokenAddress?: string
tokenAddress?: string,
algorithmTransferTxId?: string,
algorithmDataToken?: string
): Promise<ComputeJob | ComputeJob[]> {
const address = consumerAccount.getId()
let signatureMessage = address
signatureMessage += jobId || ''
signatureMessage += (did && `${noZeroX(did)}`) || ''
@ -170,10 +170,26 @@ export class Provider extends Instantiable {
url += (jobId && `&jobId=${jobId}`) || ''
url += `&consumerAddress=${address}`
url += `&transferTxId=${txId}` || ''
url +=
(algorithmTransferTxId &&
`&algorithmTransferTxId=${algorithmTransferTxId}`) ||
''
url += (algorithmDataToken && `&algorithmDataToken=${algorithmDataToken}`) || ''
url += `&serviceId=${serviceIndex}` || ''
url += `&serviceType=${serviceType}` || ''
url += `&dataToken=${tokenAddress}` || ''
url += `&consumerAddress=${consumerAccount.getId()}` || ''
// 'signature': signature,
// 'documentId': did,
// 'serviceId': sa.index,
// 'serviceType': sa.type,
// 'consumerAddress': cons_acc.address,
// 'transferTxId': Web3.toHex(tx_id),
// 'dataToken': data_token,
// 'output': build_stage_output_dict(dict(), dataset_ddo_w_compute_service, cons_acc.address, pub_acc),
// 'algorithmDid': alg_ddo.did,
// 'algorithmMeta': {},
// 'algorithmDataToken': alg_data_token
// switch fetch method
let fetch

View File

@ -3,7 +3,12 @@ import { DataTokens } from '../../src/datatokens/Datatokens'
import { Ocean } from '../../src/ocean/Ocean'
import config from './config'
import { assert } from 'console'
import { ComputeJob } from '../../src/ocean/interfaces/ComputeJob'
import {
Service,
ServiceComputePrivacy,
ServiceCompute
} from '../../src/ddo/interfaces/Service'
const Web3 = require('web3')
const web3 = new Web3('http://127.0.0.1:8545')
const factory = require('@oceanprotocol/contracts/artifacts/development/Factory.json')
@ -15,6 +20,9 @@ describe('Marketplace flow', () => {
let ddo
let alice
let asset
let datasetNoRawAlgo
let datasetWithTrustedAlgo
let algorithmAsset
let marketplace
let contracts
let datatoken
@ -22,13 +30,35 @@ describe('Marketplace flow', () => {
let service1
let price
let ocean
let accessService
let computeService
let data
let blob
let jobId
let cluster
let servers
let containers
let provider
const dateCreated = new Date(Date.now()).toISOString().split('.')[0] + 'Z' // remove milliseconds
const marketplaceAllowance = 20
const tokenAmount = 100
const timeout = 86400
const algorithmMeta = {
language: 'js',
format: 'docker-image',
version: '0.1',
url:
'https://raw.githubusercontent.com/oceanprotocol/test-algorithm/master/javascript/algo.js',
container: {
entrypoint: 'node $ALGO',
image: 'node',
tag: '10'
}
}
describe('#MarketplaceComputeFlow-Test', () => {
it('Initialize Ocean contracts v3', async () => {
contracts = new TestContractHandler(
@ -60,20 +90,304 @@ describe('Marketplace flow', () => {
assert(tokenAddress != null)
})
// it('Alice publishes dataset with a compute service', async () => {})
it('Generates metadata', async () => {
asset = {
main: {
type: 'dataset',
name: 'UK Weather information 2011',
dateCreated: dateCreated,
author: 'Met Office',
license: 'CC-BY',
files: [
{
url:
'https://raw.githubusercontent.com/tbertinmahieux/MSongsDB/master/Tasks_Demos/CoverSongs/shs_dataset_test.txt',
checksum: 'efb2c764274b745f5fc37f97c6b0e764',
contentLength: '4535431',
contentType: 'text/csv',
encoding: 'UTF-8',
compression: 'zip'
}
]
}
}
})
// it('Alice mints 100 DTs and tranfers them to the compute marketplace', async () => {})
it('Alice publishes dataset with a compute service that allows Raw Algo', async () => {
price = 2 // in datatoken
cluster = ocean.compute.createClusterAttributes(
'Kubernetes',
'http://10.0.0.17/xxx'
)
servers = [
ocean.compute.createServerAttributes(
'1',
'xlsize',
'50',
'16',
'0',
'128gb',
'160gb',
timeout
)
]
containers = [
ocean.compute.createContainerAttributes(
'tensorflow/tensorflow',
'latest',
'sha256:cb57ecfa6ebbefd8ffc7f75c0f00e57a7fa739578a429b6f72a0df19315deadc'
)
]
provider = ocean.compute.createProviderAttributes(
'Azure',
'Compute service with 16gb ram for each node.',
cluster,
containers,
servers
)
const origComputePrivacy = {
allowRawAlgorithm: true,
allowNetworkAccess: false,
trustedAlgorithms: []
}
const computeService = ocean.compute.createComputeService(
alice,
price,
dateCreated,
provider,
origComputePrivacy as ServiceComputePrivacy
)
ddo = await ocean.assets.create(asset, alice, [computeService], tokenAddress)
assert(ddo.dataToken === tokenAddress)
})
// it('Markeplace post compute service for sale', async () => {})
// alex
it('should publish a dataset with a compute service object that does not allow rawAlgo', async () => {
const origComputePrivacy = {
allowRawAlgorithm: false,
allowNetworkAccess: false,
trustedAlgorithms: []
}
// it('Bob buys datatokens from open market and order a compute service', async () => {})
const computeService = ocean.compute.createComputeService(
alice,
'1000',
dateCreated,
provider,
origComputePrivacy as ServiceComputePrivacy
)
datasetNoRawAlgo = await ocean.assets.create(
asset,
alice,
[computeService],
tokenAddress
)
assert(datasetNoRawAlgo.dataToken === tokenAddress)
})
// it('Bob starts compute job', async () => {})
it('should publish a dataset with a compute service object that allows only algo with did:op:1234', async () => {
const origComputePrivacy = {
allowRawAlgorithm: false,
allowNetworkAccess: false,
trustedAlgorithms: ['did:op:1234']
}
// it('Bob gets the compute job status', async () => {})
const computeService = ocean.compute.createComputeService(
alice,
'1000',
dateCreated,
provider,
origComputePrivacy as ServiceComputePrivacy
)
datasetWithTrustedAlgo = await ocean.assets.create(
asset,
alice,
[computeService],
tokenAddress
)
assert(datasetWithTrustedAlgo.dataToken === tokenAddress)
})
it('should publish an algorithm', async () => {
const algoAsset = {
main: {
type: 'algorithm',
name: 'Test Algo',
dateCreated: dateCreated,
author: 'DevOps',
license: 'CC-BY',
files: [
{
url:
'https://raw.githubusercontent.com/oceanprotocol/test-algorithm/master/javascript/algo.js',
contentType: 'text/js',
encoding: 'UTF-8'
}
],
algorithm: {
language: 'js',
format: 'docker-image',
version: '0.1',
container: {
entrypoint: 'node $ALGO',
image: 'node',
tag: '10'
}
}
}
}
const service1 = await ocean.assets.createAccessServiceAttributes(
alice,
price,
dateCreated,
0
)
algorithmAsset = await ocean.assets.create(
algoAsset,
alice,
[service1],
tokenAddress
)
assert(algorithmAsset.dataToken === tokenAddress)
})
it('Alice mints 100 DTs and tranfers them to the compute marketplace', async () => {
await datatoken.mint(tokenAddress, alice.getId(), tokenAmount)
})
it('Marketplace posts compute service for sale', async () => {
computeService = await ocean.assets.getServiceByType(ddo.id, 'compute')
assert(computeService.attributes.main.cost === price)
})
it('Bob gets datatokens from Alice to be able to try the compute service', async () => {
const dTamount = 20
await datatoken
.transfer(tokenAddress, bob.getId(), dTamount, alice.getId())
.then(async () => {
const balance = await datatoken.balance(tokenAddress, bob.getId())
assert(balance.toString() === dTamount.toString())
})
})
it('Bob starts compute job with a raw Algo', async () => {
const output = {}
const order = await ocean.compute.order(
bob.getId(),
ddo.id,
computeService.index,
undefined,
algorithmMeta
)
assert(order != null)
const computeOrder = JSON.parse(order)
const tx = await datatoken.transfer(
computeOrder['dataToken'],
computeOrder['to'],
computeOrder['numTokens'],
computeOrder['from']
)
const response = await ocean.compute.start(
ddo.id,
tx.transactionHash,
tokenAddress,
bob,
undefined,
algorithmMeta,
output,
computeService.index,
computeService.type
)
jobId = response.jobId
assert(response.status >= 10)
})
it('Bob should get status of a compute job', async () => {
const response = await ocean.compute.status(bob, ddo.id, jobId)
assert(response[0].jobId === jobId)
})
it('should get status of all compute jobs for an address', async () => {
const response = await ocean.compute.status(bob, undefined, undefined)
assert(response.length > 0)
})
it('Bob should stop compute job', async () => {
await ocean.compute.stop(bob, ddo.id, jobId)
const response = await ocean.compute.status(bob, ddo.id, jobId)
assert(response[0].stopreq === 1)
})
it('should not allow order the compute service with raw algo for dataset that does not allow raw algo', async () => {
const service1 = datasetNoRawAlgo.findServiceByType('compute')
assert(service1 !== null)
const order = await ocean.compute.order(
bob.getId(),
datasetNoRawAlgo.id,
service1.index,
undefined,
algorithmMeta
)
assert(order === null)
})
it('should not allow order the compute service with algoDid != "did:op:1234" for dataset that allows only "did:op:1234" as algo', async () => {
const service1 = datasetWithTrustedAlgo.findServiceByType('compute')
assert(service1 !== null)
const order = await ocean.compute.order(
bob.getId(),
datasetWithTrustedAlgo.id,
service1.index,
'did:op:77777',
undefined
)
assert(order === null)
})
it('should start a compute job with a published algo', async () => {
const output = {}
const serviceAlgo = algorithmAsset.findServiceByType('access')
const orderalgo = await ocean.assets.order(
algorithmAsset.id,
serviceAlgo.type,
bob.getId()
)
const algoOrder = JSON.parse(orderalgo)
const algoTx = await datatoken.transfer(
algoOrder['dataToken'],
algoOrder['to'],
algoOrder['numTokens'],
algoOrder['from']
)
const order = await ocean.compute.order(
bob.getId(),
ddo.id,
computeService.index,
algorithmAsset.id,
undefined
)
assert(order != null)
const computeOrder = JSON.parse(order)
const tx = await datatoken.transfer(
computeOrder['dataToken'],
computeOrder['to'],
computeOrder['numTokens'],
computeOrder['from']
)
const response = await ocean.compute.start(
ddo.id,
tx.transactionHash,
tokenAddress,
bob,
algorithmAsset.id,
undefined,
output,
computeService.index,
computeService.type,
algoTx.transactionHash,
algorithmAsset.dataToken
)
jobId = response.jobId
assert(response.status >= 10)
})
// it('Bob restarts compute job', async () => {})
// it('Bob gets outputs', async () => {})
})
})