All files / json-crdt-server/src/services/blocks/store MemoryStore.ts

81.63% Statements 120/147
64.91% Branches 37/57
85% Functions 17/20
89.34% Lines 109/122

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 2066x 6x     6x   6x   36x 36x 36x       6x 43x     531x 531x 531x 513x             57x 57x 57x 57x 57x 57x 57x 57x 7720x 7720x 7720x 7720x 7715x   57x       1x 1x       156x 156x               37x 37x 37x 36x 36x 36x 36x 36x 27x 27x 27x         27x 27x 27x   9x             478x 478x 478x 478x 478x 478x 478x 478x 478x 478x 478x 478x 478x 478x 478x 478x           478x 478x       90x 90x 90x 90x 90x 90x   90x 181x 181x 181x 181x 91x     90x 90x 90x 90x       109x 109x 109x 109x 109x 109x 109x 15357x 15357x 15357x 15354x   109x       11x 11x       11x       1x                                             2x 2x 2x 9x 9x 3x 3x   6x 6x           2x 2x 3x 3x          
import {RpcError} from '@jsonjoy.com/rpc-error';
import {AvlMap} from 'sonic-forest/lib/avl/AvlMap';
import type * as types from './types';
 
/**
 * Shared promise that settles exactly once, on the first `setImmediate`
 * callback scheduled when this module is evaluated. Code that awaits it before
 * that point is resumed then; once it has settled, awaiting it only defers the
 * caller by a microtask.
 */
const tick = new Promise((wake) => {
  setImmediate(wake);
});
 
/**
 * Plain container holding the three pieces of state kept for a single block.
 * The constructor stores the references it is given as-is; nothing is copied.
 */
export class MemoryBlock {
  /** Snapshot that the retained `history` batches are applied on top of. */
  public readonly start: types.StoreSnapshot;

  /** Block record handed back to callers of the store. */
  public readonly data: types.StoreBlock;

  /** Batches retained for this block. */
  public readonly history: types.StoreBatch[];

  constructor(start: types.StoreSnapshot, data: types.StoreBlock, history: types.StoreBatch[]) {
    this.start = start;
    this.data = data;
    this.history = history;
  }
}
 
/**
 * `types.Store` implementation that keeps every block in a process-local
 * `Map`, keyed by block id. Nothing is persisted.
 *
 * Stored objects are handed out by reference: results returned from `get`,
 * `getSnapshot`, `create` and `push` alias the objects held in the map, and
 * `push`/`compact` mutate those objects in place.
 */
export class MemoryStore implements types.Store {
  protected readonly blocks = new Map<string, MemoryBlock>();

  /**
   * Looks up a block by id.
   *
   * @returns `{block}` with the stored block record, or `undefined` when no
   *     block with that id exists.
   */
  public async get(id: string): Promise<types.StoreGetResult | undefined> {
    await tick;
    const block = this.blocks.get(id);
    if (!block) return;
    return {block: block.data};
  }

  /**
   * Returns the block's start snapshot together with the retained batches
   * whose `seq` is greater than the start snapshot's `seq` and not greater
   * than the requested `seq`.
   *
   * @param id Block id.
   * @param seq Highest batch sequence number to include.
   * @throws `RpcError.notFound()` when the block does not exist.
   */
  public async getSnapshot(
    id: string,
    seq: number,
  ): Promise<{snapshot: types.StoreSnapshot; batches: types.StoreBatch[]}> {
    await tick;
    const block = this.blocks.get(id);
    if (!block) throw RpcError.notFound();
    const snapshot = block.start;
    const history = block.history;
    const length = history.length;
    const batches: types.StoreBatch[] = [];
    for (let i = 0; i < length; i++) {
      const batch = history[i];
      const seq2 = batch.seq;
      // Batches already folded into the start snapshot are skipped.
      if (seq2 <= snapshot.seq) continue;
      // History is appended to in push order, so the scan stops at the first
      // batch past the requested sequence number.
      if (seq2 > seq) break;
      batches.push(batch);
    }
    return {snapshot, batches};
  }

  /** Resolves to `true` when a block with the given id is stored. */
  public async exists(id: string): Promise<boolean> {
    await tick;
    return this.blocks.has(id);
  }

