Skip to content

radioactive-labs/chrono_forge

Folders and files

NameName
Last commit message
Last commit date

Latest commit

Β 

History

36 Commits
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 

Repository files navigation

ChronoForge

Gem Version Ruby License: MIT

A robust framework for building durable, distributed workflows in Ruby on Rails applications

ChronoForge provides a powerful solution for handling long-running processes, managing state, and recovering from failures in your Rails applications. Built on top of ActiveJob, it ensures your critical business processes remain resilient and traceable.

🌟 Features

  • Durable Execution: Automatically tracks and recovers from failures during workflow execution
  • State Management: Built-in workflow state tracking with persistent context storage
  • Concurrency Control: Advanced locking mechanisms to prevent parallel execution of the same workflow
  • Error Handling: Comprehensive error tracking with configurable retry strategies
  • Execution Logging: Detailed logging of workflow steps and errors for visibility
  • Wait States: Support for time-based waits and condition-based waiting
  • Database-Backed: All workflow state is persisted to ensure durability
  • ActiveJob Integration: Compatible with all ActiveJob backends, though database-backed processors (like Solid Queue) provide the most reliable experience for long-running workflows

πŸ“¦ Installation

Add to your application's Gemfile:

gem 'chrono_forge'

Then execute:

$ bundle install

Or install directly:

$ gem install chrono_forge

After installation, run the generator to create the necessary database migrations:

$ rails generate chrono_forge:install
$ rails db:migrate

πŸ“‹ Usage

Creating and Executing Workflows

ChronoForge workflows are ActiveJob classes that prepend the ChronoForge::Executor module. Each workflow can only accept keyword arguments:

# Define your workflow class
class OrderProcessingWorkflow < ApplicationJob
  prepend ChronoForge::Executor
  
  def perform(order_id:, customer_id:)
    # Workflow steps...
  end
end

All workflows require a unique identifier when executed. This identifier is used to track and manage the workflow:

# Execute the workflow
OrderProcessingWorkflow.perform_later(
  "order-123",                 # Unique workflow key
  order_id: "order-134",       # Custom kwargs
  customer_id: "customer-456"  # More custom kwargs
)

Basic Workflow Example

Here's a complete example of a durable order processing workflow:

class OrderProcessingWorkflow < ApplicationJob
  prepend ChronoForge::Executor

  def perform(order_id:)
    @order_id = order_id

    # Context can be used to pass and store data between executions
    context.set_once "execution_id", SecureRandom.hex

    # Wait until payment is confirmed
    wait_until :payment_confirmed?

    # Wait for potential fraud check
    wait 1.minute, :fraud_check_delay

    # Durably execute order processing
    durably_execute :process_order

    # Final steps
    durably_execute :complete_order
  end

  private

  def payment_confirmed?
    PaymentService.confirmed?(@order_id, context["execution_id"])
  end

  def process_order
    OrderProcessor.process(@order_id, context["execution_id"])
    context["processed_at"] = Time.current.iso8601
  end

  def complete_order
    OrderCompletionService.complete(@order_id, context["execution_id"])
    context["completed_at"] = Time.current.iso8601
  end
end

Core Workflow Features

πŸš€ Executing Workflows

ChronoForge workflows are executed through ActiveJob's standard interface with a specific parameter structure:

# Perform the workflow immediately
OrderProcessingWorkflow.perform_now(
  "order-123",                     # Unique workflow key
  order_id: "O-123",               # Custom parameter
  customer_id: "C-456"             # Another custom parameter
)

# Or queue it for background processing
OrderProcessingWorkflow.perform_later(
  "order-123-async",               # Unique workflow key
  order_id: "O-124",
  customer_id: "C-457"
)

Important: Workflows must use keyword arguments only, not positional arguments.

⚑ Durable Execution

The durably_execute method ensures operations are executed exactly once with automatic retry logic and fault tolerance:

# Basic execution
durably_execute :send_welcome_email

# With custom retry attempts
durably_execute :critical_payment_processing, max_attempts: 5

# With custom name for tracking multiple calls to same method
durably_execute :upload_file, name: "profile_image_upload"

# Complex example with error-prone operation
class FileProcessingWorkflow < ApplicationJob
  prepend ChronoForge::Executor

  def perform(file_id:)
    @file_id = file_id
    
    # This might fail due to network issues, rate limits, etc.
    durably_execute :upload_to_s3, max_attempts: 5
    
    # Process file after successful upload
    durably_execute :generate_thumbnails, max_attempts: 3
  end

  private

  def upload_to_s3
    file = File.find(@file_id)
    S3Client.upload(file.path, bucket: 'my-bucket')
    Rails.logger.info "Successfully uploaded file #{@file_id} to S3"
  end

  def generate_thumbnails
    ThumbnailService.generate(@file_id)
  end
