All files / rpc-calls/src/callee BufferSubject.ts

100% Statements 23/23
100% Branches 6/6
100% Functions 4/4
100% Lines 22/22

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 4210x 10x   10x 126x 126x   126x 126x         288x   288x 288x 288x 111x     288x 288x       148x 142x 1x 1x   141x   147x       105x 105x      
import {Subject, type Subscription, type Subscriber} from 'rxjs';
import {RpcError, RpcErrorCodes} from '@jsonjoy.com/rpc-error';
 
/**
 * A `Subject` that records every value passed to {@link BufferSubject.next}
 * in an in-memory buffer until {@link BufferSubject.flush} is called, and
 * replays the recorded values to each subscriber that attaches in the
 * meantime.
 *
 * While buffering, at most `bufferSize` values are held. A `next()` call that
 * would exceed that limit errors the subject with `RpcErrorCodes.OVERFLOW`
 * instead of emitting the value.
 *
 * @typeParam T Type of the values emitted by the subject.
 */
export class BufferSubject<T> extends Subject<T> {
  /** Values received while buffering; replayed to each new subscriber. */
  private buffer: T[] = [];
  /** `true` until `flush()` is called; controls whether `next()` records values. */
  private isBuffering = true;

  /**
   * @param bufferSize Maximum number of values that may be buffered before
   *     the subject errors with `RpcErrorCodes.OVERFLOW`.
   */
  constructor(public readonly bufferSize: number) {
    super();
  }

  /**
   * Registers the subscriber with the underlying subject, then replays the
   * currently buffered values to it.
   *
   * The calls to `_throwIfClosed`, `_innerSubscribe` and
   * `_checkFinalizedStatuses` rely on rxjs `Subject` internals that are not
   * part of its public typings, hence the `@ts-expect-error` directives.
   *
   * @param subscriber The subscriber being attached.
   * @returns The subscription produced by the underlying subject.
   */
  protected _subscribe(subscriber: Subscriber<T>): Subscription {
    // @ts-expect-error -- rxjs internal, not exposed in the public typings.
    this._throwIfClosed();
    // @ts-expect-error -- rxjs internal, not exposed in the public typings.
    const subscription = this._innerSubscribe(subscriber);
    // Replay from a snapshot: the subscriber is already registered at this
    // point, so a re-entrant `next()` issued from inside its handler reaches it
    // through `super.next()`. Iterating the live array would deliver that same
    // value a second time once the loop reached the freshly pushed element.
    const snapshot = this.buffer.slice();
    for (const value of snapshot) {
      // Stop replaying as soon as the subscriber unsubscribes or is terminated.
      if (subscriber.closed) break;
      subscriber.next(value);
    }
    // @ts-expect-error -- rxjs internal, not exposed in the public typings.
    this._checkFinalizedStatuses(subscriber);
    return subscription;
  }

  /**
   * Emits `value` to current subscribers and, while buffering is active,
   * records it for replay to later subscribers.
   *
   * If the buffer already holds `bufferSize` values, the subject is errored
   * with `RpcErrorCodes.OVERFLOW` and `value` is neither stored nor emitted.
   *
   * @param value The value to emit.
   */
  public next(value: T): void {
    // Only record values the subject can still emit. Once it has completed or
    // errored, live subscribers never see further values, so replaying them to
    // late subscribers would be inconsistent (and would grow the buffer for
    // nothing). `super.next()` is still invoked below so the base class keeps
    // deciding what a `next()` on a stopped/closed subject does.
    if (this.isBuffering && !this.isStopped) {
      if (this.buffer.length >= this.bufferSize) {
        this.error(RpcError.fromErrno(RpcErrorCodes.OVERFLOW));
        return;
      }
      this.buffer.push(value);
    }
    super.next(value);
  }

  /**
   * Stops buffering and discards everything buffered so far. Subsequent
   * `next()` calls are forwarded without being recorded, and subscribers
   * attached after this call get no replay.
   */
  public flush(): void {
    this.isBuffering = false;
    this.buffer = [];
  }
}