Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1845,6 +1845,9 @@ public Map<String, Object> getRestoreConsumerConfigs(final String clientId) {
baseConsumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
baseConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");

// disable auto topic creation
baseConsumerProps.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we add this to NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS and also set it centrally in getCommonConsumerConfigs() ?

For this case, we can also remove the corresponding line in getMainConsumerConfigs()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mjsax Done the changes.


return baseConsumerProps;
}

Expand Down Expand Up @@ -1877,6 +1880,10 @@ public Map<String, Object> getGlobalConsumerConfigs(final String clientId) {
// add client id with stream client id prefix
baseConsumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-global-consumer");
baseConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");

// disable auto topic creation
baseConsumerProps.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");

return baseConsumerProps;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,32 @@ public void shouldOverrideAdminDefaultAdminClientEnableTelemetry() {
assertTrue((boolean) returnedProps.get(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG));
}

@Test
public void testAutoCreateTopicsCannotBeOverriddenForStreamsConsumers() {
// User tries to override the setting
props.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), "true");
props.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), "true");
props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), "true");

final StreamsConfig streamsConfig = new StreamsConfig(props);

// Main consumer - verify override is ignored
final Map<String, Object> mainConfigs = streamsConfig.getMainConsumerConfigs("group", "client", 0);
assertEquals("false", mainConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
"Main consumer should not allow auto topic creation even with override");

// Restore consumer - verify override is ignored
final Map<String, Object> restoreConfigs = streamsConfig.getRestoreConsumerConfigs("client");
assertEquals("false", restoreConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
"Restore consumer should not allow auto topic creation even with override");

// Global consumer - verify override is ignored
final Map<String, Object> globalConfigs = streamsConfig.getGlobalConsumerConfigs("client");
assertEquals("false", globalConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
"Global consumer should not allow auto topic creation even with override");
}


@Test
public void shouldSupportNonPrefixedAdminConfigs() {
props.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 10);
Expand Down