diff --git a/.changeset/kind-suns-wonder.md b/.changeset/kind-suns-wonder.md new file mode 100644 index 000000000..04d953bcd --- /dev/null +++ b/.changeset/kind-suns-wonder.md @@ -0,0 +1,9 @@ +--- +"@workflow/world-postgres": patch +"@workflow/sveltekit": patch +"@workflow/builders": patch +--- + +- export stepEntrypoint and workflowEntrypoint from build +- add abstract queue driver to world postgres +- add execution strategy to world postgres diff --git a/packages/builders/src/base-builder.ts b/packages/builders/src/base-builder.ts index 9715d5e20..5024e91cb 100644 --- a/packages/builders/src/base-builder.ts +++ b/packages/builders/src/base-builder.ts @@ -320,11 +320,13 @@ export abstract class BaseBuilder { const entryContent = ` // Built in steps + import { stepEntrypoint } from 'workflow/runtime'; import '${builtInSteps}'; // User steps ${imports} // API entrypoint - export { stepEntrypoint as POST } from 'workflow/runtime';`; + export const __wkf_entrypoint = stepEntrypoint; + export const POST = stepEntrypoint;`; // Bundle with esbuild and our custom SWC plugin const esbuildCtx = await esbuild.context({ @@ -549,8 +551,8 @@ export abstract class BaseBuilder { import { workflowEntrypoint } from 'workflow/runtime'; const workflowCode = \`${workflowBundleCode.replace(/[\\`$]/g, '\\$&')}\`; - -export const POST = workflowEntrypoint(workflowCode);`; +export const __wkf_entrypoint = workflowEntrypoint(workflowCode); +export const POST = __wkf_entrypoint`; // we skip the final bundling step for Next.js so it can bundle itself if (!bundleFinalOutput) { diff --git a/packages/sveltekit/src/builder.ts b/packages/sveltekit/src/builder.ts index e26b568c5..6f02dec07 100644 --- a/packages/sveltekit/src/builder.ts +++ b/packages/sveltekit/src/builder.ts @@ -92,7 +92,7 @@ export class SvelteKitBuilder extends BaseBuilder { // Replace the default export with SvelteKit-compatible handler stepsRouteContent = stepsRouteContent.replace( - /export\s*\{\s*stepEntrypoint\s+as\s+POST\s*\}\s*;?$/m, + /export\s*const\s*POST\s*=\s*stepEntrypoint\s*;$/m, `${SVELTEKIT_REQUEST_CONVERTER} export const POST = async ({request}) => { const normalRequest = await convertSvelteKitRequest(request); @@ -133,11 +133,11 @@ export const POST = async ({request}) => { // Replace the default export with SvelteKit-compatible handler workflowsRouteContent = workflowsRouteContent.replace( - /export const POST = workflowEntrypoint\(workflowCode\);?$/m, + /export\s*const\s*POST\s*=\s*__wkf_entrypoint\s*;?$/m, `${SVELTEKIT_REQUEST_CONVERTER} export const POST = async ({request}) => { const normalRequest = await convertSvelteKitRequest(request); - return workflowEntrypoint(workflowCode)(normalRequest); + return __wkf_entrypoint(normalRequest); }` ); await writeFile(workflowsRouteFile, workflowsRouteContent); diff --git a/packages/world-postgres/DEV_NOTES.md b/packages/world-postgres/DEV_NOTES.md deleted file mode 100644 index 5efa73261..000000000 --- a/packages/world-postgres/DEV_NOTES.md +++ /dev/null @@ -1,7 +0,0 @@ -# Generate migrations - -The migrations are generated and managed by drizzle. When you perform schema changes you have to generate new migrations using the following command: - -``` -pnpm drizzle-kit generate --dialect=postgresql --schema=./src/drizzle/schema.ts --out src/drizzle/migrations -``` diff --git a/packages/world-postgres/HOW_IT_WORKS.md b/packages/world-postgres/HOW_IT_WORKS.md deleted file mode 100644 index a75c62dee..000000000 --- a/packages/world-postgres/HOW_IT_WORKS.md +++ /dev/null @@ -1,50 +0,0 @@ -# How PostgreSQL World Works - -This document explains the architecture and components of the PostgreSQL world implementation for workflow management. - -This implementation is using [Drizzle Schema](./src/drizzle/schema.ts) that can be pushed or migrated into your PostgreSQL schema and backed by Postgres.js. - -If you want to use any other ORM, query builder or underlying database client, you should be able to fork this implementation and replace the Drizzle parts with your own. - -## Job Queue System - -```mermaid -graph LR - Client --> PG[pg-boss queue] - PG --> Worker[Embedded Worker] - Worker --> HTTP[HTTP fetch] - HTTP --> EW[Embedded World] - - PG -.-> F["${prefix}flows
(workflows)"] - PG -.-> S["${prefix}steps
(steps)"] -``` - -Jobs include retry logic (3 attempts), idempotency keys, and configurable worker concurrency (default: 10). - -## Streaming - -Real-time data streaming via **PostgreSQL LISTEN/NOTIFY**: - -- Stream chunks stored in `workflow_stream_chunks` table -- `pg_notify` triggers sent on writes to `workflow_event_chunk` topic -- Subscribers receive notifications and fetch chunk data -- ULID-based ordering ensures correct sequence -- Single connection for listening to notifications, with an in-process EventEmitter for distributing events to multiple subscribers - -## Setup - -Call `world.start()` to initialize pg-boss workers. When `.start()` is called, workers begin listening to pg-boss queues. When a job arrives, workers make HTTP fetch calls to the embedded world endpoints (`.well-known/workflow/v1/flow` or `.well-known/workflow/v1/step`) to execute the actual workflow logic. - -In **Next.js**, the `world.setup()` function needs to be added to `instrumentation.ts|js` to ensure workers start before request handling: - -```ts -// instrumentation.ts - -if (process.env.NEXT_RUNTIME !== "edge") { - import("workflow/api").then(async ({ getWorld }) => { - // start listening to the jobs. - await getWorld().start?.(); - }); -} -``` - diff --git a/packages/world-postgres/README.md b/packages/world-postgres/README.md index efdbee755..42027d2d5 100644 --- a/packages/world-postgres/README.md +++ b/packages/world-postgres/README.md @@ -1,6 +1,6 @@ # @workflow/world-postgres -An embedded worker/workflow system backed by PostgreSQL for multi-host self-hosted solutions. This is a reference implementation - a production-ready solution might run workers in separate processes with a more robust queuing system. +An embedded worker/workflow system backed by PostgreSQL for multi-host self-hosted solutions ## Installation @@ -16,10 +16,10 @@ yarn add @workflow/world-postgres ### Basic Setup -The postgres world can be configured by setting the `WORKFLOW_TARGET_WORLD` environment variable to the package name: +The postgres world can be configured by setting the WORKFLOW_TARGET_WORLD environment variable to the package name. ```bash -export WORKFLOW_TARGET_WORLD="@workflow/world-postgres" +WORKFLOW_TARGET_WORLD="@workflow/world-postgres" ``` ### Configuration @@ -27,14 +27,9 @@ export WORKFLOW_TARGET_WORLD="@workflow/world-postgres" Configure the PostgreSQL world using environment variables: ```bash -# Required: PostgreSQL connection string -export WORKFLOW_POSTGRES_URL="postgres://username:password@localhost:5432/database" - -# Optional: Job prefix for queue operations -export WORKFLOW_POSTGRES_JOB_PREFIX="myapp" - -# Optional: Worker concurrency (default: 10) -export WORKFLOW_POSTGRES_WORKER_CONCURRENCY="10" +WORKFLOW_POSTGRES_URL="postgres://username:password@localhost:5432/database" +WORKFLOW_POSTGRES_SECURITY_TOKEN="your-secret-token-here" +WORKFLOW_POSTGRES_APP_URL="http://localhost:3000" ``` ### Programmatic Usage @@ -42,31 +37,165 @@ export WORKFLOW_POSTGRES_WORKER_CONCURRENCY="10" You can also create a PostgreSQL world directly in your code: ```typescript -import { createWorld } from "@workflow/world-postgres"; +import { createWorld, createPgBossHttpProxyQueue } from "@workflow/world-postgres"; const world = createWorld({ connectionString: "postgres://username:password@localhost:5432/database", - jobPrefix: "myapp", // optional - queueConcurrency: 10, // optional + securityToken: "your-secret-token-here", + queueFactory: createPgBossHttpProxyQueue({ + jobPrefix: "my-app", + queueConcurrency: 10, + }) +}); +``` + +**⚠️ IMPORTANT**: Always set a strong `WORKFLOW_POSTGRES_SECURITY_TOKEN` in production. This token authenticates queue workers when they call your workflow endpoints and prevents unauthorized access. + +## Architecture + +The package supports flexible queues and execution patterns, letting you choose how jobs are queued and where the steps and workflows execution will be happen. + +### Queue Strategy +- **Built-in pg-boss** (default): Reliable PostgreSQL-backed job queue +- **Custom queue**: Implement your own queue system (Redis, SQS, RabbitMQ, etc.) + +### Execution Proxy Strategy +- **HTTP Proxy**: Workers call workflow endpoints over HTTP (`/.well-known/workflow/v1/flow` and `/.well-known/workflow/v1/step`) +- **Function Proxy**: Workers invoke workflow/step functions directly in-process + +### Execution Environment +- **Same Process**: Workers run alongside your application (e.g., in Next.js `instrumentation.ts`) +- **Separate Process**: Dedicated worker process(es) for better isolation and scaling +- **Serverless**: Receive messages from your queue and call a proxy to execute workflows + +## Advanced Usage + +### pg-boss + HTTP Proxy (Default) + +The simplest setup - jobs are queued usning pg-boss and workers make HTTP calls to your application: + +```typescript +import { createWorld } from "@workflow/world-postgres"; + +const world = createWorld(); +await world.start(); +``` + +**Required Environment Variables:** +```bash +WORKFLOW_POSTGRES_URL="postgres://username:password@localhost:5432/database" +WORKFLOW_POSTGRES_SECURITY_TOKEN="your-secret-token-here" +WORKFLOW_POSTGRES_APP_URL="http://localhost:3000" +``` + +**Optional Environment Variables:** +```bash +WORKFLOW_POSTGRES_JOB_PREFIX="myapp_" +WORKFLOW_POSTGRES_WORKER_CONCURRENCY="10" +``` + +**Programmatic Configuration:** +```typescript +const world = createWorld({ + connectionString: "postgres://...", + securityToken: "your-secret-token", }); ``` -## Configuration Options +### pg-boss + Function Proxy -| Option | Type | Default | Description | -| ------------------ | -------- | -------------------------------------------------------------------------------------- | ----------------------------------- | -| `connectionString` | `string` | `process.env.WORKFLOW_POSTGRES_URL` or `'postgres://world:world@localhost:5432/world'` | PostgreSQL connection string | -| `jobPrefix` | `string` | `process.env.WORKFLOW_POSTGRES_JOB_PREFIX` | Optional prefix for queue job names | -| `queueConcurrency` | `number` | `10` | Number of concurrent queue workers | +Jobs are using pg-boss and workers directly call workflow functions in the same process -## Environment Variables +```typescript +const { setWorld } = await import('workflow/runtime'); +import { createWorld, createPgBossFunctionProxyQueue } from "@workflow/world-postgres"; -| Variable | Description | Default | -| -------------------------------------- | ------------------------------------------------------------ | ----------------------------------------------- | -| `WORKFLOW_TARGET_WORLD` | Set to `"@workflow/world-postgres"` to use this world | - | -| `WORKFLOW_POSTGRES_URL` | PostgreSQL connection string | `'postgres://world:world@localhost:5432/world'` | -| `WORKFLOW_POSTGRES_JOB_PREFIX` | Prefix for queue job names | - | -| `WORKFLOW_POSTGRES_WORKER_CONCURRENCY` | Number of concurrent workers | `10` | +// Import entrypoints from your framework API routes +import { __wkf_entrypoint as workflowEntrypoint } from './app/.well-known/workflow/v1/flow/route'; +import { __wkf_entrypoint as stepEntrypoint } from './app/.well-known/workflow/v1/step/route'; + +const world = createWorld({ + queueFactory: () => + createPgBossFunctionProxyQueue({ + stepEntrypoint, + workflowEntrypoint, + }), +}); + +setWorld(world); + +await world.start(); +``` + +### Custom Queue Driver + HTTP Proxy + +Implement your own queue system for maximum flexibility: + +```typescript +const { setWorld } = await import('workflow/runtime'); +import { createWorld } from "@workflow/world-postgres"; +import type { QueueDriver, MessageData } from "@workflow/world-postgres/queue-drivers/types"; + +const myCustomQueue: QueueDriver = { + pushStep: async (message: MessageData) => { + // Push step execution message to your queue + await myQueue.push('steps', message); + }, + + pushFlow: async (message: MessageData) => { + // Push workflow execution message to your queue + await myQueue.push('workflows', message); + }, + + start: async () => { + // Start consuming from your queue and execute via proxy + const proxy = createHttpProxy({ + baseUrl: 'http://localhost:3000', + securityToken: process.env.WORKFLOW_POSTGRES_SECURITY_TOKEN!, + }); + + await myQueue.consume('steps', async (message) => { + await proxy.proxyStep(message); + }); + + await myQueue.consume('workflows', async (message) => { + await proxy.proxyWorkflow(message); + }); + }, +}; + +const world = createWorld({ + queueFactory: () => myCustomQueue, +}); + +setWorld(world); + +await world.start(); +``` + +### Serverless execution + +In a serverless environment, receive messages from your queue and execute them via proxy: + +```typescript +// queue-handler.ts +import { createHttpProxy } from "@workflow/world-postgres"; +import type { MessageData } from "@workflow/world-postgres/queue-drivers/types"; + +const proxy = createHttpProxy({ + baseUrl: process.env.APP_URL, + securityToken: process.env.SECURITY_TOKEN, +}); + +export async function handleQueueMessage(message: MessageData) { + // Determine if it's a step or workflow + if (message.queueName.includes('step')) { + await proxy.proxyStep(message); + } else { + await proxy.proxyWorkflow(message); + } +} +``` ## Database Setup @@ -93,7 +222,7 @@ The CLI automatically loads `.env` files and will use the connection string from ### Database Schema -The setup creates the following tables: +All workflow data is stored in its own PostgreSQL schema, keeping it isolated from your application data. The setup creates the following tables: - `workflow_runs` - Stores workflow execution runs - `workflow_events` - Stores workflow events @@ -109,15 +238,31 @@ import { runs, events, steps, hooks, streams } from '@workflow/world-postgres'; import * as schema from '@workflow/world-postgres/schema'; ``` -Make sure your PostgreSQL database is accessible and the user has sufficient permissions to create tables and manage jobs. +Make sure your PostgreSQL database is accessible and the user has sufficient permissions to create schemas, tables, and manage jobs. + +## Environment Variables Reference + +| Variable | Description | Default | Required For | +| -------------------------------------- | -------------------------------------------- | ----------------------------------------------- | -------------------------- | +| `WORKFLOW_TARGET_WORLD` | Package name to use as workflow world | - | All patterns | +| `WORKFLOW_POSTGRES_URL` | PostgreSQL connection string | `postgres://world:world@localhost:5432/world` | All patterns | +| `WORKFLOW_POSTGRES_SECURITY_TOKEN` | Security token for queue worker auth | `secret` | **Required in production** | +| `WORKFLOW_POSTGRES_JOB_PREFIX` | Prefix for queue job names | `workflow_` | Optional | +| `WORKFLOW_POSTGRES_WORKER_CONCURRENCY` | Number of concurrent workers | `10` | Optional | +| `WORKFLOW_POSTGRES_APP_URL` | Base URL for HTTP proxy | - | Pattern 1 (HTTP proxy) | +| `WORKFLOW_POSTGRES_APP_PORT` | Port for HTTP proxy (if URL not provided) | `3000` | Pattern 1 (HTTP proxy) | + +All environment variables can be overridden by passing configuration programmatically to `createWorld()` or the queue factory functions. ## Features -- **Durable Storage**: Stores workflow runs, events, steps, hooks, and webhooks in PostgreSQL -- **Queue Processing**: Uses pg-boss for reliable job queue processing +- **Durable Storage**: Stores workflow runs, events, steps, hooks, and webhooks in PostgreSQL with schema isolation +- **Flexible Queue System**: Use built-in pg-boss or integrate any queue system (Redis, SQS, RabbitMQ, etc.) +- **Multiple Execution Strategies**: HTTP proxy for distributed systems, function proxy for co-located workers - **Streaming**: Real-time event streaming capabilities - **Health Checks**: Built-in connection health monitoring - **Configurable Concurrency**: Adjustable worker concurrency for queue processing +- **Type-Safe**: Full TypeScript support with exported types ## Development @@ -127,19 +272,20 @@ For local development, you can use the included Docker Compose configuration: # Start PostgreSQL database docker-compose up -d -# Create and run migrations -pnpm drizzle-kit generate -pnpm drizzle-kit migrate +# Run database setup +pnpm exec workflow-postgres-setup # Set environment variables for local development export WORKFLOW_POSTGRES_URL="postgres://world:world@localhost:5432/world" export WORKFLOW_TARGET_WORLD="@workflow/world-postgres" ``` -## World Selection - -To use the PostgreSQL world, set the `WORKFLOW_TARGET_WORLD` environment variable to the package name: +### Creating Migrations ```bash -export WORKFLOW_TARGET_WORLD="@workflow/world-postgres" +pnpm drizzle-kit generate --dialect=postgresql --schema=./src/drizzle/schema.ts --out src/drizzle/migrations ``` + +## License + +See [LICENSE.md](./LICENSE.md) diff --git a/packages/world-postgres/src/cli.ts b/packages/world-postgres/src/cli.ts index 627ac4cd1..209dd466c 100644 --- a/packages/world-postgres/src/cli.ts +++ b/packages/world-postgres/src/cli.ts @@ -4,6 +4,7 @@ import { config } from 'dotenv'; import { drizzle } from 'drizzle-orm/postgres-js'; import { migrate } from 'drizzle-orm/postgres-js/migrator'; import postgres from 'postgres'; +import { DEFAULT_PG_URL } from './config.js'; const __dirname = dirname(fileURLToPath(import.meta.url)); @@ -14,7 +15,7 @@ async function setupDatabase() { const connectionString = process.env.WORKFLOW_POSTGRES_URL || process.env.DATABASE_URL || - 'postgres://world:world@localhost:5432/world'; + DEFAULT_PG_URL; console.log('🔧 Setting up database schema...'); console.log( diff --git a/packages/world-postgres/src/config.ts b/packages/world-postgres/src/config.ts index 369784f0e..f27c8698d 100644 --- a/packages/world-postgres/src/config.ts +++ b/packages/world-postgres/src/config.ts @@ -1,5 +1,58 @@ -export interface PostgresWorldConfig { - connectionString: string; - jobPrefix?: string; - queueConcurrency?: number; +import type { QueueDriver } from './queue-drivers/types.js'; + +export type BaseWorldConfig = { + connectionString?: string; + securityToken?: string; +}; + +export type PostgresWorldConfig = BaseWorldConfig & { + queueFactory?: () => QueueDriver; +}; + +export type ResolvedBaseWorldConfig = Required; + +export const DEFAULT_PG_URL = 'postgres://world:world@localhost:5432/world'; +export const DEFAULT_SECURITY_TOKEN = 'secret'; +export const DEFAULT_JOB_PREFIX = 'workflow_'; +export const DEFAULT_QUEUE_CONCURRENCY = 10; + +let worldConfig: ResolvedBaseWorldConfig | null = null; + +export function loadWorldConfig( + config: BaseWorldConfig = {} +): ResolvedBaseWorldConfig { + worldConfig = { + connectionString: + config.connectionString ?? + process.env.WORKFLOW_POSTGRES_URL ?? + process.env.DATABASE_URL ?? + DEFAULT_PG_URL, + + securityToken: + config.securityToken ?? + process.env.WORKFLOW_POSTGRES_SECURITY_TOKEN ?? + DEFAULT_SECURITY_TOKEN, + }; + + return worldConfig; +} + +export function getWorldConfig(): ResolvedBaseWorldConfig { + if (!worldConfig) { + throw new Error( + 'World config not loaded. Call createWorld() or loadWorldConfig().' + ); + } + + return worldConfig; +} + +export function getQueueConfig() { + return { + jobPrefix: process.env.WORKFLOW_POSTGRES_JOB_PREFIX ?? DEFAULT_JOB_PREFIX, + queueConcurrency: + (process.env.WORKFLOW_POSTGRES_WORKER_CONCURRENCY + ? parseInt(process.env.WORKFLOW_POSTGRES_WORKER_CONCURRENCY, 10) + : undefined) ?? DEFAULT_QUEUE_CONCURRENCY, + }; } diff --git a/packages/world-postgres/src/drizzle/migrations/0001_handy_jocasta.sql b/packages/world-postgres/src/drizzle/migrations/0001_handy_jocasta.sql new file mode 100644 index 000000000..38b4631eb --- /dev/null +++ b/packages/world-postgres/src/drizzle/migrations/0001_handy_jocasta.sql @@ -0,0 +1,2 @@ +ALTER TABLE "workflow"."workflow_runs" DROP COLUMN "error_code";--> statement-breakpoint +ALTER TABLE "workflow"."workflow_steps" DROP COLUMN "error_code"; \ No newline at end of file diff --git a/packages/world-postgres/src/drizzle/migrations/0002_striped_lifeguard.sql b/packages/world-postgres/src/drizzle/migrations/0002_striped_lifeguard.sql new file mode 100644 index 000000000..4a1af6570 --- /dev/null +++ b/packages/world-postgres/src/drizzle/migrations/0002_striped_lifeguard.sql @@ -0,0 +1,7 @@ +-- Convert JSONB to bytea by serializing to text and encoding as UTF-8 +ALTER TABLE "workflow"."workflow_events" ALTER COLUMN "payload" SET DATA TYPE bytea USING convert_to("payload"::text, 'UTF8');--> statement-breakpoint +ALTER TABLE "workflow"."workflow_hooks" ALTER COLUMN "metadata" SET DATA TYPE bytea USING convert_to("metadata"::text, 'UTF8');--> statement-breakpoint +ALTER TABLE "workflow"."workflow_runs" ALTER COLUMN "output" SET DATA TYPE bytea USING convert_to("output"::text, 'UTF8');--> statement-breakpoint +ALTER TABLE "workflow"."workflow_runs" ALTER COLUMN "input" SET DATA TYPE bytea USING convert_to("input"::text, 'UTF8');--> statement-breakpoint +ALTER TABLE "workflow"."workflow_steps" ALTER COLUMN "input" SET DATA TYPE bytea USING convert_to("input"::text, 'UTF8');--> statement-breakpoint +ALTER TABLE "workflow"."workflow_steps" ALTER COLUMN "output" SET DATA TYPE bytea USING convert_to("output"::text, 'UTF8'); \ No newline at end of file diff --git a/packages/world-postgres/src/drizzle/migrations/meta/0001_snapshot.json b/packages/world-postgres/src/drizzle/migrations/meta/0001_snapshot.json new file mode 100644 index 000000000..c75386cf7 --- /dev/null +++ b/packages/world-postgres/src/drizzle/migrations/meta/0001_snapshot.json @@ -0,0 +1,506 @@ +{ + "id": "a1cbf36d-8801-4509-80c4-8bab67191377", + "prevId": "fbf17a09-4b7e-4939-9ee9-89ada6e197b1", + "version": "7", + "dialect": "postgresql", + "tables": { + "workflow.workflow_events": { + "name": "workflow_events", + "schema": "workflow", + "columns": { + "id": { + "name": "id", + "type": "varchar", + "primaryKey": true, + "notNull": true + }, + "type": { + "name": "type", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "correlation_id": { + "name": "correlation_id", + "type": "varchar", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "run_id": { + "name": "run_id", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "payload": { + "name": "payload", + "type": "jsonb", + "primaryKey": false, + "notNull": false + } + }, + "indexes": { + "workflow_events_run_id_index": { + "name": "workflow_events_run_id_index", + "columns": [ + { + "expression": "run_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + }, + "workflow_events_correlation_id_index": { + "name": "workflow_events_correlation_id_index", + "columns": [ + { + "expression": "correlation_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "workflow.workflow_hooks": { + "name": "workflow_hooks", + "schema": "workflow", + "columns": { + "run_id": { + "name": "run_id", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "hook_id": { + "name": "hook_id", + "type": "varchar", + "primaryKey": true, + "notNull": true + }, + "token": { + "name": "token", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "owner_id": { + "name": "owner_id", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "project_id": { + "name": "project_id", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "environment": { + "name": "environment", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "metadata": { + "name": "metadata", + "type": "jsonb", + "primaryKey": false, + "notNull": false + } + }, + "indexes": { + "workflow_hooks_run_id_index": { + "name": "workflow_hooks_run_id_index", + "columns": [ + { + "expression": "run_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + }, + "workflow_hooks_token_index": { + "name": "workflow_hooks_token_index", + "columns": [ + { + "expression": "token", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "workflow.workflow_runs": { + "name": "workflow_runs", + "schema": "workflow", + "columns": { + "id": { + "name": "id", + "type": "varchar", + "primaryKey": true, + "notNull": true + }, + "output": { + "name": "output", + "type": "jsonb", + "primaryKey": false, + "notNull": false + }, + "deployment_id": { + "name": "deployment_id", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "status": { + "name": "status", + "type": "status", + "typeSchema": "public", + "primaryKey": false, + "notNull": true + }, + "name": { + "name": "name", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "execution_context": { + "name": "execution_context", + "type": "jsonb", + "primaryKey": false, + "notNull": false + }, + "input": { + "name": "input", + "type": "jsonb", + "primaryKey": false, + "notNull": true + }, + "error": { + "name": "error", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "completed_at": { + "name": "completed_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + }, + "started_at": { + "name": "started_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + } + }, + "indexes": { + "workflow_runs_name_index": { + "name": "workflow_runs_name_index", + "columns": [ + { + "expression": "name", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + }, + "workflow_runs_status_index": { + "name": "workflow_runs_status_index", + "columns": [ + { + "expression": "status", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "workflow.workflow_steps": { + "name": "workflow_steps", + "schema": "workflow", + "columns": { + "run_id": { + "name": "run_id", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "step_id": { + "name": "step_id", + "type": "varchar", + "primaryKey": true, + "notNull": true + }, + "step_name": { + "name": "step_name", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "status": { + "name": "status", + "type": "step_status", + "typeSchema": "public", + "primaryKey": false, + "notNull": true + }, + "input": { + "name": "input", + "type": "jsonb", + "primaryKey": false, + "notNull": true + }, + "output": { + "name": "output", + "type": "jsonb", + "primaryKey": false, + "notNull": false + }, + "error": { + "name": "error", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "attempt": { + "name": "attempt", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "started_at": { + "name": "started_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + }, + "completed_at": { + "name": "completed_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "retry_after": { + "name": "retry_after", + "type": "timestamp", + "primaryKey": false, + "notNull": false + } + }, + "indexes": { + "workflow_steps_run_id_index": { + "name": "workflow_steps_run_id_index", + "columns": [ + { + "expression": "run_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + }, + "workflow_steps_status_index": { + "name": "workflow_steps_status_index", + "columns": [ + { + "expression": "status", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "workflow.workflow_stream_chunks": { + "name": "workflow_stream_chunks", + "schema": "workflow", + "columns": { + "id": { + "name": "id", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "stream_id": { + "name": "stream_id", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "data": { + "name": "data", + "type": "bytea", + "primaryKey": false, + "notNull": true + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "eof": { + "name": "eof", + "type": "boolean", + "primaryKey": false, + "notNull": true + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": { + "workflow_stream_chunks_stream_id_id_pk": { + "name": "workflow_stream_chunks_stream_id_id_pk", + "columns": ["stream_id", "id"] + } + }, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + } + }, + "enums": { + "public.step_status": { + "name": "step_status", + "schema": "public", + "values": ["pending", "running", "completed", "failed", "cancelled"] + }, + "public.status": { + "name": "status", + "schema": "public", + "values": [ + "pending", + "running", + "completed", + "failed", + "paused", + "cancelled" + ] + } + }, + "schemas": { + "workflow": "workflow" + }, + "sequences": {}, + "roles": {}, + "policies": {}, + "views": {}, + "_meta": { + "columns": {}, + "schemas": {}, + "tables": {} + } +} diff --git a/packages/world-postgres/src/drizzle/migrations/meta/0002_snapshot.json b/packages/world-postgres/src/drizzle/migrations/meta/0002_snapshot.json new file mode 100644 index 000000000..ea249d2a4 --- /dev/null +++ b/packages/world-postgres/src/drizzle/migrations/meta/0002_snapshot.json @@ -0,0 +1,506 @@ +{ + "id": "ccda4a09-c27c-4386-a806-d0520bbbcac6", + "prevId": "a1cbf36d-8801-4509-80c4-8bab67191377", + "version": "7", + "dialect": "postgresql", + "tables": { + "workflow.workflow_events": { + "name": "workflow_events", + "schema": "workflow", + "columns": { + "id": { + "name": "id", + "type": "varchar", + "primaryKey": true, + "notNull": true + }, + "type": { + "name": "type", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "correlation_id": { + "name": "correlation_id", + "type": "varchar", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "run_id": { + "name": "run_id", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "payload": { + "name": "payload", + "type": "bytea", + "primaryKey": false, + "notNull": false + } + }, + "indexes": { + "workflow_events_run_id_index": { + "name": "workflow_events_run_id_index", + "columns": [ + { + "expression": "run_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + }, + "workflow_events_correlation_id_index": { + "name": "workflow_events_correlation_id_index", + "columns": [ + { + "expression": "correlation_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "workflow.workflow_hooks": { + "name": "workflow_hooks", + "schema": "workflow", + "columns": { + "run_id": { + "name": "run_id", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "hook_id": { + "name": "hook_id", + "type": "varchar", + "primaryKey": true, + "notNull": true + }, + "token": { + "name": "token", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "owner_id": { + "name": "owner_id", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "project_id": { + "name": "project_id", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "environment": { + "name": "environment", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "metadata": { + "name": "metadata", + "type": "bytea", + "primaryKey": false, + "notNull": false + } + }, + "indexes": { + "workflow_hooks_run_id_index": { + "name": "workflow_hooks_run_id_index", + "columns": [ + { + "expression": "run_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + }, + "workflow_hooks_token_index": { + "name": "workflow_hooks_token_index", + "columns": [ + { + "expression": "token", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "workflow.workflow_runs": { + "name": "workflow_runs", + "schema": "workflow", + "columns": { + "id": { + "name": "id", + "type": "varchar", + "primaryKey": true, + "notNull": true + }, + "output": { + "name": "output", + "type": "bytea", + "primaryKey": false, + "notNull": false + }, + "deployment_id": { + "name": "deployment_id", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "status": { + "name": "status", + "type": "status", + "typeSchema": "public", + "primaryKey": false, + "notNull": true + }, + "name": { + "name": "name", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "execution_context": { + "name": "execution_context", + "type": "jsonb", + "primaryKey": false, + "notNull": false + }, + "input": { + "name": "input", + "type": "bytea", + "primaryKey": false, + "notNull": true + }, + "error": { + "name": "error", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "completed_at": { + "name": "completed_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + }, + "started_at": { + "name": "started_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + } + }, + "indexes": { + "workflow_runs_name_index": { + "name": "workflow_runs_name_index", + "columns": [ + { + "expression": "name", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + }, + "workflow_runs_status_index": { + "name": "workflow_runs_status_index", + "columns": [ + { + "expression": "status", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "workflow.workflow_steps": { + "name": "workflow_steps", + "schema": "workflow", + "columns": { + "run_id": { + "name": "run_id", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "step_id": { + "name": "step_id", + "type": "varchar", + "primaryKey": true, + "notNull": true + }, + "step_name": { + "name": "step_name", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "status": { + "name": "status", + "type": "step_status", + "typeSchema": "public", + "primaryKey": false, + "notNull": true + }, + "input": { + "name": "input", + "type": "bytea", + "primaryKey": false, + "notNull": true + }, + "output": { + "name": "output", + "type": "bytea", + "primaryKey": false, + "notNull": false + }, + "error": { + "name": "error", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "attempt": { + "name": "attempt", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "started_at": { + "name": "started_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + }, + "completed_at": { + "name": "completed_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "retry_after": { + "name": "retry_after", + "type": "timestamp", + "primaryKey": false, + "notNull": false + } + }, + "indexes": { + "workflow_steps_run_id_index": { + "name": "workflow_steps_run_id_index", + "columns": [ + { + "expression": "run_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + }, + "workflow_steps_status_index": { + "name": "workflow_steps_status_index", + "columns": [ + { + "expression": "status", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "workflow.workflow_stream_chunks": { + "name": "workflow_stream_chunks", + "schema": "workflow", + "columns": { + "id": { + "name": "id", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "stream_id": { + "name": "stream_id", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "data": { + "name": "data", + "type": "bytea", + "primaryKey": false, + "notNull": true + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "eof": { + "name": "eof", + "type": "boolean", + "primaryKey": false, + "notNull": true + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": { + "workflow_stream_chunks_stream_id_id_pk": { + "name": "workflow_stream_chunks_stream_id_id_pk", + "columns": ["stream_id", "id"] + } + }, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + } + }, + "enums": { + "public.step_status": { + "name": "step_status", + "schema": "public", + "values": ["pending", "running", "completed", "failed", "cancelled"] + }, + "public.status": { + "name": "status", + "schema": "public", + "values": [ + "pending", + "running", + "completed", + "failed", + "paused", + "cancelled" + ] + } + }, + "schemas": { + "workflow": "workflow" + }, + "sequences": {}, + "roles": {}, + "policies": {}, + "views": {}, + "_meta": { + "columns": {}, + "schemas": {}, + "tables": {} + } +} diff --git a/packages/world-postgres/src/drizzle/migrations/meta/_journal.json b/packages/world-postgres/src/drizzle/migrations/meta/_journal.json index c5099ff85..2eb16aa7e 100644 --- a/packages/world-postgres/src/drizzle/migrations/meta/_journal.json +++ b/packages/world-postgres/src/drizzle/migrations/meta/_journal.json @@ -8,6 +8,20 @@ "when": 1762873019948, "tag": "0000_cultured_the_anarchist", "breakpoints": true + }, + { + "idx": 1, + "version": "7", + "when": 1762968119388, + "tag": "0001_handy_jocasta", + "breakpoints": true + }, + { + "idx": 2, + "version": "7", + "when": 1763679632642, + "tag": "0002_striped_lifeguard", + "breakpoints": true } ] } diff --git a/packages/world-postgres/src/drizzle/schema.ts b/packages/world-postgres/src/drizzle/schema.ts index 11347be28..d43da02cf 100644 --- a/packages/world-postgres/src/drizzle/schema.ts +++ b/packages/world-postgres/src/drizzle/schema.ts @@ -49,18 +49,24 @@ type DrizzlishOfType = { */ export type SerializedContent = any[]; +const bytea = customType<{ data: Buffer; notNull: false; default: false }>({ + dataType() { + return 'bytea'; + }, +}); + export const schema = pgSchema('workflow'); export const runs = schema.table( 'workflow_runs', { runId: varchar('id').primaryKey(), - output: jsonb('output').$type(), + output: bytea('output'), deploymentId: varchar('deployment_id').notNull(), status: workflowRunStatus('status').notNull(), workflowName: varchar('name').notNull(), executionContext: jsonb('execution_context').$type>(), - input: jsonb('input').$type().notNull(), + input: bytea('input').notNull(), error: text('error'), createdAt: timestamp('created_at').defaultNow().notNull(), updatedAt: timestamp('updated_at') @@ -84,7 +90,7 @@ export const events = schema.table( correlationId: varchar('correlation_id'), createdAt: timestamp('created_at').defaultNow().notNull(), runId: varchar('run_id').notNull(), - eventData: jsonb('payload'), + eventData: bytea('payload'), } satisfies DrizzlishOfType, (tb) => ({ runFk: index().on(tb.runId), @@ -99,8 +105,8 @@ export const steps = schema.table( stepId: varchar('step_id').primaryKey(), stepName: varchar('step_name').notNull(), status: stepStatus('status').notNull(), - input: jsonb('input').$type().notNull(), - output: jsonb('output').$type(), + input: bytea('input').notNull(), + output: bytea('output'), error: text('error'), attempt: integer('attempt').notNull(), startedAt: timestamp('started_at'), @@ -128,7 +134,7 @@ export const hooks = schema.table( projectId: varchar('project_id').notNull(), environment: varchar('environment').notNull(), createdAt: timestamp('created_at').defaultNow().notNull(), - metadata: jsonb('metadata').$type(), + metadata: bytea('metadata'), } satisfies DrizzlishOfType, (tb) => ({ runFk: index().on(tb.runId), @@ -136,12 +142,6 @@ export const hooks = schema.table( }) ); -const bytea = customType<{ data: Buffer; notNull: false; default: false }>({ - dataType() { - return 'bytea'; - }, -}); - export const streams = schema.table( 'workflow_stream_chunks', { diff --git a/packages/world-postgres/src/index.ts b/packages/world-postgres/src/index.ts index 2efce00c7..4afaf427f 100644 --- a/packages/world-postgres/src/index.ts +++ b/packages/world-postgres/src/index.ts @@ -1,9 +1,15 @@ import type { Storage, World } from '@workflow/world'; -import PgBoss from 'pg-boss'; import createPostgres from 'postgres'; -import type { PostgresWorldConfig } from './config.js'; +import { loadWorldConfig, type PostgresWorldConfig } from './config.js'; import { createClient, type Drizzle } from './drizzle/index.js'; +import { createFunctionProxy } from './proxies/function-proxy.js'; +import { createHttpProxy } from './proxies/http-proxy.js'; import { createQueue } from './queue.js'; +import { + createPgBossFunctionProxyQueue, + createPgBossHttpProxyQueue, +} from './queue-drivers/factories.js'; +import { createPgBossQueue } from './queue-drivers/pgboss.js'; import { createEventsStorage, createHooksStorage, @@ -12,33 +18,20 @@ import { } from './storage.js'; import { createStreamer } from './streamer.js'; -function createStorage(drizzle: Drizzle): Storage { - return { - runs: createRunsStorage(drizzle), - events: createEventsStorage(drizzle), - hooks: createHooksStorage(drizzle), - steps: createStepsStorage(drizzle), - }; -} - export function createWorld( - config: PostgresWorldConfig = { - connectionString: - process.env.WORKFLOW_POSTGRES_URL || - 'postgres://world:world@localhost:5432/world', - jobPrefix: process.env.WORKFLOW_POSTGRES_JOB_PREFIX, - queueConcurrency: - parseInt(process.env.WORKFLOW_POSTGRES_WORKER_CONCURRENCY || '10', 10) || - 10, - } + opts: PostgresWorldConfig = {} ): World & { start(): Promise } { - const boss = new PgBoss({ - connectionString: config.connectionString, - }); + const config = loadWorldConfig(opts); + + const queueDriver = opts.queueFactory + ? opts.queueFactory() + : createPgBossHttpProxyQueue(); + const postgres = createPostgres(config.connectionString); const drizzle = createClient(postgres); - const queue = createQueue(boss, config); + const storage = createStorage(drizzle); + const queue = createQueue(queueDriver, config.securityToken); const streamer = createStreamer(postgres, drizzle); return { @@ -46,11 +39,27 @@ export function createWorld( ...streamer, ...queue, async start() { - await queue.start(); + await queueDriver.start(); }, }; } -// Re-export schema for users who want to extend or inspect the database schema +function createStorage(drizzle: Drizzle): Storage { + return { + runs: createRunsStorage(drizzle), + events: createEventsStorage(drizzle), + hooks: createHooksStorage(drizzle), + steps: createStepsStorage(drizzle), + }; +} + export type { PostgresWorldConfig } from './config.js'; +// Re-export schema for users who want to extend or inspect the database schema export * from './drizzle/schema.js'; + +export { createFunctionProxy, createHttpProxy }; +export { + createPgBossQueue, + createPgBossFunctionProxyQueue, + createPgBossHttpProxyQueue, +}; diff --git a/packages/world-postgres/src/proxies/function-proxy.ts b/packages/world-postgres/src/proxies/function-proxy.ts new file mode 100644 index 000000000..9a6705bdc --- /dev/null +++ b/packages/world-postgres/src/proxies/function-proxy.ts @@ -0,0 +1,29 @@ +import type { MessageData } from '../queue-drivers/types.js'; +import type { WkfProxy } from './types.js'; +import { prepareRequestParams } from './utils.js'; + +export function createFunctionProxy(opts: { + securityToken: string; + stepEntrypoint: (request: Request) => Promise; + workflowEntrypoint: (request: Request) => Promise; +}): WkfProxy { + return { + proxyWorkflow: async (message: MessageData): Promise => { + const request = new Request( + 'https://world-postgres.local/wkf-direct-call', + prepareRequestParams(message, opts.securityToken) + ); + + return opts.workflowEntrypoint(request); + }, + + proxyStep: async (message: MessageData): Promise => { + const request = new Request( + 'https://world-postgres.local/wkf-direct-call', + prepareRequestParams(message, opts.securityToken) + ); + + return opts.stepEntrypoint(request); + }, + }; +} diff --git a/packages/world-postgres/src/proxies/http-proxy.ts b/packages/world-postgres/src/proxies/http-proxy.ts new file mode 100644 index 000000000..7bb9b5440 --- /dev/null +++ b/packages/world-postgres/src/proxies/http-proxy.ts @@ -0,0 +1,33 @@ +import type { MessageData } from '../queue-drivers/types.js'; +import type { WkfProxy } from './types.js'; +import { prepareRequestParams } from './utils.js'; + +export function createHttpProxy(opts: { + port?: number; + baseUrl?: string; + securityToken: string; +}): WkfProxy { + const resolveBaseUrl = (): string => { + if (opts.baseUrl) return opts.baseUrl; + if (opts.port) return `http://localhost:${opts.port}`; + return 'http://localhost:3000'; + }; + + const baseUrl = resolveBaseUrl(); + + return { + proxyWorkflow: async (message: MessageData): Promise => { + return fetch( + `${baseUrl}/.well-known/workflow/v1/flow`, + prepareRequestParams(message, opts.securityToken) + ); + }, + + proxyStep: async (message: MessageData): Promise => { + return fetch( + `${baseUrl}/.well-known/workflow/v1/step`, + prepareRequestParams(message, opts.securityToken) + ); + }, + }; +} diff --git a/packages/world-postgres/src/proxies/types.ts b/packages/world-postgres/src/proxies/types.ts new file mode 100644 index 000000000..faca2b9ca --- /dev/null +++ b/packages/world-postgres/src/proxies/types.ts @@ -0,0 +1,6 @@ +import type { MessageData } from '../queue-drivers/types.js'; + +export interface WkfProxy { + proxyWorkflow: (message: MessageData) => Promise; + proxyStep: (message: MessageData) => Promise; +} diff --git a/packages/world-postgres/src/proxies/utils.ts b/packages/world-postgres/src/proxies/utils.ts new file mode 100644 index 000000000..18563abca --- /dev/null +++ b/packages/world-postgres/src/proxies/utils.ts @@ -0,0 +1,15 @@ +import { MessageData } from '../queue-drivers/types.js'; + +export const prepareRequestParams = ( + message: MessageData, + securityToken: string +) => { + return { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'X-Workflow-Secret': securityToken, + }, + body: JSON.stringify(MessageData.encode(message)), + }; +}; diff --git a/packages/world-postgres/src/queue-drivers/factories.ts b/packages/world-postgres/src/queue-drivers/factories.ts new file mode 100644 index 000000000..df314e157 --- /dev/null +++ b/packages/world-postgres/src/queue-drivers/factories.ts @@ -0,0 +1,87 @@ +import { getQueueConfig, getWorldConfig } from '../config.js'; +import { createFunctionProxy } from '../proxies/function-proxy.js'; +import { createHttpProxy } from '../proxies/http-proxy.js'; +import { createPgBossQueue } from './pgboss.js'; +import type { QueueDriver } from './types.js'; + +/** + * QueueDriver implementation using pg-boss for job management + * and direct function calls for execution. + */ +export function createPgBossFunctionProxyQueue(opts: { + jobPrefix?: string; + securityToken?: string; + connectionString?: string; + queueConcurrency?: number; + stepEntrypoint: (request: Request) => Promise; + workflowEntrypoint: (request: Request) => Promise; +}): QueueDriver { + const worldDefaults = getWorldConfig(); + const queueDefaults = getQueueConfig(); + + const config = { + connectionString: opts.connectionString ?? worldDefaults.connectionString, + securityToken: opts.securityToken ?? worldDefaults.securityToken, + jobPrefix: opts.jobPrefix ?? queueDefaults.jobPrefix, + queueConcurrency: opts.queueConcurrency ?? queueDefaults.queueConcurrency, + }; + + return createPgBossQueue( + { + jobPrefix: config.jobPrefix, + connectionString: config.connectionString, + queueConcurrency: config.queueConcurrency, + }, + createFunctionProxy({ + securityToken: config.securityToken, + stepEntrypoint: opts.stepEntrypoint, + workflowEntrypoint: opts.workflowEntrypoint, + }) + ); +} + +/** + * QueueDriver implementation using pg-boss for job management + * and HTTP for execution. + */ +export function createPgBossHttpProxyQueue( + opts: { + port?: number; + baseUrl?: string; + jobPrefix?: string; + securityToken?: string; + connectionString?: string; + queueConcurrency?: number; + } = {} +): QueueDriver { + const worldDefaults = getWorldConfig(); + const queueDefaults = getQueueConfig(); + + const config = { + connectionString: opts.connectionString ?? worldDefaults.connectionString, + securityToken: opts.securityToken ?? worldDefaults.securityToken, + jobPrefix: opts.jobPrefix ?? queueDefaults.jobPrefix, + queueConcurrency: opts.queueConcurrency ?? queueDefaults.queueConcurrency, + + port: + opts.port ?? + (process.env.WORKFLOW_POSTGRES_APP_PORT + ? parseInt(process.env.WORKFLOW_POSTGRES_APP_PORT, 10) + : undefined), + + baseUrl: opts.baseUrl ?? process.env.WORKFLOW_POSTGRES_APP_URL, + }; + + return createPgBossQueue( + { + jobPrefix: config.jobPrefix, + connectionString: config.connectionString, + queueConcurrency: config.queueConcurrency, + }, + createHttpProxy({ + port: config.port, + baseUrl: config.baseUrl, + securityToken: config.securityToken, + }) + ); +} diff --git a/packages/world-postgres/src/queue-drivers/pgboss.ts b/packages/world-postgres/src/queue-drivers/pgboss.ts new file mode 100644 index 000000000..ff7fb4014 --- /dev/null +++ b/packages/world-postgres/src/queue-drivers/pgboss.ts @@ -0,0 +1,116 @@ +import PgBoss from 'pg-boss'; +import type { WkfProxy } from '../proxies/types.js'; +import { MessageData, type QueueDriver } from './types.js'; + +/** + * Base QueueDriver implementation using pg-boss for job management. + * Takes in a proxy that will handle the actual step/flow execution. + */ +export function createPgBossQueue( + opts: { + jobPrefix: string; + connectionString: string; + queueConcurrency: number; + }, + proxy: WkfProxy +): QueueDriver { + let startPromise: Promise | null = null; + const boss = new PgBoss(opts.connectionString); + + const stepQueueName = `${opts.jobPrefix}steps`; + const workflowQueueName = `${opts.jobPrefix}flows`; + + const ensureStarted = async () => { + if (!startPromise) { + startPromise = boss.start().then(() => { + return Promise.all([ + boss.createQueue(workflowQueueName), + boss.createQueue(stepQueueName), + ]); + }); + } + + await startPromise; + }; + + return { + pushStep: async (message: MessageData) => { + await ensureStarted(); + + await boss.send(stepQueueName, MessageData.encode(message), { + singletonKey: message?.idempotencyKey ?? message.messageId, + retryLimit: 3, + }); + }, + + pushFlow: async (message: MessageData) => { + await ensureStarted(); + + await boss.send(workflowQueueName, MessageData.encode(message), { + singletonKey: message?.idempotencyKey ?? message.messageId, + retryLimit: 3, + }); + }, + + start: async () => { + await ensureStarted(); + + const stepWorker = createWorker(boss, stepQueueName, proxy.proxyStep); + const workflowWorker = createWorker( + boss, + workflowQueueName, + proxy.proxyWorkflow + ); + + for (let i = 0; i < opts.queueConcurrency; i++) { + await boss.work(workflowQueueName, workflowWorker); + await boss.work(stepQueueName, stepWorker); + } + }, + }; +} + +function createWorker( + boss: PgBoss, + queueName: string, + proxy: WkfProxy[keyof WkfProxy] +) { + return async ([job]: PgBoss.Job[]) => { + const message = MessageData.parse(job.data); + + console.log(`[${job.id}] running: ${message.queueName}`); + + try { + const response = await proxy(message); + + if (response.status === 503) { + const body = (await response.json()) as { timeoutSeconds?: number }; + + if (body.timeoutSeconds) { + await boss.send(queueName, job.data, { + startAfter: new Date(Date.now() + body.timeoutSeconds * 1000), + singletonKey: message?.idempotencyKey ?? message.messageId, + retryLimit: 3, + }); + + console.log( + `[${job.id}] requeued: ${message.queueName} for ${body.timeoutSeconds}s` + ); + + return; + } + } + + if (!response.ok) { + const text = await response.text(); + throw new Error(`Step failed: ${text}`); + } + } catch (error) { + console.error( + `[${job.id}] Error handling step: ${message.queueName}`, + error + ); + throw error; + } + }; +} diff --git a/packages/world-postgres/src/boss.ts b/packages/world-postgres/src/queue-drivers/types.ts similarity index 55% rename from packages/world-postgres/src/boss.ts rename to packages/world-postgres/src/queue-drivers/types.ts index b0af95454..7adf9ba80 100644 --- a/packages/world-postgres/src/boss.ts +++ b/packages/world-postgres/src/queue-drivers/types.ts @@ -1,21 +1,30 @@ import { MessageId } from '@workflow/world'; import * as z from 'zod'; -import { Base64Buffer } from './zod.js'; +import { Base64Buffer } from '../zod.js'; /** -/* pgboss is using JSON under the hood, so we need to base64 encode -/* the body to ensure binary safety -/* maybe later we can have a `blobs` table for larger payloads - **/ + * Most queues are using JSON under the hood, so we need to base64 + * encode the body to ensure binary safety maybe later we can + * have a `blobs` table for larger payloads + */ export const MessageData = z.object({ - attempt: z.number().describe('The attempt number of the message'), - messageId: MessageId.describe('The unique ID of the message'), - idempotencyKey: z.string().optional(), id: z .string() .describe( "The ID of the sub-queue. For workflows, it's the workflow name. For steps, it's the step name." ), + + idempotencyKey: z.string().optional(), + queueName: z.string().describe('The name of the queue'), data: Base64Buffer.describe('The message that was sent'), + messageId: MessageId.describe('The unique ID of the message'), + attempt: z.number().describe('The attempt number of the message'), }); + export type MessageData = z.infer; + +export interface QueueDriver { + pushStep: (message: MessageData) => Promise; + pushFlow: (message: MessageData) => Promise; + start: () => Promise; +} diff --git a/packages/world-postgres/src/queue.ts b/packages/world-postgres/src/queue.ts index 5aded60f0..1396342af 100644 --- a/packages/world-postgres/src/queue.ts +++ b/packages/world-postgres/src/queue.ts @@ -1,135 +1,150 @@ -import * as Stream from 'node:stream'; +import Stream from 'node:stream'; import { JsonTransport } from '@vercel/queue'; import { MessageId, type Queue, + type QueuePayload, QueuePayloadSchema, type QueuePrefix, type ValidQueueName, } from '@workflow/world'; -import { createEmbeddedWorld } from '@workflow/world-local'; -import type PgBoss from 'pg-boss'; import { monotonicFactory } from 'ulid'; -import { MessageData } from './boss.js'; -import type { PostgresWorldConfig } from './config.js'; - -/** - * The Postgres queue works by creating two job types in pg-boss: - * - `workflow` for workflow jobs - * - `step` for step jobs - * - * When a message is queued, it is sent to pg-boss with the appropriate job type. - * When a job is processed, it is deserialized and then re-queued into the _embedded world_, showing that - * we can reuse the embedded world, mix and match worlds to build - * hybrid architectures, and even migrate between worlds. - */ -export function createQueue( - boss: PgBoss, - config: PostgresWorldConfig -): Queue & { start(): Promise } { - const port = process.env.PORT ? Number(process.env.PORT) : undefined; - const embeddedWorld = createEmbeddedWorld({ dataDir: undefined, port }); +import { MessageData, type QueueDriver } from './queue-drivers/types.js'; - const transport = new JsonTransport(); - const generateMessageId = monotonicFactory(); +const transport = new JsonTransport(); - const prefix = config.jobPrefix || 'workflow_'; - const Queues = { - __wkf_workflow_: `${prefix}flows`, - __wkf_step_: `${prefix}steps`, - } as const satisfies Record; +const QUEUE_MAX_VISIBILITY = + parseInt(process.env.WORKFLOW_POSTGRES_QUEUE_MAX_VISIBILITY ?? '0', 10) || + Infinity; - const createQueueHandler = embeddedWorld.createQueueHandler; +export function createQueue( + queueDriver: QueueDriver, + securityToken: string +): Queue { + const generateMessageId = monotonicFactory(); const getDeploymentId: Queue['getDeploymentId'] = async () => { return 'postgres'; }; - const createdQueues = new Map>(); - - function createQueue(name: string) { - let createdQueue = createdQueues.get(name); - if (!createdQueue) { - createdQueue = boss.createQueue(name); - createdQueues.set(name, createdQueue); - } - return createdQueue; - } - const queue: Queue['queue'] = async (queue, message, opts) => { - await boss.start(); - const [prefix, queueId] = parseQueueName(queue); - const jobName = Queues[prefix]; - await createQueue(jobName); const body = transport.serialize(message); + const [prefix, queueId] = parseQueueName(queue); const messageId = MessageId.parse(`msg_${generateMessageId()}`); - await boss.send({ - name: jobName, - options: { - singletonKey: opts?.idempotencyKey ?? messageId, - retryLimit: 3, - }, - data: MessageData.encode({ - id: queueId, - data: body, - attempt: 1, - messageId, - idempotencyKey: opts?.idempotencyKey, - }), - }); + + const payload = { + id: queueId, + data: body, + attempt: 0, + messageId, + idempotencyKey: opts?.idempotencyKey, + queueName: `${prefix}${queueId}`, + }; + + switch (prefix) { + case '__wkf_step_': + await queueDriver.pushStep(payload); + break; + + case '__wkf_workflow_': + await queueDriver.pushFlow(payload); + break; + } + return { messageId }; }; - async function setupListener(queue: QueuePrefix, jobName: string) { - await createQueue(jobName); - await Promise.all( - Array.from({ length: config.queueConcurrency || 10 }, async () => { - await boss.work(jobName, work); - }) - ); - - async function work([job]: PgBoss.Job[]) { - const messageData = MessageData.parse(job.data); - const bodyStream = Stream.Readable.toWeb( - Stream.Readable.from([messageData.data]) - ); - const body = await transport.deserialize( - bodyStream as ReadableStream - ); - const message = QueuePayloadSchema.parse(body); - const queueName = `${queue}${messageData.id}` as const; - await embeddedWorld.queue(queueName, message, { - idempotencyKey: messageData.idempotencyKey, - }); - } - } + const createQueueHandler: Queue['createQueueHandler'] = ( + _prefix, + handler + ) => { + return async (req) => { + const secret = req.headers.get('X-Workflow-Secret'); + const [message, payload] = await parse(req); - async function setupListeners() { - for (const [prefix, jobName] of Object.entries(Queues) as [ - QueuePrefix, - string, - ][]) { - await setupListener(prefix, jobName); - } - } + if (!secret || securityToken !== secret) { + return Response.json( + { error: 'Unauthorized: Invalid or missing secret key' }, + { status: 401 } + ); + } + + if (!isValidQueueName(message.queueName)) { + return Response.json( + { error: `Invalid queue name: ${message.queueName}` }, + { status: 400 } + ); + } + + try { + const result = await handler(payload, { + attempt: message.attempt, + queueName: message.queueName, + messageId: message.messageId, + }); + + let timeoutSeconds: number | null = null; + if (typeof result?.timeoutSeconds === 'number') { + timeoutSeconds = Math.min( + result.timeoutSeconds, + QUEUE_MAX_VISIBILITY + ); + } + + if (timeoutSeconds) { + return Response.json({ timeoutSeconds }, { status: 503 }); + } + + return Response.json({ ok: true }); + } catch (error) { + return Response.json(String(error), { status: 500 }); + } + }; + }; return { createQueueHandler, getDeploymentId, queue, - async start() { - boss = await boss.start(); - await setupListeners(); - }, }; } const parseQueueName = (name: ValidQueueName): [QueuePrefix, string] => { const prefixes: QueuePrefix[] = ['__wkf_step_', '__wkf_workflow_']; + for (const prefix of prefixes) { if (name.startsWith(prefix)) { return [prefix, name.slice(prefix.length)]; } } + throw new Error(`Invalid queue name: ${name}`); }; + +function isValidQueueName(name: string): name is ValidQueueName { + const prefixes: QueuePrefix[] = ['__wkf_step_', '__wkf_workflow_']; + + for (const prefix of prefixes) { + if (name.startsWith(prefix)) { + return true; + } + } + + return false; +} + +async function parse(req: Request): Promise<[MessageData, QueuePayload]> { + const reqBody = await req.json(); + const messageData = MessageData.parse(reqBody); + const bodyStream = Stream.Readable.toWeb( + Stream.Readable.from([messageData.data]) + ); + + const body = await transport.deserialize( + bodyStream as ReadableStream + ); + + const payload = QueuePayloadSchema.parse(body); + + return [messageData, payload]; +} diff --git a/packages/world-postgres/src/storage.ts b/packages/world-postgres/src/storage.ts index bfe9726a4..0b89607fd 100644 --- a/packages/world-postgres/src/storage.ts +++ b/packages/world-postgres/src/storage.ts @@ -51,10 +51,17 @@ function serializeRunError(data: UpdateWorkflowRunRequest): any { * - If errorStack/errorCode exist (legacy) → combine into StructuredError */ function deserializeRunError(run: any): WorkflowRun { - const { error, errorStack, errorCode, ...rest } = run; + const { error, errorStack, errorCode, input, output, ...rest } = run; + + const deserializedInput = fromBuffer(input); + const deserializedOutput = fromBuffer(output); if (!error && !errorStack && !errorCode) { - return run as WorkflowRun; + return { + ...run, + input: deserializedInput, + output: deserializedOutput, + } as WorkflowRun; } // Try to parse as structured error JSON @@ -64,6 +71,8 @@ function deserializeRunError(run: any): WorkflowRun { if (typeof parsed === 'object' && parsed.message !== undefined) { return { ...rest, + input: deserializedInput, + output: deserializedOutput, error: { message: parsed.message, stack: parsed.stack, @@ -79,6 +88,8 @@ function deserializeRunError(run: any): WorkflowRun { // Backwards compatibility: handle legacy separate fields or plain string error return { ...rest, + input: deserializedInput, + output: deserializedOutput, error: { message: error || '', stack: errorStack, @@ -110,10 +121,17 @@ function serializeStepError(data: UpdateStepRequest): any { * Deserialize error JSON string (or legacy flat fields) into a StructuredError object for steps */ function deserializeStepError(step: any): Step { - const { error, ...rest } = step; + const { error, input, output, ...rest } = step; + + const deserializedInput = fromBuffer(input); + const deserializedOutput = fromBuffer(output); if (!error) { - return step as Step; + return { + ...step, + input: deserializedInput, + output: deserializedOutput, + } as Step; } // Try to parse as structured error JSON @@ -123,6 +141,8 @@ function deserializeStepError(step: any): Step { if (typeof parsed === 'object' && parsed.message !== undefined) { return { ...rest, + input: deserializedInput, + output: deserializedOutput, error: { message: parsed.message, stack: parsed.stack, @@ -138,6 +158,8 @@ function deserializeStepError(step: any): Step { // Backwards compatibility: handle legacy separate fields or plain string error return { ...rest, + input: deserializedInput, + output: deserializedOutput, error: { message: error || '', }, @@ -267,11 +289,18 @@ export function createRunsStorage(drizzle: Drizzle): Storage['runs'] { }, async create(data) { const runId = `wrun_${ulid()}`; + const inputBuffer = toBuffer(data.input); + if (!inputBuffer) { + throw new WorkflowAPIError(`Invalid input data`, { + status: 400, + }); + } + const [value] = await drizzle .insert(runs) .values({ runId, - input: data.input, + input: inputBuffer, executionContext: data.executionContext as Record< string, unknown @@ -306,7 +335,7 @@ export function createRunsStorage(drizzle: Drizzle): Storage['runs'] { const updates: Partial = { ...serialized, - output: data.output as SerializedContent, + output: toBuffer(data.output), }; // Only set startedAt the first time transitioning to 'running' @@ -360,7 +389,7 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] { eventId, correlationId: data.correlationId, eventType: data.eventType, - eventData: 'eventData' in data ? data.eventData : undefined, + eventData: 'eventData' in data ? toBuffer(data.eventData) : undefined, }) .returning({ createdAt: events.createdAt }); if (!value) { @@ -399,7 +428,8 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] { const resolveData = params?.resolveData ?? 'all'; return { data: values.map((v) => { - const parsed = EventSchema.parse(compact(v)); + const deserialized = deserializeEvent(compact(v)); + const parsed = EventSchema.parse(deserialized); return filterEventData(parsed, resolveData); }), cursor: values.at(-1)?.eventId ?? null, @@ -432,7 +462,8 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] { const resolveData = params?.resolveData ?? 'all'; return { data: values.map((v) => { - const parsed = EventSchema.parse(compact(v)); + const deserialized = deserializeEvent(compact(v)); + const parsed = EventSchema.parse(deserialized); return filterEventData(parsed, resolveData); }), cursor: values.at(-1)?.eventId ?? null, @@ -458,7 +489,8 @@ export function createHooksStorage(drizzle: Drizzle): Storage['hooks'] { .from(hooks) .where(eq(hooks.hookId, hookId)) .limit(1); - const parsed = HookSchema.parse(compact(value)); + const deserialized = deserializeHook(compact(value)); + const parsed = HookSchema.parse(deserialized); const resolveData = params?.resolveData ?? 'all'; return filterHookData(parsed, resolveData); }, @@ -469,6 +501,7 @@ export function createHooksStorage(drizzle: Drizzle): Storage['hooks'] { runId, hookId: data.hookId, token: data.token, + metadata: toBuffer(data.metadata), ownerId: '', // TODO: get from context projectId: '', // TODO: get from context environment: '', // TODO: get from context @@ -480,7 +513,8 @@ export function createHooksStorage(drizzle: Drizzle): Storage['hooks'] { status: 409, }); } - const parsed = HookSchema.parse(compact(value)); + const deserialized = deserializeHook(compact(value)); + const parsed = HookSchema.parse(deserialized); const resolveData = params?.resolveData ?? 'all'; return filterHookData(parsed, resolveData); }, @@ -491,7 +525,8 @@ export function createHooksStorage(drizzle: Drizzle): Storage['hooks'] { status: 404, }); } - const parsed = HookSchema.parse(compact(value)); + const deserialized = deserializeHook(compact(value)); + const parsed = HookSchema.parse(deserialized); const resolveData = params?.resolveData ?? 'all'; return filterHookData(parsed, resolveData); }, @@ -515,7 +550,8 @@ export function createHooksStorage(drizzle: Drizzle): Storage['hooks'] { const resolveData = params?.resolveData ?? 'all'; return { data: values.map((v) => { - const parsed = HookSchema.parse(compact(v)); + const deserialized = deserializeHook(compact(v)); + const parsed = HookSchema.parse(deserialized); return filterHookData(parsed, resolveData); }), cursor: values.at(-1)?.hookId ?? null, @@ -532,7 +568,8 @@ export function createHooksStorage(drizzle: Drizzle): Storage['hooks'] { status: 404, }); } - const parsed = HookSchema.parse(compact(value)); + const deserialized = deserializeHook(compact(value)); + const parsed = HookSchema.parse(deserialized); const resolveData = params?.resolveData ?? 'all'; return filterHookData(parsed, resolveData); }, @@ -544,15 +581,22 @@ export function createStepsStorage(drizzle: Drizzle): Storage['steps'] { return { async create(runId, data) { + const inputBuffer = toBuffer(data.input); + if (!inputBuffer) { + throw new WorkflowAPIError(`Invalid input data`, { + status: 400, + }); + } + const [value] = await drizzle .insert(steps) .values({ runId, stepId: data.stepId, stepName: data.stepName, - input: data.input as SerializedContent, + input: inputBuffer, status: 'pending', - attempt: 1, + attempt: 0, }) .onConflictDoNothing() .returning(); @@ -606,7 +650,7 @@ export function createStepsStorage(drizzle: Drizzle): Storage['steps'] { const updates: Partial = { ...serialized, - output: data.output as SerializedContent, + output: toBuffer(data.output), }; const now = new Date(); // Only set startedAt the first time the step transitions to 'running' @@ -690,6 +734,22 @@ function filterHookData(hook: Hook, resolveData: ResolveData): Hook { return hook; } +function deserializeEvent(event: any): Event { + const { eventData, ...rest } = event; + return { + ...rest, + eventData: eventData ? fromBuffer(eventData) : undefined, + } as Event; +} + +function deserializeHook(hook: any): Hook { + const { metadata, ...rest } = hook; + return { + ...rest, + metadata: metadata ? fromBuffer(metadata) : undefined, + } as Hook; +} + function filterEventData(event: Event, resolveData: ResolveData): Event { if (resolveData === 'none' && 'eventData' in event) { const { eventData: _, ...rest } = event; @@ -698,3 +758,24 @@ function filterEventData(event: Event, resolveData: ResolveData): Event { } return event; } + +/** + * Convert SerializedContent (array) to Buffer for BYTEA storage. + * PostgreSQL BYTEA natively supports null bytes, unlike JSONB. + */ +function toBuffer( + data: SerializedContent | unknown | undefined | null +): Buffer | undefined { + if (data === undefined || data === null) return undefined; + return Buffer.from(JSON.stringify(data), 'utf8'); +} + +/** + * Convert Buffer from BYTEA storage back to SerializedContent (array). + */ +function fromBuffer( + buffer: Buffer | undefined | null +): SerializedContent | undefined { + if (!buffer) return undefined; + return JSON.parse(buffer.toString('utf8')); +} diff --git a/packages/world/src/queue.ts b/packages/world/src/queue.ts index 0ad9edccd..383355bbc 100644 --- a/packages/world/src/queue.ts +++ b/packages/world/src/queue.ts @@ -43,6 +43,11 @@ export const QueuePayloadSchema = z.union([ ]); export type QueuePayload = z.infer; +export interface QueueOptions { + deploymentId?: string; + idempotencyKey?: string; +} + export interface Queue { getDeploymentId(): Promise; @@ -56,10 +61,7 @@ export interface Queue { queue( queueName: ValidQueueName, message: QueuePayload, - opts?: { - deploymentId?: string; - idempotencyKey?: string; - } + opts?: QueueOptions ): Promise<{ messageId: MessageId }>; /**