Press n or j to go to the next uncovered block, b, p or k for the previous block.
import {Subject, ReplaySubject, BehaviorSubject, type Observable, map} from 'rxjs';
import {toUint8Array} from '@jsonjoy.com/buffers/lib/toUint8Array';
import {ChannelState} from './constants';
import type {CloseEventBase, PhysicalChannel} from './types';
/**
 * Minimal subset of the standard `WebSocket` members that a socket returned
 * by `WebSocketChannelParams.newSocket` has to provide.
 */
export type WebSocketBase = Pick<
WebSocket,
'binaryType' | 'readyState' | 'bufferedAmount' | 'onopen' | 'onclose' | 'onerror' | 'onmessage' | 'close' | 'send'
>;
/**
 * Constructor options of `WebSocketChannel`.
 */
export interface WebSocketChannelParams {
/**
* Should return a new WebSocket instance. The binary type of the WebSocket
* will be automatically changed to "arraybuffer".
*
* Invoked once, from inside the `WebSocketChannel` constructor; if it throws,
* the channel is created in the closed state.
*/
newSocket: () => WebSocketBase;
}
/**
 * A `Channel` interface using WebSocket implementation.
 *
 * Wraps a single socket produced by `newSocket` and exposes its lifecycle and
 * traffic as RxJS streams. The socket is created inside the constructor; if
 * creation throws, the channel starts out in the `CLOSED` state and `ws` stays
 * `undefined`.
 */
export class WebSocketChannel<T extends string | Uint8Array = string | Uint8Array> implements PhysicalChannel<T> {
  /**
   * Native WebSocket reference, or `undefined` if construction of WebSocket
   * failed.
   */
  public readonly ws: WebSocketBase | undefined;

  /** Current channel state; starts as `CONNECTING`. */
  public readonly state$ = new BehaviorSubject<ChannelState>(ChannelState.CONNECTING);

  /** Emits this channel once when the socket opens, then completes. Replays to late subscribers. */
  public readonly open$ = new ReplaySubject<PhysicalChannel>(1);

  /** Emits `[channel, closeEvent]` once when the socket closes, then completes. Replays to late subscribers. */
  public readonly close$ = new ReplaySubject<[self: PhysicalChannel, event: CloseEventBase]>(1);

  /** Emits socket errors. Not replayed: only subscribers present at emission time receive them. */
  public readonly error$ = new Subject<Error>();

  /** Emits every incoming message; completes when the socket closes. */
  public readonly message$ = new Subject<T>();

  /**
   * @param params Channel options; `newSocket` is called once to obtain the
   *     underlying socket.
   */
  constructor({newSocket}: WebSocketChannelParams) {
    try {
      const ws = (this.ws = newSocket());
      // Binary frames are delivered as ArrayBuffer and converted below.
      ws.binaryType = 'arraybuffer';
      ws.onopen = () => {
        this.state$.next(ChannelState.OPEN);
        this.open$.next(this);
        this.open$.complete();
      };
      ws.onclose = (event) => {
        this.state$.next(ChannelState.CLOSED);
        this.close$.next([this, event]);
        this.close$.complete();
        this.message$.complete();
      };
      ws.onerror = (event: Event) => {
        // Prefer the attached `Error`, otherwise build one from the message
        // (or the generic 'ERROR' text when there is none).
        const errorEvent: Partial<ErrorEvent> = event as unknown as Partial<ErrorEvent>;
        const error: Error =
          errorEvent.error instanceof Error ? errorEvent.error : new Error(String(errorEvent.message || 'ERROR'));
        this.error$.next(error);
      };
      ws.onmessage = (event) => {
        const data = event.data;
        // Text frames pass through as-is; anything else is converted to Uint8Array.
        const message: T = (typeof data === 'string' ? data : toUint8Array(data)) as unknown as T;
        this.message$.next(message);
      };
    } catch (error) {
      // Anything can be thrown, so make sure `error$` really carries an `Error`.
      const cause: Error = error instanceof Error ? error : new Error(String(error));
      this.state$.next(ChannelState.CLOSED);
      // NOTE: emitted synchronously during construction; `error$` does not
      // replay, so the failure is mainly observable through `state$`/`close$`.
      this.error$.next(cause);
      this.close$.next([this, {code: 0, wasClean: true, reason: 'INIT'}]);
      this.close$.complete();
    }
  }

  /**
   * @returns The socket's `bufferedAmount`, or `0` when there is no socket.
   */
  public buffer(): number {
    if (!this.ws) return 0;
    return this.ws.bufferedAmount;
  }

  /**
   * Closes the underlying socket. Does nothing when there is no socket.
   *
   * @param code Optional close code forwarded to the socket.
   * @param reason Optional close reason forwarded to the socket.
   */
  public close(code?: number, reason?: string): void {
    if (!this.ws) return;
    this.ws.close(code, reason);
  }

  /**
   * @returns `true` when the current value of `state$` is `OPEN`.
   */
  public isOpen(): boolean {
    return this.state$.getValue() === ChannelState.OPEN;
  }

  /**
   * Sends `data` through the socket right away.
   *
   * @param data Payload handed to the socket's `send`.
   * @returns The change of the socket's `bufferedAmount` caused by the call,
   *     or `-1` when there is no socket.
   */
  public send(data: T): number {
    if (!this.ws) return -1;
    const buffered = this.ws.bufferedAmount;
    this.ws.send(data);
    return this.ws.bufferedAmount - buffered;
  }

  /**
   * Sends `data` once the channel has opened.
   *
   * The returned observable is driven by `open$`, so nothing is sent until it
   * is subscribed to and `open$` has emitted. If the socket never opens, the
   * observable never emits a value.
   *
   * @param data Payload to send.
   * @returns Observable of the value returned by `send`; it errors with
   *     `Error('CLOSED')` if the channel is not open when `open$` emits to
   *     the subscriber.
   */
  public send$(data: T): Observable<number> {
    return this.open$.pipe(
      map(() => {
        if (!this.isOpen()) throw new Error('CLOSED');
        return this.send(data);
      }),
    );
  }
}
|