diff --git a/src/connector/src/parser/mysql.rs b/src/connector/src/parser/mysql.rs
index dddc2804e35d6..4cd1351139e2a 100644
--- a/src/connector/src/parser/mysql.rs
+++ b/src/connector/src/parser/mysql.rs
@@ -15,6 +15,7 @@
 use std::sync::LazyLock;
 
 use mysql_async::Row as MysqlRow;
+use mysql_common::constants::ColumnFlags;
 use risingwave_common::catalog::Schema;
 use risingwave_common::log::LogSuppresser;
 use risingwave_common::row::OwnedRow;
@@ -62,6 +63,47 @@ macro_rules! handle_data_type {
     }};
 }
 
+/// Decodes an integer column while honoring the MySQL `UNSIGNED` column flag.
+///
+/// An unsigned column is read as `$unsigned_type` and then cast to `$signed_type` with `as`,
+/// so values above the signed maximum wrap around to negative numbers. A signed column is
+/// handled by `handle_data_type!` with `$signed_type`.
+macro_rules! handle_data_type_with_signed {
+    (
+        $mysql_row:expr,
+        $mysql_datum_index:expr,
+        $column_name:expr,
+        $signed_type:ty,
+        $unsigned_type:ty
+    ) => {{
+        let column_flags = $mysql_row.columns()[$mysql_datum_index].flags();
+
+        if column_flags.contains(ColumnFlags::UNSIGNED_FLAG) {
+            // UNSIGNED column: decode as the unsigned type, then reinterpret as signed.
+            match $mysql_row.take_opt::<Option<$unsigned_type>, _>($mysql_datum_index) {
+                Some(Ok(Some(val))) => Ok(Some(ScalarImpl::from(val as $signed_type))),
+                Some(Ok(None)) => Ok(None),
+                Some(Err(e)) => Err(anyhow::Error::new(e.clone())
+                    .context("failed to deserialize MySQL value into rust value")
+                    .context(format!(
+                        "column: {}, index: {}, rust_type: {}",
+                        $column_name,
+                        $mysql_datum_index,
+                        stringify!($unsigned_type),
+                    ))),
+                None => bail!(
+                    "no value found at column: {}, index: {}",
+                    $column_name,
+                    $mysql_datum_index
+                ),
+            }
+        } else {
+            // SIGNED column: use the default signed type conversion.
+            handle_data_type!($mysql_row, $mysql_datum_index, $column_name, $signed_type)
+        }
+    }};
+}
+
 /// The decoding result can be interpreted as follows:
 /// Ok(value) => The value was found and successfully decoded.
 /// Err(error) => The value was found but could not be decoded,
