Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/curvy-showers-count.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@workflow/world-local": patch
---

Create a copy of the data when resolving a stream to rpevent detachment
56 changes: 56 additions & 0 deletions packages/world-local/src/streamer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,62 @@ describe('streamer', () => {

expect(chunks.join('')).toBe('123');
});

it('should read from stream starting at specified startIndex', async () => {
const { streamer } = await setupStreamer();
const streamName = 'startindex-stream';

// Write 5 chunks with delays to ensure different ULID timestamps
for (let i = 0; i < 5; i++) {
await streamer.writeToStream(streamName, TEST_RUN_ID, `${i}`);
await new Promise((resolve) => setTimeout(resolve, 2));
}
await streamer.closeStream(streamName, TEST_RUN_ID);

// Read from startIndex=2 (should get chunks 2, 3, 4)
const stream = await streamer.readFromStream(streamName, 2);
const reader = stream.getReader();

const chunks: string[] = [];
let done = false;

while (!done) {
const result = await reader.read();
done = result.done;
if (result.value) {
chunks.push(Buffer.from(result.value).toString());
}
}

expect(chunks.join('')).toBe('234');
});

it('should read from stream starting from first chunk with startIndex=0', async () => {
const { streamer } = await setupStreamer();
const streamName = 'startindex-zero-stream';

await streamer.writeToStream(streamName, TEST_RUN_ID, 'first');
await new Promise((resolve) => setTimeout(resolve, 2));
await streamer.writeToStream(streamName, TEST_RUN_ID, 'second');
await streamer.closeStream(streamName, TEST_RUN_ID);

// Read from startIndex=0 (should get all chunks)
const stream = await streamer.readFromStream(streamName, 0);
const reader = stream.getReader();

const chunks: string[] = [];
let done = false;

while (!done) {
const result = await reader.read();
done = result.done;
if (result.value) {
chunks.push(Buffer.from(result.value).toString());
}
}

expect(chunks.join('')).toBe('firstsecond');
});
});

describe('integration scenarios', () => {
Expand Down
2 changes: 1 addition & 1 deletion packages/world-local/src/streamer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ export function createStreamer(basedir: string): Streamer {
break;
}
if (chunk.chunk.byteLength) {
controller.enqueue(chunk.chunk);
controller.enqueue(new Uint8Array(chunk.chunk));
}
}

Expand Down
Loading