All files / rpc-calls/src/channel RxLogicalChannelBase.ts

70.58% Statements 12/17
100% Branches 0/0
42.85% Functions 3/7
70.58% Lines 12/17

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42                2x 42x 42x       42x 42x   42x 35x 35x 35x             42x           51x 51x              
import type * as msg from '@jsonjoy.com/rpc-messages';
import type {PhysicalChannelBase} from '@jsonjoy.com/channel';
import type {RpcCodec} from '@jsonjoy.com/rpc-codec';
import type {LogicalChannelBase} from './types';
 
type Incoming = msg.RxMessage;
type Outgoing = msg.RxMessage;
 
export class RxLogicalChannelBase implements LogicalChannelBase<Incoming[], Outgoing[]> {
  public onmsg: (msg: Incoming[]) => void = () => {};
  public onerr: (err: unknown) => void = () => {};
  public onclose?: (code: number, reason: string, wasClean: boolean) => void;
 
  constructor(
    private readonly channel: PhysicalChannelBase<Uint8Array>,
    private readonly codec: RpcCodec<msg.RxMessage>,
  ) {
    channel.onmessage = (data: Uint8Array) => {
      try {
        const messages = codec.decode(data) as Incoming[];
        this.onmsg(messages);
      } catch (error) {
        channel.close();
        this.onerr(error);
        return;
      }
    };
    channel.onclose = (code: number, reason: string, wasClean: boolean) => {
      this.onclose?.(code, reason, wasClean);
    };
  }
 
  public send(messages: Outgoing[]): void {
    const encoded = this.codec.encode(messages);
    this.channel.send(encoded);
  }
 
  public close(code?: number, reason?: string): void {
    this.channel.close(code, reason);
  }
}