Press n or j to go to the next uncovered block, b, p or k for the previous block.
import {firstValueFrom, from, of, switchMap, take, type Observable, isObservable} from 'rxjs';
/**
 * A map from string keys to procedures that all share the context type
 * `Ctx`. Request and response types of the individual procedures are left
 * open (`any`).
 */
export type Procedures<Ctx = unknown> = Record<string, Procedure<any, any, Ctx>>;
/**
 * Extracts the context type `Ctx` from a `Procedures` map; resolves to
 * `unknown` when it cannot be inferred.
 */
export type ProceduresCtx<P extends Procedures> = P extends Procedures<infer Ctx> ? Ctx : unknown;
/**
 * Extracts the request type of a `Procedure`; resolves to `never` when `P`
 * is not a `Procedure`.
 */
export type ProcedureReq<P> = P extends Procedure<infer Req, any, any> ? Req : never;
/**
 * Extracts the response type of a `Procedure`; resolves to `never` when `P`
 * is not a `Procedure`.
 */
export type ProcedureRes<P> = P extends Procedure<any, infer Res, any> ? Res : never;
/**
 * A low-level abstraction for an RPC procedure. This class is transport and
 * serialization agnostic; it specifies a single method that can be executed
 * by an RPC server.
 */
export abstract class Procedure<Req = unknown, Res = unknown, Ctx = unknown> {
  /**
   * Factory method for creating a procedure from either a constant value or
   * a function.
   *
   * - When `fn` is not a function it is treated as a constant response and
   *   wrapped into a unary procedure which always resolves with that value.
   * - When `fn` is a function, a reactive procedure is created: it takes the
   *   first value of the request stream, invokes `fn` with it, and emits
   *   whatever `fn` produced, regardless of whether that is a plain value,
   *   a promise or an observable.
   *
   * @param fn Constant response value, or the function implementing the call.
   * @param validate Optional request validator, see {@link Procedure.validate}.
   * @param preCall Optional pre-call check, see {@link Procedure.preCall}.
   * @returns A `UnaryProcedure` for constant values, an `RxProcedure` for
   *     functions.
   */
  public static readonly new = <Req = unknown, Res = unknown, Ctx = unknown>(
    fn: Res | ((request: Req, ctx: Ctx) => Res | Promise<Res> | Observable<Res>),
    validate: ((request: Req) => void) | undefined = undefined,
    preCall: ((ctx: Ctx, request: Req) => Promise<void>) | undefined = undefined,
  ) => {
    // Constant (non-function) value: respond with it on every call.
    if (typeof fn !== 'function') return Procedure.unary(async () => fn as Res, validate, preCall);
    const streamingCall = (req: Observable<Req>, ctx: Ctx) => {
      return req.pipe(
        // Only the first request value is passed to `fn`.
        take(1),
        switchMap((r) => {
          const res = (fn as (request: Req, ctx: Ctx) => Res | Promise<Res> | Observable<Res>)(r, ctx);
          // Observables and promises are flattened by `switchMap` as they are;
          // a plain value is lifted into a resolved promise first.
          if (isObservable(res)) return res;
          if (res instanceof Promise) return res;
          return Promise.resolve(res);
        }),
      );
    };
    return new RxProcedure<Req, Res, Ctx>(streamingCall, validate, preCall);
  };

  /**
   * Convenience method for creating a *unary procedure*, which receives a
   * single request value and returns a single response value.
   *
   * @param fn Function implementing the call.
   * @param validate Optional request validator.
   * @param preCall Optional pre-call check.
   */
  public static readonly unary = <Req = unknown, Res = unknown, Ctx = unknown>(
    fn: (request: Req, ctx: Ctx) => Promise<Res>,
    validate: ((request: Req) => void) | undefined = undefined,
    preCall: ((ctx: Ctx, request: Req) => Promise<void>) | undefined = undefined,
  ): UnaryProcedure<Req, Res, Ctx> => new UnaryProcedure<Req, Res, Ctx>(fn, validate, preCall);

  /**
   * Convenience method for creating a *reactive procedure*, which receives a
   * stream of request values and returns a stream of response values.
   *
   * @param fn Function implementing the streaming call.
   * @param validate Optional request validator.
   * @param preCall Optional pre-call check.
   */
  public static readonly rx = <Req = unknown, Res = unknown, Ctx = unknown>(
    fn: (request$: Observable<Req>, ctx: Ctx) => Observable<Res>,
    validate: ((request: Req) => void) | undefined = undefined,
    preCall: ((ctx: Ctx, request: Req) => Promise<void>) | undefined = undefined,
  ): RxProcedure<Req, Res, Ctx> => new RxProcedure<Req, Res, Ctx>(fn, validate, preCall);

