All files / collaborative-presence/src PresenceManager.ts

71.33% Statements 112/157
70.23% Branches 59/84
46.42% Functions 13/28
75% Lines 102/136

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 3192x 2x 2x 2x           2x   65x 65x 65x                                               2x 43x   43x                         43x 43x     43x 43x 43x 43x 43x 43x                                   5x                           60x 60x 60x 60x 54x 54x       12x       11x         5x 5x 5x 5x 7x 4x 4x     5x 5x       7x       4x 3x 3x 3x       5x 5x 5x 5x 5x                 43x                                             21x 21x 21x 21x 19x 18x 18x 18x 18x 18x   18x 18x 17x 17x 17x 17x 13x 13x       17x 15x 15x   17x 10x 10x   17x 17x 17x   17x     21x 21x 18x 18x 18x   21x                   16x 16x 16x 16x 16x 16x 16x 16x 16x 16x 16x 16x 16x 16x 17x 17x 17x 17x       10x                                           1x                                                                                                  
import {FanOut} from 'thingies/lib/fanout';
import {NodeType, UserPresenceIdx} from './constants';
import * as id from './id';
import {ResolvedSelection} from './ResolvedSelection';
import type {JsonCrdtSelection, PresenceIdShorthand, RgaSelection, UserPresence} from './types';
import type {Model} from 'json-joy/lib/json-crdt';
import type {JsonCrdtDataType} from 'json-joy/lib/json-crdt-patch/constants';
import type {ITimestampStruct} from 'json-joy/lib/json-crdt-patch';
 
/**
 * Payload emitted on `PresenceManager.onChange`. Each list holds the
 * `processId`s affected by a single change to the peer table.
 */
export class PresenceEvent {
  /** Ids of peers seen for the first time. */
  public readonly added: string[];
  /** Ids of already-known peers whose presence was replaced. */
  public readonly updated: string[];
  /** Ids of peers dropped from the table. */
  public readonly removed: string[];

  constructor(added: string[], updated: string[], removed: string[]) {
    this.added = added;
    this.updated = updated;
    this.removed = removed;
  }
}
 
/**
 * One row of the peer table: the last accepted presence tuple for a peer,
 * paired with the local `Date.now()` reading (ms) taken when it was stored.
 */
export type PeerEntry<Meta extends object = object> = [presence: UserPresence<Meta>, receivedAt: number];
 
/**
 * Tuning knobs accepted by the `PresenceManager` constructor. Every field is
 * optional; a missing field falls back to the default listed on it.
 */
export interface PresenceManagerOpts {
  /**
   * Milliseconds after which peers are considered stale. Default 30000.
   * Used as the default cut-off of `removeOutdated()`.
   */
  timeout?: number;
  /**
   * Heartbeat interval in ms — presence is pushed at least this often.
   * Default 5000. Used as the period of the heartbeat timer.
   */
  heartbeat?: number;
  /**
   * Minimum ms between pushes triggered by local changes (throttle).
   * Default 50. Used as the delay of the coalescing push timer.
   */
  throttle?: number;
  /**
   * GC interval in ms for removing outdated peers. Default 5000.
   * Pass 0 to disable: the GC timer is only created when the value is > 0.
   */
  gcInterval?: number;
}
 
/**
 * Reactive in-memory presence store. Tracks remote peer states keyed by
 * `processId`. LWW by `seq` — stale updates are silently ignored.
 *
 * When {@link start} is called, the manager runs internal timers for heartbeat
 * pushes and peer GC. Call {@link stop} (or {@link destroy}) to tear down.
 */
export class PresenceManager<Meta extends object = object> {
  /**
   * Remote peers keyed by `processId`. The table is a prototype-less object
   * so that ids such as `__proto__`, `constructor` or `toString` are treated
   * as ordinary keys by the `in` checks and truthiness lookups below.
   */
  public peers: Record<string, PeerEntry<Meta>> = Object.create(null);
  /** The local presence tuple that is handed to {@link onpush}. */
  public local: UserPresence;
  /** Emits a {@link PresenceEvent} whenever the peer table changes. */
  public readonly onChange: FanOut<PresenceEvent> = new FanOut<PresenceEvent>();

