Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 168 additions & 0 deletions e2e_test/source_inline/cdc/mysql/test_unsigned_int.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
# Test MySQL CDC with UNSIGNED BIGINT handling
# This test verifies that UNSIGNED BIGINT values are handled correctly in CDC backfill
# with debezium.bigint.unsigned.handling.mode='precise'

# Enable variable substitution; the ${...} placeholders in the CREATE SOURCE statement rely on it.
control substitution on

# Allow only one streaming job to be created at a time.
# NOTE(review): the reason is not stated in this file and the setting is never restored — confirm.
statement ok
ALTER SYSTEM SET max_concurrent_creating_streaming_jobs TO 1;

# Recreate the upstream MySQL table. Every integer column is UNSIGNED, and the
# composite primary key includes the unsigned_bigint column.
system ok
mysql -e "
CREATE DATABASE IF NOT EXISTS risedev;
USE risedev;
DROP TABLE IF EXISTS test_unsigned_int;
CREATE TABLE test_unsigned_int (
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
unsigned_bigint BIGINT UNSIGNED NOT NULL DEFAULT '0',
unsigned_int INT UNSIGNED NOT NULL DEFAULT '0',
unsigned_smallint SMALLINT UNSIGNED NOT NULL DEFAULT '0',
unsigned_tinyint TINYINT UNSIGNED NOT NULL DEFAULT '0',
info VARCHAR(50) NOT NULL DEFAULT '',
PRIMARY KEY (id, unsigned_bigint)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
"

# Insert 50 backfill rows into MySQL using two INSERT statements:
#   - 20 rows whose unsigned_bigint grows from 1 up to 9223372036854775807 (the signed
#     BIGINT maximum). Note that one of them, 12345678901234567890, already exceeds
#     that maximum, so 19 of these rows fit in a signed BIGINT and 1 does not.
#   - 30 rows whose unsigned_bigint is always above the signed BIGINT maximum.
# In total 19 rows fit in a signed BIGINT and 31 rows overflow it.
system ok
mysql -e "
USE risedev;
INSERT INTO test_unsigned_int (unsigned_bigint, unsigned_int, unsigned_smallint, unsigned_tinyint, info) VALUES
(1, 1, 1, 1, 'backfill'),
(12345, 12345, 12345, 123, 'backfill'),
(123456, 123456, 12345, 123, 'backfill'),
(1234567, 1234567, 12345, 123, 'backfill'),
(12345678, 12345678, 12345, 123, 'backfill'),
(123456789, 123456789, 12345, 123, 'backfill'),
(1234567890, 1234567890, 12345, 123, 'backfill'),
(12345678901, 1234567890, 12345, 123, 'backfill'),
(123456789012, 1234567890, 12345, 123, 'backfill'),
(1234567890123, 1234567890, 12345, 123, 'backfill'),
(12345678901234, 1234567890, 12345, 123, 'backfill'),
(123456789012345, 1234567890, 12345, 123, 'backfill'),
(1234567890123456, 1234567890, 12345, 123, 'backfill'),
(12345678901234567, 1234567890, 12345, 123, 'backfill'),
(123456789012345678, 1234567890, 12345, 123, 'backfill'),
(1234567890123456789, 1234567890, 12345, 123, 'backfill'),
(12345678901234567890, 1234567890, 12345, 123, 'backfill'),
(9223372036854775800, 2147483647, 65535, 255, 'backfill'),
(9223372036854775805, 2147483647, 65535, 255, 'backfill'),
(9223372036854775807, 2147483647, 65535, 255, 'backfill');
INSERT INTO test_unsigned_int (unsigned_bigint, unsigned_int, unsigned_smallint, unsigned_tinyint, info) VALUES
(9223372036854775808, 4294967295, 65535, 255, 'backfill'),
(9223372036854775809, 4294967295, 65535, 255, 'backfill'),
(9223372036854775810, 4294967295, 65535, 255, 'backfill'),
(9223372036854775811, 4294967295, 65535, 255, 'backfill'),
(9223372036854775812, 4294967295, 65535, 255, 'backfill'),
(9223372036854775813, 4294967295, 65535, 255, 'backfill'),
(9223372036854775814, 4294967295, 65535, 255, 'backfill'),
(9223372036854775815, 4294967295, 65535, 255, 'backfill'),
(9223372036854775816, 4294967295, 65535, 255, 'backfill'),
(9223372036854775817, 4294967295, 65535, 255, 'backfill'),
(9223372036854775818, 4294967295, 65535, 255, 'backfill'),
(9223372036854775819, 4294967295, 65535, 255, 'backfill'),
(9223372036854775820, 4294967295, 65535, 255, 'backfill'),
(9223372036854775821, 4294967295, 65535, 255, 'backfill'),
(9223372036854775822, 4294967295, 65535, 255, 'backfill'),
(9223372036854775823, 4294967295, 65535, 255, 'backfill'),
(9223372036854775824, 4294967295, 65535, 255, 'backfill'),
(9223372036854775825, 4294967295, 65535, 255, 'backfill'),
(9223372036854775826, 4294967295, 65535, 255, 'backfill'),
(9223372036854775827, 4294967295, 65535, 255, 'backfill'),
(9223372036854775828, 4294967295, 65535, 255, 'backfill'),
(9223372036854775829, 4294967295, 65535, 255, 'backfill'),
(9223372036854775830, 4294967295, 65535, 255, 'backfill'),
(9223372036854775831, 4294967295, 65535, 255, 'backfill'),
(9223372036854775832, 4294967295, 65535, 255, 'backfill'),
(9223372036854775833, 4294967295, 65535, 255, 'backfill'),
(9223372036854775834, 4294967295, 65535, 255, 'backfill'),
(9223372036854775835, 4294967295, 65535, 255, 'backfill'),
(9223372036854775836, 4294967295, 65535, 255, 'backfill'),
(18446251075179777772, 4294967295, 65535, 255, 'backfill');
"

# NOTE(review): fixed pause after the MySQL inserts; presumably gives the upstream time to settle — confirm it is needed.
sleep 2s

# Create the CDC source on the risedev database with
# debezium.bigint.unsigned.handling.mode set to 'precise' (the mode under test).
statement ok
create source s_unsigned_int with (
${RISEDEV_MYSQL_WITH_OPTIONS_COMMON},
username = '${RISEDEV_MYSQL_USER}',
password = '${MYSQL_PWD}',
database.name = 'risedev',
debezium.bigint.unsigned.handling.mode = 'precise'
);

sleep 2s

# Create the RisingWave table on top of risedev.test_unsigned_int.
# The two BIGINT UNSIGNED columns (id, unsigned_bigint) are declared as DECIMAL; the smaller
# unsigned columns use the next wider signed type:
#   INT UNSIGNED -> BIGINT, SMALLINT UNSIGNED -> INT, TINYINT UNSIGNED -> SMALLINT.
# Snapshot is enabled with a batch size of 10, so the 50 pre-existing rows presumably
# span several snapshot batches.
statement ok
create table rw_unsigned_int (
id DECIMAL,
unsigned_bigint DECIMAL,
unsigned_int BIGINT,
unsigned_smallint INT,
unsigned_tinyint SMALLINT,
info VARCHAR,
primary key (id, unsigned_bigint)
) with (
snapshot = 'true',
snapshot.batch_size = 10,
) from s_unsigned_int table 'risedev.test_unsigned_int';

# NOTE(review): fixed pause; presumably intended to let the snapshot of the existing rows progress before new rows are inserted — confirm.
sleep 5s

# Insert 3 more rows after the RisingWave table has been created, tagged 'debezium'.
# All three unsigned_bigint values exceed the signed BIGINT maximum. The last one repeats
# the value of the final backfill row; the rows stay distinct because the
# auto-increment id is part of the primary key.
system ok
mysql -e "
USE risedev;
INSERT INTO test_unsigned_int (unsigned_bigint, unsigned_int, unsigned_smallint, unsigned_tinyint, info) VALUES
(18446251075179777770, 4294967295, 65535, 255, 'debezium'),
(18446251075179777771, 4294967295, 65535, 255, 'debezium'),
(18446251075179777772, 4294967295, 65535, 255, 'debezium');
"

# NOTE(review): fixed pause before the assertions; presumably gives the new rows time to reach RisingWave — confirm.
sleep 3s

# Verify total count in RisingWave (50 backfill + 3 debezium = 53).
# CDC ingestion is asynchronous, so retry this first assertion instead of relying only on
# the fixed sleep that precedes it. Once it passes, all 53 rows are present and the
# remaining checks can run without retries.
query I retry 3 backoff 5s
select count(*) as row_cnt from rw_unsigned_int;
----
53

# Verify we have both backfill and debezium data
query I
select count(*) as backfill_count from rw_unsigned_int where info = 'backfill';
----
50

query I
select count(*) as debezium_count from rw_unsigned_int where info = 'debezium';
----
3

# Verify specific rows match inserted data
# Check id=50 (last backfill row; every column holds a value above its signed MySQL type's maximum)
query TTTTTT
select id, unsigned_bigint, unsigned_int, unsigned_smallint, unsigned_tinyint, info from rw_unsigned_int where id = 50;
----
50 18446251075179777772 4294967295 65535 255 backfill

# Check id=53 (last debezium row; same column values as id=50 but ingested after table creation)
query TTTTTT
select id, unsigned_bigint, unsigned_int, unsigned_smallint, unsigned_tinyint, info from rw_unsigned_int where id = 53;
----
53 18446251075179777772 4294967295 65535 255 debezium



# Clean up: drop the RisingWave table first, then the source (cascade removes anything
# still depending on it), and finally the upstream MySQL table.
statement ok
drop table rw_unsigned_int;

statement ok
drop source s_unsigned_int cascade;

system ok
mysql -e "
USE risedev;
DROP TABLE IF EXISTS test_unsigned_int;
"
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,8 @@ private boolean isDataTypeCompatible(
return Data.DataType.TypeName.INT32_VALUE <= val
&& val <= Data.DataType.TypeName.INT64_VALUE;
case "bigint":
return val == Data.DataType.TypeName.INT64_VALUE;
return val == Data.DataType.TypeName.INT64_VALUE
|| val == Data.DataType.TypeName.DECIMAL_VALUE;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will try to distinguish signed BIGINT from unsigned BIGINT in a later PR to make the schema check more accurate.

case "boolean":
case "bool":
return val == Data.DataType.TypeName.BOOLEAN_VALUE;
Expand Down
4 changes: 4 additions & 0 deletions src/connector/src/parser/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use risingwave_common::secret::LocalSecretManager;
use risingwave_connector_codec::decoder::avro::MapHandling;
use risingwave_pb::catalog::{PbSchemaRegistryNameStrategy, StreamSourceInfo};

use super::unified::json::BigintUnsignedHandlingMode;
use super::utils::get_kafka_topic;
use super::{DebeziumProps, TimeHandling, TimestampHandling, TimestamptzHandling};
use crate::WithOptionsSecResolved;
Expand Down Expand Up @@ -94,6 +95,7 @@ impl SpecificParserConfig {
timestamp_handling: None,
timestamptz_handling: None,
time_handling: None,
bigint_unsigned_handling: None,
handle_toast_columns: false,
}),
protocol_config: ProtocolProperties::Plain,
Expand Down Expand Up @@ -272,6 +274,7 @@ impl SpecificParserConfig {
&format_encode_options_with_secret,
)?,
time_handling: None,
bigint_unsigned_handling: None,
handle_toast_columns: false,
}),
(SourceFormat::DebeziumMongo, SourceEncode::Json) => {
Expand Down Expand Up @@ -357,6 +360,7 @@ pub struct JsonProperties {
pub timestamp_handling: Option<TimestampHandling>,
pub timestamptz_handling: Option<TimestamptzHandling>,
pub time_handling: Option<TimeHandling>,
pub bigint_unsigned_handling: Option<BigintUnsignedHandlingMode>,
pub handle_toast_columns: bool,
}

Expand Down
10 changes: 9 additions & 1 deletion src/connector/src/parser/debezium/debezium_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ use super::simd_json_parser::DebeziumJsonAccessBuilder;
use super::{DebeziumAvroAccessBuilder, DebeziumAvroParserConfig};
use crate::error::ConnectorResult;
use crate::parser::unified::debezium::DebeziumChangeEvent;
use crate::parser::unified::json::{TimeHandling, TimestampHandling, TimestamptzHandling};
use crate::parser::unified::json::{
BigintUnsignedHandlingMode, TimeHandling, TimestampHandling, TimestamptzHandling,
};
use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
use crate::parser::{
AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType, ParseResult,
Expand Down Expand Up @@ -77,6 +79,9 @@ async fn build_accessor_builder(
.timestamp_handling
.unwrap_or(TimestampHandling::GuessNumberUnit),
json_config.time_handling.unwrap_or(TimeHandling::Micro),
json_config
.bigint_unsigned_handling
.unwrap_or(BigintUnsignedHandlingMode::Long),
json_config.handle_toast_columns,
)?,
)),
Expand Down Expand Up @@ -120,6 +125,7 @@ impl DebeziumParser {
timestamptz_handling: None,
timestamp_handling: None,
time_handling: None,
bigint_unsigned_handling: None,
handle_toast_columns: false,
}),
protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
Expand Down Expand Up @@ -227,6 +233,7 @@ mod tests {
timestamptz_handling: None,
timestamp_handling: None,
time_handling: None,
bigint_unsigned_handling: None,
handle_toast_columns: false,
}),
protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
Expand Down Expand Up @@ -302,6 +309,7 @@ mod tests {
timestamptz_handling: None,
timestamp_handling: None,
time_handling: None,
bigint_unsigned_handling: None,
handle_toast_columns: false,
}),
protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
Expand Down
7 changes: 6 additions & 1 deletion src/connector/src/parser/debezium/simd_json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use crate::error::ConnectorResult;
use crate::parser::unified::AccessImpl;
use crate::parser::unified::debezium::MongoJsonAccess;
use crate::parser::unified::json::{
JsonAccess, JsonParseOptions, TimeHandling, TimestampHandling, TimestamptzHandling,
BigintUnsignedHandlingMode, JsonAccess, JsonParseOptions, TimeHandling, TimestampHandling,
TimestamptzHandling,
};
use crate::parser::{AccessBuilder, MongoProperties};

