diff --git a/e2e_test/source_inline/cdc/mysql/mysql/mysql_timestamptz_as_pk.slt b/e2e_test/source_inline/cdc/mysql/mysql_timestamptz_as_pk.slt similarity index 100% rename from e2e_test/source_inline/cdc/mysql/mysql/mysql_timestamptz_as_pk.slt rename to e2e_test/source_inline/cdc/mysql/mysql_timestamptz_as_pk.slt diff --git a/e2e_test/source_inline/cdc/mysql/test_unsigned_int.slt b/e2e_test/source_inline/cdc/mysql/test_unsigned_int.slt new file mode 100644 index 0000000000000..a2b2e426ea985 --- /dev/null +++ b/e2e_test/source_inline/cdc/mysql/test_unsigned_int.slt @@ -0,0 +1,168 @@ +# Test MySQL CDC with UNSIGNED BIGINT handling +# This test verifies that UNSIGNED BIGINT values are handled correctly in CDC backfill and in later incremental changes +# with debezium.bigint.unsigned.handling.mode='precise' + +control substitution on + +statement ok +ALTER SYSTEM SET max_concurrent_creating_streaming_jobs TO 1; + +system ok +mysql -e " + CREATE DATABASE IF NOT EXISTS risedev; + USE risedev; + DROP TABLE IF EXISTS test_unsigned_int; + CREATE TABLE test_unsigned_int ( + id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT, + unsigned_bigint BIGINT UNSIGNED NOT NULL DEFAULT '0', + unsigned_int INT UNSIGNED NOT NULL DEFAULT '0', + unsigned_smallint SMALLINT UNSIGNED NOT NULL DEFAULT '0', + unsigned_tinyint TINYINT UNSIGNED NOT NULL DEFAULT '0', + info VARCHAR(50) NOT NULL DEFAULT '', + PRIMARY KEY (id, unsigned_bigint) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; +" + +# Insert 50 backfill rows into MySQL in two statements (20 + 30 rows); 31 unsigned_bigint values exceed i64::MAX (row 17 of the first statement and all 30 rows of the second) +system ok +mysql -e " + USE risedev; + INSERT INTO test_unsigned_int (unsigned_bigint, unsigned_int, unsigned_smallint, unsigned_tinyint, info) VALUES + (1, 1, 1, 1, 'backfill'), + (12345, 12345, 12345, 123, 'backfill'), + (123456, 123456, 12345, 123, 'backfill'), + (1234567, 1234567, 12345, 123, 'backfill'), + (12345678, 12345678, 12345, 123, 'backfill'), + (123456789, 123456789, 12345, 123, 'backfill'), + (1234567890, 1234567890, 12345, 123, 'backfill'), 
+ (12345678901, 1234567890, 12345, 123, 'backfill'), + (123456789012, 1234567890, 12345, 123, 'backfill'), + (1234567890123, 1234567890, 12345, 123, 'backfill'), + (12345678901234, 1234567890, 12345, 123, 'backfill'), + (123456789012345, 1234567890, 12345, 123, 'backfill'), + (1234567890123456, 1234567890, 12345, 123, 'backfill'), + (12345678901234567, 1234567890, 12345, 123, 'backfill'), + (123456789012345678, 1234567890, 12345, 123, 'backfill'), + (1234567890123456789, 1234567890, 12345, 123, 'backfill'), + (12345678901234567890, 1234567890, 12345, 123, 'backfill'), + (9223372036854775800, 2147483647, 65535, 255, 'backfill'), + (9223372036854775805, 2147483647, 65535, 255, 'backfill'), + (9223372036854775807, 2147483647, 65535, 255, 'backfill'); + INSERT INTO test_unsigned_int (unsigned_bigint, unsigned_int, unsigned_smallint, unsigned_tinyint, info) VALUES + (9223372036854775808, 4294967295, 65535, 255, 'backfill'), + (9223372036854775809, 4294967295, 65535, 255, 'backfill'), + (9223372036854775810, 4294967295, 65535, 255, 'backfill'), + (9223372036854775811, 4294967295, 65535, 255, 'backfill'), + (9223372036854775812, 4294967295, 65535, 255, 'backfill'), + (9223372036854775813, 4294967295, 65535, 255, 'backfill'), + (9223372036854775814, 4294967295, 65535, 255, 'backfill'), + (9223372036854775815, 4294967295, 65535, 255, 'backfill'), + (9223372036854775816, 4294967295, 65535, 255, 'backfill'), + (9223372036854775817, 4294967295, 65535, 255, 'backfill'), + (9223372036854775818, 4294967295, 65535, 255, 'backfill'), + (9223372036854775819, 4294967295, 65535, 255, 'backfill'), + (9223372036854775820, 4294967295, 65535, 255, 'backfill'), + (9223372036854775821, 4294967295, 65535, 255, 'backfill'), + (9223372036854775822, 4294967295, 65535, 255, 'backfill'), + (9223372036854775823, 4294967295, 65535, 255, 'backfill'), + (9223372036854775824, 4294967295, 65535, 255, 'backfill'), + (9223372036854775825, 4294967295, 65535, 255, 'backfill'), + (9223372036854775826, 
4294967295, 65535, 255, 'backfill'), + (9223372036854775827, 4294967295, 65535, 255, 'backfill'), + (9223372036854775828, 4294967295, 65535, 255, 'backfill'), + (9223372036854775829, 4294967295, 65535, 255, 'backfill'), + (9223372036854775830, 4294967295, 65535, 255, 'backfill'), + (9223372036854775831, 4294967295, 65535, 255, 'backfill'), + (9223372036854775832, 4294967295, 65535, 255, 'backfill'), + (9223372036854775833, 4294967295, 65535, 255, 'backfill'), + (9223372036854775834, 4294967295, 65535, 255, 'backfill'), + (9223372036854775835, 4294967295, 65535, 255, 'backfill'), + (9223372036854775836, 4294967295, 65535, 255, 'backfill'), + (18446251075179777772, 4294967295, 65535, 255, 'backfill'); +" + +sleep 2s + +statement ok +create source s_unsigned_int with ( + ${RISEDEV_MYSQL_WITH_OPTIONS_COMMON}, + username = '${RISEDEV_MYSQL_USER}', + password = '${MYSQL_PWD}', + database.name = 'risedev', + debezium.bigint.unsigned.handling.mode = 'precise' +); + +sleep 2s + +statement ok +create table rw_unsigned_int ( + id DECIMAL, + unsigned_bigint DECIMAL, + unsigned_int BIGINT, + unsigned_smallint INT, + unsigned_tinyint SMALLINT, + info VARCHAR, + primary key (id, unsigned_bigint) +) with ( + snapshot = 'true', + snapshot.batch_size = 10, +) from s_unsigned_int table 'risedev.test_unsigned_int'; + +sleep 5s + +# Insert 3 more rows after the table has been created (tagged 'debezium'; all three unsigned_bigint values exceed i64::MAX) +system ok +mysql -e " + USE risedev; + INSERT INTO test_unsigned_int (unsigned_bigint, unsigned_int, unsigned_smallint, unsigned_tinyint, info) VALUES + (18446251075179777770, 4294967295, 65535, 255, 'debezium'), + (18446251075179777771, 4294967295, 65535, 255, 'debezium'), + (18446251075179777772, 4294967295, 65535, 255, 'debezium'); +" + +sleep 3s + +# Verify total count in RisingWave (50 backfill + 3 debezium = 53) +query I +select count(*) as row_cnt from rw_unsigned_int; +---- +53 + +# Verify we have both backfill and debezium data +query T +select count(*) as backfill_count from rw_unsigned_int where info = 
'backfill'; +---- +50 + +query T +select count(*) as debezium_count from rw_unsigned_int where info = 'debezium'; +---- +3 + +# Verify specific rows match inserted data +# Check id=50 (last backfill row with overflow data) +query TTTTTT +select id, unsigned_bigint, unsigned_int, unsigned_smallint, unsigned_tinyint, info from rw_unsigned_int where id = 50; +---- +50 18446251075179777772 4294967295 65535 255 backfill + +# Check id=53 (last debezium row with overflow data) +query TTTTTT +select id, unsigned_bigint, unsigned_int, unsigned_smallint, unsigned_tinyint, info from rw_unsigned_int where id = 53; +---- +53 18446251075179777772 4294967295 65535 255 debezium + + + +# Clean up +statement ok +drop table rw_unsigned_int; + +statement ok +drop source s_unsigned_int cascade; + +system ok +mysql -e " + USE risedev; + DROP TABLE IF EXISTS test_unsigned_int; +" diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java index f53354f201db3..4e5603fe79041 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java @@ -310,7 +310,8 @@ private boolean isDataTypeCompatible( return Data.DataType.TypeName.INT32_VALUE <= val && val <= Data.DataType.TypeName.INT64_VALUE; case "bigint": - return val == Data.DataType.TypeName.INT64_VALUE; + return val == Data.DataType.TypeName.INT64_VALUE + || val == Data.DataType.TypeName.DECIMAL_VALUE; case "boolean": case "bool": return val == Data.DataType.TypeName.BOOLEAN_VALUE; diff --git a/src/connector/src/parser/config.rs b/src/connector/src/parser/config.rs index 07f48a636bab3..0ee9e3d53e81e 100644 --- a/src/connector/src/parser/config.rs 
+++ b/src/connector/src/parser/config.rs @@ -19,6 +19,7 @@ use risingwave_common::secret::LocalSecretManager; use risingwave_connector_codec::decoder::avro::MapHandling; use risingwave_pb::catalog::{PbSchemaRegistryNameStrategy, StreamSourceInfo}; +use super::unified::json::BigintUnsignedHandlingMode; use super::utils::get_kafka_topic; use super::{DebeziumProps, TimeHandling, TimestampHandling, TimestamptzHandling}; use crate::WithOptionsSecResolved; @@ -94,6 +95,7 @@ impl SpecificParserConfig { timestamp_handling: None, timestamptz_handling: None, time_handling: None, + bigint_unsigned_handling: None, handle_toast_columns: false, }), protocol_config: ProtocolProperties::Plain, @@ -272,6 +274,7 @@ impl SpecificParserConfig { &format_encode_options_with_secret, )?, time_handling: None, + bigint_unsigned_handling: None, handle_toast_columns: false, }), (SourceFormat::DebeziumMongo, SourceEncode::Json) => { @@ -357,6 +360,7 @@ pub struct JsonProperties { pub timestamp_handling: Option, pub timestamptz_handling: Option, pub time_handling: Option, + pub bigint_unsigned_handling: Option, pub handle_toast_columns: bool, } diff --git a/src/connector/src/parser/debezium/debezium_parser.rs b/src/connector/src/parser/debezium/debezium_parser.rs index 5c6ae354ad75c..3823ea4f4aa19 100644 --- a/src/connector/src/parser/debezium/debezium_parser.rs +++ b/src/connector/src/parser/debezium/debezium_parser.rs @@ -20,7 +20,9 @@ use super::simd_json_parser::DebeziumJsonAccessBuilder; use super::{DebeziumAvroAccessBuilder, DebeziumAvroParserConfig}; use crate::error::ConnectorResult; use crate::parser::unified::debezium::DebeziumChangeEvent; -use crate::parser::unified::json::{TimeHandling, TimestampHandling, TimestamptzHandling}; +use crate::parser::unified::json::{ + BigintUnsignedHandlingMode, TimeHandling, TimestampHandling, TimestamptzHandling, +}; use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer; use crate::parser::{ AccessBuilderImpl, 
ByteStreamSourceParser, EncodingProperties, EncodingType, ParseResult, @@ -77,6 +79,9 @@ async fn build_accessor_builder( .timestamp_handling .unwrap_or(TimestampHandling::GuessNumberUnit), json_config.time_handling.unwrap_or(TimeHandling::Micro), + json_config + .bigint_unsigned_handling + .unwrap_or(BigintUnsignedHandlingMode::Long), json_config.handle_toast_columns, )?, )), @@ -120,6 +125,7 @@ impl DebeziumParser { timestamptz_handling: None, timestamp_handling: None, time_handling: None, + bigint_unsigned_handling: None, handle_toast_columns: false, }), protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()), @@ -227,6 +233,7 @@ mod tests { timestamptz_handling: None, timestamp_handling: None, time_handling: None, + bigint_unsigned_handling: None, handle_toast_columns: false, }), protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()), @@ -302,6 +309,7 @@ mod tests { timestamptz_handling: None, timestamp_handling: None, time_handling: None, + bigint_unsigned_handling: None, handle_toast_columns: false, }), protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()), diff --git a/src/connector/src/parser/debezium/simd_json_parser.rs b/src/connector/src/parser/debezium/simd_json_parser.rs index 7e56fe9b429cc..1682f89c6ecae 100644 --- a/src/connector/src/parser/debezium/simd_json_parser.rs +++ b/src/connector/src/parser/debezium/simd_json_parser.rs @@ -22,7 +22,8 @@ use crate::error::ConnectorResult; use crate::parser::unified::AccessImpl; use crate::parser::unified::debezium::MongoJsonAccess; use crate::parser::unified::json::{ - JsonAccess, JsonParseOptions, TimeHandling, TimestampHandling, TimestamptzHandling, + BigintUnsignedHandlingMode, JsonAccess, JsonParseOptions, TimeHandling, TimestampHandling, + TimestamptzHandling, }; use crate::parser::{AccessBuilder, MongoProperties}; @@ -37,6 +38,7 @@ impl DebeziumJsonAccessBuilder { timestamptz_handling: TimestamptzHandling, timestamp_handling: TimestampHandling, 
time_handling: TimeHandling, + bigint_unsigned_handling: BigintUnsignedHandlingMode, handle_toast_columns: bool, ) -> ConnectorResult { Ok(Self { @@ -45,6 +47,7 @@ impl DebeziumJsonAccessBuilder { timestamptz_handling, timestamp_handling, time_handling, + bigint_unsigned_handling, handle_toast_columns, ), }) @@ -98,6 +101,7 @@ impl DebeziumMongoJsonAccessBuilder { TimestamptzHandling::GuessNumberUnit, TimestampHandling::GuessNumberUnit, TimeHandling::Micro, + BigintUnsignedHandlingMode::Long, false, ), strong_schema: props.strong_schema, @@ -168,6 +172,7 @@ mod tests { timestamptz_handling: None, timestamp_handling: None, time_handling: None, + bigint_unsigned_handling: None, handle_toast_columns: false, }), protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()), diff --git a/src/connector/src/parser/maxwell/simd_json_parser.rs b/src/connector/src/parser/maxwell/simd_json_parser.rs index 440cab57ad7b3..e998ac9761ac2 100644 --- a/src/connector/src/parser/maxwell/simd_json_parser.rs +++ b/src/connector/src/parser/maxwell/simd_json_parser.rs @@ -40,6 +40,7 @@ mod tests { timestamptz_handling: None, timestamp_handling: None, time_handling: None, + bigint_unsigned_handling: None, handle_toast_columns: false, }), protocol_config: ProtocolProperties::Maxwell, diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index b4f2e80d9b674..e8a925804cc43 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -41,7 +41,9 @@ use self::plain_parser::PlainParser; pub use self::postgres::{postgres_cell_to_scalar_impl, postgres_row_to_owned_row}; pub use self::sql_server::{ScalarImplTiberiusWrapper, sql_server_row_to_owned_row}; pub use self::unified::Access; -pub use self::unified::json::{JsonAccess, TimeHandling, TimestampHandling, TimestamptzHandling}; +pub use self::unified::json::{ + BigintUnsignedHandlingMode, JsonAccess, TimeHandling, TimestampHandling, TimestamptzHandling, +}; use 
self::upsert_parser::UpsertParser; use crate::error::ConnectorResult; use crate::parser::maxwell::MaxwellParser; diff --git a/src/connector/src/parser/mysql.rs b/src/connector/src/parser/mysql.rs index dddc2804e35d6..de0e4f30ddfbf 100644 --- a/src/connector/src/parser/mysql.rs +++ b/src/connector/src/parser/mysql.rs @@ -15,6 +15,7 @@ use std::sync::LazyLock; use mysql_async::Row as MysqlRow; +use mysql_common::constants::ColumnFlags; use risingwave_common::catalog::Schema; use risingwave_common::log::LogSuppresser; use risingwave_common::row::OwnedRow; @@ -62,6 +63,57 @@ macro_rules! handle_data_type { }}; } +// Variant of `handle_data_type!` for integer columns that may be UNSIGNED in MySQL. +// The column flags decide the decoding path: an UNSIGNED column is read as +// `$unsigned_type` and then range-checked into `$signed_type`, so a value that does not +// fit is reported as an error instead of silently wrapping to a negative number. +macro_rules! handle_data_type_with_signed { + ( + $mysql_row:expr, + $mysql_datum_index:expr, + $column_name:expr, + $signed_type:ty, + $unsigned_type:ty + ) => {{ + let column_flags = $mysql_row.columns()[$mysql_datum_index].flags(); + + if column_flags.contains(ColumnFlags::UNSIGNED_FLAG) { + // UNSIGNED type: decode as the unsigned type, then do a checked conversion to signed + match $mysql_row.take_opt::<Option<$unsigned_type>, _>($mysql_datum_index) { + Some(Ok(Some(val))) => <$signed_type>::try_from(val) + .map(|converted| Some(ScalarImpl::from(converted))) + .map_err(|e| { + anyhow::Error::new(e) + .context("MySQL unsigned value is out of range for the target signed type") + .context(format!( + "column: {}, index: {}, rust_type: {}", + $column_name, + $mysql_datum_index, + stringify!($signed_type), + )) + }), + Some(Ok(None)) => Ok(None), + Some(Err(e)) => Err(anyhow::Error::new(e.clone()) + .context("failed to deserialize MySQL value into rust value") + .context(format!( + "column: {}, index: {}, rust_type: {}", + $column_name, + $mysql_datum_index, + stringify!($unsigned_type), + ))), + None => bail!( + "no value found at column: {}, index: {}", + $column_name, + $mysql_datum_index + ), + } + } else { + // SIGNED type: use default signed type conversion + handle_data_type!($mysql_row, $mysql_datum_index, $column_name, $signed_type) + } + }}; +} + /// The decoding result can be interpreted as follows: /// Ok(value) => The value was found and successfully decoded. 
/// Err(error) => The value was found but could not be decoded, @@ -107,12 +144,13 @@ pub fn mysql_datum_to_rw_datum( } } DataType::Int16 => { - handle_data_type!(mysql_row, mysql_datum_index, column_name, i16) + handle_data_type_with_signed!(mysql_row, mysql_datum_index, column_name, i16, u16) } DataType::Int32 => { - handle_data_type!(mysql_row, mysql_datum_index, column_name, i32) + handle_data_type_with_signed!(mysql_row, mysql_datum_index, column_name, i32, u32) } DataType::Int64 => { + // Only signed decoding is attempted here; BIGINT UNSIGNED columns are expected to be up-cast to DECIMAL instead. handle_data_type!(mysql_row, mysql_datum_index, column_name, i64) } DataType::Float32 => { diff --git a/src/connector/src/parser/plain_parser.rs b/src/connector/src/parser/plain_parser.rs index 22000800293c0..a1acc538154c9 100644 --- a/src/connector/src/parser/plain_parser.rs +++ b/src/connector/src/parser/plain_parser.rs @@ -15,7 +15,9 @@ use risingwave_common::bail; use thiserror_ext::AsReport; -use super::unified::json::{TimeHandling, TimestampHandling, TimestamptzHandling}; +use super::unified::json::{ + BigintUnsignedHandlingMode, TimeHandling, TimestampHandling, TimestamptzHandling, +}; use super::unified::kv_event::KvEvent; use super::{ AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, SourceStreamChunkRowWriter, @@ -74,6 +76,7 @@ impl PlainParser { TimestamptzHandling::GuessNumberUnit, TimestampHandling::GuessNumberUnit, TimeHandling::Micro, + BigintUnsignedHandlingMode::Long, false, )?, )); diff --git a/src/connector/src/parser/unified/json.rs b/src/connector/src/parser/unified/json.rs index aefbe75314302..b0fe6d61f1c58 100644 --- a/src/connector/src/parser/unified/json.rs +++ b/src/connector/src/parser/unified/json.rs @@ -49,6 +49,14 @@ pub enum TimeHandling { Micro, } +#[derive(Clone, Debug)] +pub enum BigintUnsignedHandlingMode { + /// Default mode (used unless `debezium.bigint.unsigned.handling.mode` is `precise`): a decimal string that fails plain parsing is an error, no base64 decoding is attempted + Long, + /// Debezium precise mode: a decimal string that fails plain parsing is decoded as base64 and read as big-endian signed bytes of the unscaled value + Precise, +} + #[derive(Clone, 
Debug)] pub enum TimestamptzHandling { /// `"2024-04-11T02:00:00.123456Z"` @@ -148,6 +156,7 @@ pub struct JsonParseOptions { pub boolean_handling: BooleanHandling, pub varchar_handling: VarcharHandling, pub struct_handling: StructHandling, + pub bigint_unsigned_handling: BigintUnsignedHandlingMode, pub ignoring_keycase: bool, pub handle_toast_columns: bool, } @@ -174,6 +183,7 @@ impl JsonParseOptions { }, varchar_handling: VarcharHandling::Strict, struct_handling: StructHandling::Strict, + bigint_unsigned_handling: BigintUnsignedHandlingMode::Long, // default to long mode ignoring_keycase: true, handle_toast_columns: false, }; @@ -189,6 +199,7 @@ impl JsonParseOptions { boolean_handling: BooleanHandling::Strict, varchar_handling: VarcharHandling::OnlyPrimaryTypes, struct_handling: StructHandling::AllowJsonString, + bigint_unsigned_handling: BigintUnsignedHandlingMode::Long, // default to long mode ignoring_keycase: true, handle_toast_columns: false, }; @@ -197,6 +208,7 @@ impl JsonParseOptions { timestamptz_handling: TimestamptzHandling, timestamp_handling: TimestampHandling, time_handling: TimeHandling, + bigint_unsigned_handling: BigintUnsignedHandlingMode, handle_toast_columns: bool, ) -> Self { Self { @@ -214,6 +226,7 @@ impl JsonParseOptions { }, varchar_handling: VarcharHandling::Strict, struct_handling: StructHandling::Strict, + bigint_unsigned_handling, ignoring_keycase: true, handle_toast_columns, } @@ -393,7 +406,60 @@ impl JsonParseOptions { .into() } (DataType::Decimal, ValueType::I64 | ValueType::U64) => { - Decimal::from(value.try_as_i64().map_err(|_| create_error())?).into() + let i64_val = value.try_as_i64().map_err(|_| create_error())?; + Decimal::from(i64_val).into() + } + (DataType::Decimal, ValueType::String) => { + let str_val = value.as_str().unwrap(); + // the following values are special string generated by Debezium and should be handled separately + match str_val { + "NAN" => return 
Ok(DatumCow::Owned(Some(ScalarImpl::Decimal(Decimal::NaN)))), + "POSITIVE_INFINITY" => { + return Ok(DatumCow::Owned(Some(ScalarImpl::Decimal( + Decimal::PositiveInf, + )))); + } + "NEGATIVE_INFINITY" => { + return Ok(DatumCow::Owned(Some(ScalarImpl::Decimal( + Decimal::NegativeInf, + )))); + } + _ => {} + } + + match Decimal::from_str(str_val) { + Ok(decimal) => decimal.into(), + Err(_) => { + // Only attempt base64 decoding in Precise mode + match self.bigint_unsigned_handling { + BigintUnsignedHandlingMode::Precise => { + // When processing CDC data, if the upstream system has unsigned bigint (e.g., MySQL CDC), + // the best practice is to upcast, i.e., convert unsigned bigint to decimal. + // Only when users configure `debezium.bigint.unsigned.handling.mode='precise'`, + // Debezium will convert unsigned bigint to base64-encoded decimal. + // Reference: https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-bigint-unsigned-handling-mode + // + // Therefore, here we check if the string falls into the above case after Decimal::from_str returns an error. + + // A better approach would be to get bytes + org.apache.kafka.connect.data.Decimal from schema + // instead of string, as described in https://github.com/risingwavelabs/risingwave/issues/16852. + // However, Rust doesn't have a library to parse Kafka Connect metadata, so we'll refactor this + // after implementing that functionality. + let value = base64::engine::general_purpose::STANDARD + .decode(str_val) + .map_err(|_| create_error())?; + let unscaled = num_bigint::BigInt::from_signed_bytes_be(&value); + Decimal::from_str(&unscaled.to_string()) + .map_err(|_| create_error())? + .into() + } + BigintUnsignedHandlingMode::Long => { + // In Long mode, return the original error directly + return Err(create_error()); + } + } + } + } } (DataType::Decimal, ValueType::F64) => { @@ -401,18 +467,6 @@ impl JsonParseOptions { .map_err(|_| create_error())? 
.into() } - (DataType::Decimal, ValueType::String) => { - let str = value.as_str().unwrap(); - // the following values are special string generated by Debezium and should be handled separately - match str { - "NAN" => ScalarImpl::Decimal(Decimal::NaN), - "POSITIVE_INFINITY" => ScalarImpl::Decimal(Decimal::PositiveInf), - "NEGATIVE_INFINITY" => ScalarImpl::Decimal(Decimal::NegativeInf), - _ => { - ScalarImpl::Decimal(Decimal::from_str(str).map_err(|_err| create_error())?) - } - } - } (DataType::Decimal, ValueType::Object) => { // ref https://github.com/risingwavelabs/risingwave/issues/10628 // handle debezium json (variable scale): {"scale": int, "value": bytes} diff --git a/src/connector/src/source/cdc/external/mysql.rs b/src/connector/src/source/cdc/external/mysql.rs index 0521dd7c3e3ec..b34592ce6f8a4 100644 --- a/src/connector/src/source/cdc/external/mysql.rs +++ b/src/connector/src/source/cdc/external/mysql.rs @@ -474,7 +474,6 @@ impl MySqlExternalTableReader { order_key, ) }; - let mut conn = self.pool.get_conn().await?; // Set session timezone to UTC conn.exec_drop("SET time_zone = \"+00:00\"", ()).await?; @@ -518,6 +517,7 @@ impl MySqlExternalTableReader { DataType::Date => Value::from(value.into_date().0), DataType::Time => Value::from(value.into_time().0), DataType::Timestamp => Value::from(value.into_timestamp().0), + DataType::Decimal => Value::from(value.into_decimal().to_string()), DataType::Timestamptz => { // Convert timestamptz to NaiveDateTime for MySQL TIMESTAMP comparison // MySQL expects NaiveDateTime for TIMESTAMP parameters diff --git a/src/connector/src/source/datagen/source/generator.rs b/src/connector/src/source/datagen/source/generator.rs index be23167c8253e..40c61b6dbc784 100644 --- a/src/connector/src/source/datagen/source/generator.rs +++ b/src/connector/src/source/datagen/source/generator.rs @@ -279,6 +279,7 @@ mod tests { timestamptz_handling: None, timestamp_handling: None, time_handling: None, + bigint_unsigned_handling: None, 
handle_toast_columns: false, }), }, diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs index 14897083b0a4e..763b7d7932c76 100644 --- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs +++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs @@ -24,9 +24,9 @@ use risingwave_common::array::DataChunk; use risingwave_common::bail; use risingwave_common::catalog::ColumnDesc; use risingwave_connector::parser::{ - ByteStreamSourceParser, DebeziumParser, DebeziumProps, EncodingProperties, JsonProperties, - ProtocolProperties, SourceStreamChunkBuilder, SpecificParserConfig, TimeHandling, - TimestampHandling, TimestamptzHandling, + BigintUnsignedHandlingMode, ByteStreamSourceParser, DebeziumParser, DebeziumProps, + EncodingProperties, JsonProperties, ProtocolProperties, SourceStreamChunkBuilder, + SpecificParserConfig, TimeHandling, TimestampHandling, TimestamptzHandling, }; use risingwave_connector::source::cdc::CdcScanOptions; use risingwave_connector::source::cdc::external::{ @@ -226,6 +226,12 @@ impl CdcBackfillExecutor { .map(|v| v == "connect") .unwrap_or(false) .then_some(TimeHandling::Milli); + let bigint_unsigned_handling: Option = self + .properties + .get("debezium.bigint.unsigned.handling.mode") + .map(|v| v == "precise") + .unwrap_or(false) + .then_some(BigintUnsignedHandlingMode::Precise); // Only postgres-cdc connector may trigger TOAST. 
let handle_toast_columns: bool = self.external_table.table_type() == &ExternalCdcTableType::Postgres; @@ -236,6 +242,7 @@ impl CdcBackfillExecutor { timestamp_handling, timestamptz_handling, time_handling, + bigint_unsigned_handling, handle_toast_columns, ) .boxed(); @@ -829,6 +836,7 @@ pub async fn transform_upstream( timestamp_handling: Option, timestamptz_handling: Option, time_handling: Option, + bigint_unsigned_handling: Option, handle_toast_columns: bool, ) { let props = SpecificParserConfig { @@ -837,6 +845,7 @@ pub async fn transform_upstream( timestamp_handling, timestamptz_handling, time_handling, + bigint_unsigned_handling, handle_toast_columns, }), // the cdc message is generated internally so the key must exist. @@ -1003,7 +1012,7 @@ mod tests { ColumnDesc::named("commit_ts", ColumnId::new(6), DataType::Timestamptz), ]; - let parsed_stream = transform_upstream(upstream, columns, None, None, None, false); + let parsed_stream = transform_upstream(upstream, columns, None, None, None, None, false); pin_mut!(parsed_stream); // the output chunk must contain the offset column if let Some(message) = parsed_stream.next().await { diff --git a/src/stream/src/executor/backfill/cdc/cdc_backill_v2.rs b/src/stream/src/executor/backfill/cdc/cdc_backill_v2.rs index 37ada3afcfdd8..418859e19ad50 100644 --- a/src/stream/src/executor/backfill/cdc/cdc_backill_v2.rs +++ b/src/stream/src/executor/backfill/cdc/cdc_backill_v2.rs @@ -23,7 +23,9 @@ use risingwave_common::catalog::{ColumnDesc, Field}; use risingwave_common::row::RowDeserializer; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::sort_util::{OrderType, cmp_datum}; -use risingwave_connector::parser::{TimeHandling, TimestampHandling, TimestamptzHandling}; +use risingwave_connector::parser::{ + BigintUnsignedHandlingMode, TimeHandling, TimestampHandling, TimestamptzHandling, +}; use risingwave_connector::source::cdc::CdcScanOptions; use risingwave_connector::source::cdc::external::{ 
CdcOffset, ExternalCdcTableType, ExternalTableReaderImpl, @@ -153,6 +155,12 @@ impl ParallelizedCdcBackfillExecutor { .map(|v| v == "connect") .unwrap_or(false) .then_some(TimeHandling::Milli); + let bigint_unsigned_handling: Option = self + .properties + .get("debezium.bigint.unsigned.handling.mode") + .map(|v| v == "precise") + .unwrap_or(false) + .then_some(BigintUnsignedHandlingMode::Precise); // Only postgres-cdc connector may trigger TOAST. let handle_toast_columns: bool = self.external_table.table_type() == &ExternalCdcTableType::Postgres; @@ -162,6 +170,7 @@ impl ParallelizedCdcBackfillExecutor { timestamp_handling, timestamptz_handling, time_handling, + bigint_unsigned_handling, handle_toast_columns, ) .boxed();