@@ -107,13 +149,13 @@ pub fn mysql_datum_to_rw_datum(
             }
         }
         DataType::Int16 => {
-            handle_data_type!(mysql_row, mysql_datum_index, column_name, i16)
+            handle_data_type_with_signed!(mysql_row, mysql_datum_index, column_name, i16, u16)
         }
         DataType::Int32 => {
-            handle_data_type!(mysql_row, mysql_datum_index, column_name, i32)
+            handle_data_type_with_signed!(mysql_row, mysql_datum_index, column_name, i32, u32)
         }
         DataType::Int64 => {
-            handle_data_type!(mysql_row, mysql_datum_index, column_name, i64)
+            handle_data_type_with_signed!(mysql_row, mysql_datum_index, column_name, i64, u64)
         }
         DataType::Float32 => {
             handle_data_type!(mysql_row, mysql_datum_index, column_name, f32)
diff --git a/src/connector/src/source/cdc/external/mod.rs b/src/connector/src/source/cdc/external/mod.rs
index 6ebdc1b8f11b3..0a61b1669da05 100644
--- a/src/connector/src/source/cdc/external/mod.rs
+++ b/src/connector/src/source/cdc/external/mod.rs
@@ -95,7 +95,7 @@ impl ExternalCdcTableType {
     ) -> ConnectorResult<ExternalTableReaderImpl> {
         match self {
             Self::MySql => Ok(ExternalTableReaderImpl::MySql(
-                MySqlExternalTableReader::new(config, schema)?,
+                MySqlExternalTableReader::new(config, schema).await?,
             )),
             Self::Postgres => Ok(ExternalTableReaderImpl::Postgres(
                 PostgresExternalTableReader::new(config, schema, pk_indices, schema_table_name)
diff --git a/src/connector/src/source/cdc/external/mysql.rs b/src/connector/src/source/cdc/external/mysql.rs
index 2851c66072c29..573f665b6dc52 100644
--- a/src/connector/src/source/cdc/external/mysql.rs
+++ b/src/connector/src/source/cdc/external/mysql.rs
@@ -344,6 +344,7 @@ pub struct MySqlExternalTableReader {
     rw_schema: Schema,
     field_names: String,
     pool: mysql_async::Pool,
+    upstream_mysql_pk_infos: Vec<(String, String)>, // (column_name, column_type)
 }
 
 impl ExternalTableReader for MySqlExternalTableReader {
@@ -398,7 +399,10 @@ impl ExternalTableReader for MySqlExternalTableReader {
 }
 
 impl MySqlExternalTableReader {
-    pub fn new(config: ExternalTableConfig, rw_schema: Schema) -> ConnectorResult<Self> {
+    pub async fn new(config: ExternalTableConfig, rw_schema: Schema) -> ConnectorResult<Self> {
+        let database = config.database.clone();
+        let table = config.table.clone();
+
         let mut opts_builder = mysql_async::OptsBuilder::default()
             .user(Some(config.username))
             .pass(Some(config.password))
@@ -425,10 +429,15 @@ impl MySqlExternalTableReader {
             .map(|f| Self::quote_column(f.name.as_str()))
             .join(",");
 
+        // Query MySQL primary key infos for type casting.
+        let upstream_mysql_pk_infos =
+            Self::query_upstream_pk_infos(&pool, &database, &table).await?;
+
         Ok(Self {
             rw_schema,
             field_names,
             pool,
+            upstream_mysql_pk_infos,
         })
     }
 
@@ -445,6 +454,58 @@ impl MySqlExternalTableReader {
         })
     }
 
+    /// Queries the upstream table's primary key columns and their MySQL column types.
+    ///
+    /// Returns `(column_name, column_type)` pairs, where `column_type` is the raw
+    /// `INFORMATION_SCHEMA.COLUMNS.COLUMN_TYPE` text. The pairs are used to detect unsigned
+    /// primary keys when binding snapshot filter parameters.
+    ///
+    /// # Errors
+    ///
+    /// Fails if no connection can be acquired, if the query fails, or if a returned row does
+    /// not hold a string in either column.
+    async fn query_upstream_pk_infos(
+        pool: &mysql_async::Pool,
+        database: &str,
+        table: &str,
+    ) -> ConnectorResult<Vec<(String, String)>> {
+        let mut conn = pool.get_conn().await?;
+
+        // Bind the names as statement parameters rather than splicing them into the SQL text,
+        // so a database or table name containing a quote cannot break or alter the query.
+        let sql = "SELECT COLUMN_NAME, COLUMN_TYPE \
+                   FROM INFORMATION_SCHEMA.COLUMNS \
+                   WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? AND COLUMN_KEY = 'PRI' \
+                   ORDER BY ORDINAL_POSITION";
+        let rows: Vec<mysql_async::Row> = conn.exec(sql, (database, table)).await?;
+
+        let mut column_infos = Vec::with_capacity(rows.len());
+        for row in &rows {
+            // `get_opt` reports a missing or non-string value instead of panicking.
+            let column_name = row
+                .get_opt::<String, _>(0)
+                .and_then(Result::ok)
+                .ok_or_else(|| anyhow::anyhow!("invalid COLUMN_NAME in primary key metadata"))?;
+            let column_type = row
+                .get_opt::<String, _>(1)
+                .and_then(Result::ok)
+                .ok_or_else(|| anyhow::anyhow!("invalid COLUMN_TYPE in primary key metadata"))?;
+            column_infos.push((column_name, column_type));
+        }
+
+        Ok(column_infos)
+    }
+
+    /// Returns whether the upstream primary key column `column_name` has an unsigned type.
+    ///
+    /// Columns that are not part of the upstream primary key are reported as not unsigned.
+    fn is_unsigned_type(&self, column_name: &str) -> bool {
+        self.upstream_mysql_pk_infos
+            .iter()
+            .find(|(name, _)| name == column_name)
+            .is_some_and(|(_, ty)| ty.to_ascii_lowercase().contains("unsigned"))
+    }
+
     #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
     async fn snapshot_read_inner(
         &self,
@@ -474,7 +535,6 @@ impl MySqlExternalTableReader {
                 order_key,
             )
         };
-
         let mut conn = self.pool.get_conn().await?;
         // Set session timezone to UTC
         conn.exec_drop("SET time_zone = \"+00:00\"", ()).await?;
@@ -509,9 +569,34 @@ impl MySqlExternalTableReader {
                     let ty = field_map.get(pk.as_str()).unwrap();
                     let val = match ty {
                         DataType::Boolean => Value::from(value.into_bool()),
-                        DataType::Int16 => Value::from(value.into_int16()),
-                        DataType::Int32 => Value::from(value.into_int32()),
-                        DataType::Int64 => Value::from(value.into_int64()),
+                        // An unsigned upstream key is decoded into the same-width signed
+                        // type, so a value above the signed maximum shows up negative here.
+                        // Cast it back bit-for-bit so the filter binds the real unsigned
+                        // key value.
+                        DataType::Int16 => {
+                            let int16_val = value.into_int16();
+                            if int16_val < 0 && self.is_unsigned_type(pk.as_str()) {
+                                Value::from(int16_val as u16)
+                            } else {
+                                Value::from(int16_val)
+                            }
+                        }
+                        DataType::Int32 => {
+                            let int32_val = value.into_int32();
+                            if int32_val < 0 && self.is_unsigned_type(pk.as_str()) {
+                                Value::from(int32_val as u32)
+                            } else {
+                                Value::from(int32_val)
+                            }
+                        }
+                        DataType::Int64 => {
+                            let int64_val = value.into_int64();
+                            if int64_val < 0 && self.is_unsigned_type(pk.as_str()) {
+                                Value::from(int64_val as u64)
+                            } else {
+                                Value::from(int64_val)
+                            }
+                        }
                         DataType::Float32 => Value::from(value.into_float32().into_inner()),
                         DataType::Float64 => Value::from(value.into_float64().into_inner()),
                         DataType::Varchar => Value::from(String::from(value.into_utf8())),
@@ -693,7 +778,9 @@ mod tests {
         let config =
             serde_json::from_value::<ExternalTableConfig>(serde_json::to_value(props).unwrap())
                 .unwrap();
-        let reader = MySqlExternalTableReader::new(config, rw_schema).unwrap();
+        let reader = MySqlExternalTableReader::new(config, rw_schema)
+            .await
+            .unwrap();
         let offset = reader.current_cdc_offset().await.unwrap();
 
         println!("BinlogOffset: {:?}", offset);
diff --git a/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs b/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs
index b539d269cf018..66f597d8a2234 100644
--- a/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs
+++ b/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs
@@ -400,7 +400,9 @@ mod tests {
         let config =
             serde_json::from_value::<ExternalTableConfig>(serde_json::to_value(props).unwrap())
                 .unwrap();
-        let reader = MySqlExternalTableReader::new(config, rw_schema.clone()).unwrap();
+        let reader = MySqlExternalTableReader::new(config, rw_schema.clone())
+            .await
+            .unwrap();
 
         let mut cnt: usize = 0;
         let mut start_pk = Some(OwnedRow::new(vec![Some(ScalarImpl::Int64(0))]));