All files / rpc-calls/src/channel BufferedLogicalChannel.ts

94.11% Statements 16/17
100% Branches 4/4
75% Functions 3/4
93.33% Lines 14/15

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 542x                                                     2x           32x 32x 32x 32x 32x 32x 37x 37x 35x             96x 96x 96x      
import {TimedQueue} from 'thingies';
import type {Observable} from 'rxjs/internal/Observable';
import type {LogicalChannel} from './types';
 
export interface BufferedLogicalChannelOpts<Incoming, Outgoing> {
  channel: LogicalChannel<Incoming[], Outgoing[]>;
 
  /**
   * Number of messages to keep in buffer before sending them to the server.
   * The buffer is flushed when the message reaches this limit or when the
   * buffering time has reached the time specified in `bufferTime` parameter.
   * Defaults to 100 messages.
   */
  bufferSize?: number;
 
  /**
   * Time in milliseconds for how long to buffer messages before sending them
   * to the server. Defaults to 10 milliseconds.
   */
  bufferTime?: number;
}
 
/**
 * Adds buffering capabilities to a {@link LogicalChannel}, where messages are
 * sent every {@link BufferedLogicalChannelOpts.bufferTime} milliseconds or when the buffer
 * reaches {@link BufferedLogicalChannelOpts.bufferSize} messages, whichever comes first.
 */
export class BufferedLogicalChannel<Incoming, Outgoing> implements LogicalChannel<Incoming[], Outgoing[]> {
  public readonly msg$: Observable<Incoming[]>;
  public readonly err$: Observable<unknown>;
  public readonly buffer: TimedQueue<Outgoing>;
 
  constructor({channel, bufferSize = 100, bufferTime = 5}: BufferedLogicalChannelOpts<Incoming, Outgoing>) {
    this.msg$ = channel.msg$;
    this.err$ = channel.err$;
    this.buffer = new TimedQueue();
    this.buffer.itemLimit = bufferSize;
    this.buffer.timeLimit = bufferTime;
    this.buffer.onFlush = (list: Outgoing[]) => {
      const promise = channel.send(list);
      if (promise instanceof Promise)
        promise.catch((error) => {
          console.error(error);
        });
    };
  }
 
  public async send(outgoing: Outgoing[]): Promise<void> {
    const buffer = this.buffer;
    const length = outgoing.length;
    for (let i = 0; i < length; i++) buffer.push(outgoing[i]);
  }
}