diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 4830fb960b94e..1a1952b9aa483 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -821,7 +821,7 @@ public class StreamsConfig extends AbstractConfig { private static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC = "Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift. Default is 1 day"; private static final String[] NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS = - new String[] {ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ConsumerConfig.GROUP_PROTOCOL_CONFIG}; + new String[] {ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ConsumerConfig.GROUP_PROTOCOL_CONFIG, ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG}; private static final String[] NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS = new String[] {ConsumerConfig.ISOLATION_LEVEL_CONFIG}; private static final String[] NON_CONFIGURABLE_PRODUCER_EOS_CONFIGS = @@ -1255,7 +1255,8 @@ public class StreamsConfig extends AbstractConfig { ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000", ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest", ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false", - ConsumerConfig.GROUP_PROTOCOL_CONFIG, "classic" + ConsumerConfig.GROUP_PROTOCOL_CONFIG, "classic", + ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false" ); private static final Map CONSUMER_EOS_OVERRIDES; @@ -1759,6 +1760,7 @@ public Map getMainConsumerConfigs(final String groupId, final St // Get main consumer override configs final Map mainConsumerProps = originalsWithPrefix(MAIN_CONSUMER_PREFIX); + checkIfUnexpectedUserSpecifiedConsumerConfig(mainConsumerProps, NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS); consumerProps.putAll(mainConsumerProps); // this is a hack to work around StreamsConfig constructor inside StreamsPartitionAssignor to avoid casting @@ -1789,9 +1791,6 @@ public Map getMainConsumerConfigs(final String groupId, final St consumerProps.put(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, getInt(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG)); consumerProps.put(TASK_ASSIGNOR_CLASS_CONFIG, getString(TASK_ASSIGNOR_CLASS_CONFIG)); - // disable auto topic creation - consumerProps.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false"); - // verify that producer batch config is no larger than segment size, then add topic configs required for creating topics final Map topicProps = originalsWithPrefix(TOPIC_PREFIX, false); final Map producerProps = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames()); @@ -1834,6 +1833,7 @@ public Map getRestoreConsumerConfigs(final String clientId) { // Get restore consumer override configs final Map restoreConsumerProps = originalsWithPrefix(RESTORE_CONSUMER_PREFIX); + checkIfUnexpectedUserSpecifiedConsumerConfig(restoreConsumerProps, NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS); baseConsumerProps.putAll(restoreConsumerProps); // no need to set group id for a restore consumer @@ -1867,6 +1867,7 @@ public Map getGlobalConsumerConfigs(final String clientId) { // Get global consumer override configs final Map globalConsumerProps = originalsWithPrefix(GLOBAL_CONSUMER_PREFIX); + checkIfUnexpectedUserSpecifiedConsumerConfig(globalConsumerProps, NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS); baseConsumerProps.putAll(globalConsumerProps); // no need to set group id for a global consumer @@ -1877,6 +1878,7 @@ public Map getGlobalConsumerConfigs(final String clientId) { // add client id with stream client id prefix baseConsumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-global-consumer"); baseConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none"); + return baseConsumerProps; } diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index 65147a81101fa..8c88ef4c925dc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -442,6 +442,51 @@ public void shouldOverrideAdminDefaultAdminClientEnableTelemetry() { assertTrue((boolean) returnedProps.get(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG)); } + @Test + public void testAutoCreateTopicsCannotBeOverriddenForStreamsConsumers() { + // User tries to override the setting + props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), "true"); + props.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), "true"); + props.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), "true"); + props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), "true"); + + final StreamsConfig streamsConfig = new StreamsConfig(props); + + // Main consumer - verify override is ignored + final Map mainConfigs = streamsConfig.getMainConsumerConfigs("group", "client", 0); + assertEquals("false", mainConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), + "Main consumer should not allow auto topic creation even with override"); + + // Restore consumer - verify override is ignored + final Map restoreConfigs = streamsConfig.getRestoreConsumerConfigs("client"); + assertEquals("false", restoreConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), + "Restore consumer should not allow auto topic creation even with override"); + + // Global consumer - verify override is ignored + final Map globalConfigs = streamsConfig.getGlobalConsumerConfigs("client"); + assertEquals("false", globalConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), + "Global consumer should not allow auto topic creation even with override"); + } + + @Test + public void shouldLogErrorWhenUserTriesToOverrideAutoCreateTopics() { + props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), "true"); + + try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) { + appender.setClassLogger(StreamsConfig.class, Level.ERROR); + + final StreamsConfig streamsConfig = new StreamsConfig(props); + // Trigger the warning by getting consumer configs + streamsConfig.getMainConsumerConfigs("group", "client", 0); + + assertTrue(appender.getMessages().stream() + .anyMatch(msg -> msg.contains("Unexpected user-specified consumer config 'allow.auto.create.topics' found") + && msg.contains("User setting (true) will be ignored and the Streams default setting (false) will be used")), + "Should log error when user tries to override allow.auto.create.topics"); + } + } + + @Test public void shouldSupportNonPrefixedAdminConfigs() { props.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 10);