  /** Called by the manager when local presence should be sent to the server. */
  public onpush: ((data: UserPresence) => void) | undefined;

  /** Default staleness cut-off (ms) used by {@link removeOutdated}. */
  public readonly timeout: number;
  /** Period (ms) of the heartbeat timer. */
  public readonly heartbeat: number;
  /** Delay (ms) of the coalescing push timer. */
  public readonly throttleMs: number;
  /** Period (ms) of the GC timer; `0` disables it. */
  public readonly gcInterval: number;

  private _heartbeatTimer: ReturnType<typeof setInterval> | undefined;
  private _gcTimer: ReturnType<typeof setInterval> | undefined;
  private _throttleTimer: ReturnType<typeof setTimeout> | undefined;
  /** `seq` of the local presence at the time of the last push; `-1` = never pushed. */
  private _pushedSeq: number = -1;
  private _started = false;

  /**
   * @param opts Options object, or a bare number which is taken as `timeout`.
   */
  constructor(opts?: PresenceManagerOpts | number) {
    const o: PresenceManagerOpts = typeof opts === 'number' ? {timeout: opts} : (opts ?? {});
    this.timeout = o.timeout ?? 30_000;
    this.heartbeat = o.heartbeat ?? 5_000;
    this.throttleMs = o.throttle ?? 50;
    this.gcInterval = o.gcInterval ?? 5_000;
    // Tuple layout: [userId, processId, seq, ts (seconds), selections, meta].
    this.local = ['', Math.random().toString(36).slice(2), 0, Math.floor(Date.now() / 1000), [], {} as Meta];
  }

  // ---------------------------------------------------------------- lifecycle

  /** Start heartbeat and GC timers. Safe to call multiple times. */
  start(): void {
    if (this._started) return;
    this._started = true;
    this._push();
    // Heartbeats must go out even when nothing changed locally, otherwise an
    // idle client stops pushing altogether — hence the forced push.
    this._heartbeatTimer = setInterval(() => this._push(true), this.heartbeat);
    if (this.gcInterval > 0) {
      this._gcTimer = setInterval(() => this.removeOutdated(), this.gcInterval);
    }
  }

  /** Stop timers. Does NOT destroy peer state. */
  stop(): void {
    if (!this._started) return;
    this._started = false;
    clearInterval(this._heartbeatTimer);
    clearInterval(this._gcTimer);
    clearTimeout(this._throttleTimer);
    this._heartbeatTimer = undefined;
    this._gcTimer = undefined;
    this._throttleTimer = undefined;
  }

  // ---------------------------------------------------------- remote presence

  /**
   * Store an incoming presence tuple and emit an `added` or `updated` event.
   * LWW by `seq` per `processId` — stale updates (same or lower `seq`) are
   * silently ignored.
   */
  receive(incoming: UserPresence<Meta>): void {
    const processId: string = incoming[UserPresenceIdx.ProcessId];
    const incomingSeq: number = incoming[UserPresenceIdx.Seq];
    const existing = this.peers[processId];
    if (existing && existing[0][UserPresenceIdx.Seq] >= incomingSeq) return;
    this.peers[processId] = [incoming, Date.now()];
    this.onChange.emit(new PresenceEvent(existing ? [] : [processId], existing ? [processId] : [], []));
  }

  /** @returns The stored presence of `processId`, or `undefined` if unknown. */
  get(processId: string): UserPresence<Meta> | undefined {
    return this.peers[processId]?.[0];
  }

  /** @returns Number of peers currently in the table. */
  size(): number {
    return Object.keys(this.peers).length;
  }

  /**
   * Remove peers whose `receivedAt` exceeds `timeout`.
   *
   * @param timeout Maximum age in ms; defaults to the manager's `timeout`.
   * @returns Ids of the removed peers. A single `removed` event is emitted
   *     when at least one peer was dropped.
   */
  removeOutdated(timeout: number = this.timeout): string[] {
    const now = Date.now();
    const removed: string[] = [];
    const peers = this.peers;
    for (const processId in peers) {
      if (now - peers[processId][1] > timeout) {
        delete peers[processId];
        removed.push(processId);
      }
    }
    if (removed.length) this.onChange.emit(new PresenceEvent([], [], removed));
    return removed;
  }

