From 5bad5fa9ecd5835ca7c4791fe6d36fada3973cf2 Mon Sep 17 00:00:00 2001 From: Matthias Kretschmann Date: Tue, 21 Jan 2020 12:45:50 +0100 Subject: [PATCH] new ocean.compute interface --- src/brizo/Brizo.ts | 63 ++++++++---- src/ocean/OceanAssets.ts | 2 +- src/ocean/OceanCompute.ts | 136 ++++++++++++++++++++++--- src/ocean/utils/WebServiceConnector.ts | 9 ++ 4 files changed, 173 insertions(+), 37 deletions(-) diff --git a/src/brizo/Brizo.ts b/src/brizo/Brizo.ts index 771b72d..078fae8 100644 --- a/src/brizo/Brizo.ts +++ b/src/brizo/Brizo.ts @@ -36,7 +36,7 @@ export class Brizo extends Instantiable { } public getComputeEndpoint() { - return `${this.url}${apiPath}/exec` + return `${this.url}${apiPath}/compute` } public async initializeServiceAgreement( @@ -96,45 +96,64 @@ export class Brizo extends Instantiable { } public async computeService( - agreementId: string, - serviceEndpoint: string, - account: Account, - algorithmDid: string, - algorithm: string, - algorithmMeta?: MetaData - ): Promise { - const signature = await this.createSignature(account, agreementId) + method: string, + serviceAgreementId: string, + consumerAccount: Account, + algorithmDid?: string, + algorithmMeta?: MetaData, + jobId?: string + ): Promise { + const signature = await this.createSignature(consumerAccount, serviceAgreementId) + const address = consumerAccount.getId() - let url = serviceEndpoint + // construct Brizo URL + let url = this.getComputeEndpoint() url += `&signature=${signature}` - url += `&serviceAgreementId=${noZeroX(agreementId)}` - url += `&consumerAddress=${account.getId()}` - url += `&algorithmDID=${algorithmDid}` - url += `&algorithm=${algorithm}` - url += `&algorithmMeta=${algorithmMeta}` + url += `&consumerAddress=${address}` + url += `&serviceAgreementId=${noZeroX(serviceAgreementId)}` + url += algorithmDid && `&algorithmDid=${algorithmDid}` + url += algorithmMeta && `&algorithmMeta=${algorithmMeta}` + url += jobId && `&jobId=${jobId}` - const result: { jobId: string } = await this.ocean.utils.fetch - .post(url, '') + // switch fetch method + let fetch + + switch (method) { + case 'post': + fetch = this.ocean.utils.fetch.post(url, '') + break + case 'put': + fetch = this.ocean.utils.fetch.put(url, '') + break + case 'delete': + fetch = this.ocean.utils.fetch.delete(url) + break + default: + fetch = this.ocean.utils.fetch.get(url) + break + } + + const result = await fetch .then((response: any) => { if (response.ok) { return response.json() } this.logger.error( - 'Executing compute job failed:', + 'Compute job failed:', response.status, response.statusText ) return null }) - .catch(error => { - this.logger.error('Error executing compute job') - this.logger.error(error) + .catch((error: Error) => { + this.logger.error('Error with compute job') + this.logger.error(error.message) throw error }) - return result.jobId + return result } public async createSignature(account: Account, agreementId: string): Promise { diff --git a/src/ocean/OceanAssets.ts b/src/ocean/OceanAssets.ts index ec38e2f..c3a7242 100644 --- a/src/ocean/OceanAssets.ts +++ b/src/ocean/OceanAssets.ts @@ -367,7 +367,7 @@ export class OceanAssets extends Instantiable { /** * Returns the owner of a asset. * @param {string} did Decentralized ID. - * @return {Promise} Returns Agreement ID + * @return {Promise} Returns Account ID */ public async owner(did: string): Promise { const ddo = await this.resolve(did) diff --git a/src/ocean/OceanCompute.ts b/src/ocean/OceanCompute.ts index d159a85..3d22611 100644 --- a/src/ocean/OceanCompute.ts +++ b/src/ocean/OceanCompute.ts @@ -3,6 +3,22 @@ import { MetaData } from '../ddo/MetaData' import Account from './Account' import { DDO } from '../ddo/DDO' +export interface ComputeJobStatus { + owner: string + agreementId: string + jobId: string + dateCreated: string + dateFinished: string + status: boolean + statusText: string + configlogUrl: string + publishlogUrl: string + algologUrl: string + outputsUrl: string[] + ddo?: DDO + did?: string +} + /** * Compute submodule of Ocean Protocol. */ @@ -18,38 +34,130 @@ export class OceanCompute extends Instantiable { return instance } + /** + * Starts an order of a compute service that is defined in an asset's services. + * @param {string} datasetDid The DID of the dataset asset (of type `dataset`) to run the algorithm on. + * @param {Account} consumerAccount The account of the consumer ordering the service. + * @return {Promise} Returns The service agreement ID, representation of `bytes32` ID. + */ + public async order(datasetDid: string, consumerAccount: Account): Promise { + const ddo: DDO = await this.ocean.assets.resolve(datasetDid) + const { index } = ddo.findServiceByType('compute') + + const agreementId = await this.ocean.assets.order( + datasetDid, + index, + consumerAccount + ) + + return agreementId + } + /** * Start the execution of a compute job. * @param {string} agreementId The service agreement ID. - * @param {string} datasetDid The DID of the dataset asset (of type `dataset`) to run the algorithm on. - * @param {number} serviceIndex ID of the compute service within the dataset DDO. * @param {Account} consumerAccount The account of the consumer ordering the service. * @param {string} algorithmDid The DID of the algorithm asset (of type `algorithm`) to run on the asset. - * @param {string} algorithmRaw The raw text of the algorithm to run in the compute job (e.g. a jupyter notebook) or a valid URL to fetch the algorithm. * @param {MetaData} algorithmMeta Metadata about the algorithm being run if `algorithm` is being used. This is ignored when `algorithmDid` is specified. * @return {Promise} Returns compute job ID */ - public async run( + public async start( agreementId: string, - datasetDid: string, - serviceIndex: number, consumerAccount: Account, - algorithmDid: string, - algorithmRaw?: string, + algorithmDid?: string, algorithmMeta?: MetaData ): Promise { - const ddo: DDO = await this.ocean.assets.resolve(datasetDid) - const { serviceEndpoint } = ddo.findServiceById(serviceIndex) - - const jobId = await this.ocean.brizo.computeService( + const { jobId } = await this.ocean.brizo.computeService( + 'post', agreementId, - serviceEndpoint, consumerAccount, algorithmDid, - algorithmRaw, algorithmMeta ) return jobId } + + /** + * Ends a running compute job. + * @param {string} agreementId The service agreement ID. + * @param {Account} consumerAccount The account of the consumer ordering the service. + * @param {string} jobId The ID of the compute job to be stopped + * @return {Promise} Returns the new status of a job + */ + public async stop( + agreementId: string, + consumerAccount: Account, + jobId: string + ): Promise { + const status = await this.ocean.brizo.computeService( + 'put', + agreementId, + consumerAccount, + jobId + ) + + return status + } + + /** + * Deletes a compute job and all resources associated with the job. If job is running it will be stopped first. + * @param {string} agreementId The service agreement ID. + * @param {Account} consumerAccount The account of the consumer ordering the service. + * @param {string} jobId The ID of the compute job to be stopped + * @return {Promise} Returns the new status of a job + */ + public async delete( + agreementId: string, + consumerAccount: Account, + jobId: string + ): Promise { + const status = await this.ocean.brizo.computeService( + 'delete', + agreementId, + consumerAccount, + jobId + ) + + return status + } + + /** + * Ends a running compute job and starts it again. + * @param {string} agreementId The service agreement ID. + * @param {Account} consumerAccount The account of the consumer ordering the service. + * @param {string} jobId The ID of the compute job to be stopped + * @return {Promise} Returns the new status of a job + */ + public async restart( + agreementId: string, + jobId: string, + consumerAccount: Account + ): Promise { + await this.stop(agreementId, consumerAccount, jobId) + const result = await this.start(agreementId, consumerAccount, jobId) + return result + } + + /** + * Returns information about the status of all compute jobs, or a single compute job. + * @param {Account} consumerAccount The account of the consumer ordering the service. + * @param {string} agreementId The service agreement ID. + * @param {string} jobId The ID of the compute job to be stopped + * @return {Promise} Returns the status + */ + public async status( + consumerAccount: Account, + agreementId?: string, + jobId?: string + ): Promise { + const status = await this.ocean.brizo.computeService( + 'get', + agreementId, + consumerAccount, + jobId + ) + + return status + } } diff --git a/src/ocean/utils/WebServiceConnector.ts b/src/ocean/utils/WebServiceConnector.ts index 37a07b9..001322d 100644 --- a/src/ocean/utils/WebServiceConnector.ts +++ b/src/ocean/utils/WebServiceConnector.ts @@ -43,6 +43,15 @@ export class WebServiceConnector extends Instantiable { }) } + public delete(url: string): Promise { + return this.fetch(url, { + method: 'DELETE', + headers: { + 'Content-type': 'application/json' + } + }) + } + public async downloadFile( url: string, destination?: string,