Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,7 @@ private void handleShareFetchSuccess(Node fetchTarget,
final Set<TopicPartition> partitions = responseData.keySet().stream().map(TopicIdPartition::topicPartition).collect(Collectors.toSet());
final ShareFetchMetricsAggregator shareFetchMetricsAggregator = new ShareFetchMetricsAggregator(metricsManager, partitions);

List<ShareCompletedFetch> completedFetches = new ArrayList<>(responseData.size());
Map<TopicPartition, Metadata.LeaderIdAndEpoch> partitionsWithUpdatedLeaderInfo = new HashMap<>();
for (Map.Entry<TopicIdPartition, ShareFetchResponseData.PartitionData> entry : responseData.entrySet()) {
TopicIdPartition tip = entry.getKey();
Expand Down Expand Up @@ -815,21 +816,26 @@ private void handleShareFetchSuccess(Node fetchTarget,
}
}

ShareCompletedFetch completedFetch = new ShareCompletedFetch(
completedFetches.add(
new ShareCompletedFetch(
logContext,
BufferSupplier.create(),
fetchTarget.id(),
tip,
partitionData,
shareFetchMetricsAggregator,
requestVersion);
shareFetchBuffer.add(completedFetch);
requestVersion)
);

if (!partitionData.acquiredRecords().isEmpty()) {
fetchMoreRecords = false;
}
}

if (!completedFetches.isEmpty()) {
shareFetchBuffer.add(completedFetches);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this primarily a performance optimization for handling many partitions? If so, should we reduce the ShareAcknowledgementCommitCallbackEvent by batching partitions ?

}

// Handle any acknowledgements which were not received in the response for this node.
if (fetchAcknowledgementsInFlight.get(fetchTarget.id()) != null) {
fetchAcknowledgementsInFlight.remove(fetchTarget.id()).forEach((partition, acknowledgements) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@
import org.slf4j.Logger;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;

/**
* {@code ShareFetchBuffer} buffers up {@link ShareCompletedFetch the results} from the broker responses
Expand Down Expand Up @@ -73,25 +73,10 @@ boolean isEmpty() {
}
}

/**
* Return whether we have any completed fetches pending return to the user. This method is thread-safe. Has
* visibility for testing.
*
* @return {@code true} if there are completed fetches that match the {@link Predicate}, {@code false} otherwise
*/
boolean hasCompletedFetches(Predicate<ShareCompletedFetch> predicate) {
lock.lock();
try {
return completedFetches.stream().anyMatch(predicate);
} finally {
lock.unlock();
}
}

void add(ShareCompletedFetch fetch) {
void add(List<ShareCompletedFetch> fetches) {
lock.lock();
try {
completedFetches.add(fetch);
completedFetches.addAll(fetches);
notEmptyCondition.signalAll();
} finally {
lock.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -87,8 +88,7 @@ public void testBasicPeekAndPoll() {
try (ShareFetchBuffer fetchBuffer = new ShareFetchBuffer(logContext)) {
ShareCompletedFetch completedFetch = completedFetch(topicAPartition0);
assertTrue(fetchBuffer.isEmpty());
fetchBuffer.add(completedFetch);
assertTrue(fetchBuffer.hasCompletedFetches(p -> true));
fetchBuffer.add(List.of(completedFetch));
assertFalse(fetchBuffer.isEmpty());
assertNotNull(fetchBuffer.peek());
assertSame(completedFetch, fetchBuffer.peek());
Expand All @@ -111,7 +111,7 @@ public void testCloseClearsData() {
assertNull(fetchBuffer.nextInLineFetch());
assertTrue(fetchBuffer.isEmpty());

fetchBuffer.add(completedFetch(topicAPartition0));
fetchBuffer.add(List.of(completedFetch(topicAPartition0)));
assertFalse(fetchBuffer.isEmpty());

fetchBuffer.setNextInLineFetch(completedFetch(topicAPartition0));
Expand All @@ -132,8 +132,7 @@ public void testCloseClearsData() {
public void testBufferedPartitions() {
try (ShareFetchBuffer fetchBuffer = new ShareFetchBuffer(logContext)) {
fetchBuffer.setNextInLineFetch(completedFetch(topicAPartition0));
fetchBuffer.add(completedFetch(topicAPartition1));
fetchBuffer.add(completedFetch(topicAPartition2));
fetchBuffer.add(List.of(completedFetch(topicAPartition1), completedFetch(topicAPartition2)));
assertEquals(allPartitions, fetchBuffer.bufferedPartitions());

fetchBuffer.setNextInLineFetch(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public void testFetchNormal() {

// Validate that the buffer is empty until after we add the fetch data.
assertTrue(fetchBuffer.isEmpty());
fetchBuffer.add(completedFetch);
fetchBuffer.add(List.of(completedFetch));
assertFalse(fetchBuffer.isEmpty());

// Validate that the completed fetch isn't initialized just because we add it to the buffer.
Expand Down Expand Up @@ -153,7 +153,7 @@ protected ShareCompletedFetch initialize(final ShareCompletedFetch completedFetc
ShareCompletedFetch completedFetch = completedFetchBuilder
.recordCount(10)
.build();
fetchBuffer.add(completedFetch);
fetchBuffer.add(List.of(completedFetch));

// At first, the queue is populated
assertFalse(fetchBuffer.isEmpty());
Expand All @@ -170,7 +170,7 @@ public void testFetchWithTopicAuthorizationFailed() {
ShareCompletedFetch completedFetch = completedFetchBuilder
.error(Errors.TOPIC_AUTHORIZATION_FAILED)
.build();
fetchBuffer.add(completedFetch);
fetchBuffer.add(List.of(completedFetch));
assertThrows(TopicAuthorizationException.class, () -> fetchCollector.collect(fetchBuffer));
}

Expand All @@ -182,7 +182,7 @@ public void testFetchWithUnknownLeaderEpoch() {
ShareCompletedFetch completedFetch = completedFetchBuilder
.error(Errors.UNKNOWN_LEADER_EPOCH)
.build();
fetchBuffer.add(completedFetch);
fetchBuffer.add(List.of(completedFetch));
ShareFetch<String, String> fetch = fetchCollector.collect(fetchBuffer);
assertTrue(fetch.isEmpty());
}
Expand All @@ -195,7 +195,7 @@ public void testFetchWithUnknownServerError() {
ShareCompletedFetch completedFetch = completedFetchBuilder
.error(Errors.UNKNOWN_SERVER_ERROR)
.build();
fetchBuffer.add(completedFetch);
fetchBuffer.add(List.of(completedFetch));
ShareFetch<String, String> fetch = fetchCollector.collect(fetchBuffer);
assertTrue(fetch.isEmpty());
}
Expand All @@ -208,7 +208,7 @@ public void testFetchWithCorruptMessage() {
ShareCompletedFetch completedFetch = completedFetchBuilder
.error(Errors.CORRUPT_MESSAGE)
.build();
fetchBuffer.add(completedFetch);
fetchBuffer.add(List.of(completedFetch));
assertThrows(KafkaException.class, () -> fetchCollector.collect(fetchBuffer));
}

Expand All @@ -221,7 +221,7 @@ public void testFetchWithOtherErrors(final Errors error) {
ShareCompletedFetch completedFetch = completedFetchBuilder
.error(error)
.build();
fetchBuffer.add(completedFetch);
fetchBuffer.add(List.of(completedFetch));
assertThrows(IllegalStateException.class, () -> fetchCollector.collect(fetchBuffer));
}

Expand Down