oagudo/outbox

Lightweight library for the transactional outbox pattern in Go, not tied to any specific relational database or broker.

Key Features

  • Minimal External Dependencies: Imposes no dependencies on users of this library other than google/uuid (no specific Kafka client, MySQL driver, etc.).
  • Database Agnostic: Designed to work with PostgreSQL, MySQL, Oracle and other relational databases.
  • Message Broker Agnostic: Integrates seamlessly with popular brokers like Kafka, NATS, RabbitMQ, and others.
  • Optimistic Publishing: Optional immediate async message publishing after transaction commit for reduced latency, with guaranteed delivery fallback.
  • Configurable Retry & Backoff Policies: Fixed or exponential back-off with adjustable initial and maximum delay.
  • Max Attempts Safeguard: Automatically discards messages that exceed a configurable maxAttempts threshold, enabling dead-letter routing or alerting on poison messages.
  • Observability: Exposes channels for processing errors and discarded messages, making it easy to integrate with your metrics and alerting systems.
  • Simplicity: Minimal, easy-to-understand codebase focused on core outbox pattern concepts.
  • Extensible: Designed for easy customization and integration into your own projects.

Usage

The library consists of two main components:

  1. Writer: Stores your entity and corresponding message atomically within a transaction
  2. Reader: Publishes stored messages to your message broker in the background

The Writer

The Writer ensures your entity and outbox message are stored together atomically:

// Setup database connection
db, _ := sql.Open("pgx", "postgres://user:password@localhost:5432/outbox?sslmode=disable")

// Create a DBContext and Writer instance
dbCtx := outbox.NewDBContext(outbox.NewDB(db), outbox.SQLDialectPostgres)
writer := outbox.NewWriter(dbCtx)

// In your business logic:
//
// Create your entity and outbox message
entity := Entity{
    ID:        uuid.New(),
    CreatedAt: time.Now().UTC(),
}

payload, _ := json.Marshal(entity)
metadata := json.RawMessage(`{"trace_id":"abc123","correlation_id":"xyz789"}`)
msg := outbox.NewMessage(payload,
    outbox.WithCreatedAt(entity.CreatedAt),
    outbox.WithMetadata(metadata))

// Write message and entity in a single transaction
err := writer.Write(ctx, msg,
    // This user-defined callback executes queries within the
    // same transaction that stores the outbox message
    func(ctx context.Context, txQueryer outbox.TxQueryer) error {
        _, err := txQueryer.ExecContext(ctx,
            "INSERT INTO entity (id, created_at) VALUES ($1, $2)",
            entity.ID, entity.CreatedAt,
        )
        return err
    })

🚀 Optimistic Publishing (Optional)

Optimistic publishing attempts to publish messages immediately after transaction commit, reducing latency while maintaining guaranteed delivery through the background reader as fallback.

How It Works

  1. Transaction commits (entity + outbox message stored)
  2. Immediate publish attempt to broker (asynchronously, will not block the incoming request)
  3. On success: message is removed from outbox
  4. On failure: background reader handles delivery later

Configuration

// Create publisher (see Reader section below)
publisher := &messagePublisher{}

// Enable optimistic publishing in writer
writer := outbox.NewWriter(dbCtx, outbox.WithOptimisticPublisher(publisher))

Important considerations:

  • Publishing happens asynchronously after transaction commit
  • Message consumers must be idempotent, because a message can be published twice: once by the optimistic publisher and once by the reader. (Consumer idempotency is good practice regardless of optimistic publishing, though some brokers also provide deduplication features.)
  • Publishing failures don't affect your transactions - they don't cause Write() to fail

The Reader

The Reader periodically checks for unsent messages and publishes them to your message broker:

// Create a message publisher implementation
type messagePublisher struct {
    // Your message broker client (e.g., Kafka, RabbitMQ)
}
func (p *messagePublisher) Publish(ctx context.Context, msg *outbox.Message) error {
    // Publish the message to your broker. See examples below for specific implementations
    return nil
}

// Create and start the reader
reader := outbox.NewReader(
    dbCtx,                                  // Database context
    &messagePublisher{},                    // Publisher implementation
    outbox.WithInterval(5*time.Second),     // Polling interval (default: 10s)
    outbox.WithReadBatchSize(200),          // Read batch size (default: 100)
    outbox.WithDeleteBatchSize(50),         // Delete batch size (default: 20)
    outbox.WithMaxAttempts(300),            // Discard after 300 attempts (default: MaxInt32)
    outbox.WithDelayStrategy(
        outbox.DelayStrategyExponential),   // Retry/backoff strategy (default: Exponential; can also use Fixed)
    outbox.WithDelay(500*time.Millisecond), // Initial delay between attempts (default: 200ms)
    outbox.WithMaxDelay(30*time.Minute),    // Maximum delay for exponential backoff (default: 1h)
)
reader.Start()
defer reader.Stop(context.Background()) // Stop during application shutdown


// Monitor standard processing errors (read / publish / delete / update).
go func() {
    for err := range reader.Errors() {
        log.Printf("outbox %s error: %v", err.Op, err.Err)
    }
}()

// Monitor discarded messages (hit the max-attempts threshold).
go func() {
    for msg := range reader.DiscardedMessages() {
        log.Printf("outbox message %s discarded after %d attempts",
            msg.ID, msg.TimesAttempted)
        // Example next steps:
        //   • forward to a dead-letter topic
        //   • raise an alert / metric
        //   • persist for manual inspection
    }
}()

Database Setup

1. Choose Your Database Dialect

The library supports multiple relational databases. Configure the appropriate SQLDialect when creating the DBContext. Supported dialects are PostgreSQL, MySQL, MariaDB, SQLite, Oracle and SQL Server.

// Example creating a DBContext with MySQL dialect
dbCtx := outbox.NewDBContext(outbox.NewDB(db), outbox.SQLDialectMySQL)

2. Create the Outbox Table

The outbox table stores messages that need to be published to your message broker. Choose your database below:

🐘 PostgreSQL
CREATE TABLE IF NOT EXISTS outbox (
    id UUID PRIMARY KEY,
    created_at TIMESTAMP WITH TIME ZONE NOT NULL,
    scheduled_at TIMESTAMP WITH TIME ZONE NOT NULL,
    metadata BYTEA,
    payload BYTEA NOT NULL,
    times_attempted INTEGER NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_outbox_created_at ON outbox (created_at);
CREATE INDEX IF NOT EXISTS idx_outbox_scheduled_at ON outbox (scheduled_at);

📊 MySQL
CREATE TABLE IF NOT EXISTS outbox (
    id BINARY(16) PRIMARY KEY,
    created_at TIMESTAMP(3) NOT NULL,
    scheduled_at TIMESTAMP(3) NOT NULL,
    metadata BLOB,
    payload BLOB NOT NULL,
    times_attempted INT NOT NULL
);

CREATE INDEX idx_outbox_created_at ON outbox (created_at);
CREATE INDEX idx_outbox_scheduled_at ON outbox (scheduled_at);
🐬 MariaDB
CREATE TABLE IF NOT EXISTS outbox (
    id UUID PRIMARY KEY,
    created_at TIMESTAMP(3) NOT NULL,
    scheduled_at TIMESTAMP(3) NOT NULL,
    metadata BLOB,
    payload BLOB NOT NULL,
    times_attempted INT NOT NULL
);

CREATE INDEX idx_outbox_created_at ON outbox (created_at);
CREATE INDEX idx_outbox_scheduled_at ON outbox (scheduled_at);
πŸ—ƒοΈ SQLite
CREATE TABLE IF NOT EXISTS outbox (
    id TEXT PRIMARY KEY,
    created_at DATETIME NOT NULL,
    scheduled_at DATETIME NOT NULL,
    metadata BLOB,
    payload BLOB NOT NULL,
    times_attempted INTEGER NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_outbox_created_at ON outbox (created_at);
CREATE INDEX IF NOT EXISTS idx_outbox_scheduled_at ON outbox (scheduled_at);
πŸ›οΈ Oracle
CREATE TABLE outbox (
    id RAW(16) PRIMARY KEY,
    created_at TIMESTAMP WITH TIME ZONE NOT NULL,
    scheduled_at TIMESTAMP WITH TIME ZONE NOT NULL,
    metadata BLOB,
    payload BLOB NOT NULL,
    times_attempted NUMBER(10) NOT NULL
);

CREATE INDEX idx_outbox_created_at ON outbox (created_at);
CREATE INDEX idx_outbox_scheduled_at ON outbox (scheduled_at);

🪟 SQL Server
CREATE TABLE outbox (
    id UNIQUEIDENTIFIER PRIMARY KEY,
    created_at DATETIMEOFFSET(3) NOT NULL,
    scheduled_at DATETIMEOFFSET(3) NOT NULL,
    metadata VARBINARY(MAX),
    payload VARBINARY(MAX) NOT NULL,
    times_attempted INT NOT NULL
);

CREATE INDEX idx_outbox_created_at ON outbox (created_at);
CREATE INDEX idx_outbox_scheduled_at ON outbox (scheduled_at);

Examples

Complete working examples for different databases and message brokers are available in the examples directory.

To run an example:

cd examples/postgres-kafka # or examples/oracle-nats or examples/mysql-rabbitmq
../../scripts/up-and-wait.sh
go run service.go

# In another terminal, send a POST request to trigger entity creation
curl -X POST http://localhost:8080/entity

FAQ

What happens when multiple instances of my service use the library?

When running multiple instances of your service, each with its own reader, be aware that:

  • Multiple readers will independently poll for messages
  • This can result in duplicate message publishing
  • To handle this, you can either:
    1. Ensure your consumers are idempotent and accept duplicates
    2. Use broker-side deduplication if available (e.g. NATS JetStream's Msg-Id)
    3. Run the reader in a single instance only (e.g. single replica deployment in k8s with reader)

The optimistic publisher feature can significantly reduce the number of duplicates. With the optimistic publisher, messages are published as soon as they are committed, so readers will usually find no messages in the outbox table.

Note that even in single instance deployments, message duplicates can still occur (e.g. if the service crashes right after successfully publishing to the broker). However, these duplicates are less frequent compared to multi instance deployments.
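
Since duplicates can occur in every deployment mode, the consumer side is usually where they are absorbed. The following is a minimal sketch of an idempotent consumer that deduplicates by message ID. `dedupingHandler` and `newDedupingHandler` are hypothetical names, and the in-memory set is an assumption made for brevity; a real consumer would typically record processed IDs in durable storage (for example behind a unique constraint) so that deduplication survives restarts.

```go
package main

import (
	"fmt"
	"sync"
)

// dedupingHandler wraps a handler so that each message ID is
// processed successfully at most once. Hypothetical type.
type dedupingHandler struct {
	mu     sync.Mutex
	seen   map[string]struct{}
	handle func(id string, payload []byte) error
}

func newDedupingHandler(handle func(id string, payload []byte) error) *dedupingHandler {
	return &dedupingHandler{seen: make(map[string]struct{}), handle: handle}
}

// Handle reports whether the wrapped handler processed the message.
// Duplicates are skipped; failed messages are not recorded, so a
// later redelivery can retry them.
func (d *dedupingHandler) Handle(id string, payload []byte) (bool, error) {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, dup := d.seen[id]; dup {
		return false, nil
	}
	if err := d.handle(id, payload); err != nil {
		return false, err
	}
	d.seen[id] = struct{}{}
	return true, nil
}

func main() {
	h := newDedupingHandler(func(id string, payload []byte) error {
		fmt.Printf("processing %s: %s\n", id, payload)
		return nil
	})
	// The same message arrives twice, e.g. once from the optimistic
	// publisher and once from a background reader.
	for _, id := range []string{"msg-1", "msg-1", "msg-2"} {
		processed, err := h.Handle(id, []byte(`{"hello":"world"}`))
		fmt.Println(id, "processed:", processed, "err:", err)
	}
}
```

The sketch holds a single lock while the handler runs, which keeps it simple at the cost of serializing all messages.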

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.
