From e88dfdec3aa7ebfb311a4a8b9266bb4a4a9f8415 Mon Sep 17 00:00:00 2001 From: Alexander Belanger Date: Fri, 15 Aug 2025 15:14:47 -0400 Subject: [PATCH 1/3] fix: go and python benchmarks for hatchet --- competitors/hatchet/README.md | 43 ++++--- .../go/hatchet-go-quickstart/cmd/run/main.go | 45 ++----- .../hatchet-go-quickstart/cmd/worker/main.go | 8 +- .../go/hatchet-go-quickstart/run-workers.sh | 44 +++++++ .../workflows/fibo_bulk_workflow.go | 40 ------- .../workflows/fibo_workflow.go | 112 +++++++++++------- competitors/hatchet/flows/python/worker.py | 60 +++++++--- 7 files changed, 194 insertions(+), 158 deletions(-) create mode 100644 competitors/hatchet/flows/go/hatchet-go-quickstart/run-workers.sh delete mode 100644 competitors/hatchet/flows/go/hatchet-go-quickstart/workflows/fibo_bulk_workflow.go diff --git a/competitors/hatchet/README.md b/competitors/hatchet/README.md index 26eadb1..70aacb3 100644 --- a/competitors/hatchet/README.md +++ b/competitors/hatchet/README.md @@ -1,15 +1,14 @@ -Hatchet -======= +# Hatchet ## Installation -- Follow the guide on (Hatchet's official documentation)[https://docs.hatchet.run/self-hosting/docker-compose] +- Follow the guide on [Hatchet's official documentation](https://docs.hatchet.run/self-hosting/docker-compose) - `docker compose up -d` - create an API token on the UI (localhost:8080) and export it in your terminal along with - ``` - export HATCHET_CLIENT_TOKEN="..." - HATCHET_CLIENT_TLS_STRATEGY=none - ``` + ``` + export HATCHET_CLIENT_TOKEN="..." 
+ export HATCHET_CLIENT_TLS_STRATEGY=none + ``` ## Running flows @@ -17,15 +16,15 @@ Hatchet - Run the workers: e.g - ``` - ITERATIONS=10 python3 worker.py - ``` + ``` + ITERATIONS=10 python3 worker.py + ``` - Trigger the workflow - ``` - python3 trigger.py --n 33 - ``` + ``` + python3 trigger.py --n 33 + ``` ### Go @@ -33,18 +32,16 @@ We followed the sample in https://github.com/hatchet-dev/hatchet-go-quickstart - Run the workers: e.g - ``` - # Run workers (e.g 10) - # ITERATIONS / PARALLEL can be passed depending on what analysis you're trying to do - for i in {1..10}; do go run ITERATIONS=400 cmd/worker/main.go > /dev/null & done - ``` + ``` + # Run workers (e.g 10) + bash run-workers.sh 10 + ``` - Trigger the workflow - ``` - # or use --bulk --iterations=xxx to trigger bulk run mode - go run cmd/run/main.go --n=38 - ``` + ``` + go run cmd/run/main.go --n=38 + ``` ## Timing analysis @@ -58,4 +55,4 @@ python3 benchmark_analyzer.py d264c5cd-c769-4cef-ba46-664c06e265cd 10 # create file e.g run_100.txt with the 100 different workflow ids you get from the cmd/run/main.go command # e.g 10 workers, run_100.txt with the ids python3 benchmark_analyzer.py 10 --bulk run_100.txt -``` \ No newline at end of file +``` diff --git a/competitors/hatchet/flows/go/hatchet-go-quickstart/cmd/run/main.go b/competitors/hatchet/flows/go/hatchet-go-quickstart/cmd/run/main.go index 22f68e5..9d85974 100644 --- a/competitors/hatchet/flows/go/hatchet-go-quickstart/cmd/run/main.go +++ b/competitors/hatchet/flows/go/hatchet-go-quickstart/cmd/run/main.go @@ -5,6 +5,7 @@ import ( "flag" "fmt" "log" + "time" hatchet_client "hatchet-go-quickstart/hatchet_client" workflows "hatchet-go-quickstart/workflows" @@ -12,8 +13,6 @@ import ( func main() { n := flag.Int("n", 10, "Fibonacci number to compute") - iterations := flag.Int("iterations", 1, "Number of bulk iterations") - bulk := flag.Bool("bulk", false, "Run multiple independent Fibonacci workflows") flag.Parse() hatchet, err := 
hatchet_client.HatchetClient() @@ -21,39 +20,19 @@ func main() { log.Fatalf("failed to initialize hatchet client: %v", err) } - if *bulk { - fmt.Printf("Running %d independent Fibonacci workflows with n=%d...\n", *iterations, *n) + fmt.Printf("Running single Fibonaccii workflow for n=%d...\n", *n) - wf := workflows.BulkFibonacciWorkflow(hatchet) + _, parent := workflows.FibonacciWorkflow(hatchet) - var inputs []workflows.FiboInput - for i := 0; i < *iterations; i++ { - inputs = append(inputs, workflows.FiboInput{ - N: *n, - }) - } + start := time.Now() - runIDs, err := wf.RunBulkNoWait(context.Background(), inputs) - if err != nil { - log.Fatalf("RunBulkNoWait failed: %v", err) - } + _, err = parent.Run(context.Background(), workflows.ParentInput{ + N: *n, + }) - for _, id := range runIDs { - fmt.Println("Started workflow run:", id) - } - } else { - fmt.Printf("Running single Fibonaccii workflow for n=%d...\n", *n) - - wf := workflows.FibonacciWorkflow(hatchet) - - result, err := wf.Run(context.Background(), workflows.FibonacciInput{ - N: *n, - }) - - if err != nil { - log.Fatalf("workflow run failed: %v", err) - } - - fmt.Printf("fibo(%d) = %d\n", *n, result.Fibo.Result) + if err != nil { + log.Fatalf("workflow run failed: %v", err) } -} \ No newline at end of file + + fmt.Printf("fibo(%d) workflow completed successfully in %v\n", *n, time.Since(start)) +} diff --git a/competitors/hatchet/flows/go/hatchet-go-quickstart/cmd/worker/main.go b/competitors/hatchet/flows/go/hatchet-go-quickstart/cmd/worker/main.go index e08149e..c919ba8 100644 --- a/competitors/hatchet/flows/go/hatchet-go-quickstart/cmd/worker/main.go +++ b/competitors/hatchet/flows/go/hatchet-go-quickstart/cmd/worker/main.go @@ -10,21 +10,21 @@ import ( ) func main() { - hatchet, err := hatchet_client.HatchetClient() if err != nil { panic(err) } + child, parent := workflows.FibonacciWorkflow(hatchet) + worker, err := hatchet.Worker( worker.WorkerOpts{ Name: "fibo-workflow-worker", Workflows: 
[]workflow.WorkflowBase{ - workflows.FibonacciWorkflow(hatchet), - workflows.BulkFibonacciWorkflow(hatchet), + child, parent, }, - Slots: 1, + Slots: 100, }, ) diff --git a/competitors/hatchet/flows/go/hatchet-go-quickstart/run-workers.sh b/competitors/hatchet/flows/go/hatchet-go-quickstart/run-workers.sh new file mode 100644 index 0000000..7b6d0bf --- /dev/null +++ b/competitors/hatchet/flows/go/hatchet-go-quickstart/run-workers.sh @@ -0,0 +1,44 @@ +#!/bin/bash + +# Check if number of workers argument is provided +if [ $# -eq 0 ]; then + echo "Usage: $0 " + echo "Example: $0 5" + exit 1 +fi + +# Get number of workers from first argument +num_workers=$1 + +# Validate that the argument is a positive integer +if ! [[ "$num_workers" =~ ^[1-9][0-9]*$ ]]; then + echo "Error: Number of workers must be a positive integer" + exit 1 +fi + +# Array to store process IDs +pids=() + +# Function to clean up background processes +cleanup() { + echo "Terminating all worker processes..." + for pid in "${pids[@]}"; do + kill "$pid" 2>/dev/null + done + exit 0 +} + +# Set up trap to catch SIGINT (Ctrl+C) +trap cleanup SIGINT + +# Start the specified number of worker processes +for ((i=1; i<=num_workers; i++)); do + go run cmd/worker/main.go > /dev/null & + pids+=($!) # Store the process ID +done + +echo "Started $num_workers worker processes. Press Ctrl+C to terminate all." 
+ +# Wait for all background processes to complete +# or for user to press Ctrl+C +wait \ No newline at end of file diff --git a/competitors/hatchet/flows/go/hatchet-go-quickstart/workflows/fibo_bulk_workflow.go b/competitors/hatchet/flows/go/hatchet-go-quickstart/workflows/fibo_bulk_workflow.go deleted file mode 100644 index aba26c9..0000000 --- a/competitors/hatchet/flows/go/hatchet-go-quickstart/workflows/fibo_bulk_workflow.go +++ /dev/null @@ -1,40 +0,0 @@ -package workflows - -import ( - "fmt" - - "github.com/hatchet-dev/hatchet/pkg/client/create" - v1 "github.com/hatchet-dev/hatchet/pkg/v1" - "github.com/hatchet-dev/hatchet/pkg/v1/factory" - "github.com/hatchet-dev/hatchet/pkg/v1/workflow" - "github.com/hatchet-dev/hatchet/pkg/worker" -) - -type FiboInput struct { - N int `json:"n"` -} - -type FiboTaskOutput struct { - Result int `json:"result"` -} - -func BulkFibonacciWorkflow(hatchet v1.HatchetClient) workflow.WorkflowDeclaration[FiboInput, FiboTaskOutput] { - wf := factory.NewWorkflow[FiboInput, FiboTaskOutput]( - create.WorkflowCreateOpts[FiboInput]{ - Name: "bulk-fibo-workflow", - }, - hatchet, - ) - - wf.Task( - create.WorkflowTask[FiboInput, FiboTaskOutput]{ - Name: "compute_fibo", - }, - func(ctx worker.HatchetContext, input FiboInput) (any, error) { - fmt.Printf("Running fibo(%d)\n", input.N) - return &FiboTaskOutput{Result: fibo(input.N)}, nil - }, - ) - - return wf -} \ No newline at end of file diff --git a/competitors/hatchet/flows/go/hatchet-go-quickstart/workflows/fibo_workflow.go b/competitors/hatchet/flows/go/hatchet-go-quickstart/workflows/fibo_workflow.go index d921c0f..9a1bfb0 100644 --- a/competitors/hatchet/flows/go/hatchet-go-quickstart/workflows/fibo_workflow.go +++ b/competitors/hatchet/flows/go/hatchet-go-quickstart/workflows/fibo_workflow.go @@ -4,11 +4,11 @@ import ( "fmt" "os" "strconv" + "sync" "github.com/hatchet-dev/hatchet/pkg/client/create" v1 "github.com/hatchet-dev/hatchet/pkg/v1" 
"github.com/hatchet-dev/hatchet/pkg/v1/factory" - "github.com/hatchet-dev/hatchet/pkg/v1/task" "github.com/hatchet-dev/hatchet/pkg/v1/workflow" "github.com/hatchet-dev/hatchet/pkg/worker" ) @@ -21,54 +21,84 @@ type FibonacciOutput struct { Result int } -type FibonacciResult struct { - Fibo FibonacciOutput +type ParentInput struct { + N int } -func FibonacciWorkflow(hatchet v1.HatchetClient) workflow.WorkflowDeclaration[FibonacciInput, FibonacciResult] { - simple := factory.NewWorkflow[FibonacciInput, FibonacciResult]( - create.WorkflowCreateOpts[FibonacciInput]{ - Name: "fibo-workflow", +type ParentOutput struct{} + +func FibonacciWorkflow(hatchet v1.HatchetClient) (workflow.WorkflowDeclaration[FibonacciInput, FibonacciOutput], workflow.WorkflowDeclaration[ParentInput, ParentOutput]) { + fibo := factory.NewTask[FibonacciInput, FibonacciOutput]( + create.StandaloneTask{ + Name: "fibo-task-2", + }, + func(ctx worker.HatchetContext, input FibonacciInput) (*FibonacciOutput, error) { + fmt.Println("Fibonacci task called") + return &FibonacciOutput{ + Result: fibo(input.N), + }, nil }, hatchet, ) - var previous *task.TaskDeclaration[FibonacciInput] - parallel := os.Getenv("PARALLEL") != "" + parent := factory.NewTask( + create.StandaloneTask{ + Name: "fibo-parent-2", + }, + func(ctx worker.HatchetContext, input ParentInput) (*ParentOutput, error) { + iterations := 100 + if iterStr := os.Getenv("ITERATIONS"); iterStr != "" { + if iter, err := strconv.Atoi(iterStr); err == nil && iter > 0 { + iterations = iter + } + } - iterations := 100 - if iterStr := os.Getenv("ITERATIONS"); iterStr != "" { - if iter, err := strconv.Atoi(iterStr); err == nil && iter > 0 { - iterations = iter - } - } + parallel := os.Getenv("PARALLEL") != "" - for i := 0; i < iterations; i++ { - name := fmt.Sprintf("fibo_%d", i) - - opts := create.WorkflowTask[FibonacciInput, FibonacciResult]{ - Name: name, - } - - // Only set parents if not running in parallel mode - if !parallel && previous != nil { - 
opts.Parents = []create.NamedTask{previous} - } - - current := simple.Task( - opts, - func(ctx worker.HatchetContext, input FibonacciInput) (any, error) { - fmt.Println("Fibonacci task called") - return &FibonacciOutput{ - Result: fibo(input.N), - }, nil - }, - ) - - previous = current - } + if parallel { + wg := sync.WaitGroup{} + wg.Add(iterations) + + for i := 0; i < iterations; i++ { + go func() { + defer wg.Done() + n := input.N + fmt.Printf("Fibonacci task %d called\n", n) + result, err := fibo.Run(ctx, FibonacciInput{N: n}) + + if err != nil { + fmt.Printf("Error in Fibonacci task %d: %v\n", n, err) + return + } + + fmt.Printf("Fibonacci result for %d: %d\n", n, result.Result) + }() + } - return simple + wg.Wait() + } else { + fmt.Println("Running in sequential mode") + + for i := 0; i < iterations; i++ { + fmt.Printf("Fibonacci task %d called\n", i) + n := input.N + result, err := fibo.Run(ctx, FibonacciInput{N: n}) + + if err != nil { + fmt.Printf("Error in Fibonacci task %d: %v\n", n, err) + return nil, err + } + + fmt.Printf("Fibonacci result for %d: %d\n", n, result.Result) + } + } + + return nil, nil + }, + hatchet, + ) + + return fibo, parent } func fibo(n int) int { @@ -76,4 +106,4 @@ func fibo(n int) int { return n } return fibo(n-1) + fibo(n-2) -} \ No newline at end of file +} diff --git a/competitors/hatchet/flows/python/worker.py b/competitors/hatchet/flows/python/worker.py index c53967a..d7cae1d 100644 --- a/competitors/hatchet/flows/python/worker.py +++ b/competitors/hatchet/flows/python/worker.py @@ -1,33 +1,59 @@ -import os -from hatchet_sdk import Context, Hatchet +from functools import cache from pydantic import BaseModel +from hatchet_sdk import Context, Hatchet hatchet = Hatchet(debug=True) -class WorkflowInput(BaseModel): - n: int = 10 +class FibonacciInput(BaseModel): + n: int + +class FibonacciTriggerInput(BaseModel): + n: int + iterations: int + parallel: bool + +class FibonacciOutput(BaseModel): + result: int -fibo_wf = 
hatchet.workflow(name="StaticSequentialFibo", input_validator=WorkflowInput) +class FibonacciTriggerOutput(BaseModel): + results: list[FibonacciOutput] +@cache def fibo(n: int) -> int: if n <= 1: return n - return fibo(n - 1) + fibo(n - 2) - -ITERATIONS = int(os.environ.get("ITERATIONS", 5)) -task_refs = [] -for i in range(ITERATIONS): - @fibo_wf.task(name=f"fibo_task_{i}", parents=[task_refs[i-1]] if i > 0 else []) - def fibo_task(input: WorkflowInput, ctx: Context, idx=i) -> dict: - print(f"Task {idx}: Computing fibo({input.n})") - result = fibo(input.n) - return {"index": idx, "fibo": result} + return fibo(n - 1) + fibo(n - 2) - task_refs.append(fibo_task) +@hatchet.task(input_validator=FibonacciInput) +def compute_fibonacci(input: FibonacciInput, _: Context) -> FibonacciOutput: + return FibonacciOutput(result=fibo(input.n)) + +@hatchet.task(input_validator=FibonacciTriggerInput) +async def fibonacci_parent( + input: FibonacciTriggerInput, _: Context +) -> FibonacciTriggerOutput: + if input.parallel: + return FibonacciTriggerOutput( + results=await compute_fibonacci.aio_run_many( + [ + compute_fibonacci.create_bulk_run_item(FibonacciInput(n=input.n)) + for _ in range(input.iterations) + ] + ) + ) + + return FibonacciTriggerOutput( + results=[ + await compute_fibonacci.aio_run(FibonacciInput(n=input.n)) + for _ in range(input.iterations) + ] + ) def main() -> None: - worker = hatchet.worker(slots=1, name="fibo-worker", workflows=[fibo_wf]) + worker = hatchet.worker( + slots=100, name="fibo-worker", workflows=[fibonacci_parent, compute_fibonacci] + ) worker.start() if __name__ == "__main__": From 8c8706ade77e60836aa6b41c4d92492a71f32af3 Mon Sep 17 00:00:00 2001 From: Alexander Belanger Date: Fri, 15 Aug 2025 16:25:59 -0400 Subject: [PATCH 2/3] set slots to 1, use child spawning --- .../flows/go/hatchet-go-quickstart/cmd/worker/main.go | 3 ++- .../hatchet-go-quickstart/workflows/fibo_workflow.go | 11 +++++++---- competitors/hatchet/flows/python/trigger.py | 4 ++-- 
competitors/hatchet/flows/python/worker.py | 6 +++--- 4 files changed, 14 insertions(+), 10 deletions(-) diff --git a/competitors/hatchet/flows/go/hatchet-go-quickstart/cmd/worker/main.go b/competitors/hatchet/flows/go/hatchet-go-quickstart/cmd/worker/main.go index c919ba8..1d67e58 100644 --- a/competitors/hatchet/flows/go/hatchet-go-quickstart/cmd/worker/main.go +++ b/competitors/hatchet/flows/go/hatchet-go-quickstart/cmd/worker/main.go @@ -24,7 +24,8 @@ func main() { Workflows: []workflow.WorkflowBase{ child, parent, }, - Slots: 100, + Slots: 1, + DurableSlots: 1, }, ) diff --git a/competitors/hatchet/flows/go/hatchet-go-quickstart/workflows/fibo_workflow.go b/competitors/hatchet/flows/go/hatchet-go-quickstart/workflows/fibo_workflow.go index 9a1bfb0..62071ba 100644 --- a/competitors/hatchet/flows/go/hatchet-go-quickstart/workflows/fibo_workflow.go +++ b/competitors/hatchet/flows/go/hatchet-go-quickstart/workflows/fibo_workflow.go @@ -41,11 +41,11 @@ func FibonacciWorkflow(hatchet v1.HatchetClient) (workflow.WorkflowDeclaration[F hatchet, ) - parent := factory.NewTask( + parent := factory.NewDurableTask( create.StandaloneTask{ Name: "fibo-parent-2", }, - func(ctx worker.HatchetContext, input ParentInput) (*ParentOutput, error) { + func(ctx worker.DurableHatchetContext, input ParentInput) (*ParentOutput, error) { iterations := 100 if iterStr := os.Getenv("ITERATIONS"); iterStr != "" { if iter, err := strconv.Atoi(iterStr); err == nil && iter > 0 { @@ -63,8 +63,11 @@ func FibonacciWorkflow(hatchet v1.HatchetClient) (workflow.WorkflowDeclaration[F go func() { defer wg.Done() n := input.N + key := fmt.Sprintf("fibo-task-%d", i) fmt.Printf("Fibonacci task %d called\n", n) - result, err := fibo.Run(ctx, FibonacciInput{N: n}) + result, err := fibo.RunAsChild(ctx, FibonacciInput{N: n}, workflow.RunAsChildOpts{ + Key: &key, + }) if err != nil { fmt.Printf("Error in Fibonacci task %d: %v\n", n, err) @@ -82,7 +85,7 @@ func FibonacciWorkflow(hatchet v1.HatchetClient) 
(workflow.WorkflowDeclaration[F for i := 0; i < iterations; i++ { fmt.Printf("Fibonacci task %d called\n", i) n := input.N - result, err := fibo.Run(ctx, FibonacciInput{N: n}) + result, err := fibo.RunAsChild(ctx, FibonacciInput{N: n}, workflow.RunAsChildOpts{}) if err != nil { fmt.Printf("Error in Fibonacci task %d: %v\n", n, err) diff --git a/competitors/hatchet/flows/python/trigger.py b/competitors/hatchet/flows/python/trigger.py index 85f7a2b..ff81a68 100644 --- a/competitors/hatchet/flows/python/trigger.py +++ b/competitors/hatchet/flows/python/trigger.py @@ -1,12 +1,12 @@ import argparse -from worker import fibo_wf, WorkflowInput +from worker import fibonacci_parent, FibonacciTriggerInput def main(): parser = argparse.ArgumentParser() parser.add_argument('--n', type=int, default=10) args = parser.parse_args() - result = fibo_wf.run(WorkflowInput(n=args.n)) + result = fibonacci_parent.run(FibonacciTriggerInput(n=args.n)) print(result) if __name__ == "__main__": diff --git a/competitors/hatchet/flows/python/worker.py b/competitors/hatchet/flows/python/worker.py index d7cae1d..ebf7921 100644 --- a/competitors/hatchet/flows/python/worker.py +++ b/competitors/hatchet/flows/python/worker.py @@ -1,6 +1,6 @@ from functools import cache -from pydantic import BaseModel from hatchet_sdk import Context, Hatchet +from pydantic import BaseModel hatchet = Hatchet(debug=True) @@ -29,7 +29,7 @@ def fibo(n: int) -> int: def compute_fibonacci(input: FibonacciInput, _: Context) -> FibonacciOutput: return FibonacciOutput(result=fibo(input.n)) -@hatchet.task(input_validator=FibonacciTriggerInput) +@hatchet.durable_task(input_validator=FibonacciTriggerInput) async def fibonacci_parent( input: FibonacciTriggerInput, _: Context ) -> FibonacciTriggerOutput: @@ -52,7 +52,7 @@ async def fibonacci_parent( def main() -> None: worker = hatchet.worker( - slots=100, name="fibo-worker", workflows=[fibonacci_parent, compute_fibonacci] + slots=1, durable_slots=1, name="fibo-worker", 
workflows=[fibonacci_parent, compute_fibonacci] ) worker.start() From abdd524515f8ddf892595571b6c671114c4ad880 Mon Sep 17 00:00:00 2001 From: Alexander Belanger Date: Fri, 15 Aug 2025 16:34:36 -0400 Subject: [PATCH 3/3] remove @cache, import os and update config loader --- competitors/hatchet/flows/python/worker.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/competitors/hatchet/flows/python/worker.py b/competitors/hatchet/flows/python/worker.py index ebf7921..a25ff50 100644 --- a/competitors/hatchet/flows/python/worker.py +++ b/competitors/hatchet/flows/python/worker.py @@ -1,4 +1,4 @@ -from functools import cache +import os from hatchet_sdk import Context, Hatchet from pydantic import BaseModel @@ -9,8 +9,8 @@ class FibonacciInput(BaseModel): class FibonacciTriggerInput(BaseModel): n: int - iterations: int - parallel: bool + iterations: int = int(os.getenv("ITERATIONS", "10")) + parallel: bool = bool(os.getenv("PARALLEL", "True") == "True") class FibonacciOutput(BaseModel): result: int @@ -18,7 +18,6 @@ class FibonacciOutput(BaseModel): class FibonacciTriggerOutput(BaseModel): results: list[FibonacciOutput] -@cache def fibo(n: int) -> int: if n <= 1: return n