All files / json-crdt-server/src/services PresenceService.ts

87.67% Statements 64/73
71.87% Branches 23/32
85.71% Functions 12/14
91.37% Lines 53/58

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 1026x         6x 105x 105x     4x 4x 4x 4x           4x 4x 4x 4x 4x 4x 4x 4x 4x       2x 2x 2x 2x 2x 2x 2x 2x                     6x 6x 6x 6x 6x 6x 6x 2x 2x   6x 6x 1x 1x       1x 1x 1x               12x 12x 5x 5x 5x       10x 10x 6x 6x 6x   6x       1x              
import {Observable, type Observer} from 'rxjs';
import type {TPresenceEntry} from '../routes/presence/schema';
 
/** Presence entries of a single room, keyed by entry id. */
export type PresenceRoom = Map<string, TPresenceEntry>;
 
/**
 * In-memory presence registry.
 *
 * Presence entries are grouped into rooms. Every change to an entry is pushed
 * to the observers currently listening on that room. Rooms exist only while
 * they hold at least one entry; empty rooms are dropped from the registry.
 */
export class PresenceService {
  /** Maximum number of entries sent in the initial snapshot of `listen$`. */
  private static readonly SNAPSHOT_LIMIT = 100;

  /** Rooms that currently hold at least one entry, keyed by room id. */
  private readonly rooms = new Map<string, PresenceRoom>();
  /** Active listeners, keyed by room id. */
  private readonly observers = new Map<string, Observer<TPresenceEntry[]>[]>();

  /**
   * Creates or refreshes a presence entry and notifies the room's observers.
   *
   * @param roomId Room the entry belongs to.
   * @param entryId Identifier of the entry inside the room.
   * @param ttl Added to `Date.now()` to compute the entry's `validUntil`.
   * @param data Object whose own enumerable properties are merged into the
   *     entry's existing `data`.
   * @returns The stored (and possibly merged) entry.
   * @throws Error with message `ROOM_ENTRY_MUST_BE_OBJECT` when `data` is
   *     falsy or not an object.
   */
  public async update(roomId: string, entryId: string, ttl: number, data: unknown): Promise<TPresenceEntry> {
    // Validate before touching any state, so a rejected call cannot leave an
    // empty room behind.
    if (!data || typeof data !== 'object') throw new Error('ROOM_ENTRY_MUST_BE_OBJECT');
    const now = Date.now();
    const room = this.getRoom(roomId);
    const entry: TPresenceEntry = room.get(entryId) ?? {
      id: entryId,
      lastSeen: now,
      validUntil: now + ttl,
      data: {},
    };
    entry.lastSeen = now;
    entry.validUntil = now + ttl;
    Object.assign(entry.data, data);
    room.set(entryId, entry);
    this.cleanUpRoom(roomId);
    // Yield to the event loop before notifying observers.
    await new Promise((resolve) => setImmediate(resolve));
    const observers = this.observers.get(roomId);
    if (observers) for (const observer of observers) observer.next([entry]);
    return entry;
  }

  /**
   * Removes an entry from a room and notifies the room's observers with a
   * tombstone entry (`validUntil: 0`, empty `data`). Observers are notified
   * even if the entry did not exist.
   *
   * @param roomId Room the entry belongs to.
   * @param entryId Identifier of the entry to remove.
   */
  public async remove(roomId: string, entryId: string): Promise<void> {
    // Look the room up without creating it: there is nothing to delete from a
    // room that does not exist.
    const room = this.rooms.get(roomId);
    if (room) {
      room.delete(entryId);
      if (!room.size) this.rooms.delete(roomId);
    }
    // Yield to the event loop before notifying observers.
    await new Promise((resolve) => setImmediate(resolve));
    const observers = this.observers.get(roomId);
    if (!observers) return;
    for (const observer of observers)
      observer.next([
        {
          id: entryId,
          lastSeen: Date.now(),
          validUntil: 0,
          data: {},
        },
      ]);
  }

  /**
   * Returns an observable of presence changes in a room.
   *
   * On subscription, expired entries are purged and, if the room has any
   * entries, a snapshot of up to `SNAPSHOT_LIMIT` of them is emitted
   * synchronously. Afterwards every `update` and `remove` on the room emits a
   * single-element batch. Unsubscribing unregisters the observer.
   *
   * @param roomId Room to listen to.
   */
  public listen$(roomId: string): Observable<TPresenceEntry[]> {
    return new Observable<TPresenceEntry[]>((observer) => {
      this.cleanUpRoom(roomId);
      let roomObservers = this.observers.get(roomId);
      if (!roomObservers) {
        roomObservers = [];
        this.observers.set(roomId, roomObservers);
      }
      roomObservers.push(observer);
      // Read the room without creating it: merely listening must not register
      // an empty room that would never be cleaned up.
      const room = this.rooms.get(roomId);
      if (room) {
        const entries: TPresenceEntry[] = [];
        for (const entry of room.values()) {
          entries.push(entry);
          if (entries.length === PresenceService.SNAPSHOT_LIMIT) break;
        }
        if (entries.length) observer.next(entries);
      }
      return () => {
        const current = this.observers.get(roomId);
        if (!current) {
          this.cleanUpRoom(roomId);
          return;
        }
        const index = current.indexOf(observer);
        if (index > -1) current.splice(index, 1);
        if (!current.length) this.observers.delete(roomId);
      };
    });
  }

  /** Returns the room with the given id, creating and registering it if missing. */
  private getRoom(roomId: string): PresenceRoom {
    const room = this.rooms.get(roomId);
    if (room) return room;
    const newRoom = new Map<string, TPresenceEntry>();
    this.rooms.set(roomId, newRoom);
    return newRoom;
  }

  /**
   * Deletes entries whose `validUntil` is before the current time and drops
   * the room if it ends up empty. Does nothing if the room does not exist.
   */
  private cleanUpRoom(roomId: string): void {
    const room = this.rooms.get(roomId);
    if (!room) return;
    const now = Date.now();
    for (const [entryId, presence] of room.entries()) {
      if (presence.validUntil < now) room.delete(entryId);
    }
    if (!room.size) this.rooms.delete(roomId);
  }

  /**
   * Returns current counters: number of rooms, total number of entries across
   * all rooms, and total number of registered observers across all rooms.
   */
  public stats(): {rooms: number; entries: number; observers: number} {
    let entries = 0;
    for (const room of this.rooms.values()) entries += room.size;
    let observers = 0;
    for (const list of this.observers.values()) observers += list.length;
    return {rooms: this.rooms.size, entries, observers};
  }
}