Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | 1x 1x 30x 30x 30x 30x 40x 40x 30x 35x 35x | import {Subject, type Observable} from 'rxjs';
import type {RpcCodec} from '@jsonjoy.com/rpc-codec';
import type {PhysicalChannel} from '@jsonjoy.com/channel';
import type {LogicalChannel} from './types';
export class MessageLogicalChannel<Message> implements LogicalChannel<Message[], Message[]> {
public readonly msg$: Observable<Message[]> = new Subject<Message[]>();
public readonly err$: Observable<unknown>;
constructor(
private channel: PhysicalChannel<Uint8Array>,
private codec: RpcCodec<Message>,
) {
const subscription = channel.message$.subscribe({
next: (chunk) => {
const messages = codec.decode(chunk);
(this.msg$ as Subject<Message[]>).next(messages);
},
error: () => {
subscription?.unsubscribe();
},
});
this.err$ = channel.error$;
}
public async send(outgoing: Message[]): Promise<void> {
const buf = this.codec.encode(outgoing);
this.channel.send(buf);
}
}
|