Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 | 3x 3x 3x 3x 201x 201x 201x 3x 88x 88x 88x 88x 88x 206x 4x 228x 278x 278x 112x 20x 166x 166x 88x 88x 88x 88x 92x 92x 91x 20x 20x 20x 20x 201x 201x 201x 201x 201x 201x 201x 402x 402x 201x 201x 201x 201x 201x 43x 43x 4x 4x 4x 4x 2x 2x 2x 2x 41x 41x 41x 41x 43x 158x 158x 158x 158x 201x 198x 198x 198x 81x 81x 198x 29x 8x 8x 89x 89x 12x 12x 12x 12x 89x | import {firstValueFrom, isObservable, Observable, type Observer, of, Subject, type Subscription} from 'rxjs';
import {unknown} from '@jsonjoy.com/json-type';
import * as msg from '@jsonjoy.com/rpc-messages';
import {subscribeCompleteObserver} from './util/subscribeCompleteObserver';
import type {Caller, CallerMethods} from './types';
import type {LogicalChannel} from '../channel/types';
/**
 * Bookkeeping entry for a single RPC call that is still in flight.
 */
class Call {
  /** Intermediate subject that carries the request stream. */
  public readonly req$: Subject<unknown>;

  /** Intermediate subject that carries the response stream. */
  public readonly res$: Subject<unknown>;

  /** Whether the response stream was finalized by the server. */
  public resFinalized: boolean;

  constructor(req$: Subject<unknown>, res$: Subject<unknown>, resFinalized: boolean) {
    this.req$ = req$;
    this.res$ = res$;
    this.resFinalized = resFinalized;
  }
}
/**
 * Configuration parameters for {@link RxLogicalChannelCaller}.
 */
export interface RxLogicalChannelCallerOptions {
/**
 * Channel used to exchange messages with the remote side. The caller
 * subscribes to the channel's `msg$` stream to receive batches of server
 * messages and calls `send()` on it to deliver client messages.
 */
channel: LogicalChannel<msg.RxServerMessage[], msg.RxClientMessage[]>;
}
/**
 * Implements {@link Caller} on top of a {@link LogicalChannel}, using
 * Reactive RPC semantics.
 *
 * Every call is assigned a numeric ID and is tracked in an internal map until
 * both its request stream and its response stream have terminated. Incoming
 * server messages are routed to the matching call by that ID.
 */
export class RxLogicalChannelCaller<Methods extends CallerMethods<any> = CallerMethods> implements Caller<Methods> {
  /** In-flight RPC calls, keyed by message ID. */
  private readonly calls = new Map<number, Call>();

  /** Message ID counter. */
  private id = 1;

  /** Subscription to incoming channel messages; released in {@link stop}. */
  private readonly _msgSub: Subscription | undefined;

  /** Channel used to send client messages and to receive server messages. */
  public readonly channel: LogicalChannel<msg.RxServerMessage[], msg.RxClientMessage[]>;

  /**
   * Emits every notification message received from the server. Backed by a
   * `Subject`, but exposed as a plain `Observable` to consumers.
   */
  public readonly notification$: Observable<msg.NotificationMessage> = new Subject<msg.NotificationMessage>();

  /**
   * Stores the channel and starts listening to its incoming message batches.
   *
   * @param options Caller configuration, see {@link RxLogicalChannelCallerOptions}.
   */
  constructor({channel}: RxLogicalChannelCallerOptions) {
    this.channel = channel;
    this._msgSub = channel.msg$.subscribe({
      next: (messages) => this.onMessages(messages),
    });
  }

  /**
   * Returns the number of active in-flight calls. Useful for reporting and
   * testing for memory leaks in unit tests.
   *
   * @returns Number of in-flight RPC calls.
   */
  public getInflightCallCount(): number {
    return this.calls.size;
  }

  /**
   * Processes a batch of messages received from the server.
   *
   * @param messages List of messages from server.
   */
  public onMessages(messages: msg.RxServerMessage[]): void {
    const length = messages.length;
    for (let i = 0; i < length; i++) this.onMessage(messages[i]);
  }

  /**
   * Processes a message received from the server by dispatching it to the
   * handler that matches its class. Unrecognized messages are logged with
   * `console.warn` and otherwise ignored.
   *
   * @param message A message from the server.
   */
  public onMessage(message: msg.RxServerMessage): void {
    if (message instanceof msg.ResponseCompleteMessage) this.onResponseComplete(message);
    else if (message instanceof msg.ResponseDataMessage) this.onResponseData(message);
    else if (message instanceof msg.ResponseErrorMessage) this.onResponseError(message);
    else if (message instanceof msg.RequestUnsubscribeMessage) this.onRequestUnsubscribe(message);
    else if (message instanceof msg.NotificationMessage)
      (this.notification$ as Subject<msg.NotificationMessage>).next(message);
    else console.warn('Unknown message type', message);
  }

  /**
   * Handles the final message of a response stream: emits the trailing value,
   * if one is attached, and then completes the response stream. Messages for
   * unknown call IDs are ignored.
   *
   * @param message Response completion message from the server.
   */
  public onResponseComplete({id, value}: msg.ResponseCompleteMessage): void {
    const call = this.calls.get(id);
    if (!call) return;
    // Mark as finalized first, so the unsubscribe teardown in `call$` does not
    // send a redundant unsubscribe message back to the server.
    call.resFinalized = true;
    const data = value ? (value as any).data : undefined;
    if (data !== void 0) call.res$.next(data);
    call.res$.complete();
  }

  /**
   * Handles an intermediate response value by forwarding it to the response
   * stream of the matching call. Messages for unknown call IDs are ignored.
   *
   * @param message Response data message from the server.
   */
  public onResponseData({id, value}: msg.ResponseDataMessage): void {
    const call = this.calls.get(id);
    if (!call) return;
    call.res$.next(value.data);
  }

  /**
   * Handles a response error by failing the response stream of the matching
   * call with the received error payload. Messages for unknown call IDs are
   * ignored.
   *
   * @param message Response error message from the server.
   */
  public onResponseError({id, value}: msg.ResponseErrorMessage): void {
    const call = this.calls.get(id);
    if (!call) return;
    call.resFinalized = true;
    call.res$.error(value.data);
  }

  /**
   * Handles a server request to stop sending request data by completing the
   * request stream of the matching call. Messages for unknown call IDs are
   * ignored.
   *
   * @param message Request unsubscribe message from the server.
   */
  public onRequestUnsubscribe({id}: msg.RequestUnsubscribeMessage): void {
    const call = this.calls.get(id);
    if (!call) return;
    call.req$.complete();
  }

  /**
   * Execute remote RPC method. We use in-between `req$` and `res$` observables.
   *
   * ```
   *   +--------+    +--------+
   *   |  data  | -> |  req$  | -> Server messages
   *   +--------+    +--------+
   *
   *                      +--------+    +-------------------+
   *   Server messages -> |  res$  | -> |  user observable  |
   *                      +--------+    +-------------------+
   * ```
   *
   * @param method RPC method name.
   * @param data RPC method static payload or stream of data.
   * @returns Observable of response values for this call.
   */
  public call$<K extends keyof Methods>(
    method: K,
    data: Observable<Methods[K][0]> | Methods[K][0],
  ): Observable<Methods[K][1]> {
    const id = this.id++;
    // Wrap the counter around before it reaches 0xffff.
    if (this.id >= 0xffff) this.id = 1;
    // The ID may still belong to a long-lived call after a wrap-around; retry
    // with the next ID in that case.
    if (this.calls.has(id)) return this.call$(method, data as any);
    const req$ = new Subject<unknown>();
    const res$ = new Subject<unknown>();
    // The call record is dropped only after both the request side and the
    // response side have terminated, i.e. on the second `cleanup()` invocation.
    let finalizedStreams = 0;
    const cleanup = () => {
      finalizedStreams++;
      if (finalizedStreams === 2) this.calls.delete(id);
    };
    res$.subscribe({error: cleanup, complete: cleanup});
    const entry = new Call(req$, res$, false);
    this.calls.set(id, entry);
    const channel = this.channel;
    if (isObservable(data)) {
      // Only the first message of a request stream carries the method name;
      // follow-up messages are matched by ID and carry an empty method.
      let firstMessageSent = false;
      // NOTE(review): `subscribeCompleteObserver` is defined elsewhere; its
      // `complete` callback receives a value, presumably the last value of the
      // stream delivered together with completion — confirm against the helper.
      subscribeCompleteObserver<unknown>(req$, {
        next: (value) => {
          const messageMethod = firstMessageSent ? '' : method;
          firstMessageSent = true;
          const message = new msg.RequestDataMessage(id, messageMethod as string, unknown(value));
          channel.send([message]);
        },
        error: (error) => {
          cleanup();
          const messageMethod = firstMessageSent ? '' : method;
          const message = new msg.RequestErrorMessage(id, messageMethod as string, unknown(error));
          channel.send([message]);
        },
        complete: (value) => {
          cleanup();
          const messageMethod = firstMessageSent ? '' : method;
          const message = new msg.RequestCompleteMessage(id, messageMethod as string, unknown(value));
          channel.send([message]);
        },
      });
      data.subscribe(req$);
    } else {
      // Static payload: the whole request fits in a single "complete" message,
      // so the request side is finished immediately.
      const message = new msg.RequestCompleteMessage(id, method as string, unknown(data));
      channel.send([message]);
      req$.complete();
      cleanup();
    }
    return new Observable<unknown>((observer: Observer<unknown>) => {
      res$.subscribe(observer);
      return () => {
        // If the server has not finalized the response yet, tell it that the
        // client is no longer interested in this call.
        if (!entry.resFinalized) {
          const message = new msg.ResponseUnsubscribeMessage(id);
          channel.send([message]);
        }
        res$.complete();
      };
    });
  }

  /**
   * Execute remote RPC method with a single request value and wait for the
   * first response value.
   *
   * @param method RPC method name.
   * @param request Request payload.
   * @returns Promise of the first value emitted by the response stream.
   */
  public async call<K extends keyof Methods>(method: K, request: Methods[K][0]): Promise<Methods[K][1]> {
    return await firstValueFrom(this.call$(method, of(request)));
  }

  /**
   * Send a one-way notification message without expecting any response.
   *
   * @param method Remote method name.
   * @param data Static payload data.
   */
  public notify<K extends keyof Methods>(method: K, data: Methods[K][0]): void {
    const message = new msg.NotificationMessage(method as string, unknown(data));
    this.channel.send([message]);
  }

  /**
   * Stop all in-flight RPC calls and stop listening to the channel. This
   * operation is not reversible, you cannot use the RPC client after this
   * call.
   *
   * Request streams which are still open are failed with `new Error(reason)`;
   * response streams which were not finalized yet are completed.
   *
   * @param reason Message of the error used to fail open request streams.
   */
  public stop(reason = 'STOP'): void {
    this._msgSub?.unsubscribe();
    // Terminating the streams below triggers the per-call cleanup registered in
    // `call$`, which may delete entries from `calls` during this loop; deleting
    // from a `Map` while iterating over it is safe.
    for (const call of this.calls.values()) {
      if (!call.req$.closed && !call.req$.isStopped) call.req$.error(new Error(reason));
      if (!call.resFinalized) {
        call.resFinalized = true;
        if (!call.res$.closed && !call.res$.isStopped) call.res$.complete();
      }
    }
    this.calls.clear();
  }

  /**
   * Stops the caller using `'DISCONNECT'` as the reason, see {@link stop}.
   */
  public disconnect(): void {
    this.stop('DISCONNECT');
  }
}
|