diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java index 59698607912c4..e015331adb45f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java @@ -25,9 +25,11 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serde; @@ -92,6 +94,7 @@ public class SmokeTestDriver extends SmokeTestUtil { } private static final int MAX_RECORD_EMPTY_RETRIES = 30; + private static final long MAX_IDLE_TIME_MS = 600000L; private static class ValueList { public final String key; @@ -383,60 +386,79 @@ public static VerificationResult verify(final String kafka, props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NumberDeserializer.class); props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); - final KafkaConsumer consumer = new KafkaConsumer<>(props); - final List partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS); - consumer.assign(partitions); - consumer.seekToBeginning(partitions); + // Verify all transactions are finished before proceeding with data verification + if (eosEnabled) { + final Properties txnProps = new Properties(); + txnProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier"); + txnProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); + txnProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); + txnProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); + txnProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString()); + + final VerificationResult txnResult; + try (final KafkaConsumer consumer = new KafkaConsumer<>(txnProps)) { + txnResult = verifyAllTransactionFinished(consumer, kafka); + } + if (!txnResult.passed()) { + System.err.println("Transaction verification failed: " + txnResult.result()); + System.out.println("FAILED"); + return txnResult; + } + } final int recordsGenerated = inputs.size() * maxRecordsPerKey; - int recordsProcessed = 0; - final Map processed = - Stream.of(NUMERIC_VALUE_TOPICS) - .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0))); - final Map>>> events = new HashMap<>(); - VerificationResult verificationResult = new VerificationResult(false, "no results yet"); - int retry = 0; final long start = System.currentTimeMillis(); - while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) { - final ConsumerRecords records = consumer.poll(Duration.ofSeconds(5)); - if (records.isEmpty() && recordsProcessed >= recordsGenerated) { - verificationResult = verifyAll(inputs, events, false, eosEnabled); - if (verificationResult.passed()) { - break; - } else if (retry++ > MAX_RECORD_EMPTY_RETRIES) { - System.out.println(Instant.now() + " Didn't get any more results, verification hasn't passed, and out of retries."); - break; + int recordsProcessed = 0; + + try (final KafkaConsumer consumer = new KafkaConsumer<>(props)) { + final List partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS); + consumer.assign(partitions); + consumer.seekToBeginning(partitions); + final Map processed = + Stream.of(NUMERIC_VALUE_TOPICS) + .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0))); + + int retry = 0; + while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) { + final ConsumerRecords records = consumer.poll(Duration.ofSeconds(5)); + if (records.isEmpty() && recordsProcessed >= recordsGenerated) { + verificationResult = verifyAll(inputs, events, false, eosEnabled); + if (verificationResult.passed()) { + break; + } else if (retry++ > MAX_RECORD_EMPTY_RETRIES) { + System.out.println(Instant.now() + " Didn't get any more results, verification hasn't passed, and out of retries."); + break; + } else { + System.out.println(Instant.now() + " Didn't get any more results, but verification hasn't passed (yet). Retrying..." + retry); + } } else { - System.out.println(Instant.now() + " Didn't get any more results, but verification hasn't passed (yet). Retrying..." + retry); - } - } else { - System.out.println(Instant.now() + " Get some more results from " + records.partitions() + ", resetting retry."); + System.out.println(Instant.now() + " Get some more results from " + records.partitions() + ", resetting retry."); - retry = 0; - for (final ConsumerRecord record : records) { - final String key = record.key(); + retry = 0; + for (final ConsumerRecord record : records) { + final String key = record.key(); - final String topic = record.topic(); - processed.get(topic).incrementAndGet(); + final String topic = record.topic(); + processed.get(topic).incrementAndGet(); - if (topic.equals("echo")) { - recordsProcessed++; - if (recordsProcessed % 100 == 0) { - System.out.println("Echo records processed = " + recordsProcessed); + if (topic.equals("echo")) { + recordsProcessed++; + if (recordsProcessed % 100 == 0) { + System.out.println("Echo records processed = " + recordsProcessed); + } } + + events.computeIfAbsent(topic, t -> new HashMap<>()) + .computeIfAbsent(key, k -> new LinkedList<>()) + .add(record); } - events.computeIfAbsent(topic, t -> new HashMap<>()) - .computeIfAbsent(key, k -> new LinkedList<>()) - .add(record); + System.out.println(processed); } - - System.out.println(processed); } } - consumer.close(); final long finished = System.currentTimeMillis() - start; System.out.println("Verification time=" + finished); @@ -491,24 +513,6 @@ private static Map> parseRecordsForEchoTopic( .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)) : Collections.emptyMap(); } - public static class VerificationResult { - private final boolean passed; - private final String result; - - VerificationResult(final boolean passed, final String result) { - this.passed = passed; - this.result = result; - } - - public boolean passed() { - return passed; - } - - public String result() { - return result; - } - } - private static VerificationResult verifyAll(final Map> inputs, final Map>>> events, final boolean printResults, @@ -732,4 +736,54 @@ private static List getAllPartitions(final KafkaConsumer c return partitions; } + private static VerificationResult verifyAllTransactionFinished(final KafkaConsumer consumer, + final String kafka) { + // Get all output topics except "data" (which is the input topic) + final String[] outputTopics = Arrays.stream(NUMERIC_VALUE_TOPICS) + .filter(topic -> !topic.equals("data")) + .toArray(String[]::new); + + final List partitions = getAllPartitions(consumer, outputTopics); + consumer.assign(partitions); + consumer.seekToEnd(partitions); + for (final TopicPartition tp : partitions) { + System.out.println(tp + " at position " + consumer.position(tp)); + } + + final Properties consumerProps = new Properties(); + consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer-uncommitted"); + consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); + consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); + consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); + + final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS; + try (final KafkaConsumer consumerUncommitted = new KafkaConsumer<>(consumerProps)) { + while (!partitions.isEmpty() && System.currentTimeMillis() < maxWaitTime) { + consumer.seekToEnd(partitions); + final Map topicEndOffsets = consumerUncommitted.endOffsets(partitions); + + final java.util.Iterator iterator = partitions.iterator(); + while (iterator.hasNext()) { + final TopicPartition topicPartition = iterator.next(); + final long position = consumer.position(topicPartition); + + if (position == topicEndOffsets.get(topicPartition)) { + iterator.remove(); + System.out.println("Removing " + topicPartition + " at position " + position); + } else if (consumer.position(topicPartition) > topicEndOffsets.get(topicPartition)) { + return new VerificationResult(false, "Offset for partition " + topicPartition + " is larger than topic endOffset: " + position + " > " + topicEndOffsets.get(topicPartition)); + } else { + System.out.println("Retry " + topicPartition + " at position " + position); + } + } + sleep(1000L); + } + } + + if (!partitions.isEmpty()) { + return new VerificationResult(false, "Could not read all verification records. Did not receive any new record within the last " + (MAX_IDLE_TIME_MS / 1000L) + " sec."); + } + return new VerificationResult(true, "All transactions finished successfully"); + } + } diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java index 7e670802b93ad..3a664da65b64b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java @@ -128,4 +128,22 @@ public static void sleep(final long duration) { } catch (final Exception ignore) { } } + public static class VerificationResult { + private final boolean passed; + private final String result; + + public VerificationResult(final boolean passed, final String result) { + this.passed = passed; + this.result = result; + } + + public boolean passed() { + return passed; + } + + public String result() { + return result; + } + } + }