Expand All @@ -37,6 +38,7 @@ impl DebeziumJsonAccessBuilder {
timestamptz_handling: TimestamptzHandling,
timestamp_handling: TimestampHandling,
time_handling: TimeHandling,
bigint_unsigned_handling: BigintUnsignedHandlingMode,
handle_toast_columns: bool,
) -> ConnectorResult<Self> {
Ok(Self {
Expand All @@ -45,6 +47,7 @@ impl DebeziumJsonAccessBuilder {
timestamptz_handling,
timestamp_handling,
time_handling,
bigint_unsigned_handling,
handle_toast_columns,
),
})
Expand Down Expand Up @@ -98,6 +101,7 @@ impl DebeziumMongoJsonAccessBuilder {
TimestamptzHandling::GuessNumberUnit,
TimestampHandling::GuessNumberUnit,
TimeHandling::Micro,
BigintUnsignedHandlingMode::Long,
false,
),
strong_schema: props.strong_schema,
Expand Down Expand Up @@ -168,6 +172,7 @@ mod tests {
timestamptz_handling: None,
timestamp_handling: None,
time_handling: None,
bigint_unsigned_handling: None,
handle_toast_columns: false,
}),
protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/parser/maxwell/simd_json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ mod tests {
timestamptz_handling: None,
timestamp_handling: None,
time_handling: None,
bigint_unsigned_handling: None,
handle_toast_columns: false,
}),
protocol_config: ProtocolProperties::Maxwell,
Expand Down
4 changes: 3 additions & 1 deletion src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ use self::plain_parser::PlainParser;
pub use self::postgres::{postgres_cell_to_scalar_impl, postgres_row_to_owned_row};
pub use self::sql_server::{ScalarImplTiberiusWrapper, sql_server_row_to_owned_row};
pub use self::unified::Access;
pub use self::unified::json::{JsonAccess, TimeHandling, TimestampHandling, TimestamptzHandling};
pub use self::unified::json::{
BigintUnsignedHandlingMode, JsonAccess, TimeHandling, TimestampHandling, TimestamptzHandling,
};
use self::upsert_parser::UpsertParser;
use crate::error::ConnectorResult;
use crate::parser::maxwell::MaxwellParser;
Expand Down
42 changes: 40 additions & 2 deletions src/connector/src/parser/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::sync::LazyLock;

