diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java index 647b5b7e4b41b..e7bfdaefe4969 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java @@ -785,6 +785,7 @@ private void handleShareFetchSuccess(Node fetchTarget, final Set partitions = responseData.keySet().stream().map(TopicIdPartition::topicPartition).collect(Collectors.toSet()); final ShareFetchMetricsAggregator shareFetchMetricsAggregator = new ShareFetchMetricsAggregator(metricsManager, partitions); + List completedFetches = new ArrayList<>(responseData.size()); Map partitionsWithUpdatedLeaderInfo = new HashMap<>(); for (Map.Entry entry : responseData.entrySet()) { TopicIdPartition tip = entry.getKey(); @@ -815,21 +816,26 @@ private void handleShareFetchSuccess(Node fetchTarget, } } - ShareCompletedFetch completedFetch = new ShareCompletedFetch( + completedFetches.add( + new ShareCompletedFetch( logContext, BufferSupplier.create(), fetchTarget.id(), tip, partitionData, shareFetchMetricsAggregator, - requestVersion); - shareFetchBuffer.add(completedFetch); + requestVersion) + ); if (!partitionData.acquiredRecords().isEmpty()) { fetchMoreRecords = false; } } + if (!completedFetches.isEmpty()) { + shareFetchBuffer.add(completedFetches); + } + // Handle any acknowledgements which were not received in the response for this node. if (fetchAcknowledgementsInFlight.get(fetchTarget.id()) != null) { fetchAcknowledgementsInFlight.remove(fetchTarget.id()).forEach((partition, acknowledgements) -> { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchBuffer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchBuffer.java index ebd86583ec6f8..7735c80451364 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchBuffer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchBuffer.java @@ -25,6 +25,7 @@ import org.slf4j.Logger; import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; @@ -32,7 +33,6 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import java.util.function.Predicate; /** * {@code ShareFetchBuffer} buffers up {@link ShareCompletedFetch the results} from the broker responses @@ -73,25 +73,10 @@ boolean isEmpty() { } } - /** - * Return whether we have any completed fetches pending return to the user. This method is thread-safe. Has - * visibility for testing. - * - * @return {@code true} if there are completed fetches that match the {@link Predicate}, {@code false} otherwise - */ - boolean hasCompletedFetches(Predicate predicate) { - lock.lock(); - try { - return completedFetches.stream().anyMatch(predicate); - } finally { - lock.unlock(); - } - } - - void add(ShareCompletedFetch fetch) { + void add(List fetches) { lock.lock(); try { - completedFetches.add(fetch); + completedFetches.addAll(fetches); notEmptyCondition.signalAll(); } finally { lock.unlock(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchBufferTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchBufferTest.java index 2a06324f72a7b..72f9856d731cd 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchBufferTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchBufferTest.java @@ -33,6 +33,7 @@ import org.junit.jupiter.api.Test; import java.time.Duration; +import java.util.List; import java.util.Properties; import java.util.Set; import java.util.stream.Collectors; @@ -87,8 +88,7 @@ public void testBasicPeekAndPoll() { try (ShareFetchBuffer fetchBuffer = new ShareFetchBuffer(logContext)) { ShareCompletedFetch completedFetch = completedFetch(topicAPartition0); assertTrue(fetchBuffer.isEmpty()); - fetchBuffer.add(completedFetch); - assertTrue(fetchBuffer.hasCompletedFetches(p -> true)); + fetchBuffer.add(List.of(completedFetch)); assertFalse(fetchBuffer.isEmpty()); assertNotNull(fetchBuffer.peek()); assertSame(completedFetch, fetchBuffer.peek()); @@ -111,7 +111,7 @@ public void testCloseClearsData() { assertNull(fetchBuffer.nextInLineFetch()); assertTrue(fetchBuffer.isEmpty()); - fetchBuffer.add(completedFetch(topicAPartition0)); + fetchBuffer.add(List.of(completedFetch(topicAPartition0))); assertFalse(fetchBuffer.isEmpty()); fetchBuffer.setNextInLineFetch(completedFetch(topicAPartition0)); @@ -132,8 +132,7 @@ public void testCloseClearsData() { public void testBufferedPartitions() { try (ShareFetchBuffer fetchBuffer = new ShareFetchBuffer(logContext)) { fetchBuffer.setNextInLineFetch(completedFetch(topicAPartition0)); - fetchBuffer.add(completedFetch(topicAPartition1)); - fetchBuffer.add(completedFetch(topicAPartition2)); + fetchBuffer.add(List.of(completedFetch(topicAPartition1), completedFetch(topicAPartition2))); assertEquals(allPartitions, fetchBuffer.bufferedPartitions()); fetchBuffer.setNextInLineFetch(null); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java index afac3a76d9538..b086feb8cbdcd 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java @@ -95,7 +95,7 @@ public void testFetchNormal() { // Validate that the buffer is empty until after we add the fetch data. assertTrue(fetchBuffer.isEmpty()); - fetchBuffer.add(completedFetch); + fetchBuffer.add(List.of(completedFetch)); assertFalse(fetchBuffer.isEmpty()); // Validate that the completed fetch isn't initialized just because we add it to the buffer. @@ -153,7 +153,7 @@ protected ShareCompletedFetch initialize(final ShareCompletedFetch completedFetc ShareCompletedFetch completedFetch = completedFetchBuilder .recordCount(10) .build(); - fetchBuffer.add(completedFetch); + fetchBuffer.add(List.of(completedFetch)); // At first, the queue is populated assertFalse(fetchBuffer.isEmpty()); @@ -170,7 +170,7 @@ public void testFetchWithTopicAuthorizationFailed() { ShareCompletedFetch completedFetch = completedFetchBuilder .error(Errors.TOPIC_AUTHORIZATION_FAILED) .build(); - fetchBuffer.add(completedFetch); + fetchBuffer.add(List.of(completedFetch)); assertThrows(TopicAuthorizationException.class, () -> fetchCollector.collect(fetchBuffer)); } @@ -182,7 +182,7 @@ public void testFetchWithUnknownLeaderEpoch() { ShareCompletedFetch completedFetch = completedFetchBuilder .error(Errors.UNKNOWN_LEADER_EPOCH) .build(); - fetchBuffer.add(completedFetch); + fetchBuffer.add(List.of(completedFetch)); ShareFetch fetch = fetchCollector.collect(fetchBuffer); assertTrue(fetch.isEmpty()); } @@ -195,7 +195,7 @@ public void testFetchWithUnknownServerError() { ShareCompletedFetch completedFetch = completedFetchBuilder .error(Errors.UNKNOWN_SERVER_ERROR) .build(); - fetchBuffer.add(completedFetch); + fetchBuffer.add(List.of(completedFetch)); ShareFetch fetch = fetchCollector.collect(fetchBuffer); assertTrue(fetch.isEmpty()); } @@ -208,7 +208,7 @@ public void testFetchWithCorruptMessage() { ShareCompletedFetch completedFetch = completedFetchBuilder .error(Errors.CORRUPT_MESSAGE) .build(); - fetchBuffer.add(completedFetch); + fetchBuffer.add(List.of(completedFetch)); assertThrows(KafkaException.class, () -> fetchCollector.collect(fetchBuffer)); } @@ -221,7 +221,7 @@ public void testFetchWithOtherErrors(final Errors error) { ShareCompletedFetch completedFetch = completedFetchBuilder .error(error) .build(); - fetchBuffer.add(completedFetch); + fetchBuffer.add(List.of(completedFetch)); assertThrows(IllegalStateException.class, () -> fetchCollector.collect(fetchBuffer)); }