# feat(output/s3): Add S3 Output Plugin #412
## Conversation
> **Important: Review skipped.** Review was skipped due to path filters. ⛔ Files ignored due to path filters (1). CodeRabbit blocks several paths by default; this behavior can be overridden by explicitly including those paths in the path filters.

**Walkthrough**

The changes introduce an S3-compatible output plugin for the Arkflow system, enabling data export to AWS S3 and Google Cloud Storage with configurable formats (JSON, CSV, Parquet) and compression (Gzip, Bzip2). New dependencies are added, and a YAML example demonstrates pipeline configuration for S3 output. Additionally, the input handling is refactored by moving file-based inputs out of the SQL input, a new file input plugin is added, and the codec interface is updated to operate on raw byte payloads.
**Sequence Diagram(s)**

```mermaid
sequenceDiagram
    participant Pipeline
    participant S3Output
    participant ObjectStore as ObjectStore (S3/GCS)
    Pipeline->>S3Output: write(batch)
    S3Output->>S3Output: Serialize batch (JSON/CSV/Parquet)
    S3Output->>S3Output: Compress data (Gzip/Bzip2, optional)
    S3Output->>ObjectStore: Upload file (with timestamped filename)
    ObjectStore-->>S3Output: Upload result
    S3Output-->>Pipeline: Write result
```
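The serialize-compress-upload flow above is what most of the review below focuses on. As a rough sketch of the compression step only (not the PR's actual code; the real implementation lives in `crates/arkflow-plugin/src/output/s3.rs`), the optional Gzip/Bzip2 stage could look roughly like this, using the `flate2` and `bzip2` crates the PR adds as dependencies:

```rust
use std::io::Write;

// Hypothetical helper: compress an already-serialized payload before upload.
// The enum name and variants mirror the S3Compression type referenced in the
// review below; everything else here is an illustrative assumption.
enum S3Compression {
    Gzip,
    Bzip2,
}

fn compress(data: Vec<u8>, compression: Option<&S3Compression>) -> std::io::Result<Vec<u8>> {
    match compression {
        Some(S3Compression::Gzip) => {
            let mut enc =
                flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default());
            enc.write_all(&data)?;
            enc.finish()
        }
        Some(S3Compression::Bzip2) => {
            let mut enc =
                bzip2::write::BzEncoder::new(Vec::new(), bzip2::Compression::default());
            enc.write_all(&data)?;
            enc.finish()
        }
        // No compression requested. Parquet also effectively lands here,
        // since it handles compression internally (see the review note below).
        None => Ok(data),
    }
}
```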
Actionable comments posted: 2
🧹 Nitpick comments (7)

**examples/s3_output_example.yaml (2)**

**18-18: Fix formatting issues detected by static analysis.**

Remove trailing spaces and add a newline at the end of the file. Apply this diff to fix the formatting:

```diff
-    type: "s3" 
+    type: "s3"
 ...
-      path: "test" 
-      prefix: "arkflow" 
-      format: "JSON" #json/csv/parquet 
+      path: "test"
+      prefix: "arkflow"
+      format: "JSON" #json/csv/parquet
 ...
-    type: "stdout"
+    type: "stdout"
+
```

Also applies to: 25-27, 31-31
🧰 Tools
🪛 YAMLlint (1.37.1)
[error] 18-18: trailing spaces
(trailing-spaces)
**27-27: Consider using consistent casing for format values.**

The format value uses uppercase "JSON" while the comment shows lowercase options. For consistency with the comment, consider:

```diff
-      format: "JSON" #json/csv/parquet
+      format: "json" #json/csv/parquet
```

🧰 Tools
🪛 YAMLlint (1.37.1)
[error] 27-27: trailing spaces
(trailing-spaces)
**crates/arkflow-plugin/src/output/s3.rs (5)**

**176-183: Consider performing actual connection verification.**

The `connect` method only logs but doesn't verify the connection to the storage provider. Consider adding a lightweight connection check:

```diff
 async fn connect(&self) -> Result<(), Error> {
     info!(
         "S3 client for bucket '{}' initialized. Performing a lightweight connection check (e.g., listing root).",
         self.config.bucket
     );
+    // Perform a lightweight operation to verify connectivity
+    self.s3_client
+        .list(Some(&Path::from(&self.config.path)))
+        .next()
+        .await;
     Ok(())
 }
```
**282-282: Replace Korean comments with English for consistency.**

The codebase should maintain consistent language in comments. Apply this diff:

```diff
-    .map_err(|e| Error::Process(format!("Gzip write_all failed: {}", e)))?; // 에러 메시지 구체화
+    .map_err(|e| Error::Process(format!("Gzip write_all failed: {}", e)))?;
 ...
-    .map_err(|e| Error::Process(format!("Bzip2 write_all failed: {}", e)))?; // 에러 메시지 구체화
+    .map_err(|e| Error::Process(format!("Bzip2 write_all failed: {}", e)))?;
```

Also applies to: 291-291
**299-309: Document that Parquet format ignores compression setting.**

The file extension logic shows that Parquet format doesn't use compression suffixes, but this isn't documented. Consider adding a comment or handling compression for Parquet:

```diff
 fn get_file_extension_from_config(config: &S3OutputConfig) -> &'static str {
     match (&config.format, &config.compression) {
         (S3OutputFormat::Json, Some(S3Compression::Gzip)) => "json.gz",
         (S3OutputFormat::Csv, Some(S3Compression::Gzip)) => "csv.gz",
         (S3OutputFormat::Json, Some(S3Compression::Bzip2)) => "json.bz2",
         (S3OutputFormat::Csv, Some(S3Compression::Bzip2)) => "csv.bz2",
         (S3OutputFormat::Json, None) => "json",
         (S3OutputFormat::Csv, None) => "csv",
+        // Note: Parquet has built-in compression, external compression is not applied
         (S3OutputFormat::Parquet, _) => "parquet",
     }
 }
```
**187-217: Consider adding error context for debugging.**

The spawn_blocking task could benefit from additional context in error messages. Consider adding more context to help with debugging:

```diff
     .await
-    .map_err(|e| Error::Process(format!("Data processing task failed: {}", e)))??;
+    .map_err(|e| Error::Process(format!(
+        "Data processing task failed for format {:?} with compression {:?}: {}",
+        self.config.format, self.config.compression, e
+    )))??;
```
**326-326: Avoid unnecessary clone of config.**

The config is cloned when it could be borrowed. Use borrowing instead of cloning:

```diff
-    let config: S3OutputConfig = serde_json::from_value(config.clone().unwrap())?;
+    let config: S3OutputConfig = serde_json::from_value(config.as_ref().unwrap().clone())?;
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)

- `Cargo.toml` (2 hunks)
- `crates/arkflow-plugin/Cargo.toml` (4 hunks)
- `crates/arkflow-plugin/src/output/s3.rs` (1 hunks)
- `examples/s3_output_example.yaml` (1 hunks)
🧰 Additional context used
🪛 YAMLlint (1.37.1)
examples/s3_output_example.yaml
[error] 18-18: trailing spaces
(trailing-spaces)
[error] 25-25: trailing spaces
(trailing-spaces)
[error] 26-26: trailing spaces
(trailing-spaces)
[error] 27-27: trailing spaces
(trailing-spaces)
[error] 31-31: no new line character at the end of file
(new-line-at-end-of-file)
⏰ Context from checks skipped due to timeout of 90000ms (2)
- GitHub Check: build
- GitHub Check: build
🔇 Additional comments (2)
**Cargo.toml (1)**

**24-24: LGTM!**

The new workspace dependencies are appropriate for the S3 output plugin's serialization and CSV format requirements.

Also applies to: 34-34
**crates/arkflow-plugin/Cargo.toml (1)**

**20-20: LGTM!**

All new dependencies are necessary and well-chosen for the S3 output plugin's functionality:

- Serialization formats: `arrow-csv`, `parquet`
- Compression: `bzip2`, `flate2`
- Utilities: `serde_with`, `chrono`

Also applies to: 35-35, 40-40, 50-50, 81-82
**examples/s3_output_example.yaml** (outdated)

```yaml
      access_key_id: "your access key id"
      secret_access_key: "your secret access key"
```
🛠️ Refactor suggestion

**Add security warning about credentials in example.**

Consider adding a comment warning users not to commit real credentials. Add a security warning comment:

```diff
     credentials:
+      # WARNING: These are placeholder values. Never commit real credentials to version control!
       access_key_id: "your access key id"
       secret_access_key: "your secret access key"
```
📝 Committable suggestion

‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```yaml
    credentials:
      # WARNING: These are placeholder values. Never commit real credentials to version control!
      access_key_id: "your access key id"
      secret_access_key: "your secret access key"
```
```rust
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct GcpCredentials {
    service_account_path: String,
}
```
🛠️ Refactor suggestion

**Make `service_account_path` field public for consistency.**

The `service_account_path` field should be public to match the visibility of fields in `AwsCredentials`. Apply this diff:

```diff
 #[derive(Debug, Clone, Serialize, Deserialize)]
 #[serde(rename_all = "snake_case")]
 pub struct GcpCredentials {
-    service_account_path: String,
+    pub service_account_path: String,
 }
```
Thank you for initiating the PR. I will handle it as soon as possible.
I am researching the code for DataFusion; perhaps we can directly use its existing features, just like this part: `arkflow/crates/arkflow-plugin/src/input/sql.rs`, lines 71 to 79 in b7096f4.
Bumps [cc](https://github.com/rust-lang/cc-rs) from 1.2.24 to 1.2.26. - [Release notes](https://github.com/rust-lang/cc-rs/releases) - [Changelog](https://github.com/rust-lang/cc-rs/blob/main/CHANGELOG.md) - [Commits](rust-lang/cc-rs@cc-v1.2.24...cc-v1.2.26) --- updated-dependencies: - dependency-name: cc dependency-version: 1.2.26 dependency-type: indirect update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Bumps [mio](https://github.com/tokio-rs/mio) from 1.0.3 to 1.0.4. - [Release notes](https://github.com/tokio-rs/mio/releases) - [Changelog](https://github.com/tokio-rs/mio/blob/master/CHANGELOG.md) - [Commits](https://github.com/tokio-rs/mio/commits) --- updated-dependencies: - dependency-name: mio dependency-version: 1.0.4 dependency-type: indirect update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
* chore: simplify the code for the python part * chore: simplify the code for the python part
…s#441) * chore(action): add sccache to speed up Rust project builds * x * x
Bumps [colorchoice](https://github.com/rust-cli/anstyle) from 1.0.3 to 1.0.4. - [Commits](rust-cli/anstyle@colorchoice-v1.0.3...colorchoice-v1.0.4) --- updated-dependencies: - dependency-name: colorchoice dependency-version: 1.0.4 dependency-type: indirect update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
@chenquan
Yes, I am still in the process of further verification.
* feat(input): Split the `file` from the `sql` * x * x * x * x
@chenquan
Look at this: In addition, partition writing should also be considered:
…s#448) Bumps [tracing-attributes](https://github.com/tokio-rs/tracing) from 0.1.28 to 0.1.29. - [Release notes](https://github.com/tokio-rs/tracing/releases) - [Commits](tokio-rs/tracing@tracing-attributes-0.1.28...tracing-attributes-0.1.29) --- updated-dependencies: - dependency-name: tracing-attributes dependency-version: 0.1.29 dependency-type: indirect update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Bumps [toml_datetime](https://github.com/toml-rs/toml) from 0.6.9 to 0.6.11. - [Commits](toml-rs/toml@toml_datetime-v0.6.9...toml_datetime-v0.6.11) --- updated-dependencies: - dependency-name: toml_datetime dependency-version: 0.6.11 dependency-type: indirect update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Bumps [anstyle](https://github.com/rust-cli/anstyle) from 1.0.10 to 1.0.11. - [Commits](rust-cli/anstyle@v1.0.10...v1.0.11) --- updated-dependencies: - dependency-name: anstyle dependency-version: 1.0.11 dependency-type: indirect update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Bumps [psl](https://github.com/addr-rs/psl) from 2.1.111 to 2.1.119. - [Release notes](https://github.com/addr-rs/psl/releases) - [Commits](addr-rs/psl@v2.1.111...v2.1.119) --- updated-dependencies: - dependency-name: psl dependency-version: 2.1.119 dependency-type: indirect update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Bumps [portable-atomic](https://github.com/taiki-e/portable-atomic) from 1.11.0 to 1.11.1. - [Release notes](https://github.com/taiki-e/portable-atomic/releases) - [Changelog](https://github.com/taiki-e/portable-atomic/blob/main/CHANGELOG.md) - [Commits](taiki-e/portable-atomic@v1.11.0...v1.11.1) --- updated-dependencies: - dependency-name: portable-atomic dependency-version: 1.11.1 dependency-type: indirect update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Bumps [bumpalo](https://github.com/fitzgen/bumpalo) from 3.17.0 to 3.18.1. - [Changelog](https://github.com/fitzgen/bumpalo/blob/main/CHANGELOG.md) - [Commits](fitzgen/bumpalo@3.17.0...v3.18.1) --- updated-dependencies: - dependency-name: bumpalo dependency-version: 3.18.1 dependency-type: indirect update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Bumps [anstyle-query](https://github.com/rust-cli/anstyle) from 1.1.2 to 1.1.3. - [Commits](rust-cli/anstyle@anstyle-query-v1.1.2...anstyle-query-v1.1.3) --- updated-dependencies: - dependency-name: anstyle-query dependency-version: 1.1.3 dependency-type: indirect update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
* refactor(codec): refactoring the codec * x
@chenquan
@chenquan

```rust
let s3_client_clone = Arc::clone(&self.s3_client);
let ctx = SessionContext::new();
let url = Url::parse(&config_clone.url)
    .map_err(|e| Error::Config(format!("Invalid URL: {}", e)))?;
ctx.register_object_store(&url, s3_client_clone);
ctx.register_batch("batch", msg.into())
    .map_err(|e| Error::Process(format!("Failed to register batch: {}", e)))?;
let df = ctx
    .sql("SELECT * FROM batch")
    .await
    .map_err(|e| Error::Process(format!("Failed to execute SQL: {}", e)))?;

// Approach 1 (seems better)
df.write_csv(path, options, writer_options);

// Approach 2
let logical_plan = df.logical_plan();
let state = ctx.state();
let physical_plan = state
    .create_physical_plan(&logical_plan)
    .await
    .map_err(|e| Error::Process(format!("Failed to create physical plan: {}", e)))?;
let partitions = Arc::new(
    RepartitionExec::try_new(physical_plan, Partitioning::RoundRobinBatch(1))
        .map_err(|e| Error::Process(format!("Failed to repartition data: {}", e)))?,
);
ctx.write_csv(partitions, "s3://dummy/")
    .await
    .map_err(|e| Error::Process(format!("Failed to write CSV: {}", e)))?;
```
Bumps [object_store](https://github.com/apache/arrow-rs-object-store) from 0.12.1 to 0.12.2. - [Changelog](https://github.com/apache/arrow-rs-object-store/blob/main/CHANGELOG-old.md) - [Commits](apache/arrow-rs-object-store@v0.12.1...v0.12.2) --- updated-dependencies: - dependency-name: object_store dependency-version: 0.12.2 dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Bumps [psl](https://github.com/addr-rs/psl) from 2.1.119 to 2.1.120. - [Release notes](https://github.com/addr-rs/psl/releases) - [Commits](addr-rs/psl@v2.1.119...v2.1.120) --- updated-dependencies: - dependency-name: psl dependency-version: 2.1.120 dependency-type: indirect update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Bumps [aws-smithy-json](https://github.com/smithy-lang/smithy-rs) from 0.61.3 to 0.61.4. - [Release notes](https://github.com/smithy-lang/smithy-rs/releases) - [Changelog](https://github.com/smithy-lang/smithy-rs/blob/main/CHANGELOG.md) - [Commits](https://github.com/smithy-lang/smithy-rs/commits) --- updated-dependencies: - dependency-name: aws-smithy-json dependency-version: 0.61.4 dependency-type: indirect update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Bumps [adler2](https://github.com/oyvindln/adler2) from 2.0.0 to 2.0.1. - [Changelog](https://github.com/oyvindln/adler2/blob/main/CHANGELOG.md) - [Commits](https://github.com/oyvindln/adler2/commits) --- updated-dependencies: - dependency-name: adler2 dependency-version: 2.0.1 dependency-type: indirect update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Bumps [wasi](https://github.com/bytecodealliance/wasi) from 0.11.0+wasi-snapshot-preview1 to 0.11.1+wasi-snapshot-preview1. - [Commits](bytecodealliance/wasi-rs@0.11.0...0.11.1) --- updated-dependencies: - dependency-name: wasi dependency-version: 0.11.1+wasi-snapshot-preview1 dependency-type: indirect update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Bumps [miniz_oxide](https://github.com/Frommi/miniz_oxide) from 0.8.8 to 0.8.9. - [Changelog](https://github.com/Frommi/miniz_oxide/blob/master/CHANGELOG.md) - [Commits](https://github.com/Frommi/miniz_oxide/commits) --- updated-dependencies: - dependency-name: miniz_oxide dependency-version: 0.8.9 dependency-type: indirect update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
* docs: add wx-group * x
Hi! @kyounghunJang |
Bumps [cfg-if](https://github.com/rust-lang/cfg-if) from 1.0.0 to 1.0.1. - [Release notes](https://github.com/rust-lang/cfg-if/releases) - [Changelog](https://github.com/rust-lang/cfg-if/blob/main/CHANGELOG.md) - [Commits](rust-lang/cfg-if@1.0.0...v1.0.1) --- updated-dependencies: - dependency-name: cfg-if dependency-version: 1.0.1 dependency-type: indirect update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Bumps [tracing-core](https://github.com/tokio-rs/tracing) from 0.1.33 to 0.1.34. - [Release notes](https://github.com/tokio-rs/tracing/releases) - [Commits](tokio-rs/tracing@tracing-core-0.1.33...tracing-core-0.1.34) --- updated-dependencies: - dependency-name: tracing-core dependency-version: 0.1.34 dependency-type: indirect update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Bumps [libc](https://github.com/rust-lang/libc) from 0.2.172 to 0.2.173. - [Release notes](https://github.com/rust-lang/libc/releases) - [Changelog](https://github.com/rust-lang/libc/blob/0.2.173/CHANGELOG.md) - [Commits](rust-lang/libc@0.2.172...0.2.173) --- updated-dependencies: - dependency-name: libc dependency-version: 0.2.173 dependency-type: indirect update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Bumps [rustc-demangle](https://github.com/rust-lang/rustc-demangle) from 0.1.24 to 0.1.25. - [Commits](https://github.com/rust-lang/rustc-demangle/commits) --- updated-dependencies: - dependency-name: rustc-demangle dependency-version: 0.1.25 dependency-type: indirect update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Bumps [memchr](https://github.com/BurntSushi/memchr) from 2.7.4 to 2.7.5. - [Commits](BurntSushi/memchr@2.7.4...2.7.5) --- updated-dependencies: - dependency-name: memchr dependency-version: 2.7.5 dependency-type: indirect update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Bumps redox_syscall from 0.5.12 to 0.5.13. --- updated-dependencies: - dependency-name: redox_syscall dependency-version: 0.5.13 dependency-type: indirect update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Bumps [windows-link](https://github.com/microsoft/windows-rs) from 0.1.1 to 0.1.3. - [Release notes](https://github.com/microsoft/windows-rs/releases) - [Commits](https://github.com/microsoft/windows-rs/commits) --- updated-dependencies: - dependency-name: windows-link dependency-version: 0.1.3 dependency-type: indirect update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Bumps [base64ct](https://github.com/RustCrypto/formats) from 1.7.3 to 1.8.0. - [Commits](https://github.com/RustCrypto/formats/commits/base64ct/v1.8.0) --- updated-dependencies: - dependency-name: base64ct dependency-version: 1.8.0 dependency-type: indirect update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Bumps [libmimalloc-sys](https://github.com/purpleprotocol/mimalloc_rust) from 0.1.42 to 0.1.43. - [Release notes](https://github.com/purpleprotocol/mimalloc_rust/releases) - [Commits](purpleprotocol/mimalloc_rust@v0.1.42...v0.1.43) --- updated-dependencies: - dependency-name: libmimalloc-sys dependency-version: 0.1.43 dependency-type: indirect update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
…arkflow into feature/s3-output
Actionable comments posted: 4
♻️ Duplicate comments (2)

**examples/s3_output_example.yaml (1)**

**23-24: Security concern already flagged in previous review.**

The security warning about not committing real credentials has already been addressed in a previous review comment.

**crates/arkflow-plugin/src/output/s3.rs (1)**

**50-54: Make `service_account_path` field public for consistency.**

The `service_account_path` field should be public to match the visibility of fields in `AwsCredentials`. Apply this diff:

```diff
 #[derive(Debug, Clone, Serialize, Deserialize)]
 #[serde(rename_all = "snake_case")]
 pub struct GcpCredentials {
-    service_account_path: String,
+    pub service_account_path: String,
 }
```
🧹 Nitpick comments (21)

**README_zh.md (3)**

**3-5: Unbalanced `<p>` tags may break GitHub-Markdown rendering.**

`<p align="center">` is opened twice but never closed before the next block, which can lead to unpredictable layout on some Markdown renderers.

```diff
 <p align="center">
 <img align="center" width="150px" src="images/logo.svg">
-<p align="center">
+</p>
+<p align="center">
```

**22-24: `float="left"` is not a valid attribute for `<p>` in GitHub Markdown.**

GitHub strips unknown attributes, so the intended side-by-side layout of the two images is lost. Consider using a simple HTML table or inline-block `<img>` tags instead.

```diff
-<p float="left">
-<img src="images/cncf-logo.svg" width="200"/>
-<img src="images/cncf-landscape-logo.svg" width="150"/>
-</p>
+<table><tr>
+  <td><img src="images/cncf-logo.svg" width="200"/></td>
+  <td><img src="images/cncf-landscape-logo.svg" width="150"/></td>
+</tr></table>
```

**290-295: Improve accessibility of the WeChat QR-code image.**

Use a more descriptive `alt` text to help screen-reader users and add a `<p>` wrapper for consistent spacing.

```diff
-<img src="./images/wx-group.png" alt="wx" width="300" />
+<p align="center">
+  <img src="images/wx-group.png" alt="ArkFlow 微信交流群二维码" width="300" />
+</p>
```

**README.md (2)**

**4-4: Add alt text for accessibility.**

The logo image is missing alt text, which is important for accessibility. Consider adding a descriptive alt attribute.

```diff
-<img align="center" width="150px" src="images/logo.svg">
+<img align="center" width="150px" src="images/logo.svg" alt="ArkFlow Logo">
```

**24-25: Add alt text for CNCF logos.**

The CNCF logos are missing alt text for accessibility compliance.

```diff
-<img src="images/cncf-logo.svg" width="200"/>
-<img src="images/cncf-landscape-logo.svg" width="150"/>
+<img src="images/cncf-logo.svg" width="200" alt="CNCF Logo"/>
+<img src="images/cncf-landscape-logo.svg" width="150" alt="CNCF Landscape Logo"/>
```

**examples/s3_output_example.yaml (4)**
**18-18: Fix trailing spaces.**

Remove trailing spaces on this line.

```diff
-    type: "s3" 
+    type: "s3"
```

**25-25: Fix trailing spaces.**

Remove trailing spaces on this line.

```diff
-      format: "csv" #json/csv/parquet 
+      format: "csv" #json/csv/parquet
```

**27-27: Fix trailing spaces.**

Remove trailing spaces on this line.

```diff
-      format_options: 
+      format_options:
```

**30-30: Add missing newline at end of file.**

Add a newline character at the end of the file to comply with YAML formatting standards.

```diff
     type: "stdout"
+
```
**docs/docs/components/4-temporary/redis.md (2)**

**7-7: Fix heading level increment issue.**

The heading jumps from h2 to h4, which violates markdown best practices for proper document structure. Apply this diff to fix the heading hierarchy:

```diff
-#### **mode** (required)
+### **mode** (required)
```

Also update the subsequent headings at the same level for consistency:

- Line 21: `###### **type** (required)` → `#### **type** (required)`
- Line 27: `###### **url** (required)` → `#### **url** (required)`
- Line 44: `###### **type** (required)` → `#### **type** (required)`
- Line 50: `###### **urls** (required)` → `#### **urls** (required)`
- Line 56: `#### **redis_type** (required)` → `### **redis_type** (required)`
- Line 80: `#### **codec** (required)` → `### **codec** (required)`

**90-91: Consider adding comma for readability.**

While grammatically optional, adding a comma after "Currently" would improve readability of this sentence. Apply this diff:

```diff
-Currently supports JSON codec for data encoding/decoding.
+Currently, it supports JSON codec for data encoding/decoding.
```

**docs/docs/components/2-processors/python.md (2)**
**42-44: Consider removing redundant "back" in description.**

The phrase "converts the returned PyArrow batches back to MessageBatch format" contains a potentially redundant "back". Apply this diff for more concise wording:

```diff
-The Python processor converts the incoming MessageBatch to PyArrow format, passes it to the specified Python function, and then converts the returned PyArrow batches back to MessageBatch format.
+The Python processor converts the incoming MessageBatch to PyArrow format, passes it to the specified Python function, and then converts the returned PyArrow batches to MessageBatch format.
```

**306-306: Fix heading formatting issue.**

The heading has multiple spaces after the hash characters, which violates markdown formatting standards. Apply this diff:

```diff
-###  Using Polars for High-Performance Data Manipulation
+### Using Polars for High-Performance Data Manipulation
```

**crates/arkflow-plugin/src/input/file.rs (2)**
**196-225: Consider documenting security implications of `allow_http`.**

While the implementation is correct, enabling `allow_http` could expose credentials over unencrypted connections. Consider adding a warning in the configuration documentation about using this option only in trusted environments.

**411-412: Improve error handling to avoid potential panic.**

Using `unwrap()` could cause a panic. Consider using the `?` operator for cleaner error propagation. Apply this diff:

```diff
-let config: FileInputConfig = serde_json::from_value(config.clone().unwrap())
-    .map_err(|e| Error::Config(format!("Failed to parse File input config: {}", e)))?;
+let config_value = config.as_ref().ok_or_else(|| {
+    Error::Config("File input configuration is missing".to_string())
+})?;
+let config: FileInputConfig = serde_json::from_value(config_value.clone())
+    .map_err(|e| Error::Config(format!("Failed to parse File input config: {}", e)))?;
```

**docs/blog/2025/06/14-ArkFlow+Python-Easy-Real-time-AI.md (2)**
**16-16: Grammar: Use singular form for compound modifier.**

"Systems languages" should be "system languages" when used as a compound modifier.

```diff
-deep expertise in systems languages used for building stream processors
+deep expertise in system languages used for building stream processors
```

**28-28: Consider removing the qualifier for stronger writing.**

The word "somewhat" weakens the statement. Consider removing it or rephrasing for more confident writing.

```diff
-have been somewhat separate
+have been largely separate
```

**crates/arkflow-plugin/src/output/s3.rs (4)**
**202-208: Replace Korean comments with English.**

The Korean comment should be translated to English for better code maintainability and international collaboration. Apply this diff:

```diff
-    // writer_options가 없으면 기본값으로 생성
+    // Create default writer options if none exist
```
**294-306: Extract compression type parsing to reduce code duplication.**

The compression type parsing logic is duplicated between CSV and JSON options. Consider extracting this into a helper method:

```diff
+    fn parse_compression_type(compression_str: &str) -> Result<CompressionTypeVariant, Error> {
+        match compression_str.to_lowercase().as_str() {
+            "gzip" => Ok(CompressionTypeVariant::GZIP),
+            "bzip2" => Ok(CompressionTypeVariant::BZIP2),
+            "xz" => Ok(CompressionTypeVariant::XZ),
+            "zstd" => Ok(CompressionTypeVariant::ZSTD),
+            unsupported => Err(Error::Config(format!(
+                "Unsupported compression type: {}",
+                unsupported
+            ))),
+        }
+    }
```

Then use it in both methods:

```diff
-        let compression_type = match compression_str.to_lowercase().as_str() {
-            "gzip" => CompressionTypeVariant::GZIP,
-            "bzip2" => CompressionTypeVariant::BZIP2,
-            "xz" => CompressionTypeVariant::XZ,
-            "zstd" => CompressionTypeVariant::ZSTD,
-            unsupported => {
-                return Err(Error::Config(format!(
-                    "Unsupported compression type: {}",
-                    unsupported
-                )));
-            }
-        };
+        let compression_type = Self::parse_compression_type(compression_str)?;
```

Also applies to: 320-332
**364-404: Improve file extension handling consistency.**

The file extension handling logic could be simplified and made more maintainable by extracting the compression extension mapping:

```diff
+    fn get_compression_extension(compression: &str) -> &'static str {
+        match compression.to_lowercase().as_str() {
+            "gzip" => ".gz",
+            "bzip2" => ".bz2",
+            "xz" => ".xz",
+            "zstd" => ".zst",
+            _ => "",
+        }
+    }
```

Then simplify the file path construction:

```diff
-        let compression_ext = match compression.to_lowercase().as_str() {
-            "gzip" => ".gz",
-            "bzip2" => ".bz2",
-            "xz" => ".xz",
-            "zstd" => ".zst",
-            _ => "",
-        };
+        let compression_ext = Self::get_compression_extension(compression);
```
**256-257: Handle table deregistration error properly.**

The table deregistration error is being ignored. Consider logging the error or handling it appropriately. Apply this diff:

```diff
-        let _ = self.ctx.deregister_table("batch");
+        if let Err(e) = self.ctx.deregister_table("batch") {
+            tracing::warn!("Failed to deregister table 'batch': {}", e);
+        }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (7)

- `Cargo.lock` is excluded by `!**/*.lock`
- `docs/pnpm-lock.yaml` is excluded by `!**/pnpm-lock.yaml`
- `docs/static/img/arkflow.svg` is excluded by `!**/*.svg`
- `images/cncf-landscape-logo.svg` is excluded by `!**/*.svg`
- `images/cncf-logo.svg` is excluded by `!**/*.svg`
- `images/logo.svg` is excluded by `!**/*.svg`
- `images/wx-group.png` is excluded by `!**/*.png`
📒 Files selected for processing (35)

- `.github/workflows/rust.yml` (2 hunks)
- `Cargo.toml` (3 hunks)
- `README.md` (1 hunks)
- `README_zh.md` (2 hunks)
- `crates/arkflow-core/Cargo.toml` (1 hunks)
- `crates/arkflow-core/src/cli/mod.rs` (1 hunks)
- `crates/arkflow-core/src/codec/mod.rs` (2 hunks)
- `crates/arkflow-core/src/lib.rs` (2 hunks)
- `crates/arkflow-core/src/stream/mod.rs` (1 hunks)
- `crates/arkflow-plugin/Cargo.toml` (6 hunks)
- `crates/arkflow-plugin/src/buffer/join.rs` (4 hunks)
- `crates/arkflow-plugin/src/buffer/window.rs` (1 hunks)
- `crates/arkflow-plugin/src/codec/json.rs` (2 hunks)
- `crates/arkflow-plugin/src/input/file.rs` (1 hunks)
- `crates/arkflow-plugin/src/input/mod.rs` (2 hunks)
- `crates/arkflow-plugin/src/input/sql.rs` (0 hunks)
- `crates/arkflow-plugin/src/output/s3.rs` (1 hunks)
- `crates/arkflow-plugin/src/output/sql.rs` (11 hunks)
- `crates/arkflow-plugin/src/processor/python.rs` (6 hunks)
- `crates/arkflow-plugin/src/temporary/redis.rs` (6 hunks)
- `docs/blog/2025/06/14-ArkFlow+Python-Easy-Real-time-AI.md` (1 hunks)
- `docs/docs/components/0-inputs/multiple_inputs.md` (1 hunks)
- `docs/docs/components/1-buffers/memory.md` (2 hunks)
- `docs/docs/components/1-buffers/session_window.md` (2 hunks)
- `docs/docs/components/1-buffers/sliding_window.md` (2 hunks)
- `docs/docs/components/1-buffers/tumbling_window.md` (1 hunks)
- `docs/docs/components/2-processors/python.md` (1 hunks)
- `docs/docs/components/2-processors/sql.md` (1 hunks)
- `docs/docs/components/4-temporary/_category_.json` (1 hunks)
- `docs/docs/components/4-temporary/redis.md` (1 hunks)
- `docs/docusaurus.config.ts` (1 hunks)
- `examples/python_processor_example.yaml` (1 hunks)
- `examples/redis_temporary_example.yaml` (1 hunks)
- `examples/s3_output_example.yaml` (1 hunks)
- `examples/sql_output_example.yaml` (1 hunks)
💤 Files with no reviewable changes (1)
- crates/arkflow-plugin/src/input/sql.rs
✅ Files skipped from review due to trivial changes (9)
- crates/arkflow-core/src/cli/mod.rs
- crates/arkflow-core/Cargo.toml
- crates/arkflow-core/src/stream/mod.rs
- examples/sql_output_example.yaml
- docs/docusaurus.config.ts
- docs/docs/components/4-temporary/category.json
- examples/redis_temporary_example.yaml
- docs/docs/components/1-buffers/memory.md
- docs/docs/components/0-inputs/multiple_inputs.md
🚧 Files skipped from review as they are similar to previous changes (2)
- Cargo.toml
- crates/arkflow-plugin/Cargo.toml
🧰 Additional context used
🧬 Code Graph Analysis (4)
**crates/arkflow-plugin/src/input/mod.rs (6)**

- crates/arkflow-plugin/src/input/file.rs (1): `init` (417-420)
- crates/arkflow-plugin/src/input/sql.rs (1): `init` (339-341)
- crates/arkflow-plugin/src/input/modbus.rs (1): `init` (230-233)
- crates/arkflow-plugin/src/input/multiple_inputs.rs (1): `init` (178-181)
- crates/arkflow-plugin/src/input/kafka.rs (1): `init` (200-202)
- crates/arkflow-plugin/src/input/http.rs (1): `init` (200-202)

**crates/arkflow-plugin/src/buffer/window.rs (1)**

- crates/arkflow-plugin/src/buffer/join.rs (1): `new` (46-60)

**crates/arkflow-core/src/codec/mod.rs (1)**

- crates/arkflow-plugin/src/codec/json.rs (2): `encode` (24-37), `decode` (41-46)

**crates/arkflow-plugin/src/processor/python.rs (1)**

- crates/arkflow-core/src/lib.rs (1): `new_arrow` (187-192)
🪛 LanguageTool
README.md
[uncategorized] ~15-~15: If this is a compound adjective that modifies the following noun, use a hyphen.
Context: ... 54px;" width="250" height="54" /> High performance Rust stream processing engine seamlessl...
(EN_COMPOUND_ADJECTIVE_INTERNAL)
docs/docs/components/4-temporary/redis.md
[uncategorized] ~90-~90: A comma may be missing after the conjunctive/linking adverb ‘Currently’.
Context: ...ct` yaml codec: type: "json"
Currently supports JSON codec for data encoding/d...
(SENT_START_CONJUNCTIVE_LINKING_ADVERB_COMMA)
docs/blog/2025/06/14-ArkFlow+Python-Easy-Real-time-AI.md
[grammar] ~16-~16: Nouns are not usually modified by plural nouns. Is it possible that you meant to use the singular or possessive form here?
Context: ...ed developers to have deep expertise in systems languages used for building stream proc...
(PLURAL_MODIFIER)
[style] ~28-~28: Qualifiers like “somewhat” can weaken your message and make your writing sound uncertain. Consider removing it or choosing an alternative to sound more confident.
Context: ...cessing (often in Java/Scala) have been somewhat separate. Deploying Python AI models in...
(SOMEWHAT)
docs/docs/components/2-processors/python.md
[style] ~43-~43: Using “back” with the verb “return” may be redundant.
Context: ...n converts the returned PyArrow batches back to MessageBatch format. ## Examples #...
(RETURN_BACK)
docs/docs/components/2-processors/sql.md

[uncategorized] ~53-~53: Loose punctuation mark.
Context: "...ing required: `true` - `table_name`: Table name to use for this temporary da..."
(UNLIKELY_OPENING_PUNCTUATION)

[uncategorized] ~59-~59: Loose punctuation mark.
Context: "...e: string required: true - `key`: Key expression or value used to retriev..."
(UNLIKELY_OPENING_PUNCTUATION)

[uncategorized] ~72-~72: Loose punctuation mark.
Context: "...required: false - `value`: Static string value to use as the key..."
(UNLIKELY_OPENING_PUNCTUATION)
🪛 markdownlint-cli2 (0.17.2)
README.md
4-4: Images should have alternate text (alt text)
null
(MD045, no-alt-text)
21-21: Multiple spaces after hash on atx style heading
null
(MD019, no-multiple-space-atx)
24-24: Images should have alternate text (alt text)
null
(MD045, no-alt-text)
25-25: Images should have alternate text (alt text)
null
(MD045, no-alt-text)
docs/docs/components/4-temporary/redis.md
7-7: Heading levels should only increment by one level at a time
Expected: h3; Actual: h4
(MD001, heading-increment)
docs/blog/2025/06/14-ArkFlow+Python-Easy-Real-time-AI.md
14-14: Heading levels should only increment by one level at a time
Expected: h2; Actual: h3
(MD001, heading-increment)
docs/docs/components/2-processors/python.md
306-306: Multiple spaces after hash on atx style heading
null
(MD019, no-multiple-space-atx)
🪛 YAMLlint (1.37.1)
examples/s3_output_example.yaml
[error] 18-18: trailing spaces
(trailing-spaces)
[error] 25-25: trailing spaces
(trailing-spaces)
[error] 27-27: trailing spaces
(trailing-spaces)
[error] 30-30: no new line character at the end of file
(new-line-at-end-of-file)
🔇 Additional comments (44)
**.github/workflows/rust.yml (2)**

**12-13: LGTM! Excellent CI optimization.**

Adding sccache environment variables will significantly improve Rust compilation times in CI by enabling distributed caching.

**25-26: LGTM! Proper sccache setup.**

The sccache action is correctly placed before the build step and uses a stable version. This will provide substantial build time improvements.

**README.md (2)**

**12-13: LGTM! Excellent documentation links.**

Adding direct links to both latest and development documentation improves user experience significantly.

**16-19: LGTM! Enhanced project description effectively highlights AI capabilities.**

The updated description better reflects ArkFlow's AI/ML integration features mentioned in the PR objectives, including machine learning model execution, streaming inference, and anomaly detection.
**crates/arkflow-plugin/src/input/mod.rs (2)**

**21-21: LGTM! Proper module declaration.**

The file module is correctly declared as public, following the same pattern as other input modules.

**46-46: LGTM! Consistent initialization pattern.**

The `file::init()?` call follows the established pattern used by other input modules, ensuring proper error propagation and module registration.

**crates/arkflow-plugin/src/buffer/window.rs (1)**

**77-83: LGTM! Constructor call properly updated.**

The `JoinOperation::new` call correctly includes the new `value_field` and `thread_num` parameters, matching the updated constructor signature shown in the relevant code snippets from `join.rs`.

**examples/python_processor_example.yaml (4)**

**5-10: LGTM! More realistic and diverse input data.**

Switching from generated data to memory-based input with multiple distinct JSON messages provides a better demonstration of the Python processor's capabilities with varied data.

**16-16: LGTM! Essential preprocessing step added.**

Adding the `json_to_arrow` processor before the Python processor is crucial for converting JSON input into Arrow format that PyArrow can process efficiently.

**18-56: LGTM! Comprehensive PyArrow demonstration.**

The inline Python script excellently demonstrates:

- PyArrow record batch manipulation
- Column extraction and computation
- Schema creation with new fields
- Zero-copy data processing capabilities

This provides a much more educational and practical example than external module references.

**57-59: LGTM! Clear function specification.**

Specifying the `transform_data` function name and empty `python_path` makes the configuration clear and self-contained. The commented `module` line shows the alternative approach without cluttering the example.

**crates/arkflow-core/src/codec/mod.rs (1)**

**24-28: LGTM! Well-designed codec interface refactoring.**

The signature changes properly separate concerns by having codecs handle raw byte serialization/deserialization while MessageBatch remains a higher-level construct. This aligns well with the JSON codec implementation shown in the relevant snippets.
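For reference, the refactored interface implied by these snippets might be sketched as follows. This is an approximation inferred from the `encode`/`decode` signatures referenced above, not the verbatim trait in `arkflow-core`; `MessageBatch` and `Error` are the project's own types:

```rust
use bytes::Bytes;

// Approximate shape of the refactored codec trait: codecs now work purely
// in terms of raw byte payloads, while MessageBatch stays a higher-level
// Arrow-backed construct that callers convert to and from.
pub trait Codec: Send + Sync {
    /// Serialize a batch into one raw payload per message.
    fn encode(&self, batch: MessageBatch) -> Result<Vec<Bytes>, Error>;

    /// Rebuild a batch from raw payloads (the JSON codec joins them with
    /// newlines and parses the result as line-delimited JSON).
    fn decode(&self, data: Vec<Bytes>) -> Result<MessageBatch, Error>;
}
```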
**docs/docs/components/1-buffers/sliding_window.md (1)**

**27-43: Excellent documentation improvements.**

The enhanced parameter descriptions and internal mechanism details significantly improve the documentation quality. The technical details about Tokio async runtime, Arrow batch concatenation, and cancellation tokens provide valuable insights for developers.

**docs/docs/components/1-buffers/session_window.md (1)**

**9-93: Comprehensive documentation enhancements.**

The additions provide excellent coverage of the session window buffer capabilities, including the new join configuration options and detailed internal mechanism explanations. The practical examples with SQL join operations make the component much more accessible to users.

**docs/docs/components/4-temporary/redis.md (1)**

**97-119: Verify RIGHT JOIN support and Redis temporary integration.**

Please confirm that the example SQL syntax and Redis temporary setup align with the actual SQL processor implementation:

- Location: `docs/docs/components/4-temporary/redis.md` (lines 97–119)
- The SQL processor uses DataFusion's SQL parser under the hood—verify that `RIGHT JOIN` is supported in your target version.
- Ensure the `temporary_list` entry maps correctly to the `TemporaryConfig` struct (name, table_name, key) and that the Redis temporary component is available at runtime.
- Consider adding a small integration test or documentation note if `RIGHT JOIN` isn't currently supported by the SQL processor.

**docs/docs/components/2-processors/sql.md (1)**

**38-77: Well-structured documentation for temporary_list configuration.**

The documentation clearly explains the temporary_list functionality with proper schema definitions. The hierarchical structure with required/optional fields and type information provides good developer guidance.

**docs/docs/components/1-buffers/tumbling_window.md (3)**

**17-48: Comprehensive join configuration documentation.**

The join configuration section provides clear documentation for SQL join operations within tumbling windows. The optional nature and detailed field descriptions are well-structured.

**51-61: Enhanced internal mechanism documentation.**

The updated internal mechanism section provides much better insight into the implementation details, including specific technologies used (Tokio, DataFusion, Arrow) and architectural patterns (RwLock, HashMap, cancellation tokens).

**75-89: Join configuration syntax is correct.**

The tumbling window's `join` field maps to `Option<JoinConfig>`, supporting `query`, an optional `value_field`, `codec`, and a default `thread_num`. The example aligns with the implementation; no changes are needed. A hedged sketch of such a `join` block follows.
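As a loose illustration of those fields (the field names are taken from the description above; the values and nesting are assumptions, not the documented example):

```yaml
buffer:
  type: "tumbling_window"
  # ...window settings omitted...
  join:                      # optional: omit to disable joining
    query: "SELECT * FROM w1 JOIN w2 ON w1.id = w2.id"  # hypothetical SQL
    value_field: "value"     # optional; which field holds the encoded payload
    codec:
      type: "json"
    thread_num: 4            # defaults to the number of CPU cores
```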
**crates/arkflow-plugin/src/processor/python.rs (4)**

**31-38: Good configuration simplification with sensible defaults.**

The changes remove optional handling for `module` and `python_path` fields by providing sensible defaults (`"__main__"` and an empty vector, respectively). This simplifies configuration while maintaining flexibility.

**50-86: Improved error handling and type safety.**

The refactored process method separates Python execution (returning `Vec<RecordBatch>`) from MessageBatch conversion, which improves type safety and error handling clarity. The double question mark operator correctly handles both task spawn errors and Python execution errors.

**157-164: Clean default value functions.**

The default value functions are simple and correctly implemented for serde deserialization.

**112-114: No breaking change for script-only configurations.**

`default_module()` still returns `"__main__"`, so for configs that only supply a `script` (and omit `module`), `py.import("__main__")` will always succeed and your script's functions remain available. Only setups that explicitly specify a custom module name will now error immediately on import failures—which matches the intended behavior.

- Existing example configs (e.g. `examples/python_processor_example.yaml`) omit `module` and continue to work unchanged.
- If you have any pipelines that set `module` to a non-existent name, they will now fail fast; please update those names or provide the corresponding Python files.

**docs/docs/components/2-processors/python.md (2)**

**73-131: Excellent comprehensive pipeline example.**

This complete pipeline example effectively demonstrates the Python processor integration with JSON-to-Arrow conversion, custom Python processing with PyArrow compute functions, and output formatting. The example is realistic and educational.

**150-333: Outstanding collection of practical examples.**

The variety of PyArrow, Pandas, and Polars examples covers real-world data processing scenarios comprehensively. The examples demonstrate filtering, transformation, aggregation, time series processing, data splitting, and high-performance manipulation. These provide excellent guidance for users.

**crates/arkflow-plugin/src/codec/json.rs (4)**

**16-21: Good simplification of the JSON codec design.**

Removing the configuration struct and making `JsonCodec` stateless improves code clarity and reduces complexity.

**23-38: Encoder implementation correctly handles the new interface.**

The conversion from Arrow to line-delimited JSON and then to `Vec<Bytes>` is properly implemented with appropriate error handling.

**40-47: Decoder correctly handles byte array input.**

The implementation properly joins the byte arrays with newlines before converting to Arrow format, maintaining consistency with the encoder's line-delimited format.

**49-59: Builder correctly reflects the stateless codec design.**

The simplified builder appropriately ignores the configuration parameter and returns a new `JsonCodec` instance.

**crates/arkflow-plugin/src/input/file.rs (2)**

**42-138: Well-structured configuration with good security defaults.**

The configuration design uses proper type safety with enums and includes security-conscious defaults like `default_disallow_http`. The validation for mutually exclusive GCS authentication methods is particularly well handled.

**140-183: Solid async implementation with proper concurrency handling.**

The use of `Arc<Mutex<Option<SendableRecordBatchStream>>>` properly handles concurrent access to the stream. The file format handling is clean and extensible.

**crates/arkflow-plugin/src/buffer/join.rs (3)**

**28-60: Good addition of parallelism configuration.**

The `thread_num` field with CPU-based default and `value_field` for flexible data extraction enhance the join operation's capabilities.

**81-90: Efficient batch splitting with proper schema handling.**

The implementation correctly handles batch splitting for parallelism and gracefully handles empty batches with an empty schema.

**134-147: Proper integration with the new codec interface.**

The method correctly uses the configurable `value_field` with a sensible default fallback and properly converts data for the codec's decode method.

**crates/arkflow-plugin/src/temporary/redis.rs (2)**

**16-120: Clean codec abstraction implementation.**

The Redis temporary storage now properly uses the codec abstraction for decoding, making it format-agnostic and consistent with the broader architecture changes.

**160-174: Builder properly integrates with the Resource system.**

The updated builder correctly uses the Resource to build the codec instance, maintaining consistency with the codec configuration pattern.

**crates/arkflow-plugin/src/output/sql.rs (6)**

**35-42: LGTM - Good encapsulation improvement.**

The visibility change from `pub` to package-private is appropriate since `SqlValue` is an internal representation and doesn't need to be exposed outside the crate.

**46-50: LGTM - Enhanced configuration structure.**

The refactoring from simple enum variants to configuration structs improves maintainability and allows for more detailed database-specific settings.

**143-153: LGTM - Well-structured configuration.**

The separate configuration structs for MySQL and PostgreSQL provide clear separation and allow for database-specific options.

**164-195: LGTM - Consistent SSL configuration handling.**

The updated methods properly accept the new configuration structs and maintain consistent error handling patterns.

**231-246: LGTM - Simplified connection management.**

Removing the atomic connection flag simplifies the implementation while maintaining thread safety through the mutex-protected connection.

**276-282: LGTM - Clean pattern matching.**

The refactored connection initialization properly uses the new configuration structs with clear pattern matching.

**crates/arkflow-plugin/src/output/s3.rs (2)**

**117-130: LGTM - Well-structured constructor.**

The constructor properly initializes all components and validates the configuration during client creation.

**180-187: LGTM - Proper connection setup.**

The connection logic correctly registers the object store and prepares writer options.
```rust
pub fn split_batch(batch_to_split: RecordBatch, size: usize) -> Vec<RecordBatch> {
    let size = size.max(1);
    let total_rows = batch_to_split.num_rows();
    if total_rows <= DEFAULT_RECORD_BATCH {
        return vec![batch_to_split];
    }

    let (chunk_size, capacity) = if size * DEFAULT_RECORD_BATCH < total_rows {
        (total_rows.div_ceil(size), size)
    } else {
        (
            DEFAULT_RECORD_BATCH,
            total_rows.div_ceil(DEFAULT_RECORD_BATCH),
        )
    };

    let mut chunks = Vec::with_capacity(capacity);
    let mut offset = 0;
    while offset < total_rows {
        let length = std::cmp::min(chunk_size, total_rows - offset);
        let slice = batch_to_split.slice(offset, length);
        chunks.push(slice);
        offset += length;
    }

    chunks
}
```
🛠️ Refactor suggestion

**Clarify parameter naming and simplify chunking logic.**

The parameter name `size` is ambiguous: it's unclear whether it represents chunk size or number of chunks. Based on the logic, it appears to represent the desired number of chunks. Consider renaming the parameter and simplifying the logic:
```diff
-pub fn split_batch(batch_to_split: RecordBatch, size: usize) -> Vec<RecordBatch> {
-    let size = size.max(1);
+pub fn split_batch(batch_to_split: RecordBatch, target_chunks: usize) -> Vec<RecordBatch> {
+    let target_chunks = target_chunks.max(1);
     let total_rows = batch_to_split.num_rows();
     if total_rows <= DEFAULT_RECORD_BATCH {
         return vec![batch_to_split];
     }
-    let (chunk_size, capacity) = if size * DEFAULT_RECORD_BATCH < total_rows {
-        (total_rows.div_ceil(size), size)
-    } else {
-        (
-            DEFAULT_RECORD_BATCH,
-            total_rows.div_ceil(DEFAULT_RECORD_BATCH),
-        )
-    };
+    // Use smaller of: target chunks or chunks needed to stay under DEFAULT_RECORD_BATCH
+    let chunks_needed = total_rows.div_ceil(DEFAULT_RECORD_BATCH);
+    let actual_chunks = target_chunks.min(chunks_needed);
+    let chunk_size = total_rows.div_ceil(actual_chunks);
-    let mut chunks = Vec::with_capacity(capacity);
+    let mut chunks = Vec::with_capacity(actual_chunks);
```
This makes the intent clearer and simplifies the logic.
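To make the chunking semantics concrete, here is a hedged usage sketch of the current function. It assumes `DEFAULT_RECORD_BATCH` is smaller than 2,500 (its actual value lives in `arkflow-core`) and that `split_batch` is exported from the crate root; both are assumptions for illustration:

```rust
use std::sync::Arc;
use arrow::array::Int64Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), arrow::error::ArrowError> {
    // Build a 10,000-row single-column batch.
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
    let values = Int64Array::from_iter_values(0..10_000i64);
    let batch = RecordBatch::try_new(schema, vec![Arc::new(values)])?;

    // With size = 4 and 4 * DEFAULT_RECORD_BATCH < 10_000, the first branch
    // is taken: 4 chunks of ceil(10_000 / 4) = 2_500 rows each.
    let chunks = arkflow_core::split_batch(batch, 4);
    assert_eq!(chunks.len(), 4);
    assert_eq!(chunks[0].num_rows(), 2_500);
    Ok(())
}
```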
### SQL Query with Temporary Data Sources

```yaml
- temporary:
  - name: user_profiles
    type: "redis"
    mode:
      type: single
      url: redis://127.0.0.1:6379
    redis_type:
      type: string

  processor:
    type: "sql"
    query: "SELECT u.id, u.name, p.title FROM users u JOIN profiles p ON u.id = p.user_id"
    table_name: "users"
    temporary_list:
      - name: "user_profiles"
        table_name: "profiles"
        key:
          expr: "user_id"
```
💡 Verification agent

🧩 Analysis chain

**Verify Redis temporary configuration in example.**

The example shows a Redis temporary configuration, but ensure the `codec` property is correctly placed and the Redis temporary component supports the configuration shown.

🏁 Script executed:

```bash
#!/bin/bash
# Verify Redis temporary component configuration structure
ast-grep --pattern $'struct RedisTemporaryConfig {
  $$$
  codec: $_,
  $$$
}'
```

Length of output: 79

🏁 Script executed:

```bash
#!/bin/bash
# Find where RedisTemporaryConfig is defined
rg -n "RedisTemporaryConfig" -g '*.rs'
# Search for any `codec` usage in temporary component configs
rg -n "codec" -g '*.rs' | grep -i temporary
```

Length of output: 1377
**Add missing `codec` property to Redis temporary example.**

The `RedisTemporaryConfig` struct defines a required `codec: CodecConfig` field, so the YAML snippet in the docs should include it. Update the example under `temporary` to include a `codec` section and verify the Redis temporary component supports that configuration.

- File: docs/docs/components/2-processors/sql.md (lines 90–111)
- Add after `redis_type`:

```diff
     redis_type:
       type: string
+    codec:
+      type: "json"
```

Ensure that the Redis temporary processor accepts and correctly builds this codec configuration.
Ensure that the Redis temporary processor accepts and correctly builds this codec configuration.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
### SQL Query with Temporary Data Sources | |
```yaml | |
- temporary: | |
- name: user_profiles | |
type: "redis" | |
mode: | |
type: single | |
url: redis://127.0.0.1:6379 | |
redis_type: | |
type: string | |
processor: | |
type: "sql" | |
query: "SELECT u.id, u.name, p.title FROM users u JOIN profiles p ON u.id = p.user_id" | |
table_name: "users" | |
temporary_list: | |
- name: "user_profiles" | |
table_name: "profiles" | |
key: | |
expr: "user_id" | |
``` | |
- temporary: | |
- name: user_profiles | |
type: "redis" | |
mode: | |
type: single | |
url: redis://127.0.0.1:6379 | |
redis_type: | |
type: string | |
codec: | |
type: "json" | |
processor: | |
type: "sql" | |
query: "SELECT u.id, u.name, p.title FROM users u JOIN profiles p ON u.id = p.user_id" | |
table_name: "users" | |
temporary_list: | |
- name: "user_profiles" | |
table_name: "profiles" | |
key: | |
expr: "user_id" |
```rust
if let Some(endpoint) = &config.endpoint {
    azure_builder = azure_builder
        .with_endpoint(endpoint.clone())
        .with_allow_http(true);
}
```
**Security concern: `allow_http` is unconditionally enabled with custom endpoints.**

When a custom endpoint is specified, the code automatically enables `allow_http`. This could inadvertently expose Azure credentials over unencrypted connections. Consider making this configurable like in the S3 implementation. Apply this diff to make `allow_http` configurable:

```diff
+#[derive(Debug, Clone, Serialize, Deserialize)]
+struct MicrosoftAzureConfig {
+    // ... existing fields ...
+    /// Allow HTTP connections (defaults to false for security)
+    #[serde(default = "default_disallow_http")]
+    allow_http: bool,
+}

 if let Some(endpoint) = &config.endpoint {
     azure_builder = azure_builder
         .with_endpoint(endpoint.clone())
-        .with_allow_http(true);
+        .with_allow_http(config.allow_http);
 }
```
Committable suggestion skipped: line range outside the PR's diff.
The core of the real-time capability we pursue is not just about speed, but about the *timeliness* of insights. In dynamic operational environments such as fraud detection, algorithmic trading, or critical system monitoring, an insight delayed by even a few seconds can lose all its value. The value of predictions, classifications, or other actionable insights generated by AI models decays rapidly over time in many critical business and operational scenarios. Therefore, we are committed to enabling you to process data and apply AI algorithms simultaneously as the data flows in, which is crucial for maximizing the utility and impact of these insights. This places stringent demands on the underlying technology platform, and it is precisely why we created ArkFlow—to provide a solution that seamlessly integrates high-performance stream processing with the execution of complex AI models.

### The Python Processor
**Fix heading hierarchy.**

The heading jumps from h1 to h3. It should be h2 for proper document structure.

```diff
-### The Python Processor
+## The Python Processor
```

📝 Committable suggestion

‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```markdown
## The Python Processor
```
🧰 Tools
🪛 markdownlint-cli2 (0.17.2)
14-14: Heading levels should only increment by one level at a time
Expected: h2; Actual: h3
(MD001, heading-increment)
Hi, @chenquan
## Description
This PR introduces a new output plugin that supports writing data to cloud storage buckets. Currently, the plugin supports both AWS and GCP buckets, with support for Azure planned in future updates.
The plugin can be configured as follows:
### Example
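The example file reviewed above (`examples/s3_output_example.yaml`) is not inlined here, so the following is a reconstruction from the fields quoted in the review comments; treat the exact nesting and placeholder values as assumptions:

```yaml
output:
  type: "s3"
  bucket: "my-bucket"            # target bucket (name here is a placeholder)
  path: "test"
  prefix: "arkflow"
  format: "csv"                  # json/csv/parquet
  compression: "gzip"            # optional: gzip/bzip2
  credentials:
    # WARNING: placeholder values - never commit real credentials!
    access_key_id: "your access key id"
    secret_access_key: "your secret access key"
```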
#274
@chenquan
## Future Plans

- Add support for Azure Blob Storage
- Extend support for other output formats and compression methods