Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 | 5x 5x 89x 89x 89x 89x 102x 102x 102x 102x 70x 70x 32x 9x 1x 1x 78x 78x 78x 70x 70x | import type {Observable} from 'rxjs';
import {microtask} from './microtask';
export interface CompleteObserver<T> {
next: (value: T) => void;
error: (error: unknown) => void;
complete: (value: undefined | T, hasValue: boolean) => void;
}
/**
* Subscribes `CompleteObserver` to observable. `CompleteObserver` attempts to
* receive the last emitted value in `.complete(value)` callback, instead of
* calling `.next(value)` followed by `.complete()`.
*
* @param observable Observable to which to subscribe.
* @param observer Observer which to subscribe to observable.
* @returns Subscription
*/
export function subscribeCompleteObserver<T>(observable: Observable<T>, observer: CompleteObserver<T>) {
let completed = false;
let completeCalled = false;
let tasks = 0;
return observable.subscribe({
next: (value: T) => {
tasks++;
microtask(() => {
tasks--;
if (completed && !tasks) {
completeCalled = true;
observer.complete(value, true);
} else {
observer.next(value);
}
});
},
error: (error: unknown) => {
if (!tasks) observer.error(error);
else
microtask(() => {
observer.error(error);
});
},
complete: () => {
completed = true;
Iif (completeCalled) return;
if (!tasks) observer.complete(undefined, false);
else {
microtask(() => {
Eif (completeCalled) return;
observer.complete(undefined, false);
});
}
},
});
}
|