  /**
   * Specifies if request or response of the method could be a stream.
   */
  rx: boolean = false;

  /**
   * Whether to pretty print the response.
   */
  pretty: boolean = false;

  /**
   * Validation logic. Should throw if the request is invalid, and not throw
   * otherwise. In case the request is a stream, the validation method is
   * executed for every emitted value.
   */
  abstract validate: ((request: Req) => void) | undefined;

  /**
   * Method which is executed before an actual call to an RPC method. Pre-call
   * checks should execute all necessary checks (such as authentication,
   * authorization, throttling, etc.) before allowing the real method to
   * proceed. Pre-call checks should throw if, for any reason, the call should
   * not proceed. Return void to allow execution of the actual call.
   *
   * @param ctx Request context object.
   * @param request Request payload, the first emitted value in case of a
   *     streaming request.
   */
  abstract preCall: ((ctx: Ctx, request: Req) => Promise<void>) | undefined;

  /**
   * Method which is executed to perform the actual call, when the request is
   * a single value and the response is a single value.
   *
   * @param request Request payload.
   * @param ctx Request context object.
   * @returns Response data.
   */
  abstract call(request: Req, ctx: Ctx): Promise<Res>;

  /**
   * Method which is executed to perform the actual call, when the request is
   * a stream and the response is a stream.
   *
   * @param request$ Request payload as an observable.
   * @param ctx Request context object.
   * @returns Response data as an observable.
   */
  abstract call$(request$: Observable<Req>, ctx: Ctx): Observable<Res>;
}
/**
 * A procedure backed by a promise-returning function: it takes one request
 * value and asynchronously produces one response value.
 */
export class UnaryProcedure<Req = unknown, Res = unknown, Ctx = unknown> extends Procedure<Req, Res, Ctx> {
  /** Unary procedures are not streaming. */
  rx: boolean = false;

  /**
   * @param fn Function implementing the call.
   * @param validate Optional request validator.
   * @param preCall Optional check to run before the call.
   */
  constructor(
    private readonly fn: (request: Req, ctx: Ctx) => Promise<Res>,
    public readonly validate: ((request: Req) => void) | undefined,
    public readonly preCall: ((ctx: Ctx, request: Req) => Promise<void>) | undefined,
  ) {
    super();
  }

  /**
   * Invokes the wrapped function with the request and context and resolves
   * with its result.
   */
  async call(request: Req, ctx: Ctx): Promise<Res> {
    const response = await this.fn(request, ctx);
    return response;
  }

  /**
   * Streaming adapter: waits for the first value of `request$`, runs it
   * through `call` and exposes the resulting promise as an observable.
   */
  call$(request$: Observable<Req>, ctx: Ctx): Observable<Res> {
    const respond = async (): Promise<Res> => {
      const request = await firstValueFrom(request$);
      return this.call(request, ctx);
    };
    return from(respond());
  }
}
/**
 * A procedure backed by an observable-to-observable function: it takes a
 * stream of request values and produces a stream of response values.
 */
export class RxProcedure<Req = unknown, Res = unknown, Ctx = unknown> extends Procedure<Req, Res, Ctx> {
  /** Reactive procedures are streaming. */
  rx: boolean = true;

  /**
   * How many `request$` values may be buffered in memory while the pre-call
   * check is still running and a multi-value `request$` keeps emitting; once
   * exceeded, an error is raised and the streaming call is stopped.
   * Initialized to `0` here; the `RpcServer` is expected to supply its own
   * `preCallBufferSize` default (not visible in this file).
   */
  preCallBufferSize: number = 0;

  /**
   * @param fn Function implementing the streaming call.
   * @param validate Optional request validator.
   * @param preCall Optional check to run before the call.
   */
  constructor(
    private readonly fn: (request$: Observable<Req>, ctx: Ctx) => Observable<Res>,
    public readonly validate: ((request: Req) => void) | undefined,
    public readonly preCall: ((ctx: Ctx, request: Req) => Promise<void>) | undefined,
  ) {
    super();
  }

  /**
   * Unary adapter: wraps the request into a single-value observable, runs the
   * streaming function on it and resolves with the first emitted response.
   */
  async call(request: Req, ctx: Ctx): Promise<Res> {
    const response$ = this.fn(of(request), ctx);
    return await firstValueFrom(response$);
  }

  /**
   * Passes the request stream and context straight to the wrapped function
   * and returns its response stream.
   */
  call$(request$: Observable<Req>, ctx: Ctx): Observable<Res> {
    const response$ = this.fn(request$, ctx);
    return response$;
  }
}
|