diff --git a/src/common/persistence/FSExtentStore.ts b/src/common/persistence/FSExtentStore.ts index 199a8012d..6f3ba45ab 100644 --- a/src/common/persistence/FSExtentStore.ts +++ b/src/common/persistence/FSExtentStore.ts @@ -1,6 +1,5 @@ import { close, - createReadStream, createWriteStream, fdatasync, mkdir, @@ -30,6 +29,7 @@ import IExtentStore, { } from "./IExtentStore"; import IOperationQueue from "./IOperationQueue"; import OperationQueue from "./OperationQueue"; +import FileLazyReadStream from "./FileLazyReadStream"; const statAsync = promisify(stat); const mkdirAsync = promisify(mkdir); @@ -76,6 +76,10 @@ export default class FSExtentStore implements IExtentStore { private persistencyPath: Map; + private circularStreamsBuffer: FileLazyReadStream[]; + + private maxStreams = 100000; + public constructor( metadata: IExtentMetadataStore, private readonly persistencyConfiguration: StoreDestinationArray, @@ -104,6 +108,8 @@ export default class FSExtentStore implements IExtentStore { logger ); this.readQueue = new OperationQueue(DEFAULT_READ_CONCURRENCY, logger); + + this.circularStreamsBuffer = []; } public isInitialized(): boolean { @@ -340,19 +346,23 @@ export default class FSExtentStore implements IExtentStore { } end:${extentChunk.offset + extentChunk.count - 1}`, contextId ); - const stream = createReadStream(path, { - start: extentChunk.offset, - end: extentChunk.offset + extentChunk.count - 1 - }).on("close", () => { - this.logger.verbose( - `FSExtentStore:readExtent() Read stream closed. LocationId:${persistencyId} extentId:${ - extentChunk.id - } path:${path} offset:${extentChunk.offset} count:${ - extentChunk.count - } end:${extentChunk.offset + extentChunk.count - 1}`, - contextId - ); - }); + + if(this.circularStreamsBuffer.length >= this.maxStreams){ + let streamToDestroy = this.circularStreamsBuffer.shift(); + streamToDestroy?.destroy(); + } + + const stream = new FileLazyReadStream( + path, + extentChunk.offset, + extentChunk.offset + extentChunk.count - 1, + this.logger, + persistencyId, + extentChunk.id, + contextId); + + this.circularStreamsBuffer.push(stream); + resolve(stream); }); diff --git a/src/common/persistence/FileLazyReadStream.ts b/src/common/persistence/FileLazyReadStream.ts new file mode 100644 index 000000000..687d6eb84 --- /dev/null +++ b/src/common/persistence/FileLazyReadStream.ts @@ -0,0 +1,72 @@ +import { ReadStream, createReadStream } from "fs"; +import { Readable } from "stream"; +import ILogger from "../ILogger"; + + +export default class FileLazyReadStream extends Readable { + private extentStream: ReadStream | undefined; + constructor( + private readonly extentPath: string, + private readonly start: number, + private readonly end: number, + private readonly logger: ILogger, + private readonly persistencyId: string, + private readonly extentId: string, + private readonly contextId?: string) { + super(); + } + + public _read(): void { + if (this.extentStream === undefined) { + this.extentStream = createReadStream(this.extentPath, { + start: this.start, + end: this.end + }).on("close", () => { + this.logger.verbose( + `FSExtentStore:readExtent() Read stream closed. LocationId:${this.persistencyId} extentId:${this.extentId + } path:${this.extentPath} offset:${this.start} end:${this.end}`, + this.contextId + ); + }); + this.setSourceEventHandlers(); + } + this.extentStream?.resume(); + } + + private setSourceEventHandlers() { + this.extentStream?.on("data", this.sourceDataHandler); + this.extentStream?.on("end", this.sourceErrorOrEndHandler); + this.extentStream?.on("error", this.sourceErrorOrEndHandler); + } + + private removeSourceEventHandlers() { + this.extentStream?.removeListener("data", this.sourceDataHandler); + this.extentStream?.removeListener("end", this.sourceErrorOrEndHandler); + this.extentStream?.removeListener("error", this.sourceErrorOrEndHandler); + } + + private sourceDataHandler = (data: Buffer) => { + if (!this.push(data)) { + this.extentStream?.pause(); + } + } + + private sourceErrorOrEndHandler = (err?: Error) => { + if (err && err.name === "AbortError") { + this.destroy(err); + return; + } + + this.removeSourceEventHandlers(); + this.push(null); + this.destroy(err); + } + + _destroy(error: Error | null, callback: (error?: Error) => void): void { + // remove listener from source and release source + //this.removeSourceEventHandlers(); + (this.extentStream as Readable)?.destroy(); + + callback(error === null ? undefined : error); + } +} \ No newline at end of file