From b9b35925ea3cb051a71a9157d2fe5cd1b68d776d Mon Sep 17 00:00:00 2001 From: Lianet Magrans Date: Wed, 3 Sep 2025 15:18:38 -0400 Subject: [PATCH 1/3] improve close java doc --- .../kafka/clients/consumer/KafkaConsumer.java | 18 +++++++++++++----- .../clients/consumer/KafkaShareConsumer.java | 17 +++++++++++------ 2 files changed, 24 insertions(+), 11 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 8bba69198a6b0..052a2f18ff39d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1772,10 +1772,13 @@ public void enforceRebalance() { * If auto-commit is enabled, this will commit the current offsets if possible within the default * timeout. See {@link #close(CloseOptions)} for details. Note that {@link #wakeup()} * cannot be used to interrupt close. + *

+ * This close operation will attempt all shutdown steps even if one of them fails. + * It logs all encountered errors, continues to execute the next steps, and finally throws the first error found. * * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted - * before or while this function is called - * @throws org.apache.kafka.common.KafkaException for any other error during close + * before or while this function is called + * @throws org.apache.kafka.common.KafkaException for any other error during close */ @Override public void close() { @@ -1799,12 +1802,14 @@ public void close() { * {@link ConsumerConfig#REQUEST_TIMEOUT_MS_CONFIG} for these requests to complete during the close operation. * Note that the execution time of callbacks (such as {@link OffsetCommitCallback} and * {@link ConsumerRebalanceListener}) does not consume time from the close timeout. + *

+ * This close operation will attempt all shutdown steps even if one of them fails. + * It logs all encountered errors, continues to execute the next steps, and finally throws the first error found. * * @param timeout The maximum time to wait for consumer to close gracefully. The value must be * non-negative. Specifying a timeout of zero means do not wait for pending requests to complete. - * - * @throws IllegalArgumentException If the {@code timeout} is negative. - * @throws InterruptException If the thread is interrupted before or while this function is called + * @throws IllegalArgumentException If the {@code timeout} is negative. + * @throws InterruptException If the thread is interrupted before or while this function is called * @throws org.apache.kafka.common.KafkaException for any other error during close */ @Deprecated(since = "4.1") @@ -1833,6 +1838,9 @@ public void close(Duration timeout) { * {@link ConsumerConfig#REQUEST_TIMEOUT_MS_CONFIG} for these requests to complete during the close operation. * Note that the execution time of callbacks (such as {@link OffsetCommitCallback} and * {@link ConsumerRebalanceListener}) does not consume time from the close timeout. + *

+ * This close operation will attempt all shutdown steps even if one of them fails. + * It logs all encountered errors, continues to execute the next steps, and finally throws the first error found. * * @param option see {@link CloseOptions}; cannot be {@code null} */ diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java index 655788e846945..7f3bad2e318f2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java @@ -679,10 +679,13 @@ public void unregisterMetricFromSubscription(KafkaMetric metric) { * Close the consumer, waiting for up to the default timeout of 30 seconds for any needed cleanup. * This will commit acknowledgements if possible within the default timeout. * See {@link #close(Duration)} for details. Note that {@link #wakeup()} cannot be used to interrupt close. + *

+ * This close operation will attempt all shutdown steps even if one of them fails. + * It logs all encountered errors, continues to execute the next steps, and finally throws the first error found. * - * @throws WakeupException if {@link #wakeup()} is called before or while this method is called + * @throws WakeupException if {@link #wakeup()} is called before or while this method is called * @throws InterruptException if the thread is interrupted before or while this method is called - * @throws KafkaException for any other error during close + * @throws KafkaException for any other error during close */ @Override public void close() { @@ -701,14 +704,16 @@ public void close() { * Even if a larger timeout is specified, the consumer will not wait longer than * {@link ConsumerConfig#REQUEST_TIMEOUT_MS_CONFIG} for these requests to complete during the close operation. * Note that the execution time of callbacks (such as {@link AcknowledgementCommitCallback}) do not consume time from the close timeout. + *

+ * This close operation will attempt all shutdown steps even if one of them fails. + * It logs all encountered errors, continues to execute the next steps, and finally throws the first error found. * * @param timeout The maximum time to wait for consumer to close gracefully. The value must be * non-negative. Specifying a timeout of zero means do not wait for pending requests to complete. - * * @throws IllegalArgumentException if the {@code timeout} is negative - * @throws WakeupException if {@link #wakeup()} is called before or while this method is called - * @throws InterruptException if the thread is interrupted before or while this method is called - * @throws KafkaException for any other error during close + * @throws WakeupException if {@link #wakeup()} is called before or while this method is called + * @throws InterruptException if the thread is interrupted before or while this method is called + * @throws KafkaException for any other error during close */ @Override public void close(Duration timeout) { From a39c557c2f8c3b2a5e2476cf9479fb2f172bb4f6 Mon Sep 17 00:00:00 2001 From: Lianet Magrans Date: Fri, 5 Sep 2025 09:47:30 -0400 Subject: [PATCH 2/3] add exceptions --- .../kafka/clients/consumer/KafkaConsumer.java | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 052a2f18ff39d..c0339ecb0b06b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -29,6 +29,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.errors.InvalidRegularExpression; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Metrics; @@ -1776,9 +1777,10 @@ public void enforceRebalance() { * This close operation will attempt all shutdown steps even if one of them fails. * It logs all encountered errors, continues to execute the next steps, and finally throws the first error found. * - * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted - * before or while this function is called - * @throws org.apache.kafka.common.KafkaException for any other error during close + * @throws WakeupException if {@link #wakeup()} is called before or while this function is called + * @throws InterruptException if the calling thread is interrupted before or while this function is called + * @throws KafkaException for any other error during close + * (e.g., errors thrown from rebalance callbacks or commit callbacks from previous asynchronous commits) */ @Override public void close() { @@ -1808,9 +1810,11 @@ public void close() { * * @param timeout The maximum time to wait for consumer to close gracefully. The value must be * non-negative. Specifying a timeout of zero means do not wait for pending requests to complete. - * @throws IllegalArgumentException If the {@code timeout} is negative. - * @throws InterruptException If the thread is interrupted before or while this function is called - * @throws org.apache.kafka.common.KafkaException for any other error during close + * @throws IllegalArgumentException If the {@code timeout} is negative. + * @throws WakeupException if {@link #wakeup()} is called before or while this function is called + * @throws InterruptException if the calling thread is interrupted before or while this function is called + * @throws KafkaException for any other error during close + * (e.g., errors thrown from rebalance callbacks or commit callbacks from previous asynchronous commits) */ @Deprecated(since = "4.1") @Override @@ -1843,6 +1847,11 @@ public void close(Duration timeout) { * It logs all encountered errors, continues to execute the next steps, and finally throws the first error found. * * @param option see {@link CloseOptions}; cannot be {@code null} + * @throws IllegalArgumentException If the {@code option} timeout is negative + * @throws WakeupException if {@link #wakeup()} is called before or while this function is called + * @throws InterruptException if the calling thread is interrupted before or while this function is called + * @throws KafkaException for any other error during close + * (e.g., errors thrown from rebalance callbacks or commit callbacks from previous asynchronous commits) */ @Override public void close(CloseOptions option) { From bc42af81fcc7711bde6a136f02d3c0d88b040e1e Mon Sep 17 00:00:00 2001 From: Lianet Magrans Date: Fri, 5 Sep 2025 09:58:15 -0400 Subject: [PATCH 3/3] checkstyle fix --- .../java/org/apache/kafka/clients/consumer/KafkaConsumer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index c0339ecb0b06b..e74cf0414a8e9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -29,8 +29,8 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.InterruptException; -import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.errors.InvalidRegularExpression; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Deserializer;