-
Notifications
You must be signed in to change notification settings - Fork 37
feat: Add SQLite support and simplify SQL output configuration #374
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
WalkthroughThe changes introduce SQLite support to the SQL output plugin by updating dependency features, extending enums and connection logic, and adjusting insert execution to handle SQLite. Configuration fields are renamed and restructured for clarity, and the example YAML is updated to match the new configuration format. Changes
Sequence Diagram(s)sequenceDiagram
participant User
participant SqlOutput
participant DatabaseConnection
participant SqliteConnection
User->>SqlOutput: init_connect()
SqlOutput->>SqlOutput: determine database_type
alt database_type is Sqlite
SqlOutput->>SqliteConnection: generate_sqlite_conn()
SqliteConnection-->>SqlOutput: SqliteConnection instance
SqlOutput-->>User: DatabaseConnection::Sqlite
else database_type is MySQL/Postgres
SqlOutput->>DatabaseConnection: generate respective connection
DatabaseConnection-->>SqlOutput: Connection instance
SqlOutput-->>User: DatabaseConnection variant
end
User->>SqlOutput: execute_insert(config, columns, rows)
SqlOutput->>DatabaseConnection: execute_insert()
alt DatabaseConnection is Sqlite
DatabaseConnection->>SqliteConnection: build and execute insert query
else DatabaseConnection is MySQL/Postgres
DatabaseConnection->>respective connection: build and execute insert query
end
DatabaseConnection-->>SqlOutput: Result
SqlOutput-->>User: Result
Possibly related PRs
Poem
Note ⚡️ AI Code Reviews for VS Code, Cursor, WindsurfCodeRabbit now has a plugin for VS Code, Cursor and Windsurf. This brings AI code reviews directly in the code editor. Each commit is reviewed immediately, finding bugs before the PR is raised. Seamless context handoff to your AI code agent ensures that you can easily incorporate review feedback. Note ⚡️ Faster reviews with cachingCodeRabbit now supports caching for code and dependencies, helping speed up reviews. This means quicker feedback, reduced wait times, and a smoother review experience overall. Cached data is encrypted and stored securely. This feature will be automatically enabled for all accounts on May 30th. To opt out, configure ✨ Finishing Touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (4)
crates/arkflow-plugin/src/output/sql.rs (4)
424-429
: Consider adding SQLite connection validation.While the implementation works, consider adding validation for the SQLite URI format to prevent runtime errors with invalid connection strings.
async fn generate_sqlite_conn(&self) -> Result<DatabaseConnection, Error> { + // Validate SQLite URI format + if !self.sql_config.uri.starts_with("sqlite:") { + return Err(Error::Config(format!( + "Invalid SQLite URI format: {}. URI should start with 'sqlite:'", + self.sql_config.uri + ))); + } + let sqlite_conn = SqliteConnection::connect(&self.sql_config.uri) .await .map_err(|e| Error::Config(format!("Failed to connect to SQLite: {}", e)))?; Ok(DatabaseConnection::Sqlite(sqlite_conn)) }
60-158
: Consider reducing code duplication in execute_insert.There's significant code duplication between the Postgres and SQLite implementations in
execute_insert
. Consider refactoring to reduce duplication.You could extract common logic to a helper function:
pub async fn execute_insert( &mut self, output_config: &SqlOutputConfig, columns: Vec<String>, rows: Vec<Vec<SqlValue>>, ) -> Result<(), Error> { match self { DatabaseConnection::Mysql(conn) => { let mut query_builder = QueryBuilder::<sqlx::MySql>::new(format!( "INSERT INTO {} ({})", output_config.table_name, columns .iter() .map(|c| format!("`{}`", c)) .collect::<Vec<_>>() .join(", "), )); // MySQL specific implementation... } - DatabaseConnection::Postgres(conn) => { - let mut query_builder = QueryBuilder::<sqlx::Postgres>::new(format!( - "INSERT INTO {} ({})", - output_config.table_name, - columns - .iter() - .map(|c| format!("\"{}\"", c)) - .collect::<Vec<_>>() - .join(", "), - )); - query_builder.push_values(rows, |mut b, row| { - for value in row { - match value { - SqlValue::String(s) => b.push_bind(s), - SqlValue::Int64(i) => b.push_bind(i), - SqlValue::UInt64(u) => b.push_bind(u as i64), - SqlValue::Float64(f) => b.push_bind(f), - SqlValue::Boolean(bool) => b.push_bind(bool), - SqlValue::Null => b.push_bind(None::<String>), - }; - } - }); - - let query = query_builder.build(); - query.execute(conn).await.map_err(|e| { - Error::Process(format!("Failed to execute PostgresSQL query: {}", e)) - })?; - - Ok(()) - } - DatabaseConnection::Sqlite(conn) => { - let mut query_builder = QueryBuilder::<sqlx::Sqlite>::new(format!( - "INSERT INTO {} ({})", - output_config.table_name, - columns - .iter() - .map(|c| format!("\"{}\"", c)) - .collect::<Vec<_>>() - .join(", "), - )); - query_builder.push_values(rows, |mut b, row| { - for value in row { - match value { - SqlValue::String(s) => b.push_bind(s), - SqlValue::Int64(i) => b.push_bind(i), - SqlValue::UInt64(u) => b.push_bind(u as i64), - SqlValue::Float64(f) => b.push_bind(f), - SqlValue::Boolean(bool) => b.push_bind(bool), - SqlValue::Null => b.push_bind(None::<String>), - }; - } - }); - - let query = query_builder.build(); - query.execute(conn).await.map_err(|e| { - Error::Process(format!("Failed to execute SQLite query: {}", e)) - })?; - - Ok(()) - } + DatabaseConnection::Postgres(conn) => { + self.execute_quoted_insert::<sqlx::Postgres>(conn, output_config, columns, rows, "PostgresSQL").await + } + DatabaseConnection::Sqlite(conn) => { + self.execute_quoted_insert::<sqlx::Sqlite>(conn, output_config, columns, rows, "SQLite").await + } } } + +async fn execute_quoted_insert<'a, DB>( + &self, + conn: &mut DB::Connection, + output_config: &SqlOutputConfig, + columns: Vec<String>, + rows: Vec<Vec<SqlValue>>, + db_name: &str +) -> Result<(), Error> +where + DB: sqlx::Database, + DB::Connection: sqlx::Executor<'a>, + for<'b> <DB as sqlx::database::HasArguments<'b>>::Arguments: sqlx::IntoArguments<'b, DB>, +{ + let mut query_builder = QueryBuilder::<DB>::new(format!( + "INSERT INTO {} ({})", + output_config.table_name, + columns + .iter() + .map(|c| format!("\"{}\"", c)) + .collect::<Vec<_>>() + .join(", "), + )); + query_builder.push_values(rows, |mut b, row| { + for value in row { + match value { + SqlValue::String(s) => b.push_bind(s), + SqlValue::Int64(i) => b.push_bind(i), + SqlValue::UInt64(u) => b.push_bind(u as i64), + SqlValue::Float64(f) => b.push_bind(f), + SqlValue::Boolean(bool) => b.push_bind(bool), + SqlValue::Null => b.push_bind(None::<String>), + }; + } + }); + + let query = query_builder.build(); + query.execute(conn).await.map_err(|e| { + Error::Process(format!("Failed to execute {} query: {}", db_name, e)) + })?; + + Ok(()) +}Note: The above refactoring might require additional type constraints or adjustments based on the specific requirements of the sqlx library.
300-307
: Consider adding documentation for the SQLite option.The
init_connect
method should have its documentation updated to reflect the addition of SQLite support./// Initialize a new DB connection. /// If `ssl` is configured, apply root certificates to the SSL options. + /// Supports MySQL, PostgreSQL, and SQLite connection types. async fn init_connect(&self) -> Result<DatabaseConnection, Error> { let conn = match self.sql_config.database_type { DatabaseType::Mysql => self.generate_mysql_conn().await?, DatabaseType::Postgres => self.generate_postgres_conn().await?, DatabaseType::Sqlite => self.generate_sqlite_conn().await?, }; Ok(conn) }
424-429
: Consider documenting absence of SSL support for SQLite.Unlike MySQL and PostgreSQL, your SQLite implementation doesn't include SSL configuration options. Consider adding a documented reason for this difference.
+ /// Generates a SQLite connection + /// Note: SSL configuration is not applicable for SQLite as it's a file-based database async fn generate_sqlite_conn(&self) -> Result<DatabaseConnection, Error> { + // SQLite uses file-based connections, so SSL is not applicable let sqlite_conn = SqliteConnection::connect(&self.sql_config.uri) .await .map_err(|e| Error::Config(format!("Failed to connect to SQLite: {}", e)))?; Ok(DatabaseConnection::Sqlite(sqlite_conn)) }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
Cargo.lock
is excluded by!**/*.lock
📒 Files selected for processing (3)
Cargo.toml
(1 hunks)crates/arkflow-plugin/src/output/sql.rs
(7 hunks)examples/sql_output_example.yaml
(1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (2)
- GitHub Check: build
- GitHub Check: build
🔇 Additional comments (8)
Cargo.toml (1)
47-47
: Good addition of SQLite feature to sqlx dependency.Adding the "sqlite" feature to sqlx dependency enables the SQLite database backend integration in the SQL output plugin, which aligns perfectly with the PR objective of adding SQLite support.
examples/sql_output_example.yaml (1)
19-21
: Configuration structure improvement looks good.The configuration has been simplified by replacing the nested
output_type
with a flattened structure usingdatabase_type
, making it more intuitive and user-friendly.crates/arkflow-plugin/src/output/sql.rs (6)
32-32
: Correctly updated import for SqliteConnection.The import statement has been appropriately updated to include
SqliteConnection
, which is necessary for the SQLite implementation.
45-50
: Good use of snake_case serialization and addition of SQLite variant.Adding the
Sqlite
variant to theDatabaseType
enum and ensuring consistent snake_case serialization enhances the API's usability.
54-54
: Correctly extended DatabaseConnection enum.The DatabaseConnection enum has been properly extended to include the new SQLite connection type.
129-157
: SQLite insert implementation follows good patterns.The SQLite implementation mirrors the PostgreSQL approach, which maintains consistency in the codebase. The code correctly handles:
- Proper column name quoting
- Type conversions (especially UInt64 to i64)
- Error handling with descriptive messages
168-168
: Improved field naming from output_type to database_type.Renaming from
output_type
todatabase_type
more accurately reflects the field's purpose, as it specifies the database type rather than the output type.
304-305
: Good integration of SQLite in the connection initialization flow.The SQLite connection handling has been seamlessly integrated into the existing connection initialization flow.
This PR introduces the following changes:
@chenquan Please take a look when you get a chance — your feedback would be appreciated!
#274
Summary by CodeRabbit
New Features
Improvements
Compatibility