diff --git a/integration/ocean/ConsumeAssetBrizo.test.ts b/integration/ocean/ConsumeAssetBrizo.test.ts index 44d4197..6f50886 100644 --- a/integration/ocean/ConsumeAssetBrizo.test.ts +++ b/integration/ocean/ConsumeAssetBrizo.test.ts @@ -38,7 +38,7 @@ describe("Consume Asset (Brizo)", () => { const steps = [] agreementId = await ocean.assets.order(ddo.id, accessService.serviceDefinitionId, consumer) - .next(step => steps.push(step)) + .next((step) => steps.push(step)) assert.isDefined(agreementId) assert.deepEqual(steps, [0, 1, 2, 3]) diff --git a/src/ocean/OceanAssets.ts b/src/ocean/OceanAssets.ts index 753af1f..1b81b64 100644 --- a/src/ocean/OceanAssets.ts +++ b/src/ocean/OceanAssets.ts @@ -212,7 +212,7 @@ export class OceanAssets extends Instantiable { consumer: Account, ): SubscribablePromise { - return new SubscribablePromise(async observer => { + return new SubscribablePromise(async (observer) => { const oceanAgreements = this.ocean.agreements this.logger.log("Asking for agreement signature") diff --git a/src/utils/SubscribableObserver.ts b/src/utils/SubscribableObserver.ts new file mode 100644 index 0000000..3d96317 --- /dev/null +++ b/src/utils/SubscribableObserver.ts @@ -0,0 +1,42 @@ +export class SubscribableObserver { + public completed: boolean = false + private subscriptions = new Set<{onNext?: (next: T) => void, onComplete?: (complete: P) => void, onError?: (error: any) => void}>() + + public subscribe(onNext?: (next: T) => void, onComplete?: (complete: P) => void, onError?: (error: any) => void) { + if (this.completed) { + throw new Error("Observer completed.") + } + const subscription = {onNext, onComplete, onError} + this.subscriptions.add(subscription) + + return { + unsubscribe: () => this.subscriptions.delete(subscription), + } + } + + public next(next?: T): void { + this.emit("onNext", next) + } + + public complete(resolve?: P): void { + this.emit("onComplete", resolve) + this.unsubscribe() + } + + public error(error?: any): void { + this.emit("onError", error) + this.unsubscribe() + } + + private emit(type: "onNext" | "onComplete" | "onError", value: any) { + Array.from(this.subscriptions) + .map((subscription) => subscription[type]) + .filter((callback: any) => callback && typeof callback === "function") + .forEach((callback: any) => callback(value)) + } + + private unsubscribe() { + this.completed = true + this.subscriptions.clear() + } +} diff --git a/src/utils/SubscribablePromise.ts b/src/utils/SubscribablePromise.ts index bc0b240..15f3d4f 100644 --- a/src/utils/SubscribablePromise.ts +++ b/src/utils/SubscribablePromise.ts @@ -1,45 +1,4 @@ -export class SubscribableObserver { - public completed: boolean = false - private subscriptions = new Set<{onNext?: (next: T) => void, onComplete?: (complete: P) => void, onError?: (error: any) => void}>() - - subscribe(onNext?: (next: T) => void, onComplete?: (complete: P) => void, onError?: (error: any) => void) { - if (this.completed) { - throw new Error("Observer completed.") - } - const subscription = {onNext, onComplete, onError} - this.subscriptions.add(subscription) - - return { - unsubscribe: () => this.subscriptions.delete(subscription), - } - } - - next(next?: T): void { - this.emit('onNext', next) - } - - complete(resolve?: P): void { - this.emit('onComplete', resolve) - this.unsubscribe() - } - - error(error?: any): void { - this.emit('onError', error) - this.unsubscribe() - } - - private emit(type: 'onNext' | 'onComplete' | 'onError', value: any) { - Array.from(this.subscriptions) - .map(subscription => subscription[type]) - .filter(callback => callback && typeof callback === 'function') - .forEach(callback => callback(value)) - } - - private unsubscribe() { - this.completed = true - this.subscriptions.clear() - } -} +import { SubscribableObserver } from "./SubscribableObserver" export class SubscribablePromise { private observer = new SubscribableObserver() @@ -48,7 +7,7 @@ export class SubscribablePromise { setTimeout(() => { this.observer .subscribe(undefined, resolve, reject) - }) + }, 0) }), this, ) @@ -57,31 +16,31 @@ export class SubscribablePromise { const execution = executor(this.observer) Promise.resolve(execution as any) - .then(result => { - if (Promise.resolve(execution as any) == execution) { + .then((result) => { + if (Promise.resolve(execution as any) === execution) { this.observer.complete(result) } }) } - subscribe(onNext: (next: T) => void) { + public subscribe(onNext: (next: T) => void) { return this.observer.subscribe(onNext) } - next(onNext: (next: T) => void) { + public next(onNext: (next: T) => void) { this.observer.subscribe(onNext) return this } - then(onfulfilled?: (value: P) => any, onrejected?: (error: any) => any) { + public then(onfulfilled?: (value: P) => any, onrejected?: (error: any) => any) { return Object.assign(this.promise.then(onfulfilled, onrejected), this) } - catch(onrejected?: (error: any) => any) { + public catch(onrejected?: (error: any) => any) { return Object.assign(this.promise.catch(onrejected), this) } - finally(onfinally?: () => any) { + public finally(onfinally?: () => any) { return Object.assign(this.promise.finally(onfinally), this) } } diff --git a/src/utils/index.ts b/src/utils/index.ts index 52920bd..d8cf9bb 100644 --- a/src/utils/index.ts +++ b/src/utils/index.ts @@ -4,3 +4,4 @@ export * from "./ConversionTypeHelpers" export * from "./GeneratorHelpers" export * from "./DDOHelpers" export * from "./SubscribablePromise" +export * from "./SubscribableObserver" diff --git a/test/utils/SubscribableObserver.test.ts b/test/utils/SubscribableObserver.test.ts new file mode 100644 index 0000000..2681f05 --- /dev/null +++ b/test/utils/SubscribableObserver.test.ts @@ -0,0 +1,76 @@ +import { assert, expect, spy, use } from "chai" +import * as spies from "chai-spies" + +import { SubscribableObserver} from "../../src/utils/SubscribableObserver" + +use(spies) + +describe("SubscribableObserver", () => { + + describe("#subscribe()", () => { + + it("should be able to add a subcription", async () => { + const observer = new SubscribableObserver() + const subscription = observer.subscribe() + + assert.isDefined(subscription.unsubscribe) + assert.typeOf(subscription.unsubscribe, "function") + }) + + it("should be able to unsubscribe", async () => { + const observer = new SubscribableObserver() + const subscription = observer.subscribe() + + subscription.unsubscribe() + }) + }) + + describe("#next()", () => { + + it("should be able to emit next value", async () => { + const onNextSpy = spy() + const observer = new SubscribableObserver() + observer.subscribe(onNextSpy) + + observer.next("test") + expect(onNextSpy).to.has.been.called.with("test") + + observer.next("test") + expect(onNextSpy).to.has.been.called.exactly(2) + }) + }) + + describe("#complete()", () => { + + it("should be able to complete", async () => { + const onCompleteSpy = spy() + const observer = new SubscribableObserver() + observer.subscribe(undefined, onCompleteSpy) + + observer.complete("test") + expect(onCompleteSpy).to.has.been.called.with("test") + + observer.complete("test") + expect(onCompleteSpy).to.has.been.called.exactly(1) + + assert.isTrue(observer.completed) + }) + }) + + describe("#error()", () => { + + it("should be able to emit a error", async () => { + const onErrorSpy = spy() + const observer = new SubscribableObserver() + observer.subscribe(undefined, undefined, onErrorSpy) + + observer.error("test") + expect(onErrorSpy).to.has.been.called.with("test") + + observer.error("test") + expect(onErrorSpy).to.has.been.called.exactly(1) + + assert.isTrue(observer.completed) + }) + }) +}) diff --git a/test/utils/SubscribablePromise.test.ts b/test/utils/SubscribablePromise.test.ts index edfbfab..5948a2f 100644 --- a/test/utils/SubscribablePromise.test.ts +++ b/test/utils/SubscribablePromise.test.ts @@ -1,80 +1,10 @@ import { assert, expect, spy, use } from "chai" import * as spies from "chai-spies" -import { SubscribableObserver, SubscribablePromise } from "../../src/utils/SubscribablePromise" +import { SubscribablePromise } from "../../src/utils/SubscribablePromise" use(spies) -describe("SubscribableObserver", () => { - - describe("#subscribe()", () => { - - it("should be able to add a subcription", async () => { - const observer = new SubscribableObserver() - const subscription = observer.subscribe() - - assert.isDefined(subscription.unsubscribe) - assert.typeOf(subscription.unsubscribe, 'function') - }) - - it("should be able to unsubscribe", async () => { - const observer = new SubscribableObserver() - const subscription = observer.subscribe() - - subscription.unsubscribe() - }) - }) - - describe("#next()", () => { - - it("should be able to emit next value", async () => { - const onNextSpy = spy() - const observer = new SubscribableObserver() - observer.subscribe(onNextSpy) - - observer.next('test') - expect(onNextSpy).to.has.been.called.with('test') - - observer.next('test') - expect(onNextSpy).to.has.been.called.exactly(2) - }) - }) - - describe("#complete()", () => { - - it("should be able to complete", async () => { - const onCompleteSpy = spy() - const observer = new SubscribableObserver() - observer.subscribe(undefined, onCompleteSpy) - - observer.complete('test') - expect(onCompleteSpy).to.has.been.called.with('test') - - observer.complete('test') - expect(onCompleteSpy).to.has.been.called.exactly(1) - - assert.isTrue(observer.completed) - }) - }) - - describe("#error()", () => { - - it("should be able to emit a error", async () => { - const onErrorSpy = spy() - const observer = new SubscribableObserver() - observer.subscribe(undefined, undefined, onErrorSpy) - - observer.error('test') - expect(onErrorSpy).to.has.been.called.with('test') - - observer.error('test') - expect(onErrorSpy).to.has.been.called.exactly(1) - - assert.isTrue(observer.completed) - }) - }) -}) - describe("SubscribablePromise", () => { it("should work", async () => { @@ -91,19 +21,19 @@ describe("SubscribablePromise", () => { assert.isDefined(subscription) assert.isDefined(subscription.unsubscribe) - assert.typeOf(subscription.unsubscribe, 'function') + assert.typeOf(subscription.unsubscribe, "function") }) it("should listen the next values", (done) => { const onNextSpy = spy() - const subscribible = new SubscribablePromise(observer => { - setTimeout(() => observer.next('test'), 10) - setTimeout(() => observer.next('test'), 20) + const subscribible = new SubscribablePromise((observer) => { + setTimeout(() => observer.next("test"), 10) + setTimeout(() => observer.next("test"), 20) }) subscribible.subscribe(onNextSpy) setTimeout(() => { - expect(onNextSpy).to.has.been.called.with('test') + expect(onNextSpy).to.has.been.called.with("test") expect(onNextSpy).to.has.been.called.exactly(2) done() }, 100) @@ -115,9 +45,9 @@ describe("SubscribablePromise", () => { it("should resolve", (done) => { const onCompleteSpy = spy() const onFinallySpy = spy() - const subscribible = new SubscribablePromise(observer => { - setTimeout(() => observer.next('test'), 10) - setTimeout(() => observer.complete('test'), 20) + const subscribible = new SubscribablePromise((observer) => { + setTimeout(() => observer.next("test"), 10) + setTimeout(() => observer.complete("test"), 20) }) subscribible @@ -125,7 +55,7 @@ describe("SubscribablePromise", () => { .finally(onFinallySpy) setTimeout(() => { - expect(onCompleteSpy).to.has.been.called.with('test') + expect(onCompleteSpy).to.has.been.called.with("test") expect(onCompleteSpy).to.has.been.called.exactly(1) expect(onFinallySpy).to.has.been.called.exactly(1) done() @@ -138,9 +68,9 @@ describe("SubscribablePromise", () => { it("should catch the error", (done) => { const onErrorSpy = spy() const onFinallySpy = spy() - const subscribible = new SubscribablePromise(observer => { - setTimeout(() => observer.next('test'), 10) - setTimeout(() => observer.error('test'), 20) + const subscribible = new SubscribablePromise((observer) => { + setTimeout(() => observer.next("test"), 10) + setTimeout(() => observer.error("test"), 20) }) subscribible @@ -148,7 +78,7 @@ describe("SubscribablePromise", () => { .finally(onFinallySpy) setTimeout(() => { - expect(onErrorSpy).to.has.been.called.with('test') + expect(onErrorSpy).to.has.been.called.with("test") expect(onErrorSpy).to.has.been.called.exactly(1) expect(onFinallySpy).to.has.been.called.exactly(1) done() @@ -158,38 +88,38 @@ describe("SubscribablePromise", () => { it("should be able to subscribe and wait for a promise", async () => { const onNextSpy = spy() - const subscribible = new SubscribablePromise(observer => { - setTimeout(() => observer.next('test'), 10) - setTimeout(() => observer.next('test'), 20) - setTimeout(() => observer.complete('completed'), 30) + const subscribible = new SubscribablePromise((observer) => { + setTimeout(() => observer.next("test"), 10) + setTimeout(() => observer.next("test"), 20) + setTimeout(() => observer.complete("completed"), 30) }) const result = await subscribible .next(onNextSpy) - expect(onNextSpy).to.has.been.called.with('test') + expect(onNextSpy).to.has.been.called.with("test") expect(onNextSpy).to.has.been.called.exactly(2) - assert.equal(result, 'completed') + assert.equal(result, "completed") }) it("should use the result of a the promise as executor to complete the observer", async () => { const onNextSpy = spy() - const subscribible = new SubscribablePromise(async observer => { - await new Promise(resolve => setTimeout(resolve, 10)) - observer.next('test') - await new Promise(resolve => setTimeout(resolve, 10)) - observer.next('test') - await new Promise(resolve => setTimeout(resolve, 10)) - return 'completed' + const subscribible = new SubscribablePromise(async (observer) => { + await new Promise((resolve) => setTimeout(resolve, 10)) + observer.next("test") + await new Promise((resolve) => setTimeout(resolve, 10)) + observer.next("test") + await new Promise((resolve) => setTimeout(resolve, 10)) + return "completed" }) const result = await subscribible .next(onNextSpy) - expect(onNextSpy).to.has.been.called.with('test') + expect(onNextSpy).to.has.been.called.with("test") expect(onNextSpy).to.has.been.called.exactly(2) - assert.equal(result, 'completed') + assert.equal(result, "completed") }) }) diff --git a/tslint.json b/tslint.json index 0371ac2..23eb96a 100644 --- a/tslint.json +++ b/tslint.json @@ -12,6 +12,7 @@ true, "never" ], + "no-empty": [true, "allow-empty-catch", "allow-empty-functions"], "ordered-imports": false }, "rulesDirectory": []