1
0
mirror of https://github.com/oceanprotocol-archive/squid-js.git synced 2024-02-02 15:31:51 +01:00
squid-js/src/utils/SubscribableObserver.ts

54 lines
1.4 KiB
TypeScript
Raw Normal View History

2019-04-24 13:25:37 +02:00
/**
 * Minimal observer that fans notifications out to a set of subscribers.
 *
 * Subscribers may register up to three optional callbacks: one for
 * intermediate values (`T`), one for the completion value (`P`) and one for
 * errors. Both `complete()` and `error()` are terminal: after either of them
 * the observer is flagged as completed, every subscription is dropped and
 * further calls to `subscribe()` throw.
 *
 * @typeParam T - Type of the values delivered through `next()`.
 * @typeParam P - Type of the value delivered through `complete()`.
 */
export class SubscribableObserver<T, P> {
    /** Becomes `true` once `complete()` or `error()` has been called. */
    public completed: boolean = false

    /** Currently registered callback triples, one entry per `subscribe()` call. */
    private subscriptions = new Set<{
        onNext?: (next: T) => void
        onComplete?: (complete: P) => void
        onError?: (error: any) => void
    }>()

    /**
     * Registers a new set of callbacks.
     *
     * @param onNext - Invoked with every value passed to `next()`.
     * @param onComplete - Invoked with the value passed to `complete()`.
     * @param onError - Invoked with the value passed to `error()`.
     * @returns A handle whose `unsubscribe()` removes this registration; it
     * returns `true` when the registration was still present.
     * @throws Error if the observer has already completed.
     */
    public subscribe(
        onNext?: (next: T) => void,
        onComplete?: (complete: P) => void,
        onError?: (error: any) => void
    ): { unsubscribe: () => boolean } {
        if (this.completed) {
            throw new Error('Observer completed.')
        }
        const subscription = { onNext, onComplete, onError }
        this.subscriptions.add(subscription)
        return {
            unsubscribe: () => this.subscriptions.delete(subscription)
        }
    }

    /**
     * Delivers an intermediate value to every `onNext` callback.
     *
     * @param next - Value forwarded to the subscribers.
     */
    public next(next?: T): void {
        this.emit('onNext', next)
    }

    /**
     * Delivers the completion value to every `onComplete` callback and then
     * closes the observer.
     *
     * @param resolve - Value forwarded to the subscribers.
     */
    public complete(resolve?: P): void {
        try {
            this.emit('onComplete', resolve)
        } finally {
            // Reach the terminal state even when a callback throws; otherwise
            // the observer would stay open and could complete a second time.
            this.unsubscribe()
        }
    }

    /**
     * Delivers an error to every `onError` callback and then closes the
     * observer.
     *
     * @param error - Value forwarded to the subscribers.
     */
    public error(error?: any): void {
        try {
            this.emit('onError', error)
        } finally {
            // Same guarantee as in complete(): a throwing callback must not
            // leave the observer half-open.
            this.unsubscribe()
        }
    }

    /**
     * Calls the callback registered under `type` on every subscription that
     * provides one. An exception thrown by a callback propagates to the
     * caller and stops the remaining callbacks from being invoked.
     *
     * @param type - Which of the three callbacks to invoke.
     * @param value - Argument handed to each callback.
     */
    private emit(type: 'onNext' | 'onComplete' | 'onError', value: any) {
        // Iterate over a snapshot so that subscriptions added or removed by a
        // callback during this emission do not change who gets notified.
        for (const subscription of Array.from(this.subscriptions)) {
            const callback = subscription[type]
            if (typeof callback === 'function') {
                callback(value)
            }
        }
    }

    /** Marks the observer as completed and drops every subscription. */
    private unsubscribe() {
        this.completed = true
        this.subscriptions.clear()
    }
}