Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -553,14 +553,19 @@ private void testInterceptors(Map<String, Object> consumerConfig) throws Excepti

// commit sync and verify onCommit is called
var commitCountBefore = MockConsumerInterceptor.ON_COMMIT_COUNT.intValue();
consumer.commitSync(Map.of(TP, new OffsetAndMetadata(2L)));
assertEquals(2, consumer.committed(Set.of(TP)).get(TP).offset());
consumer.commitSync(Map.of(TP, new OffsetAndMetadata(2L, "metadata")));
OffsetAndMetadata metadata = consumer.committed(Set.of(TP)).get(TP);
assertEquals(2, metadata.offset());
assertEquals("metadata", metadata.metadata());
assertEquals(commitCountBefore + 1, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue());

// commit async and verify onCommit is called
var offsetsToCommit = Map.of(TP, new OffsetAndMetadata(5L));
var offsetsToCommit = Map.of(TP, new OffsetAndMetadata(5L, null));
sendAndAwaitAsyncCommit(consumer, Optional.of(offsetsToCommit));
assertEquals(5, consumer.committed(Set.of(TP)).get(TP).offset());
metadata = consumer.committed(Set.of(TP)).get(TP);
assertEquals(5, metadata.offset());
// null metadata will be converted to an empty string
assertEquals("", metadata.metadata());
assertEquals(commitCountBefore + 2, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue());
}
// cleanup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,7 @@ public OffsetAndMetadata(long offset, Optional<Integer> leaderEpoch, String meta

// The server converts null metadata to an empty string. So we store it as an empty string as well on the client
// to be consistent.
if (metadata == null)
this.metadata = OffsetFetchResponse.NO_METADATA;
else
this.metadata = metadata;
this.metadata = Objects.requireNonNullElse(metadata, OffsetFetchResponse.NO_METADATA);
}

/**
Expand All @@ -82,6 +79,11 @@ public long offset() {
return offset;
}

/**
* Get the metadata of the previously consumed record.
*
* @return the metadata or empty string if no metadata
*/
public String metadata() {
return metadata;
}
Expand All @@ -106,21 +108,20 @@ public boolean equals(Object o) {
OffsetAndMetadata that = (OffsetAndMetadata) o;
return offset == that.offset &&
Objects.equals(metadata, that.metadata) &&
Objects.equals(leaderEpoch, that.leaderEpoch);
Objects.equals(leaderEpoch(), that.leaderEpoch());
}

@Override
public int hashCode() {
return Objects.hash(offset, metadata, leaderEpoch);
return Objects.hash(offset, metadata, leaderEpoch());
}

@Override
public String toString() {
return "OffsetAndMetadata{" +
"offset=" + offset +
", leaderEpoch=" + leaderEpoch +
", leaderEpoch=" + leaderEpoch().orElse(null) +
", metadata='" + metadata + '\'' +
'}';
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,19 @@ public void testDeserializationCompatibilityWithLeaderEpoch() throws IOException
assertEquals(new OffsetAndMetadata(10, Optional.of(235), "test commit metadata"), deserializedObject);
}

@Test
public void testEqualsWithNullAndNegativeLeaderEpoch() {
OffsetAndMetadata metadataWithNullEpoch = new OffsetAndMetadata(100L, Optional.empty(), "metadata");
OffsetAndMetadata metadataWithNegativeEpoch = new OffsetAndMetadata(100L, Optional.of(-1), "metadata");
assertEquals(metadataWithNullEpoch, metadataWithNegativeEpoch);
assertEquals(metadataWithNullEpoch.hashCode(), metadataWithNegativeEpoch.hashCode());
}

@Test
public void testEqualsWithNullAndEmptyMetadata() {
OffsetAndMetadata metadataWithNullMetadata = new OffsetAndMetadata(100L, Optional.of(1), null);
OffsetAndMetadata metadataWithEmptyMetadata = new OffsetAndMetadata(100L, Optional.of(1), "");
assertEquals(metadataWithNullMetadata, metadataWithEmptyMetadata);
assertEquals(metadataWithNullMetadata.hashCode(), metadataWithEmptyMetadata.hashCode());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,7 @@ public void testSuccessfulOffsetFetch() {

// Complete request with a response
long expectedOffset = 100;
String expectedMetadata = "metadata";
NetworkClientDelegate.UnsentRequest req = result.unsentRequests.get(0);
OffsetFetchResponseData.OffsetFetchResponseGroup groupResponse = new OffsetFetchResponseData.OffsetFetchResponseGroup()
.setGroupId(DEFAULT_GROUP_ID)
Expand All @@ -777,6 +778,7 @@ public void testSuccessfulOffsetFetch() {
.setPartitionIndex(tp.partition())
.setCommittedOffset(expectedOffset)
.setCommittedLeaderEpoch(1)
.setMetadata(expectedMetadata)
))
));
req.handler().onComplete(buildOffsetFetchClientResponse(req, groupResponse, false));
Expand All @@ -794,6 +796,7 @@ public void testSuccessfulOffsetFetch() {
assertEquals(1, offsetsAndMetadata.size());
assertTrue(offsetsAndMetadata.containsKey(tp));
assertEquals(expectedOffset, offsetsAndMetadata.get(tp).offset());
assertEquals(expectedMetadata, offsetsAndMetadata.get(tp).metadata());
assertEquals(0, commitManager.pendingRequests.inflightOffsetFetches.size(), "Inflight " +
"request should be removed from the queue when a response is received.");
}
Expand Down