All files / json-crdt/model/api fanout.ts

100% Statements 72/72
100% Branches 11/11
100% Functions 21/21
100% Lines 62/62

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129157x           157x 11887x   11887x 11887x       69x 23x 23x 7x 7x 21x 7x                 157x 23773x 23773x   23773x 23773x       24x 22x 51x 51x 37x 37x 37x     51x     24x 24x 6x 6x         6x 6x 6x 6x             157x   592x 592x   592x     592x     25x 13x 13x 6x 6x         587x 587x 587x               157x 592x     592x 592x   592x       13x 13x 26x     13x 13x 5x 5x         586x 586x 586x 586x      
import {FanOut, type FanOutUnsubscribe, type FanOutListener} from 'thingies/lib/fanout';
 
/**
 * A fanout that re-emits everything emitted by any of the given source
 * fanouts.
 *
 * Subscriptions to the sources are held only while this fanout has listeners:
 * they are opened when a listener is added to an empty listener collection
 * and closed again once the last listener has been removed.
 */
export class MergeFanOut<D> extends FanOut<D> {
  /** Unsubscribe handles for the currently open source subscriptions. */
  private unsubs: FanOutUnsubscribe[] = [];

  /**
   * @param fanouts Source fanouts whose emissions are forwarded.
   */
  constructor(private readonly fanouts: FanOut<any>[]) {
    super();
  }

  /**
   * Registers a listener, subscribing to every source first if there are
   * currently no listeners.
   *
   * @param listener Callback invoked with each forwarded value.
   * @returns Function that removes the listener and, when no listeners
   *     remain afterwards, unsubscribes from all sources.
   */
  public listen(listener: FanOutListener<D>): FanOutUnsubscribe {
    const isFirstListener = this.listeners.size === 0;
    if (isFirstListener) {
      // One forwarding closure per source, so a source that is listed more
      // than once is also subscribed to more than once.
      this.unsubs = this.fanouts.map((source) =>
        source.listen((data) => this.emit(data)),
      );
    }
    const removeListener = super.listen(listener);
    return () => {
      removeListener();
      if (this.listeners.size > 0) return;
      this.unsubs.forEach((stop) => stop());
      this.unsubs = [];
    };
  }
}
 
/**
 * Buffers data from a source fanout and emits it in batches: every value the
 * source emits is appended to an array, and that array is emitted as a whole
 * from a microtask scheduled when the first value of the batch arrives.
 *
 * The source is subscribed to lazily, on the first `listen()` call, and
 * released by `clear()`, which also runs automatically when the last
 * listener unsubscribes.
 */
export class MicrotaskBufferFanOut<I> extends FanOut<I[]> {
  /** Values collected for the batch that has not been flushed yet. */
  private buffer: I[] = [];
  /** Unsubscribe handle for the source; `undefined` while not subscribed. */
  private unsub?: FanOutUnsubscribe = undefined;

  /**
   * @param source Fanout whose emissions are collected into batches.
   */
  constructor(private readonly source: FanOut<I>) {
    super();
  }

  /**
   * Registers a batch listener, subscribing to the source if that has not
   * happened yet.
   *
   * @param listener Callback invoked with each flushed batch.
   * @returns Function that removes the listener and calls `clear()` when no
   *     listeners remain afterwards.
   */
  public listen(listener: FanOutListener<I[]>): FanOutUnsubscribe {
    if (!this.unsub) {
      this.unsub = this.source.listen((data) => {
        const buffer = this.buffer;
        // An empty buffer means no flush is pending for it yet, so the first
        // value of a batch is the one that schedules the flush.
        if (!buffer.length) {
          queueMicrotask(() => {
            // Install the fresh buffer BEFORE emitting. If `emit` throws
            // (e.g. a listener throws) the buffer must already be reset,
            // otherwise it stays non-empty forever and no further flush is
            // ever scheduled. It also ensures that values the source emits
            // synchronously while listeners run start a new batch instead of
            // being appended to the array that is being delivered.
            this.buffer = [];
            this.emit(buffer);
          });
        }
        buffer.push(data);
      });
    }
    const unsub = super.listen(listener);
    return () => {
      unsub();
      if (!this.listeners.size) this.clear();
    };
  }

  /**
   * Removes all listeners, replaces the pending batch with an empty one and
   * unsubscribes from the source.
   */
  public clear() {
    this.listeners.clear();
    this.buffer = [];
    this.unsub?.();
    this.unsub = undefined;
  }
}
 
/**
 * A fanout that emits the result of applying a mapper function to every
 * value emitted by a source fanout.
 *
 * The source is subscribed to on the first `listen()` call and released by
 * `clear()`, which also runs when the last listener unsubscribes.
 */
export class MapFanOut<I, O> extends FanOut<O> {
  /** Unsubscribe handle for the source; `undefined` while not subscribed. */
  private unsub?: FanOutUnsubscribe = undefined;

  /**
   * @param source Fanout providing the input values.
   * @param mapper Function converting each input value to the emitted value.
   */
  constructor(
    private readonly source: FanOut<I>,
    private readonly mapper: (data: I) => O,
  ) {
    super();
  }

  /**
   * Registers a listener for mapped values, subscribing to the source if
   * that has not happened yet.
   *
   * @param listener Callback invoked with each mapped value.
   * @returns Function that removes the listener and calls `clear()` when no
   *     listeners remain afterwards.
   */
  public listen(listener: FanOutListener<O>): FanOutUnsubscribe {
    if (!this.unsub) {
      this.unsub = this.source.listen((data) => {
        const mapped = this.mapper(data);
        this.emit(mapped);
      });
    }
    const removeListener = super.listen(listener);
    return () => {
      removeListener();
      if (this.listeners.size === 0) this.clear();
    };
  }

  /**
   * Removes all listeners and unsubscribes from the source.
   */
  public clear() {
    this.listeners.clear();
    this.unsub?.();
    this.unsub = undefined;
  }
}
 
/**
 * A fanout that forwards a value from the source only when it is not
 * strictly equal (`!==`) to the previously remembered value.
 *
 * The remembered value starts as the `last` constructor argument
 * (`undefined` by default), so the first source value is forwarded unless it
 * equals that initial value. `clear()` resets the remembered value to
 * `undefined`.
 */
export class OnNewFanOut<D> extends FanOut<D> {
  /** Unsubscribe handle for the source; `undefined` while not subscribed. */
  private unsub?: FanOutUnsubscribe = undefined;

  /**
   * @param source Fanout providing the values to de-duplicate.
   * @param last Value to treat as already seen; defaults to `undefined`.
   */
  constructor(
    private readonly source: FanOut<D>,
    private last: D | undefined = undefined,
  ) {
    super();
  }

  /**
   * Registers a listener, subscribing to the source if that has not happened
   * yet.
   *
   * @param listener Callback invoked with each value that differs from the
   *     previously remembered one.
   * @returns Function that removes the listener and calls `clear()` when no
   *     listeners remain afterwards.
   */
  public listen(listener: FanOutListener<D>): FanOutUnsubscribe {
    if (!this.unsub) {
      this.unsub = this.source.listen((data) => {
        // Same value as last time: nothing to report.
        if (this.last === data) return;
        this.last = data;
        this.emit(data);
      });
    }
    const removeListener = super.listen(listener);
    return () => {
      removeListener();
      if (this.listeners.size === 0) this.clear();
    };
  }

  /**
   * Removes all listeners, forgets the remembered value and unsubscribes
   * from the source.
   */
  public clear() {
    this.listeners.clear();
    this.last = undefined;
    this.unsub?.();
    this.unsub = undefined;
  }
}