Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
Expand Down Expand Up @@ -92,6 +94,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
}

private static final int MAX_RECORD_EMPTY_RETRIES = 30;
private static final long MAX_IDLE_TIME_MS = 600000L;

private static class ValueList {
public final String key;
Expand Down Expand Up @@ -383,60 +386,79 @@ public static VerificationResult verify(final String kafka,
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NumberDeserializer.class);
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props);
final List<TopicPartition> partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS);
consumer.assign(partitions);
consumer.seekToBeginning(partitions);
// Verify all transactions are finished before proceeding with data verification
if (eosEnabled) {
final Properties txnProps = new Properties();
txnProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
txnProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
txnProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
txnProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
txnProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString());

final VerificationResult txnResult;
try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(txnProps)) {
txnResult = verifyAllTransactionFinished(consumer, kafka);
}

if (!txnResult.passed()) {
System.err.println("Transaction verification failed: " + txnResult.result());
System.out.println("FAILED");
return txnResult;
}
}
final int recordsGenerated = inputs.size() * maxRecordsPerKey;
int recordsProcessed = 0;
final Map<String, AtomicInteger> processed =
Stream.of(NUMERIC_VALUE_TOPICS)
.collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));

final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();

VerificationResult verificationResult = new VerificationResult(false, "no results yet");
int retry = 0;
final long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) {
final ConsumerRecords<String, Number> records = consumer.poll(Duration.ofSeconds(5));
if (records.isEmpty() && recordsProcessed >= recordsGenerated) {
verificationResult = verifyAll(inputs, events, false, eosEnabled);
if (verificationResult.passed()) {
break;
} else if (retry++ > MAX_RECORD_EMPTY_RETRIES) {
System.out.println(Instant.now() + " Didn't get any more results, verification hasn't passed, and out of retries.");
break;
int recordsProcessed = 0;

try (final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props)) {
final List<TopicPartition> partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS);
consumer.assign(partitions);
consumer.seekToBeginning(partitions);
final Map<String, AtomicInteger> processed =
Stream.of(NUMERIC_VALUE_TOPICS)
.collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));

int retry = 0;
while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) {
final ConsumerRecords<String, Number> records = consumer.poll(Duration.ofSeconds(5));
if (records.isEmpty() && recordsProcessed >= recordsGenerated) {
verificationResult = verifyAll(inputs, events, false, eosEnabled);
if (verificationResult.passed()) {
break;
} else if (retry++ > MAX_RECORD_EMPTY_RETRIES) {
System.out.println(Instant.now() + " Didn't get any more results, verification hasn't passed, and out of retries.");
break;
} else {
System.out.println(Instant.now() + " Didn't get any more results, but verification hasn't passed (yet). Retrying..." + retry);
}
} else {
System.out.println(Instant.now() + " Didn't get any more results, but verification hasn't passed (yet). Retrying..." + retry);
}
} else {
System.out.println(Instant.now() + " Get some more results from " + records.partitions() + ", resetting retry.");
System.out.println(Instant.now() + " Get some more results from " + records.partitions() + ", resetting retry.");

retry = 0;
for (final ConsumerRecord<String, Number> record : records) {
final String key = record.key();
retry = 0;
for (final ConsumerRecord<String, Number> record : records) {
final String key = record.key();

final String topic = record.topic();
processed.get(topic).incrementAndGet();
final String topic = record.topic();
processed.get(topic).incrementAndGet();

if (topic.equals("echo")) {
recordsProcessed++;
if (recordsProcessed % 100 == 0) {
System.out.println("Echo records processed = " + recordsProcessed);
if (topic.equals("echo")) {
recordsProcessed++;
if (recordsProcessed % 100 == 0) {
System.out.println("Echo records processed = " + recordsProcessed);
}
}

events.computeIfAbsent(topic, t -> new HashMap<>())
.computeIfAbsent(key, k -> new LinkedList<>())
.add(record);
}

events.computeIfAbsent(topic, t -> new HashMap<>())
.computeIfAbsent(key, k -> new LinkedList<>())
.add(record);
System.out.println(processed);
}

System.out.println(processed);
}
}
consumer.close();

final long finished = System.currentTimeMillis() - start;
System.out.println("Verification time=" + finished);
Expand Down Expand Up @@ -491,24 +513,6 @@ private static Map<String, Set<Number>> parseRecordsForEchoTopic(
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)) : Collections.emptyMap();
}

public static class VerificationResult {
private final boolean passed;
private final String result;

VerificationResult(final boolean passed, final String result) {
this.passed = passed;
this.result = result;
}

public boolean passed() {
return passed;
}

public String result() {
return result;
}
}

private static VerificationResult verifyAll(final Map<String, Set<Integer>> inputs,
final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
final boolean printResults,
Expand Down Expand Up @@ -732,4 +736,54 @@ private static List<TopicPartition> getAllPartitions(final KafkaConsumer<?, ?> c
return partitions;
}

private static VerificationResult verifyAllTransactionFinished(final KafkaConsumer<byte[], byte[]> consumer,
final String kafka) {
// Get all output topics except "data" (which is the input topic)
final String[] outputTopics = Arrays.stream(NUMERIC_VALUE_TOPICS)
.filter(topic -> !topic.equals("data"))
.toArray(String[]::new);

final List<TopicPartition> partitions = getAllPartitions(consumer, outputTopics);
consumer.assign(partitions);
consumer.seekToEnd(partitions);
for (final TopicPartition tp : partitions) {
System.out.println(tp + " at position " + consumer.position(tp));
}

final Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer-uncommitted");
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
try (final KafkaConsumer<byte[], byte[]> consumerUncommitted = new KafkaConsumer<>(consumerProps)) {
while (!partitions.isEmpty() && System.currentTimeMillis() < maxWaitTime) {
consumer.seekToEnd(partitions);
final Map<TopicPartition, Long> topicEndOffsets = consumerUncommitted.endOffsets(partitions);

final java.util.Iterator<TopicPartition> iterator = partitions.iterator();
while (iterator.hasNext()) {
final TopicPartition topicPartition = iterator.next();
final long position = consumer.position(topicPartition);

if (position == topicEndOffsets.get(topicPartition)) {
iterator.remove();
System.out.println("Removing " + topicPartition + " at position " + position);
} else if (consumer.position(topicPartition) > topicEndOffsets.get(topicPartition)) {
return new VerificationResult(false, "Offset for partition " + topicPartition + " is larger than topic endOffset: " + position + " > " + topicEndOffsets.get(topicPartition));
} else {
System.out.println("Retry " + topicPartition + " at position " + position);
}
}
sleep(1000L);
}
}

if (!partitions.isEmpty()) {
return new VerificationResult(false, "Could not read all verification records. Did not receive any new record within the last " + (MAX_IDLE_TIME_MS / 1000L) + " sec.");
}
return new VerificationResult(true, "All transactions finished successfully");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -128,4 +128,22 @@ public static void sleep(final long duration) {
} catch (final Exception ignore) { }
}

public static class VerificationResult {
private final boolean passed;
private final String result;

public VerificationResult(final boolean passed, final String result) {
this.passed = passed;
this.result = result;
}

public boolean passed() {
return passed;
}

public String result() {
return result;
}
}

}