diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java index 5fd2ad2008964..c69c9c35fd462 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java @@ -553,14 +553,19 @@ private void testInterceptors(Map consumerConfig) throws Excepti // commit sync and verify onCommit is called var commitCountBefore = MockConsumerInterceptor.ON_COMMIT_COUNT.intValue(); - consumer.commitSync(Map.of(TP, new OffsetAndMetadata(2L))); - assertEquals(2, consumer.committed(Set.of(TP)).get(TP).offset()); + consumer.commitSync(Map.of(TP, new OffsetAndMetadata(2L, "metadata"))); + OffsetAndMetadata metadata = consumer.committed(Set.of(TP)).get(TP); + assertEquals(2, metadata.offset()); + assertEquals("metadata", metadata.metadata()); assertEquals(commitCountBefore + 1, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue()); // commit async and verify onCommit is called - var offsetsToCommit = Map.of(TP, new OffsetAndMetadata(5L)); + var offsetsToCommit = Map.of(TP, new OffsetAndMetadata(5L, null)); sendAndAwaitAsyncCommit(consumer, Optional.of(offsetsToCommit)); - assertEquals(5, consumer.committed(Set.of(TP)).get(TP).offset()); + metadata = consumer.committed(Set.of(TP)).get(TP); + assertEquals(5, metadata.offset()); + // null metadata will be converted to an empty string + assertEquals("", metadata.metadata()); assertEquals(commitCountBefore + 2, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue()); } // cleanup diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java index d6b3b947c209d..f459dd5ba5507 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java @@ -54,10 +54,7 @@ public OffsetAndMetadata(long offset, Optional leaderEpoch, String meta // The server converts null metadata to an empty string. So we store it as an empty string as well on the client // to be consistent. - if (metadata == null) - this.metadata = OffsetFetchResponse.NO_METADATA; - else - this.metadata = metadata; + this.metadata = Objects.requireNonNullElse(metadata, OffsetFetchResponse.NO_METADATA); } /** @@ -82,6 +79,11 @@ public long offset() { return offset; } + /** + * Get the metadata of the previously consumed record. + * + * @return the metadata or empty string if no metadata + */ public String metadata() { return metadata; } @@ -106,21 +108,20 @@ public boolean equals(Object o) { OffsetAndMetadata that = (OffsetAndMetadata) o; return offset == that.offset && Objects.equals(metadata, that.metadata) && - Objects.equals(leaderEpoch, that.leaderEpoch); + Objects.equals(leaderEpoch(), that.leaderEpoch()); } @Override public int hashCode() { - return Objects.hash(offset, metadata, leaderEpoch); + return Objects.hash(offset, metadata, leaderEpoch()); } @Override public String toString() { return "OffsetAndMetadata{" + "offset=" + offset + - ", leaderEpoch=" + leaderEpoch + + ", leaderEpoch=" + leaderEpoch().orElse(null) + ", metadata='" + metadata + '\'' + '}'; } - } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/OffsetAndMetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/OffsetAndMetadataTest.java index 3035703ff37ab..c1a13c054eea4 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/OffsetAndMetadataTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/OffsetAndMetadataTest.java @@ -65,4 +65,19 @@ public void testDeserializationCompatibilityWithLeaderEpoch() throws IOException assertEquals(new OffsetAndMetadata(10, Optional.of(235), "test commit metadata"), deserializedObject); } + @Test + public void testEqualsWithNullAndNegativeLeaderEpoch() { + OffsetAndMetadata metadataWithNullEpoch = new OffsetAndMetadata(100L, Optional.empty(), "metadata"); + OffsetAndMetadata metadataWithNegativeEpoch = new OffsetAndMetadata(100L, Optional.of(-1), "metadata"); + assertEquals(metadataWithNullEpoch, metadataWithNegativeEpoch); + assertEquals(metadataWithNullEpoch.hashCode(), metadataWithNegativeEpoch.hashCode()); + } + + @Test + public void testEqualsWithNullAndEmptyMetadata() { + OffsetAndMetadata metadataWithNullMetadata = new OffsetAndMetadata(100L, Optional.of(1), null); + OffsetAndMetadata metadataWithEmptyMetadata = new OffsetAndMetadata(100L, Optional.of(1), ""); + assertEquals(metadataWithNullMetadata, metadataWithEmptyMetadata); + assertEquals(metadataWithNullMetadata.hashCode(), metadataWithEmptyMetadata.hashCode()); + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java index d4ceeedde56b1..7032c13b28508 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java @@ -766,6 +766,7 @@ public void testSuccessfulOffsetFetch() { // Complete request with a response long expectedOffset = 100; + String expectedMetadata = "metadata"; NetworkClientDelegate.UnsentRequest req = result.unsentRequests.get(0); OffsetFetchResponseData.OffsetFetchResponseGroup groupResponse = new OffsetFetchResponseData.OffsetFetchResponseGroup() .setGroupId(DEFAULT_GROUP_ID) @@ -777,6 +778,7 @@ public void testSuccessfulOffsetFetch() { .setPartitionIndex(tp.partition()) .setCommittedOffset(expectedOffset) .setCommittedLeaderEpoch(1) + .setMetadata(expectedMetadata) )) )); req.handler().onComplete(buildOffsetFetchClientResponse(req, groupResponse, false)); @@ -794,6 +796,7 @@ public void testSuccessfulOffsetFetch() { assertEquals(1, offsetsAndMetadata.size()); assertTrue(offsetsAndMetadata.containsKey(tp)); assertEquals(expectedOffset, offsetsAndMetadata.get(tp).offset()); + assertEquals(expectedMetadata, offsetsAndMetadata.get(tp).metadata()); assertEquals(0, commitManager.pendingRequests.inflightOffsetFetches.size(), "Inflight " + "request should be removed from the queue when a response is received."); }