All files / json-crdt-server/src/services PubSubService.ts

92.59% Statements 25/27
80% Branches 8/10
88.88% Functions 8/9
95% Lines 19/20

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 376x   6x 105x     21x 21x 21x 21x 21x 3x 3x 3x 3x 3x 2x             1449x 1447x 1447x 32x       1x            
import {Observable, type Observer} from 'rxjs';
 
/**
 * Minimal in-memory publish/subscribe hub.
 *
 * Subscribers are tracked per channel name. A channel entry exists in the
 * internal map only while it has at least one subscriber.
 */
export class PubsubService {
  /** Active observers, keyed by channel name. */
  private readonly observers = new Map<string, Observer<unknown>[]>();

  /**
   * Creates a cold observable for a channel. Each subscription registers its
   * observer under `channel`; unsubscribing removes it again and drops the
   * channel entry once no observers remain.
   *
   * @param channel Name of the channel to listen to.
   * @returns Observable emitting every message later published to `channel`.
   */
  public listen$(channel: string): Observable<unknown> {
    return new Observable<unknown>((observer) => {
      let list = this.observers.get(channel);
      if (!list) {
        list = [];
        this.observers.set(channel, list);
      }
      list.push(observer);
      return () => {
        // Look the list up again: the channel entry may already be gone.
        const current = this.observers.get(channel);
        if (!current) return;
        const index = current.indexOf(observer);
        if (index > -1) current.splice(index, 1);
        if (!current.length) this.observers.delete(channel);
      };
    });
  }

  /**
   * Delivers `message` to every observer currently subscribed to `channel`.
   * Delivery is deferred with `setImmediate`, so it never happens
   * synchronously within the caller's stack.
   *
   * @param channel Name of the channel to publish to.
   * @param message Payload handed to each observer's `next`.
   * @returns Promise that resolves once all observers have been notified.
   */
  public async publish(channel: string, message: unknown): Promise<void> {
    await new Promise<void>((resolve) => setImmediate(resolve));
    const observers = this.observers.get(channel);
    if (!observers) return;
    // Iterate over a snapshot: an observer may unsubscribe from inside `next`,
    // which splices the live array and would otherwise make the loop skip the
    // observer that follows it.
    // NOTE(review): an observer unsubscribed mid-delivery still gets `next`
    // called here; RxJS subscribers are expected to ignore it once closed.
    for (const observer of [...observers]) observer.next(message);
  }

  /**
   * Reports current subscription counts.
   *
   * @returns Number of channels with at least one observer, and the total
   *     number of observers across all channels.
   */
  public stats(): {channels: number; observers: number} {
    let observers = 0;
    for (const list of this.observers.values()) observers += list.length;
    return {channels: this.observers.size, observers};
  }
}