All files / src/fsa-to-node FsaNodeReadStream.ts

82.14% Statements 46/56
50% Branches 6/12
66.66% Functions 10/15
89.79% Lines 44/49

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 962x 2x 2x           2x 6x 6x 6x 6x 6x     6x 6x 6x 6x   6x 6x   6x 6x 6x 6x           6x         4x 4x 4x 4x 4x 4x 4x 4x 4x 4x         4x 4x 4x 4x   4x 4x   4x                                     4x   4x 4x 4x 4x 4x 4x                  
import { Readable } from 'stream';
import { Defer } from '../thingies/Defer';
import { concurrency } from '../thingies/concurrency';
import type { FsaNodeFsOpenFile } from './FsaNodeFsOpenFile';
import type { IReadStream } from '../node/types/misc';
import type { IReadStreamOptions } from '../node/types/options';
import type { FsaNodeFs } from './FsaNodeFs';
 
export class FsaNodeReadStream extends Readable implements IReadStream {
  protected __pending__: boolean = true;
  protected __closed__: boolean = false;
  protected __bytes__: number = 0;
  protected readonly __mutex__ = concurrency(1);
  protected readonly __file__ = new Defer<FsaNodeFsOpenFile>();
 
  public constructor(
    protected readonly fs: FsaNodeFs,
    protected readonly handle: Promise<FsaNodeFsOpenFile>,
    public readonly path: string,
    protected readonly options: IReadStreamOptions,
  ) {
    super();
    handle
      .then(file => {
        Iif (this.__closed__) return;
        this.__file__.resolve(file);
        if (this.options.fd !== undefined) this.emit('open', file.fd);
        this.emit('ready');
      })
      .catch(error => {
        this.__file__.reject(error);
      })
      .finally(() => {
        this.__pending__ = false;
      });
  }
 
  private async __read__(): Promise<Uint8Array | undefined> {
    return await this.__mutex__<Uint8Array | undefined>(async () => {
      Iif (this.__closed__) return;
      const { file } = await this.__file__.promise;
      const blob = await file.getFile();
      const buffer = await blob.arrayBuffer();
      const start = this.options.start || 0;
      let end = typeof this.options.end === 'number' ? this.options.end + 1 : buffer.byteLength;
      if (end > buffer.byteLength) end = buffer.byteLength;
      const uint8 = new Uint8Array(buffer, start, end - start);
      return uint8;
    });
  }
 
  private __close__(): void {
    Iif (this.__closed__) return;
    this.__closed__ = true;
    if (this.options.autoClose) {
      this.__file__.promise
        .then(file => {
          this.fs.close(file.fd, () => {
            this.emit('close');
          });
          return file.close();
        })
        .catch(error => {});
    }
  }
 
  // -------------------------------------------------------------- IReadStream
 
  public get bytesRead(): number {
    return this.__bytes__;
  }
 
  public get pending(): boolean {
    return this.__pending__;
  }
 
  // ----------------------------------------------------------------- Readable
 
  _read() {
    this.__read__().then(
      (uint8: Uint8Array) => {
        Iif (this.__closed__) return;
        Iif (!uint8) return this.push(null);
        this.__bytes__ += uint8.length;
        this.__close__();
        this.push(uint8);
        this.push(null);
      },
      error => {
        this.__close__();
        this.destroy(error);
      },
    );
  }
}