end

Key Features:

  • Idempotent: Same operation won't be executed twice during replays
  • Automatic Retries: Failed executions retry with exponential backoff (2^attempt seconds, capped at 32s)
  • Error Tracking: All failures are logged with detailed error information
  • Configurable: Customize retry attempts and step naming

⏱️ Wait States

ChronoForge supports three types of wait states, each optimized for different use cases:

1. Time-based Waits (wait)

For simple delays and scheduled pauses:

# Simple delays
wait 30.minutes, "cooling_period"
wait 1.day, "daily_batch_interval"

# Complex workflow with multiple waits
def user_onboarding_flow
  durably_execute :send_welcome_email
  wait 1.hour, "welcome_delay"
  
  durably_execute :send_tutorial_email
  wait 2.days, "tutorial_followup"
  
  durably_execute :send_feedback_request
end

2. Automated Condition Waits (wait_until)

For conditions that can be automatically polled at regular intervals:

# Wait for external API
wait_until :external_api_ready?, 
  timeout: 30.minutes, 
  check_interval: 1.minute

# Wait with retry on specific errors
wait_until :database_migration_complete?,
  timeout: 2.hours,
  check_interval: 30.seconds,
  retry_on: [ActiveRecord::ConnectionNotEstablished, Net::TimeoutError]

# Complex condition example
def third_party_service_ready?
  response = HTTParty.get("https://api.example.com/health")
  response.code == 200 && response.body.include?("healthy")
end

wait_until :third_party_service_ready?,
  timeout: 1.hour,
  check_interval: 2.minutes,
  retry_on: [Net::TimeoutError, Net::HTTPClientException]

3. Event-driven Waits (continue_if)

For conditions that depend on external events like webhooks, requiring manual workflow continuation:

# Basic usage - wait for webhook-driven state change
continue_if :payment_confirmed?

# With custom name for better tracking
continue_if :payment_confirmed?, name: "stripe_webhook"

# Wait for manual approval
continue_if :document_approved?

# Wait for external file processing
continue_if :processing_complete?

# Multiple waits with same condition but different contexts
continue_if :external_system_ready?, name: "payment_gateway"
# ... other steps ...
continue_if :external_system_ready?, name: "inventory_system"

# Complete workflow example
class PaymentWorkflow < ApplicationJob
  prepend ChronoForge::Executor

  def perform(order_id:)
    @order_id = order_id
    
    # Initialize payment
    durably_execute :create_payment_request
    
    # Wait for external payment confirmation (webhook-driven)
    continue_if :payment_confirmed?, name: "stripe_confirmation"
    
    # Complete order after payment
    durably_execute :fulfill_order
  end

  private

  def payment_confirmed?
    PaymentService.confirmed?(@order_id)
  end
end

# Later, when webhook arrives:
PaymentService.mark_confirmed(order_id)
PaymentWorkflow.perform_later("order-#{order_id}", order_id: order_id)

When to Use Each Wait Type:

Wait Type Use Case Polling Resource Usage Response Time
wait Fixed delays, rate limiting None Minimal Exact timing
wait_until API readiness, data processing Automatic Medium Check interval
continue_if Webhooks, user actions, file uploads Manual only Minimal Immediate

Key Differences:

  • wait: Time-based, no condition checking, resumes automatically
  • wait_until: Condition-based with automatic polling, resumes when condition becomes true or timeout
  • continue_if: Condition-based without polling, requires manual workflow retry when condition might have changed

πŸ”„ Periodic Tasks

The durably_repeat method enables robust periodic task execution within workflows. Tasks are scheduled at regular intervals until a specified condition is met, with automatic catch-up for missed executions and configurable error handling.

class NotificationWorkflow < ApplicationJob
  prepend ChronoForge::Executor

  def perform(user_id:)
    @user_id = user_id
    
    # Send reminders every 3 days until user completes onboarding
    durably_repeat :send_reminder_email, 
      every: 3.days, 
      till: :user_onboarded?
    
    # Critical payment processing every hour - fail workflow if it fails
    durably_repeat :process_pending_payments,
      every: 1.hour,
      till: :all_payments_processed?,
      on_error: :fail_workflow
  end

  private

  def send_reminder_email(scheduled_time = nil)
    # Optional parameter receives the scheduled execution time
    if scheduled_time
      lateness = Time.current - scheduled_time
      Rails.logger.info "Reminder scheduled for #{scheduled_time}, running #{lateness.to_i}s late"
    end
    
    UserMailer.onboarding_reminder(@user_id).deliver_now
  end

  def user_onboarded?
    User.find(@user_id).onboarded?
  end

  def process_pending_payments
    PaymentProcessor.process_pending_for_user(@user_id)
  end

  def all_payments_processed?
    Payment.where(user_id: @user_id, status: :pending).empty?
  end
