
Conversation

@chenquan chenquan commented Jul 6, 2025

Summary by CodeRabbit

  • New Features

    • Added support for writing data to Delta Lake tables, including integration with object storage services such as S3, Azure, and GCS.
    • Introduced configuration options for table URI, write modes (append, overwrite, error, ignore), table creation, schema specification, and partition columns (a rough config sketch follows this summary).
    • Enabled Delta Lake as an output option in the platform.
  • Tests

    • Added unit tests for configuration parsing and write mode validation.
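
As a rough illustration of those options, a configuration of the following shape should deserialize into the new DeltaLakeOutputConfig. This is a sketch only: the keys mirror the unit tests reviewed further down, and the values are placeholders.

// Sketch: keys mirror the fields exercised by the tests in deltalake.rs; values are placeholders.
use serde_json::json;

fn example_delta_output_config() -> serde_json::Value {
    json!({
        "table_uri": "s3://my-bucket/my-table",   // location of the Delta table
        "write_mode": "append",                   // append | overwrite | error | ignore
        "storage_options": {                      // passed through to the object store client
            "AWS_ACCESS_KEY_ID": "…",
            "AWS_SECRET_ACCESS_KEY": "…"
        },
        "create_if_not_exists": true,             // create the table on first write
        "partition_columns": ["year", "month"]    // optional partition columns
    })
}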

@chenquan chenquan marked this pull request as draft July 6, 2025 02:56

coderabbitai bot commented Jul 6, 2025

"""

Walkthrough

A new Delta Lake output component has been added to the arkflow-plugin crate, enabling writing to Delta Lake tables in object storage. This includes new configuration structures, output logic, builder integration, dependency updates, and initialization within the output module. Unit tests and configuration validation for the new output are also included.

Changes

| File(s) | Change Summary |
| --- | --- |
| crates/arkflow-plugin/Cargo.toml | Added deltalake crate as a dependency with features for "s3", "azure", "gcs", "datafusion". |
| crates/arkflow-plugin/src/output/deltalake.rs | Introduced Delta Lake output: config struct, output logic, builder, registration, and tests. |
| crates/arkflow-plugin/src/output/mod.rs | Added deltalake module and registered its initialization in the output module's init(). |

Sequence Diagram(s)

sequenceDiagram
    participant User
    participant Arkflow
    participant DeltaLakeOutput
    participant DeltaTable

    User->>Arkflow: Send message batch
    Arkflow->>DeltaLakeOutput: write(msg)
    DeltaLakeOutput->>DeltaLakeOutput: get_or_create_table()
    alt Table exists
        DeltaLakeOutput->>DeltaTable: Load existing table
    else Create if not exists
        DeltaLakeOutput->>DeltaLakeOutput: create_table()
        DeltaLakeOutput->>DeltaTable: Create new table
    end
    DeltaLakeOutput->>DeltaTable: Write RecordBatch (append/overwrite)
    DeltaLakeOutput->>Arkflow: Return write result
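The flow above can be sketched roughly as follows. This is not the PR's code: open_table_with_storage_options, CreateBuilder, and DeltaOps are the deltalake crate APIs as recalled here and may differ between versions, and the single string column stands in for whatever schema the output is configured with.

// Rough sketch of the load-or-create-then-write flow; assumes deltalake crate APIs
// (open_table_with_storage_options, CreateBuilder, DeltaOps) and is not the PR's exact code.
use std::collections::HashMap;

use deltalake::arrow::record_batch::RecordBatch;
use deltalake::kernel::{DataType, PrimitiveType, StructField};
use deltalake::operations::create::CreateBuilder;
use deltalake::{open_table_with_storage_options, DeltaOps, DeltaTable, DeltaTableError};

async fn get_or_create_table(
    uri: &str,
    storage_options: HashMap<String, String>,
) -> Result<DeltaTable, DeltaTableError> {
    // "Table exists" branch: try to load the table at the configured URI.
    if let Ok(table) = open_table_with_storage_options(uri, storage_options.clone()).await {
        return Ok(table);
    }
    // "Create if not exists" branch: create a new table; the column below is a placeholder
    // for the configured schema.
    CreateBuilder::new()
        .with_location(uri)
        .with_storage_options(storage_options)
        .with_columns(vec![StructField::new(
            "value",
            DataType::Primitive(PrimitiveType::String),
            true,
        )])
        .await
}

async fn append_batch(table: DeltaTable, batch: RecordBatch) -> Result<DeltaTable, DeltaTableError> {
    // Write the RecordBatch; the returned table points at the newly committed version.
    DeltaOps(table).write(vec![batch]).await
}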

Suggested labels

enhancement

Poem

In the meadow of code, a new path we make,
With Delta Lake streams, our data will wake.
Tables on clouds, in S3 or Azure,
The rabbit hops forward, the future is sure.
With schemas and batches, we leap and we write—
Hooray for new outputs, our hop is so light!
🐇✨
"""

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 Clippy (1.86.0)
Updating crates.io index

warning: failed to write cache, path: /usr/local/registry/index/index.crates.io-1949cf8c6b5b557f/.cache/ar/ro/arrow-json, error: Permission denied (os error 13)
Downloading crates ...
Downloaded adler32 v1.2.0
error: failed to create directory /usr/local/registry/cache/index.crates.io-1949cf8c6b5b557f

Caused by:
Permission denied (os error 13)


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

🧹 Nitpick comments (1)
crates/arkflow-plugin/src/output/mod.rs (1)

21-29: Consider maintaining consistent ordering between module declarations and initialization.

The module declarations don't match the initialization order in the init() function. For better readability and maintainability, consider reordering the module declarations to match the initialization sequence.

-pub mod deltalake;
 pub mod drop;
 pub mod http;
 pub mod kafka;
 pub mod mqtt;
 pub mod nats;
 pub mod redis;
-pub mod sql;
 pub mod stdout;
+pub mod sql;
+pub mod deltalake;

Or alternatively, reorder the initialization calls to match the current declaration order.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8c4ce15 and a41fc82.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (3)
  • crates/arkflow-plugin/Cargo.toml (1 hunks)
  • crates/arkflow-plugin/src/output/deltalake.rs (1 hunks)
  • crates/arkflow-plugin/src/output/mod.rs (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
crates/arkflow-plugin/src/output/mod.rs (4)
crates/arkflow-plugin/src/output/deltalake.rs (1)
  • init (245-247)
crates/arkflow-plugin/src/output/sql.rs (1)
  • init (428-430)
crates/arkflow-plugin/src/output/nats.rs (1)
  • init (236-238)
crates/arkflow-plugin/src/output/redis.rs (1)
  • init (192-194)
⏰ Context from checks skipped due to timeout of 90000ms (3)
  • GitHub Check: build
  • GitHub Check: build
  • GitHub Check: Build Docusaurus
🔇 Additional comments (2)
crates/arkflow-plugin/Cargo.toml (1)

86-88: LGTM!

The Delta Lake dependency is correctly added with appropriate features for S3, Azure, GCS, and DataFusion support.

crates/arkflow-plugin/src/output/deltalake.rs (1)

109-136: LGTM!

The table creation logic properly handles partition columns and schema configuration. The implementation follows Delta Lake best practices.
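
For reference, a rough sketch of what creation with partition columns can look like through the deltalake CreateBuilder; the builder method names are assumptions based on that crate, not code quoted from this PR:

// Sketch of table creation with explicit columns and partition columns; method names
// are assumed from the deltalake crate and should be checked against the pinned version.
use deltalake::kernel::StructField;
use deltalake::operations::create::CreateBuilder;
use deltalake::{DeltaTable, DeltaTableError};

async fn create_partitioned_table(
    uri: &str,
    columns: Vec<StructField>,          // derived from the configured schema
    partition_columns: Vec<String>,     // e.g. ["year", "month"]
) -> Result<DeltaTable, DeltaTableError> {
    CreateBuilder::new()
        .with_location(uri)
        .with_columns(columns)
        .with_partition_columns(partition_columns)
        .await
}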

Comment on lines 249 to 301
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_config_parsing() {
        let config_json = json!({
            "table_uri": "s3://my-bucket/my-table",
            "write_mode": "append",
            "storage_options": {
                "AWS_ACCESS_KEY_ID": "test",
                "AWS_SECRET_ACCESS_KEY": "test"
            },
            "create_if_not_exists": true,
            "partition_columns": ["year", "month"]
        });

        let config: DeltaLakeOutputConfig = serde_json::from_value(config_json).unwrap();
        assert_eq!(config.table_uri, "s3://my-bucket/my-table");
        assert_eq!(config.write_mode, "append");
        assert_eq!(config.storage_options.len(), 2);
        assert!(config.create_if_not_exists);
        assert_eq!(config.partition_columns, vec!["year", "month"]);
    }

    #[test]
    fn test_write_mode_validation() {
        let config = DeltaLakeOutputConfig {
            table_uri: "test".to_string(),
            write_mode: "append".to_string(),
            storage_options: HashMap::new(),
            create_if_not_exists: true,
            schema: None,
            partition_columns: vec![],
        };

        let output = DeltaLakeOutput::new(config).unwrap();
        assert!(output.validate_write_mode().is_ok());

        let invalid_config = DeltaLakeOutputConfig {
            table_uri: "test".to_string(),
            write_mode: "invalid".to_string(),
            storage_options: HashMap::new(),
            create_if_not_exists: true,
            schema: None,
            partition_columns: vec![],
        };

        let invalid_output = DeltaLakeOutput::new(invalid_config).unwrap();
        assert!(invalid_output.validate_write_mode().is_err());
    }
}

🛠️ Refactor suggestion

Expand test coverage for core functionality.

The current tests only cover configuration parsing and validation. Consider adding tests for:

  • Table creation with schema
  • Write operations (mocked)
  • Error handling scenarios
  • Storage options configuration

Would you like me to help generate additional test cases for better coverage?
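
As a concrete starting point, one additional test along these lines would exercise the validation error path against the code as currently quoted (sketch only; it reuses the struct fields from the existing tests):

// Sketch: every documented mode is accepted regardless of case, and an unsupported mode is rejected.
#[test]
fn test_write_mode_case_and_invalid() {
    for mode in ["append", "OVERWRITE", "Error", "ignore"] {
        let config = DeltaLakeOutputConfig {
            table_uri: "s3://bucket/table".to_string(),
            write_mode: mode.to_string(),
            storage_options: HashMap::new(),
            create_if_not_exists: true,
            schema: None,
            partition_columns: vec![],
        };
        assert!(DeltaLakeOutput::new(config).unwrap().validate_write_mode().is_ok());
    }

    let bad = DeltaLakeOutputConfig {
        table_uri: "s3://bucket/table".to_string(),
        write_mode: "upsert".to_string(),
        storage_options: HashMap::new(),
        create_if_not_exists: true,
        schema: None,
        partition_columns: vec![],
    };
    assert!(DeltaLakeOutput::new(bad).unwrap().validate_write_mode().is_err());
}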

🤖 Prompt for AI Agents
In crates/arkflow-plugin/src/output/deltalake.rs around lines 249 to 301, the
existing tests only cover config parsing and write mode validation. To improve
coverage, add tests for table creation with schema by mocking schema inputs,
simulate write operations using mocks or test doubles to verify data handling,
include error handling tests by forcing failure scenarios, and add tests to
verify correct handling of storage options configuration. Implement these tests
within the #[cfg(test)] mod to ensure core functionality is thoroughly
validated.

Comment on lines 138 to 147
/// Validate write mode string
fn validate_write_mode(&self) -> Result<(), Error> {
    match self.config.write_mode.to_lowercase().as_str() {
        "append" | "overwrite" | "error" | "ignore" => Ok(()),
        _ => Err(Error::Config(format!(
            "Invalid write mode: {}. Supported modes: append, overwrite, error, ignore",
            self.config.write_mode
        ))),
    }
}

🛠️ Refactor suggestion

⚠️ Potential issue

Validate write mode once during construction and fix unimplemented modes.

The current implementation has two issues:

  1. validate_write_mode() is called on every write operation, which is inefficient
  2. The validation accepts "error" and "ignore" modes, but these aren't implemented

Move validation to the constructor and either implement the missing modes or remove them from validation:

 impl DeltaLakeOutput {
     /// Create a new Delta Lake output component
     pub fn new(config: DeltaLakeOutputConfig) -> Result<Self, Error> {
+        // Validate write mode once during construction
+        match config.write_mode.to_lowercase().as_str() {
+            "append" | "overwrite" => {},
+            _ => return Err(Error::Config(format!(
+                "Invalid write mode: {}. Supported modes: append, overwrite",
+                config.write_mode
+            ))),
+        }
         Ok(Self {
             config,
             table: Arc::new(Mutex::new(None)),
         })
     }

Then remove the validation call from the write() method:

     async fn write(&self, msg: MessageBatch) -> Result<(), Error> {
         let table = self.get_or_create_table().await?;
-        self.validate_write_mode()?;

Also applies to: 165-193

🤖 Prompt for AI Agents
In crates/arkflow-plugin/src/output/deltalake.rs around lines 138 to 147 and 165
to 193, the validate_write_mode function is inefficiently called on every write
operation and incorrectly accepts "error" and "ignore" modes that are not
implemented. Move the validation logic to the constructor to ensure the write
mode is validated once during object creation. Remove "error" and "ignore" from
the list of accepted modes unless you implement their functionality. Finally,
remove the validate_write_mode call from the write() method to avoid redundant
checks.

Comment on lines 79 to 84
async fn get_or_create_table(&self) -> Result<DeltaTable, Error> {
    let mut table_guard = self.table.lock().await;

    if let Some(table) = table_guard.as_ref() {
        return Ok(table.clone());
    }

💡 Verification agent

🧩 Analysis chain

Consider the performance impact of cloning DeltaTable.

The current implementation clones the DeltaTable on every access. Depending on the size of the table metadata, this could impact performance.

Please verify if DeltaTable implements efficient cloning (e.g., using Arc internally) or if there's a better pattern for managing table references in the Delta Lake library.


🌐 Web query:

Does the Rust deltalake crate's DeltaTable type implement efficient cloning or reference counting?

💡 Result:

The DeltaTable type in the Rust deltalake crate (now primarily in the deltalake-core crate) is an in-memory representation of a Delta Lake table[4]. In Rust, the cost and semantics of cloning a type like DeltaTable depend on its implementation of the Clone trait or the use of reference counting such as Arc<T> or Rc<T> for shared ownership.

Cloning and Reference Counting in DeltaTable

  • Efficient Cloning:
    There is no documentation or evidence in the available search results indicating that DeltaTable implements an efficient clone (i.e., a cheap, shallow clone or the use of an internal Arc for shared ownership that would make clone() inexpensive). Typically, in Rust, unless a type is specifically designed around interior reference counting, cloning a complex struct like a table will copy all its owned data, which may be costly if the object contains large collections or buffers[4][1].

  • Reference Counting:
    There is also no indication from the documentation or examples that DeltaTable is wrapped in a reference-counted smart pointer by default (such as Rc<DeltaTable> or Arc<DeltaTable>). Users who want to share a table between threads or components efficiently would need to explicitly wrap their DeltaTable in an Arc or Rc, following normal Rust ownership patterns[8]. This is standard for Rust libraries that do not want to impose overhead unless necessary.

The deltalake crate provides an in-memory representation of a Delta Table, and like most Rust types, will do a deep clone unless its internals are reference-counted or otherwise optimized. There is no public API or documentation indicating use of Arc or similar mechanisms internally[4][1].

What to Do If You Need Efficient Sharing

  • If you need to share a DeltaTable instance efficiently:
    • Wrap it in Arc<DeltaTable> for thread-safe shared ownership.
    • Use Rc<DeltaTable> for single-threaded cases[8].

This gives you cheap clones of the pointer rather than of the table data itself. The Rust standard library's Arc and Rc types provide reference counting, ensuring the underlying data is only dropped when all references die[8].

Summary Table

| Feature | Supported by DeltaTable | Notes |
| --- | --- | --- |
| Efficient internal cloning | No | No evidence of shallow cloning or interior Arc/Rc in the type |
| Implements Clone | Yes* | Likely, but will be a deep clone unless explicitly optimized |
| Uses reference counting internally | No | Not in public docs; wrap in Arc/Rc yourself if needed |

*Assumed for most Rust structs, but efficiency depends on the type's fields and implementation.

Conclusion:
DeltaTable does not implement efficient cloning or automatic reference counting internally; cloning it is likely a deep (potentially costly) operation[4][1]. For efficient sharing, wrap the table in a smart pointer like Arc or Rc as appropriate for your use case[8].



Use Arc to avoid expensive deep clones

Cloning DeltaTable invokes a full, potentially costly deep clone. Since the deltalake crate does not provide internal reference counting, wrap your table in a smart pointer for cheap, shallow clones.

File: crates/arkflow-plugin/src/output/deltalake.rs
Lines 79–84: change the mutex’d storage from Option<DeltaTable> to Option<Arc<DeltaTable>> and return Arc<DeltaTable> instead of cloning the table.
Lines 200–202: when creating or updating the table, wrap it in Arc::new(...) so subsequent accesses can cheaply clone the Arc.

Example diff:

-   self.table: Arc<Mutex<Option<DeltaTable>>>
+   self.table: Arc<Mutex<Option<Arc<DeltaTable>>>>,
...
-   async fn get_or_create_table(&self) -> Result<DeltaTable, Error> {
+   async fn get_or_create_table(&self) -> Result<Arc<DeltaTable>, Error> {
        let mut guard = self.table.lock().await;
-       if let Some(table) = guard.as_ref() {
-           return Ok(table.clone());
+       if let Some(tbl_arc) = guard.as_ref() {
+           return Ok(Arc::clone(tbl_arc));
        }
...
-       let table = DeltaTable::new(...).await?;
-       *guard = Some(table.clone());
-       Ok(table)
+       let table = Arc::new(DeltaTable::new(...).await?);
+       *guard = Some(Arc::clone(&table));
+       Ok(table)

This change ensures each call to get_or_create_table performs a cheap clone of the Arc instead of a full deep copy of the table.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-async fn get_or_create_table(&self) -> Result<DeltaTable, Error> {
-    let mut table_guard = self.table.lock().await;
-    if let Some(table) = table_guard.as_ref() {
-        return Ok(table.clone());
-    }
+async fn get_or_create_table(&self) -> Result<Arc<DeltaTable>, Error> {
+    let mut table_guard = self.table.lock().await;
+    if let Some(tbl_arc) = table_guard.as_ref() {
+        return Ok(Arc::clone(tbl_arc));
+    }
+    let table = Arc::new(DeltaTable::new(...).await?);
+    *table_guard = Some(Arc::clone(&table));
+    Ok(table)
+}
🤖 Prompt for AI Agents
In crates/arkflow-plugin/src/output/deltalake.rs around lines 79 to 84, change
the mutex-protected storage from Option<DeltaTable> to Option<Arc<DeltaTable>>
to avoid expensive deep cloning. Modify the return type of get_or_create_table
to return Arc<DeltaTable> by cloning the Arc instead of the full DeltaTable.
Additionally, around lines 200 to 202, wrap the created or updated DeltaTable
instance in Arc::new(...) before storing it in the mutex to enable cheap shallow
cloning on subsequent accesses.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

♻️ Duplicate comments (2)
crates/arkflow-plugin/src/output/deltalake.rs (2)

88-91: Implement the Arc optimization suggested in past reviews.

The current implementation stores Option<DeltaTable> which causes expensive deep clones on every access. This should be changed to Option<Arc<DeltaTable>> as suggested in previous reviews.


328-423: Expand test coverage as suggested in past reviews.

The tests cover configuration parsing and validation well, but lack coverage for the core functionality like table creation, write operations, and error handling scenarios.

🧹 Nitpick comments (3)
crates/arkflow-plugin/src/output/deltalake.rs (3)

19-33: Consider grouping imports by source for better organization.

The import statements are functional but could be better organized by grouping standard library, external crates, and internal crates separately for improved readability.

+use std::collections::HashMap;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use deltalake::kernel::Schema;
+use deltalake::{
+    arrow::record_batch::RecordBatch, operations::create::CreateBuilder, DeltaOps, DeltaTable,
+    DeltaTableBuilder,
+};
+use serde::{Deserialize, Serialize};
+use tokio::sync::RwLock;
+use tracing::{debug, info, warn};
+
+use arkflow_core::output::{register_output_builder, Output, OutputBuilder};
+use arkflow_core::{Error, MessageBatch, Resource};
-use std::collections::HashMap;
-use std::sync::Arc;
-
-use arkflow_core::output::{register_output_builder, Output, OutputBuilder};
-use arkflow_core::{Error, MessageBatch, Resource};
-use async_trait::async_trait;
-use deltalake::kernel::Schema;
-use deltalake::{
-    arrow::record_batch::RecordBatch, operations::create::CreateBuilder, DeltaOps, DeltaTable,
-    DeltaTableBuilder,
-};
-use serde::{Deserialize, Serialize};
-use tokio::sync::RwLock;
-use tracing::{debug, info, warn};

194-215: Enhance retry logic with exponential backoff and jitter.

The current retry logic uses linear backoff which can lead to thundering herd problems. Consider implementing exponential backoff with jitter for better distributed system behavior.

 async fn write_with_retry(&self, record_batch: RecordBatch) -> Result<DeltaTable, Error> {
     let mut last_error = None;
     
     for attempt in 0..=self.config.max_retries {
         if attempt > 0 {
-            let delay = std::time::Duration::from_millis(self.config.retry_delay_ms * attempt as u64);
+            // Exponential backoff with jitter
+            let base_delay = self.config.retry_delay_ms * (2_u64.pow(attempt as u32 - 1));
+            let jitter = fastrand::u64(0..=base_delay / 4); // 25% jitter
+            let delay = std::time::Duration::from_millis(base_delay + jitter);
             debug!("Retrying write operation after {:?} (attempt {})", delay, attempt);
             tokio::time::sleep(delay).await;
         }
         
         match self.execute_write_operation(record_batch.clone()).await {
             Ok(table) => return Ok(table),
             Err(e) => {
                 warn!("Write attempt {} failed: {}", attempt + 1, e);
                 last_error = Some(e);
             }
         }
     }
     
     Err(last_error.unwrap_or_else(|| Error::Process("All write attempts failed".to_string())))
 }

243-248: Improve schema parsing with better error context.

The schema parsing error could be more descriptive by including the actual schema content that failed to parse.

 fn parse_schema(&self, schema_str: &str) -> Result<Schema, Error> {
     serde_json::from_str(schema_str)
-        .map_err(|e| Error::Config(format!("Invalid schema JSON: {}", e)))
+        .map_err(|e| Error::Config(format!("Invalid schema JSON '{}': {}", schema_str, e)))
 }
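
For context on what parse_schema consumes: deltalake::kernel::Schema deserializes from the Delta protocol's JSON schema representation, so a configured schema string would look roughly like the sketch below (field names here are illustrative, not from this PR):

// Hypothetical value for the `schema` config option; the shape follows the Delta protocol
// JSON schema format that deltalake::kernel::Schema is expected to deserialize from.
fn example_schema_json() -> &'static str {
    r#"{
        "type": "struct",
        "fields": [
            { "name": "year",  "type": "string", "nullable": true,  "metadata": {} },
            { "name": "month", "type": "string", "nullable": true,  "metadata": {} },
            { "name": "value", "type": "double", "nullable": false, "metadata": {} }
        ]
    }"#
}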
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a41fc82 and f6aeb0b.

📒 Files selected for processing (1)
  • crates/arkflow-plugin/src/output/deltalake.rs (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: build
  • GitHub Check: Build Docusaurus
  • GitHub Check: build

Comment on lines +259 to +284
async fn write(&self, msg: MessageBatch) -> Result<(), Error> {
    // Convert MessageBatch to RecordBatch
    let record_batch: RecordBatch = msg.into();

    // Early return for empty batches
    if record_batch.num_rows() == 0 {
        debug!("Skipping write for empty record batch");
        return Ok(());
    }

    debug!("Writing {} rows to Delta table", record_batch.num_rows());

    // Execute write with retry logic
    let result = self.write_with_retry(record_batch).await?;

    debug!(
        "Successfully wrote data to Delta table, version: {}",
        result.version()
    );

    // Update the table reference with the new version
    let mut table_guard = self.table.write().await;
    table_guard.replace(result);

    Ok(())
}

🛠️ Refactor suggestion

Add validation for record batch schema compatibility.

The write method converts MessageBatch to RecordBatch but doesn't validate if the schema matches the Delta table schema, which could lead to runtime errors.

 async fn write(&self, msg: MessageBatch) -> Result<(), Error> {
     // Convert MessageBatch to RecordBatch
     let record_batch: RecordBatch = msg.into();
     
     // Early return for empty batches
     if record_batch.num_rows() == 0 {
         debug!("Skipping write for empty record batch");
         return Ok(());
     }

+    // Validate schema compatibility with existing table
+    let table = self.get_or_create_table().await?;
+    let table_schema = table.schema()?;
+    let batch_schema = record_batch.schema();
+    
+    if !schemas_compatible(&table_schema, &batch_schema) {
+        return Err(Error::Process(format!(
+            "Record batch schema incompatible with table schema. Table: {:?}, Batch: {:?}",
+            table_schema, batch_schema
+        )));
+    }

     debug!("Writing {} rows to Delta table", record_batch.num_rows());

     // Execute write with retry logic
     let result = self.write_with_retry(record_batch).await?;

     debug!(
         "Successfully wrote data to Delta table, version: {}",
         result.version()
     );

     // Update the table reference with the new version
     let mut table_guard = self.table.write().await;
     table_guard.replace(result);

     Ok(())
 }

You'll also need to add a helper function for schema compatibility checking:

fn schemas_compatible(table_schema: &Schema, batch_schema: &arrow::datatypes::Schema) -> bool {
    // Implementation to check if schemas are compatible
    // This is a simplified check - you may want more sophisticated logic
    table_schema.fields().len() == batch_schema.fields().len()
}
🤖 Prompt for AI Agents
In crates/arkflow-plugin/src/output/deltalake.rs around lines 259 to 284, the
write method lacks validation to ensure the RecordBatch schema matches the Delta
table schema, risking runtime errors. Add a helper function to check schema
compatibility by comparing the number of fields or other relevant schema
properties. Then, before proceeding with the write, retrieve the current table
schema and validate it against the RecordBatch schema using this helper. If the
schemas are incompatible, return an error early to prevent invalid writes.

Comment on lines +304 to +320
    if config.is_none() {
        return Err(Error::Config(
            "Delta Lake output configuration is missing".to_string(),
        ));
    }

    // Parse the configuration
    let config: DeltaLakeOutputConfig = serde_json::from_value(config.clone().unwrap())
        .map_err(|e| Error::Config(format!("Invalid Delta Lake configuration: {}", e)))?;

    // Validate required fields
    if config.table_uri.is_empty() {
        return Err(Error::Config("table_uri is required".to_string()));
    }

    Ok(Arc::new(DeltaLakeOutput::new(config)?))
}

🛠️ Refactor suggestion

Improve configuration validation in the builder.

The builder validates table_uri emptiness but doesn't use the comprehensive validation already implemented in DeltaLakeOutput::validate_config.

 fn build(
     &self,
     _name: Option<&String>,
     config: &Option<serde_json::Value>,
     _resource: &Resource,
 ) -> Result<Arc<dyn Output>, Error> {
     if config.is_none() {
         return Err(Error::Config(
             "Delta Lake output configuration is missing".to_string(),
         ));
     }

     // Parse the configuration
     let config: DeltaLakeOutputConfig = serde_json::from_value(config.clone().unwrap())
         .map_err(|e| Error::Config(format!("Invalid Delta Lake configuration: {}", e)))?;

-    // Validate required fields
-    if config.table_uri.is_empty() {
-        return Err(Error::Config("table_uri is required".to_string()));
-    }

     Ok(Arc::new(DeltaLakeOutput::new(config)?))
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-    if config.is_none() {
-        return Err(Error::Config(
-            "Delta Lake output configuration is missing".to_string(),
-        ));
-    }
-    // Parse the configuration
-    let config: DeltaLakeOutputConfig = serde_json::from_value(config.clone().unwrap())
-        .map_err(|e| Error::Config(format!("Invalid Delta Lake configuration: {}", e)))?;
-    // Validate required fields
-    if config.table_uri.is_empty() {
-        return Err(Error::Config("table_uri is required".to_string()));
-    }
-    Ok(Arc::new(DeltaLakeOutput::new(config)?))
-}
+fn build(
+    &self,
+    _name: Option<&String>,
+    config: &Option<serde_json::Value>,
+    _resource: &Resource,
+) -> Result<Arc<dyn Output>, Error> {
+    if config.is_none() {
+        return Err(Error::Config(
+            "Delta Lake output configuration is missing".to_string(),
+        ));
+    }
+    // Parse the configuration
+    let config: DeltaLakeOutputConfig = serde_json::from_value(config.clone().unwrap())
+        .map_err(|e| Error::Config(format!("Invalid Delta Lake configuration: {}", e)))?;
+    Ok(Arc::new(DeltaLakeOutput::new(config)?))
+}
🤖 Prompt for AI Agents
In crates/arkflow-plugin/src/output/deltalake.rs around lines 304 to 320, the
builder currently checks only if table_uri is empty but does not leverage the
existing comprehensive validation in DeltaLakeOutput::validate_config. Refactor
the code to remove the manual table_uri emptiness check and instead call
DeltaLakeOutput::validate_config with the parsed config to perform full
validation. Handle any validation errors by returning them as Error::Config to
ensure consistent and thorough configuration validation.

Comment on lines +106 to +121
fn validate_config(config: &DeltaLakeOutputConfig) -> Result<(), Error> {
    if config.table_uri.trim().is_empty() {
        return Err(Error::Config("table_uri cannot be empty".to_string()));
    }

    // Validate URI format
    if !config.table_uri.starts_with("s3://")
        && !config.table_uri.starts_with("azure://")
        && !config.table_uri.starts_with("gs://")
        && !config.table_uri.starts_with("file://")
        && !config.table_uri.starts_with("/") {
        warn!("Table URI '{}' may not be a valid storage path", config.table_uri);
    }

    Ok(())
}

🛠️ Refactor suggestion

Add validation for storage options based on URI scheme.

The URI validation only warns about potentially invalid paths but doesn't validate that required storage options are present for each scheme (e.g., AWS credentials for S3 URIs).

 fn validate_config(config: &DeltaLakeOutputConfig) -> Result<(), Error> {
     if config.table_uri.trim().is_empty() {
         return Err(Error::Config("table_uri cannot be empty".to_string()));
     }
     
     // Validate URI format
     if !config.table_uri.starts_with("s3://") 
         && !config.table_uri.starts_with("azure://") 
         && !config.table_uri.starts_with("gs://")
         && !config.table_uri.starts_with("file://")
         && !config.table_uri.starts_with("/") {
         warn!("Table URI '{}' may not be a valid storage path", config.table_uri);
     }
+    
+    // Validate storage options based on URI scheme
+    if config.table_uri.starts_with("s3://") {
+        if !config.storage_options.contains_key("AWS_ACCESS_KEY_ID") 
+            && !config.storage_options.contains_key("AWS_PROFILE") 
+            && std::env::var("AWS_ACCESS_KEY_ID").is_err() {
+            warn!("S3 URI detected but no AWS credentials found in storage options or environment");
+        }
+    } else if config.table_uri.starts_with("azure://") {
+        if !config.storage_options.contains_key("AZURE_STORAGE_ACCOUNT_NAME") 
+            && std::env::var("AZURE_STORAGE_ACCOUNT_NAME").is_err() {
+            warn!("Azure URI detected but no Azure credentials found in storage options or environment");
+        }
+    }
     
     Ok(())
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
 fn validate_config(config: &DeltaLakeOutputConfig) -> Result<(), Error> {
     if config.table_uri.trim().is_empty() {
         return Err(Error::Config("table_uri cannot be empty".to_string()));
     }
     // Validate URI format
     if !config.table_uri.starts_with("s3://")
         && !config.table_uri.starts_with("azure://")
         && !config.table_uri.starts_with("gs://")
         && !config.table_uri.starts_with("file://")
         && !config.table_uri.starts_with("/") {
         warn!("Table URI '{}' may not be a valid storage path", config.table_uri);
     }
+    // Validate storage options based on URI scheme
+    if config.table_uri.starts_with("s3://") {
+        if !config.storage_options.contains_key("AWS_ACCESS_KEY_ID")
+            && !config.storage_options.contains_key("AWS_PROFILE")
+            && std::env::var("AWS_ACCESS_KEY_ID").is_err() {
+            warn!("S3 URI detected but no AWS credentials found in storage options or environment");
+        }
+    } else if config.table_uri.starts_with("azure://") {
+        if !config.storage_options.contains_key("AZURE_STORAGE_ACCOUNT_NAME")
+            && std::env::var("AZURE_STORAGE_ACCOUNT_NAME").is_err() {
+            warn!("Azure URI detected but no Azure credentials found in storage options or environment");
+        }
+    }
     Ok(())
 }
🤖 Prompt for AI Agents
In crates/arkflow-plugin/src/output/deltalake.rs around lines 106 to 121, the
validate_config function only warns about invalid URI schemes but does not check
for required storage options based on the URI scheme. Enhance the function to
validate that for each supported URI scheme (like s3://, azure://, gs://), the
corresponding required storage options (e.g., AWS credentials for s3://) are
present in the config. Return an error if any required options are missing to
ensure proper configuration.

@chenquan chenquan self-assigned this Jul 11, 2025