All files / rpc-calls/src/caller/util subscribeCompleteObserver.ts

92.3% Statements 24/26
83.33% Branches 10/12
100% Functions 7/7
95.45% Lines 21/22

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56  5x                                 5x 89x 89x 89x 89x   102x 102x 102x 102x 70x 70x   32x         9x   1x 1x       78x 78x 78x   70x 70x              
import type {Observable} from 'rxjs';
import {microtask} from './microtask';
 
/**
 * Observer whose `complete` callback can carry the final emitted value, so a
 * consumer may receive a single `complete(value, true)` call in place of a
 * `next(value)` call followed by a bare completion.
 */
export interface CompleteObserver<T> {
  /** Receives an emitted value that is not being delivered through `complete`. */
  next: (value: T) => void;
  /** Receives the error reported by the source observable. */
  error: (error: unknown) => void;
  /**
   * Receives the completion signal. `hasValue` is `true` when `value` is the
   * last emitted value; when `hasValue` is `false`, `value` is `undefined`.
   */
  complete: (value: undefined | T, hasValue: boolean) => void;
}
 
/**
 * Subscribes a `CompleteObserver` to an observable. `CompleteObserver` attempts
 * to receive the last emitted value in the `.complete(value, true)` callback,
 * instead of receiving `.next(value)` followed by `.complete()`.
 *
 * Every emitted value is handed to `microtask` rather than forwarded
 * immediately (presumably `microtask` runs the callback on the microtask
 * queue — confirm against `./microtask`). By the time a deferred value is
 * delivered, it is therefore known whether the source has already completed,
 * and the last pending value can be routed to `.complete(value, true)`.
 *
 * NOTE(review): nothing here cancels already-deferred callbacks, so values
 * queued before an unsubscribe look like they are still delivered — confirm
 * this is intended.
 *
 * @param observable Observable to which to subscribe.
 * @param observer Observer which to subscribe to observable.
 * @returns Subscription returned by `observable.subscribe(...)`.
 */
export function subscribeCompleteObserver<T>(observable: Observable<T>, observer: CompleteObserver<T>) {
  // Set once the source has signalled completion.
  let completed = false;
  // Set once `observer.complete(value, true)` has been invoked, so the
  // value-less completion is not delivered a second time.
  let completeCalled = false;
  // Number of emitted values whose deferred delivery has not run yet.
  let tasks = 0;
  return observable.subscribe({
    next: (value: T) => {
      tasks++;
      microtask(() => {
        tasks--;
        if (completed && !tasks) {
          // Source already completed and this is the last pending value:
          // deliver it through `complete` instead of `next`.
          completeCalled = true;
          observer.complete(value, true);
        } else {
          observer.next(value);
        }
      });
    },
    error: (error: unknown) => {
      if (!tasks) observer.error(error);
      else
        // Values are still pending; defer the error as well so it is handed
        // to `microtask` after them.
        microtask(() => {
          observer.error(error);
        });
    },
    complete: () => {
      completed = true;
      if (completeCalled) return;
      // No pending values: complete right away without a value.
      if (!tasks) observer.complete(undefined, false);
      else {
        // Pending values exist; the last one is expected to call
        // `observer.complete(value, true)`. This deferred callback is the
        // fallback in case that has not happened.
        microtask(() => {
          if (completeCalled) return;
          observer.complete(undefined, false);
        });
      }
    },
  });
}