Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 | 2x 2x 32x 32x 32x 32x 32x 32x 37x 37x 35x 96x 96x 96x | import {TimedQueue} from 'thingies';
import type {Observable} from 'rxjs/internal/Observable';
import type {LogicalChannel} from './types';
export interface BufferedLogicalChannelOpts<Incoming, Outgoing> {
channel: LogicalChannel<Incoming[], Outgoing[]>;
/**
* Number of messages to keep in buffer before sending them to the server.
* The buffer is flushed when the message reaches this limit or when the
* buffering time has reached the time specified in `bufferTime` parameter.
* Defaults to 100 messages.
*/
bufferSize?: number;
/**
* Time in milliseconds for how long to buffer messages before sending them
* to the server. Defaults to 10 milliseconds.
*/
bufferTime?: number;
}
/**
* Adds buffering capabilities to a {@link LogicalChannel}, where messages are
* sent every {@link BufferedLogicalChannelOpts.bufferTime} milliseconds or when the buffer
* reaches {@link BufferedLogicalChannelOpts.bufferSize} messages, whichever comes first.
*/
export class BufferedLogicalChannel<Incoming, Outgoing> implements LogicalChannel<Incoming[], Outgoing[]> {
public readonly msg$: Observable<Incoming[]>;
public readonly err$: Observable<unknown>;
public readonly buffer: TimedQueue<Outgoing>;
constructor({channel, bufferSize = 100, bufferTime = 5}: BufferedLogicalChannelOpts<Incoming, Outgoing>) {
this.msg$ = channel.msg$;
this.err$ = channel.err$;
this.buffer = new TimedQueue();
this.buffer.itemLimit = bufferSize;
this.buffer.timeLimit = bufferTime;
this.buffer.onFlush = (list: Outgoing[]) => {
const promise = channel.send(list);
if (promise instanceof Promise)
promise.catch((error) => {
console.error(error);
});
};
}
public async send(outgoing: Outgoing[]): Promise<void> {
const buffer = this.buffer;
const length = outgoing.length;
for (let i = 0; i < length; i++) buffer.push(outgoing[i]);
}
}
|