All files / rpc-calls/src/caller RxPersistentCaller.ts

86.48% Statements 32/37
41.66% Branches 5/12
83.33% Functions 10/12
93.54% Lines 29/31

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101  1x 1x 1x 1x   1x     1x                                                             1x     30x     30x 30x 30x 120x 30x 30x 30x 30x 30x 30x       30x               30x 30x 30x               87x 87x           29x       2x       30x       30x 30x      
import type * as msg from '@jsonjoy.com/rpc-messages';
import {firstValueFrom, type Observable, ReplaySubject, timer} from 'rxjs';
import {filter, share, switchMap, take, takeUntil} from 'rxjs/operators';
import {RxLogicalChannelCaller} from './RxLogicalChannelCaller';
import {PersistentPhysicalChannel, type PersistentPhysicalChannelOptions} from '@jsonjoy.com/channel';
import {RxBatchCodecLogicalChannel} from '../channel/RxBatchCodecLogicalChannel';
import {MessageLogicalChannel} from '../channel/MessageLogicalChannel';
import type {Caller, CallerMethods} from './types';
import type {RpcCodec} from '@jsonjoy.com/rpc-codec';
import {BufferedLogicalChannel} from '../channel';
 
/**
 * Construction options for {@link RxPersistentCaller}.
 */
export interface RxPersistentCallerOptions {
  /**
   * Options forwarded as-is to the {@link PersistentPhysicalChannel}
   * constructor, which owns the underlying binary connection.
   */
  physical: PersistentPhysicalChannelOptions<Uint8Array>;

  /**
   * Codec handed to the logical message channel that is built on top of each
   * newly opened physical channel.
   */
  codec: RpcCodec<msg.RxMessage>;

  /**
   * Keep-alive interval, in milliseconds, between ping notifications. When
   * omitted, 15,000 (15 seconds) is used. A value of 0 turns keep-alive
   * pings off.
   */
  ping?: number;

  /**
   * Method name of the keep-alive ping notification. Falls back to ".ping"
   * when not provided.
   */
  pingMethod?: string;
}
 
/**
 * Persistent Reactive (Rx) JSON RPC client, with the following features:
 *
 * - Automatically reconnects if disconnected.
 * - Sends periodic keep-alive ".ping" notifications to keep the connection alive.
 *
 * Uses a {@link PersistentPhysicalChannel} to maintain a physical connection. On
 * each new connection a {@link MessageLogicalChannel} is constructed from the
 * physical channel and the provided codec, wrapped in a
 * {@link BufferedLogicalChannel}, and then an {@link RxLogicalChannelCaller} is
 * created on top of it.
 */
export class RxPersistentCaller<Methods extends CallerMethods<any> = CallerMethods> implements Caller<Methods> {
  /** Persistent physical channel that owns the underlying connection. */
  public channel: PersistentPhysicalChannel<Uint8Array>;

  /** Caller bound to the most recently opened connection, if any was opened. */
  public rpc?: RxLogicalChannelCaller<Methods>;

  /**
   * Emits a fresh caller every time a connection is opened. The latest caller
   * is replayed to late subscribers.
   */
  public readonly rpc$ = new ReplaySubject<RxLogicalChannelCaller<Methods>>(1);

  /**
   * Creates the persistent physical channel and wires up per-connection caller
   * construction. No connection is attempted here; see {@link start}.
   *
   * @param params Physical channel options, codec and keep-alive settings.
   */
  constructor(params: RxPersistentCallerOptions) {
    const ping = params.ping ?? 15000;
    // `||` (not `??`) on purpose: an empty method name also falls back to ".ping".
    const pingMethod = params.pingMethod || '.ping';
    const codec = params.codec;
    this.channel = new PersistentPhysicalChannel(params.physical);
    this.channel.open$.pipe(filter((open) => open)).subscribe(() => {
      const close$ = this.channel.open$.pipe(filter((open) => !open));
      const physicalChannel = this.channel.channel$.value;
      if (!physicalChannel) return;
      const channel = new MessageLogicalChannel<msg.RxMessage>(physicalChannel, codec);
      const channelBuffered = new BufferedLogicalChannel({channel});
      const caller = new RxLogicalChannelCaller<Methods>({
        channel: channelBuffered as any,
      });

      // Send ping notifications to keep the connection alive. The timer is
      // torn down as soon as the channel reports that it is no longer open.
      if (ping) {
        timer(ping, ping)
          .pipe(takeUntil(close$))
          .subscribe(() => {
            caller.notify(pingMethod, undefined as any);
          });
      }

      // Retire the caller of the previous connection before publishing the new one.
      if (this.rpc) this.rpc.disconnect();
      this.rpc = caller;
      this.rpc$.next(caller);
    });
  }

  /**
   * Executes a (possibly streaming) RPC call on the current connection.
   *
   * The returned observable follows {@link rpc$}: whenever a new caller is
   * published (i.e. after a reconnect) the call is switched over to it.
   *
   * @param method Name of the remote method.
   * @param data Request payload, or an observable of request payloads.
   * @returns Shared observable of response payloads.
   */
  public call$<K extends keyof Methods>(
    method: K,
    data: Observable<Methods[K][0]> | Methods[K][0],
  ): Observable<Methods[K][1]> {
    return this.rpc$.pipe(
      switchMap((rpc) => rpc.call$(method, data as any)),
      share(),
    );
  }

  /**
   * Executes a unary RPC call.
   *
   * @param method Name of the remote method.
   * @param request Request payload.
   * @returns Promise of the first value emitted by {@link call$}.
   */
  public async call<K extends keyof Methods>(method: K, request: Methods[K][0]): Promise<Methods[K][1]> {
    return firstValueFrom(this.call$(method, request));
  }

  /**
   * Sends a one-way notification exactly once.
   *
   * If a caller is already available it is used synchronously; otherwise the
   * notification is sent through the first caller that becomes available.
   *
   * @param method Name of the notification method.
   * @param data Notification payload.
   */
  public notify<K extends keyof Methods>(method: K, data: Methods[K][0]): void {
    // `take(1)` completes the subscription after the first caller. Without it
    // every `notify()` call would leave a permanent subscription on `rpc$` and
    // the same notification would be re-sent on each subsequent reconnect.
    this.rpc$.pipe(take(1)).subscribe((rpc) => rpc.notify(method, data));
  }

  /** Starts the persistent physical channel. */
  public start() {
    this.channel.start();
  }

  /** Stops the persistent physical channel and the current caller, if any. */
  public stop() {
    this.channel.stop();
    if (this.rpc) this.rpc.stop();
  }
}