Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 | 1x 1x 1x 1x 30x 30x 30x 30x 30x 9x 9x 45x 9x 167x 167x 167x 87x 80x 80x 80x 2x 78x 78x 87x 87x 87x 87x 87x 2x 2x 78x 78x 1x 1x 30x 87x 87x 87x 87x 87x 87x 80x 80x 9x 9x 9x 9x 78x 78x 78x 87x 87x 87x | import {Subject} from 'rxjs';
import * as msg from '@jsonjoy.com/rpc-messages';
import {unknown} from '@jsonjoy.com/json-type';
import type {LogicalChannel} from '../../channel/types';
import type {Callee} from '../../callee/types';
import type {Call} from '../../callee/Call';
/**
* A loopback logical channel that processes RPC client messages through
* a local {@link Callee} and sends responses back. Useful for testing
* {@link RxCaller} and {@link PersistentCaller} without network transport.
*/
export class LoopbackChannel implements LogicalChannel<msg.RxServerMessage[], msg.RxClientMessage[]> {
public readonly msg$ = new Subject<msg.RxServerMessage[]>();
public readonly err$ = new Subject<unknown>();
/** Active server-side calls keyed by request id. */
private readonly calls = new Map<number, Call>();
constructor(
private readonly callee: Callee<any>,
private readonly ctx: any,
) {}
/**
* Convert Error instances to plain objects so that non-enumerable
* properties like `message` survive JSON round-trips through codecs.
*/
private static serializeError(error: unknown): unknown {
Eif (error instanceof Error) {
const obj: Record<string, unknown> = {message: error.message};
for (const key of Object.keys(error)) obj[key] = (error as any)[key];
return obj;
}
return error;
}
public async send(outgoing: msg.RxClientMessage[]): Promise<void> {
for (const message of outgoing) {
this.processMessage(message);
}
}
private processMessage(message: msg.RxClientMessage): void {
if (message instanceof msg.RequestCompleteMessage) {
this.onRequestComplete(message);
} else Iif (message instanceof msg.RequestDataMessage) {
this.onRequestData(message);
} else Iif (message instanceof msg.RequestErrorMessage) {
this.onRequestError(message);
} else if (message instanceof msg.NotificationMessage) {
this.onNotification(message);
} else Eif (message instanceof msg.ResponseUnsubscribeMessage) {
this.onResponseUnsubscribe(message);
}
}
private onRequestComplete({id, method, value}: msg.RequestCompleteMessage): void {
const methodName = method || this.getMethodForCall(id);
Iif (!methodName) return;
const call = this.getOrCreateCall(id, methodName);
Eif (value) call.req$.next(value.data);
call.req$.complete();
}
private onRequestData({id, method, value}: msg.RequestDataMessage): void {
const methodName = method || this.getMethodForCall(id);
if (!methodName) return;
const call = this.getOrCreateCall(id, methodName);
if (value) call.req$.next(value.data);
}
private onRequestError({id, method, value}: msg.RequestErrorMessage): void {
const methodName = method || this.getMethodForCall(id);
if (!methodName) return;
const call = this.getOrCreateCall(id, methodName);
call.req$.error(value.data);
}
private onNotification({method, value}: msg.NotificationMessage): void {
const data = value ? value.data : undefined;
this.callee.notify(method, data, this.ctx);
}
private onResponseUnsubscribe({id}: msg.ResponseUnsubscribeMessage): void {
const call = this.calls.get(id);
if (call) {
call.stop$.next(null);
this.calls.delete(id);
}
}
/** Track method names for streaming calls (first message has method, subsequent have ''). */
private readonly methodNames = new Map<number, string>();
private getMethodForCall(id: number): string | undefined {
return this.methodNames.get(id);
}
private getOrCreateCall(id: number, method: string): Call {
let call = this.calls.get(id);
Iif (call) return call;
this.methodNames.set(id, method);
call = this.callee.createCall(method, this.ctx);
this.calls.set(id, call);
call.res$.subscribe({
next: (value) => {
queueMicrotask(() => {
this.msg$.next([new msg.ResponseDataMessage(id, unknown(value))]);
});
},
error: (error) => {
const data = LoopbackChannel.serializeError(error);
queueMicrotask(() => {
this.msg$.next([new msg.ResponseErrorMessage(id, unknown(data))]);
});
this.cleanup(id);
},
complete: () => {
queueMicrotask(() => {
this.msg$.next([new msg.ResponseCompleteMessage(id, undefined)]);
});
this.cleanup(id);
},
});
return call;
}
private cleanup(id: number): void {
this.calls.delete(id);
this.methodNames.delete(id);
}
}
|