All files / channel/src WebSocketChannel.ts

87.27% Statements 48/55
57.14% Branches 8/14
83.33% Functions 10/12
94.11% Lines 48/51

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 1122x 2x                                       2x                 36x 36x 36x 36x 36x   36x         36x 36x 35x 35x 17x 17x 17x 17x   35x 15x 15x 15x 15x 15x   35x 2x   2x 2x   35x 5x 5x 5x 5x   35x 15x 15x     1x 1x 1x 1x         3x 3x       7x 7x       9x       8x 8x 8x 8x                        
import {Subject, ReplaySubject, BehaviorSubject, type Observable, map} from 'rxjs';
import {toUint8Array} from '@jsonjoy.com/buffers/lib/toUint8Array';
import {ChannelState} from './constants';
import type {CloseEventBase, PhysicalChannel} from './types';
 
/**
 * The subset of the standard `WebSocket` members that a socket must provide
 * in order to be returned from {@link WebSocketChannelParams.newSocket}.
 * Any object with these members is accepted, not only native `WebSocket`
 * instances.
 */
export type WebSocketBase = Pick<
  WebSocket,
  'binaryType' | 'readyState' | 'bufferedAmount' | 'onopen' | 'onclose' | 'onerror' | 'onmessage' | 'close' | 'send'
>;
 
/**
 * Construction parameters of {@link WebSocketChannel}.
 */
export interface WebSocketChannelParams {
  /**
   * Factory which should return a new WebSocket instance. It is invoked once,
   * from the `WebSocketChannel` constructor. The binary type of the returned
   * WebSocket is automatically changed to "arraybuffer". If the factory
   * throws, the channel is created in the closed state.
   */
  newSocket: () => WebSocketBase;
}
 
/**
 * A client WebSocket wrapper.
 *
 * Creates the socket through the supplied `newSocket` factory, switches it to
 * "arraybuffer" binary mode and republishes its lifecycle (open, close, error,
 * message) as RxJS streams, in addition to the optional `onmessage` and
 * `onclose` callbacks.
 *
 * If socket construction throws, the channel is left without a socket
 * (`ws === undefined`) and moves straight to the `CLOSED` state.
 */
export class WebSocketChannel<T extends string | Uint8Array<any> = string | Uint8Array<any>>
  implements PhysicalChannel<T>
{
  /**
   * Native WebSocket reference, or `undefined` if construction of WebSocket
   * failed.
   */
  public readonly ws: WebSocketBase | undefined;

  /**
   * Current channel state. Starts as `CONNECTING`, becomes `OPEN` when the
   * socket opens and `CLOSED` when it closes or when construction fails.
   */
  public readonly state$ = new BehaviorSubject<ChannelState>(ChannelState.CONNECTING);

  /**
   * Emits this channel once, when the socket opens, and then completes. The
   * value is replayed to late subscribers.
   */
  public readonly open$ = new ReplaySubject<PhysicalChannel<T>>(1);

  /**
   * Emits `[channel, closeEvent]` once, when the socket closes or when socket
   * construction fails, and then completes. The value is replayed to late
   * subscribers.
   */
  public readonly close$ = new ReplaySubject<[self: PhysicalChannel<T>, event: CloseEventBase]>(1);

  /** Emits an `Error` for every socket error event. */
  public readonly error$ = new Subject<Error>();

  /** Emits every incoming message. Completes when the socket closes. */
  public readonly message$ = new Subject<T>();

  /** `true` until the socket opens, and again once it has closed. */
  public closed: boolean = true;

  /**
   * Optional callback invoked for every incoming message, right after
   * `message$` has emitted. `isUtf8` is `true` when the socket delivered the
   * payload as a string.
   */
  public onmessage?: (data: T, isUtf8: boolean) => void;

  /** Optional callback invoked with the close event details when the socket closes. */
  public onclose?: (code: number, reason: string, wasClean: boolean) => void;

  /**
   * @param params Parameters holding the `newSocket` factory used to create
   *     the underlying socket. A throwing factory does not make the
   *     constructor throw; the failure is reported through `state$` and
   *     `close$` instead.
   */
  constructor({newSocket}: WebSocketChannelParams) {
    try {
      const ws = (this.ws = newSocket());
      ws.binaryType = 'arraybuffer';
      ws.onopen = () => {
        this.state$.next(ChannelState.OPEN);
        this.closed = false;
        this.open$.next(this);
        this.open$.complete();
      };
      ws.onclose = (event) => {
        this.state$.next(ChannelState.CLOSED);
        this.closed = true;
        this.close$.next([this, event]);
        this.close$.complete();
        this.message$.complete();
      };
      ws.onerror = (event: Event) => {
        // Error events may or may not carry an `error`/`message`; fall back to
        // a generic "ERROR" message when neither is usable.
        const errorEvent: Partial<ErrorEvent> = event as unknown as Partial<ErrorEvent>;
        const error: Error =
          errorEvent.error instanceof Error ? errorEvent.error : new Error(String(errorEvent.message || 'ERROR'));
        this.error$.next(error);
      };
      ws.onmessage = (event) => {
        const data = event.data;
        // Strings are passed through as-is, anything else is converted with
        // `toUint8Array`.
        const message: T = (typeof data === 'string' ? data : toUint8Array(data)) as unknown as T;
        this.message$.next(message);
        this.onmessage?.(message, typeof data === 'string');
      };
      // Bridge the close stream to the callback-style `onclose` hook.
      this.close$.subscribe(([, {code, reason, wasClean}]) => {
        this.closed = true;
        this.onclose?.(code, reason, wasClean);
      });
    } catch (error) {
      this.state$.next(ChannelState.CLOSED);
      // Anything can be thrown in JavaScript; make sure `error$` only ever
      // carries real `Error` instances. Note that `error$` is a plain
      // `Subject`, so only observers subscribed at this moment see this value.
      const cause: Error = error instanceof Error ? error : new Error(String(error));
      this.error$.next(cause);
      this.close$.next([this, {code: 0, wasClean: true, reason: 'INIT'}]);
      this.close$.complete();
    }
  }

  /**
   * @returns The socket's `bufferedAmount`, or `0` when there is no socket.
   */
  public buffer(): number {
    if (!this.ws) return 0;
    return this.ws.bufferedAmount;
  }

  /**
   * Closes the underlying socket. Does nothing when there is no socket.
   *
   * @param code Optional close code forwarded to the socket.
   * @param reason Optional close reason forwarded to the socket.
   */
  public close(code?: number, reason?: string): void {
    if (!this.ws) return;
    this.ws.close(code, reason);
  }

  /**
   * @returns `true` when the current value of `state$` is `OPEN`.
   */
  public isOpen(): boolean {
    return this.state$.getValue() === ChannelState.OPEN;
  }

  /**
   * Sends data over the socket immediately.
   *
   * @param data Payload to send.
   * @returns The change in the socket's `bufferedAmount` caused by the send,
   *     or `-1` when there is no socket.
   */
  public send(data: T): number {
    if (!this.ws) return -1;
    const buffered = this.ws.bufferedAmount;
    this.ws.send(data);
    return this.ws.bufferedAmount - buffered;
  }

  /**
   * Sends data once the channel has opened.
   *
   * The returned observable is driven by `open$`, which only emits from the
   * socket's `onopen` handler. If the socket never opens, the observable
   * never emits. If the channel has opened but is no longer open at
   * subscription time, the observable errors with `Error('CLOSED')`.
   *
   * @param data Payload to send.
   * @returns Observable of the value returned by {@link send}.
   */
  public send$(data: T): Observable<number> {
    return this.open$.pipe(
      map(() => {
        if (!this.isOpen()) throw new Error('CLOSED');
        return this.send(data);
      }),
    );
  }
}