  /**
   * Resolves to the `seq` of the block's current snapshot, or `undefined` when
   * the block does not exist.
   */
  public async seq(id: string): Promise<number | undefined> {
    await tick;
    return this.blocks.get(id)?.data.snapshot.seq;
  }

  /**
   * Creates a new block.
   *
   * @param start Snapshot stored as the base of the block's history.
   * @param end Snapshot stored as the block's current state; its `id` names
   *     the block and its `ts` initialises the block's `ts` and `uts`.
   * @param batch Optional first batch; recorded in history with `seq` 0.
   * @returns The stored block record and, when `batch` was given, the stored
   *     batch.
   * @throws `RpcError.conflict()` when a block with the same id exists.
   * @throws `Error('NO_PATCHES')` when `batch.patches` is not an array.
   */
  public async create(
    start: types.StoreSnapshot,
    end: types.StoreSnapshot,
    batch?: types.StoreIncomingBatch,
  ): Promise<types.StoreCreateResult> {
    const {id} = end;
    await tick;
    if (this.blocks.has(id)) throw RpcError.conflict();
    // Validate before inserting so that a rejected call does not leave a
    // half-created block behind in the map.
    if (batch && !Array.isArray(batch.patches)) throw new Error('NO_PATCHES');
    const now = end.ts;
    const data = {id, snapshot: end, tip: [], ts: now, uts: now};
    const block = new MemoryBlock(start, data, []);
    this.blocks.set(id, block);
    if (!batch) return {block: block.data};
    const {cts, patches} = batch;
    const batch2: types.StoreBatch = {
      seq: 0,
      ts: end.ts,
      patches,
    };
    // `cts` is only copied when the caller supplied one.
    if (cts !== void 0) batch2.cts = cts;
    block.history.push(batch2);
    return {block: block.data, batch: batch2};
  }

  /**
   * Appends a batch to an existing block and advances its current snapshot.
   * The stored snapshot object is updated in place (`seq`, `ts`, `blob`) and
   * the block's `uts` is refreshed.
   *
   * @param snapshot0 Incoming snapshot; `seq` must be exactly one greater than
   *     the stored snapshot's `seq`.
   * @param batch0 Incoming batch; `patches` must be a non-empty array.
   * @returns The updated stored snapshot and the batch as recorded.
   * @throws `Error('NO_PATCHES')` when `patches` is missing or empty.
   * @throws `RpcError.notFound()` when the block does not exist.
   * @throws `Error('PATCH_SEQ_INV')` when `seq` is not the next in sequence.
   */
  public async push(
    snapshot0: types.StoreIncomingSnapshot,
    batch0: types.StoreIncomingBatch,
  ): Promise<types.StorePushResult> {
    const {id, seq} = snapshot0;
    const {patches} = batch0;
    await tick;
    if (!Array.isArray(patches) || !patches.length) throw new Error('NO_PATCHES');
    const block = this.blocks.get(id);
    if (!block) throw RpcError.notFound();
    const blockData = block.data;
    const snapshot = blockData.snapshot;
    if (snapshot.seq + 1 !== seq) throw new Error('PATCH_SEQ_INV');
    const history = block.history;
    const now = Date.now();
    blockData.uts = now;
    snapshot.seq = seq;
    snapshot.ts = now;
    snapshot.blob = snapshot0.blob;
    const batch1: types.StoreBatch = {
      seq,
      ts: now,
      ...(batch0.cts !== undefined ? {cts: batch0.cts} : undefined),
      patches,
    };
    history.push(batch1);
    return {snapshot, batch: batch1};
  }

