All files / src/fsa-to-node FsaNodeWriteStream.ts

77.21% Statements 61/79
68.96% Branches 20/29
63.63% Functions 14/22
83.07% Lines 54/65

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 1532x 2x 2x 2x     2x                                                   2x 14x 14x 14x   14x       14x 14x   14x 14x 6x 1x   5x 1x     12x 12x 12x 12x 12x 12x 12x 12x 12x 12x 5x 5x   12x 12x             17x 17x   17x 17x 19x 19x           12x 12x 12x       12x 12x 12x 12x 12x                     6x                             15x   15x               4x 2x   2x               12x   12x              
import { Writable } from 'stream';
import { Defer } from '../thingies/Defer';
import { concurrency } from '../thingies/concurrency';
import { flagsToNumber } from '../node/util';
import { FLAG } from '../consts/FLAG';
import { FsaNodeFsOpenFile } from './FsaNodeFsOpenFile';
import queueMicrotask from '../queueMicrotask';
import type { IFileSystemWritableFileStream } from '../fsa/types';
import type { IWriteStream } from '../node/types/misc';
import type { IWriteStreamOptions } from '../node/types/options';
 
/**
 * This WriteStream implementation does not build on top of the `fs` module,
 * but instead uses the lower-level `FileSystemFileHandle` interface. The reason
 * is the different semantics in `fs` and FSA (File System Access API) write streams.
 *
 * When data is written to an FSA file, a new FSA stream is created, it copies
 * the file to a temporary swap file. After each written chunk, that swap file
 * is closed and the original file is replaced with the swap file. This means,
 * if WriteStream was built on top of `fs`, each chunk write would result in
 * a file copy, write, close, rename operations, which is not what we want.
 *
 * Instead this implementation hooks into the lower-level and closes the swap
 * file only once the stream is closed. The downside is that the written data
 * is not immediately visible to other processes (because it is written to the
 * swap file), but that is the trade-off we have to make.
 *
 * @todo Could make this flush the data to the original file periodically, so that
 *       the data is visible to other processes.
 * @todo This stream could work through `FileSystemSyncAccessHandle.write` in a
 *       Worker thread instead.
 */
export class FsaNodeWriteStream extends Writable implements IWriteStream {
  protected __pending__: boolean = true;
  protected __closed__: boolean = false;
  protected __bytes__: number = 0;
  protected readonly __stream__: Promise<IFileSystemWritableFileStream>;
  protected readonly __mutex__ = concurrency(1);
 
  public constructor(
    handle: Promise<FsaNodeFsOpenFile>,
    public readonly path: string,
    protected readonly options: IWriteStreamOptions,
  ) {
    super();
    if (options.start !== undefined) {
      if (typeof options.start !== 'number') {
        throw new TypeError('"start" option must be a Number');
      }
      if (options.start < 0) {
        throw new TypeError('"start" must be >= zero');
      }
    }
    const stream = new Defer<IFileSystemWritableFileStream>();
    this.__stream__ = stream.promise;
    (async () => {
      const fsaHandle = await handle;
      const fileWasOpened = !options.fd;
      if (fileWasOpened) this.emit('open', fsaHandle.fd);
      const flags = flagsToNumber(options.flags ?? 'w');
      const keepExistingData = flags & FLAG.O_APPEND ? true : false;
      const writable = await fsaHandle.file.createWritable({ keepExistingData });
      if (keepExistingData) {
        const start = Number(options.start ?? 0);
        if (start) await writable.seek(start);
      }
      this.__pending__ = false;
      stream.resolve(writable);
    })().catch(error => {
      stream.reject(error);
    });
  }
 
  private async ___write___(buffers: Buffer[]): Promise<void> {
    await this.__mutex__(async () => {
      Iif (this.__closed__) return;
      // if (this.__closed__) throw new Error('WriteStream is closed');
      const writable = await this.__stream__;
      for (const buffer of buffers) {
        await writable.write(buffer);
        this.__bytes__ += buffer.byteLength;
      }
    });
  }
 
  private async __close__(): Promise<void> {
    const emitClose = this.options.emitClose;
    await this.__mutex__(async () => {
      Iif (this.__closed__ && emitClose) {
        queueMicrotask(() => this.emit('close'));
        return;
      }
      try {
        const writable = await this.__stream__;
        this.__closed__ = true;
        await writable.close();
        if (emitClose) this.emit('close');
      } catch (error) {
        this.emit('error', error);
        Iif (emitClose) this.emit('close', error);
      }
    });
  }
 
  // ------------------------------------------------------------- IWriteStream
 
  public get bytesWritten(): number {
    return this.__bytes__;
  }
 
  public get pending(): boolean {
    return this.__pending__;
  }
 
  public close(cb): void {
    Iif (cb) this.once('close', cb);
    this.__close__().catch(() => {});
  }
 
  // ----------------------------------------------------------------- Writable
 
  _write(chunk: any, encoding: string, callback: (error?: Error | null) => void): void {
    this.___write___([chunk])
      .then(() => {
        if (callback) callback(null);
      })
      .catch(error => {
        Iif (callback) callback(error);
      });
  }
 
  _writev(chunks: Array<{ chunk: any; encoding: string }>, callback: (error?: Error | null) => void): void {
    const buffers = chunks.map(({ chunk }) => chunk);
    this.___write___(buffers)
      .then(() => {
        if (callback) callback(null);
      })
      .catch(error => {
        Iif (callback) callback(error);
      });
  }
 
  _final(callback: (error?: Error | null) => void): void {
    this.__close__()
      .then(() => {
        if (callback) callback(null);
      })
      .catch(error => {
        Iif (callback) callback(error);
      });
  }
}