Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
931dfaf
KAFKA-19487: Improving consistency of command-line arguments for cons…
aheev Aug 20, 2025
17460c7
Merge branch 'trunk' of https://github.com/apache/kafka into KAFKA-19487
aheev Aug 25, 2025
34651a9
Add deprecation cycle for num-records
aheev Aug 25, 2025
529eb36
Merge branch 'trunk' of https://github.com/apache/kafka into KAFKA-19487
aheev Aug 26, 2025
d82ce4f
Add deprecation signatures to old options; Remove `consumer config` a…
aheev Aug 26, 2025
c4cd51e
Add checks for deprecated options
aheev Aug 26, 2025
b8b2dc4
Merge branch 'trunk' of https://github.com/apache/kafka into KAFKA-19487
aheev Aug 26, 2025
5c8fd2b
Rename tests to old config names
aheev Aug 27, 2025
d252457
Merge branch 'trunk' of https://github.com/apache/kafka into KAFKA-19487
aheev Aug 27, 2025
9d9a8c6
Rename consumer config files to unique prefix-suffixes
aheev Aug 27, 2025
8946542
Merge branch 'trunk' of https://github.com/apache/kafka into KAFKA-19487
aheev Aug 28, 2025
9e53ae5
replace consumer config -> config in performance service files
aheev Aug 28, 2025
7020430
add required check on bootstrap server opt
aheev Aug 28, 2025
a99f389
add required check on bootstrap server opt tests
aheev Aug 28, 2025
a6b1d7d
Add command-property
aheev Sep 1, 2025
0f06d95
Merge branch 'trunk' of https://github.com/apache/kafka into KAFKA-19487
aheev Sep 1, 2025
4ff0992
fix command-property description
aheev Sep 2, 2025
a36a868
Merge branch 'trunk' of https://github.com/apache/kafka into KAFKA-19487
aheev Sep 2, 2025
5af4ee4
Merge branch 'trunk' of https://github.com/apache/kafka into KAFKA-19487
aheev Sep 3, 2025
fc98942
Merge branch 'trunk' of https://github.com/apache/kafka into KAFKA-19487
aheev Sep 5, 2025
116f7cd
Merge branch 'trunk' of https://github.com/apache/kafka into KAFKA-19487
aheev Sep 5, 2025
d9f8854
rename messages to num-records in testMetricsRetrievedBeforeConsumerC…
aheev Sep 5, 2025
037a177
Merge branch 'trunk' of https://github.com/apache/kafka into KAFKA-19487
aheev Sep 7, 2025
f624975
Add support for running consumer_performance.py on older kafka brokers
aheev Sep 7, 2025
4a3862f
Merge branch 'trunk' of https://github.com/apache/kafka into KAFKA-19487
aheev Sep 18, 2025
25da0ea
change messages arg to num-records in share_consumer_performance.py
aheev Sep 18, 2025
0fca232
add support for old brokers in share_consumer_performance.py
aheev Sep 18, 2025
d638653
clean debug statements
aheev Sep 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions tests/kafkatest/services/performance/consumer_performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class ConsumerPerformanceService(PerformanceService):
"socket-buffer-size", "The size of the tcp RECV size."

"new-consumer", "Use the new consumer implementation."
"consumer.config", "Consumer config properties file."
"command-config", "Consumer config properties file."
"""

# Root directory for persistent output
Expand Down Expand Up @@ -115,7 +115,7 @@ def start_cmd(self, node):
for key, value in self.args(node.version).items():
cmd += " --%s %s" % (key, value)

cmd += " --consumer.config %s" % ConsumerPerformanceService.CONFIG_FILE
cmd += " --command-config %s" % ConsumerPerformanceService.CONFIG_FILE

for key, value in self.settings.items():
cmd += " %s=%s" % (str(key), str(value))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class ShareConsumerPerformanceService(PerformanceService):

"socket-buffer-size", "The size of the tcp RECV size."

"consumer.config", "Consumer config properties file."
"command-config", "Share consumer config properties file."
"""

# Root directory for persistent output
Expand Down Expand Up @@ -100,7 +100,7 @@ def start_cmd(self, node):
for key, value in self.args().items():
cmd += " --%s %s" % (key, value)

