Skip to content

Commit f3ca9db

Browse files
committed
Use AsyncEmbeddedEngine replacing deprecated EmbeddedEngine
based on https://debezium.io/releases/3.2/release-notes The mentioned class EmbeddedEngine will be removed https://issues.redhat.com/browse/DBZ-8029 in 3.2 (after being deprecated for several versions) needs to use AsyncEmbeddedEngine instead https://github.com/debezium/debezium/pull/5645/files Doing the update here will allow an easier transition between Debezium 3.1 and 3.2 and alignment with Quarkus Signed-off-by: Aurélien Pupier <[email protected]>
1 parent 74916b1 commit f3ca9db

File tree

8 files changed

+41
-40
lines changed

8 files changed

+41
-40
lines changed

components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/configuration/EmbeddedDebeziumConfiguration.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
import io.debezium.config.Configuration;
2323
import io.debezium.config.Field;
24-
import io.debezium.embedded.EmbeddedEngine;
24+
import io.debezium.embedded.async.AsyncEmbeddedEngine;
2525
import io.debezium.engine.spi.OffsetCommitPolicy;
2626
import org.apache.camel.RuntimeCamelException;
2727
import org.apache.camel.component.debezium.DebeziumConstants;
@@ -163,19 +163,19 @@ public Configuration createDebeziumConfiguration() {
163163
private Configuration createDebeziumEmbeddedEngineConfiguration() {
164164
final Configuration.Builder configBuilder = Configuration.create();
165165

166-
addPropertyIfNotNull(configBuilder, EmbeddedEngine.ENGINE_NAME, name);
167-
addPropertyIfNotNull(configBuilder, EmbeddedEngine.CONNECTOR_CLASS, connectorClass.getName());
168-
addPropertyIfNotNull(configBuilder, EmbeddedEngine.OFFSET_STORAGE, offsetStorage);
169-
addPropertyIfNotNull(configBuilder, EmbeddedEngine.OFFSET_STORAGE_FILE_FILENAME,
166+
addPropertyIfNotNull(configBuilder, AsyncEmbeddedEngine.ENGINE_NAME, name);
167+
addPropertyIfNotNull(configBuilder, AsyncEmbeddedEngine.CONNECTOR_CLASS, connectorClass.getName());
168+
addPropertyIfNotNull(configBuilder, AsyncEmbeddedEngine.OFFSET_STORAGE, offsetStorage);
169+
addPropertyIfNotNull(configBuilder, AsyncEmbeddedEngine.OFFSET_STORAGE_FILE_FILENAME,
170170
offsetStorageFileName);
171-
addPropertyIfNotNull(configBuilder, EmbeddedEngine.OFFSET_STORAGE_KAFKA_TOPIC, offsetStorageTopic);
172-
addPropertyIfNotNull(configBuilder, EmbeddedEngine.OFFSET_STORAGE_KAFKA_PARTITIONS,
171+
addPropertyIfNotNull(configBuilder, AsyncEmbeddedEngine.OFFSET_STORAGE_KAFKA_TOPIC, offsetStorageTopic);
172+
addPropertyIfNotNull(configBuilder, AsyncEmbeddedEngine.OFFSET_STORAGE_KAFKA_PARTITIONS,
173173
offsetStoragePartitions);
174-
addPropertyIfNotNull(configBuilder, EmbeddedEngine.OFFSET_STORAGE_KAFKA_REPLICATION_FACTOR,
174+
addPropertyIfNotNull(configBuilder, AsyncEmbeddedEngine.OFFSET_STORAGE_KAFKA_REPLICATION_FACTOR,
175175
offsetStorageReplicationFactor);
176-
addPropertyIfNotNull(configBuilder, EmbeddedEngine.OFFSET_COMMIT_POLICY, offsetCommitPolicy);
177-
addPropertyIfNotNull(configBuilder, EmbeddedEngine.OFFSET_FLUSH_INTERVAL_MS, offsetFlushIntervalMs);
178-
addPropertyIfNotNull(configBuilder, EmbeddedEngine.OFFSET_COMMIT_TIMEOUT_MS, offsetCommitTimeoutMs);
176+
addPropertyIfNotNull(configBuilder, AsyncEmbeddedEngine.OFFSET_COMMIT_POLICY, offsetCommitPolicy);
177+
addPropertyIfNotNull(configBuilder, AsyncEmbeddedEngine.OFFSET_FLUSH_INTERVAL_MS, offsetFlushIntervalMs);
178+
addPropertyIfNotNull(configBuilder, AsyncEmbeddedEngine.OFFSET_COMMIT_TIMEOUT_MS, offsetCommitTimeoutMs);
179179

180180
if (internalKeyConverter != null && internalValueConverter != null) {
181181
configBuilder.with("internal.key.converter", internalKeyConverter);

components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/test/java/org/apache/camel/component/debezium/configuration/EmbeddedDebeziumConfigurationTest.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import java.util.Collections;
2020

2121
import io.debezium.config.Configuration;
22-
import io.debezium.embedded.EmbeddedEngine;
22+
import io.debezium.embedded.async.AsyncEmbeddedEngine;
2323
import org.apache.camel.component.debezium.DebeziumConstants;
2424
import org.junit.jupiter.api.Test;
2525

@@ -38,13 +38,14 @@ void testIfCreatesConfig() {
3838

3939
final Configuration dbzEmbeddedConfiguration = configuration.createDebeziumConfiguration();
4040

41-
assertEquals("test_config", dbzEmbeddedConfiguration.getString(EmbeddedEngine.ENGINE_NAME), "Expect the same name");
41+
assertEquals("test_config", dbzEmbeddedConfiguration.getString(AsyncEmbeddedEngine.ENGINE_NAME),
42+
"Expect the same name");
4243
assertEquals(2, dbzEmbeddedConfiguration
43-
.getInteger(EmbeddedEngine.OFFSET_STORAGE_KAFKA_REPLICATION_FACTOR));
44+
.getInteger(AsyncEmbeddedEngine.OFFSET_STORAGE_KAFKA_REPLICATION_FACTOR));
4445
assertEquals(DebeziumConstants.DEFAULT_OFFSET_STORAGE,
45-
dbzEmbeddedConfiguration.getString(EmbeddedEngine.OFFSET_STORAGE));
46+
dbzEmbeddedConfiguration.getString(AsyncEmbeddedEngine.OFFSET_STORAGE));
4647
assertEquals("test_field", configuration.getTestField());
47-
assertEquals(Class.class.getName(), dbzEmbeddedConfiguration.getString(EmbeddedEngine.CONNECTOR_CLASS));
48+
assertEquals(Class.class.getName(), dbzEmbeddedConfiguration.getString(AsyncEmbeddedEngine.CONNECTOR_CLASS));
4849
}
4950

5051
@Test

components/camel-debezium/camel-debezium-db2/src/test/java/org/apache/camel/component/debezium/db2/configuration/Db2ConnectorEmbeddedDebeziumConfigurationTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import io.debezium.config.Configuration;
2222
import io.debezium.connector.db2.Db2Connector;
2323
import io.debezium.connector.db2.Db2ConnectorConfig;
24-
import io.debezium.embedded.EmbeddedEngine;
24+
import io.debezium.embedded.async.AsyncEmbeddedEngine;
2525
import org.apache.camel.component.debezium.DebeziumConstants;
2626
import org.apache.camel.component.debezium.configuration.ConfigurationValidation;
2727
import org.junit.jupiter.api.Test;
@@ -42,12 +42,12 @@ void testIfCreatesConfig() {
4242

4343
final Configuration dbzConfigurations = configuration.createDebeziumConfiguration();
4444

45-
assertEquals("test_config", dbzConfigurations.getString(EmbeddedEngine.ENGINE_NAME));
45+
assertEquals("test_config", dbzConfigurations.getString(AsyncEmbeddedEngine.ENGINE_NAME));
4646
assertEquals("test_user", dbzConfigurations.getString(Db2ConnectorConfig.USER));
4747
assertEquals(1212, dbzConfigurations.getInteger(CommonConnectorConfig.MAX_QUEUE_SIZE));
48-
assertEquals(Db2Connector.class.getName(), dbzConfigurations.getString(EmbeddedEngine.CONNECTOR_CLASS));
48+
assertEquals(Db2Connector.class.getName(), dbzConfigurations.getString(AsyncEmbeddedEngine.CONNECTOR_CLASS));
4949
assertEquals(DebeziumConstants.DEFAULT_OFFSET_STORAGE,
50-
dbzConfigurations.getString(EmbeddedEngine.OFFSET_STORAGE));
50+
dbzConfigurations.getString(AsyncEmbeddedEngine.OFFSET_STORAGE));
5151
}
5252

5353
@Test

components/camel-debezium/camel-debezium-mongodb/src/test/java/org/apache/camel/component/debezium/mongodb/configuration/MongodbConnectorEmbeddedDebeziumConfigurationTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import io.debezium.config.Configuration;
2121
import io.debezium.connector.mongodb.MongoDbConnector;
2222
import io.debezium.connector.mongodb.MongoDbConnectorConfig;
23-
import io.debezium.embedded.EmbeddedEngine;
23+
import io.debezium.embedded.async.AsyncEmbeddedEngine;
2424
import org.apache.camel.component.debezium.DebeziumConstants;
2525
import org.apache.camel.component.debezium.configuration.ConfigurationValidation;
2626
import org.junit.jupiter.api.Test;
@@ -40,12 +40,12 @@ void testIfCreatesConfig() {
4040

4141
final Configuration dbzConfigurations = configuration.createDebeziumConfiguration();
4242

43-
assertEquals("test_config", dbzConfigurations.getString(EmbeddedEngine.ENGINE_NAME));
43+
assertEquals("test_config", dbzConfigurations.getString(AsyncEmbeddedEngine.ENGINE_NAME));
4444
assertEquals("test_user", dbzConfigurations.getString(MongoDbConnectorConfig.USER));
4545
assertEquals(1212, dbzConfigurations.getInteger(CommonConnectorConfig.MAX_QUEUE_SIZE));
46-
assertEquals(MongoDbConnector.class.getName(), dbzConfigurations.getString(EmbeddedEngine.CONNECTOR_CLASS));
46+
assertEquals(MongoDbConnector.class.getName(), dbzConfigurations.getString(AsyncEmbeddedEngine.CONNECTOR_CLASS));
4747
assertEquals(DebeziumConstants.DEFAULT_OFFSET_STORAGE,
48-
dbzConfigurations.getString(EmbeddedEngine.OFFSET_STORAGE));
48+
dbzConfigurations.getString(AsyncEmbeddedEngine.OFFSET_STORAGE));
4949
}
5050

5151
@Test

components/camel-debezium/camel-debezium-mysql/src/test/java/org/apache/camel/component/debezium/mysql/configuration/MySqlConnectorEmbeddedDebeziumConfigurationTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import io.debezium.config.Configuration;
2121
import io.debezium.connector.mysql.MySqlConnector;
2222
import io.debezium.connector.mysql.MySqlConnectorConfig;
23-
import io.debezium.embedded.EmbeddedEngine;
23+
import io.debezium.embedded.async.AsyncEmbeddedEngine;
2424
import org.apache.camel.component.debezium.DebeziumConstants;
2525
import org.apache.camel.component.debezium.configuration.ConfigurationValidation;
2626
import org.junit.jupiter.api.Test;
@@ -41,16 +41,16 @@ void testIfCreatesConfig() {
4141

4242
final Configuration dbzMysqlConfigurations = configuration.createDebeziumConfiguration();
4343

44-
assertEquals("test_config", dbzMysqlConfigurations.getString(EmbeddedEngine.ENGINE_NAME));
44+
assertEquals("test_config", dbzMysqlConfigurations.getString(AsyncEmbeddedEngine.ENGINE_NAME));
4545
assertEquals("test_user", dbzMysqlConfigurations.getString(MySqlConnectorConfig.USER));
4646
assertTrue(dbzMysqlConfigurations.getBoolean(MySqlConnectorConfig.INCLUDE_SQL_QUERY));
4747
assertEquals(1212, dbzMysqlConfigurations.getInteger(CommonConnectorConfig.MAX_QUEUE_SIZE));
4848
assertEquals(30000,
4949
dbzMysqlConfigurations.getLong(MySqlConnectorConfig.CONNECTION_TIMEOUT_MS));
50-
assertEquals(MySqlConnector.class.getName(), dbzMysqlConfigurations.getString(EmbeddedEngine.CONNECTOR_CLASS));
50+
assertEquals(MySqlConnector.class.getName(), dbzMysqlConfigurations.getString(AsyncEmbeddedEngine.CONNECTOR_CLASS));
5151
assertTrue(dbzMysqlConfigurations.getBoolean(MySqlConnectorConfig.INCLUDE_SQL_QUERY));
5252
assertEquals(DebeziumConstants.DEFAULT_OFFSET_STORAGE,
53-
dbzMysqlConfigurations.getString(EmbeddedEngine.OFFSET_STORAGE));
53+
dbzMysqlConfigurations.getString(AsyncEmbeddedEngine.OFFSET_STORAGE));
5454
assertEquals(30000,
5555
dbzMysqlConfigurations.getLong(MySqlConnectorConfig.CONNECTION_TIMEOUT_MS));
5656
}

components/camel-debezium/camel-debezium-oracle/src/test/java/org/apache/camel/component/debezium/oracle/configuration/OracleConnectorEmbeddedDebeziumConfigurationTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import io.debezium.config.Configuration;
2222
import io.debezium.connector.oracle.OracleConnector;
2323
import io.debezium.connector.oracle.OracleConnectorConfig;
24-
import io.debezium.embedded.EmbeddedEngine;
24+
import io.debezium.embedded.async.AsyncEmbeddedEngine;
2525
import org.apache.camel.component.debezium.DebeziumConstants;
2626
import org.apache.camel.component.debezium.configuration.ConfigurationValidation;
2727
import org.junit.jupiter.api.Test;
@@ -42,12 +42,12 @@ void testIfCreatesConfig() {
4242

4343
final Configuration dbzConfigurations = configuration.createDebeziumConfiguration();
4444

45-
assertEquals("test_config", dbzConfigurations.getString(EmbeddedEngine.ENGINE_NAME));
45+
assertEquals("test_config", dbzConfigurations.getString(AsyncEmbeddedEngine.ENGINE_NAME));
4646
assertEquals("test_user", dbzConfigurations.getString(OracleConnectorConfig.USER));
4747
assertEquals(1212, dbzConfigurations.getInteger(CommonConnectorConfig.MAX_QUEUE_SIZE));
48-
assertEquals(OracleConnector.class.getName(), dbzConfigurations.getString(EmbeddedEngine.CONNECTOR_CLASS));
48+
assertEquals(OracleConnector.class.getName(), dbzConfigurations.getString(AsyncEmbeddedEngine.CONNECTOR_CLASS));
4949
assertEquals(DebeziumConstants.DEFAULT_OFFSET_STORAGE,
50-
dbzConfigurations.getString(EmbeddedEngine.OFFSET_STORAGE));
50+
dbzConfigurations.getString(AsyncEmbeddedEngine.OFFSET_STORAGE));
5151
}
5252

