diff --git a/src/utils/SubscribablePromise.ts b/src/utils/SubscribablePromise.ts new file mode 100644 index 0000000..bc0b240 --- /dev/null +++ b/src/utils/SubscribablePromise.ts @@ -0,0 +1,87 @@ +export class SubscribableObserver { + public completed: boolean = false + private subscriptions = new Set<{onNext?: (next: T) => void, onComplete?: (complete: P) => void, onError?: (error: any) => void}>() + + subscribe(onNext?: (next: T) => void, onComplete?: (complete: P) => void, onError?: (error: any) => void) { + if (this.completed) { + throw new Error("Observer completed.") + } + const subscription = {onNext, onComplete, onError} + this.subscriptions.add(subscription) + + return { + unsubscribe: () => this.subscriptions.delete(subscription), + } + } + + next(next?: T): void { + this.emit('onNext', next) + } + + complete(resolve?: P): void { + this.emit('onComplete', resolve) + this.unsubscribe() + } + + error(error?: any): void { + this.emit('onError', error) + this.unsubscribe() + } + + private emit(type: 'onNext' | 'onComplete' | 'onError', value: any) { + Array.from(this.subscriptions) + .map(subscription => subscription[type]) + .filter(callback => callback && typeof callback === 'function') + .forEach(callback => callback(value)) + } + + private unsubscribe() { + this.completed = true + this.subscriptions.clear() + } +} + +export class SubscribablePromise { + private observer = new SubscribableObserver() + private promise = Object.assign( + new Promise

((resolve, reject) => { + setTimeout(() => { + this.observer + .subscribe(undefined, resolve, reject) + }) + }), + this, + ) + + constructor(executor: (observer: SubscribableObserver) => void | Promise

) { + const execution = executor(this.observer) + + Promise.resolve(execution as any) + .then(result => { + if (Promise.resolve(execution as any) == execution) { + this.observer.complete(result) + } + }) + } + + subscribe(onNext: (next: T) => void) { + return this.observer.subscribe(onNext) + } + + next(onNext: (next: T) => void) { + this.observer.subscribe(onNext) + return this + } + + then(onfulfilled?: (value: P) => any, onrejected?: (error: any) => any) { + return Object.assign(this.promise.then(onfulfilled, onrejected), this) + } + + catch(onrejected?: (error: any) => any) { + return Object.assign(this.promise.catch(onrejected), this) + } + + finally(onfinally?: () => any) { + return Object.assign(this.promise.finally(onfinally), this) + } +} diff --git a/src/utils/index.ts b/src/utils/index.ts index 81fe379..52920bd 100644 --- a/src/utils/index.ts +++ b/src/utils/index.ts @@ -3,3 +3,4 @@ export * from "./Logger" export * from "./ConversionTypeHelpers" export * from "./GeneratorHelpers" export * from "./DDOHelpers" +export * from "./SubscribablePromise" diff --git a/test/utils/SubscribablePromise.test.ts b/test/utils/SubscribablePromise.test.ts new file mode 100644 index 0000000..edfbfab --- /dev/null +++ b/test/utils/SubscribablePromise.test.ts @@ -0,0 +1,195 @@ +import { assert, expect, spy, use } from "chai" +import * as spies from "chai-spies" + +import { SubscribableObserver, SubscribablePromise } from "../../src/utils/SubscribablePromise" + +use(spies) + +describe("SubscribableObserver", () => { + + describe("#subscribe()", () => { + + it("should be able to add a subcription", async () => { + const observer = new SubscribableObserver() + const subscription = observer.subscribe() + + assert.isDefined(subscription.unsubscribe) + assert.typeOf(subscription.unsubscribe, 'function') + }) + + it("should be able to unsubscribe", async () => { + const observer = new SubscribableObserver() + const subscription = observer.subscribe() + + subscription.unsubscribe() + }) + }) + + describe("#next()", () => { + + it("should be able to emit next value", async () => { + const onNextSpy = spy() + const observer = new SubscribableObserver() + observer.subscribe(onNextSpy) + + observer.next('test') + expect(onNextSpy).to.has.been.called.with('test') + + observer.next('test') + expect(onNextSpy).to.has.been.called.exactly(2) + }) + }) + + describe("#complete()", () => { + + it("should be able to complete", async () => { + const onCompleteSpy = spy() + const observer = new SubscribableObserver() + observer.subscribe(undefined, onCompleteSpy) + + observer.complete('test') + expect(onCompleteSpy).to.has.been.called.with('test') + + observer.complete('test') + expect(onCompleteSpy).to.has.been.called.exactly(1) + + assert.isTrue(observer.completed) + }) + }) + + describe("#error()", () => { + + it("should be able to emit a error", async () => { + const onErrorSpy = spy() + const observer = new SubscribableObserver() + observer.subscribe(undefined, undefined, onErrorSpy) + + observer.error('test') + expect(onErrorSpy).to.has.been.called.with('test') + + observer.error('test') + expect(onErrorSpy).to.has.been.called.exactly(1) + + assert.isTrue(observer.completed) + }) + }) +}) + +describe("SubscribablePromise", () => { + + it("should work", async () => { + const subscribible = new SubscribablePromise(() => {}) + + assert.isDefined(subscribible) + }) + + describe("#subscribe()", () => { + + it("should return a subscription", async () => { + const subscribible = new SubscribablePromise(() => {}) + const subscription = subscribible.subscribe(() => {}) + + assert.isDefined(subscription) + assert.isDefined(subscription.unsubscribe) + assert.typeOf(subscription.unsubscribe, 'function') + }) + + it("should listen the next values", (done) => { + const onNextSpy = spy() + const subscribible = new SubscribablePromise(observer => { + setTimeout(() => observer.next('test'), 10) + setTimeout(() => observer.next('test'), 20) + }) + subscribible.subscribe(onNextSpy) + + setTimeout(() => { + expect(onNextSpy).to.has.been.called.with('test') + expect(onNextSpy).to.has.been.called.exactly(2) + done() + }, 100) + }) + }) + + describe("#then()", () => { + + it("should resolve", (done) => { + const onCompleteSpy = spy() + const onFinallySpy = spy() + const subscribible = new SubscribablePromise(observer => { + setTimeout(() => observer.next('test'), 10) + setTimeout(() => observer.complete('test'), 20) + }) + + subscribible + .then(onCompleteSpy) + .finally(onFinallySpy) + + setTimeout(() => { + expect(onCompleteSpy).to.has.been.called.with('test') + expect(onCompleteSpy).to.has.been.called.exactly(1) + expect(onFinallySpy).to.has.been.called.exactly(1) + done() + }, 100) + }) + }) + + describe("#error()", () => { + + it("should catch the error", (done) => { + const onErrorSpy = spy() + const onFinallySpy = spy() + const subscribible = new SubscribablePromise(observer => { + setTimeout(() => observer.next('test'), 10) + setTimeout(() => observer.error('test'), 20) + }) + + subscribible + .catch(onErrorSpy) + .finally(onFinallySpy) + + setTimeout(() => { + expect(onErrorSpy).to.has.been.called.with('test') + expect(onErrorSpy).to.has.been.called.exactly(1) + expect(onFinallySpy).to.has.been.called.exactly(1) + done() + }, 100) + }) + }) + + it("should be able to subscribe and wait for a promise", async () => { + const onNextSpy = spy() + const subscribible = new SubscribablePromise(observer => { + setTimeout(() => observer.next('test'), 10) + setTimeout(() => observer.next('test'), 20) + setTimeout(() => observer.complete('completed'), 30) + }) + + const result = await subscribible + .next(onNextSpy) + + expect(onNextSpy).to.has.been.called.with('test') + expect(onNextSpy).to.has.been.called.exactly(2) + + assert.equal(result, 'completed') + }) + + it("should use the result of a the promise as executor to complete the observer", async () => { + const onNextSpy = spy() + const subscribible = new SubscribablePromise(async observer => { + await new Promise(resolve => setTimeout(resolve, 10)) + observer.next('test') + await new Promise(resolve => setTimeout(resolve, 10)) + observer.next('test') + await new Promise(resolve => setTimeout(resolve, 10)) + return 'completed' + }) + + const result = await subscribible + .next(onNextSpy) + + expect(onNextSpy).to.has.been.called.with('test') + expect(onNextSpy).to.has.been.called.exactly(2) + + assert.equal(result, 'completed') + }) +})