From c8411e29d2ec61e0bfc08eadd19d485d6c06a314 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20Guti=C3=A9rrez?= Date: Thu, 25 Apr 2019 14:15:32 +0200 Subject: [PATCH] Add more events on order execution. --- integration/ocean/ConsumeAssetBrizo.test.ts | 2 +- src/ocean/OceanAssets.ts | 8 ++++-- src/utils/SubscribablePromise.ts | 32 ++++++++++++--------- 3 files changed, 25 insertions(+), 17 deletions(-) diff --git a/integration/ocean/ConsumeAssetBrizo.test.ts b/integration/ocean/ConsumeAssetBrizo.test.ts index 6f50886..83ff9b9 100644 --- a/integration/ocean/ConsumeAssetBrizo.test.ts +++ b/integration/ocean/ConsumeAssetBrizo.test.ts @@ -41,7 +41,7 @@ describe("Consume Asset (Brizo)", () => { .next((step) => steps.push(step)) assert.isDefined(agreementId) - assert.deepEqual(steps, [0, 1, 2, 3]) + assert.deepEqual(steps, [0, 1, 2, 3, 4, 5]) }) it("should consume and store the assets", async () => { diff --git a/src/ocean/OceanAssets.ts b/src/ocean/OceanAssets.ts index 1b81b64..67c4884 100644 --- a/src/ocean/OceanAssets.ts +++ b/src/ocean/OceanAssets.ts @@ -8,9 +8,11 @@ import { fillConditionsWithDDO, noZeroX, SubscribablePromise } from "../utils" import { Instantiable, InstantiableConfig } from "../Instantiable.abstract" export enum OrderProgressStep { + Preparing, Prepared, - AgreementSent, + SendingAgreement, AgreementInitialized, + LockingPayment, LockedPayment, } @@ -215,6 +217,7 @@ export class OceanAssets extends Instantiable { return new SubscribablePromise(async (observer) => { const oceanAgreements = this.ocean.agreements + observer.next(OrderProgressStep.Preparing) this.logger.log("Asking for agreement signature") const {agreementId, signature} = await oceanAgreements.prepare(did, serviceDefinitionId, consumer) this.logger.log(`Agreement ${agreementId} signed`) @@ -239,6 +242,7 @@ export class OceanAssets extends Instantiable { const accessGranted = accessCondition.getConditionFulfilledEvent(agreementId).once() + observer.next(OrderProgressStep.LockingPayment) const paid = await oceanAgreements.conditions.lockReward(agreementId, metadata.base.price, consumer) observer.next(OrderProgressStep.LockedPayment) @@ -257,7 +261,7 @@ export class OceanAssets extends Instantiable { resolve() }) - observer.next(OrderProgressStep.AgreementSent) + observer.next(OrderProgressStep.SendingAgreement) this.logger.log("Sending agreement request") await oceanAgreements.send(did, agreementId, serviceDefinitionId, signature, consumer) this.logger.log("Agreement request sent") diff --git a/src/utils/SubscribablePromise.ts b/src/utils/SubscribablePromise.ts index 8796d55..4f85347 100644 --- a/src/utils/SubscribablePromise.ts +++ b/src/utils/SubscribablePromise.ts @@ -13,20 +13,8 @@ export class SubscribablePromise { ) constructor(executor: (observer: SubscribableObserver) => void | Promise

) { - const execution = executor(this.observer) - - Promise.resolve(execution as any) - .then((result) => { - if (Promise.resolve(execution as any) === execution) { - this.observer.complete(result) - } - }) - .catch((result) => { - if (Promise.resolve(execution as any) === execution) { - this.observer.error(result) - } - }) - + // Defear + setTimeout(() => this.init(executor), 1) } public subscribe(onNext: (next: T) => void) { @@ -49,4 +37,20 @@ export class SubscribablePromise { public finally(onfinally?: () => any) { return Object.assign(this.promise.finally(onfinally), this) } + + private init(executor: (observer: SubscribableObserver) => void | Promise

) { + const execution = executor(this.observer) + + Promise.resolve(execution as any) + .then((result) => { + if (Promise.resolve(execution as any) === execution) { + this.observer.complete(result) + } + }) + .catch((result) => { + if (Promise.resolve(execution as any) === execution) { + this.observer.error(result) + } + }) + } }