All files / src/fsa-to-node FsaNodeReadStream.ts

82.75% Statements 48/58
53.84% Branches 7/13
66.66% Functions 10/15
90% Lines 45/50

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 971x 1x 1x           1x 8x 8x 8x 8x 8x     8x 8x 8x 8x   8x 8x   8x 8x 8x 8x           8x         6x 6x 6x 6x 6x 6x 6x 6x 6x 4x 4x         6x 6x 6x 6x   6x 6x   6x                                     6x   6x 6x 6x 6x 6x 6x                  
import { Readable } from '../vendor/node/stream';
import { Defer } from 'thingies/lib/Defer';
import { concurrency } from 'thingies/lib/concurrency';
import type { FsaNodeFsOpenFile } from './FsaNodeFsOpenFile';
import type { IReadStream } from '../node/types/misc';
import type { IReadStreamOptions } from '../node/types/options';
import type { FsaNodeFs } from './FsaNodeFs';
 
/**
 * Readable stream over a file exposed through the FSA-to-Node adapter.
 *
 * The stream waits for the `handle` promise to settle, then on the first
 * `_read()` loads the file contents via `getFile().arrayBuffer()`, pushes the
 * requested `[start, end]` slice as a single chunk and ends the stream.
 */
export class FsaNodeReadStream extends Readable implements IReadStream {
  /** `true` until the `handle` promise passed to the constructor has settled. */
  protected __pending__: boolean = true;
  /** Set once `__close__()` has run; guards against double close and late reads. */
  protected __closed__: boolean = false;
  /** Number of bytes pushed to the consumer so far. */
  protected __bytes__: number = 0;
  /** Wraps reads; presumably limits them to one at a time (`concurrency(1)`). */
  protected readonly __mutex__ = concurrency(1);
  /** Deferred mirror of `handle`, resolved/rejected from the constructor. */
  protected readonly __file__ = new Defer<FsaNodeFsOpenFile>();

  /**
   * @param fs Adapter instance used to close the descriptor on auto-close.
   * @param handle Promise of the opened file backing this stream.
   * @param path Path the stream was created for.
   * @param options Stream options; `start`, `end`, `fd` and `autoClose` are read here.
   */
  public constructor(
    protected readonly fs: FsaNodeFs,
    protected readonly handle: Promise<FsaNodeFsOpenFile>,
    public readonly path: string,
    protected readonly options: IReadStreamOptions,
  ) {
    super();
    handle
      .then(file => {
        // The stream may have been closed before the file finished opening.
        if (this.__closed__) return;
        this.__file__.resolve(file);
        // NOTE(review): Node's fs.ReadStream documents that 'open' is NOT emitted
        // when `fd` is supplied; this condition looks inverted — confirm intent.
        if (this.options.fd !== undefined) this.emit('open', file.fd);
        this.emit('ready');
      })
      .catch(error => {
        // Surface the open failure to whoever awaits the file in `__read__`.
        this.__file__.reject(error);
      })
      .finally(() => {
        this.__pending__ = false;
      });
  }

  /**
   * Reads the configured byte range of the file.
   *
   * @returns The requested slice, an empty array when `start` is at or past the
   *   end of the file, or `undefined` when the stream is already closed.
   */
  private async __read__(): Promise<Uint8Array | undefined> {
    return await this.__mutex__<Uint8Array | undefined>(async () => {
      if (this.__closed__) return;
      const { file } = await this.__file__.promise;
      const blob = await file.getFile();
      const buffer = await blob.arrayBuffer();
      const start = this.options.start || 0;
      // `options.end` is inclusive, hence the `+ 1`; default is end of file.
      let end = typeof this.options.end === 'number' ? this.options.end + 1 : buffer.byteLength;
      if (end > buffer.byteLength) end = buffer.byteLength;
      if (start >= buffer.byteLength) return new Uint8Array(0);
      // View over the already-loaded buffer; no additional copy is made.
      const uint8 = new Uint8Array(buffer, start, end - start);
      return uint8;
    });
  }

  /**
   * Marks the stream as closed and, when `options.autoClose` is set, closes the
   * underlying file and emits `'close'` once `fs.close` calls back.
   */
  private __close__(): void {
    if (this.__closed__) return;
    this.__closed__ = true;
    if (this.options.autoClose) {
      this.__file__.promise
        .then(file => {
          this.fs.close(file.fd, () => {
            this.emit('close');
          });
          return file.close();
        })
        // Best-effort close: failures here are deliberately not surfaced.
        .catch(() => {});
    }
  }

  // -------------------------------------------------------------- IReadStream

  /** Number of bytes that have been pushed to the consumer so far. */
  public get bytesRead(): number {
    return this.__bytes__;
  }

  /** `true` while the underlying file is still being opened. */
  public get pending(): boolean {
    return this.__pending__;
  }

  // ----------------------------------------------------------------- Readable

  /**
   * `Readable` hook: performs the single read, pushes the chunk followed by
   * `null` (end of stream), and destroys the stream on failure.
   */
  _read() {
    this.__read__().then(
      (uint8: Uint8Array | undefined) => {
        if (this.__closed__) return;
        if (!uint8) return this.push(null);
        this.__bytes__ += uint8.length;
        // Close before pushing: all data is already in memory at this point.
        this.__close__();
        this.push(uint8);
        this.push(null);
      },
      error => {
        this.__close__();
        this.destroy(error);
      },
    );
  }
}