Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions tests/kafkatest/services/verifiable_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from kafkatest.services.kafka import TopicPartition, consumer_group
from kafkatest.services.kafka.util import get_log4j_config_param, get_log4j_config_for_tools
from kafkatest.services.verifiable_client import VerifiableClientMixin
from kafkatest.version import DEV_BRANCH, V_2_3_0, V_2_3_1, V_3_7_0, V_4_0_0
from kafkatest.version import get_version, DEV_BRANCH, V_2_3_0, V_2_3_1, V_3_7_0, V_4_0_0


class ConsumerState:
Expand Down Expand Up @@ -424,7 +424,12 @@ def start_cmd(self, node):
if self.max_messages > 0:
cmd += " --max-messages %s" % str(self.max_messages)

cmd += " --command-config %s" % VerifiableConsumer.CONFIG_FILE
version = get_version(node)
if version.supports_command_config():
cmd += " --command-config %s" % VerifiableConsumer.CONFIG_FILE
else:
cmd += " --consumer.config %s" % VerifiableConsumer.CONFIG_FILE

cmd += " 2>> %s | tee -a %s &" % (VerifiableConsumer.STDOUT_CAPTURE, VerifiableConsumer.STDOUT_CAPTURE)
return cmd

Expand Down
5 changes: 4 additions & 1 deletion tests/kafkatest/services/verifiable_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,10 @@ def start_cmd(self, node, idx):
if self.repeating_keys is not None:
cmd += " --repeating-keys %s " % str(self.repeating_keys)

cmd += " --command-config %s" % VerifiableProducer.CONFIG_FILE
if version.supports_command_config():
cmd += " --command-config %s" % VerifiableProducer.CONFIG_FILE
else:
cmd += " --producer.config %s" % VerifiableProducer.CONFIG_FILE

cmd += " 2>> %s | tee -a %s &" % (VerifiableProducer.STDOUT_CAPTURE, VerifiableProducer.STDOUT_CAPTURE)
return cmd
Expand Down
11 changes: 11 additions & 0 deletions tests/kafkatest/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@ def supports_fk_joins(self):
def supports_feature_command(self):
return self >= V_3_8_0

def supports_command_config(self):
# According to KIP-1147, --producer.config and --consumer.config have been deprecated and will be removed in future versions
# For backward compatibility, we select the configuration based on node version:
# - For versions 4.2.0 and above, use --command-config
# - For older versions, continue using --producer.config or --consumer.config
return self >= V_4_2_0

def get_version(node=None):
"""Return the version attached to the given node.
Default to DEV_BRANCH if node or node.version is undefined (aka None)
Expand Down Expand Up @@ -223,3 +230,7 @@ def get_version(node=None):
# 4.1.x version
V_4_1_0 = KafkaVersion("4.1.0")
LATEST_4_1 = V_4_1_0

# 4.2.x version
V_4_2_0 = KafkaVersion("4.2.0")
LATEST_4_2 = V_4_2_0
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ public static VerifiableConsumer createFromArgs(ArgumentParser parser, String[]
}
if (commandConfigFile != null) {
try {
consumerProps.putAll(Utils.loadProps(res.getString(commandConfigFile)));
consumerProps.putAll(Utils.loadProps(commandConfigFile));
} catch (IOException e) {
throw new ArgumentParserException(e.getMessage(), parser);
}
Expand Down