5353
@Test

components/camel-debezium/camel-debezium-postgres/src/test/java/org/apache/camel/component/debezium/postgres/configuration/PostgresConnectorEmbeddedDebeziumConfigurationTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import io.debezium.config.Configuration;
2121
import io.debezium.connector.postgresql.PostgresConnector;
2222
import io.debezium.connector.postgresql.PostgresConnectorConfig;
23-
import io.debezium.embedded.EmbeddedEngine;
23+
import io.debezium.embedded.async.AsyncEmbeddedEngine;
2424
import org.apache.camel.component.debezium.DebeziumConstants;
2525
import org.apache.camel.component.debezium.configuration.ConfigurationValidation;
2626
import org.junit.jupiter.api.Test;
@@ -41,12 +41,12 @@ void testIfCreatesConfig() {
4141

4242
final Configuration dbzConfigurations = configuration.createDebeziumConfiguration();
4343

44-
assertEquals("test_config", dbzConfigurations.getString(EmbeddedEngine.ENGINE_NAME));
44+
assertEquals("test_config", dbzConfigurations.getString(AsyncEmbeddedEngine.ENGINE_NAME));
4545
assertEquals("test_user", dbzConfigurations.getString(PostgresConnectorConfig.USER));
4646
assertEquals(1212, dbzConfigurations.getInteger(CommonConnectorConfig.MAX_QUEUE_SIZE));
47-
assertEquals(PostgresConnector.class.getName(), dbzConfigurations.getString(EmbeddedEngine.CONNECTOR_CLASS));
47+
assertEquals(PostgresConnector.class.getName(), dbzConfigurations.getString(AsyncEmbeddedEngine.CONNECTOR_CLASS));
4848
assertEquals(DebeziumConstants.DEFAULT_OFFSET_STORAGE,
49-
dbzConfigurations.getString(EmbeddedEngine.OFFSET_STORAGE));
49+
dbzConfigurations.getString(AsyncEmbeddedEngine.OFFSET_STORAGE));
5050
}
5151

