diff --git a/.changeset/curvy-showers-count.md b/.changeset/curvy-showers-count.md new file mode 100644 index 000000000..4a61eddb3 --- /dev/null +++ b/.changeset/curvy-showers-count.md @@ -0,0 +1,5 @@ +--- +"@workflow/world-local": patch +--- + +Create a copy of the data when resolving a stream to rpevent detachment diff --git a/packages/world-local/src/streamer.test.ts b/packages/world-local/src/streamer.test.ts index 0ad2e1d86..0745ba938 100644 --- a/packages/world-local/src/streamer.test.ts +++ b/packages/world-local/src/streamer.test.ts @@ -318,6 +318,62 @@ describe('streamer', () => { expect(chunks.join('')).toBe('123'); }); + + it('should read from stream starting at specified startIndex', async () => { + const { streamer } = await setupStreamer(); + const streamName = 'startindex-stream'; + + // Write 5 chunks with delays to ensure different ULID timestamps + for (let i = 0; i < 5; i++) { + await streamer.writeToStream(streamName, TEST_RUN_ID, `${i}`); + await new Promise((resolve) => setTimeout(resolve, 2)); + } + await streamer.closeStream(streamName, TEST_RUN_ID); + + // Read from startIndex=2 (should get chunks 2, 3, 4) + const stream = await streamer.readFromStream(streamName, 2); + const reader = stream.getReader(); + + const chunks: string[] = []; + let done = false; + + while (!done) { + const result = await reader.read(); + done = result.done; + if (result.value) { + chunks.push(Buffer.from(result.value).toString()); + } + } + + expect(chunks.join('')).toBe('234'); + }); + + it('should read from stream starting from first chunk with startIndex=0', async () => { + const { streamer } = await setupStreamer(); + const streamName = 'startindex-zero-stream'; + + await streamer.writeToStream(streamName, TEST_RUN_ID, 'first'); + await new Promise((resolve) => setTimeout(resolve, 2)); + await streamer.writeToStream(streamName, TEST_RUN_ID, 'second'); + await streamer.closeStream(streamName, TEST_RUN_ID); + + // Read from startIndex=0 (should get all chunks) + const stream = await streamer.readFromStream(streamName, 0); + const reader = stream.getReader(); + + const chunks: string[] = []; + let done = false; + + while (!done) { + const result = await reader.read(); + done = result.done; + if (result.value) { + chunks.push(Buffer.from(result.value).toString()); + } + } + + expect(chunks.join('')).toBe('firstsecond'); + }); }); describe('integration scenarios', () => { diff --git a/packages/world-local/src/streamer.ts b/packages/world-local/src/streamer.ts index 9667b6afe..e58df6a08 100644 --- a/packages/world-local/src/streamer.ts +++ b/packages/world-local/src/streamer.ts @@ -182,7 +182,7 @@ export function createStreamer(basedir: string): Streamer { break; } if (chunk.chunk.byteLength) { - controller.enqueue(chunk.chunk); + controller.enqueue(new Uint8Array(chunk.chunk)); } }