cmd += " --consumer.config %s" % ShareConsumerPerformanceService.CONFIG_FILE
cmd += " --command-config %s" % ShareConsumerPerformanceService.CONFIG_FILE

for key, value in self.settings.items():
cmd += " %s=%s" % (str(key), str(value))
Expand Down
112 changes: 72 additions & 40 deletions tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public static void main(String[] args) {
try {
LOG.info("Starting consumer...");
ConsumerPerfOptions options = new ConsumerPerfOptions(args);
AtomicLong totalMessagesRead = new AtomicLong(0);
AtomicLong totalRecordsRead = new AtomicLong(0);
AtomicLong totalBytesRead = new AtomicLong(0);
AtomicLong joinTimeMs = new AtomicLong(0);
AtomicLong joinTimeMsInSingleRound = new AtomicLong(0);
Expand All @@ -68,14 +68,14 @@ public static void main(String[] args) {

KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(options.props());
long bytesRead = 0L;
long messagesRead = 0L;
long recordsRead = 0L;
long lastBytesRead = 0L;
long lastMessagesRead = 0L;
long lastRecordsRead = 0L;
long currentTimeMs = System.currentTimeMillis();
long joinStartMs = currentTimeMs;
long startMs = currentTimeMs;
consume(consumer, options, totalMessagesRead, totalBytesRead, joinTimeMs,
bytesRead, messagesRead, lastBytesRead, lastMessagesRead,
consume(consumer, options, totalRecordsRead, totalBytesRead, joinTimeMs,
bytesRead, recordsRead, lastBytesRead, lastRecordsRead,
joinStartMs, joinTimeMsInSingleRound);
long endMs = System.currentTimeMillis();

Expand All @@ -94,12 +94,12 @@ public static void main(String[] args) {
options.dateFormat().format(endMs),
totalMbRead,
totalMbRead / elapsedSec,
totalMessagesRead.get(),
totalMessagesRead.get() / elapsedSec,
totalRecordsRead.get(),
totalRecordsRead.get() / elapsedSec,
joinTimeMs.get(),
fetchTimeInMs,
totalMbRead / (fetchTimeInMs / 1000.0),
totalMessagesRead.get() / (fetchTimeInMs / 1000.0)
totalRecordsRead.get() / (fetchTimeInMs / 1000.0)
);
}

Expand All @@ -122,16 +122,16 @@ protected static void printHeader(boolean showDetailedStats) {

private static void consume(KafkaConsumer<byte[], byte[]> consumer,
ConsumerPerfOptions options,
AtomicLong totalMessagesRead,
AtomicLong totalRecordsRead,
AtomicLong totalBytesRead,
AtomicLong joinTimeMs,
long bytesRead,
long messagesRead,
long recordsRead,
long lastBytesRead,
long lastMessagesRead,
long lastRecordsRead,
long joinStartMs,
AtomicLong joinTimeMsInSingleRound) {
long numMessages = options.numMessages();
long numRecords = options.numRecords();
long recordFetchTimeoutMs = options.recordFetchTimeoutMs();
long reportingIntervalMs = options.reportingIntervalMs();
boolean showDetailedStats = options.showDetailedStats();
Expand All @@ -149,81 +149,81 @@ private static void consume(KafkaConsumer<byte[], byte[]> consumer,
long lastReportTimeMs = currentTimeMs;
long lastConsumedTimeMs = currentTimeMs;

while (messagesRead < numMessages && currentTimeMs - lastConsumedTimeMs <= recordFetchTimeoutMs) {
while (recordsRead < numRecords && currentTimeMs - lastConsumedTimeMs <= recordFetchTimeoutMs) {
ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
currentTimeMs = System.currentTimeMillis();
if (!records.isEmpty())
lastConsumedTimeMs = currentTimeMs;
for (ConsumerRecord<byte[], byte[]> record : records) {
messagesRead += 1;
recordsRead += 1;
if (record.key() != null)
bytesRead += record.key().length;
if (record.value() != null)
bytesRead += record.value().length;
if (currentTimeMs - lastReportTimeMs >= reportingIntervalMs) {
if (showDetailedStats)
printConsumerProgress(0, bytesRead, lastBytesRead, messagesRead, lastMessagesRead,
printConsumerProgress(0, bytesRead, lastBytesRead, recordsRead, lastRecordsRead,
lastReportTimeMs, currentTimeMs, dateFormat, joinTimeMsInSingleRound.get());
joinTimeMsInSingleRound = new AtomicLong(0);
lastReportTimeMs = currentTimeMs;
lastMessagesRead = messagesRead;
lastRecordsRead = recordsRead;
lastBytesRead = bytesRead;
}
}
}

if (messagesRead < numMessages)
System.out.printf("WARNING: Exiting before consuming the expected number of messages: timeout (%d ms) exceeded. " +
if (recordsRead < numRecords)
System.out.printf("WARNING: Exiting before consuming the expected number of records: timeout (%d ms) exceeded. " +
"You can use the --timeout option to increase the timeout.%n", recordFetchTimeoutMs);
totalMessagesRead.set(messagesRead);
totalRecordsRead.set(recordsRead);
totalBytesRead.set(bytesRead);
}

protected static void printConsumerProgress(int id,
long bytesRead,
long lastBytesRead,
long messagesRead,
long lastMessagesRead,
long recordsRead,
long lastRecordsRead,
long startMs,
long endMs,
SimpleDateFormat dateFormat,
long joinTimeMsInSingleRound) {
printBasicProgress(id, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, endMs, dateFormat);
printExtendedProgress(bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, endMs, joinTimeMsInSingleRound);
printBasicProgress(id, bytesRead, lastBytesRead, recordsRead, lastRecordsRead, startMs, endMs, dateFormat);
printExtendedProgress(bytesRead, lastBytesRead, recordsRead, lastRecordsRead, startMs, endMs, joinTimeMsInSingleRound);
System.out.println();
}

private static void printBasicProgress(int id,
long bytesRead,
long lastBytesRead,
long messagesRead,
long lastMessagesRead,
long recordsRead,
long lastRecordsRead,
long startMs,
long endMs,
SimpleDateFormat dateFormat) {
double elapsedMs = endMs - startMs;
double totalMbRead = (bytesRead * 1.0) / (1024 * 1024);
double intervalMbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024);
double intervalMbPerSec = 1000.0 * intervalMbRead / elapsedMs;
double intervalMessagesPerSec = ((messagesRead - lastMessagesRead) / elapsedMs) * 1000.0;
double intervalRecordsPerSec = ((recordsRead - lastRecordsRead) / elapsedMs) * 1000.0;
System.out.printf("%s, %d, %.4f, %.4f, %d, %.4f", dateFormat.format(endMs), id,
totalMbRead, intervalMbPerSec, messagesRead, intervalMessagesPerSec);
totalMbRead, intervalMbPerSec, recordsRead, intervalRecordsPerSec);
}

private static void printExtendedProgress(long bytesRead,
long lastBytesRead,
long messagesRead,
long lastMessagesRead,
long recordsRead,
long lastRecordsRead,
long startMs,
long endMs,
long joinTimeMsInSingleRound) {
long fetchTimeMs = endMs - startMs - joinTimeMsInSingleRound;
double intervalMbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024);
long intervalMessagesRead = messagesRead - lastMessagesRead;
long intervalRecordsRead = recordsRead - lastRecordsRead;
double intervalMbPerSec = (fetchTimeMs <= 0) ? 0.0 : 1000.0 * intervalMbRead / fetchTimeMs;
double intervalMessagesPerSec = (fetchTimeMs <= 0) ? 0.0 : 1000.0 * intervalMessagesRead / fetchTimeMs;
double intervalRecordsPerSec = (fetchTimeMs <= 0) ? 0.0 : 1000.0 * intervalRecordsRead / fetchTimeMs;
System.out.printf(", %d, %d, %.4f, %.4f", joinTimeMsInSingleRound,
fetchTimeMs, intervalMbPerSec, intervalMessagesPerSec);
fetchTimeMs, intervalMbPerSec, intervalRecordsPerSec);
}

public static class ConsumerPerfRebListener implements ConsumerRebalanceListener {
Expand Down Expand Up @@ -258,11 +258,15 @@ protected static class ConsumerPerfOptions extends CommandDefaultOptions {
private final OptionSpec<Integer> fetchSizeOpt;
private final OptionSpec<Void> resetBeginningOffsetOpt;
private final OptionSpec<Integer> socketBufferSizeOpt;
@Deprecated(since = "4.2", forRemoval = true)
private final OptionSpec<String> consumerConfigOpt;
private final OptionSpec<String> commandConfigOpt;
private final OptionSpec<Void> printMetricsOpt;
private final OptionSpec<Void> showDetailedStatsOpt;
private final OptionSpec<Long> recordFetchTimeoutOpt;
@Deprecated(since = "4.2", forRemoval = true)
private final OptionSpec<Long> numMessagesOpt;
private final OptionSpec<Long> numRecordsOpt;
private final OptionSpec<Long> reportingIntervalOpt;
private final OptionSpec<String> dateFormatOpt;
private final OptionSpec<Void> hideHeaderOpt;
Expand Down Expand Up @@ -292,13 +296,18 @@ public ConsumerPerfOptions(String[] args) {
.ofType(Integer.class)
.defaultsTo(1024 * 1024);
resetBeginningOffsetOpt = parser.accepts("from-latest", "If the consumer does not already have an established " +
"offset to consume from, start with the latest message present in the log rather than the earliest message.");
"offset to consume from, start with the latest record present in the log rather than the earliest record.");
socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.")
.withRequiredArg()
.describedAs("size")
.ofType(Integer.class)
.defaultsTo(2 * 1024 * 1024);
consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file.")
consumerConfigOpt = parser.accepts("consumer.config", "(DEPRECATED) Consumer config properties file. " +
"This option will be removed in a future version. Use --command-config instead")
.withRequiredArg()
.describedAs("config file")
.ofType(String.class);
commandConfigOpt = parser.accepts("command-config", "Config properties file")
.withRequiredArg()
.describedAs("config file")
.ofType(String.class);
Expand All @@ -310,7 +319,12 @@ public ConsumerPerfOptions(String[] args) {
.describedAs("milliseconds")
.ofType(Long.class)
.defaultsTo(10_000L);
numMessagesOpt = parser.accepts("messages", "REQUIRED: The number of messages to consume.")
numMessagesOpt = parser.accepts("messages", "(DEPRECATED) The number of records to consume. " +
"This option will be removed in a future version. Use --num-records instead")
.withRequiredArg()
.describedAs("count")
.ofType(Long.class);
numRecordsOpt = parser.accepts("num-records", "REQUIRED: The number of records to consume.")
.withRequiredArg()
.describedAs("count")
.ofType(Long.class);
Expand All @@ -335,8 +349,18 @@ public ConsumerPerfOptions(String[] args) {
}
if (options != null) {
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool is used to verify the consumer performance.");
CommandLineUtils.checkRequiredArgs(parser, options, numMessagesOpt);
CommandLineUtils.checkOneOfArgs(parser, options, topicOpt, includeOpt);

CommandLineUtils.checkOneOfArgs(parser, options, numMessagesOpt, numRecordsOpt);
CommandLineUtils.checkInvalidArgs(parser, options, consumerConfigOpt, commandConfigOpt);

if (options.has(numMessagesOpt)) {
System.out.println("Warning: --messages is deprecated. Use --num-records instead.");
}

if (options.has(consumerConfigOpt)) {
System.out.println("Warning: --consumer.config is deprecated. Use --command-config instead.");
}
}
}

Expand All @@ -349,8 +373,14 @@ public String brokerHostsAndPorts() {
}

public Properties props() throws IOException {
Properties props = (options.has(consumerConfigOpt))
? Utils.loadProps(options.valueOf(consumerConfigOpt))
String commandConfigFile;
if (options.has(consumerConfigOpt)) {
commandConfigFile = options.valueOf(consumerConfigOpt);
} else {
commandConfigFile = options.valueOf(commandConfigOpt);
}
Properties props = commandConfigFile != null
? Utils.loadProps(commandConfigFile)
: new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHostsAndPorts());
props.put(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(groupIdOpt));
Expand Down Expand Up @@ -378,8 +408,10 @@ public Optional<Pattern> include() {
: Optional.empty();
}

public long numMessages() {
return options.valueOf(numMessagesOpt);
public long numRecords() {
return options.has(numMessagesOpt)
? options.valueOf(numMessagesOpt)
: options.valueOf(numRecordsOpt);
}

public long reportingIntervalMs() {
Expand Down
Loading
Loading