  /** Feed every entry of `snapshot` through {@link receive}. */
  merge(snapshot: UserPresence<Meta>[]): void {
    for (const incoming of snapshot) this.receive(incoming);
  }

  /**
   * Drop a single peer.
   *
   * @returns `true` (after emitting a `removed` event) when the peer was
   *     known, `false` otherwise.
   */
  remove(processId: string): boolean {
    if (!(processId in this.peers)) return false;
    delete this.peers[processId];
    this.onChange.emit(new PresenceEvent([], [], [processId]));
    return true;
  }

  /**
   * Stop timers, forget all peers and all resolution state. Emits one
   * `removed` event listing every peer that was present.
   */
  destroy(): void {
    this.stop();
    const removed = Object.keys(this.peers);
    this.peers = Object.create(null);
    this._resolved.clear();
    if (removed.length) this.onChange.emit(new PresenceEvent([], [], removed));
  }

  // --------------------------------------------------------------- resolution

  /**
   * Per-slot resolution state, keyed by `${processId}\0${documentId}\0${uiLocationId}`.
   * Survives across `resolve()` calls.
   */
  private _resolved: Map<string, ResolvedSelection> = new Map();

  /**
   * Reconcile remote presence against the given `model`. For every peer whose
   * selection targets `documentId`, the manager:
   *
   *   1. Updates `desired` if the incoming presence is newer than what we
   *      previously processed for this `(peer, documentId, uiLocationId)`
   *      (LWW by `peer.seq`).
   *   2. Attempts to resolve `desired` against `model`. On success the
   *      selection is promoted to `displayed` and `desired` is cleared.
   *      On failure (anchor characters not yet received) `desired` stays
   *      pending and `displayed` continues to be the source of truth for
   *      rendering.
   *
   * This method *mutates* the manager's internal resolver state and returns
   * a grouping of the current resolved selections by `processId`. Editors
   * should read only `displayed` to render — `desired` is internal bookkeeping.
   *
   * The integration must invoke `resolve()` both when remote presence arrives
   * and when the model changes.
   */
  public resolve(documentId: string, model: Model<any>): Map<string, ResolvedSelection[]> {
    const localProcessId = this.local[UserPresenceIdx.ProcessId] as string;
    const resolved = this._resolved;
    const out = new Map<string, ResolvedSelection[]>();
    for (const processId in this.peers) {
      // Never resolve our own echo.
      if (processId === localProcessId) continue;
      const entry = this.peers[processId];
      const presence = entry[0];
      const peerSeq = presence[UserPresenceIdx.Seq] as number;
      const selections = presence[UserPresenceIdx.Selections] as JsonCrdtSelection[] | undefined;
      if (!selections || !selections.length) continue;
      let bucket: ResolvedSelection[] | undefined;
      for (const sel of selections) {
        if (sel[0] !== documentId) continue;
        const uiLocationId = sel[1] as string;
        const key = `${processId}\0${documentId}\0${uiLocationId}`;
        let rs = resolved.get(key);
        if (!rs) {
          rs = new ResolvedSelection();
          resolved.set(key, rs);
        }
        // LWW: only accept this incoming sel as `desired` if newer than what
        // we previously processed for this slot pair.
        if (peerSeq > rs.seq) {
          rs.desired = sel;
          rs.seq = peerSeq;
        }
        if (rs.desired && this._canResolve(rs.desired, model)) {
          rs.displayed = rs.desired;
          rs.desired = null;
        }
        // The per-peer bucket is created lazily so that peers without a
        // selection in this document do not appear in the result.
        if (!bucket) {
          bucket = [];
          out.set(processId, bucket);
        }
        bucket.push(rs);
      }
    }
    // Drop slots that belong to peers which are no longer in the table.
    const peers = this.peers;
    for (const key of resolved.keys()) {
      const sep = key.indexOf('\0');
      const processId = key.slice(0, sep);
      if (!(processId in peers)) resolved.delete(key);
    }
    return out;
  }

