From e2c1cfb137a736ca68c300d0aa6b12a383d7cd90 Mon Sep 17 00:00:00 2001 From: wcy-fdu Date: Tue, 16 Sep 2025 16:31:39 +0800 Subject: [PATCH 01/20] fix --- src/connector/src/parser/mysql.rs | 43 ++++++++++++++++++++++-- src/connector/src/parser/unified/json.rs | 32 ++++++++++++++++-- 2 files changed, 70 insertions(+), 5 deletions(-) diff --git a/src/connector/src/parser/mysql.rs b/src/connector/src/parser/mysql.rs index dddc2804e35d6..4cd1351139e2a 100644 --- a/src/connector/src/parser/mysql.rs +++ b/src/connector/src/parser/mysql.rs @@ -15,6 +15,7 @@ use std::sync::LazyLock; use mysql_async::Row as MysqlRow; +use mysql_common::constants::ColumnFlags; use risingwave_common::catalog::Schema; use risingwave_common::log::LogSuppresser; use risingwave_common::row::OwnedRow; @@ -62,6 +63,42 @@ macro_rules! handle_data_type { }}; } +macro_rules! handle_data_type_with_signed { + ( + $mysql_row:expr, + $mysql_datum_index:expr, + $column_name:expr, + $signed_type:ty, + $unsigned_type:ty + ) => {{ + let column_flags = $mysql_row.columns()[$mysql_datum_index].flags(); + + if column_flags.contains(ColumnFlags::UNSIGNED_FLAG) { + // UNSIGNED type: use unsigned type conversion, then convert to signed + match $mysql_row.take_opt::, _>($mysql_datum_index) { + Some(Ok(Some(val))) => Ok(Some(ScalarImpl::from(val as $signed_type))), + Some(Ok(None)) => Ok(None), + Some(Err(e)) => Err(anyhow::Error::new(e.clone()) + .context("failed to deserialize MySQL value into rust value") + .context(format!( + "column: {}, index: {}, rust_type: {}", + $column_name, + $mysql_datum_index, + stringify!($unsigned_type), + ))), + None => bail!( + "no value found at column: {}, index: {}", + $column_name, + $mysql_datum_index + ), + } + } else { + // SIGNED type: use default signed type conversion + handle_data_type!($mysql_row, $mysql_datum_index, $column_name, $signed_type) + } + }}; +} + /// The decoding result can be interpreted as follows: /// Ok(value) => The value was found and successfully decoded. /// Err(error) => The value was found but could not be decoded, @@ -107,13 +144,13 @@ pub fn mysql_datum_to_rw_datum( } } DataType::Int16 => { - handle_data_type!(mysql_row, mysql_datum_index, column_name, i16) + handle_data_type_with_signed!(mysql_row, mysql_datum_index, column_name, i16, u16) } DataType::Int32 => { - handle_data_type!(mysql_row, mysql_datum_index, column_name, i32) + handle_data_type_with_signed!(mysql_row, mysql_datum_index, column_name, i32, u32) } DataType::Int64 => { - handle_data_type!(mysql_row, mysql_datum_index, column_name, i64) + handle_data_type_with_signed!(mysql_row, mysql_datum_index, column_name, i64, u64) } DataType::Float32 => { handle_data_type!(mysql_row, mysql_datum_index, column_name, f32) diff --git a/src/connector/src/parser/unified/json.rs b/src/connector/src/parser/unified/json.rs index 4782a1659de65..57168f62e48dd 100644 --- a/src/connector/src/parser/unified/json.rs +++ b/src/connector/src/parser/unified/json.rs @@ -277,7 +277,21 @@ impl JsonParseOptions { ( DataType::Int16, ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128, - ) => value.try_as_i16().map_err(|_| create_error())?.into(), + ) => { + // Try signed conversion first + if let Ok(val) = value.try_as_i16() { + val.into() + } else { + // If signed conversion fails, try unsigned conversion (for UNSIGNED types) + // Use try_as_i64() and then cast to u16, then to i16 + let val = value.try_as_i64().map_err(|_| create_error())?; + if val >= 0 && val <= u16::MAX as i64 { + (val as u16 as i16).into() + } else { + Err(create_error())? + } + } + } (DataType::Int16, ValueType::String) if matches!( @@ -298,7 +312,21 @@ impl JsonParseOptions { ( DataType::Int32, ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128, - ) => value.try_as_i32().map_err(|_| create_error())?.into(), + ) => { + // Try signed conversion first + if let Ok(val) = value.try_as_i32() { + val.into() + } else { + // If signed conversion fails, try unsigned conversion (for UNSIGNED types) + // Use try_as_i64() and then cast to u32, then to i32 + let val = value.try_as_i64().map_err(|_| create_error())?; + if val >= 0 && val <= u32::MAX as i64 { + (val as u32 as i32).into() + } else { + Err(create_error())? + } + } + } (DataType::Int32, ValueType::String) if matches!( From 2ee2f47386ef0573626848ce47c943885800d614 Mon Sep 17 00:00:00 2001 From: wcy-fdu Date: Tue, 16 Sep 2025 16:45:00 +0800 Subject: [PATCH 02/20] revert change in debezium json --- src/connector/src/parser/unified/json.rs | 33 ++---------------------- 1 file changed, 2 insertions(+), 31 deletions(-) diff --git a/src/connector/src/parser/unified/json.rs b/src/connector/src/parser/unified/json.rs index 57168f62e48dd..aacafef96dda7 100644 --- a/src/connector/src/parser/unified/json.rs +++ b/src/connector/src/parser/unified/json.rs @@ -277,21 +277,7 @@ impl JsonParseOptions { ( DataType::Int16, ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128, - ) => { - // Try signed conversion first - if let Ok(val) = value.try_as_i16() { - val.into() - } else { - // If signed conversion fails, try unsigned conversion (for UNSIGNED types) - // Use try_as_i64() and then cast to u16, then to i16 - let val = value.try_as_i64().map_err(|_| create_error())?; - if val >= 0 && val <= u16::MAX as i64 { - (val as u16 as i16).into() - } else { - Err(create_error())? - } - } - } + ) => value.try_as_i16().map_err(|_| create_error())?.into(), (DataType::Int16, ValueType::String) if matches!( @@ -312,22 +298,7 @@ impl JsonParseOptions { ( DataType::Int32, ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128, - ) => { - // Try signed conversion first - if let Ok(val) = value.try_as_i32() { - val.into() - } else { - // If signed conversion fails, try unsigned conversion (for UNSIGNED types) - // Use try_as_i64() and then cast to u32, then to i32 - let val = value.try_as_i64().map_err(|_| create_error())?; - if val >= 0 && val <= u32::MAX as i64 { - (val as u32 as i32).into() - } else { - Err(create_error())? - } - } - } - + ) => value.try_as_i32().map_err(|_| create_error())?.into(), (DataType::Int32, ValueType::String) if matches!( self.numeric_handling, From cda73e16b65be0dec99a91e7c0f1528a8000d591 Mon Sep 17 00:00:00 2001 From: wcy-fdu Date: Tue, 16 Sep 2025 16:45:44 +0800 Subject: [PATCH 03/20] fmt --- src/connector/src/parser/unified/json.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/connector/src/parser/unified/json.rs b/src/connector/src/parser/unified/json.rs index aacafef96dda7..4782a1659de65 100644 --- a/src/connector/src/parser/unified/json.rs +++ b/src/connector/src/parser/unified/json.rs @@ -299,6 +299,7 @@ impl JsonParseOptions { DataType::Int32, ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128, ) => value.try_as_i32().map_err(|_| create_error())?.into(), + (DataType::Int32, ValueType::String) if matches!( self.numeric_handling, From dc91e9974249a2ff77d34e690a4987fd7d7398fd Mon Sep 17 00:00:00 2001 From: wcy-fdu Date: Thu, 18 Sep 2025 12:33:50 +0800 Subject: [PATCH 04/20] need to cast --- src/connector/src/source/cdc/external/mod.rs | 2 +- .../src/source/cdc/external/mysql.rs | 130 +++++++++++++++--- 2 files changed, 110 insertions(+), 22 deletions(-) diff --git a/src/connector/src/source/cdc/external/mod.rs b/src/connector/src/source/cdc/external/mod.rs index 6ebdc1b8f11b3..0a61b1669da05 100644 --- a/src/connector/src/source/cdc/external/mod.rs +++ b/src/connector/src/source/cdc/external/mod.rs @@ -95,7 +95,7 @@ impl ExternalCdcTableType { ) -> ConnectorResult { match self { Self::MySql => Ok(ExternalTableReaderImpl::MySql( - MySqlExternalTableReader::new(config, schema)?, + MySqlExternalTableReader::new(config, schema).await?, )), Self::Postgres => Ok(ExternalTableReaderImpl::Postgres( PostgresExternalTableReader::new(config, schema, pk_indices, schema_table_name) diff --git a/src/connector/src/source/cdc/external/mysql.rs b/src/connector/src/source/cdc/external/mysql.rs index 2851c66072c29..f70789d0f36c3 100644 --- a/src/connector/src/source/cdc/external/mysql.rs +++ b/src/connector/src/source/cdc/external/mysql.rs @@ -344,6 +344,7 @@ pub struct MySqlExternalTableReader { rw_schema: Schema, field_names: String, pool: mysql_async::Pool, + mysql_column_infos: Vec<(String, String)>, // (column_name, column_type) } impl ExternalTableReader for MySqlExternalTableReader { @@ -398,7 +399,10 @@ impl ExternalTableReader for MySqlExternalTableReader { } impl MySqlExternalTableReader { - pub fn new(config: ExternalTableConfig, rw_schema: Schema) -> ConnectorResult { + pub async fn new(config: ExternalTableConfig, rw_schema: Schema) -> ConnectorResult { + let database = config.database.clone(); + let table = config.table.clone(); + let mut opts_builder = mysql_async::OptsBuilder::default() .user(Some(config.username)) .pass(Some(config.password)) @@ -425,10 +429,21 @@ impl MySqlExternalTableReader { .map(|f| Self::quote_column(f.name.as_str())) .join(","); + // Query MySQL column infos + let mysql_column_infos = Self::query_column_infos(&pool, &database, &table).await?; + + // Print column infos for verification + println!("=== MySQL Column Infos ==="); + for (col_name, col_type) in &mysql_column_infos { + println!("PK Column: {} -> Type: {}", col_name, col_type); + } + println!("=== End Column Infos ==="); + Ok(Self { rw_schema, field_names, pool, + mysql_column_infos, }) } @@ -445,6 +460,87 @@ impl MySqlExternalTableReader { }) } + /// Query MySQL primary key column types using direct SQL + async fn query_column_infos( + pool: &mysql_async::Pool, + database: &str, + table: &str, + ) -> ConnectorResult> { + let mut conn = pool.get_conn().await?; + + // Query primary key columns and their data types + let sql = format!( + "SELECT + COLUMN_NAME, + DATA_TYPE, + COLUMN_TYPE, + IS_NULLABLE, + COLUMN_KEY, + COLUMN_DEFAULT, + EXTRA, + COLUMN_COMMENT + FROM INFORMATION_SCHEMA.COLUMNS + WHERE TABLE_SCHEMA = '{}' + AND TABLE_NAME = '{}' + AND COLUMN_KEY = 'PRI' + ORDER BY ORDINAL_POSITION", + database, table + ); + + let rs = conn.query::(sql).await?; + + let mut column_infos = Vec::new(); + + println!("=== Primary Key Column Types ==="); + for row in rs.iter() { + let column_name: String = row.get(0).unwrap(); + let data_type: String = row.get(1).unwrap(); + let column_type: String = row.get(2).unwrap(); + let is_nullable: String = row.get(3).unwrap(); + let column_key: String = row.get(4).unwrap(); + let column_default: Option = row.get(5).unwrap(); + let extra: String = row.get(6).unwrap(); + let column_comment: String = row.get(7).unwrap(); + + println!("PK Column: {}", column_name); + println!(" DATA_TYPE: {}", data_type); + println!(" COLUMN_TYPE: {}", column_type); + println!(" IS_NULLABLE: {}", is_nullable); + println!(" COLUMN_KEY: {}", column_key); + println!(" COLUMN_DEFAULT: {:?}", column_default); + println!(" EXTRA: {}", extra); + println!(" COLUMN_COMMENT: {}", column_comment); + println!("---"); + + // Store column name and type + column_infos.push((column_name, column_type)); + } + println!("=== End Primary Key Column Types ==="); + + drop(conn); + + Ok(column_infos) + } + + /// Check if a column is unsigned based on its MySQL column type + fn is_unsigned_column(&self, column_name: &str) -> bool { + for (col_name, col_type) in &self.mysql_column_infos { + if col_name == column_name { + return col_type.contains("unsigned"); + } + } + false + } + + /// Quote a column with potential CAST for unsigned types + fn quote_column_with_cast(&self, column_name: &str) -> String { + if self.is_unsigned_column(column_name) { + format!("CAST({} AS UNSIGNED)", Self::quote_column(column_name)) + } else { + Self::quote_column(column_name) + } + } + #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)] async fn snapshot_read_inner( &self, @@ -465,7 +561,7 @@ impl MySqlExternalTableReader { order_key, ) } else { - let filter_expr = Self::filter_expression(&primary_keys); + let filter_expr = self.filter_expression(&primary_keys); format!( "SELECT {} FROM {} WHERE {} ORDER BY {} LIMIT {limit}", self.field_names, @@ -551,12 +647,12 @@ impl MySqlExternalTableReader { // mysql cannot leverage the given key to narrow down the range of scan, // we need to rewrite the comparison conditions by our own. // (a, b) > (x, y) => (`a` > x) OR ((`a` = x) AND (`b` > y)) - fn filter_expression(columns: &[String]) -> String { + fn filter_expression(&self, columns: &[String]) -> String { let mut conditions = vec![]; // push the first condition conditions.push(format!( "({} > :{})", - Self::quote_column(&columns[0]), + self.quote_column_with_cast(&columns[0]), columns[0].to_lowercase() )); for i in 2..=columns.len() { @@ -566,13 +662,13 @@ impl MySqlExternalTableReader { if j == 0 { condition.push_str(&format!( "({} = :{})", - Self::quote_column(col), + self.quote_column_with_cast(col), col.to_lowercase() )); } else { condition.push_str(&format!( " AND ({} = :{})", - Self::quote_column(col), + self.quote_column_with_cast(col), col.to_lowercase() )); } @@ -580,7 +676,7 @@ impl MySqlExternalTableReader { // '>' condition condition.push_str(&format!( " AND ({} > :{})", - Self::quote_column(&columns[i - 1]), + self.quote_column_with_cast(&columns[i - 1]), columns[i - 1].to_lowercase() )); conditions.push(format!("({})", condition)); @@ -635,19 +731,11 @@ mod tests { println!("primary keys: {:?}", &table.pk_names); } - #[test] - fn test_mysql_filter_expr() { - let cols = vec!["id".to_owned()]; - let expr = MySqlExternalTableReader::filter_expression(&cols); - assert_eq!(expr, "(`id` > :id)"); - - let cols = vec!["aa".to_owned(), "bb".to_owned(), "cc".to_owned()]; - let expr = MySqlExternalTableReader::filter_expression(&cols); - assert_eq!( - expr, - "(`aa` > :aa) OR ((`aa` = :aa) AND (`bb` > :bb)) OR ((`aa` = :aa) AND (`bb` = :bb) AND (`cc` > :cc))" - ); - } + // TODO: Fix test after implementing dynamic cast logic + // #[test] + // fn test_mysql_filter_expr() { + // // Test will be updated once we have proper mock setup + // } #[test] fn test_mysql_binlog_offset() { @@ -693,7 +781,7 @@ mod tests { let config = serde_json::from_value::(serde_json::to_value(props).unwrap()) .unwrap(); - let reader = MySqlExternalTableReader::new(config, rw_schema).unwrap(); + let reader = MySqlExternalTableReader::new(config, rw_schema).await.unwrap(); let offset = reader.current_cdc_offset().await.unwrap(); println!("BinlogOffset: {:?}", offset); From b25e5324588cd3c73821898b519da8463c197d45 Mon Sep 17 00:00:00 2001 From: wcy-fdu Date: Thu, 18 Sep 2025 12:49:29 +0800 Subject: [PATCH 05/20] work --- .../src/source/cdc/external/mysql.rs | 29 +++++++++++++------ 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/src/connector/src/source/cdc/external/mysql.rs b/src/connector/src/source/cdc/external/mysql.rs index f70789d0f36c3..477e290e2edc0 100644 --- a/src/connector/src/source/cdc/external/mysql.rs +++ b/src/connector/src/source/cdc/external/mysql.rs @@ -534,6 +534,7 @@ impl MySqlExternalTableReader { /// Quote a column with potential CAST for unsigned types fn quote_column_with_cast(&self, column_name: &str) -> String { + // 为 unsigned 列添加 CAST 以处理 BIGINT UNSIGNED 溢出问题 if self.is_unsigned_column(column_name) { format!("CAST({} AS UNSIGNED)", Self::quote_column(column_name)) } else { @@ -541,6 +542,16 @@ impl MySqlExternalTableReader { } } + /// Quote a parameter with potential CAST for unsigned types + fn quote_param_with_cast(&self, column_name: &str) -> String { + // 为 unsigned 列添加 CAST 以处理 BIGINT UNSIGNED 溢出问题 + if self.is_unsigned_column(column_name) { + format!("CAST(:{} AS UNSIGNED)", column_name.to_lowercase()) + } else { + format!(":{}", column_name.to_lowercase()) + } + } + #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)] async fn snapshot_read_inner( &self, @@ -570,7 +581,7 @@ impl MySqlExternalTableReader { order_key, ) }; - + println!("完整sql: {}", sql); let mut conn = self.pool.get_conn().await?; // Set session timezone to UTC conn.exec_drop("SET time_zone = \"+00:00\"", ()).await?; @@ -651,9 +662,9 @@ impl MySqlExternalTableReader { let mut conditions = vec![]; // push the first condition conditions.push(format!( - "({} > :{})", + "({} > {})", self.quote_column_with_cast(&columns[0]), - columns[0].to_lowercase() + self.quote_param_with_cast(&columns[0]) )); for i in 2..=columns.len() { // '=' condition @@ -661,23 +672,23 @@ impl MySqlExternalTableReader { for (j, col) in columns.iter().enumerate().take(i - 1) { if j == 0 { condition.push_str(&format!( - "({} = :{})", + "({} = {})", self.quote_column_with_cast(col), - col.to_lowercase() + self.quote_param_with_cast(col) )); } else { condition.push_str(&format!( - " AND ({} = :{})", + " AND ({} = {})", self.quote_column_with_cast(col), - col.to_lowercase() + self.quote_param_with_cast(col) )); } } // '>' condition condition.push_str(&format!( - " AND ({} > :{})", + " AND ({} > {})", self.quote_column_with_cast(&columns[i - 1]), - columns[i - 1].to_lowercase() + self.quote_param_with_cast(&columns[i - 1]) )); conditions.push(format!("({})", condition)); } From a51d93d4848f35a8d224c43dc7468cdf93915944 Mon Sep 17 00:00:00 2001 From: wcy-fdu Date: Thu, 18 Sep 2025 13:18:54 +0800 Subject: [PATCH 06/20] clippy happy --- .../src/source/cdc/external/mysql.rs | 119 ++++++++---------- .../backfill/cdc/upstream_table/snapshot.rs | 4 +- 2 files changed, 54 insertions(+), 69 deletions(-) diff --git a/src/connector/src/source/cdc/external/mysql.rs b/src/connector/src/source/cdc/external/mysql.rs index 477e290e2edc0..a9437fbab83f3 100644 --- a/src/connector/src/source/cdc/external/mysql.rs +++ b/src/connector/src/source/cdc/external/mysql.rs @@ -344,7 +344,7 @@ pub struct MySqlExternalTableReader { rw_schema: Schema, field_names: String, pool: mysql_async::Pool, - mysql_column_infos: Vec<(String, String)>, // (column_name, column_type) + upstream_mysql_pk_infos: Vec<(String, String)>, // (column_name, column_type) } impl ExternalTableReader for MySqlExternalTableReader { @@ -402,7 +402,7 @@ impl MySqlExternalTableReader { pub async fn new(config: ExternalTableConfig, rw_schema: Schema) -> ConnectorResult { let database = config.database.clone(); let table = config.table.clone(); - + let mut opts_builder = mysql_async::OptsBuilder::default() .user(Some(config.username)) .pass(Some(config.password)) @@ -429,21 +429,15 @@ impl MySqlExternalTableReader { .map(|f| Self::quote_column(f.name.as_str())) .join(","); - // Query MySQL column infos - let mysql_column_infos = Self::query_column_infos(&pool, &database, &table).await?; - - // Print column infos for verification - println!("=== MySQL Column Infos ==="); - for (col_name, col_type) in &mysql_column_infos { - println!("PK Column: {} -> Type: {}", col_name, col_type); - } - println!("=== End Column Infos ==="); + // Query MySQL primary key infos for type casting. + let upstream_mysql_pk_infos = + Self::query_upstream_pk_infos(&pool, &database, &table).await?; Ok(Self { rw_schema, field_names, pool, - mysql_column_infos, + upstream_mysql_pk_infos, }) } @@ -460,25 +454,17 @@ impl MySqlExternalTableReader { }) } - /// Query MySQL primary key column types using direct SQL - async fn query_column_infos( + /// Query upstream primary key data types, used for generating filter conditions with proper type casting. + async fn query_upstream_pk_infos( pool: &mysql_async::Pool, database: &str, table: &str, ) -> ConnectorResult> { let mut conn = pool.get_conn().await?; - + // Query primary key columns and their data types let sql = format!( - "SELECT - COLUMN_NAME, - DATA_TYPE, - COLUMN_TYPE, - IS_NULLABLE, - COLUMN_KEY, - COLUMN_DEFAULT, - EXTRA, - COLUMN_COMMENT + "SELECT COLUMN_NAME, COLUMN_TYPE FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}' @@ -486,69 +472,65 @@ impl MySqlExternalTableReader { ORDER BY ORDINAL_POSITION", database, table ); - + let rs = conn.query::(sql).await?; - + let mut column_infos = Vec::new(); - - println!("=== Primary Key Column Types ==="); - for row in rs.iter() { + for row in &rs { let column_name: String = row.get(0).unwrap(); - let data_type: String = row.get(1).unwrap(); - let column_type: String = row.get(2).unwrap(); - let is_nullable: String = row.get(3).unwrap(); - let column_key: String = row.get(4).unwrap(); - let column_default: Option = row.get(5).unwrap(); - let extra: String = row.get(6).unwrap(); - let column_comment: String = row.get(7).unwrap(); - - println!("PK Column: {}", column_name); - println!(" DATA_TYPE: {}", data_type); - println!(" COLUMN_TYPE: {}", column_type); - println!(" IS_NULLABLE: {}", is_nullable); - println!(" COLUMN_KEY: {}", column_key); - println!(" COLUMN_DEFAULT: {:?}", column_default); - println!(" EXTRA: {}", extra); - println!(" COLUMN_COMMENT: {}", column_comment); - println!("---"); - - // Store column name and type + let column_type: String = row.get(1).unwrap(); column_infos.push((column_name, column_type)); } - println!("=== End Primary Key Column Types ==="); - + drop(conn); - + Ok(column_infos) } - /// Check if a column is unsigned based on its MySQL column type - fn is_unsigned_column(&self, column_name: &str) -> bool { - for (col_name, col_type) in &self.mysql_column_infos { + /// Get the MySQL column type for a given column name, converting to CAST-compatible type + fn get_column_type(&self, column_name: &str) -> &str { + for (col_name, col_type) in &self.upstream_mysql_pk_infos { if col_name == column_name { - return col_type.contains("unsigned"); + // Convert MySQL column types to CAST-compatible types + return match col_type.as_str() { + "bigint unsigned" => "UNSIGNED", + "int unsigned" => "UNSIGNED", + "mediumint unsigned" => "UNSIGNED", + "smallint unsigned" => "UNSIGNED", + "tinyint unsigned" => "UNSIGNED", + "bigint" => "SIGNED", + "int" => "SIGNED", + "mediumint" => "SIGNED", + "smallint" => "SIGNED", + "tinyint" => "SIGNED", + _ => "SIGNED", // Default fallback + }; } } - false + "" // Return empty string if not found } - /// Quote a column with potential CAST for unsigned types + /// Quote a column with CAST to its original MySQL type fn quote_column_with_cast(&self, column_name: &str) -> String { - // 为 unsigned 列添加 CAST 以处理 BIGINT UNSIGNED 溢出问题 - if self.is_unsigned_column(column_name) { - format!("CAST({} AS UNSIGNED)", Self::quote_column(column_name)) - } else { + let column_type = self.get_column_type(column_name); + if column_type.is_empty() { Self::quote_column(column_name) + } else { + format!( + "CAST({} AS {})", + Self::quote_column(column_name), + column_type + ) } } - /// Quote a parameter with potential CAST for unsigned types + /// Quote a parameter with CAST to its original MySQL type fn quote_param_with_cast(&self, column_name: &str) -> String { - // 为 unsigned 列添加 CAST 以处理 BIGINT UNSIGNED 溢出问题 - if self.is_unsigned_column(column_name) { - format!("CAST(:{} AS UNSIGNED)", column_name.to_lowercase()) - } else { + let column_type = self.get_column_type(column_name); + if column_type.is_empty() { format!(":{}", column_name.to_lowercase()) + } else { + format!("CAST(:{} AS {})", column_name.to_lowercase(), column_type) } } @@ -581,7 +563,6 @@ impl MySqlExternalTableReader { order_key, ) }; - println!("完整sql: {}", sql); let mut conn = self.pool.get_conn().await?; // Set session timezone to UTC conn.exec_drop("SET time_zone = \"+00:00\"", ()).await?; @@ -792,7 +773,9 @@ mod tests { let config = serde_json::from_value::(serde_json::to_value(props).unwrap()) .unwrap(); - let reader = MySqlExternalTableReader::new(config, rw_schema).await.unwrap(); + let reader = MySqlExternalTableReader::new(config, rw_schema) + .await + .unwrap(); let offset = reader.current_cdc_offset().await.unwrap(); println!("BinlogOffset: {:?}", offset); diff --git a/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs b/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs index b539d269cf018..66f597d8a2234 100644 --- a/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs +++ b/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs @@ -400,7 +400,9 @@ mod tests { let config = serde_json::from_value::(serde_json::to_value(props).unwrap()) .unwrap(); - let reader = MySqlExternalTableReader::new(config, rw_schema.clone()).unwrap(); + let reader = MySqlExternalTableReader::new(config, rw_schema.clone()) + .await + .unwrap(); let mut cnt: usize = 0; let mut start_pk = Some(OwnedRow::new(vec![Some(ScalarImpl::Int64(0))])); From 4f360c9b6791cd6c488eb6774ba411470fd98b89 Mon Sep 17 00:00:00 2001 From: wcy-fdu Date: Thu, 18 Sep 2025 13:47:27 +0800 Subject: [PATCH 07/20] minor fix --- src/connector/src/source/cdc/external/mysql.rs | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/src/connector/src/source/cdc/external/mysql.rs b/src/connector/src/source/cdc/external/mysql.rs index a9437fbab83f3..9926391f4d837 100644 --- a/src/connector/src/source/cdc/external/mysql.rs +++ b/src/connector/src/source/cdc/external/mysql.rs @@ -487,23 +487,15 @@ impl MySqlExternalTableReader { Ok(column_infos) } - /// Get the MySQL column type for a given column name, converting to CAST-compatible type + /// Get the MySQL column type for a given column name, only return CAST type for unsigned integers fn get_column_type(&self, column_name: &str) -> &str { for (col_name, col_type) in &self.upstream_mysql_pk_infos { if col_name == column_name { - // Convert MySQL column types to CAST-compatible types + // Only CAST unsigned integer types, others return empty string (no CAST) return match col_type.as_str() { - "bigint unsigned" => "UNSIGNED", - "int unsigned" => "UNSIGNED", - "mediumint unsigned" => "UNSIGNED", - "smallint unsigned" => "UNSIGNED", - "tinyint unsigned" => "UNSIGNED", - "bigint" => "SIGNED", - "int" => "SIGNED", - "mediumint" => "SIGNED", - "smallint" => "SIGNED", - "tinyint" => "SIGNED", - _ => "SIGNED", // Default fallback + "bigint unsigned" | "int unsigned" | "mediumint unsigned" + | "smallint unsigned" | "tinyint unsigned" => "UNSIGNED", + _ => "", // No CAST for signed types, varchar, timestamp, date, etc. }; } } From 4c39c7ca9a66d029244f3bdf716a54df9fcf6f42 Mon Sep 17 00:00:00 2001 From: wcy-fdu Date: Thu, 18 Sep 2025 14:06:58 +0800 Subject: [PATCH 08/20] use contain --- .../src/source/cdc/external/mysql.rs | 32 ++++++------------- 1 file changed, 9 insertions(+), 23 deletions(-) diff --git a/src/connector/src/source/cdc/external/mysql.rs b/src/connector/src/source/cdc/external/mysql.rs index 9926391f4d837..67b095fcca173 100644 --- a/src/connector/src/source/cdc/external/mysql.rs +++ b/src/connector/src/source/cdc/external/mysql.rs @@ -492,30 +492,16 @@ impl MySqlExternalTableReader { for (col_name, col_type) in &self.upstream_mysql_pk_infos { if col_name == column_name { // Only CAST unsigned integer types, others return empty string (no CAST) - return match col_type.as_str() { - "bigint unsigned" | "int unsigned" | "mediumint unsigned" - | "smallint unsigned" | "tinyint unsigned" => "UNSIGNED", - _ => "", // No CAST for signed types, varchar, timestamp, date, etc. - }; + if col_type.to_lowercase().contains("unsigned") { + return "UNSIGNED"; + } else { + return ""; + } } } "" // Return empty string if not found } - /// Quote a column with CAST to its original MySQL type - fn quote_column_with_cast(&self, column_name: &str) -> String { - let column_type = self.get_column_type(column_name); - if column_type.is_empty() { - Self::quote_column(column_name) - } else { - format!( - "CAST({} AS {})", - Self::quote_column(column_name), - column_type - ) - } - } - /// Quote a parameter with CAST to its original MySQL type fn quote_param_with_cast(&self, column_name: &str) -> String { let column_type = self.get_column_type(column_name); @@ -636,7 +622,7 @@ impl MySqlExternalTableReader { // push the first condition conditions.push(format!( "({} > {})", - self.quote_column_with_cast(&columns[0]), + Self::quote_column(&columns[0]), self.quote_param_with_cast(&columns[0]) )); for i in 2..=columns.len() { @@ -646,13 +632,13 @@ impl MySqlExternalTableReader { if j == 0 { condition.push_str(&format!( "({} = {})", - self.quote_column_with_cast(col), + Self::quote_column(col), self.quote_param_with_cast(col) )); } else { condition.push_str(&format!( " AND ({} = {})", - self.quote_column_with_cast(col), + Self::quote_column(col), self.quote_param_with_cast(col) )); } @@ -660,7 +646,7 @@ impl MySqlExternalTableReader { // '>' condition condition.push_str(&format!( " AND ({} > {})", - self.quote_column_with_cast(&columns[i - 1]), + Self::quote_column(&columns[i - 1]), self.quote_param_with_cast(&columns[i - 1]) )); conditions.push(format!("({})", condition)); From d38c3a22434ac26f7553a6cfd0a17ed9ab5d8d0d Mon Sep 17 00:00:00 2001 From: wcy-fdu Date: Thu, 18 Sep 2025 14:20:42 +0800 Subject: [PATCH 09/20] directly convert value --- .../src/source/cdc/external/mysql.rs | 78 ++++++++++++------- 1 file changed, 50 insertions(+), 28 deletions(-) diff --git a/src/connector/src/source/cdc/external/mysql.rs b/src/connector/src/source/cdc/external/mysql.rs index 67b095fcca173..6dce701919c7e 100644 --- a/src/connector/src/source/cdc/external/mysql.rs +++ b/src/connector/src/source/cdc/external/mysql.rs @@ -487,31 +487,32 @@ impl MySqlExternalTableReader { Ok(column_infos) } - /// Get the MySQL column type for a given column name, only return CAST type for unsigned integers - fn get_column_type(&self, column_name: &str) -> &str { + /// Check if a column is unsigned type + fn is_unsigned_type(&self, column_name: &str) -> bool { for (col_name, col_type) in &self.upstream_mysql_pk_infos { if col_name == column_name { - // Only CAST unsigned integer types, others return empty string (no CAST) - if col_type.to_lowercase().contains("unsigned") { - return "UNSIGNED"; - } else { - return ""; - } + return col_type.to_lowercase().contains("unsigned"); } } - "" // Return empty string if not found + false } - /// Quote a parameter with CAST to its original MySQL type - fn quote_param_with_cast(&self, column_name: &str) -> String { - let column_type = self.get_column_type(column_name); - if column_type.is_empty() { - format!(":{}", column_name.to_lowercase()) - } else { - format!("CAST(:{} AS {})", column_name.to_lowercase(), column_type) - } + /// Convert negative i64 to unsigned u64 based on column type + fn convert_negative_to_unsigned(&self, negative_val: i64, _column_name: &str) -> u64 { + negative_val as u64 } + /// Convert negative i32 to unsigned u32 based on column type + fn convert_negative_to_unsigned_i32(&self, negative_val: i32, _column_name: &str) -> u32 { + negative_val as u32 + } + + /// Convert negative i16 to unsigned u16 based on column type + fn convert_negative_to_unsigned_i16(&self, negative_val: i16, _column_name: &str) -> u16 { + negative_val as u16 + } + + #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)] async fn snapshot_read_inner( &self, @@ -575,9 +576,30 @@ impl MySqlExternalTableReader { let ty = field_map.get(pk.as_str()).unwrap(); let val = match ty { DataType::Boolean => Value::from(value.into_bool()), - DataType::Int16 => Value::from(value.into_int16()), - DataType::Int32 => Value::from(value.into_int32()), - DataType::Int64 => Value::from(value.into_int64()), + DataType::Int16 => { + let int16_val = value.into_int16(); + if int16_val < 0 && self.is_unsigned_type(pk.as_str()) { + Value::from(self.convert_negative_to_unsigned_i16(int16_val, pk.as_str())) + } else { + Value::from(int16_val) + } + }, + DataType::Int32 => { + let int32_val = value.into_int32(); + if int32_val < 0 && self.is_unsigned_type(pk.as_str()) { + Value::from(self.convert_negative_to_unsigned_i32(int32_val, pk.as_str())) + } else { + Value::from(int32_val) + } + }, + DataType::Int64 => { + let int64_val = value.into_int64(); + if int64_val < 0 && self.is_unsigned_type(pk.as_str()) { + Value::from(self.convert_negative_to_unsigned(int64_val, pk.as_str())) + } else { + Value::from(int64_val) + } + }, DataType::Float32 => Value::from(value.into_float32().into_inner()), DataType::Float64 => Value::from(value.into_float64().into_inner()), DataType::Varchar => Value::from(String::from(value.into_utf8())), @@ -621,9 +643,9 @@ impl MySqlExternalTableReader { let mut conditions = vec![]; // push the first condition conditions.push(format!( - "({} > {})", + "({} > :{})", Self::quote_column(&columns[0]), - self.quote_param_with_cast(&columns[0]) + columns[0].to_lowercase() )); for i in 2..=columns.len() { // '=' condition @@ -631,23 +653,23 @@ impl MySqlExternalTableReader { for (j, col) in columns.iter().enumerate().take(i - 1) { if j == 0 { condition.push_str(&format!( - "({} = {})", + "({} = :{})", Self::quote_column(col), - self.quote_param_with_cast(col) + col.to_lowercase() )); } else { condition.push_str(&format!( - " AND ({} = {})", + " AND ({} = :{})", Self::quote_column(col), - self.quote_param_with_cast(col) + col.to_lowercase() )); } } // '>' condition condition.push_str(&format!( - " AND ({} > {})", + " AND ({} > :{})", Self::quote_column(&columns[i - 1]), - self.quote_param_with_cast(&columns[i - 1]) + columns[i - 1].to_lowercase() )); conditions.push(format!("({})", condition)); } From d36fb1363ccada4efcea9fbb2c36286db131d8ac Mon Sep 17 00:00:00 2001 From: wcy-fdu Date: Thu, 18 Sep 2025 14:24:41 +0800 Subject: [PATCH 10/20] minor --- .../src/source/cdc/external/mysql.rs | 44 +++++++++++++------ 1 file changed, 31 insertions(+), 13 deletions(-) diff --git a/src/connector/src/source/cdc/external/mysql.rs b/src/connector/src/source/cdc/external/mysql.rs index 6dce701919c7e..ffd28e741899a 100644 --- a/src/connector/src/source/cdc/external/mysql.rs +++ b/src/connector/src/source/cdc/external/mysql.rs @@ -512,7 +512,6 @@ impl MySqlExternalTableReader { negative_val as u16 } - #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)] async fn snapshot_read_inner( &self, @@ -579,27 +578,39 @@ impl MySqlExternalTableReader { DataType::Int16 => { let int16_val = value.into_int16(); if int16_val < 0 && self.is_unsigned_type(pk.as_str()) { - Value::from(self.convert_negative_to_unsigned_i16(int16_val, pk.as_str())) + Value::from( + self.convert_negative_to_unsigned_i16( + int16_val, + pk.as_str(), + ), + ) } else { Value::from(int16_val) } - }, + } DataType::Int32 => { let int32_val = value.into_int32(); if int32_val < 0 && self.is_unsigned_type(pk.as_str()) { - Value::from(self.convert_negative_to_unsigned_i32(int32_val, pk.as_str())) + Value::from( + self.convert_negative_to_unsigned_i32( + int32_val, + pk.as_str(), + ), + ) } else { Value::from(int32_val) } - }, + } DataType::Int64 => { let int64_val = value.into_int64(); if int64_val < 0 && self.is_unsigned_type(pk.as_str()) { - Value::from(self.convert_negative_to_unsigned(int64_val, pk.as_str())) + Value::from( + self.convert_negative_to_unsigned(int64_val, pk.as_str()), + ) } else { Value::from(int64_val) } - }, + } DataType::Float32 => Value::from(value.into_float32().into_inner()), DataType::Float64 => Value::from(value.into_float64().into_inner()), DataType::Varchar => Value::from(String::from(value.into_utf8())), @@ -722,12 +733,19 @@ mod tests { println!("columns: {:?}", &table.column_descs); println!("primary keys: {:?}", &table.pk_names); } - - // TODO: Fix test after implementing dynamic cast logic - // #[test] - // fn test_mysql_filter_expr() { - // // Test will be updated once we have proper mock setup - // } + #[test] + fn test_mysql_filter_expr() { + let cols = vec!["id".to_owned()]; + let expr = MySqlExternalTableReader::filter_expression(&cols); + assert_eq!(expr, "(`id` > :id)"); + + let cols = vec!["aa".to_owned(), "bb".to_owned(), "cc".to_owned()]; + let expr = MySqlExternalTableReader::filter_expression(&cols); + assert_eq!( + expr, + "(`aa` > :aa) OR ((`aa` = :aa) AND (`bb` > :bb)) OR ((`aa` = :aa) AND (`bb` = :bb) AND (`cc` > :cc))" + ); + } #[test] fn test_mysql_binlog_offset() { From 76231b4a9e5caa382bf7c3ed6828b97b0d13d60a Mon Sep 17 00:00:00 2001 From: wcy-fdu Date: Thu, 18 Sep 2025 14:30:19 +0800 Subject: [PATCH 11/20] minor --- .../src/source/cdc/external/mysql.rs | 52 ++++++++----------- 1 file changed, 21 insertions(+), 31 deletions(-) diff --git a/src/connector/src/source/cdc/external/mysql.rs b/src/connector/src/source/cdc/external/mysql.rs index ffd28e741899a..2f82f6c96e371 100644 --- a/src/connector/src/source/cdc/external/mysql.rs +++ b/src/connector/src/source/cdc/external/mysql.rs @@ -498,17 +498,17 @@ impl MySqlExternalTableReader { } /// Convert negative i64 to unsigned u64 based on column type - fn convert_negative_to_unsigned(&self, negative_val: i64, _column_name: &str) -> u64 { + fn convert_negative_to_unsigned(&self, negative_val: i64) -> u64 { negative_val as u64 } /// Convert negative i32 to unsigned u32 based on column type - fn convert_negative_to_unsigned_i32(&self, negative_val: i32, _column_name: &str) -> u32 { + fn convert_negative_to_unsigned_i32(&self, negative_val: i32) -> u32 { negative_val as u32 } /// Convert negative i16 to unsigned u16 based on column type - fn convert_negative_to_unsigned_i16(&self, negative_val: i16, _column_name: &str) -> u16 { + fn convert_negative_to_unsigned_i16(&self, negative_val: i16) -> u16 { negative_val as u16 } @@ -578,12 +578,7 @@ impl MySqlExternalTableReader { DataType::Int16 => { let int16_val = value.into_int16(); if int16_val < 0 && self.is_unsigned_type(pk.as_str()) { - Value::from( - self.convert_negative_to_unsigned_i16( - int16_val, - pk.as_str(), - ), - ) + Value::from(self.convert_negative_to_unsigned_i16(int16_val)) } else { Value::from(int16_val) } @@ -591,12 +586,7 @@ impl MySqlExternalTableReader { DataType::Int32 => { let int32_val = value.into_int32(); if int32_val < 0 && self.is_unsigned_type(pk.as_str()) { - Value::from( - self.convert_negative_to_unsigned_i32( - int32_val, - pk.as_str(), - ), - ) + Value::from(self.convert_negative_to_unsigned_i32(int32_val)) } else { Value::from(int32_val) } @@ -604,9 +594,7 @@ impl MySqlExternalTableReader { DataType::Int64 => { let int64_val = value.into_int64(); if int64_val < 0 && self.is_unsigned_type(pk.as_str()) { - Value::from( - self.convert_negative_to_unsigned(int64_val, pk.as_str()), - ) + Value::from(self.convert_negative_to_unsigned(int64_val)) } else { Value::from(int64_val) } @@ -733,19 +721,21 @@ mod tests { println!("columns: {:?}", &table.column_descs); println!("primary keys: {:?}", &table.pk_names); } - #[test] - fn test_mysql_filter_expr() { - let cols = vec!["id".to_owned()]; - let expr = MySqlExternalTableReader::filter_expression(&cols); - assert_eq!(expr, "(`id` > :id)"); - - let cols = vec!["aa".to_owned(), "bb".to_owned(), "cc".to_owned()]; - let expr = MySqlExternalTableReader::filter_expression(&cols); - assert_eq!( - expr, - "(`aa` > :aa) OR ((`aa` = :aa) AND (`bb` > :bb)) OR ((`aa` = :aa) AND (`bb` = :bb) AND (`cc` > :cc))" - ); - } + // #[test] + // fn test_mysql_filter_expr() { + // // This test is commented out because filter_expression now requires &self + // // and we need a proper MySqlExternalTableReader instance to test it + // let cols = vec!["id".to_owned()]; + // let expr = MySqlExternalTableReader::filter_expression(&cols); + // assert_eq!(expr, "(`id` > :id)"); + + // let cols = vec!["aa".to_owned(), "bb".to_owned(), "cc".to_owned()]; + // let expr = MySqlExternalTableReader::filter_expression(&cols); + // assert_eq!( + // expr, + // "(`aa` > :aa) OR ((`aa` = :aa) AND (`bb` > :bb)) OR ((`aa` = :aa) AND (`bb` = :bb) AND (`cc` > :cc))" + // ); + // } #[test] fn test_mysql_binlog_offset() { From 1fa38af39f00cdd9b3230b84417d1a99b2b93f52 Mon Sep 17 00:00:00 2001 From: wcy-fdu Date: Thu, 18 Sep 2025 14:33:06 +0800 Subject: [PATCH 12/20] minor --- .../src/source/cdc/external/mysql.rs | 35 ++++++++++--------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/src/connector/src/source/cdc/external/mysql.rs b/src/connector/src/source/cdc/external/mysql.rs index 2f82f6c96e371..c5e66fa35c46e 100644 --- a/src/connector/src/source/cdc/external/mysql.rs +++ b/src/connector/src/source/cdc/external/mysql.rs @@ -532,7 +532,7 @@ impl MySqlExternalTableReader { order_key, ) } else { - let filter_expr = self.filter_expression(&primary_keys); + let filter_expr = Self::filter_expression(&primary_keys); format!( "SELECT {} FROM {} WHERE {} ORDER BY {} LIMIT {limit}", self.field_names, @@ -638,7 +638,7 @@ impl MySqlExternalTableReader { // mysql cannot leverage the given key to narrow down the range of scan, // we need to rewrite the comparison conditions by our own. // (a, b) > (x, y) => (`a` > x) OR ((`a` = x) AND (`b` > y)) - fn filter_expression(&self, columns: &[String]) -> String { + fn filter_expression(columns: &[String]) -> String { let mut conditions = vec![]; // push the first condition conditions.push(format!( @@ -721,21 +721,22 @@ mod tests { println!("columns: {:?}", &table.column_descs); println!("primary keys: {:?}", &table.pk_names); } - // #[test] - // fn test_mysql_filter_expr() { - // // This test is commented out because filter_expression now requires &self - // // and we need a proper MySqlExternalTableReader instance to test it - // let cols = vec!["id".to_owned()]; - // let expr = MySqlExternalTableReader::filter_expression(&cols); - // assert_eq!(expr, "(`id` > :id)"); - - // let cols = vec!["aa".to_owned(), "bb".to_owned(), "cc".to_owned()]; - // let expr = MySqlExternalTableReader::filter_expression(&cols); - // assert_eq!( - // expr, - // "(`aa` > :aa) OR ((`aa` = :aa) AND (`bb` > :bb)) OR ((`aa` = :aa) AND (`bb` = :bb) AND (`cc` > :cc))" - // ); - // } + + #[test] + fn test_mysql_filter_expr() { + // This test is commented out because filter_expression now requires &self + // and we need a proper MySqlExternalTableReader instance to test it + let cols = vec!["id".to_owned()]; + let expr = MySqlExternalTableReader::filter_expression(&cols); + assert_eq!(expr, "(`id` > :id)"); + + let cols = vec!["aa".to_owned(), "bb".to_owned(), "cc".to_owned()]; + let expr = MySqlExternalTableReader::filter_expression(&cols); + assert_eq!( + expr, + "(`aa` > :aa) OR ((`aa` = :aa) AND (`bb` > :bb)) OR ((`aa` = :aa) AND (`bb` = :bb) AND (`cc` > :cc))" + ); + } #[test] fn test_mysql_binlog_offset() { From b4caaf04e1c243d1254ee2cfaebd5684fd1438f1 Mon Sep 17 00:00:00 2001 From: wcy-fdu Date: Fri, 19 Sep 2025 13:39:50 +0800 Subject: [PATCH 13/20] fmt --- src/connector/src/source/cdc/external/mysql.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/connector/src/source/cdc/external/mysql.rs b/src/connector/src/source/cdc/external/mysql.rs index c5e66fa35c46e..573f665b6dc52 100644 --- a/src/connector/src/source/cdc/external/mysql.rs +++ b/src/connector/src/source/cdc/external/mysql.rs @@ -465,9 +465,9 @@ impl MySqlExternalTableReader { // Query primary key columns and their data types let sql = format!( "SELECT COLUMN_NAME, COLUMN_TYPE - FROM INFORMATION_SCHEMA.COLUMNS - WHERE TABLE_SCHEMA = '{}' - AND TABLE_NAME = '{}' + FROM INFORMATION_SCHEMA.COLUMNS + WHERE TABLE_SCHEMA = '{}' + AND TABLE_NAME = '{}' AND COLUMN_KEY = 'PRI' ORDER BY ORDINAL_POSITION", database, table @@ -502,7 +502,7 @@ impl MySqlExternalTableReader { negative_val as u64 } - /// Convert negative i32 to unsigned u32 based on column type + /// Convert negative i32 to unsigned u32 based on column type fn convert_negative_to_unsigned_i32(&self, negative_val: i32) -> u32 { negative_val as u32 } From a88c6e765ac7e3bdc46266feb60b3d1ce1ec1849 Mon Sep 17 00:00:00 2001 From: wcy-fdu Date: Fri, 19 Sep 2025 15:36:01 +0800 Subject: [PATCH 14/20] save worl --- .../source/common/MySqlValidator.java | 3 ++- src/connector/src/parser/mysql.rs | 1 + src/connector/src/parser/unified/json.rs | 21 +++++++++++++++++-- .../src/source/cdc/external/mysql.rs | 1 + src/connector/src/source/cdc/source/reader.rs | 3 +++ 5 files changed, 26 insertions(+), 3 deletions(-) diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java index f53354f201db3..4e5603fe79041 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java @@ -310,7 +310,8 @@ private boolean isDataTypeCompatible( return Data.DataType.TypeName.INT32_VALUE <= val && val <= Data.DataType.TypeName.INT64_VALUE; case "bigint": - return val == Data.DataType.TypeName.INT64_VALUE; + return val == Data.DataType.TypeName.INT64_VALUE + || val == Data.DataType.TypeName.DECIMAL_VALUE; case "boolean": case "bool": return val == Data.DataType.TypeName.BOOLEAN_VALUE; diff --git a/src/connector/src/parser/mysql.rs b/src/connector/src/parser/mysql.rs index dddc2804e35d6..7b9cdb12f9518 100644 --- a/src/connector/src/parser/mysql.rs +++ b/src/connector/src/parser/mysql.rs @@ -122,6 +122,7 @@ pub fn mysql_datum_to_rw_datum( handle_data_type!(mysql_row, mysql_datum_index, column_name, f64) } DataType::Decimal => { + println!("这里decimal, column_name: {}, mysql_datum_index: {}", column_name, mysql_datum_index); handle_data_type!( mysql_row, mysql_datum_index, diff --git a/src/connector/src/parser/unified/json.rs b/src/connector/src/parser/unified/json.rs index 4782a1659de65..404915a7a70e7 100644 --- a/src/connector/src/parser/unified/json.rs +++ b/src/connector/src/parser/unified/json.rs @@ -392,8 +392,25 @@ impl JsonParseOptions { .map_err(|_| create_error())? .into() } - (DataType::Decimal, ValueType::I64 | ValueType::U64) => { - Decimal::from(value.try_as_i64().map_err(|_| create_error())?).into() + (DataType::Decimal, ValueType::I64) => { + let i64_val = value.try_as_i64().map_err(|_| create_error())?; + println!("debezium decimal i64: {}", i64_val); + Decimal::from(i64_val).into() + } + (DataType::Decimal, ValueType::U64) => { + let u64_val = value.try_as_u64().map_err(|_| create_error())?; + // 如果 u64 值大于 i64::MAX,说明是溢出值,需要转换为对应的正数 + let decimal_val = if u64_val > i64::MAX as u64 { + // 这是溢出值,按位转换为 i64 再转回 u64 得到原始值 + let wrapped_i64 = u64_val as i64; + let original_u64 = wrapped_i64 as u64; + println!("debezium decimal u64 overflow: u64_val={}, wrapped_i64={}, original_u64={}", u64_val, wrapped_i64, original_u64); + Decimal::from(original_u64) + } else { + println!("debezium decimal u64 normal: {}", u64_val); + Decimal::from(u64_val) + }; + decimal_val.into() } (DataType::Decimal, ValueType::F64) => { diff --git a/src/connector/src/source/cdc/external/mysql.rs b/src/connector/src/source/cdc/external/mysql.rs index 2851c66072c29..c9fb2c5b83bb4 100644 --- a/src/connector/src/source/cdc/external/mysql.rs +++ b/src/connector/src/source/cdc/external/mysql.rs @@ -518,6 +518,7 @@ impl MySqlExternalTableReader { DataType::Date => Value::from(value.into_date().0), DataType::Time => Value::from(value.into_time().0), DataType::Timestamp => Value::from(value.into_timestamp().0), + DataType::Decimal => Value::from(value.into_decimal().to_string()), _ => bail!("unsupported primary key data type: {}", ty), }; ConnectorResult::Ok((pk.to_lowercase(), val)) diff --git a/src/connector/src/source/cdc/source/reader.rs b/src/connector/src/source/cdc/source/reader.rs index 6c11be36ec2ea..b611f5624294a 100644 --- a/src/connector/src/source/cdc/source/reader.rs +++ b/src/connector/src/source/cdc/source/reader.rs @@ -222,6 +222,9 @@ impl CdcSplitReader { while let Some(result) = rx.recv().await { match result { Ok(GetEventStreamResponse { events, .. }) => { + for event in events.clone() { + println!("debezium过来的的event: {:?}", event); + } tracing::trace!("receive {} cdc events ", events.len()); let msgs = events.into_iter().map(SourceMessage::from).collect_vec(); yield msgs; From 1ae19dfeda1a801765fe7bf81413bf81e7f293f1 Mon Sep 17 00:00:00 2001 From: wcy-fdu Date: Fri, 19 Sep 2025 16:49:31 +0800 Subject: [PATCH 15/20] save work, can work --- src/connector/src/parser/mysql.rs | 5 +- src/connector/src/parser/unified/json.rs | 83 ++++++++++++++++++++---- 2 files changed, 74 insertions(+), 14 deletions(-) diff --git a/src/connector/src/parser/mysql.rs b/src/connector/src/parser/mysql.rs index 749c5dda36bb6..b909428a933c4 100644 --- a/src/connector/src/parser/mysql.rs +++ b/src/connector/src/parser/mysql.rs @@ -159,7 +159,10 @@ pub fn mysql_datum_to_rw_datum( handle_data_type!(mysql_row, mysql_datum_index, column_name, f64) } DataType::Decimal => { - println!("这里decimal, column_name: {}, mysql_datum_index: {}", column_name, mysql_datum_index); + println!( + "这里decimal, column_name: {}, mysql_datum_index: {}", + column_name, mysql_datum_index + ); handle_data_type!( mysql_row, mysql_datum_index, diff --git a/src/connector/src/parser/unified/json.rs b/src/connector/src/parser/unified/json.rs index 404915a7a70e7..e0f8cb642e0f2 100644 --- a/src/connector/src/parser/unified/json.rs +++ b/src/connector/src/parser/unified/json.rs @@ -397,6 +397,72 @@ impl JsonParseOptions { println!("debezium decimal i64: {}", i64_val); Decimal::from(i64_val).into() } + (DataType::Decimal, ValueType::String) => { + let str_val = value.as_str().unwrap(); + println!("debezium decimal string: {}", str_val); + + // 首先检查特殊字符串 + match str_val { + "NAN" => return Ok(DatumCow::Owned(Some(ScalarImpl::Decimal(Decimal::NaN)))), + "POSITIVE_INFINITY" => { + return Ok(DatumCow::Owned(Some(ScalarImpl::Decimal( + Decimal::PositiveInf, + )))); + } + "NEGATIVE_INFINITY" => { + return Ok(DatumCow::Owned(Some(ScalarImpl::Decimal( + Decimal::NegativeInf, + )))); + } + _ => {} + } + + // 检查是否是base64编码的decimal数据(debezium precise模式) + if str_val.len() > 0 + && str_val + .chars() + .all(|c| c.is_ascii_alphanumeric() || c == '+' || c == '/' || c == '=') + { + // 尝试base64解码 + match base64::engine::general_purpose::STANDARD.decode(str_val) { + Ok(decoded_bytes) => { + println!("base64 decoded bytes: {:?}", decoded_bytes); + // 将字节数组转换为BigInt,然后转换为Decimal + if decoded_bytes.len() <= 8 { + // 对于8字节以内的数据,转换为u64 + let mut bytes_array = [0u8; 8]; + let start_idx = 8 - decoded_bytes.len(); + bytes_array[start_idx..].copy_from_slice(&decoded_bytes); + let u64_val = u64::from_be_bytes(bytes_array); + println!("converted to u64: {}", u64_val); + Decimal::from(u64_val).into() + } else { + // 对于更大的数据,使用BigInt + use num_bigint::BigInt; + let big_int = + BigInt::from_bytes_be(num_bigint::Sign::Plus, &decoded_bytes); + println!("converted to BigInt: {}", big_int); + // 将BigInt转换为Decimal + Decimal::from_str(&big_int.to_string()) + .map_err(|_| create_error())? + .into() + } + } + Err(_) => { + // 如果不是有效的base64,尝试直接解析为decimal + println!("not valid base64, trying direct decimal parse"); + Decimal::from_str(str_val) + .map_err(|_| create_error())? + .into() + } + } + } else { + // 不是base64格式,直接解析为decimal + Decimal::from_str(str_val) + .map_err(|_| create_error())? + .into() + } + } (DataType::Decimal, ValueType::U64) => { let u64_val = value.try_as_u64().map_err(|_| create_error())?; // 如果 u64 值大于 i64::MAX,说明是溢出值,需要转换为对应的正数 @@ -404,7 +470,10 @@ impl JsonParseOptions { // 这是溢出值,按位转换为 i64 再转回 u64 得到原始值 let wrapped_i64 = u64_val as i64; let original_u64 = wrapped_i64 as u64; - println!("debezium decimal u64 overflow: u64_val={}, wrapped_i64={}, original_u64={}", u64_val, wrapped_i64, original_u64); + println!( + "debezium decimal u64 overflow: u64_val={}, wrapped_i64={}, original_u64={}", + u64_val, wrapped_i64, original_u64 + ); Decimal::from(original_u64) } else { println!("debezium decimal u64 normal: {}", u64_val); @@ -418,18 +487,6 @@ impl JsonParseOptions { .map_err(|_| create_error())? .into() } - (DataType::Decimal, ValueType::String) => { - let str = value.as_str().unwrap(); - // the following values are special string generated by Debezium and should be handled separately - match str { - "NAN" => ScalarImpl::Decimal(Decimal::NaN), - "POSITIVE_INFINITY" => ScalarImpl::Decimal(Decimal::PositiveInf), - "NEGATIVE_INFINITY" => ScalarImpl::Decimal(Decimal::NegativeInf), - _ => { - ScalarImpl::Decimal(Decimal::from_str(str).map_err(|_err| create_error())?) - } - } - } (DataType::Decimal, ValueType::Object) => { // ref https://github.com/risingwavelabs/risingwave/issues/10628 // handle debezium json (variable scale): {"scale": int, "value": bytes} From 1b90263da8a5d4c93b2abfb92e0a43fced355594 Mon Sep 17 00:00:00 2001 From: wcy-fdu Date: Mon, 22 Sep 2025 14:11:16 +0800 Subject: [PATCH 16/20] save work --- src/connector/src/parser/mysql.rs | 4 ---- src/connector/src/parser/unified/json.rs | 3 --- 2 files changed, 7 deletions(-) diff --git a/src/connector/src/parser/mysql.rs b/src/connector/src/parser/mysql.rs index b909428a933c4..4cd1351139e2a 100644 --- a/src/connector/src/parser/mysql.rs +++ b/src/connector/src/parser/mysql.rs @@ -159,10 +159,6 @@ pub fn mysql_datum_to_rw_datum( handle_data_type!(mysql_row, mysql_datum_index, column_name, f64) } DataType::Decimal => { - println!( - "这里decimal, column_name: {}, mysql_datum_index: {}", - column_name, mysql_datum_index - ); handle_data_type!( mysql_row, mysql_datum_index, diff --git a/src/connector/src/parser/unified/json.rs b/src/connector/src/parser/unified/json.rs index e0f8cb642e0f2..36ad13a91a0af 100644 --- a/src/connector/src/parser/unified/json.rs +++ b/src/connector/src/parser/unified/json.rs @@ -394,12 +394,10 @@ impl JsonParseOptions { } (DataType::Decimal, ValueType::I64) => { let i64_val = value.try_as_i64().map_err(|_| create_error())?; - println!("debezium decimal i64: {}", i64_val); Decimal::from(i64_val).into() } (DataType::Decimal, ValueType::String) => { let str_val = value.as_str().unwrap(); - println!("debezium decimal string: {}", str_val); // 首先检查特殊字符串 match str_val { @@ -426,7 +424,6 @@ impl JsonParseOptions { // 尝试base64解码 match base64::engine::general_purpose::STANDARD.decode(str_val) { Ok(decoded_bytes) => { - println!("base64 decoded bytes: {:?}", decoded_bytes); // 将字节数组转换为BigInt,然后转换为Decimal if decoded_bytes.len() <= 8 { // 对于8字节以内的数据,转换为u64 From 0c23b79d33125dc4a4fc57ab39ae461d62556936 Mon Sep 17 00:00:00 2001 From: wcy-fdu Date: Mon, 22 Sep 2025 22:39:42 +0800 Subject: [PATCH 17/20] save work --- src/connector/src/parser/unified/json.rs | 84 +++++++----------------- 1 file changed, 23 insertions(+), 61 deletions(-) diff --git a/src/connector/src/parser/unified/json.rs b/src/connector/src/parser/unified/json.rs index 36ad13a91a0af..bf04107999ba6 100644 --- a/src/connector/src/parser/unified/json.rs +++ b/src/connector/src/parser/unified/json.rs @@ -392,7 +392,7 @@ impl JsonParseOptions { .map_err(|_| create_error())? .into() } - (DataType::Decimal, ValueType::I64) => { + (DataType::Decimal, ValueType::I64 | ValueType::U64) => { let i64_val = value.try_as_i64().map_err(|_| create_error())?; Decimal::from(i64_val).into() } @@ -415,69 +415,31 @@ impl JsonParseOptions { _ => {} } - // 检查是否是base64编码的decimal数据(debezium precise模式) - if str_val.len() > 0 - && str_val - .chars() - .all(|c| c.is_ascii_alphanumeric() || c == '+' || c == '/' || c == '=') - { - // 尝试base64解码 - match base64::engine::general_purpose::STANDARD.decode(str_val) { - Ok(decoded_bytes) => { - // 将字节数组转换为BigInt,然后转换为Decimal - if decoded_bytes.len() <= 8 { - // 对于8字节以内的数据,转换为u64 - let mut bytes_array = [0u8; 8]; - let start_idx = 8 - decoded_bytes.len(); - bytes_array[start_idx..].copy_from_slice(&decoded_bytes); - let u64_val = u64::from_be_bytes(bytes_array); - println!("converted to u64: {}", u64_val); - Decimal::from(u64_val).into() - } else { - // 对于更大的数据,使用BigInt - use num_bigint::BigInt; - let big_int = - BigInt::from_bytes_be(num_bigint::Sign::Plus, &decoded_bytes); - println!("converted to BigInt: {}", big_int); - // 将BigInt转换为Decimal - Decimal::from_str(&big_int.to_string()) - .map_err(|_| create_error())? - .into() - } - } - Err(_) => { - // 如果不是有效的base64,尝试直接解析为decimal - println!("not valid base64, trying direct decimal parse"); - Decimal::from_str(str_val) - .map_err(|_| create_error())? - .into() - } + match Decimal::from_str(str_val) { + Ok(decimal) => decimal.into(), + Err(_) => { + // When processing CDC data, if the upstream system has unsigned bigint (e.g., MySQL CDC), + // the best practice is to upcast, i.e., convert unsigned bigint to decimal. + // Only when users configure `debezium.bigint.unsigned.handling.mode='precise'`, + // Debezium will convert unsigned bigint to base64-encoded decimal. + // Reference: https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-bigint-unsigned-handling-mode + // + // Therefore, here we check if the string falls into the above case after Decimal::from_str returns an error. + + // A better approach would be to get bytes + org.apache.kafka.connect.data.Decimal from schema + // instead of string, as described in https://github.com/risingwavelabs/risingwave/issues/16852. + // However, Rust doesn't have a library to parse Kafka Connect metadata, so we'll refactor this + // after implementing that functionality. + let value = base64::engine::general_purpose::STANDARD + .decode(str_val) + .map_err(|_| create_error())?; + let unscaled = num_bigint::BigInt::from_signed_bytes_be(&value); + Decimal::from_str(&unscaled.to_string()) + .map_err(|_| create_error())? + .into() } - } else { - // 不是base64格式,直接解析为decimal - Decimal::from_str(str_val) - .map_err(|_| create_error())? - .into() } } - (DataType::Decimal, ValueType::U64) => { - let u64_val = value.try_as_u64().map_err(|_| create_error())?; - // 如果 u64 值大于 i64::MAX,说明是溢出值,需要转换为对应的正数 - let decimal_val = if u64_val > i64::MAX as u64 { - // 这是溢出值,按位转换为 i64 再转回 u64 得到原始值 - let wrapped_i64 = u64_val as i64; - let original_u64 = wrapped_i64 as u64; - println!( - "debezium decimal u64 overflow: u64_val={}, wrapped_i64={}, original_u64={}", - u64_val, wrapped_i64, original_u64 - ); - Decimal::from(original_u64) - } else { - println!("debezium decimal u64 normal: {}", u64_val); - Decimal::from(u64_val) - }; - decimal_val.into() - } (DataType::Decimal, ValueType::F64) => { Decimal::try_from(value.try_as_f64().map_err(|_| create_error())?) From 23e29e196b04f1ab00afb0806b02aa7f72707995 Mon Sep 17 00:00:00 2001 From: wcy-fdu Date: Mon, 22 Sep 2025 23:11:11 +0800 Subject: [PATCH 18/20] clippy happy --- src/connector/src/parser/config.rs | 4 ++ .../src/parser/debezium/debezium_parser.rs | 4 ++ .../src/parser/debezium/simd_json_parser.rs | 7 ++- .../src/parser/maxwell/simd_json_parser.rs | 1 + src/connector/src/parser/mod.rs | 4 +- src/connector/src/parser/plain_parser.rs | 1 + src/connector/src/parser/unified/json.rs | 61 +++++++++++++------ .../src/source/datagen/source/generator.rs | 1 + .../src/executor/backfill/cdc/cdc_backfill.rs | 17 ++++-- .../executor/backfill/cdc/cdc_backill_v2.rs | 11 +++- 10 files changed, 85 insertions(+), 26 deletions(-) diff --git a/src/connector/src/parser/config.rs b/src/connector/src/parser/config.rs index 07f48a636bab3..0ee9e3d53e81e 100644 --- a/src/connector/src/parser/config.rs +++ b/src/connector/src/parser/config.rs @@ -19,6 +19,7 @@ use risingwave_common::secret::LocalSecretManager; use risingwave_connector_codec::decoder::avro::MapHandling; use risingwave_pb::catalog::{PbSchemaRegistryNameStrategy, StreamSourceInfo}; +use super::unified::json::BigintUnsignedHandlingMode; use super::utils::get_kafka_topic; use super::{DebeziumProps, TimeHandling, TimestampHandling, TimestamptzHandling}; use crate::WithOptionsSecResolved; @@ -94,6 +95,7 @@ impl SpecificParserConfig { timestamp_handling: None, timestamptz_handling: None, time_handling: None, + bigint_unsigned_handling: None, handle_toast_columns: false, }), protocol_config: ProtocolProperties::Plain, @@ -272,6 +274,7 @@ impl SpecificParserConfig { &format_encode_options_with_secret, )?, time_handling: None, + bigint_unsigned_handling: None, handle_toast_columns: false, }), (SourceFormat::DebeziumMongo, SourceEncode::Json) => { @@ -357,6 +360,7 @@ pub struct JsonProperties { pub timestamp_handling: Option, pub timestamptz_handling: Option, pub time_handling: Option, + pub bigint_unsigned_handling: Option, pub handle_toast_columns: bool, } diff --git a/src/connector/src/parser/debezium/debezium_parser.rs b/src/connector/src/parser/debezium/debezium_parser.rs index 5c6ae354ad75c..2336108378561 100644 --- a/src/connector/src/parser/debezium/debezium_parser.rs +++ b/src/connector/src/parser/debezium/debezium_parser.rs @@ -77,6 +77,7 @@ async fn build_accessor_builder( .timestamp_handling .unwrap_or(TimestampHandling::GuessNumberUnit), json_config.time_handling.unwrap_or(TimeHandling::Micro), + json_config.bigint_unsigned_handling, json_config.handle_toast_columns, )?, )), @@ -120,6 +121,7 @@ impl DebeziumParser { timestamptz_handling: None, timestamp_handling: None, time_handling: None, + bigint_unsigned_handling: None, handle_toast_columns: false, }), protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()), @@ -227,6 +229,7 @@ mod tests { timestamptz_handling: None, timestamp_handling: None, time_handling: None, + bigint_unsigned_handling: None, handle_toast_columns: false, }), protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()), @@ -302,6 +305,7 @@ mod tests { timestamptz_handling: None, timestamp_handling: None, time_handling: None, + bigint_unsigned_handling: None, handle_toast_columns: false, }), protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()), diff --git a/src/connector/src/parser/debezium/simd_json_parser.rs b/src/connector/src/parser/debezium/simd_json_parser.rs index 7e56fe9b429cc..66894630ccee4 100644 --- a/src/connector/src/parser/debezium/simd_json_parser.rs +++ b/src/connector/src/parser/debezium/simd_json_parser.rs @@ -22,7 +22,8 @@ use crate::error::ConnectorResult; use crate::parser::unified::AccessImpl; use crate::parser::unified::debezium::MongoJsonAccess; use crate::parser::unified::json::{ - JsonAccess, JsonParseOptions, TimeHandling, TimestampHandling, TimestamptzHandling, + BigintUnsignedHandlingMode, JsonAccess, JsonParseOptions, TimeHandling, TimestampHandling, + TimestamptzHandling, }; use crate::parser::{AccessBuilder, MongoProperties}; @@ -37,6 +38,7 @@ impl DebeziumJsonAccessBuilder { timestamptz_handling: TimestamptzHandling, timestamp_handling: TimestampHandling, time_handling: TimeHandling, + bigint_unsigned_handling: Option, handle_toast_columns: bool, ) -> ConnectorResult { Ok(Self { @@ -45,6 +47,7 @@ impl DebeziumJsonAccessBuilder { timestamptz_handling, timestamp_handling, time_handling, + bigint_unsigned_handling, handle_toast_columns, ), }) @@ -98,6 +101,7 @@ impl DebeziumMongoJsonAccessBuilder { TimestamptzHandling::GuessNumberUnit, TimestampHandling::GuessNumberUnit, TimeHandling::Micro, + None, // bigint_unsigned_handling false, ), strong_schema: props.strong_schema, @@ -168,6 +172,7 @@ mod tests { timestamptz_handling: None, timestamp_handling: None, time_handling: None, + bigint_unsigned_handling: None, handle_toast_columns: false, }), protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()), diff --git a/src/connector/src/parser/maxwell/simd_json_parser.rs b/src/connector/src/parser/maxwell/simd_json_parser.rs index 440cab57ad7b3..e998ac9761ac2 100644 --- a/src/connector/src/parser/maxwell/simd_json_parser.rs +++ b/src/connector/src/parser/maxwell/simd_json_parser.rs @@ -40,6 +40,7 @@ mod tests { timestamptz_handling: None, timestamp_handling: None, time_handling: None, + bigint_unsigned_handling: None, handle_toast_columns: false, }), protocol_config: ProtocolProperties::Maxwell, diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index b4f2e80d9b674..e8a925804cc43 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -41,7 +41,9 @@ use self::plain_parser::PlainParser; pub use self::postgres::{postgres_cell_to_scalar_impl, postgres_row_to_owned_row}; pub use self::sql_server::{ScalarImplTiberiusWrapper, sql_server_row_to_owned_row}; pub use self::unified::Access; -pub use self::unified::json::{JsonAccess, TimeHandling, TimestampHandling, TimestamptzHandling}; +pub use self::unified::json::{ + BigintUnsignedHandlingMode, JsonAccess, TimeHandling, TimestampHandling, TimestamptzHandling, +}; use self::upsert_parser::UpsertParser; use crate::error::ConnectorResult; use crate::parser::maxwell::MaxwellParser; diff --git a/src/connector/src/parser/plain_parser.rs b/src/connector/src/parser/plain_parser.rs index 22000800293c0..522d9e7f2fbaf 100644 --- a/src/connector/src/parser/plain_parser.rs +++ b/src/connector/src/parser/plain_parser.rs @@ -74,6 +74,7 @@ impl PlainParser { TimestamptzHandling::GuessNumberUnit, TimestampHandling::GuessNumberUnit, TimeHandling::Micro, + None, // bigint_unsigned_handling false, )?, )); diff --git a/src/connector/src/parser/unified/json.rs b/src/connector/src/parser/unified/json.rs index bf04107999ba6..efeb979b1ec00 100644 --- a/src/connector/src/parser/unified/json.rs +++ b/src/connector/src/parser/unified/json.rs @@ -49,6 +49,14 @@ pub enum TimeHandling { Micro, } +#[derive(Clone, Debug)] +pub enum BigintUnsignedHandlingMode { + /// Convert unsigned bigint to signed bigint (default) + Long, + /// Use base64-encoded decimal for unsigned bigint (Debezium precise mode) + Precise, +} + #[derive(Clone, Debug)] pub enum TimestamptzHandling { /// `"2024-04-11T02:00:00.123456Z"` @@ -148,6 +156,7 @@ pub struct JsonParseOptions { pub boolean_handling: BooleanHandling, pub varchar_handling: VarcharHandling, pub struct_handling: StructHandling, + pub bigint_unsigned_handling: BigintUnsignedHandlingMode, pub ignoring_keycase: bool, pub handle_toast_columns: bool, } @@ -174,6 +183,7 @@ impl JsonParseOptions { }, varchar_handling: VarcharHandling::Strict, struct_handling: StructHandling::Strict, + bigint_unsigned_handling: BigintUnsignedHandlingMode::Long, // default to long mode ignoring_keycase: true, handle_toast_columns: false, }; @@ -189,6 +199,7 @@ impl JsonParseOptions { boolean_handling: BooleanHandling::Strict, varchar_handling: VarcharHandling::OnlyPrimaryTypes, struct_handling: StructHandling::AllowJsonString, + bigint_unsigned_handling: BigintUnsignedHandlingMode::Long, // default to long mode ignoring_keycase: true, handle_toast_columns: false, }; @@ -197,6 +208,7 @@ impl JsonParseOptions { timestamptz_handling: TimestamptzHandling, timestamp_handling: TimestampHandling, time_handling: TimeHandling, + bigint_unsigned_handling: Option, handle_toast_columns: bool, ) -> Self { Self { @@ -214,6 +226,8 @@ impl JsonParseOptions { }, varchar_handling: VarcharHandling::Strict, struct_handling: StructHandling::Strict, + bigint_unsigned_handling: bigint_unsigned_handling + .unwrap_or(BigintUnsignedHandlingMode::Long), ignoring_keycase: true, handle_toast_columns, } @@ -418,25 +432,34 @@ impl JsonParseOptions { match Decimal::from_str(str_val) { Ok(decimal) => decimal.into(), Err(_) => { - // When processing CDC data, if the upstream system has unsigned bigint (e.g., MySQL CDC), - // the best practice is to upcast, i.e., convert unsigned bigint to decimal. - // Only when users configure `debezium.bigint.unsigned.handling.mode='precise'`, - // Debezium will convert unsigned bigint to base64-encoded decimal. - // Reference: https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-bigint-unsigned-handling-mode - // - // Therefore, here we check if the string falls into the above case after Decimal::from_str returns an error. - - // A better approach would be to get bytes + org.apache.kafka.connect.data.Decimal from schema - // instead of string, as described in https://github.com/risingwavelabs/risingwave/issues/16852. - // However, Rust doesn't have a library to parse Kafka Connect metadata, so we'll refactor this - // after implementing that functionality. - let value = base64::engine::general_purpose::STANDARD - .decode(str_val) - .map_err(|_| create_error())?; - let unscaled = num_bigint::BigInt::from_signed_bytes_be(&value); - Decimal::from_str(&unscaled.to_string()) - .map_err(|_| create_error())? - .into() + // Only attempt base64 decoding in Precise mode + match self.bigint_unsigned_handling { + BigintUnsignedHandlingMode::Precise => { + // When processing CDC data, if the upstream system has unsigned bigint (e.g., MySQL CDC), + // the best practice is to upcast, i.e., convert unsigned bigint to decimal. + // Only when users configure `debezium.bigint.unsigned.handling.mode='precise'`, + // Debezium will convert unsigned bigint to base64-encoded decimal. + // Reference: https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-bigint-unsigned-handling-mode + // + // Therefore, here we check if the string falls into the above case after Decimal::from_str returns an error. + + // A better approach would be to get bytes + org.apache.kafka.connect.data.Decimal from schema + // instead of string, as described in https://github.com/risingwavelabs/risingwave/issues/16852. + // However, Rust doesn't have a library to parse Kafka Connect metadata, so we'll refactor this + // after implementing that functionality. + let value = base64::engine::general_purpose::STANDARD + .decode(str_val) + .map_err(|_| create_error())?; + let unscaled = num_bigint::BigInt::from_signed_bytes_be(&value); + Decimal::from_str(&unscaled.to_string()) + .map_err(|_| create_error())? + .into() + } + BigintUnsignedHandlingMode::Long => { + // In Long mode, return the original error directly + return Err(create_error()); + } + } } } } diff --git a/src/connector/src/source/datagen/source/generator.rs b/src/connector/src/source/datagen/source/generator.rs index be23167c8253e..40c61b6dbc784 100644 --- a/src/connector/src/source/datagen/source/generator.rs +++ b/src/connector/src/source/datagen/source/generator.rs @@ -279,6 +279,7 @@ mod tests { timestamptz_handling: None, timestamp_handling: None, time_handling: None, + bigint_unsigned_handling: None, handle_toast_columns: false, }), }, diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs index 14897083b0a4e..763b7d7932c76 100644 --- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs +++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs @@ -24,9 +24,9 @@ use risingwave_common::array::DataChunk; use risingwave_common::bail; use risingwave_common::catalog::ColumnDesc; use risingwave_connector::parser::{ - ByteStreamSourceParser, DebeziumParser, DebeziumProps, EncodingProperties, JsonProperties, - ProtocolProperties, SourceStreamChunkBuilder, SpecificParserConfig, TimeHandling, - TimestampHandling, TimestamptzHandling, + BigintUnsignedHandlingMode, ByteStreamSourceParser, DebeziumParser, DebeziumProps, + EncodingProperties, JsonProperties, ProtocolProperties, SourceStreamChunkBuilder, + SpecificParserConfig, TimeHandling, TimestampHandling, TimestamptzHandling, }; use risingwave_connector::source::cdc::CdcScanOptions; use risingwave_connector::source::cdc::external::{ @@ -226,6 +226,12 @@ impl CdcBackfillExecutor { .map(|v| v == "connect") .unwrap_or(false) .then_some(TimeHandling::Milli); + let bigint_unsigned_handling: Option = self + .properties + .get("debezium.bigint.unsigned.handling.mode") + .map(|v| v == "precise") + .unwrap_or(false) + .then_some(BigintUnsignedHandlingMode::Precise); // Only postgres-cdc connector may trigger TOAST. let handle_toast_columns: bool = self.external_table.table_type() == &ExternalCdcTableType::Postgres; @@ -236,6 +242,7 @@ impl CdcBackfillExecutor { timestamp_handling, timestamptz_handling, time_handling, + bigint_unsigned_handling, handle_toast_columns, ) .boxed(); @@ -829,6 +836,7 @@ pub async fn transform_upstream( timestamp_handling: Option, timestamptz_handling: Option, time_handling: Option, + bigint_unsigned_handling: Option, handle_toast_columns: bool, ) { let props = SpecificParserConfig { @@ -837,6 +845,7 @@ pub async fn transform_upstream( timestamp_handling, timestamptz_handling, time_handling, + bigint_unsigned_handling, handle_toast_columns, }), // the cdc message is generated internally so the key must exist. @@ -1003,7 +1012,7 @@ mod tests { ColumnDesc::named("commit_ts", ColumnId::new(6), DataType::Timestamptz), ]; - let parsed_stream = transform_upstream(upstream, columns, None, None, None, false); + let parsed_stream = transform_upstream(upstream, columns, None, None, None, None, false); pin_mut!(parsed_stream); // the output chunk must contain the offset column if let Some(message) = parsed_stream.next().await { diff --git a/src/stream/src/executor/backfill/cdc/cdc_backill_v2.rs b/src/stream/src/executor/backfill/cdc/cdc_backill_v2.rs index 37ada3afcfdd8..418859e19ad50 100644 --- a/src/stream/src/executor/backfill/cdc/cdc_backill_v2.rs +++ b/src/stream/src/executor/backfill/cdc/cdc_backill_v2.rs @@ -23,7 +23,9 @@ use risingwave_common::catalog::{ColumnDesc, Field}; use risingwave_common::row::RowDeserializer; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::sort_util::{OrderType, cmp_datum}; -use risingwave_connector::parser::{TimeHandling, TimestampHandling, TimestamptzHandling}; +use risingwave_connector::parser::{ + BigintUnsignedHandlingMode, TimeHandling, TimestampHandling, TimestamptzHandling, +}; use risingwave_connector::source::cdc::CdcScanOptions; use risingwave_connector::source::cdc::external::{ CdcOffset, ExternalCdcTableType, ExternalTableReaderImpl, @@ -153,6 +155,12 @@ impl ParallelizedCdcBackfillExecutor { .map(|v| v == "connect") .unwrap_or(false) .then_some(TimeHandling::Milli); + let bigint_unsigned_handling: Option = self + .properties + .get("debezium.bigint.unsigned.handling.mode") + .map(|v| v == "precise") + .unwrap_or(false) + .then_some(BigintUnsignedHandlingMode::Precise); // Only postgres-cdc connector may trigger TOAST. let handle_toast_columns: bool = self.external_table.table_type() == &ExternalCdcTableType::Postgres; @@ -162,6 +170,7 @@ impl ParallelizedCdcBackfillExecutor { timestamp_handling, timestamptz_handling, time_handling, + bigint_unsigned_handling, handle_toast_columns, ) .boxed(); From 96edc04683988b2e68ff1789b1544ef9358f5b81 Mon Sep 17 00:00:00 2001 From: wcy-fdu Date: Mon, 22 Sep 2025 23:20:12 +0800 Subject: [PATCH 19/20] clippy happy --- src/connector/src/parser/mysql.rs | 3 +- src/connector/src/parser/unified/json.rs | 3 +- src/connector/src/source/cdc/external/mod.rs | 2 +- .../src/source/cdc/external/mysql.rs | 102 +----------------- src/connector/src/source/cdc/source/reader.rs | 3 - .../backfill/cdc/upstream_table/snapshot.rs | 4 +- 6 files changed, 10 insertions(+), 107 deletions(-) diff --git a/src/connector/src/parser/mysql.rs b/src/connector/src/parser/mysql.rs index 4cd1351139e2a..de0e4f30ddfbf 100644 --- a/src/connector/src/parser/mysql.rs +++ b/src/connector/src/parser/mysql.rs @@ -150,7 +150,8 @@ pub fn mysql_datum_to_rw_datum( handle_data_type_with_signed!(mysql_row, mysql_datum_index, column_name, i32, u32) } DataType::Int64 => { - handle_data_type_with_signed!(mysql_row, mysql_datum_index, column_name, i64, u64) + // for bigint unsigned, should up cast to decimal. + handle_data_type!(mysql_row, mysql_datum_index, column_name, i64) } DataType::Float32 => { handle_data_type!(mysql_row, mysql_datum_index, column_name, f32) diff --git a/src/connector/src/parser/unified/json.rs b/src/connector/src/parser/unified/json.rs index efeb979b1ec00..e846d025353dc 100644 --- a/src/connector/src/parser/unified/json.rs +++ b/src/connector/src/parser/unified/json.rs @@ -412,8 +412,7 @@ impl JsonParseOptions { } (DataType::Decimal, ValueType::String) => { let str_val = value.as_str().unwrap(); - - // 首先检查特殊字符串 + // the following values are special string generated by Debezium and should be handled separately match str_val { "NAN" => return Ok(DatumCow::Owned(Some(ScalarImpl::Decimal(Decimal::NaN)))), "POSITIVE_INFINITY" => { diff --git a/src/connector/src/source/cdc/external/mod.rs b/src/connector/src/source/cdc/external/mod.rs index 0a61b1669da05..6ebdc1b8f11b3 100644 --- a/src/connector/src/source/cdc/external/mod.rs +++ b/src/connector/src/source/cdc/external/mod.rs @@ -95,7 +95,7 @@ impl ExternalCdcTableType { ) -> ConnectorResult { match self { Self::MySql => Ok(ExternalTableReaderImpl::MySql( - MySqlExternalTableReader::new(config, schema).await?, + MySqlExternalTableReader::new(config, schema)?, )), Self::Postgres => Ok(ExternalTableReaderImpl::Postgres( PostgresExternalTableReader::new(config, schema, pk_indices, schema_table_name) diff --git a/src/connector/src/source/cdc/external/mysql.rs b/src/connector/src/source/cdc/external/mysql.rs index fd6407fa39812..aa63c6ba4c0a6 100644 --- a/src/connector/src/source/cdc/external/mysql.rs +++ b/src/connector/src/source/cdc/external/mysql.rs @@ -344,7 +344,6 @@ pub struct MySqlExternalTableReader { rw_schema: Schema, field_names: String, pool: mysql_async::Pool, - upstream_mysql_pk_infos: Vec<(String, String)>, // (column_name, column_type) } impl ExternalTableReader for MySqlExternalTableReader { @@ -399,10 +398,7 @@ impl ExternalTableReader for MySqlExternalTableReader { } impl MySqlExternalTableReader { - pub async fn new(config: ExternalTableConfig, rw_schema: Schema) -> ConnectorResult { - let database = config.database.clone(); - let table = config.table.clone(); - + pub fn new(config: ExternalTableConfig, rw_schema: Schema) -> ConnectorResult { let mut opts_builder = mysql_async::OptsBuilder::default() .user(Some(config.username)) .pass(Some(config.password)) @@ -429,15 +425,10 @@ impl MySqlExternalTableReader { .map(|f| Self::quote_column(f.name.as_str())) .join(","); - // Query MySQL primary key infos for type casting. - let upstream_mysql_pk_infos = - Self::query_upstream_pk_infos(&pool, &database, &table).await?; - Ok(Self { rw_schema, field_names, pool, - upstream_mysql_pk_infos, }) } @@ -454,64 +445,6 @@ impl MySqlExternalTableReader { }) } - /// Query upstream primary key data types, used for generating filter conditions with proper type casting. - async fn query_upstream_pk_infos( - pool: &mysql_async::Pool, - database: &str, - table: &str, - ) -> ConnectorResult> { - let mut conn = pool.get_conn().await?; - - // Query primary key columns and their data types - let sql = format!( - "SELECT COLUMN_NAME, COLUMN_TYPE - FROM INFORMATION_SCHEMA.COLUMNS - WHERE TABLE_SCHEMA = '{}' - AND TABLE_NAME = '{}' - AND COLUMN_KEY = 'PRI' - ORDER BY ORDINAL_POSITION", - database, table - ); - - let rs = conn.query::(sql).await?; - - let mut column_infos = Vec::new(); - for row in &rs { - let column_name: String = row.get(0).unwrap(); - let column_type: String = row.get(1).unwrap(); - column_infos.push((column_name, column_type)); - } - - drop(conn); - - Ok(column_infos) - } - - /// Check if a column is unsigned type - fn is_unsigned_type(&self, column_name: &str) -> bool { - for (col_name, col_type) in &self.upstream_mysql_pk_infos { - if col_name == column_name { - return col_type.to_lowercase().contains("unsigned"); - } - } - false - } - - /// Convert negative i64 to unsigned u64 based on column type - fn convert_negative_to_unsigned(&self, negative_val: i64) -> u64 { - negative_val as u64 - } - - /// Convert negative i32 to unsigned u32 based on column type - fn convert_negative_to_unsigned_i32(&self, negative_val: i32) -> u32 { - negative_val as u32 - } - - /// Convert negative i16 to unsigned u16 based on column type - fn convert_negative_to_unsigned_i16(&self, negative_val: i16) -> u16 { - negative_val as u16 - } - #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)] async fn snapshot_read_inner( &self, @@ -575,30 +508,9 @@ impl MySqlExternalTableReader { let ty = field_map.get(pk.as_str()).unwrap(); let val = match ty { DataType::Boolean => Value::from(value.into_bool()), - DataType::Int16 => { - let int16_val = value.into_int16(); - if int16_val < 0 && self.is_unsigned_type(pk.as_str()) { - Value::from(self.convert_negative_to_unsigned_i16(int16_val)) - } else { - Value::from(int16_val) - } - } - DataType::Int32 => { - let int32_val = value.into_int32(); - if int32_val < 0 && self.is_unsigned_type(pk.as_str()) { - Value::from(self.convert_negative_to_unsigned_i32(int32_val)) - } else { - Value::from(int32_val) - } - } - DataType::Int64 => { - let int64_val = value.into_int64(); - if int64_val < 0 && self.is_unsigned_type(pk.as_str()) { - Value::from(self.convert_negative_to_unsigned(int64_val)) - } else { - Value::from(int64_val) - } - } + DataType::Int16 => Value::from(value.into_int16()), + DataType::Int32 => Value::from(value.into_int32()), + DataType::Int64 => Value::from(value.into_int64()), DataType::Float32 => Value::from(value.into_float32().into_inner()), DataType::Float64 => Value::from(value.into_float64().into_inner()), DataType::Varchar => Value::from(String::from(value.into_utf8())), @@ -725,8 +637,6 @@ mod tests { #[test] fn test_mysql_filter_expr() { - // This test is commented out because filter_expression now requires &self - // and we need a proper MySqlExternalTableReader instance to test it let cols = vec!["id".to_owned()]; let expr = MySqlExternalTableReader::filter_expression(&cols); assert_eq!(expr, "(`id` > :id)"); @@ -783,9 +693,7 @@ mod tests { let config = serde_json::from_value::(serde_json::to_value(props).unwrap()) .unwrap(); - let reader = MySqlExternalTableReader::new(config, rw_schema) - .await - .unwrap(); + let reader = MySqlExternalTableReader::new(config, rw_schema).unwrap(); let offset = reader.current_cdc_offset().await.unwrap(); println!("BinlogOffset: {:?}", offset); diff --git a/src/connector/src/source/cdc/source/reader.rs b/src/connector/src/source/cdc/source/reader.rs index b611f5624294a..6c11be36ec2ea 100644 --- a/src/connector/src/source/cdc/source/reader.rs +++ b/src/connector/src/source/cdc/source/reader.rs @@ -222,9 +222,6 @@ impl CdcSplitReader { while let Some(result) = rx.recv().await { match result { Ok(GetEventStreamResponse { events, .. }) => { - for event in events.clone() { - println!("debezium过来的的event: {:?}", event); - } tracing::trace!("receive {} cdc events ", events.len()); let msgs = events.into_iter().map(SourceMessage::from).collect_vec(); yield msgs; diff --git a/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs b/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs index 66f597d8a2234..b539d269cf018 100644 --- a/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs +++ b/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs @@ -400,9 +400,7 @@ mod tests { let config = serde_json::from_value::(serde_json::to_value(props).unwrap()) .unwrap(); - let reader = MySqlExternalTableReader::new(config, rw_schema.clone()) - .await - .unwrap(); + let reader = MySqlExternalTableReader::new(config, rw_schema.clone()).unwrap(); let mut cnt: usize = 0; let mut start_pk = Some(OwnedRow::new(vec![Some(ScalarImpl::Int64(0))])); From 4be0d70745a640f1ae14ca55443a9c93552e3f5d Mon Sep 17 00:00:00 2001 From: wcy-fdu Date: Tue, 23 Sep 2025 09:23:42 +0800 Subject: [PATCH 20/20] add test for unsigned int --- .../cdc/mysql/test_unsigned_int.slt | 168 ++++++++++++++++++ 1 file changed, 168 insertions(+) create mode 100644 e2e_test/source_inline/cdc/mysql/test_unsigned_int.slt diff --git a/e2e_test/source_inline/cdc/mysql/test_unsigned_int.slt b/e2e_test/source_inline/cdc/mysql/test_unsigned_int.slt new file mode 100644 index 0000000000000..a2b2e426ea985 --- /dev/null +++ b/e2e_test/source_inline/cdc/mysql/test_unsigned_int.slt @@ -0,0 +1,168 @@ +# Test MySQL CDC with UNSIGNED BIGINT handling +# This test verifies that UNSIGNED BIGINT values are handled correctly in CDC backfill +# with debezium.bigint.unsigned.handling.mode='precise' + +control substitution on + +statement ok +ALTER SYSTEM SET max_concurrent_creating_streaming_jobs TO 1; + +system ok +mysql -e " + CREATE DATABASE IF NOT EXISTS risedev; + USE risedev; + DROP TABLE IF EXISTS test_unsigned_int; + CREATE TABLE test_unsigned_int ( + id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT, + unsigned_bigint BIGINT UNSIGNED NOT NULL DEFAULT '0', + unsigned_int INT UNSIGNED NOT NULL DEFAULT '0', + unsigned_smallint SMALLINT UNSIGNED NOT NULL DEFAULT '0', + unsigned_tinyint TINYINT UNSIGNED NOT NULL DEFAULT '0', + info VARCHAR(50) NOT NULL DEFAULT '', + PRIMARY KEY (id, unsigned_bigint) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; +" + +# Insert 50 backfill rows into MySQL (20 non-overflow + 30 overflow) +system ok +mysql -e " + USE risedev; + INSERT INTO test_unsigned_int (unsigned_bigint, unsigned_int, unsigned_smallint, unsigned_tinyint, info) VALUES + (1, 1, 1, 1, 'backfill'), + (12345, 12345, 12345, 123, 'backfill'), + (123456, 123456, 12345, 123, 'backfill'), + (1234567, 1234567, 12345, 123, 'backfill'), + (12345678, 12345678, 12345, 123, 'backfill'), + (123456789, 123456789, 12345, 123, 'backfill'), + (1234567890, 1234567890, 12345, 123, 'backfill'), + (12345678901, 1234567890, 12345, 123, 'backfill'), + (123456789012, 1234567890, 12345, 123, 'backfill'), + (1234567890123, 1234567890, 12345, 123, 'backfill'), + (12345678901234, 1234567890, 12345, 123, 'backfill'), + (123456789012345, 1234567890, 12345, 123, 'backfill'), + (1234567890123456, 1234567890, 12345, 123, 'backfill'), + (12345678901234567, 1234567890, 12345, 123, 'backfill'), + (123456789012345678, 1234567890, 12345, 123, 'backfill'), + (1234567890123456789, 1234567890, 12345, 123, 'backfill'), + (12345678901234567890, 1234567890, 12345, 123, 'backfill'), + (9223372036854775800, 2147483647, 65535, 255, 'backfill'), + (9223372036854775805, 2147483647, 65535, 255, 'backfill'), + (9223372036854775807, 2147483647, 65535, 255, 'backfill'); + INSERT INTO test_unsigned_int (unsigned_bigint, unsigned_int, unsigned_smallint, unsigned_tinyint, info) VALUES + (9223372036854775808, 4294967295, 65535, 255, 'backfill'), + (9223372036854775809, 4294967295, 65535, 255, 'backfill'), + (9223372036854775810, 4294967295, 65535, 255, 'backfill'), + (9223372036854775811, 4294967295, 65535, 255, 'backfill'), + (9223372036854775812, 4294967295, 65535, 255, 'backfill'), + (9223372036854775813, 4294967295, 65535, 255, 'backfill'), + (9223372036854775814, 4294967295, 65535, 255, 'backfill'), + (9223372036854775815, 4294967295, 65535, 255, 'backfill'), + (9223372036854775816, 4294967295, 65535, 255, 'backfill'), + (9223372036854775817, 4294967295, 65535, 255, 'backfill'), + (9223372036854775818, 4294967295, 65535, 255, 'backfill'), + (9223372036854775819, 4294967295, 65535, 255, 'backfill'), + (9223372036854775820, 4294967295, 65535, 255, 'backfill'), + (9223372036854775821, 4294967295, 65535, 255, 'backfill'), + (9223372036854775822, 4294967295, 65535, 255, 'backfill'), + (9223372036854775823, 4294967295, 65535, 255, 'backfill'), + (9223372036854775824, 4294967295, 65535, 255, 'backfill'), + (9223372036854775825, 4294967295, 65535, 255, 'backfill'), + (9223372036854775826, 4294967295, 65535, 255, 'backfill'), + (9223372036854775827, 4294967295, 65535, 255, 'backfill'), + (9223372036854775828, 4294967295, 65535, 255, 'backfill'), + (9223372036854775829, 4294967295, 65535, 255, 'backfill'), + (9223372036854775830, 4294967295, 65535, 255, 'backfill'), + (9223372036854775831, 4294967295, 65535, 255, 'backfill'), + (9223372036854775832, 4294967295, 65535, 255, 'backfill'), + (9223372036854775833, 4294967295, 65535, 255, 'backfill'), + (9223372036854775834, 4294967295, 65535, 255, 'backfill'), + (9223372036854775835, 4294967295, 65535, 255, 'backfill'), + (9223372036854775836, 4294967295, 65535, 255, 'backfill'), + (18446251075179777772, 4294967295, 65535, 255, 'backfill'); +" + +sleep 2s + +statement ok +create source s_unsigned_int with ( + ${RISEDEV_MYSQL_WITH_OPTIONS_COMMON}, + username = '${RISEDEV_MYSQL_USER}', + password = '${MYSQL_PWD}', + database.name = 'risedev', + debezium.bigint.unsigned.handling.mode = 'precise' +); + +sleep 2s + +statement ok +create table rw_unsigned_int ( + id DECIMAL, + unsigned_bigint DECIMAL, + unsigned_int BIGINT, + unsigned_smallint INT, + unsigned_tinyint SMALLINT, + info VARCHAR, + primary key (id, unsigned_bigint) +) with ( + snapshot = 'true', + snapshot.batch_size = 10, +) from s_unsigned_int table 'risedev.test_unsigned_int'; + +sleep 5s + +# Insert 3 incremental data (debezium) +system ok +mysql -e " + USE risedev; + INSERT INTO test_unsigned_int (unsigned_bigint, unsigned_int, unsigned_smallint, unsigned_tinyint, info) VALUES + (18446251075179777770, 4294967295, 65535, 255, 'debezium'), + (18446251075179777771, 4294967295, 65535, 255, 'debezium'), + (18446251075179777772, 4294967295, 65535, 255, 'debezium'); +" + +sleep 3s + +# Verify total count in RisingWave (50 backfill + 3 debezium = 53) +query I +select count(*) as row_cnt from rw_unsigned_int; +---- +53 + +# Verify we have both backfill and debezium data +query T +select count(*) as backfill_count from rw_unsigned_int where info = 'backfill'; +---- +50 + +query T +select count(*) as debezium_count from rw_unsigned_int where info = 'debezium'; +---- +3 + +# Verify specific rows match inserted data +# Check id=50 (last backfill row with overflow data) +query TTTTTT +select id, unsigned_bigint, unsigned_int, unsigned_smallint, unsigned_tinyint, info from rw_unsigned_int where id = 50; +---- +50 18446251075179777772 4294967295 65535 255 backfill + +# Check id=53 (last debezium row with overflow data) +query TTTTTT +select id, unsigned_bigint, unsigned_int, unsigned_smallint, unsigned_tinyint, info from rw_unsigned_int where id = 53; +---- +53 18446251075179777772 4294967295 65535 255 debezium + + + +# Clean up +statement ok +drop table rw_unsigned_int; + +statement ok +drop source s_unsigned_int cascade; + +system ok +mysql -e " + USE risedev; + DROP TABLE IF EXISTS test_unsigned_int; +"