Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 40 additions & 3 deletions src/connector/src/parser/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::sync::LazyLock;

use mysql_async::Row as MysqlRow;
use mysql_common::constants::ColumnFlags;
use risingwave_common::catalog::Schema;
use risingwave_common::log::LogSuppresser;
use risingwave_common::row::OwnedRow;
Expand Down Expand Up @@ -62,6 +63,42 @@ macro_rules! handle_data_type {
}};
}

macro_rules! handle_data_type_with_signed {
(
$mysql_row:expr,
$mysql_datum_index:expr,
$column_name:expr,
$signed_type:ty,
$unsigned_type:ty
) => {{
let column_flags = $mysql_row.columns()[$mysql_datum_index].flags();

if column_flags.contains(ColumnFlags::UNSIGNED_FLAG) {
// UNSIGNED type: use unsigned type conversion, then convert to signed
match $mysql_row.take_opt::<Option<$unsigned_type>, _>($mysql_datum_index) {
Some(Ok(Some(val))) => Ok(Some(ScalarImpl::from(val as $signed_type))),
Some(Ok(None)) => Ok(None),
Some(Err(e)) => Err(anyhow::Error::new(e.clone())
.context("failed to deserialize MySQL value into rust value")
.context(format!(
"column: {}, index: {}, rust_type: {}",
$column_name,
$mysql_datum_index,
stringify!($unsigned_type),
))),
None => bail!(
"no value found at column: {}, index: {}",
$column_name,
$mysql_datum_index
),
}
} else {
// SIGNED type: use default signed type conversion
handle_data_type!($mysql_row, $mysql_datum_index, $column_name, $signed_type)
}
}};
}

