All files / rpc-calls/src/channel MessageLogicalChannel.ts

91.66% Statements 11/12
100% Branches 0/0
75% Functions 3/4
91.66% Lines 11/12

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 311x         1x 30x       30x 30x   30x   40x 40x           30x       35x 35x      
import {Subject, type Observable} from 'rxjs';
import type {RpcCodec} from '@jsonjoy.com/rpc-codec';
import type {PhysicalChannel} from '@jsonjoy.com/channel';
import type {LogicalChannel} from './types';
 
export class MessageLogicalChannel<Message> implements LogicalChannel<Message[], Message[]> {
  public readonly msg$: Observable<Message[]> = new Subject<Message[]>();
  public readonly err$: Observable<unknown>;
 
  constructor(
    private channel: PhysicalChannel<Uint8Array>,
    private codec: RpcCodec<Message>,
  ) {
    const subscription = channel.message$.subscribe({
      next: (chunk) => {
        const messages = codec.decode(chunk);
        (this.msg$ as Subject<Message[]>).next(messages);
      },
      error: () => {
        subscription?.unsubscribe();
      },
    });
    this.err$ = channel.error$;
  }
 
  public async send(outgoing: Message[]): Promise<void> {
    const buf = this.codec.encode(outgoing);
    this.channel.send(buf);
  }
}