end

Key Features:

  • Idempotent Execution: Each repetition gets a unique execution log, preventing duplicates during replays
  • Automatic Catch-up: Missed executions due to downtime are automatically skipped using timeout-based fast-forwarding
  • Flexible Timing: Support for custom start times and precise interval scheduling
  • Error Resilience: Individual execution failures don't break the periodic schedule
  • Configurable Error Handling: Choose between continuing despite failures or failing the entire workflow

Advanced Options:

durably_repeat :generate_daily_report,
  every: 1.day,                          # Execution interval
  till: :reports_complete?,              # Stop condition
  start_at: Date.tomorrow.beginning_of_day, # Custom start time (optional)
  max_attempts: 5,                       # Retries per execution (default: 3)
  timeout: 2.hours,                      # Catch-up timeout (default: 1.hour)
  on_error: :fail_workflow,              # Error handling (:continue or :fail_workflow)
  name: "daily_reports"                  # Custom task name (optional)

Method Parameters:

Your periodic methods can optionally receive the scheduled execution time as their first argument:

# Without scheduled time parameter
def cleanup_files
  FileCleanupService.perform
end

# With scheduled time parameter
def cleanup_files(scheduled_time)
  # Use scheduled time for business logic
  cleanup_date = scheduled_time.to_date
  FileCleanupService.perform(date: cleanup_date)
  
  # Log timing information
  delay = Time.current - scheduled_time
  Rails.logger.info "Cleanup was #{delay.to_i} seconds late"
end

πŸ”„ Workflow Context

ChronoForge provides a persistent context that survives job restarts. The context behaves like a Hash but with additional capabilities:

# Set context values
context[:user_name] = "John Doe"
context[:status] = "processing"

# Read context values
user_name = context[:user_name]

# Using the fetch method (returns default if key doesn't exist)
status = context.fetch(:status, "pending")

# Set a value with the set method (alias for []=)
context.set(:total_amount, 99.99)

# Set a value only if the key doesn't already exist
context.set_once(:created_at, Time.current.iso8601)

# Check if a key exists
if context.key?(:user_id)
  # Do something with the user ID
end

The context supports serializable Ruby objects (Hash, Array, String, Integer, Float, Boolean, and nil) and validates types automatically.

πŸ›‘οΈ Error Handling

ChronoForge automatically tracks errors and provides configurable retry capabilities:

class MyWorkflow < ApplicationJob
  prepend ChronoForge::Executor

  private

  def should_retry?(error, attempt_count)
    case error
    when NetworkError
      attempt_count < 5  # Retry network errors up to 5 times
    when ValidationError
      false  # Don't retry validation errors
    else
      attempt_count < 3  # Default retry policy
    end
  end
end

πŸ§ͺ Testing

ChronoForge is designed to be easily testable using ChaoticJob, a testing framework that makes it simple to test complex job workflows:

  1. Add ChaoticJob to your Gemfile's test group:
group :test do
  gem 'chaotic_job'
end
  1. Set up your test helper:
# test_helper.rb
require 'chrono_forge'
require 'minitest/autorun'
require 'chaotic_job'

Example test:

class WorkflowTest < ActiveJob::TestCase
  include ChaoticJob::Helpers

  def test_workflow_completion
    # Enqueue the job with a unique key and custom parameters
    OrderProcessingWorkflow.perform_later(
      "order-test-123",
      order_id: "O-123",
      customer_id: "C-456"
    )
    
    # Perform all enqueued jobs
    perform_all_jobs
    
    # Assert workflow completed successfully
    workflow = ChronoForge::Workflow.find_by(key: "order-test-123")
    assert workflow.completed?
    
    # Check workflow context
    assert workflow.context["processed_at"].present?
    assert workflow.context["completed_at"].present?
  end
end

πŸ—„οΈ Database Schema

ChronoForge creates three main tables:

  1. chrono_forge_workflows: Stores workflow state and context
  2. chrono_forge_execution_logs: Tracks individual execution steps
  3. chrono_forge_error_logs: Records detailed error information

πŸ” When to Use ChronoForge

ChronoForge is ideal for:

  • Long-running business processes - Order processing, account registration flows
  • Processes requiring durability - Financial transactions, data migrations
  • Multi-step workflows - Onboarding flows, approval processes, multi-stage jobs
  • State machines with time-based transitions - Document approval, subscription lifecycle

🧠 Advanced State Management

