feat: add worker pool and optimize message processing #172
base: main
Conversation
Walkthrough
Adds Config.MaxWorkers and a concurrent worker pool to the indexer, with 1s Kafka polling and per-worker Sentry/logging; adds CLI flag/env wiring for max-workers. Updates DB: BatchSize → 200, QueryTimeout → 30s; adds GORM NewClient and Ping helpers.
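For orientation, a minimal sketch of the new config field described in the walkthrough; the surrounding fields are illustrative, not the actual `Config` definition in the repository:

```go
// Config holds indexer settings; MaxWorkers is the field added in this PR.
type Config struct {
	KafkaBrokers string // illustrative existing field
	KafkaTopic   string // illustrative existing field
	MaxWorkers   int    // number of concurrent message-processing workers
}
```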
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    participant K as Kafka Consumer
    participant I as Indexer
    participant W as Worker[1..N]
    participant DLQ as DLQ
    participant S as Sentry

    rect rgb(245,250,255)
        note over I: Startup — init buffered workChan\nspawn N workers
        I->>W: workers ready
    end

    loop Poll every 1s
        K->>I: ReadMessage(timeout=1s)
        alt Message received
            I-->>W: enqueue message on workChan
        else Timeout / none
            I-->>I: continue polling (non-fatal)
        end
    end

    par For each dequeued message
        W->>W: create per-message context\nset Sentry scope (worker_id)
        W->>W: process message
        alt Success
            W-->>K: commit offset
        else Failure
            W-->>DLQ: publish to DLQ
            W-->>K: commit offset (after DLQ)
            W-->>S: capture error
        end
    end

    rect rgb(255,245,240)
        note over I,W: Shutdown
        I-->>I: close workChan
        W-->>W: drain remaining messages and exit
    end
```
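A minimal Go sketch of the poll/dispatch portion of this flow, assuming a confluent-kafka-go style consumer; `workChan`, `stopCtx`, and the error handling are simplified placeholders rather than the actual indexer code:

```go
for {
	select {
	case <-stopCtx.Done():
		return // shutdown: the caller closes workChan and waits for workers
	default:
	}

	// A short timeout keeps the loop responsive to shutdown signals.
	msg, err := consumer.ReadMessage(1 * time.Second)
	if err != nil {
		continue // timeouts while the topic is idle are expected and non-fatal
	}

	select {
	case workChan <- msg: // hand the message to a worker
	case <-stopCtx.Done():
		return // shutdown requested while the channel was full
	}
}
```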
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
Pre-merge checks and finishing touches: ❌ Failed checks (1 warning) | ✅ Passed checks (2 passed)
Docstrings generation was requested by @SeUkKim (#172 (comment)). The following files were modified: `pkg/db/db.go`.
Note: Generated docstrings for this pull request at #173.
Actionable comments posted: 0
🧹 Nitpick comments (1)
pkg/db/db.go (1)
`29-33`: Review the 5-minute transaction timeout and consider connection pool settings.
`DefaultTransactionTimeout` is set to `QueryTimeout` (5 minutes), which is unusually long and may lead to prolonged locks and connection exhaustion under load, especially with the new worker pool. Additionally, connection pool settings (max open/idle connections, connection lifetime) are not configured. Consider:
- Reducing `DefaultTransactionTimeout` to a more typical value (e.g., 30-60 seconds) unless long-running transactions are intentional.
- Adding connection pool configuration:

```diff
 func NewClient(databaseURL string) (*gorm.DB, error) {
-	return gorm.Open(postgres.Open(databaseURL), &gorm.Config{DefaultTransactionTimeout: QueryTimeout, PrepareStmt: true})
+	db, err := gorm.Open(postgres.Open(databaseURL), &gorm.Config{
+		DefaultTransactionTimeout: 30 * time.Second, // or keep QueryTimeout if needed
+		PrepareStmt:               true,
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	sqlDB, err := db.DB()
+	if err != nil {
+		return nil, err
+	}
+
+	// Configure connection pool for worker pool usage
+	sqlDB.SetMaxOpenConns(25) // adjust based on worker count
+	sqlDB.SetMaxIdleConns(5)
+	sqlDB.SetConnMaxLifetime(time.Hour)
+	sqlDB.SetConnMaxIdleTime(10 * time.Minute)
+
+	return db, nil
 }
```
📜 Review details
📒 Files selected for processing (1): `pkg/db/db.go` (2 hunks)
🔇 Additional comments (2)
pkg/db/db.go (2)
`35-39`: LGTM! The `Ping` function correctly verifies database connectivity using a standard `SELECT 1` query with proper context handling.
`18-18`: BatchSize increase safe: even for tables with up to ~20 columns, 200 × 20 = 4,000 parameters per batch, well under PostgreSQL's 32,767 limit.
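A minimal sketch of a `Ping` helper matching that description; the exact signature in `pkg/db/db.go` may differ, so treat this as illustrative:

```go
// Ping verifies connectivity by running SELECT 1 under the caller's context,
// so the check honors any deadline or cancellation the caller sets.
func Ping(ctx context.Context, db *gorm.DB) error {
	return db.WithContext(ctx).Exec("SELECT 1").Error
}
```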
… pool settings for worker pool usage
Actionable comments posted: 0
🧹 Nitpick comments (1)
pkg/db/db.go (1)
`45-48`: Make DB connection pool settings configurable and align with worker count.
The workers flag (`--workers`) defaults to 10 but can be increased; ensure `MaxOpenConns` (currently 25) is configurable and set ≥ `NumWorkers` to prevent connection exhaustion.
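A hedged sketch of what making the pool configurable might look like, assuming the worker count is available when the client is created; the extra `maxWorkers` parameter and the headroom calculation are illustrative, not the merged API:

```go
// NewClient opens the DB and sizes the connection pool relative to the
// number of workers so concurrent workers cannot exhaust connections.
func NewClient(databaseURL string, maxWorkers int) (*gorm.DB, error) {
	db, err := gorm.Open(postgres.Open(databaseURL), &gorm.Config{PrepareStmt: true})
	if err != nil {
		return nil, err
	}
	sqlDB, err := db.DB()
	if err != nil {
		return nil, err
	}
	maxOpen := maxWorkers + 5 // headroom above the worker count (illustrative)
	sqlDB.SetMaxOpenConns(maxOpen)
	sqlDB.SetMaxIdleConns(maxWorkers)
	return db, nil
}
```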
📜 Review details
📒 Files selected for processing (1): `pkg/db/db.go` (1 hunk)
🔇 Additional comments (4)
pkg/db/db.go (4)
`18-18`: LGTM: Batch size increase aligns with concurrent processing. Doubling the batch size reduces database round-trips and complements the worker pool mentioned in the PR objectives.
`32-35`: LGTM: Prepared statement caching enabled. Enabling `PrepareStmt` improves performance for repeated queries, which is beneficial given the worker pool processing pattern.
`53-57`: LGTM: Standard health check implementation. The `Ping` function correctly implements a database health check using `SELECT 1` with context-based timeout control.
`21-21`: Confirm 30s timeout covers all batch operations. There are 28 `CreateInBatches` calls in `pkg/db/db.go`; under peak load with BatchSize=200 and concurrent workers, ensure these complete within the 30s QueryTimeout to avoid unintended failures.
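As a way to sanity-check that concern, a hedged example of bounding a single batch write with the 30s `QueryTimeout`; the `events` slice and error wrapping are illustrative, and whether the real code uses an explicit context or GORM's transaction timeout is not shown in this PR:

```go
// Each batch write gets at most QueryTimeout (30s) before the context expires.
ctx, cancel := context.WithTimeout(context.Background(), QueryTimeout)
defer cancel()

// One of the ~28 CreateInBatches call sites mentioned in the review note.
if err := db.WithContext(ctx).CreateInBatches(events, BatchSize).Error; err != nil {
	return fmt.Errorf("batch insert failed: %w", err)
}
```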
Actionable comments posted: 4
📜 Review details
📒 Files selected for processing (2): `event-indexer/cmd/indexer/cmd.go` (5 hunks), `event-indexer/indexer/indexer.go` (5 hunks)
🔇 Additional comments (8)
event-indexer/indexer/indexer.go (5)
`361-361`: Reduced ReadMessage timeout improves shutdown responsiveness. The timeout reduction from 10 seconds to 1 second allows faster response to shutdown signals while maintaining efficient polling.
`372-377`: Good pattern for non-blocking dispatch with cancellation. The nested select properly handles both message dispatch and shutdown cancellation, preventing blocked sends when the channel is full.
`396-396`: Verify context.Background() usage. Using `context.Background()` means in-flight message processing won't be interrupted when `stopCtx` is cancelled. This is likely intentional to avoid partial processing, but it means workers may continue processing for up to 10 minutes (based on `max.poll.interval.ms` at lines 121, 134) after shutdown is requested. If this behavior is unintended, consider using a derived context with timeout:

```go
ctx, cancel := context.WithTimeout(stopCtx, 5*time.Minute)
defer cancel()
```
`348-421`: Message ordering may be lost with concurrent processing. The worker pool processes messages concurrently, which means messages from the same partition can be processed out of order. If block height ordering is critical for your indexing logic, this could cause issues.
Verify whether out-of-order processing is acceptable for your use case. If ordering is required, consider (see the sketch after this list):
- Processing messages from each partition in a dedicated worker
- Using a sequential queue per partition
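A minimal sketch of the first option, routing by partition so each partition's messages stay ordered; `msg.TopicPartition.Partition` follows confluent-kafka-go's message layout, while the channel-per-worker arrangement and `worker` helper are illustrative:

```go
// One channel per worker; messages from the same partition always hash to the
// same worker, preserving per-partition ordering while keeping concurrency.
workChans := make([]chan *kafka.Message, cfg.MaxWorkers)
for i := range workChans {
	workChans[i] = make(chan *kafka.Message, 64)
	go worker(i, workChans[i]) // hypothetical per-worker processing loop
}

// In the dispatch loop:
idx := int(msg.TopicPartition.Partition) % cfg.MaxWorkers
workChans[idx] <- msg
```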
`348-421`: Align DB connection pool with worker count. The database is configured with `SetMaxOpenConns(25)` and `SetMaxIdleConns(10)` in `pkg/db/db.go`; ensure `f.config.MaxWorkers` does not exceed these limits, or make the pool settings configurable to prevent connection exhaustion.
event-indexer/cmd/indexer/cmd.go (3)
`30-30`: Constant follows established naming pattern. The `FlagMaxWorkers` constant is consistent with other flag definitions in the file.
`80-80`: MaxWorkers properly wired to indexer config. The value flows correctly from CLI flag to indexer configuration.
`134-134`: Flag definition follows established pattern. The flag registration is consistent with other flags in the command.
Actionable comments posted: 2
♻️ Duplicate comments (2)
event-indexer/cmd/indexer/cmd.go (1)
`112-115`: LGTM! Validation issue resolved. The code now properly validates that `maxWorkers` is positive, addressing the concern from the previous review. The fallback to 10 on parse errors or non-positive values is appropriate.
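A hedged sketch of the parse-and-validate behavior described here; the flag value variable, helper names, and default constant are illustrative rather than the actual cmd.go code:

```go
// Fall back to a sane default when the value is missing, unparsable, or <= 0.
const defaultMaxWorkers = 10

maxWorkers, err := strconv.Atoi(flagValue)
if err != nil || maxWorkers <= 0 {
	maxWorkers = defaultMaxWorkers
}
cfg.MaxWorkers = maxWorkers
```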
event-indexer/indexer/indexer.go (1)
`354-362`: LGTM! Worker pool implementation with proper synchronization. The worker pool correctly uses a WaitGroup to track worker goroutines and ensures graceful shutdown by:
- Closing the work channel when stopCtx is cancelled
- Waiting for all workers to complete via `wg.Wait()`
- Handling the shutdown signal both in the main loop and when enqueuing messages

The 1-second Kafka polling timeout is appropriate for responsive shutdown. A sketch of this lifecycle follows below.
Also applies to: 367-369, 382-391
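A minimal sketch of the lifecycle the reviewer describes, assuming the `f.config.MaxWorkers` field mentioned earlier; `processMessage` and the channel capacity are illustrative:

```go
// N workers range over a buffered workChan; a WaitGroup tracks them, and
// shutdown is simply close(workChan) followed by wg.Wait().
workChan := make(chan *kafka.Message, f.config.MaxWorkers)
var wg sync.WaitGroup

for i := 0; i < f.config.MaxWorkers; i++ {
	wg.Add(1)
	go func(workerID int) {
		defer wg.Done()
		for msg := range workChan { // drains any queued messages after close
			f.processMessage(workerID, msg) // hypothetical per-message handler
		}
	}(i)
}

// ... dispatch loop runs until stopCtx is cancelled ...

close(workChan) // signal workers that no more work is coming
wg.Wait()       // wait for in-flight messages to finish before returning
```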
📜 Review details
📒 Files selected for processing (2): `event-indexer/cmd/indexer/cmd.go` (5 hunks), `event-indexer/indexer/indexer.go` (7 hunks)
🔇 Additional comments (2)
event-indexer/cmd/indexer/cmd.go (1)
`30-30`: LGTM! MaxWorkers flag integration is correct. The flag constant, flag binding, config wiring, and environment-to-flag flow are all properly implemented.
Also applies to: 54-54, 80-80, 134-134
event-indexer/indexer/indexer.go (1)
`420-425`: LGTM! Worker-specific observability enhancements. The addition of `worker_id` to Sentry scope and worker identifiers in DLQ/commit error logs provides excellent visibility for debugging and monitoring concurrent worker behavior.
Also applies to: 433-433, 440-440
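A short sketch of tagging Sentry events with the worker ID inside the handler, using the standard sentry-go scope API; `handle` and the surrounding wiring are illustrative:

```go
// Tag every event captured while processing this message with the worker ID.
sentry.WithScope(func(scope *sentry.Scope) {
	scope.SetTag("worker_id", strconv.Itoa(workerID))
	if err := handle(msg); err != nil {
		sentry.CaptureException(err)
	}
})
```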
…age processing logic
Summary by CodeRabbit
Refactor
Chores