perf: Optimized the ACK mechanism #784
base: main
Conversation
Important: Review skipped due to path filters.
⛔ Files ignored due to path filters (1). CodeRabbit blocks several paths by default; you can override this behavior by explicitly including those paths in the path filters.
Walkthrough
Adds a durable, WAL-backed reliable-ack subsystem (processor, WAL, tasks, metrics, backpressure, idempotent/retry wrappers), a ReliableStream and Stream-level reliable_ack wiring, plus a suite of distributed subsystems (distributed WAL, checkpointing, recovery, node registry, object storage), tests, and examples.
Changes
Sequence Diagram(s)
sequenceDiagram
autonumber
participant Stream
participant Output
participant ErrOut as ErrorOutput
participant Ack as AckHandle
participant RProc as ReliableAckProcessor
Note over Stream: output path with optional reliable ack
Stream->>Output: write(message)
alt write succeeds
alt ReliableAckProcessor present
Stream->>RProc: enqueue ack (AckId, ack_type, payload)
RProc-->>Stream: ack enqueued (async, metrics++)
Note right of RProc: WAL append + async background processing
else
Stream->>Ack: ack()
end
alt output error and error_output configured
Stream->>ErrOut: write(error_message)
end
else write fails
Stream->>Ack: ack() // fallback ack on write failure
end
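In code, the branch in this diagram corresponds roughly to the following sketch. The type and method names are stand-ins inferred from the rest of this review, not taken verbatim from the PR:

use std::sync::Arc;
use async_trait::async_trait;

// Sketch only: minimal stand-ins for the PR's Ack and ReliableAckProcessor.
#[async_trait]
pub trait Ack: Send + Sync {
    async fn ack(&self);
}

pub struct ReliableAckProcessor;

impl ReliableAckProcessor {
    // Signature mirrors the three-argument ack(...) shown later in this review.
    pub async fn ack(
        &self,
        ack: Arc<dyn Ack>,
        _ack_type: String,
        _payload: Vec<u8>,
    ) -> Result<(), String> {
        // Real impl: WAL append + async background processing; here we ack inline.
        ack.ack().await;
        Ok(())
    }
}

pub async fn handle_write_result(
    write_ok: bool,
    ack: Arc<dyn Ack>,
    rproc: Option<Arc<ReliableAckProcessor>>,
) {
    match (write_ok, rproc) {
        // Reliable path: hand the ack to the WAL-backed processor.
        (true, Some(p)) => {
            let _ = p.ack(ack, "success".to_string(), Vec::new()).await;
        }
        // No processor configured: ack directly.
        (true, None) => ack.ack().await,
        // Fallback ack on write failure, as in the diagram.
        (false, _) => ack.ack().await,
    }
}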
sequenceDiagram
autonumber
participant Config as StreamConfig::build
participant WAL as AckWAL
participant RProc as ReliableAckProcessor
participant Stream as Stream::new/_reliable
Config->>Config: parse `reliable_ack` config
alt reliable_ack.enabled == true
Config->>WAL: init(wal_path or temp)
Config->>RProc: ReliableAckProcessor::new(tracker, token, wal_path)
Config->>Stream: Stream::new_reliable(..., Arc<RProc>)
else
Config->>Stream: Stream::new(...)
end
sequenceDiagram
autonumber
participant Input
participant Workers as ProcessorWorkers
participant OutQ as OrderedQueue
participant Output
participant RProc as ReliableAckProcessor
participant Cache as AckCache
Note over Input,Cache: ReliableStream high-level flow
Input-->>Workers: message + Ack(AckId)
Workers-->>OutQ: (seq, result, ack)
loop emit in-order
OutQ->>Output: write(next in-order)
alt success
OutQ->>RProc: ack(ack, ack_type, payload)
else failure
OutQ->>Output: optional error path
OutQ->>Ack: ack() or RProc ack based on config
end
end
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120+ minutes
Pre-merge checks and finishing touches
❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
Actionable comments posted: 2
🧹 Nitpick comments (10)
crates/arkflow-core/src/stream/mod.rs (3)
22-22: Missing module import for async_ack
The AsyncAckProcessor is used from the async_ack module, but the import statement uses it directly without a module prefix. Consider using a module-qualified import for better clarity. Apply this diff to improve the import structure:
-use async_ack::AsyncAckProcessor;
+use self::async_ack::AsyncAckProcessor;
409-415: Consider partial acknowledgment strategy
Currently, acks are only sent when ALL messages in a batch are successfully written (success_cnt >= size). This could delay acknowledgments unnecessarily if even one message fails. Consider implementing a partial acknowledgment strategy where successfully written messages are acknowledged independently. Would you like me to propose an implementation that tracks and acknowledges individual message successes within a batch?
139-145: Keep the clone — it's required for the spawned async task.
AsyncAckProcessor is #[derive(Clone)] and do_output accepts an Option by value; because do_output is spawned, you cannot pass a borrowed reference to self.ack_processor (the lifetime would not hold), so cloning the Option before the spawn is correct. Optional alternatives: store the processor as Option<Arc<AsyncAckProcessor>> to cheapen clones, or move ownership with self.ack_processor.take() if you don't need the field afterward.
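A minimal sketch of the Arc-based alternative mentioned above (names are placeholders, not the crate's actual definitions):

use std::sync::Arc;

// Placeholder for the real processor type.
struct AsyncAckProcessor;

struct StreamLike {
    ack_processor: Option<Arc<AsyncAckProcessor>>,
}

impl StreamLike {
    fn spawn_output(&self) {
        // Cloning an Option<Arc<_>> is just a refcount bump, so the spawned
        // task can own its handle without borrowing `self`.
        let processor = self.ack_processor.clone();
        tokio::spawn(async move {
            if let Some(_p) = processor {
                // ... enqueue acks via the shared processor
            }
        });
    }
}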
crates/arkflow-core/src/stream/async_ack.rs (7)
15-27: Remove duplicate license header
The file contains a duplicate Apache License 2.0 header (lines 15-27). The first occurrence (lines 1-13) is sufficient. Apply this diff to remove the duplicate:
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
43-47: Consider making timeout and retry parameters configurable
The constants for retries, timeouts, and batch sizes are hardcoded. For production use, these values often need tuning based on workload characteristics. Consider making these configurable through the AsyncAckProcessor constructor or a configuration struct. Would you like me to propose a configuration structure that allows these parameters to be customized while maintaining sensible defaults?
196-200: Consider dynamic batch timeout based on load
The fixed 100ms timeout for batch processing might not be optimal under varying loads. Under high load you want larger batches; under low load, you want lower latency. Consider implementing adaptive batching:
-            _ = tokio::time::sleep(Duration::from_millis(100)) => {
+            // Adaptive timeout: shorter when batch is partially full
+            _ = tokio::time::sleep(Duration::from_millis(
+                if pending_tasks.len() > BATCH_SIZE / 2 { 50 } else { 100 }
+            )) => {
234-235: Timeout applied per-task instead of per-batch
Each task gets a full ACK_TIMEOUT_MS timeout during batch processing, which could make batch processing take very long when there are many tasks. Consider using a shorter per-task timeout when processing in batches.
-        let result =
-            tokio::time::timeout(Duration::from_millis(ACK_TIMEOUT_MS), task.ack.ack()).await;
+        // Use a shorter timeout per task when in a batch to prevent blocking
+        let timeout_ms = ACK_TIMEOUT_MS.min(1000); // 1 second max per task in batch
+        let result =
+            tokio::time::timeout(Duration::from_millis(timeout_ms), task.ack.ack()).await;
262-265: Inefficient task removal from vector
Removing tasks from the middle of a vector one by one (even in reverse order) is O(n²) for n removals. Consider using retain or swapping with a new vector.
-    let mut tasks_to_remove = Vec::new();
     // ... in the loop ...
-    tasks_to_remove.push(i);
+    // Mark task as completed by setting a flag or moving to another collection
-    // Remove tasks in reverse order to maintain correct indices
-    for &i in tasks_to_remove.iter().rev() {
-        tasks.remove(i);
-    }
+    // Efficient removal using retain
+    tasks.retain(|task| {
+        // Keep tasks that haven't been successfully processed or permanently failed
+        task.retry_count > 0 && task.should_retry() && !task.is_expired()
+    });
171-210: Consider handling backpressure when channel is full
When the ack channel reaches MAX_PENDING_ACKS capacity, new acks will block. Consider implementing backpressure handling or metrics to monitor channel saturation.
 impl AckProcessorWorker {
     async fn run(self) {
         info!("Async ack processor started");
         let mut pending_tasks = Vec::with_capacity(BATCH_SIZE);
+        let mut last_channel_full_warning = Instant::now();
         loop {
             tokio::select! {
                 _ = self.cancellation_token.cancelled() => {
                     break;
                 }
                 result = self.ack_receiver.recv_async() => {
                     match result {
                         Ok(task) => {
+                            // Warn if channel is getting full (>80% capacity)
+                            if self.ack_receiver.len() > MAX_PENDING_ACKS * 8 / 10
+                                && last_channel_full_warning.elapsed() > Duration::from_secs(10) {
+                                warn!("Ack channel is {}% full",
+                                    self.ack_receiver.len() * 100 / MAX_PENDING_ACKS);
+                                last_channel_full_warning = Instant::now();
+                            }
                             pending_tasks.push(task);
141-158: Remove redundant sequence counter from AsyncAckProcessor
AckTask.sequence and AsyncAckProcessor.sequence_counter are only used for Debug/test and never referenced by the processing logic; either remove the AckTask.sequence field and the processor-local AtomicU64 (and update the tests), or explicitly tie/rename it to Stream's sequence_counter and document the intended purpose. Location: crates/arkflow-core/src/stream/async_ack.rs (AckTask, AsyncAckProcessor::ack, tests).
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- crates/arkflow-core/src/stream/async_ack.rs (1 hunks)
- crates/arkflow-core/src/stream/mod.rs (10 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
crates/arkflow-core/src/stream/mod.rs (2)
- crates/arkflow-core/src/stream/async_ack.rs (3): new (69-76), new (120-138), ack (140-158)
- crates/arkflow-core/src/input/mod.rs (3): ack (41-41), ack (60-60), ack (67-71)
crates/arkflow-core/src/stream/async_ack.rs (1)
- crates/arkflow-core/src/stream/mod.rs (4): flume (93-93), flume (95-95), new (57-78), run (81-157)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: build
- GitHub Check: build
🔇 Additional comments (2)
crates/arkflow-core/src/stream/mod.rs (2)
99-101: LGTM! Clean integration of async ack processor
The initialization properly integrates with the existing task tracker and cancellation token, maintaining consistent lifecycle management.
370-376: Consistent error handling pattern for ack processing
The error handling for async ack processor failures only logs the error but doesn't propagate it. This is consistent with the synchronous ack.ack() call, which also doesn't handle errors. Good consistency in error handling approach.
pub fn is_expired(&self) -> bool {
    self.created_at.elapsed() > Duration::from_millis(ACK_TIMEOUT_MS)
}
Potential issue with fixed timeout expiry check
The is_expired method uses a fixed timeout from task creation time but doesn't account for retry attempts. A task that has been retried multiple times might expire before it has a chance to complete, leading to premature failures. Consider tracking the last retry time or implementing an exponential backoff strategy:
pub fn is_expired(&self) -> bool {
- self.created_at.elapsed() > Duration::from_millis(ACK_TIMEOUT_MS)
+ // Allow more time for retried tasks
+ let timeout = Duration::from_millis(ACK_TIMEOUT_MS * (self.retry_count + 1) as u64);
+ self.created_at.elapsed() > timeout
}
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In crates/arkflow-core/src/stream/async_ack.rs around lines 78-80, is_expired
currently compares created_at against a fixed ACK_TIMEOUT_MS which can
prematurely expire retried tasks; change the expiry logic to use a last_activity
(or last_retry) timestamp instead of created_at and update that timestamp
whenever a retry is attempted, or compute a dynamic timeout using exponential
backoff (e.g., timeout = base_ms * 2^retry_count with a max cap) and compare
elapsed since last_activity against that computed timeout; add the new field to
the struct, update the retry code paths to set last_activity (and increment
retry_count if using backoff), and add/adjust unit tests to cover retry updates
and bounded backoff behavior.
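A sketch of the last_activity plus bounded-backoff variant the prompt describes; the constants and field names here are assumptions, not the module's actual values:

use std::time::{Duration, Instant};

const ACK_TIMEOUT_MS: u64 = 5_000; // assumed base timeout
const MAX_BACKOFF_SHIFT: u32 = 4;  // cap the backoff at 16x the base

struct AckTask {
    last_activity: Instant, // reset whenever a retry is attempted
    retry_count: u32,
}

impl AckTask {
    fn on_retry(&mut self) {
        self.retry_count += 1;
        self.last_activity = Instant::now();
    }

    fn is_expired(&self) -> bool {
        // Exponential backoff with a cap, measured from the last attempt
        // rather than from task creation.
        let shift = self.retry_count.min(MAX_BACKOFF_SHIFT);
        let timeout = Duration::from_millis(ACK_TIMEOUT_MS << shift);
        self.last_activity.elapsed() > timeout
    }
}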
Err(_) => {
    // Timeout occurred
    if task.should_retry() {
        task.increment_retry();
        self.metrics.retried_acks.fetch_add(1, Ordering::Relaxed);
        retried_count += 1;
        // Keep in pending for retry
    } else {
        error!("Ack task failed after {} retries", task.retry_count);
        self.metrics.failed_acks.fetch_add(1, Ordering::Relaxed);
        self.metrics.pending_acks.fetch_sub(1, Ordering::Relaxed);
        failed_count += 1;
        tasks_to_remove.push(i); // Remove from pending
    }
}
Missing retry delay implementation
The code defines a RETRY_DELAY_MS constant but never uses it. Tasks that fail are immediately eligible for retry in the next batch, which could cause thundering-herd issues. Consider adding a delay before retry:
Err(_) => {
// Timeout occurred
if task.should_retry() {
task.increment_retry();
self.metrics.retried_acks.fetch_add(1, Ordering::Relaxed);
retried_count += 1;
+ // Add exponential backoff delay
+ tokio::time::sleep(Duration::from_millis(
+ RETRY_DELAY_MS * (1 << task.retry_count.min(4))
+ )).await;
// Keep in pending for retry
} else {
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
Err(_) => {
    // Timeout occurred
    if task.should_retry() {
        task.increment_retry();
        self.metrics.retried_acks.fetch_add(1, Ordering::Relaxed);
        retried_count += 1;
        // Add exponential backoff delay
        tokio::time::sleep(Duration::from_millis(
            RETRY_DELAY_MS * (1 << task.retry_count.min(4))
        )).await;
        // Keep in pending for retry
    } else {
        error!("Ack task failed after {} retries", task.retry_count);
        self.metrics.failed_acks.fetch_add(1, Ordering::Relaxed);
        self.metrics.pending_acks.fetch_sub(1, Ordering::Relaxed);
        failed_count += 1;
        tasks_to_remove.push(i); // Remove from pending
    }
}
🤖 Prompt for AI Agents
In crates/arkflow-core/src/stream/async_ack.rs around lines 244 to 258, the
retry delay constant RETRY_DELAY_MS is defined but not used; update the retry
path to schedule a delay instead of making the task immediately eligible for the
next batch by setting the task's next_retry time (e.g., task.next_retry =
Instant::now() + Duration::from_millis(RETRY_DELAY_MS)) or a similar timestamp
field when incrementing retry_count, keep the task in pending (do not push to
tasks_to_remove), and modify the pending-iteration logic elsewhere to skip tasks
until their next_retry has elapsed (i.e., only attempt tasks whose next_retry <=
now); ensure metrics (retried_acks, pending_acks) remain consistent and failing
behavior still removes tasks when retries are exhausted.
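A sketch of the next_retry scheduling the prompt suggests; the field names and delay curve are assumptions:

use std::time::{Duration, Instant};

const RETRY_DELAY_MS: u64 = 100; // assumed value of the currently unused constant

struct PendingAck {
    retry_count: u32,
    next_retry: Instant, // earliest time this task may be attempted again
}

impl PendingAck {
    fn schedule_retry(&mut self) {
        self.retry_count += 1;
        // Capped exponential delay keeps retried tasks out of the very
        // next batch, avoiding a thundering herd.
        let delay = RETRY_DELAY_MS * (1u64 << self.retry_count.min(4));
        self.next_retry = Instant::now() + Duration::from_millis(delay);
    }

    fn is_due(&self, now: Instant) -> bool {
        now >= self.next_retry
    }
}

The batch loop would then skip tasks where !task.is_due(now) instead of sleeping inline, which keeps the worker responsive.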
Actionable comments posted: 13
🧹 Nitpick comments (12)
crates/arkflow-core/src/idempotent_ack.rs (3)
200-208: Add builder for timeout.
 pub fn with_max_retries(mut self, max_retries: u32) -> Self {
     self.max_retries = max_retries;
     self
 }

 pub fn with_retry_delay(mut self, retry_delay: Duration) -> Self {
     self.retry_delay = retry_delay;
     self
 }
+
+pub fn with_ack_timeout(mut self, ack_timeout: Duration) -> Self {
+    self.ack_timeout = ack_timeout;
+    self
+}
22-26: AckCache uses Vec::remove(0); switch to VecDeque for O(1) pops.
Reduces per-ack overhead at the size limit and during cleanup.
-use std::collections::HashSet;
+use std::collections::{HashSet, VecDeque};
@@
 pub struct AckCache {
     acknowledged: Arc<Mutex<HashSet<AckId>>>,
-    cache_timestamps: Arc<Mutex<Vec<(Instant, AckId)>>>,
+    cache_timestamps: Arc<Mutex<VecDeque<(Instant, AckId)>>>,
 }
@@
     pub fn new() -> Self {
         Self {
             acknowledged: Arc::new(Mutex::new(HashSet::new())),
-            cache_timestamps: Arc::new(Mutex::new(Vec::new())),
+            cache_timestamps: Arc::new(Mutex::new(VecDeque::new())),
         }
     }
@@
-    timestamps.push((Instant::now(), ack_id));
+    timestamps.push_back((Instant::now(), ack_id));
@@
     // Enforce size limit
     if timestamps.len() > ACK_CACHE_SIZE {
-        if let Some((_, oldest_id)) = timestamps.first() {
+        if let Some((_, oldest_id)) = timestamps.front() {
             acknowledged.remove(oldest_id);
-            timestamps.remove(0);
+            timestamps.pop_front();
         }
     }
Also applies to: 59-63, 65-70, 88-99, 102-115
27-29: Make cache limits configurable (optional).
Expose AckCache::with_limits(size, ttl) for tests and deployments with different traffic shapes.
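A possible shape for that constructor, sketched against the VecDeque-based fields shown above; the names here are illustrative, not the crate's actual API:

use std::collections::{HashSet, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Mutex;

// Sketch: a configurable variant of the cache; AckId is a placeholder alias.
type AckId = String;

pub struct BoundedAckCache {
    max_entries: usize,
    ttl: Duration,
    acknowledged: Arc<Mutex<HashSet<AckId>>>,
    timestamps: Arc<Mutex<VecDeque<(Instant, AckId)>>>,
}

impl BoundedAckCache {
    pub fn with_limits(max_entries: usize, ttl: Duration) -> Self {
        Self {
            max_entries,
            ttl,
            acknowledged: Arc::new(Mutex::new(HashSet::new())),
            timestamps: Arc::new(Mutex::new(VecDeque::new())),
        }
    }

    // Eviction consults both limits: oldest-first while over capacity or expired.
    pub async fn evict(&self) {
        let now = Instant::now();
        let mut ts = self.timestamps.lock().await;
        let mut acked = self.acknowledged.lock().await;
        while ts.len() > self.max_entries
            || ts.front().map_or(false, |(t, _)| now.duration_since(*t) > self.ttl)
        {
            if let Some((_, id)) = ts.pop_front() {
                acked.remove(&id);
            } else {
                break;
            }
        }
    }
}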
crates/arkflow-core/Cargo.toml (1)
30-31: Trim default features for uuid (optional).
Reduces compile time/binary size.
-uuid = { version = "1.8", features = ["v4"] }
+uuid = { version = "1.8", default-features = false, features = ["v4"] }
crates/arkflow-core/tests/reliable_ack_test.rs (2)
23-44: Shut down the worker cleanly to avoid test flakiness.
Close and wait on the tracker after cancel.
     let metrics = processor.get_metrics();
@@
     cancellation_token.cancel();
+    tracker.close();
+    tracker.wait().await;
46-76: Same graceful shutdown here.
     cancellation_token.cancel();
+    tracker.close();
+    tracker.wait().await;
crates/arkflow-core/src/stream/mod.rs (1)
392-470: Align reliable ack typing with reliable_stream for observability (optional).
Consider emitting "success" | "error" | "error_output" and meaningful payloads (e.g., the input name), similar to reliable_stream.rs, for consistency.
crates/arkflow-core/src/reliable_stream.rs (2)
308-316: Simplify backpressure calculation.
The backpressure calculation (lines 311-312) uses unnecessary clamping and complex arithmetic that could be simplified. Simplify the wait time calculation:
-    let wait_time = std::cmp::min(
-        500,
-        100 + (pending_messages as u64 - BACKPRESSURE_THRESHOLD) / 100 * 10,
-    );
+    let wait_time = 100 + ((pending_messages.saturating_sub(BACKPRESSURE_THRESHOLD) * 10) / 100).min(400);
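For reference, the simplified formula behaves as follows; the threshold value is assumed for the example, and the total is equivalently capped at 500 ms:

const BACKPRESSURE_THRESHOLD: usize = 10_000; // assumed threshold

fn wait_time_ms(pending_messages: usize) -> u64 {
    // 100 ms base, +10 ms per 100 messages over the threshold, capped at 500 ms total.
    let over = pending_messages.saturating_sub(BACKPRESSURE_THRESHOLD) as u64;
    (100 + over * 10 / 100).min(500)
}

// wait_time_ms(10_000) == 100, wait_time_ms(12_000) == 300,
// wait_time_ms(20_000) == 500 (capped).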
450-450: Fix increment syntax.
The increment operation can be simplified using the += operator.
-    success_cnt = success_cnt + 1;
+    success_cnt += 1;
crates/arkflow-core/src/reliable_ack.rs (3)
225-226: Race condition in WAL rotation.
The WAL rotation renames the file while holding the lock (line 226), but concurrent operations might fail if they try to access the file during rotation. Consider using an atomic rename with a temporary file:
-    let new_path = self.path.with_extension(format!("wal.{}", timestamp));
-    std::fs::rename(&self.path, &new_path)?;
+    let new_path = self.path.with_extension(format!("wal.{}", timestamp));
+    // Ensure atomic rename operation
+    std::fs::rename(&self.path, &new_path)?;
+    // Consider adding retry logic if rename fails due to file being in use
493-495: Timeout duration used twice.
The ack operation has a timeout of ACK_TIMEOUT_MS (line 494), but tasks are also checked for expiration using the same timeout (line 481). This creates a race where tasks might time out just before being processed. Use separate timeouts or adjust the logic:
-    let result =
-        tokio::time::timeout(Duration::from_millis(ACK_TIMEOUT_MS), task.ack.ack()).await;
+    // Use a shorter timeout for individual ack operations
+    let result =
+        tokio::time::timeout(Duration::from_millis(ACK_TIMEOUT_MS / 2), task.ack.ack()).await;
525-527: Inefficient task removal pattern.
Removing tasks by index in reverse order (lines 525-527) after collecting indices is inefficient for large batches. Use retain for cleaner and potentially more efficient removal:
-    // Remove completed tasks
-    for &i in tasks_to_remove.iter().rev() {
-        tasks.remove(i);
-    }
+    // Remove completed tasks more efficiently
+    let mut removed_count = 0;
+    tasks.retain(|_| {
+        let keep = !tasks_to_remove.contains(&removed_count);
+        removed_count += 1;
+        keep
+    });
Or better yet, track completion status in the loop and use a single retain:
+    let mut task_completed = vec![false; tasks.len()];
     for (i, task) in tasks.iter_mut().enumerate() {
         // ... existing logic ...
         // Instead of tasks_to_remove.push(i), set:
+        task_completed[i] = true;
     }
-    // Remove completed tasks
-    for &i in tasks_to_remove.iter().rev() {
-        tasks.remove(i);
-    }
+    // Remove completed tasks
+    let mut i = 0;
+    tasks.retain(|_| {
+        let keep = !task_completed[i];
+        i += 1;
+        keep
+    });
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
- Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (7)
crates/arkflow-core/Cargo.toml
(1 hunks)crates/arkflow-core/src/idempotent_ack.rs
(1 hunks)crates/arkflow-core/src/lib.rs
(1 hunks)crates/arkflow-core/src/reliable_ack.rs
(1 hunks)crates/arkflow-core/src/reliable_stream.rs
(1 hunks)crates/arkflow-core/src/stream/mod.rs
(7 hunks)crates/arkflow-core/tests/reliable_ack_test.rs
(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (6)
crates/arkflow-core/src/lib.rs (2)
- crates/arkflow-core/src/reliable_stream.rs (1): output (391-476)
- crates/arkflow-core/src/stream/mod.rs (1): output (392-470)
crates/arkflow-core/tests/reliable_ack_test.rs (2)
- crates/arkflow-core/src/idempotent_ack.rs (15): new (39-46), new (65-70), new (132-138), new (159-165), new (192-198), new (242-248), new (274-276), new (300-309), ack (143-149), ack (170-182), ack (213-232), ack (253-266), ack (281-287), is_acknowledged (72-76), mark_acknowledged (78-100)
- crates/arkflow-core/src/reliable_ack.rs (6): new (77-86), new (157-170), new (249-292), ack (304-348), is_expired (88-90), should_retry (92-94)
crates/arkflow-core/src/reliable_stream.rs (3)
- crates/arkflow-core/src/stream/mod.rs (11): output (392-470), flume (117-117), flume (119-119), new (56-77), run (105-175), do_input (177-237), do_buffer (239-281), do_processor (283-336), do_output (338-390), build (525-564), close (472-509)
- crates/arkflow-core/src/idempotent_ack.rs (17): new (39-46), new (65-70), new (132-138), new (159-165), new (192-198), new (242-248), new (274-276), new (300-309), with_ack_id (311-314), with_cache (316-319), with_tracing (331-334), build (336-361), ack (143-149), ack (170-182), ack (213-232), ack (253-266), ack (281-287)
- crates/arkflow-core/src/reliable_ack.rs (6): new (77-86), new (157-170), new (249-292), run (378-431), ack (304-348), default (136-147)
crates/arkflow-core/src/reliable_ack.rs (2)
- crates/arkflow-core/src/reliable_stream.rs (4): flume (101-101), flume (103-103), new (58-80), run (89-172)
- crates/arkflow-core/src/stream/mod.rs (4): flume (117-117), flume (119-119), new (56-77), run (105-175)
crates/arkflow-core/src/idempotent_ack.rs (2)
- crates/arkflow-core/src/reliable_stream.rs (1): new (58-80)
- crates/arkflow-core/tests/reliable_ack_test.rs (1): test_ack_cache (79-87)
crates/arkflow-core/src/stream/mod.rs (2)
- crates/arkflow-core/src/reliable_stream.rs (1): output (391-476)
- crates/arkflow-core/src/reliable_ack.rs (1): ack (304-348)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: build
- GitHub Check: build
🔇 Additional comments (2)
crates/arkflow-core/src/idempotent_ack.rs (1)
336-361: Builder defaults and ordering: confirm intended semantics.
With the current order (Idempotent -> Dedup -> Retryable -> Tracing), retries wrap inside idempotency. After the above IdempotentAck fix (mark-after), this is OK. If you expect outer-layer retries to always attempt the inner ack, this order is correct; otherwise, consider Retryable inside Idempotent.
crates/arkflow-core/src/lib.rs (1)
34-41: Public modules exposure looks good.
New modules are correctly exported and align with usage in tests.
async fn ack(&self) {
    if self.cache.mark_acknowledged(self.ack_id.clone()).await {
        self.inner.ack().await;
    } else {
        tracing::debug!("Duplicate acknowledgment for {:?}", self.ack_id);
    }
}
IdempotentAck marks before inner ack; retries may be suppressed (ordering bug).
Marking the cache before inner.ack() means timeouts from RetryableAck will cause subsequent attempts to be dropped as "duplicate," risking lost acks. Mark after the inner ack completes. Apply:
Apply:
@@
impl Ack for IdempotentAck {
async fn ack(&self) {
- if self.cache.mark_acknowledged(self.ack_id.clone()).await {
- self.inner.ack().await;
- } else {
- tracing::debug!("Duplicate acknowledgment for {:?}", self.ack_id);
- }
+ if !self.cache.is_acknowledged(&self.ack_id).await {
+ self.inner.ack().await;
+ let _ = self.cache.mark_acknowledged(self.ack_id.clone()).await;
+ } else {
+ tracing::debug!("Duplicate acknowledgment for {:?}", self.ack_id);
+ }
}
}
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In crates/arkflow-core/src/idempotent_ack.rs around lines 143 to 149, the cache
is currently marked acknowledged before calling inner.ack(), which can suppress
retries if inner.ack() later times out or fails; change the order to call and
await inner.ack() first, and only upon its successful completion mark the cache
(i.e., move the cache.mark_acknowledged(...) call to after awaiting inner.ack()
and make it conditional on success), ensuring errors from inner.ack() are not
treated as duplicates and are allowed to retry.
pub struct RetryableAck {
    inner: Arc<dyn Ack>,
    max_retries: u32,
    retry_delay: Duration,
}

impl RetryableAck {
    pub fn new(inner: Arc<dyn Ack>) -> Self {
        Self {
            inner,
            max_retries: 3,
            retry_delay: Duration::from_millis(100),
        }
    }
RetryableAck conflates timeout and backoff; it is likely to time out quickly and have retries suppressed when combined with IdempotentAck.
Introduce an explicit ack_timeout separate from retry_delay.
pub struct RetryableAck {
inner: Arc<dyn Ack>,
max_retries: u32,
- retry_delay: Duration,
+ retry_delay: Duration,
+ ack_timeout: Duration,
}
@@
pub fn new(inner: Arc<dyn Ack>) -> Self {
Self {
inner,
max_retries: 3,
- retry_delay: Duration::from_millis(100),
+ retry_delay: Duration::from_millis(100),
+ ack_timeout: Duration::from_secs(2),
}
}
📝 Committable suggestion
pub struct RetryableAck {
    inner: Arc<dyn Ack>,
    max_retries: u32,
    retry_delay: Duration,
    ack_timeout: Duration,
}

impl RetryableAck {
    pub fn new(inner: Arc<dyn Ack>) -> Self {
        Self {
            inner,
            max_retries: 3,
            retry_delay: Duration::from_millis(100),
            ack_timeout: Duration::from_secs(2),
        }
    }
🤖 Prompt for AI Agents
In crates/arkflow-core/src/idempotent_ack.rs around lines 185 to 199, RetryableAck currently uses retry_delay as the only timing parameter, which conflates the per-attempt timeout and the backoff; add an explicit ack_timeout Duration field to the struct, initialize it in new (with an appropriate default), and keep retry_delay as the backoff between attempts, updating any places that await or time out the inner ack. Specifically: add ack_timeout: Duration to the struct, accept or set a sensible default in new(), and use ack_timeout when wrapping/awaiting the inner ack. Adjust callers/tests and any docs/comments to reflect the separate timeout vs. backoff semantics.
#[async_trait]
impl Ack for RetryableAck {
    async fn ack(&self) {
        let mut last_error = None;

        for attempt in 0..=self.max_retries {
            match tokio::time::timeout(self.retry_delay * (attempt + 1), self.inner.ack()).await {
                Ok(_) => return, // Success
                Err(timeout_error) => {
                    last_error = Some(timeout_error);
                    if attempt < self.max_retries {
                        tracing::warn!("Ack attempt {} timed out, retrying...", attempt + 1);
                        tokio::time::sleep(self.retry_delay * (attempt + 1)).await;
                    }
                }
            }
        }

        if let Some(error) = last_error {
            tracing::error!("Ack failed after {} retries: {:?}", self.max_retries, error);
        }
    }
}
Use fixed timeout per attempt; backoff only in sleeps.
impl Ack for RetryableAck {
async fn ack(&self) {
- let mut last_error = None;
-
- for attempt in 0..=self.max_retries {
- match tokio::time::timeout(self.retry_delay * (attempt + 1), self.inner.ack()).await {
- Ok(_) => return, // Success
- Err(timeout_error) => {
- last_error = Some(timeout_error);
- if attempt < self.max_retries {
- tracing::warn!("Ack attempt {} timed out, retrying...", attempt + 1);
- tokio::time::sleep(self.retry_delay * (attempt + 1)).await;
- }
- }
- }
- }
-
- if let Some(error) = last_error {
- tracing::error!("Ack failed after {} retries: {:?}", self.max_retries, error);
- }
+ for attempt in 0..=self.max_retries {
+ match tokio::time::timeout(self.ack_timeout, self.inner.ack()).await {
+ Ok(_) => return,
+ Err(e) if attempt < self.max_retries => {
+ let backoff = self.retry_delay * (attempt + 1);
+ tracing::warn!(
+ "Ack attempt {} timed out after {:?}, retrying in {:?}...",
+ attempt + 1,
+ self.ack_timeout,
+ backoff
+ );
+ tokio::time::sleep(backoff).await;
+ }
+ Err(e) => {
+ tracing::error!("Ack failed after {} retries: {:?}", self.max_retries, e);
+ return;
+ }
+ }
+ }
}
}
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In crates/arkflow-core/src/idempotent_ack.rs around lines 211 to 233, the
timeout passed to tokio::time::timeout is currently multiplied by (attempt + 1)
which grows the per-attempt timeout; change it to use a fixed timeout value
(self.retry_delay) for each attempt and keep exponential/incremental backoff
only in the subsequent tokio::time::sleep call (e.g., sleep(self.retry_delay *
(attempt + 1))). Leave the retry loop and logging intact and ensure last_error
captures the timeout error for final logging.
pub struct TracedAck {
    inner: Arc<dyn Ack>,
    ack_id: AckId,
    start_time: Instant,
}

impl TracedAck {
    pub fn new(inner: Arc<dyn Ack>, ack_id: AckId) -> Self {
        Self {
            inner,
            ack_id,
            start_time: Instant::now(),
        }
    }
}
TracedAck measures from construction time, not ack start (incorrect latency).
Capture Instant::now() inside ack(); drop the stored start_time.
pub struct TracedAck {
inner: Arc<dyn Ack>,
ack_id: AckId,
- start_time: Instant,
}
@@
impl TracedAck {
pub fn new(inner: Arc<dyn Ack>, ack_id: AckId) -> Self {
- Self {
- inner,
- ack_id,
- start_time: Instant::now(),
- }
+ Self { inner, ack_id }
}
}
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In crates/arkflow-core/src/idempotent_ack.rs around lines 235 to 249, TracedAck
currently stores a start_time on construction which yields incorrect latency
measurements; remove the start_time field from the struct and its initialization
in new, then change ack() to capture Instant::now() at the beginning of the
ack() call (before delegating to inner.ack()), compute the duration after the
inner call returns, and use that duration for logging/metrics; update any
constructor and trait impls accordingly to compile.
#[async_trait]
impl Ack for TracedAck {
    async fn ack(&self) {
        tracing::debug!("Starting ack for {:?}", self.ack_id);

        let result = self.inner.ack().await;

        let duration = self.start_time.elapsed();
        tracing::debug!("Ack completed for {:?} in {:?}", self.ack_id, duration);

        if duration > Duration::from_millis(100) {
            tracing::warn!("Slow ack detected for {:?}: {:?}", self.ack_id, duration);
        }

        result
    }
}
Fix TracedAck timing and return.
impl Ack for TracedAck {
async fn ack(&self) {
tracing::debug!("Starting ack for {:?}", self.ack_id);
-
- let result = self.inner.ack().await;
-
- let duration = self.start_time.elapsed();
+ let start = Instant::now();
+ self.inner.ack().await;
+ let duration = start.elapsed();
tracing::debug!("Ack completed for {:?} in {:?}", self.ack_id, duration);
@@
- result
+ ()
}
}
📝 Committable suggestion
#[async_trait]
impl Ack for TracedAck {
    async fn ack(&self) {
        tracing::debug!("Starting ack for {:?}", self.ack_id);
        let start = Instant::now();
        self.inner.ack().await;
        let duration = start.elapsed();
        tracing::debug!("Ack completed for {:?} in {:?}", self.ack_id, duration);
        if duration > Duration::from_millis(100) {
            tracing::warn!("Slow ack detected for {:?}: {:?}", self.ack_id, duration);
        }
        ()
    }
}
🤖 Prompt for AI Agents
In crates/arkflow-core/src/idempotent_ack.rs around lines 251 to 267, the
TracedAck::ack implementation is using a stored start_time for timing and then
dropping the result instead of returning it; change it to record the start at
the start of the method (let start = Instant::now()), await
self.inner.ack().await into result, compute duration from start.elapsed(), log
based on that duration, and then return the awaited result (i.e., ensure the
function returns result).
// Initialize reliable ack processor if not already set
if self.ack_processor.is_none() {
    let temp_dir = std::env::temp_dir();
    let wal_path = temp_dir.join(format!("ack_wal_{}", std::process::id()));
    let ack_processor =
        ReliableAckProcessor::new(&tracker, cancellation_token.clone(), &wal_path)?;
    self.ack_processor = Some(Arc::new(ack_processor));
}
Avoid creating WAL processors in library runtime code.
Creating a reliable ack processor with a default temporary WAL path at runtime (lines 109-113) makes this stream non-deterministic and harder to test. Library code should require explicit processor configuration rather than silently creating stateful components. Consider making the ack_processor required when reliability is needed:
- // Initialize reliable ack processor if not already set
- if self.ack_processor.is_none() {
- let temp_dir = std::env::temp_dir();
- let wal_path = temp_dir.join(format!("ack_wal_{}", std::process::id()));
- let ack_processor =
- ReliableAckProcessor::new(&tracker, cancellation_token.clone(), &wal_path)?;
- self.ack_processor = Some(Arc::new(ack_processor));
- }
+ // Verify ack processor is configured if required
+ // The processor should be provided via with_ack_processor() during construction
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In crates/arkflow-core/src/reliable_stream.rs around lines 107 to 114, the code
currently creates a ReliableAckProcessor with a temp WAL path at runtime which
injects non-deterministic state into library code; instead, stop creating a
WAL/processor implicitly and require explicit configuration from the caller:
remove the temp_dir/wal_path creation and the ReliableAckProcessor::new call
here, and either (a) make ack_processor a required argument on the constructor
or method (or accept an Option that triggers a clear error), or (b) accept a
configured WAL path or builder in the stream configuration so callers supply
deterministic state; update the function signature and callers to pass the
ack_processor or WAL path, and add a clear runtime error or doc note if
reliability is requested but no processor was provided.
let ack_id = AckId::new(
    "stream_input".to_string(),
    format!("msg_{}", std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap()
        .as_millis())
);
Timestamp collision risk in message ID generation.
Using system time in milliseconds for message IDs (lines 192-195) can lead to collisions when multiple messages arrive within the same millisecond. This breaks the idempotency guarantee of the ack cache. Use a more robust ID generation strategy:
- let ack_id = AckId::new(
- "stream_input".to_string(),
- format!("msg_{}", std::time::SystemTime::now()
- .duration_since(std::time::UNIX_EPOCH)
- .unwrap()
- .as_millis())
- );
+ let ack_id = AckId::new(
+ "stream_input".to_string(),
+ uuid::Uuid::new_v4().to_string()
+ );
📝 Committable suggestion
let ack_id = AckId::new(
    "stream_input".to_string(),
    uuid::Uuid::new_v4().to_string()
);
🤖 Prompt for AI Agents
In crates/arkflow-core/src/reliable_stream.rs around lines 190 to 196, the
current AckId uses SystemTime::now().as_millis() which can collide for multiple
messages within the same millisecond; replace this fragile timestamp-based ID
with a robust unique generator — e.g., generate a UUID v4 string (add the uuid
crate to Cargo.toml and call Uuid::new_v4().to_string()) or use a process-wide
AtomicU64 monotonic counter combined with a short timestamp to ensure
uniqueness; update the AckId construction to use the new unique string and add
the necessary imports and Cargo.toml dependency if using uuid.
match buffer.read().await {
    Ok(Some(v)) => {
        if let Err(e) = input_sender.send_async(v).await {
            error!("Failed to send input message: {}", e);
        }
    }
    _ => {}
}
Handle buffer flush errors appropriately.
After cancellation, there's an attempt to read from the buffer (lines 285-292), but only the first message is processed. Any remaining messages after the flush will be lost. Process all remaining messages after the flush:
- match buffer.read().await {
- Ok(Some(v)) => {
- if let Err(e) = input_sender.send_async(v).await {
- error!("Failed to send input message: {}", e);
- }
- }
- _ => {}
- }
+ // Process all remaining messages after flush
+ loop {
+ match buffer.read().await {
+ Ok(Some(v)) => {
+ if let Err(e) = input_sender.send_async(v).await {
+ error!("Failed to send buffered message: {}", e);
+ break;
+ }
+ }
+ _ => break,
+ }
+ }
📝 Committable suggestion
// Process all remaining messages after flush
loop {
    match buffer.read().await {
        Ok(Some(v)) => {
            if let Err(e) = input_sender.send_async(v).await {
                error!("Failed to send buffered message: {}", e);
                break;
            }
        }
        _ => break,
    }
}
🤖 Prompt for AI Agents
In crates/arkflow-core/src/reliable_stream.rs around lines 285-292, the current
code reads only a single item after cancellation and drops remaining buffered
messages; change this to drain the buffer until it yields None so all flushed
messages are processed. Replace the single match with a loop that awaits
buffer.read() repeatedly (e.g., while let Ok(Some(msg)) = buffer.read().await)
and for each message call input_sender.send_async(msg).await, logging send
errors but continuing to drain; on Ok(None) break the loop and on Err log the
read error and break/continue as appropriate to avoid losing messages.
// Initialize ack processor if enabled
if self.enable_reliable_ack {
    let temp_dir = std::env::temp_dir();
    let wal_path_str = self
        .wal_path
        .as_ref()
        .cloned()
        .unwrap_or_else(|| temp_dir.join("ack_wal").to_string_lossy().into_owned());
    let wal_path = std::path::Path::new(&wal_path_str);

    let tracker = TaskTracker::new();
    let cancellation_token = CancellationToken::new();
    let ack_processor = ReliableAckProcessor::new(&tracker, cancellation_token, wal_path)?;

    stream = stream.with_ack_processor(ack_processor);
}
Leaking task tracker and cancellation token.
The ReliableStreamConfig::build() method creates a new TaskTracker and CancellationToken (lines 586-587) to initialize the ack processor, but these are not properly managed. The tracker spawns background tasks that won't be cleaned up when the stream is dropped. The ack processor should either:
- Be created externally and passed in (preferred)
- Store and manage the tracker/token lifecycle properly
// Initialize ack processor if enabled
if self.enable_reliable_ack {
- let temp_dir = std::env::temp_dir();
- let wal_path_str = self
- .wal_path
- .as_ref()
- .cloned()
- .unwrap_or_else(|| temp_dir.join("ack_wal").to_string_lossy().into_owned());
- let wal_path = std::path::Path::new(&wal_path_str);
-
- let tracker = TaskTracker::new();
- let cancellation_token = CancellationToken::new();
- let ack_processor = ReliableAckProcessor::new(&tracker, cancellation_token, wal_path)?;
-
- stream = stream.with_ack_processor(ack_processor);
+ return Err(Error::Config("ReliableStreamConfig requires an external ack processor. Use Stream::new_reliable() instead.".to_string()));
}
📝 Committable suggestion
// Initialize ack processor if enabled
if self.enable_reliable_ack {
    return Err(Error::Config("ReliableStreamConfig requires an external ack processor. Use Stream::new_reliable() instead.".to_string()));
}
🤖 Prompt for AI Agents
In crates/arkflow-core/src/reliable_stream.rs around lines 576 to 591, build()
creates a TaskTracker and CancellationToken to construct the
ReliableAckProcessor but never stores or manages them, leaking background tasks
when the stream is dropped; to fix this either (preferred) change the API so an
already-constructed ack processor is passed into ReliableStreamConfig::build()
(move creation responsibility upstream), or if creation must remain here ensure
ownership and lifecycle are captured by storing the tracker and cancellation
token on the stream struct (or have ReliableAckProcessor own them) and implement
a proper shutdown/Drop that cancels the token and waits/joins tracker tasks so
no background tasks outlive the stream.
let ack_id =
    AckId::new("stream".to_string(), uuid::Uuid::new_v4().to_string());
if let Err(e) = processor
    .ack(
        ack.clone(),
        "stream".to_string(),
        ack_id.source_id.as_bytes().to_vec(),
    )
Ack payload uses source_id instead of message_id (loses uniqueness).
Use the UUID message_id for the payload; consider a clearer ack_type.
- let ack_id =
- AckId::new("stream".to_string(), uuid::Uuid::new_v4().to_string());
+ let ack_id =
+ AckId::new("stream".to_string(), uuid::Uuid::new_v4().to_string());
if let Err(e) = processor
.ack(
ack.clone(),
- "stream".to_string(),
- ack_id.source_id.as_bytes().to_vec(),
+ "success".to_string(),
+ ack_id.message_id.as_bytes().to_vec(),
)
.await
📝 Committable suggestion
let ack_id =
    AckId::new("stream".to_string(), uuid::Uuid::new_v4().to_string());
if let Err(e) = processor
    .ack(
        ack.clone(),
        "success".to_string(),
        ack_id.message_id.as_bytes().to_vec(),
    )
    .await
🤖 Prompt for AI Agents
crates/arkflow-core/src/stream/mod.rs around lines 444 to 451: the code builds
an AckId and then passes ack_id.source_id as the payload, which loses the
intended uniqueness and mixes semantics; instead create the AckId so the UUID is
stored as the message_id and use a clearer ack type (e.g., "message" or
"stream_ack"), then pass ack_id.message_id.as_bytes().to_vec() to
processor.ack(...) so the payload uses the UUID message_id and the ack type
reflects message-level acknowledgements.
Actionable comments posted: 2
♻️ Duplicate comments (1)
crates/arkflow-core/src/stream/mod.rs (1)
444-451: Ack payload uses source_id instead of message_id (loses uniqueness).
The code creates an AckId with a UUID but then passes ack_id.source_id as the payload, which is always "stream". This loses the unique identifier. The message_id field containing the UUID should be used instead for proper message tracking. Apply this diff to use the unique message_id:
     let ack_id = AckId::new("stream".to_string(), uuid::Uuid::new_v4().to_string());
     if let Err(e) = processor
         .ack(
             ack.clone(),
-            "stream".to_string(),
-            ack_id.source_id.as_bytes().to_vec(),
+            "success".to_string(),
+            ack_id.message_id.as_bytes().to_vec(),
         )
         .await
🧹 Nitpick comments (4)
examples/reliable_ack_example.yaml (2)
35-35: Consider using English for configuration comments to maintain consistency.
The comment uses the Chinese "可靠确认配置" ("reliable acknowledgment configuration") while the rest of the codebase primarily uses English. For better maintainability and international collaboration, consider using English. Apply this diff:
-  # 可靠确认配置
+  # Reliable acknowledgment configuration
42-42: Add newline at end of file.
YAML files should end with a newline character for better compatibility with various tools and version control systems. Apply this diff to add the missing newline:
-  enable_backpressure: true
+  enable_backpressure: true
+
examples/reliable_ack_usage.rs (1)
22-23: Consider using English for user-facing messages and comments.
The Chinese text in comments and println statements may limit international usability and collaboration.
Consider translating the Chinese text to English for better international accessibility. For example:
- "读取YAML配置文件" → "Read YAML configuration file"
- "解析配置" → "Parse configuration"
- "创建流配置" → "Create stream configuration"
- "成功创建可靠确认流!" → "Successfully created reliable acknowledgment stream!"
- "配置详情" → "Configuration details"
- etc.
examples/test_config.rs (1)
15-16: Remove redundant import.
The serde_yaml import is redundant since it's already available as a crate dependency. Apply this diff to remove the redundant line:
 use arkflow_core::stream::StreamConfig;
-use serde_yaml;
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- crates/arkflow-core/src/stream/mod.rs (10 hunks)
- crates/arkflow-core/tests/reliable_ack_integration_test.rs (1 hunks)
- examples/reliable_ack_example.yaml (1 hunks)
- examples/reliable_ack_usage.rs (1 hunks)
- examples/test_config.rs (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
examples/test_config.rs (1)
- examples/reliable_ack_usage.rs (1): main (21-63)
examples/reliable_ack_usage.rs (2)
- crates/arkflow-core/src/stream/mod.rs (1): output (392-470)
- examples/test_config.rs (1): main (18-119)
crates/arkflow-core/tests/reliable_ack_integration_test.rs (1)
- crates/arkflow-core/src/stream/mod.rs (1): new (56-77)
crates/arkflow-core/src/stream/mod.rs (3)
- crates/arkflow-core/src/reliable_stream.rs (1): output (391-476)
- crates/arkflow-core/src/idempotent_ack.rs (5): ack (143-149), ack (170-182), ack (213-232), ack (253-266), ack (281-287)
- crates/arkflow-core/src/reliable_ack.rs (1): ack (304-348)
🪛 YAMLlint (1.37.1)
examples/reliable_ack_example.yaml
[error] 42-42: no new line character at the end of file
(new-line-at-end-of-file)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: build
- GitHub Check: build
🔇 Additional comments (6)
examples/test_config.rs (1)
19-117: LGTM! Well-structured configuration testing.
The test code effectively demonstrates both enabled and disabled reliable acknowledgment scenarios with clear validation and user-friendly output messages. The error handling is appropriate and the code structure is clean.
crates/arkflow-core/tests/reliable_ack_integration_test.rs (3)
20-71: LGTM! Comprehensive test coverage for configuration parsing.
The test properly validates all reliable acknowledgment configuration fields and uses a temporary directory for the WAL path to avoid test artifacts.
73-105: LGTM! Good negative test case.
The test correctly validates that regular streams without reliable acknowledgment configuration work as expected.
107-151: LGTM! Thorough validation of default values.
The test properly validates that default values are applied when only enabled: true is specified, ensuring backward compatibility and sensible defaults.
crates/arkflow-core/src/stream/mod.rs (2)
79-101: LGTM! Clean constructor for reliable streams.
The new constructor properly initializes all fields, including the reliable_ack_processor, maintaining consistency with the original constructor pattern.
513-541: LGTM! Well-structured configuration with sensible defaults.
The ReliableAckConfig struct is well-designed with appropriate optional fields and reasonable default values. The serde attributes ensure proper serialization/deserialization.
let wal_path = reliable_ack_config
    .wal_path
    .as_ref()
    .map_or("./reliable_ack.wal".to_string(), |p| p.clone());

// Create reliable ack processor
let tracker = tokio_util::task::TaskTracker::new();
let cancellation_token = tokio_util::sync::CancellationToken::new();
let ack_processor =
    std::sync::Arc::new(crate::reliable_ack::ReliableAckProcessor::new(
        &tracker,
        cancellation_token,
        std::path::Path::new(&wal_path),
    )?);
🧩 Analysis chain
Potential resource leak: TaskTracker and CancellationToken not properly managed.
The TaskTracker and CancellationToken created for the ReliableAckProcessor are not stored or managed by the Stream. This could lead to resource leaks or improper shutdown handling when the processor needs to be stopped. Consider storing these resources in the Stream struct or ensuring they're properly tied to the Stream's lifecycle. The processor likely needs these for graceful shutdown.
+ // Store tracker and token for proper lifecycle management
+ // These should be stored in Stream struct fields
let tracker = tokio_util::task::TaskTracker::new();
let cancellation_token = tokio_util::sync::CancellationToken::new();
🏁 Script executed:
#!/bin/bash
# Check how ReliableAckProcessor manages tracker and cancellation_token
rg -n -A10 -B5 "impl.*ReliableAckProcessor" crates/arkflow-core/src/reliable_ack.rs
Length of output: 2039
Manage TaskTracker and CancellationToken so the ReliableAckProcessor can be shut down cleanly
ReliableAckProcessor::new takes &TaskTracker and a CancellationToken, but stream/mod.rs creates a TaskTracker and CancellationToken locally and does not retain them — you cannot call tracker.close()/wait() or token.cancel() to stop the processor, which prevents graceful shutdown and can leak tasks.
- Action: either store the TaskTracker and CancellationToken on the Stream and call tracker.close()/tracker.wait() + cancellation_token.cancel() during shutdown, or have ReliableAckProcessor own/cloned handles and expose a shutdown/close method that cancels and waits for worker tasks.
- Locations: crates/arkflow-core/src/stream/mod.rs (around lines 590–603) and crates/arkflow-core/src/reliable_ack.rs (impl ReliableAckProcessor::new at ~249 and worker impl around ~377).
🤖 Prompt for AI Agents
In crates/arkflow-core/src/stream/mod.rs around lines 590–603, the TaskTracker
and CancellationToken are created locally and dropped after creating
ReliableAckProcessor, preventing shutdown calls (tracker.close()/wait() and
token.cancel()) and leaking tasks; update the Stream struct to store the
TaskTracker and CancellationToken as fields (or otherwise persist
clones/handles) and on Stream shutdown call cancellation_token.cancel() then
tracker.close() and tracker.wait() to ensure worker tasks stop, or alternatively
modify crates/arkflow-core/src/reliable_ack.rs (ReliableAckProcessor::new at
~249 and worker impl around ~377) so the processor takes/owns cloned TaskTracker
and CancellationToken and exposes a shutdown/close method that performs cancel +
wait; implement one of these approaches and wire the shutdown call from Stream’s
shutdown path.
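A minimal sketch of the second option, with the processor owning its handles; the names here are illustrative stand-ins, not the crate's actual API:

use tokio_util::{sync::CancellationToken, task::TaskTracker};

// Hypothetical shape: the processor owns the tracker/token it was built with
// and exposes a shutdown method that cancels, closes, and awaits worker tasks.
pub struct AckProcessorHandle {
    tracker: TaskTracker,
    token: CancellationToken,
}

impl AckProcessorHandle {
    pub fn new() -> Self {
        let tracker = TaskTracker::new();
        let token = CancellationToken::new();
        let worker_token = token.clone();
        tracker.spawn(async move {
            // Worker loop: exit promptly once cancellation is requested.
            worker_token.cancelled().await;
        });
        Self { tracker, token }
    }

    pub async fn shutdown(&self) {
        self.token.cancel();       // signal workers to stop
        self.tracker.close();      // stop accepting new tracked tasks
        self.tracker.wait().await; // wait for in-flight tasks to finish
    }
}

Either variant works; the key property is that whoever triggers stream teardown can reach both handles.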
println!(
    " - 可靠确认: {}",
    stream_config.reliable_ack.unwrap().enabled
);
println!(
    " - WAL路径: {:?}",
    stream_config.reliable_ack.unwrap().wal_path
);
println!(
    " - 最大重试次数: {:?}",
    stream_config.reliable_ack.unwrap().max_retries
);
println!(
    " - 背压控制: {:?}",
    stream_config.reliable_ack.unwrap().enable_backpressure
);
Multiple unwrap() calls on the same Option cause redundant moves and potential panics.
The code calls unwrap() multiple times on stream_config.reliable_ack, which moves the value each time. This will cause a compilation error because unwrap() consumes the Option. Additionally, if reliable_ack is None, the program will panic without a helpful message.
Apply this diff to fix the issue:
- println!("📋 配置详情:");
- println!(
- " - 可靠确认: {}",
- stream_config.reliable_ack.unwrap().enabled
- );
- println!(
- " - WAL路径: {:?}",
- stream_config.reliable_ack.unwrap().wal_path
- );
- println!(
- " - 最大重试次数: {:?}",
- stream_config.reliable_ack.unwrap().max_retries
- );
- println!(
- " - 背压控制: {:?}",
- stream_config.reliable_ack.unwrap().enable_backpressure
- );
+ println!("📋 配置详情:");
+ if let Some(ref reliable_ack) = stream_config.reliable_ack {
+ println!(" - 可靠确认: {}", reliable_ack.enabled);
+ println!(" - WAL路径: {:?}", reliable_ack.wal_path);
+ println!(" - 最大重试次数: {:?}", reliable_ack.max_retries);
+ println!(" - 背压控制: {:?}", reliable_ack.enable_backpressure);
+ } else {
+ println!(" - 可靠确认: 未配置");
+ }
🤖 Prompt for AI Agents
In examples/reliable_ack_usage.rs around lines 36 to 51, multiple calls to
stream_config.reliable_ack.unwrap() both consume the Option repeatedly and risk
panics; capture the Option once (e.g., with if let Some(reliable_ack) =
&stream_config.reliable_ack or let reliable_ack =
stream_config.reliable_ack.as_ref()) and then reference its fields without
additional unwraps, or match and provide a clear fallback/log message for the
None case to avoid panics and redundant moves.
5bc92c3 to b068241 Compare
Actionable comments posted: 17
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
crates/arkflow-core/src/stream/mod.rs (3)
36-47
: Retain ack processor's TaskTracker and CancellationToken to avoid leaks and enable graceful shutdown. Store them on Stream so you can cancel/close during close().
  pub struct Stream {
@@
      next_seq: Arc<AtomicU64>,
-     reliable_ack_processor: Option<Arc<ReliableAckProcessor>>,
+     reliable_ack_processor: Option<Arc<ReliableAckProcessor>>,
+     // for reliable ack processor lifecycle management
+     ack_task_tracker: Option<TaskTracker>,
+     ack_cancellation_token: Option<CancellationToken>,
  }
55-77
: Initialize the new fields in the non-reliable constructor.
  Self {
@@
      next_seq: Arc::new(AtomicU64::new(0)),
      reliable_ack_processor: None,
+     ack_task_tracker: None,
+     ack_cancellation_token: None,
  }
472-509
: Call cancel/close/wait for the reliable ack tasks during shutdown. Ensures graceful termination.
  async fn close(&mut self) -> Result<(), Error> {
+     // Stop reliable ack processor background tasks first (if any)
+     if let Some(token) = &self.ack_cancellation_token {
+         token.cancel();
+     }
+     if let Some(tracker) = &self.ack_task_tracker {
+         tracker.close();
+         tracker.wait().await;
+     }
🧹 Nitpick comments (7)
crates/arkflow-core/src/object_storage.rs (2)
117-123
: Batch upload is sequential; consider concurrency for throughput. Use a buffered stream with limited concurrency (e.g., futures::stream::iter(...).buffer_unordered(N)) and keep per-key error handling; see the sketch below.
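A minimal sketch of that pattern, assuming a free-standing put_object stand-in for the real backend call:

use futures::stream::{self, StreamExt};

// Stand-in for the storage backend call; errors stay attached to their key.
async fn put_object(_key: &str, _data: Vec<u8>) -> Result<(), String> {
    Ok(())
}

// Upload a batch with at most 8 requests in flight.
async fn upload_batch(items: Vec<(String, Vec<u8>)>) -> Vec<(String, Result<(), String>)> {
    stream::iter(items)
        .map(|(key, data)| async move {
            let result = put_object(&key, data).await;
            (key, result)
        })
        .buffer_unordered(8)
        .collect()
        .await
}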
257-276
: S3 exists() error matching by string code is brittle. Prefer checking HTTP status (e.g., NotFound/404) via SdkError variants if available for your aws-sdk version.
Please confirm error variant matching against your aws-sdk-s3 version.
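A hedged sketch of that check, assuming a recent aws-sdk-s3 where HeadObjectError models a NotFound variant:

use aws_sdk_s3::{operation::head_object::HeadObjectError, Client};

// Treat a modeled NotFound as "object absent"; surface everything else.
async fn object_exists(client: &Client, bucket: &str, key: &str) -> Result<bool, String> {
    match client.head_object().bucket(bucket).key(key).send().await {
        Ok(_) => Ok(true),
        Err(err) => {
            let service_err = err.into_service_error();
            if matches!(service_err, HeadObjectError::NotFound(_)) {
                Ok(false)
            } else {
                Err(format!("head_object failed: {service_err}"))
            }
        }
    }
}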
crates/arkflow-core/src/stream/mod.rs (1)
399-469
: Align error ack semantics with reliable_stream (use processor with "error"/"error_output" types). Currently, errors with no err_output are treated as success, then acked via processor with type "stream". Consider mirroring reliable_stream.rs for consistency and observability.
crates/arkflow-core/src/distributed_ack_config.rs (1)
217-249
: Validation covers key invariants; consider adding a wal.local_wal_path non-empty check when enabled. Prevents misconfig at startup; a sketch follows below.
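A minimal sketch of that invariant; the struct is a hypothetical stand-in mirroring the wal section of the config in this PR:

// Hypothetical mirror of the config's wal section.
struct WalSection {
    enabled: bool,
    local_wal_path: String,
}

fn validate_wal(wal: &WalSection) -> Result<(), String> {
    if wal.enabled && wal.local_wal_path.trim().is_empty() {
        return Err("wal.local_wal_path must be non-empty when the WAL is enabled".into());
    }
    Ok(())
}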
crates/arkflow-core/src/checkpoint_manager.rs (1)
434-483
: Checkpoint cleanup cache update/logging is confusing; track the number removed separately. Avoid reusing removed_count as a decrementing counter for the second pass; compute a new value for logging.
- let to_remove = checkpoints_guard.len() - config.max_checkpoints;
- let mut removed_count = 0;
+ let to_remove = checkpoints_guard.len() - config.max_checkpoints;
+ let mut removed_success = 0;
@@
  } else {
-     removed_count += 1;
+     removed_success += 1;
  }
}
@@
- for (timestamp, checkpoint_info) in checkpoints_guard.range(..) {
-     if removed_count == 0 {
+ let mut remaining_to_skip = removed_success;
+ for (timestamp, checkpoint_info) in checkpoints_guard.range(..) {
+     if remaining_to_skip == 0 {
          new_checkpoints.insert(*timestamp, checkpoint_info.clone());
      } else {
-         removed_count -= 1;
+         remaining_to_skip -= 1;
      }
  }
@@
- if removed_count > 0 {
-     info!("Cleaned up {} old checkpoints", removed_count);
+ if removed_success > 0 {
+     info!("Cleaned up {} old checkpoints", removed_success);
  }
crates/arkflow-core/src/distributed_ack_processor.rs (1)
385-399
: Avoid building record in the fallback path (unused). Remove the dead local to reduce allocations.
- } else if let Some(ref fallback) = self.fallback_processor {
-     let record = task.to_record();
-     // For fallback mode, we just send to the fallback processor
+ } else if let Some(ref fallback) = self.fallback_processor {
+     // For fallback mode, we just send to the fallback processor
      return fallback.ack(task.ack, task.ack_type, task.payload).await;
  }
crates/arkflow-core/Cargo.toml (1)
34-44
: Gate the cloud SDK dependencies behind feature flags. Pulling in aws-sdk-s3, azure_storage[_blobs], google-cloud-storage, and their helper crates unconditionally forces every downstream build of arkflow-core to compile three full cloud SDK stacks, even when distributed ack/object storage is never used. That's a big hit to compile time, binary size, and target portability (native TLS backends, extra transitive deps). Please move these under opt-in features (e.g. object-storage-s3, object-storage-azure, object-storage-gcs) and guard the corresponding modules so consumers enable only what they need.
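A minimal sketch of the module-level gating, assuming the feature names proposed above (the matching [features] entries and optional dependencies would live in Cargo.toml):

// Each backend compiles only when its feature is enabled; bodies are placeholders.
#[cfg(feature = "object-storage-s3")]
mod s3_backend {
    pub fn name() -> &'static str { "s3" }
}

#[cfg(feature = "object-storage-azure")]
mod azure_backend {
    pub fn name() -> &'static str { "azure" }
}

// The factory only offers backends that were compiled in.
pub fn create_object_storage(kind: &str) -> Result<&'static str, String> {
    match kind {
        #[cfg(feature = "object-storage-s3")]
        "s3" => Ok(s3_backend::name()),
        #[cfg(feature = "object-storage-azure")]
        "azure" => Ok(azure_backend::name()),
        other => Err(format!("backend '{other}' not compiled into this build")),
    }
}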
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
Cargo.lock
is excluded by!**/*.lock
📒 Files selected for processing (16)
crates/arkflow-core/Cargo.toml (1 hunks)
crates/arkflow-core/src/checkpoint_manager.rs (1 hunks)
crates/arkflow-core/src/distributed_ack_config.rs (1 hunks)
crates/arkflow-core/src/distributed_ack_processor.rs (1 hunks)
crates/arkflow-core/src/distributed_wal.rs (1 hunks)
crates/arkflow-core/src/lib.rs (1 hunks)
crates/arkflow-core/src/node_registry.rs (1 hunks)
crates/arkflow-core/src/object_storage.rs (1 hunks)
crates/arkflow-core/src/recovery_manager.rs (1 hunks)
crates/arkflow-core/src/stream/mod.rs (10 hunks)
crates/arkflow-core/tests/distributed_ack_integration_test.rs (1 hunks)
crates/arkflow-core/tests/reliable_ack_integration_test.rs (1 hunks)
examples/distributed_ack_example.yaml (1 hunks)
examples/reliable_ack_example.yaml (1 hunks)
examples/reliable_ack_usage.rs (1 hunks)
examples/test_config.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- examples/test_config.rs
- examples/reliable_ack_example.yaml
- examples/reliable_ack_usage.rs
👮 Files not reviewed due to content moderation or server errors (2)
- examples/distributed_ack_example.yaml
- crates/arkflow-core/src/lib.rs
🧰 Additional context used
🧬 Code graph analysis (11)
crates/arkflow-core/src/lib.rs (2)
  crates/arkflow-core/src/stream/mod.rs (1): output (392-470)
  crates/arkflow-core/src/reliable_stream.rs (1): output (391-476)
crates/arkflow-core/tests/reliable_ack_integration_test.rs (1)
  crates/arkflow-core/src/stream/mod.rs (1): new (56-77)
crates/arkflow-core/src/object_storage.rs (4)
  crates/arkflow-core/src/checkpoint_manager.rs (2): new (97-118), default (48-59)
  crates/arkflow-core/src/distributed_wal.rs (4): new (170-230), default (62-78), default (103-112), default (129-140)
  crates/arkflow-core/src/node_registry.rs (5): new (158-176), new (474-489), new (585-590), default (60-62), default (87-97)
  crates/arkflow-core/src/recovery_manager.rs (3): new (145-153), new (256-286), default (56-70)
crates/arkflow-core/src/node_registry.rs (1)
  crates/arkflow-core/src/object_storage.rs (5): create_object_storage (130-155), new (165-199), new (389-410), new (550-559), new (721-731)
crates/arkflow-core/src/distributed_ack_config.rs (4)
  crates/arkflow-core/src/checkpoint_manager.rs (2): default (48-59), new (97-118)
  crates/arkflow-core/src/distributed_wal.rs (4): default (62-78), default (103-112), default (129-140), new (170-230)
  crates/arkflow-core/src/node_registry.rs (5): default (60-62), default (87-97), new (158-176), new (474-489), new (585-590)
  crates/arkflow-core/src/recovery_manager.rs (3): default (56-70), new (145-153), new (256-286)
crates/arkflow-core/src/stream/mod.rs (3)
  crates/arkflow-core/src/reliable_stream.rs (1): output (391-476)
  crates/arkflow-core/src/distributed_ack_processor.rs (1): ack (345-400)
  crates/arkflow-core/src/reliable_ack.rs (1): ack (304-348)
crates/arkflow-core/src/checkpoint_manager.rs (2)
  crates/arkflow-core/src/object_storage.rs (5): create_object_storage (130-155), new (165-199), new (389-410), new (550-559), new (721-731)
  crates/arkflow-core/src/distributed_wal.rs (3): new (170-230), serde_json (598-598), create_checkpoint (663-693)
crates/arkflow-core/src/distributed_wal.rs (1)
  crates/arkflow-core/src/object_storage.rs (5): create_object_storage (130-155), new (165-199), new (389-410), new (550-559), new (721-731)
crates/arkflow-core/src/distributed_ack_processor.rs (5)
  crates/arkflow-core/src/node_registry.rs (9): create_node_registry (557-576), default (60-62), default (87-97), new (158-176), new (474-489), new (585-590), get_active_nodes (125-125), get_active_nodes (405-421), get_active_nodes (621-635)
  crates/arkflow-core/src/checkpoint_manager.rs (5): default (48-59), new (97-118), create_checkpoint (274-378), shutdown (593-612), get_latest_checkpoint (486-492)
  crates/arkflow-core/src/distributed_ack_config.rs (5): default (47-57), default (70-77), default (94-101), new (106-116), for_local_testing (252-324)
  crates/arkflow-core/src/distributed_wal.rs (8): default (62-78), default (103-112), default (129-140), new (170-230), start_background_tasks (233-278), get_metrics (653-655), create_checkpoint (663-693), shutdown (696-713)
  crates/arkflow-core/src/recovery_manager.rs (4): default (56-70), new (145-153), new (256-286), perform_consistency_check (713-799)
crates/arkflow-core/tests/distributed_ack_integration_test.rs (3)
  crates/arkflow-core/src/distributed_ack_config.rs (4): new (106-116), for_local_testing (252-324), for_production (327-388), validate (218-249)
  crates/arkflow-core/src/distributed_ack_processor.rs (2): new (118-196), ack (345-400)
  crates/arkflow-core/src/object_storage.rs (4): new (165-199), new (389-410), new (550-559), new (721-731)
crates/arkflow-core/src/recovery_manager.rs (2)
  crates/arkflow-core/src/object_storage.rs (5): create_object_storage (130-155), new (165-199), new (389-410), new (550-559), new (721-731)
  crates/arkflow-core/src/checkpoint_manager.rs (1): new (97-118)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: build
- GitHub Check: build
🔇 Additional comments (6)
crates/arkflow-core/src/distributed_ack_config.rs (1)
104-116
: Builder defaults and new(cluster_id) look good. Clear, composable config surface; default values match other modules.
crates/arkflow-core/src/stream/mod.rs (3)
443-451
: Use message_id and a clear ack_type; don't send source_id as payload. Match reliable_stream semantics: use ack_type "success" and the message UUID for the payload.
- let ack_id =
-     AckId::new("stream".to_string(), uuid::Uuid::new_v4().to_string());
+ let ack_id =
+     AckId::new("stream".to_string(), uuid::Uuid::new_v4().to_string());
  if let Err(e) = processor
      .ack(
          ack.clone(),
-         "stream".to_string(),
-         ack_id.source_id.as_bytes().to_vec(),
+         "success".to_string(),
+         ack_id.message_id.as_bytes().to_vec(),
      )
      .await
79-102
: Pass and store tracker/token in new_reliable(), so Stream can cancel and wait.
- pub fn new_reliable(
+ pub fn new_reliable(
      input: Arc<dyn Input>,
      pipeline: Pipeline,
      output: Arc<dyn Output>,
      error_output: Option<Arc<dyn Output>>,
      buffer: Option<Arc<dyn Buffer>>,
      resource: Resource,
      thread_num: u32,
-     ack_processor: Arc<ReliableAckProcessor>,
+     ack_processor: Arc<ReliableAckProcessor>,
+     ack_task_tracker: TaskTracker,
+     ack_cancellation_token: CancellationToken,
  ) -> Self {
      Self {
@@
          next_seq: Arc::new(AtomicU64::new(0)),
-         reliable_ack_processor: Some(ack_processor),
+         reliable_ack_processor: Some(ack_processor),
+         ack_task_tracker: Some(ack_task_tracker),
+         ack_cancellation_token: Some(ack_cancellation_token),
      }
  }
587-616
: Clone the token before moving it into ReliableAckProcessor and pass tracker/token into Stream::new_reliable. Prevents orphaned tasks and enables shutdown.
- let tracker = tokio_util::task::TaskTracker::new();
- let cancellation_token = tokio_util::sync::CancellationToken::new();
+ let tracker = tokio_util::task::TaskTracker::new();
+ let cancellation_token = tokio_util::sync::CancellationToken::new();
+ let stream_ack_token = cancellation_token.clone();
  let ack_processor =
      std::sync::Arc::new(crate::reliable_ack::ReliableAckProcessor::new(
          &tracker,
          cancellation_token,
          std::path::Path::new(&wal_path),
      )?);
@@
- return Ok(Stream::new_reliable(
+ return Ok(Stream::new_reliable(
      input,
      pipeline,
      output,
      error_output,
      buffer,
      resource,
      thread_num,
-     ack_processor,
+     ack_processor,
+     tracker,
+     stream_ack_token,
  ));
crates/arkflow-core/src/object_storage.rs (2)
382-411
: Use ContainerClient instead of a hard-coded BlobClient
Initializing client with .blob_client("dummy") fixes the blob name at construction and breaks operations on arbitrary blobs. Store the ContainerClient (from StorageAccountClient::container_client) in AzureStorage and call .blob_client(key) per operation. Verify the exact azure_storage_blobs 0.20.x API for types and methods.
415-541
: Use ContainerClient and convert Azure timestamps to SystemTime
- Store a ContainerClient in AzureStorage and replace self.client.as_container_client().blob_client(key) with self.container_client.blob_client(key).
- Convert Azure's DateTime<Utc> (e.g. blob.properties.last_modified) into std::time::SystemTime via to_chrono().to_std().unwrap_or(SystemTime::UNIX_EPOCH).
- Flatten response.data chunks into a single Vec<u8> or use the SDK's streaming helper.
Confirm your Azure SDK version and update method names/types accordingly.
let checkpoint_info: CheckpointInfo = serde_json::from_slice(&data)
    .map_err(|e| Error::Unknown(format!("Failed to deserialize checkpoint info: {}", e)))?;

    Ok(checkpoint_info)
}
Compressed checkpoints cannot be loaded (JSON deserialization will fail). Add decompress fallback.
If enable_compression was used during creation, loading must attempt decompression.
- let checkpoint_info: CheckpointInfo = serde_json::from_slice(&data)
- .map_err(|e| Error::Unknown(format!("Failed to deserialize checkpoint info: {}", e)))?;
+ let checkpoint_info: CheckpointInfo = match serde_json::from_slice(&data) {
+ Ok(v) => v,
+ Err(_) => {
+ let decompressed = Self::decompress_data(&data)?;
+ serde_json::from_slice(&decompressed).map_err(|e| {
+ Error::Unknown(format!("Failed to deserialize checkpoint info: {}", e))
+ })?
+ }
+ };
🤖 Prompt for AI Agents
In crates/arkflow-core/src/checkpoint_manager.rs around lines 158-162, the code
only attempts serde_json::from_slice(&data) which fails for compressed
checkpoints; change the logic so that on JSON deserialization error you attempt
to decompress `data` (use the project's compression utility or a chosen
algorithm used during creation, e.g., zstd/gzip) and then call
serde_json::from_slice on the decompressed bytes; map any decompression or
subsequent deserialization errors into the same Error::Unknown form so callers
get a single error type, and ensure successful decompression path returns the
deserialized CheckpointInfo.
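A hedged sketch of that fallback, assuming gzip via the flate2 crate; the real code should use whatever codec compress_data actually applied (zstd would look analogous):

use flate2::read::GzDecoder;
use serde::de::DeserializeOwned;
use std::io::Read;

// Try plain JSON first; on failure, assume the bytes were compressed,
// inflate them, and parse again. Both failure modes share one error type.
fn parse_maybe_compressed<T: DeserializeOwned>(data: &[u8]) -> Result<T, String> {
    if let Ok(value) = serde_json::from_slice::<T>(data) {
        return Ok(value);
    }
    let mut inflated = Vec::new();
    GzDecoder::new(data)
        .read_to_end(&mut inflated)
        .map_err(|e| format!("decompression failed: {e}"))?;
    serde_json::from_slice(&inflated).map_err(|e| format!("deserialization failed: {e}"))
}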
use crate::node_registry::{create_node_registry, NodeInfo, NodeRegistryManager, NodeStatus};
use crate::recovery_manager::RecoveryManager;
use crate::reliable_ack::{AckRecord, AckTask, ReliableAckProcessorMetrics};
use crate::Error;
use flume::{Receiver, Sender};
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;
use tracing::{debug, error, info, warn};
Accessing private fields on NodeRegistryManager/RecoveryManager; also need a NodeRegistry handle.
Add a NodeRegistry field to the processor and avoid touching private internals.
-use crate::node_registry::{create_node_registry, NodeInfo, NodeRegistryManager, NodeStatus};
+use crate::node_registry::{
+ create_node_registry, NodeInfo, NodeRegistry, NodeRegistryManager, NodeStatus,
+};
@@
pub struct DistributedAckProcessor {
@@
recovery_manager: Option<Arc<RecoveryManager>>,
+ node_registry: Option<Arc<dyn NodeRegistry>>,
@@
let distributed_wal = Arc::new(DistributedWAL::new(wal_config).await?);
self.distributed_wal = Some(distributed_wal.clone());
@@
- let node_registry = create_node_registry(
+ let node_registry = create_node_registry(
self.config.node_registry.coordinator_type.clone(),
self.cluster_id.clone(),
)
.await?;
@@
let node_registry_manager = Arc::new(
NodeRegistryManager::new(
self.node_id.clone(),
node_registry.clone(),
self.config.node_registry.coordinator_type.clone(),
)
.await?,
);
self.node_registry_manager = Some(node_registry_manager.clone());
+ self.node_registry = Some(node_registry);
Also applies to: 45-65, 223-261
🤖 Prompt for AI Agents
In crates/arkflow-core/src/distributed_ack_processor.rs around lines 24-36 (and
also apply similar fixes at 45-65 and 223-261), the code currently accesses
private fields of NodeRegistryManager and RecoveryManager directly; add a
NodeRegistry (or NodeRegistryHandle) field to DistributedAckProcessor struct and
use the public API on NodeRegistryManager (e.g., create_node_registry or
provided getters) and RecoveryManager methods instead of touching internals.
Update the constructor to accept and store the NodeRegistry handle (Arc or
cloneable handle) and replace all direct private-field accesses with calls to
the public methods on the new NodeRegistry field and RecoveryManager public API;
ensure usages at the other listed line ranges are updated consistently to use
the handle and public methods only.
// Start metrics collection task
let metrics = self.metrics.clone();
let checkpoint_manager = self.checkpoint_manager.clone();
let recovery_manager = self.recovery_manager.clone();
let distributed_wal = self.distributed_wal.clone();

tracker.spawn(async move {
    Self::metrics_collection_task(
        metrics,
        checkpoint_manager,
        recovery_manager,
        distributed_wal,
        cancellation_token,
    )
    .await;
});
🛠️ Refactor suggestion | 🟠 Major
metrics_collection_task reads private fields from RecoveryManager; pass NodeRegistry into the task instead.
Avoids privacy violations and unwraps.
- // Start metrics collection task
+ // Start metrics collection task
let metrics = self.metrics.clone();
- let checkpoint_manager = self.checkpoint_manager.clone();
- let recovery_manager = self.recovery_manager.clone();
+ let checkpoint_manager = self.checkpoint_manager.clone();
+ let node_registry = self.node_registry.clone();
let distributed_wal = self.distributed_wal.clone();
@@
- Self::metrics_collection_task(
+ Self::metrics_collection_task(
metrics,
checkpoint_manager,
- recovery_manager,
+ node_registry,
distributed_wal,
cancellation_token,
)
.await;
});
- async fn metrics_collection_task(
+ async fn metrics_collection_task(
metrics: DistributedAckProcessorMetrics,
checkpoint_manager: Option<Arc<CheckpointManager>>,
- recovery_manager: Option<Arc<RecoveryManager>>,
+ node_registry: Option<Arc<dyn NodeRegistry>>,
distributed_wal: Option<Arc<DistributedWAL>>,
cancellation_token: CancellationToken,
) {
@@
- // Update cluster nodes count
- if let Some(ref recovery_manager) = recovery_manager {
- if let Ok(active_nodes) = recovery_manager.node_registry.get_active_nodes().await {
- metrics.cluster_nodes.store(active_nodes.len() as u64, Ordering::Relaxed);
- }
- }
+ // Update cluster nodes count
+ if let Some(ref registry) = node_registry {
+ if let Ok(active_nodes) = registry.get_active_nodes().await {
+ metrics.cluster_nodes.store(active_nodes.len() as u64, Ordering::Relaxed);
+ }
+ }
Also applies to: 580-616
🤖 Prompt for AI Agents
In crates/arkflow-core/src/distributed_ack_processor.rs around lines 291 to 306
(and similarly at 580–616), the async metrics_collection_task closure is being
given RecoveryManager which causes the task to access RecoveryManager private
fields; instead pass a clone of NodeRegistry into the spawned task and update
the metrics_collection_task signature and call sites to accept NodeRegistry (or
an Arc/handle to it) rather than RecoveryManager. Fix by: replace passing
recovery_manager.clone() with node_registry.clone(), update the
metrics_collection_task parameters to take NodeRegistry (or Arc<NodeRegistry>)
and use its public API inside the task, and update any other call sites (e.g.,
at 580–616) to stop passing RecoveryManager and pass NodeRegistry to avoid
privacy violations and unwraps.
async fn put_object(&self, key: &str, data: Vec<u8>) -> Result<(), Error> {
    let full_path = self.get_full_path(key);
    let dir = std::path::Path::new(&full_path).parent().unwrap();

    tokio::fs::create_dir_all(dir)
        .await
        .map_err(|e| Error::Unknown(format!("Failed to create directory: {}", e)))?;
Potential panic: parent() unwrap when key has no parent.
Handle None to avoid panic for keys without a directory component.
async fn put_object(&self, key: &str, data: Vec<u8>) -> Result<(), Error> {
let full_path = self.get_full_path(key);
- let dir = std::path::Path::new(&full_path).parent().unwrap();
+ let dir = std::path::Path::new(&full_path)
+ .parent()
+ .unwrap_or(std::path::Path::new(&self.base_path));
🤖 Prompt for AI Agents
In crates/arkflow-core/src/object_storage.rs around lines 740 to 747, calling
.parent().unwrap() can panic when the key/full_path has no parent directory;
change this to handle None safely by checking parent() first (e.g., if let
Some(dir) = Path::new(&full_path).parent() {
tokio::fs::create_dir_all(dir).await.map_err(|e| Error::Unknown(format!("Failed
to create directory: {}", e)))?; } ), and skip directory creation when there is
no parent instead of unwrapping.
Ok(recovered_records) => {
    recovery_info.status = RecoveryStatus::Completed {
        recovered_records,
        duplicates_removed: recovery_info.statistics.duplicate_records,
    };
    info!("Recovery {} completed successfully", recovery_id);
}
Err(e) => {
    recovery_info.status = RecoveryStatus::Failed {
        error: e.to_string(),
    };
    error!("Recovery {} failed: {}", recovery_id, e);
    return Err(e);
}
}

// Update history with final status
{
    let mut history = self.recovery_history.write().await;
    if let Some(last) = history.last_mut() {
        *last = recovery_info.clone();
    }
}

Ok(recovery_info)
}
Ensure recovery failures are recorded in history.
On the error branch we return Err(e) before updating recovery_history, so callers querying get_recovery_history() keep seeing an "InProgress" entry even though the run already failed. That's misleading and makes post-mortem automation impossible. Update history before returning and let both success and failure go through a single exit path.
- match result {
- Ok(recovered_records) => {
- recovery_info.status = RecoveryStatus::Completed {
- recovered_records,
- duplicates_removed: recovery_info.statistics.duplicate_records,
- };
- info!("Recovery {} completed successfully", recovery_id);
- }
- Err(e) => {
- recovery_info.status = RecoveryStatus::Failed {
- error: e.to_string(),
- };
- error!("Recovery {} failed: {}", recovery_id, e);
- return Err(e);
- }
- }
-
- {
- let mut history = self.recovery_history.write().await;
- if let Some(last) = history.last_mut() {
- *last = recovery_info.clone();
- }
- }
-
- Ok(recovery_info)
+ let result = match result {
+ Ok(recovered_records) => {
+ recovery_info.status = RecoveryStatus::Completed {
+ recovered_records,
+ duplicates_removed: recovery_info.statistics.duplicate_records,
+ };
+ info!("Recovery {} completed successfully", recovery_id);
+ Ok(())
+ }
+ Err(e) => {
+ recovery_info.status = RecoveryStatus::Failed {
+ error: e.to_string(),
+ };
+ error!("Recovery {} failed: {}", recovery_id, e);
+ Err(e)
+ }
+ };
+
+ {
+ let mut history = self.recovery_history.write().await;
+ if let Some(last) = history.last_mut() {
+ *last = recovery_info.clone();
+ }
+ }
+
+ result.map(|_| recovery_info)
🤖 Prompt for AI Agents
crates/arkflow-core/src/recovery_manager.rs around lines 379 to 404: the error
branch returns before updating recovery_history so failed runs remain marked
InProgress; capture the error string (e.g. let err_str = e.to_string()), set
recovery_info.status = RecoveryStatus::Failed { error: err_str.clone() }, call
error! with err_str or e, then perform the same history update block to write
the final status into self.recovery_history before returning Err(e); refactor so
both success and failure follow the same final-history-update and single exit
path.
let mut config = DistributedAckConfig::for_local_testing("fallback-test-cluster".to_string());
config.enabled = false;
config.local_wal_path = temp_dir
    .path()
    .join("local_wal")
    .to_string_lossy()
    .to_string();
Fix invalid field access on DistributedAckConfig.
DistributedAckConfig doesn't expose a top-level local_wal_path; the path lives under wal.local_wal_path and is meant to be adjusted through the with_local_wal_path helper. The current assignment fails to compile (no such field), breaking the entire test module. Please switch back to the builder/helper when overriding the WAL path.
- let mut config = DistributedAckConfig::for_local_testing("fallback-test-cluster".to_string());
- config.enabled = false;
- config.local_wal_path = temp_dir
- .path()
- .join("local_wal")
- .to_string_lossy()
- .to_string();
+ let mut config = DistributedAckConfig::for_local_testing("fallback-test-cluster".to_string())
+ .with_local_wal_path(
+ temp_dir
+ .path()
+ .join("local_wal")
+ .to_string_lossy()
+ .to_string(),
+ );
+ config.enabled = false;
🤖 Prompt for AI Agents
In crates/arkflow-core/tests/distributed_ack_integration_test.rs around lines
273 to 279, the test attempts to set a nonexistent top-level field
config.local_wal_path which doesn't exist; instead use the provided helper or
nested field. Replace the direct assignment with the builder/helper: call
DistributedAckConfig::for_local_testing(...).with_local_wal_path(temp_dir.path().join("local_wal").to_string_lossy().to_string())
or mutate config.wal.local_wal_path if you must access the nested struct; ensure
you remove the invalid config.local_wal_path assignment so the test compiles.
let yaml_content = format!(
    r#"
input:
  type: memory
  name: test-input
  config:
    data: "test message"
pipeline:
  thread_num: 2
  processors: []
output:
  type: stdout
  name: test-output
  config: {{}}
reliable_ack:
  enabled: true
  wal_path: "{}"
  max_pending_acks: 1000
  max_retries: 3
  retry_delay_ms: 500
  enable_backpressure: true
"#,
    wal_path.to_str().unwrap()
);
Fix YAML wal_path quoting for Windows compatibility
Embedding the temp dir path inside a double-quoted YAML string breaks on Windows: a path such as C:\Temp\... introduces sequences like \T / \U, which YAML treats as invalid escapes, causing the parse to fail. Wrap the substitution in single quotes (or otherwise escape the backslashes) so the test remains cross-platform.
- wal_path: "{}"
+ wal_path: '{}'
🤖 Prompt for AI Agents
In crates/arkflow-core/tests/reliable_ack_integration_test.rs around lines
28–51, the YAML embeds wal_path inside double quotes which causes backslash
escapes (e.g. \T) to be interpreted on Windows and breaks parsing; change the
generated YAML so the inserted path is wrapped in single quotes (e.g. wal_path:
'{}') or alternatively escape backslashes before formatting (e.g. replace "\"
with "\\\\" on the path) so Windows paths are preserved and the test becomes
cross-platform.
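A minimal sketch of the backslash-escaping alternative, in case single-quoted scalars are not an option:

use std::path::Path;

// Double each backslash so the path survives inside a double-quoted YAML
// scalar; a no-op on Unix, it neutralizes \T, \U, etc. on Windows.
fn yaml_double_quote_safe(path: &Path) -> String {
    path.to_string_lossy().replace('\\', "\\\\")
}

fn main() {
    let p = Path::new(r"C:\Temp\reliable_ack.wal");
    assert_eq!(yaml_double_quote_safe(p), r"C:\\Temp\\reliable_ack.wal");
}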
Actionable comments posted: 18
🧹 Nitpick comments (10)
crates/arkflow-core/src/checkpoint_manager.rs (4)
219-232
: Prefer full UUID for IDs; truncating reduces uniqueness without clear benefit. Using only the first UUID segment increases collision risk. Use the full Uuid string.
Apply this diff:
- uuid::Uuid::new_v4()
-     .to_string()
-     .split('-')
-     .next()
-     .unwrap_or("unknown")
+ uuid::Uuid::new_v4().to_string()
559-562
: Remove double clone in get_recovery_point. Minor clean-up.
Apply this diff:
- Ok(valid_checkpoints.first().cloned().cloned())
+ Ok(valid_checkpoints.first().map(|cp| (*cp).clone()))
89-92
: Keying the cache by SystemTime can be unstable; consider a deterministic key. System time can go backwards or collide. Prefer sequence or (timestamp_millis, checkpoint_id) as key.
399-427
: MD5 is weak for integrity. Consider SHA-256 or BLAKE3. For non-adversarial checks it's fine, but upgrading is a low-effort improvement.
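A minimal sketch of the SHA-256 swap, assuming the sha2 crate as a dependency:

use sha2::{Digest, Sha256};

// Hex-encoded SHA-256 of a payload; a drop-in replacement for an MD5 digest.
fn checksum(data: &[u8]) -> String {
    let mut hasher = Sha256::new();
    hasher.update(data);
    hasher
        .finalize()
        .iter()
        .map(|byte| format!("{byte:02x}"))
        .collect()
}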
crates/arkflow-core/src/distributed_ack_processor.rs (4)
216-219
: Remove unnecessary mut. checkpoint_config isn't mutated.
Apply this diff:
- let mut checkpoint_config = self.config.checkpoint.clone();
+ let checkpoint_config = self.config.checkpoint.clone();
426-427
: Remove unused variable. record is computed then discarded in the fallback path.
Apply this diff:
- let record = task.to_record();
- // For fallback mode, we just send to the fallback processor
+ // In fallback mode, we just send to the fallback processor
36-42
: MAX_RETRIES is defined but not used. Either wire it into the retry policy or remove it.
620-656
: Consider passing NodeRegistry into the metrics task instead of RecoveryManager. Avoids an extra indirection and keeps the task decoupled.
crates/arkflow-core/src/distributed_ack_integration.rs (2)
55-69
: Use tracing for consistency (optional). The codebase primarily uses tracing; consider tracing::error! here.
155-167
: Payload is ignored in DistributedReliableAckProcessor::ack. If payload is part of reliability semantics, this silently drops it. If intentional, consider documenting; else, forward it.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
Cargo.lock
is excluded by!**/*.lock
📒 Files selected for processing (19)
crates/arkflow-core/Cargo.toml (1 hunks)
crates/arkflow-core/examples/distributed_ack_example.toml (1 hunks)
crates/arkflow-core/examples/distributed_ack_usage.rs (1 hunks)
crates/arkflow-core/src/checkpoint_manager.rs (1 hunks)
crates/arkflow-core/src/distributed_ack_init.rs (1 hunks)
crates/arkflow-core/src/distributed_ack_integration.rs (1 hunks)
crates/arkflow-core/src/distributed_ack_processor.rs (1 hunks)
crates/arkflow-core/src/distributed_wal.rs (1 hunks)
crates/arkflow-core/src/input/distributed_ack_input.rs (1 hunks)
crates/arkflow-core/src/input/mod.rs (1 hunks)
crates/arkflow-core/src/lib.rs (1 hunks)
crates/arkflow-core/src/node_registry.rs (1 hunks)
crates/arkflow-core/src/object_storage.rs (1 hunks)
crates/arkflow-core/src/processor/distributed_ack_processor.rs (1 hunks)
crates/arkflow-core/src/processor/mod.rs (1 hunks)
crates/arkflow-core/src/recovery_manager.rs (1 hunks)
crates/arkflow-core/src/reliable_ack.rs (1 hunks)
crates/arkflow-core/src/stream/distributed_ack_stream.rs (1 hunks)
crates/arkflow-core/src/stream/mod.rs (10 hunks)
✅ Files skipped from review due to trivial changes (1)
- crates/arkflow-core/examples/distributed_ack_example.toml
🧰 Additional context used
🧬 Code graph analysis (15)
crates/arkflow-core/src/stream/distributed_ack_stream.rs (3)
  crates/arkflow-core/src/checkpoint_manager.rs (1): new (96-117)
  crates/arkflow-core/src/distributed_ack_integration.rs (5): new (38-50), new (78-83), new (121-129), new (149-153), new (175-177)
  crates/arkflow-core/src/distributed_ack_processor.rs (1): new (116-194)
crates/arkflow-core/src/stream/mod.rs (5)
  crates/arkflow-core/src/reliable_stream.rs (1): output (391-476)
  crates/arkflow-core/src/reliable_ack.rs (4): new (77-86), new (182-195), ack (101-103), ack (329-373)
  crates/arkflow-core/src/distributed_ack_integration.rs (2): ack (55-68), ack (155-166)
  crates/arkflow-core/src/distributed_ack_processor.rs (1): ack (385-446)
  crates/arkflow-core/src/input/mod.rs (3): ack (41-41), ack (60-60), ack (67-71)
crates/arkflow-core/src/checkpoint_manager.rs (3)
  crates/arkflow-core/src/recovery_manager.rs (5): object_storage (870-872), new (145-153), new (256-286), cluster_id (885-887), node_id (890-892)
  crates/arkflow-core/src/object_storage.rs (5): create_object_storage (133-158), new (168-200), new (390-399), new (459-466), new (619-629)
  crates/arkflow-core/src/distributed_ack_processor.rs (2): new (116-194), create_checkpoint (543-566)
crates/arkflow-core/src/distributed_ack_processor.rs (7)
  crates/arkflow-core/src/recovery_manager.rs (8): checkpoint_manager (865-867), node_registry (860-862), default (56-70), new (145-153), new (256-286), node_id (890-892), cluster_id (885-887), perform_consistency_check (713-799)
  crates/arkflow-core/src/node_registry.rs (10): create_node_registry (568-587), default (61-63), default (88-98), new (159-177), new (475-490), new (596-601), node_id (562-564), get_active_nodes (126-126), get_active_nodes (406-422), get_active_nodes (632-646)
  crates/arkflow-core/src/checkpoint_manager.rs (5): default (47-58), new (96-117), create_checkpoint (271-378), shutdown (585-604), get_latest_checkpoint (477-483)
  crates/arkflow-core/src/distributed_wal.rs (8): default (62-78), default (103-112), default (129-140), new (170-230), start_background_tasks (233-279), get_metrics (654-656), create_checkpoint (664-694), shutdown (697-714)
  crates/arkflow-core/src/reliable_ack.rs (6): default (161-172), ack_type (106-108), ack (101-103), ack (329-373), payload (111-113), get_metrics (383-385)
  crates/arkflow-core/src/distributed_ack_config.rs (4): default (47-57), default (70-77), default (94-101), for_local_testing (252-324)
  crates/arkflow-core/src/distributed_ack_integration.rs (7): new (38-50), new (78-83), new (121-129), new (149-153), process (134-136), ack (55-68), ack (155-166)
crates/arkflow-core/src/distributed_ack_integration.rs (3)
  crates/arkflow-core/src/distributed_ack_processor.rs (2): new (116-194), ack (385-446)
  crates/arkflow-core/src/input/mod.rs (6): ack (41-41), ack (60-60), ack (67-71), connect (47-47), read (50-50), close (53-53)
  crates/arkflow-core/src/processor/mod.rs (2): close (37-37), process (34-34)
crates/arkflow-core/src/distributed_wal.rs (2)
  crates/arkflow-core/src/object_storage.rs (5): create_object_storage (133-158), new (168-200), new (390-399), new (459-466), new (619-629)
  crates/arkflow-core/src/reliable_ack.rs (2): serde_json (225-225), append (197-214)
crates/arkflow-core/src/processor/distributed_ack_processor.rs (5)
  crates/arkflow-core/src/input/distributed_ack_input.rs (1): build (43-79)
  crates/arkflow-core/src/processor/mod.rs (3): build (52-63), build (67-72), register_processor_builder (75-88)
  crates/arkflow-core/src/stream/mod.rs (1): build (560-653)
  crates/arkflow-core/src/distributed_ack_integration.rs (5): new (38-50), new (78-83), new (121-129), new (149-153), new (175-177)
  crates/arkflow-core/src/distributed_ack_processor.rs (1): new (116-194)
crates/arkflow-core/src/input/distributed_ack_input.rs (3)
  crates/arkflow-core/src/input/mod.rs (3): build (31-36), build (106-117), register_input_builder (120-133)
  crates/arkflow-core/src/processor/distributed_ack_processor.rs (1): build (43-80)
  crates/arkflow-core/src/distributed_ack_integration.rs (5): new (38-50), new (78-83), new (121-129), new (149-153), new (175-177)
crates/arkflow-core/src/lib.rs (3)
  crates/arkflow-core/src/recovery_manager.rs (3): checkpoint_manager (865-867), node_registry (860-862), object_storage (870-872)
  crates/arkflow-core/src/stream/mod.rs (1): output (392-470)
  crates/arkflow-core/src/reliable_stream.rs (1): output (391-476)
crates/arkflow-core/examples/distributed_ack_usage.rs (3)
  crates/arkflow-core/src/distributed_ack_init.rs (1): init_distributed_ack_components (22-42)
  crates/arkflow-core/src/distributed_ack_integration.rs (5): new (38-50), new (78-83), new (121-129), new (149-153), new (175-177)
  crates/arkflow-core/src/distributed_ack_processor.rs (1): new (116-194)
crates/arkflow-core/src/reliable_ack.rs (3)
  crates/arkflow-core/src/reliable_stream.rs (4): flume (101-101), flume (103-103), new (58-80), run (89-172)
  crates/arkflow-core/src/distributed_ack_processor.rs (2): ack (385-446), run (742-792)
  crates/arkflow-core/src/input/mod.rs (3): ack (41-41), ack (60-60), ack (67-71)
crates/arkflow-core/src/distributed_ack_init.rs (2)
  crates/arkflow-core/src/input/distributed_ack_input.rs (1): register_distributed_ack_input_builder (83-88)
  crates/arkflow-core/src/processor/distributed_ack_processor.rs (1): register_distributed_ack_processor_builder (84-89)
crates/arkflow-core/src/object_storage.rs (4)
  crates/arkflow-core/src/checkpoint_manager.rs (2): new (96-117), default (47-58)
  crates/arkflow-core/src/distributed_wal.rs (4): new (170-230), default (62-78), default (103-112), default (129-140)
  crates/arkflow-core/src/node_registry.rs (5): new (159-177), new (475-490), new (596-601), default (61-63), default (88-98)
  crates/arkflow-core/src/recovery_manager.rs (3): new (145-153), new (256-286), default (56-70)
crates/arkflow-core/src/recovery_manager.rs (1)
  crates/arkflow-core/src/object_storage.rs (5): create_object_storage (133-158), new (168-200), new (390-399), new (459-466), new (619-629)
crates/arkflow-core/src/node_registry.rs (2)
  crates/arkflow-core/src/object_storage.rs (5): create_object_storage (133-158), new (168-200), new (390-399), new (459-466), new (619-629)
  crates/arkflow-core/src/distributed_ack_config.rs (3): default (47-57), default (70-77), default (94-101)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: build
- GitHub Check: build
🔇 Additional comments (13)
crates/arkflow-core/src/stream/mod.rs (2)
439-455
: Send the unique AckId through the reliable ack processor
We mint an AckId with a UUID but then send only ack_id.source_id ("stream") as the payload while also using the static ack type "stream". That collapses every ack into the same key, defeating dedup/backoff logic in the reliable ack processor. Use the UUID (message_id), or serialize the full AckId, so each ack remains unique.
- let ack_id =
-     AckId::new("stream".to_string(), uuid::Uuid::new_v4().to_string());
- if let Err(e) = processor
-     .ack(
-         ack.clone(),
-         "stream".to_string(),
-         ack_id.source_id.as_bytes().to_vec(),
-     )
+ let ack_id =
+     AckId::new("stream".to_string(), uuid::Uuid::new_v4().to_string());
+ if let Err(e) = processor
+     .ack(
+         ack.clone(),
+         "stream_success".to_string(),
+         ack_id.message_id.as_bytes().to_vec(),
+     )
620-626
: Retain the tracker/token created for ReliableAckProcessor::new
We instantiate a TaskTracker and CancellationToken solely to bootstrap the reliable ack processor, then drop both immediately. As noted previously, losing these handles prevents graceful shutdown and can abort or leak the processor's background tasks. Please persist them (e.g., store clones on the Stream alongside reliable_ack_processor, or have the processor own them internally) so we can cancel/await the worker during stream teardown.
crates/arkflow-core/src/distributed_ack_integration.rs (1)
92-107
: Good: distributed ack wrapper preserves original Ack and adds a unique ack_id. This is a clean composition point for the distributed path.
crates/arkflow-core/src/checkpoint_manager.rs (1)
153-161
: Compressed or wrapped checkpoints won't load; add decompress and "checkpoint" wrapper fallbacks. Current code only attempts direct JSON into CheckpointInfo. This fails for compressed payloads and for manual checkpoints saved with additional_data (wrapped as {"checkpoint": ..., "additional": ...}). Add graceful fallbacks.
Apply this diff:
- let checkpoint_info: CheckpointInfo = serde_json::from_slice(&data) - .map_err(|e| Error::Unknown(format!("Failed to deserialize checkpoint info: {}", e)))?; - - Ok(checkpoint_info) + // Try direct parse + if let Ok(cp) = serde_json::from_slice::<CheckpointInfo>(&data) { + return Ok(cp); + } + + // Try wrapper parse on plain bytes + if let Ok(v) = serde_json::from_slice::<serde_json::Value>(&data) { + if let Some(checkpoint) = v.get("checkpoint") { + if let Ok(cp) = serde_json::from_value::<CheckpointInfo>(checkpoint.clone()) { + return Ok(cp); + } + } + } + + // Try decompress -> direct parse + let decompressed = Self::decompress_data(&data)?; + if let Ok(cp) = serde_json::from_slice::<CheckpointInfo>(&decompressed) { + return Ok(cp); + } + + // Try decompress -> wrapper parse + let v: serde_json::Value = serde_json::from_slice(&decompressed).map_err(|e| { + Error::Unknown(format!("Failed to deserialize checkpoint info: {}", e)) + })?; + if let Some(checkpoint) = v.get("checkpoint") { + let cp: CheckpointInfo = serde_json::from_value(checkpoint.clone()).map_err(|e| { + Error::Unknown(format!("Failed to deserialize checkpoint info: {}", e)) + })?; + return Ok(cp); + } + + Err(Error::Unknown( + "Invalid checkpoint payload: neither CheckpointInfo nor wrapper format".to_string(), + ))Based on learnings
crates/arkflow-core/src/distributed_ack_processor.rs (1)
468-486
: Avoid unwraps in async calls (panic risk). Return safe defaults. Map failures to sane values instead of panicking.
Apply this diff:
- let total_nodes = if let Some(ref recovery_manager) = self.recovery_manager { - recovery_manager - .node_registry() - .get_all_nodes() - .await - .unwrap() - .len() - } else { - 1 - }; - - let active_nodes = if let Some(ref recovery_manager) = self.recovery_manager { - recovery_manager - .node_registry() - .get_active_nodes() - .await - .unwrap() - .len() - } else { - 1 - }; + let total_nodes = match &self.recovery_manager { + Some(rm) => rm + .node_registry() + .get_all_nodes() + .await + .map(|v| v.len()) + .unwrap_or(1), + None => 1, + }; + + let active_nodes = match &self.recovery_manager { + Some(rm) => rm + .node_registry() + .get_active_nodes() + .await + .map(|v| v.len()) + .unwrap_or(1), + None => 1, + };crates/arkflow-core/src/recovery_manager.rs (1)
378-403
: Don’t return before writing the failed status to historyWe’re still returning
Err(e)
(Line 391) before rewritingrecovery_history
, so failed runs remain stuck atInProgress
—exactly the issue raised in the previous review. Refactor so both success and failure flow through the same history-update block and only return afterwards.- match result { + let result = match result { Ok(recovered_records) => { recovery_info.status = RecoveryStatus::Completed { recovered_records, duplicates_removed: recovery_info.statistics.duplicate_records, }; info!("Recovery {} completed successfully", recovery_id); + Ok(()) } Err(e) => { recovery_info.status = RecoveryStatus::Failed { error: e.to_string(), }; error!("Recovery {} failed: {}", recovery_id, e); - return Err(e); + Err(e) } } // Update history with final status { let mut history = self.recovery_history.write().await; if let Some(last) = history.last_mut() { *last = recovery_info.clone(); } } - Ok(recovery_info) + result.map(|_| recovery_info)crates/arkflow-core/src/object_storage.rs (1)
638-645
: Avoid panic when storing keys without a parent directory
Path::parent().unwrap()
will panic for keys like"foo"
or"foo.json"
, breaking local testing. HandleNone
safely (this was called out in the previous review).- let dir = std::path::Path::new(&full_path).parent().unwrap(); - - tokio::fs::create_dir_all(dir) + if let Some(dir) = std::path::Path::new(&full_path).parent() { + tokio::fs::create_dir_all(dir) .await .map_err(|e| Error::Unknown(format!("Failed to create directory: {}", e)))?; + }crates/arkflow-core/src/reliable_ack.rs (1)
291-327
: Register default ack types synchronously
Spawning an async task to seed the registry means recovery can start before "noop" is registered, so recovered tasks are silently dropped (same bug called out earlier). Populate the defaults inline.
- let ack_registry_for_spawn = ack_registry.clone();
- tokio::spawn(async move {
-     Self::register_default_ack_types(&ack_registry_for_spawn).await;
- });
+ Self::register_default_ack_types(&ack_registry);
@@
- async fn register_default_ack_types(
+ fn register_default_ack_types(
      registry: &Arc<Mutex<HashMap<String, Box<dyn Fn(&Vec<u8>) -> Arc<dyn Ack> + Send + Sync>>>>,
  ) {
-     let mut registry = registry.lock().await;
+     let mut registry = registry
+         .try_lock()
+         .expect("new registry mutex should be uncontended");
      registry.insert(
          "noop".to_string(),
          Box::new(|_| Arc::new(crate::input::NoopAck)),
      );
  }
crates/arkflow-core/src/distributed_wal.rs (3)
404-411
: Stop zeroing batches on serialization failure
Line 405 still turns a serde_json::to_vec error into an empty payload and proceeds as if the upload succeeded, which permanently drops those acknowledgements. We must surface the error and leave the tasks queued for retry (this was already flagged earlier).
- let json_data = serde_json::to_vec(&batch_data)
-     .map_err(|e| {
-         error!("Failed to serialize batch data: {}", e);
-         e
-     })
-     .unwrap_or_default();
+ let json_data = match serde_json::to_vec(&batch_data) {
+     Ok(data) => data,
+     Err(e) => {
+         metrics
+             .failed_uploads
+             .fetch_add(tasks.len() as u64, Ordering::Relaxed);
+         error!("Failed to serialize batch data: {}", e);
+         batch.extend(tasks.into_iter());
+         continue;
+     }
+ };
413-443
: Requeue failed uploads instead of discarding them
When put_object fails, we log and drop the drained tasks. That ignores retry_count, violates max_retry_attempts, and silently loses ACKs, the same critical bug called out previously. Please requeue each task (incrementing retry_count and respecting the retry budget) so they aren't lost.
- Err(e) => {
-     failed_uploads += tasks.len();
-     metrics
-         .failed_uploads
-         .fetch_add(tasks.len() as u64, Ordering::Relaxed);
-     error!("Failed to upload batch to {}: {}", object_key, e);
- }
+ Err(e) => {
+     failed_uploads += tasks.len();
+     metrics
+         .failed_uploads
+         .fetch_add(tasks.len() as u64, Ordering::Relaxed);
+     error!("Failed to upload batch to {}: {}", object_key, e);
+
+     for mut task in tasks {
+         let next_retry = task.retry_count + 1;
+         if next_retry > config.max_retry_attempts {
+             warn!(
+                 "Giving up on {} after {} attempts",
+                 task.global_id, next_retry
+             );
+             continue;
+         }
+         task.retry_count = next_retry;
+         batch.push(task);
+     }
+     continue;
+ }
(You'll need to thread config.max_retry_attempts into this function.)
503-545
: Persist to the local WAL before enqueueing
append never writes the record to self.local_wal, so a crash before the upload worker runs still loses the ACK, exactly the durability gap flagged in the earlier review. Please append (and sync) the record to the local WAL first and roll back if queueing fails.
- // Queue for upload
- match self.upload_queue.send_async(upload_task).await {
+ if let Err(e) = self
+     .local_wal
+     .append(record)
+     .await
+     .map_err(|e| Error::Unknown(format!("Failed to append to local WAL: {}", e)))
+ {
+     self.metrics.failed_uploads.fetch_add(1, Ordering::Relaxed);
+     self.metrics.pending_uploads.fetch_sub(1, Ordering::Relaxed);
+     return Err(e);
+ }
+
+ // Queue for upload
+ match self.upload_queue.send_async(upload_task).await {
      Ok(_) => {
(If AckWAL exposes sync, call it here as well.)
288-293
: Fix the heartbeat age calculation
node.last_heartbeat.duration_since(SystemTime::now()) returns Err for every heartbeat in the past, so every node is marked dead. Compute the age from "now" instead, as already requested in the earlier review.
- match node.last_heartbeat.duration_since(SystemTime::now()) {
-     Ok(duration) => duration <= self.node_timeout,
-     Err(_) => false,
- }
+ match SystemTime::now().duration_since(node.last_heartbeat) {
+     Ok(age) => age <= self.node_timeout,
+     Err(_) => false,
+ }
**632-676: Same heartbeat inversion in the in-memory registry**

The in-memory implementation repeats the same mistake: `last_heartbeat.duration_since(now)` yields `Err` for past timestamps, so active nodes immediately disappear. Please flip the subtraction here too (mirroring the earlier feedback).

```diff
-                node.last_heartbeat
-                    .duration_since(now)
-                    .map(|d| d.as_secs() < 90)
-                    .unwrap_or(false)
+                now.duration_since(node.last_heartbeat)
+                    .map(|d| d.as_secs() < 90)
+                    .unwrap_or(false)
```

Apply the same fix in `is_node_alive` and `get_cluster_info`.
```rust
        arkflow_core::distributed_ack_processor::DistributedAckProcessor::new(
            tracker.clone(),
            cancellation_token.clone(),
            &distributed_ack_config,
        )
        .await?;

        // Wrap input with distributed acknowledgment
        let builder = DistributedAckBuilder::new(distributed_ack_config);
        let wrapped_input = builder.wrap_input(input, Arc::new(distributed_processor));
```
**Fix the `DistributedAckProcessor::new` invocation (compile blocker).**

`DistributedAckProcessor::new` is defined as `new(tracker: &TaskTracker, cancellation_token: CancellationToken, config: DistributedAckConfig)` (see crates/arkflow-core/src/distributed_ack_processor.rs), but here we pass an owned `TaskTracker` and a `&DistributedAckConfig`. This will not compile. Pass `&tracker` and clone the config before moving it into the builder.
```diff
-    arkflow_core::distributed_ack_processor::DistributedAckProcessor::new(
-        tracker.clone(),
-        cancellation_token.clone(),
-        &distributed_ack_config,
+    arkflow_core::distributed_ack_processor::DistributedAckProcessor::new(
+        &tracker,
+        cancellation_token.clone(),
+        distributed_ack_config.clone(),
     )
```
```rust
        arkflow_core::distributed_ack_processor::DistributedAckProcessor::new(
            tracker.clone(),
            cancellation_token.clone(),
            &distributed_ack_config,
        )
        .await?;

        // Wrap processor with distributed acknowledgment
        let builder = DistributedAckBuilder::new(distributed_ack_config);
        let wrapped_processor = builder.wrap_processor(processor, Arc::new(distributed_processor));
```
**Apply the same `DistributedAckProcessor::new` fix here.**

Same signature mismatch as above: we must pass `&tracker` and an owned `DistributedAckConfig`. Clone the config for the processor before moving the original into the builder.
```diff
-    arkflow_core::distributed_ack_processor::DistributedAckProcessor::new(
-        tracker.clone(),
-        cancellation_token.clone(),
-        &distributed_ack_config,
+    arkflow_core::distributed_ack_processor::DistributedAckProcessor::new(
+        &tracker,
+        cancellation_token.clone(),
+        distributed_ack_config.clone(),
     )
```
```rust
        let metadata = CheckpointMetadata {
            sequence: latest_sequence,
            timestamp: SystemTime::now(),
            node_id: "auto".to_string(),
            cluster_id: cluster_id.to_string(),
            checksum: "auto".to_string(),
            size_bytes: 0,
            compressed: config.enable_compression,
            previous_checkpoint,
        };
```
**Auto-checkpoints set `checksum: "auto"` and may be marked compressed without actually compressing.**

- `checksum: "auto"` guarantees `restore_from_checkpoint` will fail.
- When `enable_compression` is true, the auto path doesn't compress the payload but still sets the `compressed` flag.

Compute a deterministic checksum and compress, as the manual path does. Apply this diff:
```diff
-        let metadata = CheckpointMetadata {
-            sequence: latest_sequence,
-            timestamp: SystemTime::now(),
-            node_id: "auto".to_string(),
-            cluster_id: cluster_id.to_string(),
-            checksum: "auto".to_string(),
-            size_bytes: 0,
-            compressed: config.enable_compression,
-            previous_checkpoint,
-        };
+        let timestamp = SystemTime::now();
+        let checksum = Self::calculate_checksum(
+            cluster_id,
+            latest_sequence,
+            "auto",
+            timestamp,
+        );
+        let metadata = CheckpointMetadata {
+            sequence: latest_sequence,
+            timestamp,
+            node_id: "auto".to_string(),
+            cluster_id: cluster_id.to_string(),
+            checksum,
+            size_bytes: 0,
+            compressed: config.enable_compression,
+            previous_checkpoint,
+        };
```
```rust
        // Save checkpoint
        let checkpoint_key = format!("{}/{}_checkpoint.json", config.base_path, checkpoint_id);
        let data = serde_json::to_vec(&checkpoint_info)
            .map_err(|e| Error::Unknown(format!("Failed to serialize checkpoint info: {}", e)))?;

        object_storage.put_object(&checkpoint_key, data).await?;

        // Update cache
```
**Auto-checkpoints ignore the compression toggle.**

Compress the JSON when `config.enable_compression` is true. Apply this diff:
```diff
-        object_storage.put_object(&checkpoint_key, data).await?;
+        let data = if config.enable_compression {
+            Self::compress_data(&data)?
+        } else {
+            data
+        };
+
+        object_storage.put_object(&checkpoint_key, data).await?;
```
```rust
        // Create metadata
        let metadata = CheckpointMetadata {
            sequence,
            timestamp: SystemTime::now(),
            node_id: node_id.clone(),
            cluster_id: self.cluster_id.clone(),
            checksum: self.calculate_checksum(sequence, &node_id),
            size_bytes: 0,
            compressed: self.config.enable_compression,
            previous_checkpoint,
        };
```
**Checksum must be deterministic; use `metadata.timestamp` (not "now") and `cluster_id`.**

The current `calculate_checksum()` calls `SystemTime::now` internally, so restore checks always fail. Compute the checksum from immutable fields only. Apply this diff:
```diff
-        // Create metadata
-        let metadata = CheckpointMetadata {
-            sequence,
-            timestamp: SystemTime::now(),
-            node_id: node_id.clone(),
-            cluster_id: self.cluster_id.clone(),
-            checksum: self.calculate_checksum(sequence, &node_id),
+        // Create metadata (deterministic checksum)
+        let timestamp = SystemTime::now();
+        let checksum =
+            Self::calculate_checksum(&self.cluster_id, sequence, &node_id, timestamp);
+        let metadata = CheckpointMetadata {
+            sequence,
+            timestamp,
+            node_id: node_id.clone(),
+            cluster_id: self.cluster_id.clone(),
+            checksum,
             size_bytes: 0,
             compressed: self.config.enable_compression,
             previous_checkpoint,
         };
```
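Both checksum diffs above lean on a deterministic `Self::calculate_checksum(cluster_id, sequence, node_id, timestamp)`. One possible implementation with `sha2`; the field encoding is an assumption, and whatever restore recomputes must match it byte for byte:

```rust
use std::time::{SystemTime, UNIX_EPOCH};

use sha2::{Digest, Sha256};

/// Deterministic checksum over the immutable checkpoint fields.
/// Sketch only: restore_from_checkpoint must hash the same fields
/// in the same order and encoding.
fn calculate_checksum(
    cluster_id: &str,
    sequence: u64,
    node_id: &str,
    timestamp: SystemTime,
) -> String {
    let nanos = timestamp
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos();
    let mut hasher = Sha256::new();
    hasher.update(cluster_id.as_bytes());
    hasher.update(sequence.to_be_bytes());
    hasher.update(node_id.as_bytes());
    hasher.update(nanos.to_be_bytes());
    format!("{:x}", hasher.finalize())
}
```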
```rust
        // Find checkpoints after the timestamp
        let checkpoints_after = self
            .checkpoint_manager
            .get_checkpoints_after_sequence(
                timestamp.duration_since(UNIX_EPOCH).unwrap().as_secs() as u64
            )
            .await?;

        if let Some(checkpoint) = checkpoints_after.first() {
            // Use the earliest checkpoint after the timestamp
            self.recover_from_checkpoint(recovery_info, &checkpoint.checkpoint_id)
                .await
        } else {
            // No checkpoints after timestamp, recover all data
            self.recover_all_available_data(recovery_info).await
        }
```
**Fix the timestamp-based recovery lookup.**

`get_checkpoints_after_sequence` expects a WAL sequence number, but we're feeding it seconds-since-epoch. The call therefore falls back to "recover all" instead of the intended timestamp-based recovery. Use (or add) a timestamp-aware accessor, or translate the timestamp to an actual sequence first.
```diff
-        let checkpoints_after = self
-            .checkpoint_manager
-            .get_checkpoints_after_sequence(
-                timestamp.duration_since(UNIX_EPOCH).unwrap().as_secs() as u64
-            )
-            .await?;
+        let checkpoints_after = self
+            .checkpoint_manager
+            .get_checkpoints_after_timestamp(timestamp)
+            .await?;
```
If such an API doesn't exist yet, implement it in `CheckpointManager` (e.g., load checkpoints and filter by `created_at`).
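A possible shape for that accessor, filtering loaded metadata by its timestamp; `list_checkpoints` and the `timestamp` field are assumptions based on the hunks above, so adjust to whatever the manager actually exposes:

```rust
use std::time::SystemTime;

impl CheckpointManager {
    /// Checkpoints created at or after `cutoff`, oldest first, so the
    /// caller's `.first()` picks the earliest qualifying checkpoint.
    pub async fn get_checkpoints_after_timestamp(
        &self,
        cutoff: SystemTime,
    ) -> Result<Vec<CheckpointMetadata>, Error> {
        let mut checkpoints = self.list_checkpoints().await?;
        checkpoints.retain(|c| c.timestamp >= cutoff);
        checkpoints.sort_by_key(|c| c.timestamp);
        Ok(checkpoints)
    }
}
```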
```rust
        let record = task.to_record();
        if let Err(e) = self.wal.append(&record).await {
            error!("Failed to persist ack to WAL: {}", e);
            return Err(crate::Error::Unknown(format!(
                "WAL persistence failed: {}",
                e
            )));
        }
        self.metrics.persisted_acks.fetch_add(1, Ordering::Relaxed);

        match self.ack_sender.send_async(task).await {
            Ok(_) => Ok(()),
            Err(e) => {
                self.metrics.pending_acks.fetch_sub(1, Ordering::Relaxed);
                self.metrics.failed_acks.fetch_add(1, Ordering::Relaxed);
                Err(crate::Error::Unknown(format!(
                    "Failed to send ack task: {}",
                    e
                )))
            }
        }
    }
```
**WAL entries never get pruned after success.**

We append every ack to the WAL, but nowhere outside startup recovery do we remove committed entries. The file grows indefinitely and, after a crash, replays all historical acks, re-executing their side effects. Introduce a prune/compact step: after a batch succeeds, rewrite the WAL with only the still-pending tasks (or append to a "tail" file and rotate once flushed). Without that fix, durability is wrong and the WAL quickly hits `MAX_WAL_SIZE`.
```rust
        for record in records {
            if let Some(factory) = registry.get(&record.ack_type) {
                let ack = factory(&record.payload);
                let task = AckTask::from_record(record, ack);

                // Re-add to processing queue
                if let Err(e) = self.ack_sender.send_async(task).await {
                    error!("Failed to queue recovered ack: {}", e);
                } else {
                    recovered_count += 1;
                    self.metrics.recovered_acks.fetch_add(1, Ordering::Relaxed);
```
**Recovery deadlocks when the WAL backlog exceeds queue capacity.**

`recover_from_wal` pushes recovered tasks via `send_async` before the worker loop starts pulling. Once the channel reaches `MAX_PENDING_ACKS` (5,000), `send_async` awaits forever and startup stalls. Either push straight into `pending_tasks` or start draining before requeueing. For example:
```diff
-        for record in records {
-            if let Some(factory) = registry.get(&record.ack_type) {
-                let ack = factory(&record.payload);
-                let task = AckTask::from_record(record, ack);
-
-                // Re-add to processing queue
-                if let Err(e) = self.ack_sender.send_async(task).await {
-                    error!("Failed to queue recovered ack: {}", e);
-                } else {
-                    recovered_count += 1;
-                    self.metrics.recovered_acks.fetch_add(1, Ordering::Relaxed);
-                }
-            }
-        }
+        let mut pending = Vec::new();
+        for record in records {
+            if let Some(factory) = registry.get(&record.ack_type) {
+                pending.push(AckTask::from_record(record, factory(&record.payload)));
+            }
+        }
+        drop(registry);
+        self.process_batch(&mut pending).await;
```

Or drain the receiver first before queueing. Without this, recovery hangs whenever the WAL backlog exceeds the queue size.
```rust
        for record in records {
            if let Some(factory) = registry.get(&record.ack_type) {
                let ack = factory(&record.payload);
                let task = AckTask::from_record(record, ack);

                // Re-add to processing queue
                if let Err(e) = self.ack_sender.send_async(task).await {
                    error!("Failed to queue recovered ack: {}", e);
                } else {
                    recovered_count += 1;
                    self.metrics.recovered_acks.fetch_add(1, Ordering::Relaxed);
                }
            }
        }

        if recovered_count > 0 {
            info!("Recovered {} unprocessed acks from WAL", recovered_count);
        }

        // Clear WAL after successful recovery
        self.wal
            .clear()
            .await
            .map_err(|e| crate::Error::Unknown(format!("Failed to clear WAL: {}", e)))?;
```
**Drain the WAL atomically to avoid truncating new appends.**

`recover()` releases the file lock before `clear()` reacquires it. A concurrent `ack()` can append in that gap, and the subsequent `clear()` wipes the freshly persisted record: a durability violation. Add a `drain()` helper that reads and truncates the file while holding the same lock:
```diff
 pub async fn recover(&self) -> io::Result<Vec<AckRecord>> {
-    let mut file = self.file.lock().await;
-    file.seek(SeekFrom::Start(0))?;
-    ...
-    Ok(records)
+    self.drain().await.map(|(records, _)| records)
 }

-pub async fn clear(&self) -> io::Result<()> {
-    let mut file = self.file.lock().await;
-    file.set_len(0)?;
-    file.seek(SeekFrom::Start(0))?;
-    self.current_size.store(0, Ordering::Relaxed);
-    Ok(())
+pub async fn drain(&self) -> io::Result<(Vec<AckRecord>, u64)> {
+    let mut file = self.file.lock().await;
+    file.seek(SeekFrom::Start(0))?;
+    let mut records = Vec::new();
+    let mut buffer = String::new();
+    file.read_to_string(&mut buffer)?;
+    for line in buffer.lines() {
+        if let Ok(record) = serde_json::from_str::<AckRecord>(line) {
+            records.push(record);
+        }
+    }
+    file.set_len(0)?;
+    file.seek(SeekFrom::Start(0))?;
+    self.current_size.store(0, Ordering::Relaxed);
+    Ok((records, 0))
 }
```

Then, in `recover_from_wal`, call `let (records, _) = self.wal.drain().await?;`. This keeps the file empty before other writers run.
```rust
        // Create distributed acknowledgment processor
        let tracker = tokio_util::task::TaskTracker::new();
        let cancellation_token = CancellationToken::new();

        let distributed_processor = tokio::runtime::Handle::current()
            .block_on(async {
                DistributedAckProcessor::new(
                    &tracker,
                    cancellation_token.clone(),
                    distributed_ack_config,
                )
                .await
            })
            .map_err(|e| Error::Config(format!("Failed to create distributed ack processor: {}", e)))?;

        // Create a custom stream that integrates distributed acknowledgment
        let mut stream = Stream::new(
            input,
            pipeline,
            output,
            error_output,
            buffer,
            resource,
            thread_num,
        );

        // Store the distributed processor in the stream
        // Note: This would require modifying the Stream struct to support this
        // For now, we'll return a regular stream and handle distributed ack differently

        // TODO: Implement proper distributed acknowledgment integration
        // For now, return the basic stream
        Ok(stream)
    }
```
**Wiring gap: the distributed ack stream drops the processor immediately.**

`create_distributed_ack_stream` constructs a `DistributedAckProcessor` but never stores or attaches it to the returned `Stream`; the processor is dropped at the end of the function, so no distributed acknowledgement ever happens. Before this lands, we need to extend `Stream` (or a parallel structure) to hold and drive that processor; otherwise callers get a plain stream despite providing `DistributedAckConfig`. Please wire the processor into the stream lifecycle (e.g., by storing it in `Stream` or wrapping the stream with a distributed-ack-aware type).
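Until `Stream` grows a field for it, one stopgap is a thin wrapper that owns both, so the processor's background tasks live exactly as long as the stream. A sketch; the wrapper name is illustrative, and `Stream`/`DistributedAckProcessor` are this PR's types:

```rust
use std::sync::Arc;

/// Keeps the DistributedAckProcessor alive for the lifetime of the
/// stream it serves; dropping this drops both together.
pub struct DistributedAckStream {
    stream: Stream,
    // Never read directly; held so the processor's background tasks
    // are not cancelled while the stream runs.
    _ack_processor: Arc<DistributedAckProcessor>,
}

impl DistributedAckStream {
    pub fn new(stream: Stream, ack_processor: Arc<DistributedAckProcessor>) -> Self {
        Self {
            stream,
            _ack_processor: ack_processor,
        }
    }

    /// Access the inner stream to run it.
    pub fn inner_mut(&mut self) -> &mut Stream {
        &mut self.stream
    }
}
```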
# Conflicts:
#	Cargo.lock
**Summary by CodeRabbit**

- New Features
- Configuration
- Examples
- Tests
- Chores