feat: batch connector bigquery support #23236
Conversation
Pull Request Overview
This PR adds support for BigQuery as a batch connector in RisingWave, allowing users to create sources that read data from BigQuery tables. The implementation follows the existing batch source pattern and includes schema inference capabilities.
Key changes:
- Implements a new `batch_bigquery` connector with split enumeration and reading capabilities
- Adds BigQuery schema extraction and type mapping functionality
- Integrates the new connector into the existing batch source infrastructure
Reviewed Changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| src/sqlparser/src/ast/statement.rs | Adds format validation for bigquery connectors |
| src/frontend/src/handler/create_source/validate.rs | Registers bigquery connector format compatibility |
| src/frontend/src/handler/create_source/external_schema/bigquery.rs | Implements BigQuery schema extraction for source creation |
| src/frontend/src/handler/create_source/external_schema.rs | Integrates BigQuery schema extraction into the external schema handler |
| src/connector/src/with_options.rs | Adds helper methods to identify BigQuery connectors |
| src/connector/src/source/mod.rs | Exports BigQuery batch connector components |
| src/connector/src/source/big_query_batch.rs | Core BigQuery batch source implementation with split enumeration and reading |
| src/connector/src/source/batch.rs | Integrates BigQuery splits into the batch source framework |
| src/connector/src/source/base.rs | Adds BigQuery split conversion support |
| src/connector/src/sink/big_query.rs | Refactors and adds shared BigQuery utilities for type mapping and schema operations |
| src/connector/src/macros.rs | Registers BigQuery batch source in the macro-generated source classifications |
```rust
use std::collections::{BTreeMap, HashMap};

use anyhow::{Context, anyhow};
```
Import the `anyhow` macro consistently. Use either `use anyhow::anyhow;` or `use anyhow::{Context, anyhow};`, but avoid mixing both patterns in the same import statement.
```diff
- use anyhow::{Context, anyhow};
+ use anyhow::Context;
+ use anyhow::anyhow;
```
```rust
// For batch source, we return exactly one split with auto-generated "SELECT * FROM table" query
let query = self.properties.build_query();
Ok(vec![BatchBigQuerySplit::new(
    "bigquery_batch_split".into(),
```
Consider defining this split ID as a constant to avoid magic strings and improve maintainability across the codebase.
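A minimal sketch of that suggestion; the constant name, struct fields, and enumerator signature are assumptions for illustration, not the PR's actual code in `src/connector/src/source/big_query_batch.rs`:

```rust
// Hypothetical names standing in for the real BatchBigQuerySplit type.
const BIGQUERY_BATCH_SPLIT_ID: &str = "bigquery_batch_split";

#[derive(Debug)]
struct BatchBigQuerySplit {
    id: String,
    query: String,
}

impl BatchBigQuerySplit {
    fn new(id: String, query: String) -> Self {
        Self { id, query }
    }
}

// The enumerator refers to the constant instead of repeating a magic string.
fn enumerate_splits(query: String) -> Vec<BatchBigQuerySplit> {
    vec![BatchBigQuerySplit::new(BIGQUERY_BATCH_SPLIT_ID.into(), query)]
}
```

Any other code that needs to recognize the batch split (e.g. split-conversion logic in `base.rs`) could then compare against the same constant.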
```rust
let config = serde_json::from_value::<BatchBigQueryProperties>(
    serde_json::to_value(properties).unwrap(),
)
```
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This double serialization/deserialization (`to_value` then `from_value`) is inefficient. Consider using `serde_json::from_str` with the original JSON or implementing a direct conversion from `BTreeMap`.
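One way the direct-conversion route could look, sketched with hypothetical property keys and a hypothetical field subset (the real `BatchBigQueryProperties` differs): a `TryFrom` over the property map reads each key once instead of round-tripping the whole map through a `serde_json::Value`.

```rust
use std::collections::BTreeMap;

// Hypothetical subset of properties; field and key names are assumptions.
#[derive(Debug, PartialEq)]
struct BatchBigQueryProperties {
    project: String,
    dataset: String,
    table: String,
}

impl TryFrom<&BTreeMap<String, String>> for BatchBigQueryProperties {
    type Error = String;

    // Reads each required key directly from the map, avoiding the
    // to_value/from_value round trip (and its unwrap) entirely.
    fn try_from(map: &BTreeMap<String, String>) -> Result<Self, Self::Error> {
        let get = |key: &str| {
            map.get(key)
                .cloned()
                .ok_or_else(|| format!("missing required property `{key}`"))
        };
        Ok(Self {
            project: get("bigquery.project")?,
            dataset: get("bigquery.dataset")?,
            table: get("bigquery.table")?,
        })
    }
}
```

This also replaces the `unwrap()` with a proper error for missing keys, at the cost of keeping the field list in sync by hand rather than deriving it.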
```rust
serde_json::from_slice::<ServiceAccountKey>(&auth_json_from_base64)
    .map_err(|e| SinkError::BigQuery(anyhow!("Failed to parse base64-decoded service account key: {}. Make sure the base64 string contains valid JSON.", e)))?
```
The error handling logic for base64 decoding and JSON parsing is duplicated between `build_client` and `build_writer_client`. Consider extracting this into a helper method to reduce code duplication.
Looks like this PR extends new SQL syntax or updates existing ones. Make sure that:
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
As a replacement for #22744, following #23024.
Pending item:
What's changed and what's your intention?
Checklist
Documentation
Release note