  /**
   * Returns `true` when the selection's node and all anchor IDs exist in the
   * model — i.e. the local replica has received the characters the cursor
   * points at. RGA selections (`str`, `bin`, `arr`) additionally check each
   * anchor/focus point via `findById`.
   */
  private _canResolve(sel: JsonCrdtSelection, model: Model<any>): boolean {
    const senderSid = sel[2] as number;
    const nodeIdDto = sel[6] as PresenceIdShorthand;
    const nodeId: ITimestampStruct = id.fromDto(senderSid, nodeIdDto);
    const node = model.index.get(nodeId);
    if (!node) return false;
    const type = sel[5] as JsonCrdtDataType;
    if (type === NodeType.str || type === NodeType.bin || type === NodeType.arr) {
      const cursors = (sel as RgaSelection)[7];
      if (!cursors) return true;
      const rga = node as unknown as {findById: (ts: ITimestampStruct) => unknown};
      // Nodes without `findById` cannot be probed; treat them as resolvable.
      if (typeof rga.findById !== 'function') return true;
      const strNodeId = node.id;
      for (const cursor of cursors) {
        for (const point of cursor) {
          if (!point) continue;
          const pointId = id.fromDto(senderSid, point[0]);
          // A point carrying the node's own id is not looked up in the RGA.
          if (pointId.sid === strNodeId.sid && pointId.time === strNodeId.time) continue;
          if (!rga.findById(pointId)) return false;
        }
      }
    }
    return true;
  }

  /** Forget all per-slot resolution state without touching the peer table. */
  public clearResolved(): void {
    this._resolved.clear();
  }

  // ----------------------------------------------------------- local presence

  /** Set the local user id. Does not schedule a push. */
  setUserId(userId: string): void {
    this.local[UserPresenceIdx.UserId] = userId;
  }

  getUserId(): string {
    return this.local[UserPresenceIdx.UserId];
  }

  /** Replace the local process id. Does not schedule a push. */
  setProcessId(processId: string): void {
    this.local[UserPresenceIdx.ProcessId] = processId;
  }

  getProcessId(): string {
    return this.local[UserPresenceIdx.ProcessId];
  }

  /**
   * Replace the local metadata and schedule a throttled push. `seq` and `ts`
   * are advanced exactly as in {@link setSelections}: without a new `seq`
   * the push would be skipped as "unchanged", and receivers applying LWW by
   * `seq` would discard the update as stale.
   */
  setMeta(meta: Meta): void {
    this.local[UserPresenceIdx.Seq]++;
    this.local[UserPresenceIdx.Ts] = Math.floor(Date.now() / 1000);
    this.local[UserPresenceIdx.Meta] = meta;
    this._schedulePush();
  }

  getMeta(): Meta {
    return this.local[UserPresenceIdx.Meta] as Meta;
  }

  /**
   * Replace the local selections, advance `seq`, refresh `ts` (seconds) and
   * schedule a throttled push.
   */
  setSelections(selections: JsonCrdtSelection[]): void {
    this.local[UserPresenceIdx.Seq]++;
    this.local[UserPresenceIdx.Ts] = Math.floor(Date.now() / 1000);
    this.local[UserPresenceIdx.Selections] = selections;
    this._schedulePush();
  }

  /** @returns The local selections, or an empty array when none are set. */
  getSelections(): JsonCrdtSelection[] {
    return (this.local[UserPresenceIdx.Selections] as JsonCrdtSelection[]) || [];
  }

  /** Shorthand for `setSelections([])`. */
  clearSelections(): void {
    this.setSelections([]);
  }

  // --------------------------------------------------------------------- push

  /**
   * Push local presence immediately if it has changed since the last push,
   * or unconditionally when `force` is true (used by heartbeat).
   */
  private _push(force?: boolean): void {
    const seq = this.local[UserPresenceIdx.Seq] as number;
    if (!force && seq === this._pushedSeq) return;
    this._pushedSeq = seq;
    this.onpush?.(this.local);
  }

  /**
   * Schedule a throttled push (coalesces rapid local changes). A no-op while
   * the manager is stopped or while a push is already pending.
   */
  private _schedulePush(): void {
    if (!this._started || this._throttleTimer) return;
    this._throttleTimer = setTimeout(() => {
      this._throttleTimer = undefined;
      this._push();
    }, this.throttleMs);
  }
}