All files / src/fsa-to-node/worker SyncMessenger.ts

5.12% Statements 2/39
0% Branches 0/3
0% Functions 0/7
5.55% Lines 2/36

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74            1x                                         1x                                                                                            
/**
 * Asynchronous request handler: receives the request payload bytes and
 * resolves with the response payload bytes.
 */
export type AsyncCallback = (request: Uint8Array) => Promise<Uint8Array>;
 
/**
 * Synchronously busy-waits on the calling thread until `condition` returns
 * `true`. The condition is polled in a tight loop without yielding, so the
 * calling thread is blocked for the whole duration of the wait.
 *
 * @param condition Condition to wait for, when true, the function returns.
 * @param ms Maximum time to wait in milliseconds. Defaults to 100.
 * @throws {Error} With message `'Timeout'` when the condition is still false
 *   after more than `ms` milliseconds have elapsed.
 */
const sleepUntil = (condition: () => boolean, ms: number = 100): void => {
  const start = Date.now();
  while (!condition()) {
    // Give up once the deadline has passed instead of spinning forever.
    if (Date.now() - start > ms) throw new Error('Timeout');
  }
};
 
/**
 * `SyncMessenger` allows executing asynchronous code synchronously. The
 * asynchronous code is executed in a Worker thread, while the main thread is
 * blocked until the asynchronous code is finished.
 *
 * The first four 4-byte words of the shared buffer are the header:
 *
 * - Word 0 is used for `Atomics.wait` / `Atomics.notify` wake-ups.
 * - Word 1 is the completion flag the main thread spins on; the serving side
 *   sets it to `1` once the response has been written.
 * - Word 2 holds the payload length in bytes (request length on the way in,
 *   response length on the way out).
 * - Word 3 is currently unused.
 *
 * The payload follows the header. The maximum payload size is the size of the
 * SharedArrayBuffer minus the header size.
 *
 * NOTE(review): word 0 is never modified by this class, so a `notify` issued
 * while the serving thread is not parked in `Atomics.wait` appears to be lost
 * and the corresponding `callSync` would then time out — confirm that callers
 * guarantee the server is waiting before each call.
 */
export class SyncMessenger {
  protected readonly int32: Int32Array;
  protected readonly uint8: Uint8Array;
  /** Header size in bytes (four 4-byte words). */
  protected readonly headerSize: number;
  /** Number of payload bytes available after the header. */
  protected readonly dataSize: number;

  /**
   * @param sab Shared buffer used for both the header and the payload; the
   *   same buffer must be given to the calling and the serving side.
   */
  public constructor(protected readonly sab: SharedArrayBuffer) {
    this.int32 = new Int32Array(sab);
    this.uint8 = new Uint8Array(sab);
    this.headerSize = 4 * 4;
    this.dataSize = sab.byteLength - this.headerSize;
  }

  /**
   * Sends `data` as a request and blocks the calling thread (by spinning)
   * until the serving side has published a response.
   *
   * @param data Request payload; must fit into the payload area.
   * @returns A copy of the response payload.
   * @throws {RangeError} When `data` is larger than the payload area.
   * @throws {Error} `'Timeout'` (from `sleepUntil`) when no response is
   *   published within the default wait limit.
   */
  public callSync(data: Uint8Array): Uint8Array {
    const requestLength = data.length;
    // Reject oversized requests before touching the shared header, so a failed
    // call does not leave a bogus length behind.
    if (requestLength > this.dataSize) {
      throw new RangeError(
        `Request of ${requestLength} bytes exceeds the maximum payload size of ${this.dataSize} bytes`,
      );
    }
    const headerSize = this.headerSize;
    const int32 = this.int32;
    // Clear the completion flag, then publish the request length and payload.
    Atomics.store(int32, 1, 0);
    int32[2] = requestLength;
    this.uint8.set(data, headerSize);
    // Wake the serving thread parked on word 0.
    Atomics.notify(int32, 0);
    // Atomic load so the flag is re-read from shared memory on every spin and
    // the response written before the flag is visible once the flag is seen.
    sleepUntil(() => Atomics.load(int32, 1) === 1);
    const responseLength = int32[2];
    // `slice` copies, so the result stays valid after the buffer is reused.
    return this.uint8.slice(headerSize, headerSize + responseLength);
  }

  /**
   * Serves requests forever: blocks the calling thread in `Atomics.wait` until
   * notified, passes the request payload to `callback`, writes the response
   * back into the shared buffer and raises the completion flag, then starts
   * waiting for the next request.
   *
   * @param callback Handler producing the response bytes for a request.
   */
  public serveAsync(callback: AsyncCallback): void {
    const headerSize = this.headerSize;
    (async () => {
      try {
        const int32 = this.int32;
        // Parks this thread while word 0 is 0 until `callSync` notifies.
        const res = Atomics.wait(int32, 0, 0);
        if (res !== 'ok') throw new Error(`Unexpected Atomics.wait result: ${res}`);
        const requestLength = int32[2];
        const request = this.uint8.slice(headerSize, headerSize + requestLength);
        const response = await callback(request);
        const responseLength = response.length;
        int32[2] = responseLength;
        this.uint8.set(response, headerSize);
        // Publish completion last, atomically, so the spinning caller only
        // observes the flag after length and payload have been written.
        Atomics.store(int32, 1, 1);
      } catch {
        // Deliberately swallowed: a failed request must not stop the serving
        // loop. The completion flag is not set, so the caller sees a timeout.
      }
      this.serveAsync(callback);
    })().catch(() => {});
  }
}