diff --git a/src/ocean/OceanAssets.ts b/src/ocean/OceanAssets.ts index c3a7242..9193500 100644 --- a/src/ocean/OceanAssets.ts +++ b/src/ocean/OceanAssets.ts @@ -365,7 +365,7 @@ export class OceanAssets extends Instantiable { } /** - * Returns the owner of a asset. + * Returns the owner of an asset. * @param {string} did Decentralized ID. * @return {Promise} Returns Account ID */ diff --git a/src/ocean/OceanCompute.ts b/src/ocean/OceanCompute.ts index 8060460..9eb8860 100644 --- a/src/ocean/OceanCompute.ts +++ b/src/ocean/OceanCompute.ts @@ -2,6 +2,8 @@ import { Instantiable, InstantiableConfig } from '../Instantiable.abstract' import { MetaData } from '../ddo/MetaData' import Account from './Account' import { DDO } from '../ddo/DDO' +import { SubscribablePromise, generateId, zeroX } from '../utils' +import { OrderProgressStep } from './OceanAssets' export interface ComputeJobStatus { owner: string @@ -38,19 +40,79 @@ export class OceanCompute extends Instantiable { * Starts an order of a compute service that is defined in an asset's services. * @param {Account} consumerAccount The account of the consumer ordering the service. * @param {string} datasetDid The DID of the dataset asset (of type `dataset`) to run the algorithm on. - * @return {Promise} Returns The service agreement ID, representation of `bytes32` ID. + * @return {Promise} Returns the Service Agreement ID, representation of `bytes32` ID. */ - public async order(consumerAccount: Account, datasetDid: string): Promise { - const ddo: DDO = await this.ocean.assets.resolve(datasetDid) - const { index } = ddo.findServiceByType('compute') + public order( + consumerAccount: Account, + datasetDid: string + ): SubscribablePromise { + return new SubscribablePromise(async observer => { + const { keeper, assets, agreements } = this.ocean - const agreementId = await this.ocean.assets.order( - datasetDid, - index, - consumerAccount - ) + const agreementId = zeroX(generateId()) + const ddo: DDO = await assets.resolve(datasetDid) + const { index, attributes } = ddo.findServiceByType('compute') - return agreementId + const templateName = attributes.main.serviceAgreementTemplate.contractName + const template = keeper.getTemplateByName(templateName) + const computeCondition = keeper.conditions.computeExecutionCondition + + // eslint-disable-next-line no-async-promise-executor + const paymentFlow = new Promise(async (resolve, reject) => { + await template.getAgreementCreatedEvent(agreementId).once() + + this.logger.log('Agreement initialized') + observer.next(OrderProgressStep.AgreementInitialized) + + this.logger.log('Locking payment') + + const computeGranted = computeCondition + .getConditionFulfilledEvent(agreementId) + .once() + + observer.next(OrderProgressStep.LockingPayment) + const paid = await agreements.conditions.lockReward( + agreementId, + attributes.main.price, + consumerAccount + ) + observer.next(OrderProgressStep.LockedPayment) + + if (paid) { + this.logger.log('Payment was OK') + } else { + this.logger.error('Payment was KO') + this.logger.error('Agreement ID: ', agreementId) + this.logger.error('DID: ', ddo.id) + reject(new Error('Error on payment')) + } + + await computeGranted + + this.logger.log('Compute granted') + resolve() + }) + + observer.next(OrderProgressStep.CreatingAgreement) + this.logger.log('Creating agreement') + await agreements.create( + datasetDid, + agreementId, + index, + undefined, + consumerAccount, + consumerAccount + ) + this.logger.log('Agreement created') + + try { + await paymentFlow + } catch (e) { + throw new Error('Error paying the compute service.') + } + + return agreementId + }) } /**