Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ flume = "=0.11"
rumqttc = "0.24.0"

# Sql
sqlx = { version = "0.8", features = [ "mysql","postgres","runtime-tokio", "tls-native-tls" ] }
sqlx = { version = "0.8", features = [ "mysql","postgres","sqlite","runtime-tokio", "tls-native-tls" ] }

# Kafka
aws-msk-iam-sasl-signer = "1.0.0"
Expand Down
52 changes: 45 additions & 7 deletions crates/arkflow-plugin/src/output/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use tracing::warn;

use sqlx::mysql::{MySqlConnectOptions, MySqlSslMode};
use sqlx::postgres::{PgConnectOptions, PgSslMode};
use sqlx::{Connection, MySqlConnection, PgConnection, QueryBuilder};
use sqlx::{Connection, MySqlConnection, PgConnection, SqliteConnection, QueryBuilder};

#[derive(Debug, Clone)]
pub enum SqlValue {
Expand All @@ -42,16 +42,16 @@ pub enum SqlValue {
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[serde(rename_all = "snake_case")]
pub enum DatabaseType {
Mysql,
Postgres,
// Sqlite,
Sqlite,
}
pub enum DatabaseConnection {
Mysql(MySqlConnection),
Postgres(PgConnection),
// Sqlite(SqliteConnection),
Sqlite(SqliteConnection),
}
impl DatabaseConnection {
/// Executes an INSERT query with the given columns and rows
Expand Down Expand Up @@ -124,6 +124,36 @@ impl DatabaseConnection {
Error::Process(format!("Failed to execute PostgresSQL query: {}", e))
})?;

Ok(())
}
DatabaseConnection::Sqlite(conn) => {
let mut query_builder = QueryBuilder::<sqlx::Sqlite>::new(format!(
"INSERT INTO {} ({})",
output_config.table_name,
columns
.iter()
.map(|c| format!("\"{}\"", c))
.collect::<Vec<_>>()
.join(", "),
));
query_builder.push_values(rows, |mut b, row| {
for value in row {
match value {
SqlValue::String(s) => b.push_bind(s),
SqlValue::Int64(i) => b.push_bind(i),
SqlValue::UInt64(u) => b.push_bind(u as i64),
SqlValue::Float64(f) => b.push_bind(f),
SqlValue::Boolean(bool) => b.push_bind(bool),
SqlValue::Null => b.push_bind(None::<String>),
};
}
});

let query = query_builder.build();
query.execute(conn).await.map_err(|e| {
Error::Process(format!("Failed to execute SQLite query: {}", e))
})?;

Ok(())
}
}
Expand All @@ -135,7 +165,7 @@ impl DatabaseConnection {
#[serde(rename_all = "snake_case")]
pub struct SqlOutputConfig {
/// SQL query statement
output_type: DatabaseType,
database_type: DatabaseType,
table_name: String,
uri: String,
ssl: Option<SslConfig>,
Expand Down Expand Up @@ -268,9 +298,10 @@ impl SqlOutput {
/// Initialize a new DB connection.
/// If `ssl` is configured, apply root certificates to the SSL options.
async fn init_connect(&self) -> Result<DatabaseConnection, Error> {
let conn = match self.sql_config.output_type {
let conn = match self.sql_config.database_type {
DatabaseType::Mysql => self.generate_mysql_conn().await?,
DatabaseType::Postgres => self.generate_postgres_conn().await?,
DatabaseType::Sqlite => self.generate_sqlite_conn().await?,
};
Ok(conn)
}
Expand Down Expand Up @@ -374,7 +405,7 @@ impl SqlOutput {
MySqlConnection::connect(&self.sql_config.uri)
.await
.map_err(|e| Error::Config(format!("Failed to connect to MySQL: {}", e)))?
};
};
Ok(DatabaseConnection::Mysql(mysql_conn))
}
async fn generate_postgres_conn(&self) -> Result<DatabaseConnection, Error> {
Expand All @@ -390,6 +421,13 @@ impl SqlOutput {
};
Ok(DatabaseConnection::Postgres(postgres_conn))
}
async fn generate_sqlite_conn(&self) -> Result<DatabaseConnection, Error> {
let sqlite_conn = SqliteConnection::connect(&self.sql_config.uri)
.await
.map_err(|e| Error::Config(format!("Failed to connect to SQLite: {}", e)))?;
Ok(DatabaseConnection::Sqlite(sqlite_conn))
}

}

pub(crate) struct SqlOutputBuilder;
Expand Down
7 changes: 3 additions & 4 deletions examples/sql_output_example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ streams:

output:
type: "sql"
output_type:
type: "mysql"
uri: "mysql://root:1234@localhost:3306/arkflow"
table_name: "arkflow_test"
database_type: "mysql"
uri: "mysql://root:1234@localhost:3306/arkflow"
table_name: "arkflow_test"

error_output:
type: "stdout"