ChronoForge workflows follow a sophisticated state machine model to ensure durability and fault tolerance. Understanding these states and transitions is essential for troubleshooting and recovery.

Workflow State Diagram

stateDiagram-v2
    [*] --> created: Workflow Created
    created --> idle: Initial State
    idle --> running: Job Started
    running --> idle: Waiting
    running --> completed: All Steps Completed
    running --> failed: Max Retries Exhausted
    running --> stalled: Unrecoverable Error
    idle --> running: Resumed
    stalled --> [*]: Requires Manual Intervention
    failed --> [*]: Requires Manual Intervention
    completed --> [*]: Workflow Succeeded
Loading

State Descriptions

Created

  • Description: Initial state when a workflow record is first created
  • Behavior: Transitions immediately to idle state
  • Duration: Momentary

Idle

  • Description: The workflow is waiting to be processed or between processing steps
  • Behavior: Not locked, available to be picked up by job processor
  • Duration: Can be minutes to days, depending on wait conditions

Running

  • Description: The workflow is actively being processed
  • Identifiers: Has locked_at and locked_by values set
  • Behavior: Protected against concurrent execution
  • Duration: Should be brief unless performing long operations

Completed

  • Description: The workflow has successfully executed all steps
  • Identifiers: Has completed_at timestamp, state = "completed"
  • Behavior: Final state, no further processing
  • Typical Exit Points: All processing completed successfully

Failed

  • Description: The workflow has failed after exhausting retry attempts
  • Identifiers: Has failure-related data in error_logs, state = "failed"
  • Behavior: No automatic recovery, requires manual intervention
  • Typical Exit Points: Max retries exhausted, explicit failure, non-retryable error

Stalled

  • Description: The workflow encountered an unrecoverable error but wasn't explicitly failed
  • Identifiers: Not completed, not running, has errors in error_logs
  • Behavior: Requires manual investigation and intervention
  • Typical Exit Points: ExecutionFailedError, unexpected exceptions, system failures

Handling Different Workflow States

Recovering Stalled/Failed Workflows

workflow = ChronoForge::Workflow.find_by(key: "order-123")

if workflow.stalled? || workflow.failed?
  job_class = workflow.job_class.constantize
  
  # Retry immediately
  job_class.retry_now(workflow.key)
  
  # Or retry asynchronously
  job_class.retry_later(workflow.key)
end

Monitoring Running Workflows

Long-running workflows might indicate issues:

# Find workflows running for too long
long_running = ChronoForge::Workflow.where(state: :running)
                                   .where('locked_at < ?', 30.minutes.ago)

long_running.each do |workflow|
  # Log potential issues for investigation
  Rails.logger.warn "Workflow #{workflow.key} has been running for >30 minutes"
  
  # Optionally force unlock if you suspect deadlock
  # CAUTION: Only do this if you're certain the job is stuck
  # workflow.update!(locked_at: nil, locked_by: nil, state: :idle)
end

πŸš€ Development

After checking out the repo, run:

$ bin/setup                 # Install dependencies
$ bundle exec rake test     # Run the tests
$ bin/appraise              # Run the full suite of appraisals
$ bin/console               # Start an interactive console

The test suite uses SQLite by default and includes:

  • Unit tests for core functionality
  • Integration tests with ActiveJob
  • Example workflow implementations

πŸ‘₯ Contributing

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/my-new-feature)
  3. Commit your changes (git commit -am 'Add some feature')
  4. Push to the branch (git push origin feature/my-new-feature)
  5. Create a new Pull Request

Please include tests for any new features or bug fixes.

πŸ“œ License

This gem is available as open source under the terms of the MIT License.

πŸ“š API Reference

Core Workflow Methods

Method Purpose Key Parameters
durably_execute Execute method with retry logic method, max_attempts: 3, name: nil
wait Time-based pause duration, name
wait_until Condition-based waiting condition, timeout: 1.hour, check_interval: 15.minutes, retry_on: []
continue_if Manual continuation wait condition, name: nil
durably_repeat Periodic task execution method, every:, till:, start_at: nil, max_attempts: 3, timeout: 1.hour, on_error: :continue

Context Methods

Method Purpose Example
context[:key] = value Set context value context[:user_id] = 123
context[:key] Get context value user_id = context[:user_id]
context.set(key, value) Set context value (alias) context.set(:status, "active")
context.set_once(key, value) Set only if key doesn't exist context.set_once(:created_at, Time.current)
context.fetch(key, default) Get with default value context.fetch(:count, 0)
context.key?(key) Check if key exists context.key?(:user_id)

About

A robust framework for building durable, distributed workflows in Ruby on Rails applications

Topics

Resources

License

Stars

Watchers

Forks

Packages

No packages published