Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
727 changes: 665 additions & 62 deletions Cargo.lock

Large diffs are not rendered by default.

18 changes: 17 additions & 1 deletion crates/arkflow-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,20 @@ clap = { workspace = true }
colored = { workspace = true }
flume = { workspace = true }
axum = { workspace = true }
# FIX: `num_cpus` was declared twice in this section; duplicate keys are
# invalid TOML and cause the manifest to be rejected. Declared once here.
num_cpus = "1.17.0"
uuid = { version = "1.8", features = ["v4"] }
tempfile = "3.10"

# Object Storage dependencies
# S3 requires the tokio runtime feature; aws-config pins the SDK behavior
# version so future SDK upgrades do not silently change defaults.
aws-sdk-s3 = { version = "1.8", features = ["rt-tokio"] }
aws-config = { version = "1.8", features = ["behavior-version-latest"] }
# Plain version strings — a single-key inline table `{ version = "..." }`
# is equivalent but non-idiomatic in Cargo manifests.
azure_storage = "0.20"
azure_storage_blobs = "0.20"
# default-features disabled to avoid pulling the full GCP feature set;
# only the auth feature is needed here.
google-cloud-storage = { version = "0.15", default-features = false, features = ["auth"] }
http = "1.1"
md-5 = "0.10"
base64 = "0.22"
crc32fast = "1.4"
chrono = { version = "0.4", features = ["serde"] }
flate2 = "1.0"
log = "0.4"
87 changes: 87 additions & 0 deletions crates/arkflow-core/examples/distributed_ack_example.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# Distributed Acknowledgment Configuration Example
#
# Demonstrates wrapping an input with distributed-acknowledgment support.
# NOTE(review): `[streams]` defines a single table. If the loader expects a
# list of streams, this would need to be `[[streams]]` — confirm against the
# config deserializer before copying this example.

[streams]
name = "distributed_ack_stream"

# Input configuration with distributed acknowledgment support
[streams.input]
type = "distributed_ack_input"

# Inner input configuration
# The wrapped input that actually consumes messages (Kafka in this example).
[streams.input.inner_input]
type = "kafka"
brokers = ["localhost:9092"]
topic = "test-topic"
group_id = "distributed_ack_group"

# Distributed acknowledgment configuration
[streams.input.distributed_ack]
enabled = true
# Identifier of this node; presumably matches one entry in cluster_nodes
# below — verify against the implementation.
node_id = "node-1"
# host:port of every participating node, including this one.
cluster_nodes = ["node-1:8080", "node-2:8080", "node-3:8080"]

# Object storage configuration
# Shared backing store for acknowledgment state across cluster nodes.
[streams.input.distributed_ack.object_storage]
type = "s3"
bucket = "distributed-ack-bucket"
region = "us-east-1"
# Placeholder credentials — never commit real keys. TOML has no env-var
# interpolation, so secrets must be supplied by the consuming application
# (e.g. the AWS default credential chain or a secrets manager).
access_key_id = "your-access-key"
secret_access_key = "your-secret-key"

# WAL configuration
# Local write-ahead log; presumably buffers ack state durably before it is
# persisted to object storage — confirm against the implementation.
[streams.input.distributed_ack.wal]
type = "rocksdb"
path = "./distributed_ack_wal"

# Processor configuration
# `[[...]]` appends one element to the processors array; the dotted
# sub-tables below belong to THIS element and must appear before any
# subsequent `[[streams.pipeline.processors]]` header.
[[streams.pipeline.processors]]
type = "distributed_ack_processor"

# Inner processor configuration
# The wrapped processor that performs the actual transformation.
[streams.pipeline.processors.inner_processor]
type = "transform"
script = "data.value = data.value.toUpperCase()"

# Distributed acknowledgment configuration for processor
[streams.pipeline.processors.distributed_ack]
enabled = true
node_id = "node-1"
cluster_nodes = ["node-1:8080", "node-2:8080", "node-3:8080"]

# Output configuration
# Processed messages are published to this Kafka topic.
[streams.output]
type = "kafka"
brokers = ["localhost:9092"]
topic = "output-topic"

# Stream-level distributed acknowledgment configuration (alternative approach)
# NOTE(review): this example also enables distributed_ack at the input and
# processor level above. A real deployment would presumably choose one level,
# not all of them — confirm whether the loader tolerates both being set.
[streams.distributed_ack]
enabled = true
node_id = "node-1"
cluster_nodes = ["node-1:8080", "node-2:8080", "node-3:8080"]

# Object storage configuration for stream-level
# Placeholder credentials — see the input-level section for the same caveat:
# TOML has no interpolation; inject real secrets from the environment.
[streams.distributed_ack.object_storage]
type = "s3"
bucket = "distributed-ack-bucket"
region = "us-east-1"
access_key_id = "your-access-key"
secret_access_key = "your-secret-key"

# WAL configuration for stream-level
# NOTE(review): same on-disk path as the input-level WAL above; if both
# levels were enabled they would share this directory — confirm intended.
[streams.distributed_ack.wal]
type = "rocksdb"
path = "./distributed_ack_wal"

# Performance configuration
[streams.distributed_ack.performance]
# Upper bound on acknowledgments held in memory awaiting confirmation.
max_pending_acks = 10_000
# Acks flushed together; whichever of batch_size / flush_interval_ms is
# reached first presumably triggers the flush — confirm in implementation.
batch_size = 100
flush_interval_ms = 1000

# Retry policy for failed acknowledgment operations.
# Converted from a 4-key inline table to a proper sub-table: TOML inline
# tables are idiomatic only for 1-3 keys. Structurally identical data.
# Delays presumably grow as initial_delay_ms * backoff_multiplier^attempt,
# capped at max_delay_ms — verify against the retry implementation.
[streams.distributed_ack.performance.retry_config]
max_retries = 5
initial_delay_ms = 1000
max_delay_ms = 30_000
backoff_multiplier = 2.0

# Recovery configuration
[streams.distributed_ack.recovery]
enable_recovery = true
# How often to scan for unacknowledged state to recover (milliseconds).
recovery_interval_ms = 30_000
checkpoint_interval_ms = 60_000
Loading
Loading