Skip to content

Conversation

chenquan
Copy link
Collaborator

@chenquan chenquan commented Jun 11, 2025

Summary by CodeRabbit

  • New Features
    • Added support for RabbitMQ (AMQP 0.9/0.9.1) as both an input and output option, allowing integration with RabbitMQ queues and exchanges.
    • Introduced configurable connection parameters, queue/exchange options, and message acknowledgment modes for RabbitMQ input.
    • Enabled dynamic routing key evaluation and exchange declaration for RabbitMQ output.
  • Chores
    • Updated dependencies to include a RabbitMQ client library.

Copy link
Contributor

coderabbitai bot commented Jun 11, 2025

Walkthrough

A RabbitMQ (AMQP 0.9) input and output integration was added to the Arkflow plugin. This includes new modules implementing configurable, asynchronous RabbitMQ consumers and publishers, registration logic, and dependency updates. The initialization routines were updated to register these new components, and the lapin crate was added as a dependency.

Changes

File(s) Change Summary
crates/arkflow-plugin/Cargo.toml Added the lapin crate v2.5 as a dependency for RabbitMQ support.
crates/arkflow-plugin/src/input/amqp09.rs Introduced RabbitMQ AMQP 0.9 input component with async consumption, config, and registration.
crates/arkflow-plugin/src/input/mod.rs Declared and initialized the new amqp09 input module.
crates/arkflow-plugin/src/output/amqp09.rs Added RabbitMQ AMQP 0.9 output component with async publishing, config, and registration.
crates/arkflow-plugin/src/output/mod.rs Declared and initialized the new amqp09 output module; updated module and init order.

Sequence Diagram(s)

sequenceDiagram
    participant User
    participant ArkflowInput
    participant Amqp09Input
    participant RabbitMQ

    User->>ArkflowInput: Initialize inputs
    ArkflowInput->>Amqp09Input: Create and connect
    Amqp09Input->>RabbitMQ: Connect, declare queue/exchange, bind, consume
    RabbitMQ-->>Amqp09Input: Deliver message
    Amqp09Input-->>ArkflowInput: Provide message batch with ack handler

    User->>ArkflowInput: Acknowledge message
    ArkflowInput->>Amqp09Input: Call ack handler
    Amqp09Input->>RabbitMQ: Send ack (if manual ack)
Loading
sequenceDiagram
    participant User
    participant ArkflowOutput
    participant Amqp09Output
    participant RabbitMQ

    User->>ArkflowOutput: Initialize outputs
    ArkflowOutput->>Amqp09Output: Create and connect
    Amqp09Output->>RabbitMQ: Connect, declare exchange (optional)
    User->>ArkflowOutput: Write message batch
    ArkflowOutput->>Amqp09Output: Write messages
    Amqp09Output->>RabbitMQ: Publish messages (with routing key)
    RabbitMQ-->>Amqp09Output: Confirm publish (if confirms enabled)
Loading

Poem

🐇
New tunnels dug, connections made,
RabbitMQ joins the Arkflow parade!
Messages hop from queue to queue,
With AMQP, through and through.
Input and output, both in sync,
Now we publish, now we drink—
To every message, a rabbit’s wink!

✨ Finishing Touches
  • 📝 Generate Docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Inline review comments failed to post. This is likely due to GitHub's limits when posting large numbers of comments. If you are seeing this consistently it is likely a permissions issue. Please check "Moderation" -> "Code review limits" under your organization settings.

Actionable comments posted: 2

🧹 Nitpick comments (1)
crates/arkflow-plugin/src/input/amqp09.rs (1)

86-86: Consider making the internal channel size configurable.

The flume channel is hardcoded to 1000 messages, which might not be suitable for all use cases. High-throughput scenarios might benefit from a larger buffer.

Add a configuration option:

 pub struct Amqp09InputConfig {
     /// Auto-acknowledge messages
     pub auto_ack: Option<bool>,
     /// Prefetch count
     pub prefetch_count: Option<u16>,
+    /// Internal channel buffer size (default: 1000)
+    pub channel_buffer_size: Option<usize>,
 }

