Press n or j to go to the next uncovered block, b, p or k for the previous block.
import { Readable } from 'stream';
import { Defer } from '../thingies/Defer';
import { concurrency } from '../thingies/concurrency';
import type { FsaNodeFsOpenFile } from './FsaNodeFsOpenFile';
import type { IReadStream } from '../node/types/misc';
import type { IReadStreamOptions } from '../node/types/options';
import type { FsaNodeFs } from './FsaNodeFs';

/**
 * Readable stream over a file that is opened asynchronously.
 *
 * The stream is handed a promise of an open file. On the first `_read()` the
 * whole file is loaded into memory, the byte range selected by
 * `options.start` / `options.end` is pushed as a single chunk, and the stream
 * is ended.
 *
 * Events emitted by this class itself (in addition to those of `Readable`):
 * - `'open'` with the file descriptor, when `options.fd !== undefined`;
 * - `'ready'` once the file handle has resolved;
 * - `'close'` from the `fs.close()` callback, when `options.autoClose` is set.
 */
export class FsaNodeReadStream extends Readable implements IReadStream {
  /** `true` until the `handle` promise has settled (resolved or rejected). */
  protected __pending__: boolean = true;
  /** Set once by `__close__()`; guards against double close and late reads. */
  protected __closed__: boolean = false;
  /** Number of bytes pushed to the consumer so far. */
  protected __bytes__: number = 0;
  // NOTE(review): presumably `concurrency(1)` lets only one `__read__` body
  // run at a time — confirm against ../thingies/concurrency.
  protected readonly __mutex__ = concurrency(1);
  /** Settled from the constructor with the opened file (or the open error). */
  protected readonly __file__ = new Defer<FsaNodeFsOpenFile>();

  /**
   * @param fs Owning file system; used to close the descriptor on auto-close.
   * @param handle Promise of the opened file to read from.
   * @param path Path of the file, exposed publicly as `path`.
   * @param options Stream options; this class reads `fd`, `start`, `end`
   *   and `autoClose`.
   */
  public constructor(
    protected readonly fs: FsaNodeFs,
    protected readonly handle: Promise<FsaNodeFsOpenFile>,
    public readonly path: string,
    protected readonly options: IReadStreamOptions,
  ) {
    super();
    handle
      .then(file => {
        if (this.__closed__) return;
        this.__file__.resolve(file);
        if (this.options.fd !== undefined) this.emit('open', file.fd);
        this.emit('ready');
      })
      .catch(error => {
        // The failure is surfaced later: `__read__()` awaits `__file__` and
        // `_read()` destroys the stream with the rejection reason.
        this.__file__.reject(error);
      })
      .finally(() => {
        this.__pending__ = false;
      });
  }

  /**
   * Loads the file and returns the requested byte range.
   *
   * `options.end` is inclusive, hence the `+ 1`. Both bounds are clamped to
   * the file size, so a `start` at or beyond the end of the file, or an
   * `end` smaller than `start`, yields an empty chunk instead of the
   * `RangeError` that the `Uint8Array` constructor would otherwise throw.
   *
   * @returns The selected bytes, or `undefined` if the stream was already
   *   closed when the read started.
   */
  private async __read__(): Promise<Uint8Array | undefined> {
    return await this.__mutex__<Uint8Array | undefined>(async () => {
      if (this.__closed__) return;
      const { file } = await this.__file__.promise;
      const blob = await file.getFile();
      const buffer = await blob.arrayBuffer();
      const size = buffer.byteLength;
      const start = Math.min(this.options.start || 0, size);
      let end = typeof this.options.end === 'number' ? this.options.end + 1 : size;
      if (end > size) end = size;
      const length = Math.max(0, end - start);
      return new Uint8Array(buffer, start, length);
    });
  }

  /**
   * Marks the stream as closed (idempotent). When `options.autoClose` is set,
   * closes the descriptor through `fs.close()` — emitting `'close'` from its
   * callback — and closes the open-file object itself.
   */
  private __close__(): void {
    if (this.__closed__) return;
    this.__closed__ = true;
    if (this.options.autoClose) {
      this.__file__.promise
        .then(file => {
          this.fs.close(file.fd, () => {
            this.emit('close');
          });
          return file.close();
        })
        // Best effort: a failed open or close is deliberately ignored here.
        .catch(() => {});
    }
  }

  // -------------------------------------------------------------- IReadStream

  /** Number of bytes that have been pushed to the consumer so far. */
  public get bytesRead(): number {
    return this.__bytes__;
  }

  /** `true` while the file handle promise has not settled yet. */
  public get pending(): boolean {
    return this.__pending__;
  }

  // ----------------------------------------------------------------- Readable

  /**
   * `Readable` implementation hook: reads the selected range in one go,
   * pushes it as a single chunk and then signals end-of-stream. On failure
   * the stream is closed and destroyed with the error.
   */
  _read() {
    this.__read__().then(
      (uint8: Uint8Array | undefined) => {
        if (this.__closed__) return;
        if (!uint8) return this.push(null);
        this.__bytes__ += uint8.length;
        this.__close__();
        this.push(uint8);
        this.push(null);
      },
      error => {
        this.__close__();
        this.destroy(error);
      },
    );
  }
}