All files / rpc-calls/src/caller/__tests__ LoopbackChannel.ts

76.38% Statements 55/72
44.11% Branches 15/34
83.33% Functions 15/18
81.53% Lines 53/65

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 1361x 1x 1x                   1x 30x 30x     30x     30x 30x               9x 9x 45x 9x           167x 167x         167x 87x 80x   80x   80x 2x 78x 78x         87x 87x 87x 87x 87x                                   2x 2x       78x 78x 1x 1x         30x             87x 87x   87x 87x 87x 87x   80x 80x       9x 9x 9x   9x     78x 78x   78x       87x       87x 87x      
import {Subject} from 'rxjs';
import * as msg from '@jsonjoy.com/rpc-messages';
import {unknown} from '@jsonjoy.com/json-type';
import type {LogicalChannel} from '../../channel/types';
import type {Callee} from '../../callee/types';
import type {Call} from '../../callee/Call';
 
/**
 * A loopback logical channel that processes RPC client messages through
 * a local {@link Callee} and sends responses back. Useful for testing
 * {@link RxCaller} and {@link PersistentCaller} without network transport.
 */
export class LoopbackChannel implements LogicalChannel<msg.RxServerMessage[], msg.RxClientMessage[]> {
  public readonly msg$ = new Subject<msg.RxServerMessage[]>();
  public readonly err$ = new Subject<unknown>();
 
  /** Active server-side calls keyed by request id. */
  private readonly calls = new Map<number, Call>();
 
  constructor(
    private readonly callee: Callee<any>,
    private readonly ctx: any,
  ) {}
 
  /**
   * Convert Error instances to plain objects so that non-enumerable
   * properties like `message` survive JSON round-trips through codecs.
   */
  private static serializeError(error: unknown): unknown {
    Eif (error instanceof Error) {
      const obj: Record<string, unknown> = {message: error.message};
      for (const key of Object.keys(error)) obj[key] = (error as any)[key];
      return obj;
    }
    return error;
  }
 
  public async send(outgoing: msg.RxClientMessage[]): Promise<void> {
    for (const message of outgoing) {
      this.processMessage(message);
    }
  }
 
  private processMessage(message: msg.RxClientMessage): void {
    if (message instanceof msg.RequestCompleteMessage) {
      this.onRequestComplete(message);
    } else Iif (message instanceof msg.RequestDataMessage) {
      this.onRequestData(message);
    } else Iif (message instanceof msg.RequestErrorMessage) {
      this.onRequestError(message);
    } else if (message instanceof msg.NotificationMessage) {
      this.onNotification(message);
    } else Eif (message instanceof msg.ResponseUnsubscribeMessage) {
      this.onResponseUnsubscribe(message);
    }
  }
 
  private onRequestComplete({id, method, value}: msg.RequestCompleteMessage): void {
    const methodName = method || this.getMethodForCall(id);
    Iif (!methodName) return;
    const call = this.getOrCreateCall(id, methodName);
    Eif (value) call.req$.next(value.data);
    call.req$.complete();
  }
 
  private onRequestData({id, method, value}: msg.RequestDataMessage): void {
    const methodName = method || this.getMethodForCall(id);
    if (!methodName) return;
    const call = this.getOrCreateCall(id, methodName);
    if (value) call.req$.next(value.data);
  }
 
  private onRequestError({id, method, value}: msg.RequestErrorMessage): void {
    const methodName = method || this.getMethodForCall(id);
    if (!methodName) return;
    const call = this.getOrCreateCall(id, methodName);
    call.req$.error(value.data);
  }
 
  private onNotification({method, value}: msg.NotificationMessage): void {
    const data = value ? value.data : undefined;
    this.callee.notify(method, data, this.ctx);
  }
 
  private onResponseUnsubscribe({id}: msg.ResponseUnsubscribeMessage): void {
    const call = this.calls.get(id);
    if (call) {
      call.stop$.next(null);
      this.calls.delete(id);
    }
  }
 
  /** Track method names for streaming calls (first message has method, subsequent have ''). */
  private readonly methodNames = new Map<number, string>();
 
  private getMethodForCall(id: number): string | undefined {
    return this.methodNames.get(id);
  }
 
  private getOrCreateCall(id: number, method: string): Call {
    let call = this.calls.get(id);
    Iif (call) return call;
 
    this.methodNames.set(id, method);
    call = this.callee.createCall(method, this.ctx);
    this.calls.set(id, call);
    call.res$.subscribe({
      next: (value) => {
        queueMicrotask(() => {
          this.msg$.next([new msg.ResponseDataMessage(id, unknown(value))]);
        });
      },
      error: (error) => {
        const data = LoopbackChannel.serializeError(error);
        queueMicrotask(() => {
          this.msg$.next([new msg.ResponseErrorMessage(id, unknown(data))]);
        });
        this.cleanup(id);
      },
      complete: () => {
        queueMicrotask(() => {
          this.msg$.next([new msg.ResponseCompleteMessage(id, undefined)]);
        });
        this.cleanup(id);
      },
    });
 
    return call;
  }
 
  private cleanup(id: number): void {
    this.calls.delete(id);
    this.methodNames.delete(id);
  }
}