Then use it:

-let (sender, receiver) = flume::bounded::<Amqp09Msg>(1000);
+let buffer_size = config.channel_buffer_size.unwrap_or(1000);
+let (sender, receiver) = flume::bounded::<Amqp09Msg>(buffer_size);
🛑 Comments failed to post (2)
crates/arkflow-plugin/src/output/amqp09.rs (1)

50-62: ⚠️ Potential issue

Implement or remove unused configuration fields.

The following config fields are defined but never used in the implementation:

  • persistent (line 51)
  • priority (line 55)
  • expiration (line 57)
  • content_type (line 59)
  • content_encoding (line 61)

These fields should be applied when constructing BasicProperties in the write method.

Apply this diff to use the configured properties:

-                    BasicProperties::default(),
+                    {
+                        let mut props = BasicProperties::default();
+                        if let Some(persistent) = self.config.persistent {
+                            props = props.with_delivery_mode(if persistent { 2 } else { 1 });
+                        }
+                        if let Some(priority) = self.config.priority {
+                            props = props.with_priority(priority);
+                        }
+                        if let Some(ref expiration) = self.config.expiration {
+                            props = props.with_expiration(expiration.clone());
+                        }
+                        if let Some(ref content_type) = self.config.content_type {
+                            props = props.with_content_type(content_type.clone().into());
+                        }
+                        if let Some(ref content_encoding) = self.config.content_encoding {
+                            props = props.with_content_encoding(content_encoding.clone().into());
+                        }
+                        props
+                    },

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In crates/arkflow-plugin/src/output/amqp09.rs around lines 50 to 62, the fields
persistent, priority, expiration, content_type, and content_encoding are defined
but not used when constructing BasicProperties in the write method. To fix this,
update the write method to apply these configuration fields to the
BasicProperties builder, setting each property accordingly if the option is
Some, so that the message properties reflect the configured values.
crates/arkflow-plugin/src/input/amqp09.rs (1)

216-256: 🛠️ Refactor suggestion

Add error recovery for the consumer task.

The spawned task could panic and silently stop consuming messages. Consider adding panic recovery or at least logging when the task exits unexpectedly.

Store the task handle and monitor it:

+use tokio::task::JoinHandle;

 pub struct Amqp09Input {
     // ... existing fields ...
+    consumer_task: Arc<Mutex<Option<JoinHandle<()>>>>,
 }

Then in connect:

-tokio::spawn(async move {
+let handle = tokio::spawn(async move {
     // existing task code
 });
+{
+    let mut task_guard = self.consumer_task.lock().await;
+    *task_guard = Some(handle);
+}

This allows you to check task status in read() or implement recovery logic.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

        // Spawn the consumer loop and keep its JoinHandle for monitoring
        let handle = tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = cancellation_token.cancelled() => {
                        break;
                    }
                    result = async {
                        let mut consumer_guard = consumer_arc.lock().await;
                        if let Some(consumer) = &mut *consumer_guard {
                            consumer.next().await
                        } else {
                            None
                        }
                    } => {
                        match result {
                            Some(delivery_result) => {
                                match delivery_result {
                                    Ok(delivery) => {
                                        if let Err(_) = sender_clone.send_async(Amqp09Msg::Delivery(delivery)).await {
                                            break;
                                        }
                                    }
                                    Err(e) => {
                                        error!("RabbitMQ delivery error: {}", e);
                                        if let Err(_) = sender_clone.send_async(Amqp09Msg::Err(Error::Disconnection)).await {
                                            break;
                                        }
                                    }
                                }
                            }
                            None => {
                                // Consumer ended
                                if let Err(_) = sender_clone.send_async(Amqp09Msg::Err(Error::EOF)).await {
                                    break;
                                }
                            }
                        }
                    }
                }
            }
        });

        // Store the handle so we can .await it or check for panics later
        {
            let mut task_guard = self.consumer_task.lock().await;
            *task_guard = Some(handle);
        }