  /**
   * Folds retained batches into the block's start snapshot.
   *
   * Batches with `start.seq < seq <= to` are streamed to `advance` together
   * with the current start blob; the blob it resolves to becomes the new start
   * blob, `start.seq` is set to `to`, and every history entry the iterator
   * stepped over is dropped.
   *
   * @param id Block id.
   * @param to Sequence number to compact up to (inclusive).
   * @param advance Callback producing the new start blob.
   * @throws `RpcError.notFound()` when the block does not exist.
   */
  public async compact(id: string, to: number, advance: types.Advance): Promise<void> {
    const block = this.blocks.get(id);
    if (!block) throw RpcError.notFound();
    const start = block.start;
    const batches = block.history;
    const length = batches.length;
    // Shared with the generator below: after `advance` settles it holds the
    // number of leading history entries that were consumed or skipped.
    let i = 0;
    async function* iterator() {
      for (; i < length; i++) {
        const batch = batches[i];
        const seq = batch.seq;
        if (seq <= start.seq) continue;
        if (seq > to) break;
        yield batch;
      }
    }
    start.blob = await advance(start.blob, iterator());
    start.ts = Date.now();
    start.seq = to;
    batches.splice(0, i);
  }

  /**
   * Lists retained batches with `min <= seq <= max`.
   *
   * @returns The matching batches in history order; an empty array when the
   *     block does not exist.
   */
  public async scan(id: string, min: number, max: number): Promise<types.StoreBatch[]> {
    await tick;
    const block = this.blocks.get(id);
    if (!block) return [];
    const history = block.history;
    const length = history.length;
    const list: types.StoreBatch[] = [];
    for (let i = 0; i < length; i++) {
      const batch = history[i];
      const seq = batch.seq;
      if (seq > max) break;
      if (seq >= min && seq <= max) list.push(batch);
    }
    return list;
  }

  /** Deletes a block; resolves to `true` when a block was actually removed. */
  public async remove(id: string): Promise<boolean> {
    await tick;
    return this.removeSync(id);
  }

  /** Synchronous delete shared by the removal methods. */
  private removeSync(id: string): boolean {
    return this.blocks.delete(id);
  }

  /** Reports the number of stored blocks and the total number of retained batches. */
  public stats(): {blocks: number; batches: number} {
    let batches = 0;
    for (const block of this.blocks.values()) batches += block.history.length;
    return {blocks: this.blocks.size, batches};
  }

  /** Removes every block whose `data.ts` is strictly less than `ts`. */
  public async removeOlderThan(ts: number): Promise<void> {
    await tick;
    for (const [id, block] of this.blocks) if (block.data.ts < ts) this.removeSync(id);
  }

  /**
   * Removes blocks whose `data.uts` is strictly less than `ts`, stopping once
   * `limit` blocks have been removed.
   *
   * @param ts Cut-off timestamp.
   * @param limit Maximum number of blocks to remove; a value of zero or less
   *     removes nothing.
   */
  public async removeAccessedBefore(ts: number, limit = 10): Promise<void> {
    await tick;
    let cnt = 0;
    for (const [id, block] of this.blocks) {
      if (block.data.uts >= ts) continue;
      // Checked before deleting so that a non-positive limit removes nothing.
      if (cnt >= limit) return;
      this.removeSync(id);
      cnt++;
    }
  }

  /**
   * Removes up to `x` blocks with the smallest `data.uts`.
   *
   * Candidates are kept in an `AvlMap` ordered newest-first, so the first
   * entry is always the candidate to evict when an older block is found.
   */
  public async removeOldest(x: number): Promise<void> {
    // Keys pair the timestamp with the (unique) block id so that blocks
    // sharing the same `uts` occupy separate entries instead of overwriting
    // one another.
    type Key = [number, string];
    const heap = new AvlMap<Key, string>(
      (a, b) => b[0] - a[0] || (a[1] < b[1] ? -1 : a[1] > b[1] ? 1 : 0),
    );
    // Cached newest candidate; reset to `undefined` whenever it is evicted.
    let first = heap.first();
    for (const [id, block] of this.blocks) {
      const time = block.data.uts;
      if (heap.size() < x) {
        heap.set([time, id], id);
        continue;
      }
      if (!first) first = heap.first();
      if (first && time < first.k[0]) {
        heap.del(first.k);
        first = undefined;
        heap.set([time, id], id);
      }
    }
    for (const {v} of heap.entries()) {
      try {
        await this.remove(v);
      } catch {
        // Eviction is best-effort: a failure to remove one block must not
        // prevent the remaining candidates from being removed.
      }
    }
  }
}