Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 | 1x 1x 1x 1x 1x 18x 9x 9x 9x 9x 14x 14x 2x 2x 12x 12x 9x 9x 9x 9x 12x 12x 9x 5x 5x 5x 18x 18x 18x 18x 18x 18x 18x 18x 14x 14x 4x | import * as msg from '@jsonjoy.com/rpc-messages';
import {validateId, validateMethod} from '@jsonjoy.com/rpc-messages/lib/validation';
import {unknown, Value} from '@jsonjoy.com/json-type';
import {TypedRpcError} from '../callee/error/typed';
import type {Callee} from '../callee';
import type {Procedures} from '../procedures';
export type IncomingBatchMessage =
| msg.RequestDataMessage
| msg.RequestCompleteMessage
| msg.RequestErrorMessage
| msg.NotificationMessage;
export type OutgoingBatchMessage = msg.ResponseCompleteMessage | msg.ResponseErrorMessage;
export interface BatchDispatcherOptions<Ctx = unknown> {
callee: Callee<Ctx, Procedures>;
}
/**
* Batch dispatcher receives a batch of messages, processes them, and returns a
* batch of responses.
*/
export class BatchDispatcher<Ctx = unknown> {
protected readonly callee: Callee<Ctx, Procedures>;
constructor({callee}: BatchDispatcherOptions<Ctx>) {
this.callee = callee;
}
public async onBatch(list: IncomingBatchMessage[], ctx: Ctx): Promise<OutgoingBatchMessage[]> {
try {
const promises: Promise<OutgoingBatchMessage>[] = [];
const length = list.length;
for (let i = 0; i < length; i++) {
const message = list[i];
switch (message.constructor) {
case msg.NotificationMessage:
this.onNotification(message as msg.NotificationMessage, ctx);
break;
case msg.RequestDataMessage:
case msg.RequestCompleteMessage:
case msg.RequestErrorMessage:
promises.push(
this.onRequest(
message as msg.RequestDataMessage | msg.RequestCompleteMessage | msg.RequestErrorMessage,
ctx,
),
);
break;
}
}
const settled = await Promise.allSettled(promises);
const result: OutgoingBatchMessage[] = [];
const settledLength = settled.length;
for (let i = 0; i < settledLength; i++) {
const item = settled[i];
result.push(item.status === 'fulfilled' ? item.value : item.reason);
}
return result;
} catch (error) {
const value = TypedRpcError.internalErrorValue(error);
return [new msg.ResponseErrorMessage(-1, value)];
}
}
public onNotification(message: msg.NotificationMessage, ctx: Ctx): void {
const method = message.method;
validateMethod(method);
this.callee.notify(method, message.value?.data, ctx).catch(() => {});
}
public async onRequest(
message: msg.RequestDataMessage | msg.RequestCompleteMessage | msg.RequestErrorMessage,
ctx: Ctx,
): Promise<OutgoingBatchMessage> {
const id = message.id;
validateId(id);
const method = message.method;
validateMethod(method);
try {
const value = message.value;
const data = value ? value.data : undefined;
const result = await this.callee.call(method, data, ctx);
const typedResult = result instanceof Value ? result : unknown(result);
return new msg.ResponseCompleteMessage(id, typedResult);
} catch (error) {
throw new msg.ResponseErrorMessage(id, TypedRpcError.valueFrom(error));
}
}
}
|