🤖 Prompt for AI Agents
In crates/arkflow-plugin/src/input/amqp09.rs around lines 216 to 256, the
spawned consumer task lacks error recovery and may silently stop on panic.
Modify the code to store the JoinHandle returned by tokio::spawn so you can
monitor the task status. Add panic recovery by wrapping the async block with a
catch_unwind or use a task supervision pattern to log unexpected exits. Then, in
the connect function, keep the JoinHandle and check its status in read() or
implement logic to restart the task if it has stopped.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (2)
crates/arkflow-plugin/src/output/amqp09.rs (2)

57-57: Consider using a numeric type for the expiration field.

The expiration field is currently a String representing milliseconds. Consider using Option<u64> or Option<std::time::Duration> for better type safety and to avoid runtime parsing errors.

-    /// Message expiration (in milliseconds)
-    expiration: Option<String>,
+    /// Message expiration
+    expiration: Option<std::time::Duration>,

112-118: Add validation for unrecognized exchange types.

The code silently defaults to Direct for unrecognized exchange types, which could hide configuration errors.

 let exchange_type = match self.config.exchange_type.as_deref() {
     Some("direct") => ExchangeKind::Direct,
     Some("fanout") => ExchangeKind::Fanout,
     Some("topic") => ExchangeKind::Topic,
     Some("headers") => ExchangeKind::Headers,
+    Some(unknown) => {
+        return Err(Error::Config(format!("Unknown exchange type: {}", unknown)));
+    }
-    _ => ExchangeKind::Direct,
+    None => ExchangeKind::Direct,
 };
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 53f91c7 and d4e774c.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (5)
  • crates/arkflow-plugin/Cargo.toml (1 hunks)
  • crates/arkflow-plugin/src/input/amqp09.rs (1 hunks)
  • crates/arkflow-plugin/src/input/mod.rs (2 hunks)
  • crates/arkflow-plugin/src/output/amqp09.rs (1 hunks)
  • crates/arkflow-plugin/src/output/mod.rs (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • crates/arkflow-plugin/src/input/amqp09.rs
🚧 Files skipped from review as they are similar to previous changes (3)
  • crates/arkflow-plugin/Cargo.toml
  • crates/arkflow-plugin/src/output/mod.rs
  • crates/arkflow-plugin/src/input/mod.rs
⏰ Context from checks skipped due to timeout of 90000ms (2)
  • GitHub Check: build
  • GitHub Check: build
🔇 Additional comments (3)
crates/arkflow-plugin/src/output/amqp09.rs (3)

74-84: LGTM!

The constructor correctly initializes all fields with appropriate default values.


214-232: LGTM!

The builder implementation correctly handles configuration parsing and error cases.


192-211: LGTM!

The close method properly handles resource cleanup in the correct order (channel before connection) and appropriately resets the connection state.

Comment on lines +175 to +187
channel
.basic_publish(
&self.config.exchange,
routing_key,
BasicPublishOptions::default(),
&payload,
BasicProperties::default(),
)
.await
.map_err(|e| Error::Process(format!("Failed to publish message: {}", e)))?
.await
.map_err(|e| Error::Process(format!("Failed to confirm message: {}", e)))?;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Critical: Message properties from configuration are not applied.

The code uses BasicProperties::default() instead of applying the configured message properties (persistent, priority, expiration, content_type, content_encoding).

+           let mut props = BasicProperties::default();
+           if let Some(persistent) = self.config.persistent {
+               props = props.with_delivery_mode(if persistent { 2 } else { 1 });
+           }
+           if let Some(priority) = self.config.priority {
+               props = props.with_priority(priority);
+           }
+           if let Some(ref expiration) = self.config.expiration {
+               props = props.with_expiration(expiration.clone().into());
+           }
+           if let Some(ref content_type) = self.config.content_type {
+               props = props.with_content_type(content_type.clone().into());
+           }
+           if let Some(ref content_encoding) = self.config.content_encoding {
+               props = props.with_content_encoding(content_encoding.clone().into());
+           }
+
            channel
                .basic_publish(
                    &self.config.exchange,
                    routing_key,
                    BasicPublishOptions::default(),
                    &payload,
-                   BasicProperties::default(),
+                   props,
                )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
channel
.basic_publish(
&self.config.exchange,
routing_key,
BasicPublishOptions::default(),
&payload,
BasicProperties::default(),
)
.await
.map_err(|e| Error::Process(format!("Failed to publish message: {}", e)))?
.await
.map_err(|e| Error::Process(format!("Failed to confirm message: {}", e)))?;
}
// build up message properties from config
let mut props = BasicProperties::default();
if let Some(persistent) = self.config.persistent {
props = props.with_delivery_mode(if persistent { 2 } else { 1 });
}
if let Some(priority) = self.config.priority {
props = props.with_priority(priority);
}
if let Some(ref expiration) = self.config.expiration {
props = props.with_expiration(expiration.clone().into());
}
if let Some(ref content_type) = self.config.content_type {
props = props.with_content_type(content_type.clone().into());
}
if let Some(ref content_encoding) = self.config.content_encoding {
props = props.with_content_encoding(content_encoding.clone().into());
}
channel
.basic_publish(
&self.config.exchange,
routing_key,
BasicPublishOptions::default(),
&payload,
props,
)
.await
.map_err(|e| Error::Process(format!("Failed to publish message: {}", e)))?
.await
.map_err(|e| Error::Process(format!("Failed to confirm message: {}", e)))?;
🤖 Prompt for AI Agents
In crates/arkflow-plugin/src/output/amqp09.rs around lines 175 to 187, the
message properties are set to BasicProperties::default(), ignoring the
configured properties like persistent, priority, expiration, content_type, and
content_encoding. Update the code to build BasicProperties using the
configuration values instead of default(), ensuring all relevant properties from
self.config are applied when publishing the message.

