All files / channel/src WebSocketChannel.ts

85.41% Statements 41/48
57.14% Branches 8/14
81.81% Functions 9/11
93.18% Lines 41/44

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 992x 2x                                       2x             36x 36x 36x 36x 36x     36x 36x 35x 35x 17x 17x 17x   35x 15x 15x 15x 15x   35x 2x   2x 2x   35x 5x 5x 5x     1x 1x 1x 1x         3x 3x       7x 7x       9x       8x 8x 8x 8x                        
import {Subject, ReplaySubject, BehaviorSubject, type Observable, map} from 'rxjs';
import {toUint8Array} from '@jsonjoy.com/buffers/lib/toUint8Array';
import {ChannelState} from './constants';
import type {CloseEventBase, PhysicalChannel} from './types';
 
export type WebSocketBase = Pick<
  WebSocket,
  'binaryType' | 'readyState' | 'bufferedAmount' | 'onopen' | 'onclose' | 'onerror' | 'onmessage' | 'close' | 'send'
>;
 
export interface WebSocketChannelParams {
  /**
   * Should return a new WebSocket instance. The binary type of the WebSocket
   * will be automatically changed to "arraybuffer".
   */
  newSocket: () => WebSocketBase;
}
 
/**
 * A `Channel` interface using WebSocket implementation.
 */
export class WebSocketChannel<T extends string | Uint8Array = string | Uint8Array> implements PhysicalChannel<T> {
  /**
   * Native WebSocket reference, or `undefined` if construction of WebSocket
   * failed.
   */
  public readonly ws: WebSocketBase | undefined;
 
  public readonly state$ = new BehaviorSubject<ChannelState>(ChannelState.CONNECTING);
  public readonly open$ = new ReplaySubject<PhysicalChannel>(1);
  public readonly close$ = new ReplaySubject<[self: PhysicalChannel, event: CloseEventBase]>(1);
  public readonly error$ = new Subject<Error>();
  public readonly message$ = new Subject<T>();
 
  constructor({newSocket}: WebSocketChannelParams) {
    try {
      const ws = (this.ws = newSocket());
      ws.binaryType = 'arraybuffer';
      ws.onopen = () => {
        this.state$.next(ChannelState.OPEN);
        this.open$.next(this);
        this.open$.complete();
      };
      ws.onclose = (event) => {
        this.state$.next(ChannelState.CLOSED);
        this.close$.next([this, event]);
        this.close$.complete();
        this.message$.complete();
      };
      ws.onerror = (event: Event) => {
        const errorEvent: Partial<ErrorEvent> = event as unknown as Partial<ErrorEvent>;
        const error: Error =
          errorEvent.error instanceof Error ? errorEvent.error : new Error(String(errorEvent.message || 'ERROR'));
        this.error$.next(error);
      };
      ws.onmessage = (event) => {
        const data = event.data;
        const message: T = (typeof data === 'string' ? data : toUint8Array(data)) as unknown as T;
        this.message$.next(message);
      };
    } catch (error) {
      this.state$.next(ChannelState.CLOSED);
      this.error$.next(error as Error);
      this.close$.next([this, {code: 0, wasClean: true, reason: 'INIT'}]);
      this.close$.complete();
    }
  }
 
  public buffer(): number {
    Iif (!this.ws) return 0;
    return this.ws.bufferedAmount;
  }
 
  public close(code?: number, reason?: string): void {
    Iif (!this.ws) return;
    this.ws.close(code, reason);
  }
 
  public isOpen(): boolean {
    return this.state$.getValue() === ChannelState.OPEN;
  }
 
  public send(data: T): number {
    Iif (!this.ws) return -1;
    const buffered = this.ws.bufferedAmount;
    this.ws.send(data);
    return this.ws.bufferedAmount - buffered;
  }
 
  public send$(data: T): Observable<number> {
    return this.open$.pipe(
      map(() => {
        if (!this.isOpen()) throw new Error('CLOSED');
        return this.send(data);
      }),
    );
  }
}