5252
@Test

components/camel-debezium/camel-debezium-sqlserver/src/test/java/org/apache/camel/component/debezium/sqlserver/configuration/SqlserverConnectorEmbeddedDebeziumConfigurationTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import io.debezium.config.CommonConnectorConfig;
2020
import io.debezium.config.Configuration;
2121
import io.debezium.connector.sqlserver.SqlServerConnector;
22-
import io.debezium.embedded.EmbeddedEngine;
22+
import io.debezium.embedded.async.AsyncEmbeddedEngine;
2323
import org.apache.camel.component.debezium.DebeziumConstants;
2424
import org.apache.camel.component.debezium.configuration.ConfigurationValidation;
2525
import org.junit.jupiter.api.Test;
@@ -40,12 +40,12 @@ void testIfCreatesConfig() {
4040

4141
final Configuration dbzConfigurations = configuration.createDebeziumConfiguration();
4242

43-
assertEquals("test_config", dbzConfigurations.getString(EmbeddedEngine.ENGINE_NAME));
43+
assertEquals("test_config", dbzConfigurations.getString(AsyncEmbeddedEngine.ENGINE_NAME));
4444
assertEquals("test_user", dbzConfigurations.getString("database.user"));
4545
assertEquals(1212, dbzConfigurations.getInteger(CommonConnectorConfig.MAX_QUEUE_SIZE));
46-
assertEquals(SqlServerConnector.class.getName(), dbzConfigurations.getString(EmbeddedEngine.CONNECTOR_CLASS));
46+
assertEquals(SqlServerConnector.class.getName(), dbzConfigurations.getString(AsyncEmbeddedEngine.CONNECTOR_CLASS));
4747
assertEquals(DebeziumConstants.DEFAULT_OFFSET_STORAGE,
48-
dbzConfigurations.getString(EmbeddedEngine.OFFSET_STORAGE));
48+
dbzConfigurations.getString(AsyncEmbeddedEngine.OFFSET_STORAGE));
4949
}
5050

5151
@Test

0 commit comments

Comments
 (0)