🛠️ Refactor suggestion

Handle publisher confirms conditionally.

The code always awaits the confirmation future, but this should only happen when publisher confirms are enabled.

-           channel
+           let publish_future = channel
                .basic_publish(
                    &self.config.exchange,
                    routing_key,
                    BasicPublishOptions::default(),
                    &payload,
                    BasicProperties::default(),
                )
-               .await
-               .map_err(|e| Error::Process(format!("Failed to publish message: {}", e)))?
-               .await
-               .map_err(|e| Error::Process(format!("Failed to confirm message: {}", e)))?;
+               .await
+               .map_err(|e| Error::Process(format!("Failed to publish message: {}", e)))?;
+
+           if self.config.confirm.unwrap_or(false) {
+               publish_future
+                   .await
+                   .map_err(|e| Error::Process(format!("Failed to confirm message: {}", e)))?;
+           }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
channel
.basic_publish(
&self.config.exchange,
routing_key,
BasicPublishOptions::default(),
&payload,
BasicProperties::default(),
)
.await
.map_err(|e| Error::Process(format!("Failed to publish message: {}", e)))?
.await
.map_err(|e| Error::Process(format!("Failed to confirm message: {}", e)))?;
}
// Publish the message, always awaiting the publish result…
let publish_future = channel
.basic_publish(
&self.config.exchange,
routing_key,
BasicPublishOptions::default(),
&payload,
BasicProperties::default(),
)
.await
.map_err(|e| Error::Process(format!("Failed to publish message: {}", e)))?;
// …and only await confirmation if confirms are enabled
if self.config.confirm.unwrap_or(false) {
publish_future
.await
.map_err(|e| Error::Process(format!("Failed to confirm message: {}", e)))?;
}
}
🤖 Prompt for AI Agents
In crates/arkflow-plugin/src/output/amqp09.rs around lines 175 to 187, the code
always awaits the publisher confirmation future, but this should only be done if
publisher confirms are enabled in the configuration. Modify the code to check if
publisher confirms are enabled before awaiting the confirmation future, and skip
awaiting it otherwise.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant