Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 | 1x 1x 1x 1x 8x 8x 8x 8x 8x 8x 8x 8x 8x 8x 8x 8x 8x 8x 8x 8x 6x 6x 6x 6x 6x 6x 6x 6x 6x 4x 4x 6x 6x 6x 6x 6x 6x 6x 6x 6x 6x 6x 6x 6x 6x | import { Readable } from '../vendor/node/stream';
import { Defer } from 'thingies/lib/Defer';
import { concurrency } from 'thingies/lib/concurrency';
import type { FsaNodeFsOpenFile } from './FsaNodeFsOpenFile';
import type { IReadStream } from '../node/types/misc';
import type { IReadStreamOptions } from '../node/types/options';
import type { FsaNodeFs } from './FsaNodeFs';
/**
 * Readable stream over a file that is opened asynchronously through
 * `FsaNodeFs`.
 *
 * The file is delivered via the `handle` promise. On the first `_read()` the
 * whole file is loaded into memory, the `[start, end]` window from `options`
 * is sliced out, pushed as a single chunk, and the stream is ended.
 */
export class FsaNodeReadStream extends Readable implements IReadStream {
  /** `true` until the `handle` promise given to the constructor settles. */
  protected __pending__: boolean = true;
  /** Set once `__close__()` has run; later reads and closes become no-ops. */
  protected __closed__: boolean = false;
  /** Number of bytes pushed to the consumer so far. */
  protected __bytes__: number = 0;
  /** Wraps `__read__` bodies; presumably serializes them one at a time — confirm against `thingies`. */
  protected readonly __mutex__ = concurrency(1);
  /** Settles with the opened file (or the open error) once `handle` settles. */
  protected readonly __file__ = new Defer<FsaNodeFsOpenFile>();

  /**
   * @param fs File system used to close the descriptor when `autoClose` is set.
   * @param handle Promise of the opened file backing this stream.
   * @param path Path of the file being read.
   * @param options Stream options; this class reads `fd`, `start`, `end`
   *     and `autoClose`.
   */
  public constructor(
    protected readonly fs: FsaNodeFs,
    protected readonly handle: Promise<FsaNodeFsOpenFile>,
    public readonly path: string,
    protected readonly options: IReadStreamOptions,
  ) {
    super();
    handle
      .then(file => {
        if (this.__closed__) return;
        this.__file__.resolve(file);
        // NOTE(review): 'open' is emitted only when an `fd` option WAS supplied;
        // confirm this is the intended condition.
        if (this.options.fd !== undefined) this.emit('open', file.fd);
        this.emit('ready');
      })
      .catch(error => {
        // Surface the open failure to whoever awaits the file in `__read__`.
        this.__file__.reject(error);
      })
      .finally(() => {
        this.__pending__ = false;
      });
  }

  /**
   * Reads the requested byte window of the file.
   *
   * @returns The bytes from `options.start` (default 0) through `options.end`
   *     inclusive (default: end of file), clamped to the file size; an empty
   *     array when `start` is at or past the end of the file; `undefined`
   *     when the stream has already been closed.
   */
  private async __read__(): Promise<Uint8Array | undefined> {
    return await this.__mutex__<Uint8Array | undefined>(async () => {
      if (this.__closed__) return;
      const { file } = await this.__file__.promise;
      const blob = await file.getFile();
      const buffer = await blob.arrayBuffer();
      const start = this.options.start || 0;
      // `options.end` is inclusive, hence the `+ 1` to get an exclusive bound.
      let end = typeof this.options.end === 'number' ? this.options.end + 1 : buffer.byteLength;
      if (end > buffer.byteLength) end = buffer.byteLength;
      if (start >= buffer.byteLength) return new Uint8Array(0);
      const uint8 = new Uint8Array(buffer, start, end - start);
      return uint8;
    });
  }

  /**
   * Marks the stream as closed (at most once). When `options.autoClose` is
   * set, also closes the underlying file and emits `'close'` from the
   * `fs.close` callback.
   */
  private __close__(): void {
    if (this.__closed__) return;
    this.__closed__ = true;
    if (this.options.autoClose) {
      this.__file__.promise
        .then(file => {
          // NOTE(review): the file is closed both through `fs.close(fd)` and
          // `file.close()`; confirm that closing twice is intended.
          this.fs.close(file.fd, () => {
            this.emit('close');
          });
          return file.close();
        })
        // Best effort: if the file never opened (or closing fails) there is
        // nothing further to release here.
        .catch(() => {});
    }
  }

  // -------------------------------------------------------------- IReadStream

  /** Number of bytes pushed to the consumer so far. */
  public get bytesRead(): number {
    return this.__bytes__;
  }

  /** `true` while the underlying file is still being opened. */
  public get pending(): boolean {
    return this.__pending__;
  }

  // ----------------------------------------------------------------- Readable

  /**
   * `Readable` hook: pushes the whole requested window as one chunk and then
   * ends the stream. On failure the stream is closed and destroyed with the
   * error.
   */
  _read() {
    this.__read__().then(
      (uint8: Uint8Array | undefined) => {
        if (this.__closed__) return;
        if (!uint8) {
          // Nothing was read: just signal end-of-stream.
          this.push(null);
          return;
        }
        this.__bytes__ += uint8.length;
        this.__close__();
        this.push(uint8);
        this.push(null);
      },
      error => {
        this.__close__();
        this.destroy(error);
      },
    );
  }
}
|