use mysql_async::Row as MysqlRow;
use mysql_common::constants::ColumnFlags;
use risingwave_common::catalog::Schema;
use risingwave_common::log::LogSuppresser;
use risingwave_common::row::OwnedRow;
Expand Down Expand Up @@ -62,6 +63,42 @@ macro_rules! handle_data_type {
}};
}

/// Decodes the integer at `$mysql_datum_index` of `$mysql_row`, honoring the column's
/// `UNSIGNED` flag.
///
/// * If the column carries `ColumnFlags::UNSIGNED_FLAG`, the value is read as
///   `$unsigned_type` and then converted to `$signed_type` with a range check.
///   A value that does not fit yields an `Err` instead of silently wrapping
///   (a plain `as` cast would, for example, turn the `u16` value 65535 into the `i16` value -1).
/// * Otherwise the value is decoded through `handle_data_type!` as `$signed_type`.
///
/// The expression evaluates to `Ok(Some(scalar))` for a decoded value, `Ok(None)` for a
/// SQL NULL, and `Err(..)` when the value is present but cannot be decoded. If the row has
/// no value at the index at all, the unsigned branch `bail!`s out of the enclosing function.
macro_rules! handle_data_type_with_signed {
    (
        $mysql_row:expr,
        $mysql_datum_index:expr,
        $column_name:expr,
        $signed_type:ty,
        $unsigned_type:ty
    ) => {{
        let column_flags = $mysql_row.columns()[$mysql_datum_index].flags();

        if column_flags.contains(ColumnFlags::UNSIGNED_FLAG) {
            // UNSIGNED type: decode with the unsigned Rust type first, then convert to the
            // signed target type with an explicit range check.
            match $mysql_row.take_opt::<Option<$unsigned_type>, _>($mysql_datum_index) {
                Some(Ok(Some(val))) => match <$signed_type>::try_from(val) {
                    Ok(signed_val) => Ok(Some(ScalarImpl::from(signed_val))),
                    // Surface the overflow instead of storing a wrapped (negative) value.
                    Err(_) => Err(anyhow::anyhow!(
                        "unsigned value {} is out of range for {}, column: {}, index: {}",
                        val,
                        stringify!($signed_type),
                        $column_name,
                        $mysql_datum_index,
                    )),
                },
                Some(Ok(None)) => Ok(None),
                Some(Err(e)) => Err(anyhow::Error::new(e.clone())
                    .context("failed to deserialize MySQL value into rust value")
                    .context(format!(
                        "column: {}, index: {}, rust_type: {}",
                        $column_name,
                        $mysql_datum_index,
                        stringify!($unsigned_type),
                    ))),
                None => bail!(
                    "no value found at column: {}, index: {}",
                    $column_name,
                    $mysql_datum_index
                ),
            }
        } else {
            // SIGNED type: use default signed type conversion
            handle_data_type!($mysql_row, $mysql_datum_index, $column_name, $signed_type)
        }
    }};
}

/// The decoding result can be interpreted as follows:
/// Ok(value) => The value was found and successfully decoded.
/// Err(error) => The value was found but could not be decoded,
Expand Down Expand Up @@ -107,12 +144,13 @@ pub fn mysql_datum_to_rw_datum(
}
}
DataType::Int16 => {
handle_data_type!(mysql_row, mysql_datum_index, column_name, i16)
handle_data_type_with_signed!(mysql_row, mysql_datum_index, column_name, i16, u16)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does upcast work here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you explain the relationship between DataType::Int16, i16, and u16?

}
DataType::Int32 => {
handle_data_type!(mysql_row, mysql_datum_index, column_name, i32)
handle_data_type_with_signed!(mysql_row, mysql_datum_index, column_name, i32, u32)
}
DataType::Int64 => {
// for bigint unsigned, should up cast to decimal.
handle_data_type!(mysql_row, mysql_datum_index, column_name, i64)
}
DataType::Float32 => {
Expand Down
Loading
Loading