diff --git a/.travis.yml b/.travis.yml index 63b820c1..0e670ddc 100644 --- a/.travis.yml +++ b/.travis.yml @@ -19,7 +19,7 @@ before_script: - git clone https://github.com/oceanprotocol/barge - cd barge - git checkout v3 - - export PROVIDER_VERSION=alex + - export PROVIDER_VERSION=phase2 - bash -x start_ocean.sh --no-dashboard 2>&1 > start_ocean.log & - cd .. - sleep 300 @@ -39,4 +39,4 @@ deploy: api_key: ${NPM_TOKEN} skip_cleanup: true on: - tags: true \ No newline at end of file + tags: true diff --git a/src/ddo/interfaces/Service.ts b/src/ddo/interfaces/Service.ts index c60d7c05..6e69c86b 100644 --- a/src/ddo/interfaces/Service.ts +++ b/src/ddo/interfaces/Service.ts @@ -33,7 +33,7 @@ export interface ServiceComputeAttributes extends ServiceCommonAttributes { main: { creator: string datePublished: string - price: string + cost: string timeout: number provider?: ServiceComputeProvider name: string @@ -57,7 +57,7 @@ export interface ServiceComputeProvider { supportedServers: { serverId: string serverType: string - price: string + cost: string cpu: string gpu: string memory: string @@ -74,13 +74,11 @@ export interface ServiceMetadata extends ServiceCommon { export interface ServiceAccess extends ServiceCommon { type: 'access' - templateId?: string attributes: ServiceAccessAttributes } export interface ServiceCompute extends ServiceCommon { type: 'compute' - templateId?: string attributes: ServiceComputeAttributes } diff --git a/src/ocean/Assets.ts b/src/ocean/Assets.ts index 6dd9ab40..daaa29fe 100644 --- a/src/ocean/Assets.ts +++ b/src/ocean/Assets.ts @@ -347,6 +347,20 @@ export class Assets extends Instantiable { return service } + public async getServiceByIndex( + did: string, + serviceIndex: number + ): Promise { + const services: ServiceCommon[] = (await this.resolve(did)).service + let service + services.forEach((serv) => { + if (serv.index === serviceIndex) { + service = serv + } + }) + return service + } + public async createAccessServiceAttributes( creator: Account, dtCost: number, @@ -372,12 +386,19 @@ export class Assets extends Instantiable { public async order( did: string, serviceType: string, - consumerAddress: string + consumerAddress: string, + serviceIndex: number = -1 ): Promise { - const service = await this.getServiceByType(did, serviceType) + if (serviceIndex === -1) { + const service = await this.getServiceByType(did, serviceType) + serviceIndex = service.index + } else { + const service = await this.getServiceByIndex(did, serviceIndex) + serviceType = service.type + } return await this.ocean.provider.initialize( did, - service.index, + serviceIndex, serviceType, consumerAddress ) diff --git a/src/ocean/Compute.ts b/src/ocean/Compute.ts index 19d2b69a..d3836477 100644 --- a/src/ocean/Compute.ts +++ b/src/ocean/Compute.ts @@ -1,10 +1,16 @@ -import { Instantiable, InstantiableConfig } from '../Instantiable.abstract' +import { DDO } from '../ddo/DDO' import { MetadataAlgorithm } from '../ddo/interfaces/MetadataAlgorithm' +import { Service, ServiceComputePrivacy, ServiceCompute } from '../ddo/interfaces/Service' import Account from './Account' -import { ServiceComputePrivacy, ServiceCompute } from '../ddo/interfaces/Service' +import { SubscribablePromise } from '../utils' +import { Instantiable, InstantiableConfig } from '../Instantiable.abstract' import { Output } from './interfaces/ComputeOutput' import { ComputeJob } from './interfaces/ComputeJob' +export enum OrderProgressStep { + TransferDataToken +} + export const ComputeJobStatus = Object.freeze({ Started: 10, ConfiguringVolumes: 20, @@ -24,8 +30,8 @@ export const ComputeJobStatus = Object.freeze({ */ export class Compute extends Instantiable { /** - * Returns the instance of OceanCompute. - * @return {Promise} + * Returns the instance of Compute. + * @return {Promise} */ public static async getInstance(config: InstantiableConfig): Promise { const instance = new Compute() @@ -34,43 +40,29 @@ export class Compute extends Instantiable { return instance } - /** - * Starts an order of a compute service that is defined in an asset's services. - * @param {Account} consumerAccount The account of the consumer ordering the service. - * @param {string} datasetDid The DID of the dataset asset (of type `dataset`) to run the algorithm on. - * @param {string} algorithmDid The DID of the algorithm asset (of type `algorithm`) to run on the asset. - * @param {MetaData} algorithmMeta Metadata about the algorithm being run if `algorithm` is being used. This is ignored when `algorithmDid` is specified. - * @return {Promise} Returns a compute job ID. - * - * Note: algorithmDid and algorithmMeta are optional, but if they are not passed, - * you can end up in the situation that you are ordering and paying for your agreement, - * but brizo will not allow the compute, due to privacy settings of the ddo - */ - public order( - consumerAccount: Account, - datasetDid: string, - algorithmDid?: string, - algorithmMeta?: MetadataAlgorithm, - provider?: string - ): Promise { - return Promise.resolve('') - } - /** * Start the execution of a compute job. - * @param {Account} consumerAccount The account of the consumer ordering the service. * @param {string} did Decentralized identifer for the asset + * @param {string} txId + * @param {string} tokenAddress + * @param {Account} consumerAccount The account of the consumer ordering the service. * @param {string} algorithmDid The DID of the algorithm asset (of type `algorithm`) to run on the asset. * @param {MetaData} algorithmMeta Metadata about the algorithm being run if `algorithm` is being used. This is ignored when `algorithmDid` is specified. * @param {Output} output Define algorithm output publishing. Publishing the result of a compute job is turned off by default. * @return {Promise} Returns compute job ID under status.jobId */ public async start( - consumerAccount: Account, did: string, + txId: string, + tokenAddress: string, + consumerAccount: Account, algorithmDid?: string, algorithmMeta?: MetadataAlgorithm, - output?: Output + output?: Output, + serviceIndex?: string, + serviceType?: string, + algorithmTransferTxId?: string, + algorithmDataToken?: string ): Promise { output = this.checkOutput(consumerAccount, output) if (did) { @@ -81,7 +73,13 @@ export class Compute extends Instantiable { algorithmDid, algorithmMeta, undefined, - output + output, + txId, + serviceIndex, + serviceType, + tokenAddress, + algorithmTransferTxId, + algorithmDataToken ) return computeJobsList[0] as ComputeJob } else return null @@ -135,23 +133,6 @@ export class Compute extends Instantiable { return computeJobsList[0] as ComputeJob } - /** - * Ends a running compute job and starts it again. - * @param {Account} consumerAccount The account of the consumer ordering the service. - * @param {string} did Decentralized identifier. - * @param {string} jobId The ID of the compute job to be stopped - * @return {Promise} Returns the new status of a job - */ - public async restart( - consumerAccount: Account, - did: string, - jobId: string - ): Promise { - await this.stop(consumerAccount, did, jobId) - const result = await this.start(consumerAccount, did, jobId) - return result - } - /** * Returns information about the status of all compute jobs, or a single compute job. * @param {Account} consumerAccount The account of the consumer ordering the service. @@ -200,31 +181,85 @@ export class Compute extends Instantiable { return computeJobsList[0] as ComputeJob } - public async createComputeServiceAttributes( + public createServerAttributes( + serverId: string, + serverType: string, + cost: string, + cpu: string, + gpu: string, + memory: string, + disk: string, + maxExecutionTime: number + ): object { + return { + serverId, + serverType, + cost, + cpu, + gpu, + memory, + disk, + maxExecutionTime + } + } + + public createContainerAttributes( + image: string, + tag: string, + checksum: string + ): object { + return { image, tag, checksum } + } + + public createClusterAttributes(type: string, url: string): object { + return { type, url } + } + + public createProviderAttributes( + type: string, + description: string, + cluster: object, + containers: object[], + servers: object[] + ): object { + return { + type, + description, + environment: { + cluster: cluster, + supportedServers: containers, + supportedContainers: servers + } + } + } + + public createComputeService( consumerAccount: Account, - price: string, + cost: string, datePublished: string, + providerAttributes: object, computePrivacy?: ServiceComputePrivacy, timeout?: number - ): Promise { + ): ServiceCompute { const name = 'dataAssetComputingService' if (!timeout) timeout = 3600 - // TODO const service = { type: 'compute', index: 3, serviceEndpoint: this.ocean.provider.getComputeEndpoint(), attributes: { main: { + name, creator: consumerAccount.getId(), datePublished, - price, - privacy: {}, + cost, timeout: timeout, - name + provider: providerAttributes, + privacy: {} } } } + if (computePrivacy) service.attributes.main.privacy = computePrivacy return service as ServiceCompute } @@ -256,4 +291,61 @@ export class Compute extends Instantiable { owner: output.owner || consumerAccount.getId() } } + + /** + * Starts an order of a compute service that is defined in an asset's services. + * @param {String} consumerAccount The account of the consumer ordering the service. + * @param {string} datasetDid The DID of the dataset asset (of type `dataset`) to run the algorithm on. + * @param {string} serviceIndex The Service index + * @param {string} algorithmDid The DID of the algorithm asset (of type `algorithm`) to run on the asset. + * @param {MetaData} algorithmMeta Metadata about the algorithm being run if `algorithm` is being used. This is ignored when `algorithmDid` is specified. + * @return {Promise} Returns the transaction details + * + * Note: algorithmDid and algorithmMeta are optional, but if they are not passed, + * you can end up in the situation that you are ordering and paying for your compute job, + * but provider will not allow the compute, due to privacy settings of the ddo + */ + public order( + consumerAccount: string, + datasetDid: string, + serviceIndex: number, + algorithmDid?: string, + algorithmMeta?: MetadataAlgorithm + ): SubscribablePromise { + return new SubscribablePromise(async (observer) => { + const ddo: DDO = await this.ocean.assets.resolve(datasetDid) + // const service: Service = ddo.findServiceByType('compute') + const service: Service = ddo.findServiceById(serviceIndex) + if (!service) return null + if (service.type !== 'compute') return null + if (algorithmMeta) { + // check if raw algo is allowed + if (service.attributes.main.privacy) + if (!service.attributes.main.privacy.allowRawAlgorithm) { + console.error('This service does not allow Raw Algo') + return null + } + } + if (algorithmDid) { + // check if did is in trusted list + if (service.attributes.main.privacy) + if (service.attributes.main.privacy.trustedAlgorithms) + if (service.attributes.main.privacy.trustedAlgorithms.length > 0) + if ( + !service.attributes.main.privacy.trustedAlgorithms.includes( + algorithmDid + ) + ) { + console.error('This service does not allow this Algo') + return null + } + } + const order = await this.ocean.assets.order( + datasetDid, + service.type, + consumerAccount + ) + return order + }) + } } diff --git a/src/ocean/Ocean.ts b/src/ocean/Ocean.ts index e52f95a8..6a2cafc9 100644 --- a/src/ocean/Ocean.ts +++ b/src/ocean/Ocean.ts @@ -17,6 +17,7 @@ import { Instantiable, generateIntantiableConfigFromConfig } from '../Instantiable.abstract' +import { Compute } from './Compute' /** * Main interface for Ocean Protocol. @@ -47,7 +48,7 @@ export class Ocean extends Instantiable { instance.accounts = await Accounts.getInstance(instanceConfig) // instance.auth = await Auth.getInstance(instanceConfig) instance.assets = await Assets.getInstance(instanceConfig) - // instance.compute = await Compute.getInstance(instanceConfig) + instance.compute = await Compute.getInstance(instanceConfig) instance.datatokens = new DataTokens( instanceConfig.config.factoryAddress, instanceConfig.config.factoryABI, @@ -105,9 +106,8 @@ export class Ocean extends Instantiable { /** * Ocean compute submodule * @type {Compute} - + */ public compute: Compute - */ /** * Ocean secretStore submodule diff --git a/src/provider/Provider.ts b/src/provider/Provider.ts index 9c14a763..ddbec903 100644 --- a/src/provider/Provider.ts +++ b/src/provider/Provider.ts @@ -60,7 +60,7 @@ export class Provider extends Instantiable { this.getEncryptEndpoint(), decodeURI(JSON.stringify(args)) ) - return await response.text() + return (await response.json()).encryptedDocument } catch (e) { this.logger.error(e) throw new Error('HTTP request failed') @@ -87,7 +87,6 @@ export class Provider extends Instantiable { initializeUrl += `&serviceType=${serviceType}` initializeUrl += `&dataToken=${DDO.dataToken}` initializeUrl += `&consumerAddress=${consumerAddress}` - try { const response = await this.ocean.utils.fetch.get(initializeUrl) return await response.text() @@ -145,10 +144,11 @@ export class Provider extends Instantiable { txId?: string, serviceIndex?: string, serviceType?: string, - tokenAddress?: string + tokenAddress?: string, + algorithmTransferTxId?: string, + algorithmDataToken?: string ): Promise { const address = consumerAccount.getId() - let signatureMessage = address signatureMessage += jobId || '' signatureMessage += (did && `${noZeroX(did)}`) || '' @@ -170,10 +170,26 @@ export class Provider extends Instantiable { url += (jobId && `&jobId=${jobId}`) || '' url += `&consumerAddress=${address}` url += `&transferTxId=${txId}` || '' + url += + (algorithmTransferTxId && + `&algorithmTransferTxId=${algorithmTransferTxId}`) || + '' + url += (algorithmDataToken && `&algorithmDataToken=${algorithmDataToken}`) || '' url += `&serviceId=${serviceIndex}` || '' url += `&serviceType=${serviceType}` || '' url += `&dataToken=${tokenAddress}` || '' url += `&consumerAddress=${consumerAccount.getId()}` || '' + // 'signature': signature, + // 'documentId': did, + // 'serviceId': sa.index, + // 'serviceType': sa.type, + // 'consumerAddress': cons_acc.address, + // 'transferTxId': Web3.toHex(tx_id), + // 'dataToken': data_token, + // 'output': build_stage_output_dict(dict(), dataset_ddo_w_compute_service, cons_acc.address, pub_acc), + // 'algorithmDid': alg_ddo.did, + // 'algorithmMeta': {}, + // 'algorithmDataToken': alg_data_token // switch fetch method let fetch diff --git a/test/integration/ComputeFlow.test.ts b/test/integration/ComputeFlow.test.ts index 33bd3ed8..f48b5707 100644 --- a/test/integration/ComputeFlow.test.ts +++ b/test/integration/ComputeFlow.test.ts @@ -3,7 +3,12 @@ import { DataTokens } from '../../src/datatokens/Datatokens' import { Ocean } from '../../src/ocean/Ocean' import config from './config' import { assert } from 'console' - +import { ComputeJob } from '../../src/ocean/interfaces/ComputeJob' +import { + Service, + ServiceComputePrivacy, + ServiceCompute +} from '../../src/ddo/interfaces/Service' const Web3 = require('web3') const web3 = new Web3('http://127.0.0.1:8545') const factory = require('@oceanprotocol/contracts/artifacts/development/Factory.json') @@ -15,6 +20,9 @@ describe('Marketplace flow', () => { let ddo let alice let asset + let datasetNoRawAlgo + let datasetWithTrustedAlgo + let algorithmAsset let marketplace let contracts let datatoken @@ -22,13 +30,35 @@ describe('Marketplace flow', () => { let service1 let price let ocean - let accessService + let computeService let data let blob + let jobId + + let cluster + let servers + let containers + let provider + + const dateCreated = new Date(Date.now()).toISOString().split('.')[0] + 'Z' // remove milliseconds const marketplaceAllowance = 20 const tokenAmount = 100 + const timeout = 86400 + const algorithmMeta = { + language: 'js', + format: 'docker-image', + version: '0.1', + url: + 'https://raw.githubusercontent.com/oceanprotocol/test-algorithm/master/javascript/algo.js', + container: { + entrypoint: 'node $ALGO', + image: 'node', + tag: '10' + } + } + describe('#MarketplaceComputeFlow-Test', () => { it('Initialize Ocean contracts v3', async () => { contracts = new TestContractHandler( @@ -60,20 +90,304 @@ describe('Marketplace flow', () => { assert(tokenAddress != null) }) - // it('Alice publishes dataset with a compute service', async () => {}) + it('Generates metadata', async () => { + asset = { + main: { + type: 'dataset', + name: 'UK Weather information 2011', + dateCreated: dateCreated, + author: 'Met Office', + license: 'CC-BY', + files: [ + { + url: + 'https://raw.githubusercontent.com/tbertinmahieux/MSongsDB/master/Tasks_Demos/CoverSongs/shs_dataset_test.txt', + checksum: 'efb2c764274b745f5fc37f97c6b0e764', + contentLength: '4535431', + contentType: 'text/csv', + encoding: 'UTF-8', + compression: 'zip' + } + ] + } + } + }) - // it('Alice mints 100 DTs and tranfers them to the compute marketplace', async () => {}) + it('Alice publishes dataset with a compute service that allows Raw Algo', async () => { + price = 2 // in datatoken + cluster = ocean.compute.createClusterAttributes( + 'Kubernetes', + 'http://10.0.0.17/xxx' + ) + servers = [ + ocean.compute.createServerAttributes( + '1', + 'xlsize', + '50', + '16', + '0', + '128gb', + '160gb', + timeout + ) + ] + containers = [ + ocean.compute.createContainerAttributes( + 'tensorflow/tensorflow', + 'latest', + 'sha256:cb57ecfa6ebbefd8ffc7f75c0f00e57a7fa739578a429b6f72a0df19315deadc' + ) + ] + provider = ocean.compute.createProviderAttributes( + 'Azure', + 'Compute service with 16gb ram for each node.', + cluster, + containers, + servers + ) + const origComputePrivacy = { + allowRawAlgorithm: true, + allowNetworkAccess: false, + trustedAlgorithms: [] + } + const computeService = ocean.compute.createComputeService( + alice, + price, + dateCreated, + provider, + origComputePrivacy as ServiceComputePrivacy + ) + ddo = await ocean.assets.create(asset, alice, [computeService], tokenAddress) + assert(ddo.dataToken === tokenAddress) + }) - // it('Markeplace post compute service for sale', async () => {}) + // alex + it('should publish a dataset with a compute service object that does not allow rawAlgo', async () => { + const origComputePrivacy = { + allowRawAlgorithm: false, + allowNetworkAccess: false, + trustedAlgorithms: [] + } - // it('Bob buys datatokens from open market and order a compute service', async () => {}) + const computeService = ocean.compute.createComputeService( + alice, + '1000', + dateCreated, + provider, + origComputePrivacy as ServiceComputePrivacy + ) + datasetNoRawAlgo = await ocean.assets.create( + asset, + alice, + [computeService], + tokenAddress + ) + assert(datasetNoRawAlgo.dataToken === tokenAddress) + }) - // it('Bob starts compute job', async () => {}) + it('should publish a dataset with a compute service object that allows only algo with did:op:1234', async () => { + const origComputePrivacy = { + allowRawAlgorithm: false, + allowNetworkAccess: false, + trustedAlgorithms: ['did:op:1234'] + } - // it('Bob gets the compute job status', async () => {}) + const computeService = ocean.compute.createComputeService( + alice, + '1000', + dateCreated, + provider, + origComputePrivacy as ServiceComputePrivacy + ) + datasetWithTrustedAlgo = await ocean.assets.create( + asset, + alice, + [computeService], + tokenAddress + ) + assert(datasetWithTrustedAlgo.dataToken === tokenAddress) + }) + + it('should publish an algorithm', async () => { + const algoAsset = { + main: { + type: 'algorithm', + name: 'Test Algo', + dateCreated: dateCreated, + author: 'DevOps', + license: 'CC-BY', + files: [ + { + url: + 'https://raw.githubusercontent.com/oceanprotocol/test-algorithm/master/javascript/algo.js', + contentType: 'text/js', + encoding: 'UTF-8' + } + ], + algorithm: { + language: 'js', + format: 'docker-image', + version: '0.1', + container: { + entrypoint: 'node $ALGO', + image: 'node', + tag: '10' + } + } + } + } + const service1 = await ocean.assets.createAccessServiceAttributes( + alice, + price, + dateCreated, + 0 + ) + algorithmAsset = await ocean.assets.create( + algoAsset, + alice, + [service1], + tokenAddress + ) + assert(algorithmAsset.dataToken === tokenAddress) + }) + + it('Alice mints 100 DTs and tranfers them to the compute marketplace', async () => { + await datatoken.mint(tokenAddress, alice.getId(), tokenAmount) + }) + + it('Marketplace posts compute service for sale', async () => { + computeService = await ocean.assets.getServiceByType(ddo.id, 'compute') + assert(computeService.attributes.main.cost === price) + }) + + it('Bob gets datatokens from Alice to be able to try the compute service', async () => { + const dTamount = 20 + await datatoken + .transfer(tokenAddress, bob.getId(), dTamount, alice.getId()) + .then(async () => { + const balance = await datatoken.balance(tokenAddress, bob.getId()) + assert(balance.toString() === dTamount.toString()) + }) + }) + + it('Bob starts compute job with a raw Algo', async () => { + const output = {} + const order = await ocean.compute.order( + bob.getId(), + ddo.id, + computeService.index, + undefined, + algorithmMeta + ) + assert(order != null) + const computeOrder = JSON.parse(order) + const tx = await datatoken.transfer( + computeOrder['dataToken'], + computeOrder['to'], + computeOrder['numTokens'], + computeOrder['from'] + ) + const response = await ocean.compute.start( + ddo.id, + tx.transactionHash, + tokenAddress, + bob, + undefined, + algorithmMeta, + output, + computeService.index, + computeService.type + ) + jobId = response.jobId + assert(response.status >= 10) + }) + + it('Bob should get status of a compute job', async () => { + const response = await ocean.compute.status(bob, ddo.id, jobId) + assert(response[0].jobId === jobId) + }) + + it('should get status of all compute jobs for an address', async () => { + const response = await ocean.compute.status(bob, undefined, undefined) + assert(response.length > 0) + }) + it('Bob should stop compute job', async () => { + await ocean.compute.stop(bob, ddo.id, jobId) + const response = await ocean.compute.status(bob, ddo.id, jobId) + assert(response[0].stopreq === 1) + }) + it('should not allow order the compute service with raw algo for dataset that does not allow raw algo', async () => { + const service1 = datasetNoRawAlgo.findServiceByType('compute') + assert(service1 !== null) + const order = await ocean.compute.order( + bob.getId(), + datasetNoRawAlgo.id, + service1.index, + undefined, + algorithmMeta + ) + assert(order === null) + }) + it('should not allow order the compute service with algoDid != "did:op:1234" for dataset that allows only "did:op:1234" as algo', async () => { + const service1 = datasetWithTrustedAlgo.findServiceByType('compute') + assert(service1 !== null) + const order = await ocean.compute.order( + bob.getId(), + datasetWithTrustedAlgo.id, + service1.index, + 'did:op:77777', + undefined + ) + assert(order === null) + }) + it('should start a compute job with a published algo', async () => { + const output = {} + const serviceAlgo = algorithmAsset.findServiceByType('access') + const orderalgo = await ocean.assets.order( + algorithmAsset.id, + serviceAlgo.type, + bob.getId() + ) + const algoOrder = JSON.parse(orderalgo) + const algoTx = await datatoken.transfer( + algoOrder['dataToken'], + algoOrder['to'], + algoOrder['numTokens'], + algoOrder['from'] + ) + const order = await ocean.compute.order( + bob.getId(), + ddo.id, + computeService.index, + algorithmAsset.id, + undefined + ) + assert(order != null) + const computeOrder = JSON.parse(order) + const tx = await datatoken.transfer( + computeOrder['dataToken'], + computeOrder['to'], + computeOrder['numTokens'], + computeOrder['from'] + ) + const response = await ocean.compute.start( + ddo.id, + tx.transactionHash, + tokenAddress, + bob, + algorithmAsset.id, + undefined, + output, + computeService.index, + computeService.type, + algoTx.transactionHash, + algorithmAsset.dataToken + ) + jobId = response.jobId + assert(response.status >= 10) + }) // it('Bob restarts compute job', async () => {}) - // it('Bob gets outputs', async () => {}) }) })