All files / rpc-calls/src/channel BufferedLogicalChannelBase.ts

90% Statements 18/20
100% Branches 2/2
50% Functions 4/8
93.75% Lines 15/16

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 552x                                                   2x 42x 42x         42x 42x 42x 42x 42x 42x 42x 51x         102x 102x 102x              
import {TimedQueue} from 'thingies';
import type {LogicalChannelBase} from './types';
 
/**
 * Construction options for {@link BufferedLogicalChannelBase}.
 */
export interface BufferedLogicalChannelBaseOptions<Incoming, Outgoing> {
  /**
   * The underlying channel being wrapped. Buffered outgoing messages are
   * handed to its `send` method in batches, and its `onmsg` and `onerr`
   * handlers are replaced by the wrapper so that incoming traffic can be
   * forwarded.
   */
  channel: LogicalChannelBase<Incoming[], Outgoing[]>;
 
  /**
   * Maximum number of messages to keep in the buffer before sending them.
   * The buffer is flushed when the number of buffered messages reaches this
   * limit or when the buffering time reaches the value of the `bufferTime`
   * parameter, whichever comes first. Defaults to 100 messages.
   */
  bufferSize?: number;
 
  /**
   * Time in milliseconds for how long to buffer messages before sending them.
   * Defaults to 5 milliseconds.
   */
  bufferTime?: number;
}
 
/**
 * A {@link LogicalChannelBase} decorator which batches outgoing messages.
 *
 * Messages passed to {@link BufferedLogicalChannelBase.send} are not written
 * to the wrapped channel right away. They are queued in a `TimedQueue`, and
 * the queue hands them over to the wrapped channel as a single batch once
 * either {@link BufferedLogicalChannelBaseOptions.bufferSize} messages have
 * accumulated or {@link BufferedLogicalChannelBaseOptions.bufferTime}
 * milliseconds have elapsed, whichever happens first.
 *
 * Incoming messages and errors are not buffered: they are passed straight
 * through to the `onmsg` and `onerr` handlers of this instance.
 */
export class BufferedLogicalChannelBase<Incoming, Outgoing> implements LogicalChannelBase<Incoming[], Outgoing[]> {
  /** Handler invoked with every batch of messages received from the wrapped channel. No-op by default. */
  public onmsg: (msg: Incoming[]) => void = () => {};

  /** Handler invoked with every error reported by the wrapped channel. No-op by default. */
  public onerr: (err: unknown) => void = () => {};

  /** Queue holding outgoing messages which have not been flushed yet. */
  public readonly buffer: TimedQueue<Outgoing>;

  /** The wrapped channel which receives the flushed batches. */
  private readonly channel: LogicalChannelBase<Incoming[], Outgoing[]>;

  /**
   * @param options The channel to wrap plus optional buffering limits;
   *     `bufferSize` falls back to 100 and `bufferTime` to 5 when omitted.
   */
  constructor(options: BufferedLogicalChannelBaseOptions<Incoming, Outgoing>) {
    const {channel, bufferSize = 100, bufferTime = 5} = options;
    this.channel = channel;

    // Take over the handlers of the wrapped channel. The arrow functions look
    // up `this.onmsg` / `this.onerr` on every call, so handlers assigned to
    // this instance after construction are still honoured.
    channel.onmsg = (incoming: Incoming[]) => this.onmsg(incoming);
    channel.onerr = (error: unknown) => this.onerr(error);

    // Configure the queue before publishing it on the instance.
    const queue = new TimedQueue<Outgoing>();
    queue.itemLimit = bufferSize;
    queue.timeLimit = bufferTime;
    queue.onFlush = (batch: Outgoing[]) => {
      channel.send(batch);
    };
    this.buffer = queue;
  }

  /**
   * Queues the given messages for delivery. Each message is pushed onto the
   * buffer individually, in order; the buffer decides when to flush.
   *
   * @param outgoing Messages to enqueue.
   */
  public send(outgoing: Outgoing[]): void {
    for (const message of outgoing) this.buffer.push(message);
  }

  /**
   * Closes the wrapped channel, forwarding both arguments unchanged.
   *
   * NOTE(review): the buffer is not flushed here, so what happens to messages
   * still queued at this point depends on `TimedQueue` - confirm whether
   * that is intended.
   *
   * @param code Optional close code passed to the wrapped channel.
   * @param reason Optional close reason passed to the wrapped channel.
   */
  public close(code?: number, reason?: string): void {
    this.channel.close(code, reason);
  }
}