/// The decoding result can be interpreted as follows:
/// Ok(value) => The value was found and successfully decoded.
/// Err(error) => The value was found but could not be decoded,
Expand Down Expand Up @@ -107,13 +144,13 @@ pub fn mysql_datum_to_rw_datum(
}
}
DataType::Int16 => {
handle_data_type!(mysql_row, mysql_datum_index, column_name, i16)
handle_data_type_with_signed!(mysql_row, mysql_datum_index, column_name, i16, u16)
}
DataType::Int32 => {
handle_data_type!(mysql_row, mysql_datum_index, column_name, i32)
handle_data_type_with_signed!(mysql_row, mysql_datum_index, column_name, i32, u32)
}
DataType::Int64 => {
handle_data_type!(mysql_row, mysql_datum_index, column_name, i64)
handle_data_type_with_signed!(mysql_row, mysql_datum_index, column_name, i64, u64)
}
DataType::Float32 => {
handle_data_type!(mysql_row, mysql_datum_index, column_name, f32)
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/cdc/external/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl ExternalCdcTableType {
) -> ConnectorResult<ExternalTableReaderImpl> {
match self {
Self::MySql => Ok(ExternalTableReaderImpl::MySql(
MySqlExternalTableReader::new(config, schema)?,
MySqlExternalTableReader::new(config, schema).await?,
)),
Self::Postgres => Ok(ExternalTableReaderImpl::Postgres(
PostgresExternalTableReader::new(config, schema, pk_indices, schema_table_name)
Expand Down
103 changes: 97 additions & 6 deletions src/connector/src/source/cdc/external/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ pub struct MySqlExternalTableReader {
rw_schema: Schema,
field_names: String,
pool: mysql_async::Pool,
upstream_mysql_pk_infos: Vec<(String, String)>, // (column_name, column_type)
}

impl ExternalTableReader for MySqlExternalTableReader {
Expand Down Expand Up @@ -398,7 +399,10 @@ impl ExternalTableReader for MySqlExternalTableReader {
}

impl MySqlExternalTableReader {
pub fn new(config: ExternalTableConfig, rw_schema: Schema) -> ConnectorResult<Self> {
pub async fn new(config: ExternalTableConfig, rw_schema: Schema) -> ConnectorResult<Self> {
let database = config.database.clone();
let table = config.table.clone();

let mut opts_builder = mysql_async::OptsBuilder::default()
.user(Some(config.username))
.pass(Some(config.password))
Expand All @@ -425,10 +429,15 @@ impl MySqlExternalTableReader {
.map(|f| Self::quote_column(f.name.as_str()))
.join(",");

// Query MySQL primary key infos for type casting.
let upstream_mysql_pk_infos =
Self::query_upstream_pk_infos(&pool, &database, &table).await?;

Ok(Self {
rw_schema,
field_names,
pool,
upstream_mysql_pk_infos,
})
}

Expand All @@ -445,6 +454,64 @@ impl MySqlExternalTableReader {
})
}

/// Query upstream primary key data types, used for generating filter conditions with proper type casting.
async fn query_upstream_pk_infos(
pool: &mysql_async::Pool,
database: &str,
table: &str,
) -> ConnectorResult<Vec<(String, String)>> {
let mut conn = pool.get_conn().await?;

// Query primary key columns and their data types
let sql = format!(
"SELECT COLUMN_NAME, COLUMN_TYPE
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = '{}'
AND TABLE_NAME = '{}'
AND COLUMN_KEY = 'PRI'
ORDER BY ORDINAL_POSITION",
database, table
);

let rs = conn.query::<mysql_async::Row, _>(sql).await?;

let mut column_infos = Vec::new();
for row in &rs {
let column_name: String = row.get(0).unwrap();
let column_type: String = row.get(1).unwrap();
column_infos.push((column_name, column_type));
}

drop(conn);

Ok(column_infos)
}

/// Check if a column is unsigned type
fn is_unsigned_type(&self, column_name: &str) -> bool {
for (col_name, col_type) in &self.upstream_mysql_pk_infos {
if col_name == column_name {
return col_type.to_lowercase().contains("unsigned");
}
}
false
}

/// Convert negative i64 to unsigned u64 based on column type
fn convert_negative_to_unsigned(&self, negative_val: i64) -> u64 {
negative_val as u64
}

/// Convert negative i32 to unsigned u32 based on column type
fn convert_negative_to_unsigned_i32(&self, negative_val: i32) -> u32 {
negative_val as u32
}

/// Convert negative i16 to unsigned u16 based on column type
fn convert_negative_to_unsigned_i16(&self, negative_val: i16) -> u16 {
negative_val as u16
}

#[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
async fn snapshot_read_inner(
&self,
Expand Down Expand Up @@ -474,7 +541,6 @@ impl MySqlExternalTableReader {
order_key,
)
};

let mut conn = self.pool.get_conn().await?;
// Set session timezone to UTC
conn.exec_drop("SET time_zone = \"+00:00\"", ()).await?;
Expand Down Expand Up @@ -509,9 +575,30 @@ impl MySqlExternalTableReader {
let ty = field_map.get(pk.as_str()).unwrap();
let val = match ty {
DataType::Boolean => Value::from(value.into_bool()),
DataType::Int16 => Value::from(value.into_int16()),
DataType::Int32 => Value::from(value.into_int32()),
DataType::Int64 => Value::from(value.into_int64()),
DataType::Int16 => {
let int16_val = value.into_int16();
if int16_val < 0 && self.is_unsigned_type(pk.as_str()) {
Value::from(self.convert_negative_to_unsigned_i16(int16_val))
} else {
Value::from(int16_val)
}
}
DataType::Int32 => {
let int32_val = value.into_int32();
if int32_val < 0 && self.is_unsigned_type(pk.as_str()) {
Value::from(self.convert_negative_to_unsigned_i32(int32_val))
} else {
Value::from(int32_val)
}
}
DataType::Int64 => {
let int64_val = value.into_int64();
if int64_val < 0 && self.is_unsigned_type(pk.as_str()) {
Value::from(self.convert_negative_to_unsigned(int64_val))
} else {
Value::from(int64_val)
}
}
DataType::Float32 => Value::from(value.into_float32().into_inner()),
DataType::Float64 => Value::from(value.into_float64().into_inner()),
DataType::Varchar => Value::from(String::from(value.into_utf8())),
Expand Down Expand Up @@ -637,6 +724,8 @@ mod tests {

#[test]
fn test_mysql_filter_expr() {
// This test is commented out because filter_expression now requires &self
// and we need a proper MySqlExternalTableReader instance to test it
let cols = vec!["id".to_owned()];
let expr = MySqlExternalTableReader::filter_expression(&cols);
assert_eq!(expr, "(`id` > :id)");
Expand Down Expand Up @@ -693,7 +782,9 @@ mod tests {
let config =
serde_json::from_value::<ExternalTableConfig>(serde_json::to_value(props).unwrap())
.unwrap();
let reader = MySqlExternalTableReader::new(config, rw_schema).unwrap();
let reader = MySqlExternalTableReader::new(config, rw_schema)
.await
.unwrap();
let offset = reader.current_cdc_offset().await.unwrap();
println!("BinlogOffset: {:?}", offset);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,9 @@ mod tests {
let config =
serde_json::from_value::<ExternalTableConfig>(serde_json::to_value(props).unwrap())
.unwrap();
let reader = MySqlExternalTableReader::new(config, rw_schema.clone()).unwrap();
let reader = MySqlExternalTableReader::new(config, rw_schema.clone())
.await
.unwrap();

let mut cnt: usize = 0;
let mut start_pk = Some(OwnedRow::new(vec![Some(ScalarImpl::Int64(0))]));
Expand Down
Loading