diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 8bba69198a6b0..e74cf0414a8e9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -30,6 +30,7 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.InvalidRegularExpression; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Deserializer; @@ -1772,10 +1773,14 @@ public void enforceRebalance() { * If auto-commit is enabled, this will commit the current offsets if possible within the default * timeout. See {@link #close(CloseOptions)} for details. Note that {@link #wakeup()} * cannot be used to interrupt close. + *
+ * This close operation will attempt all shutdown steps even if one of them fails. + * It logs all encountered errors, continues to execute the next steps, and finally throws the first error found. * - * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted - * before or while this function is called - * @throws org.apache.kafka.common.KafkaException for any other error during close + * @throws WakeupException if {@link #wakeup()} is called before or while this function is called + * @throws InterruptException if the calling thread is interrupted before or while this function is called + * @throws KafkaException for any other error during close + * (e.g., errors thrown from rebalance callbacks or commit callbacks from previous asynchronous commits) */ @Override public void close() { @@ -1799,13 +1804,17 @@ public void close() { * {@link ConsumerConfig#REQUEST_TIMEOUT_MS_CONFIG} for these requests to complete during the close operation. * Note that the execution time of callbacks (such as {@link OffsetCommitCallback} and * {@link ConsumerRebalanceListener}) does not consume time from the close timeout. + *
+ * This close operation will attempt all shutdown steps even if one of them fails. + * It logs all encountered errors, continues to execute the next steps, and finally throws the first error found. * * @param timeout The maximum time to wait for consumer to close gracefully. The value must be * non-negative. Specifying a timeout of zero means do not wait for pending requests to complete. - * * @throws IllegalArgumentException If the {@code timeout} is negative. - * @throws InterruptException If the thread is interrupted before or while this function is called - * @throws org.apache.kafka.common.KafkaException for any other error during close + * @throws WakeupException if {@link #wakeup()} is called before or while this function is called + * @throws InterruptException if the calling thread is interrupted before or while this function is called + * @throws KafkaException for any other error during close + * (e.g., errors thrown from rebalance callbacks or commit callbacks from previous asynchronous commits) */ @Deprecated(since = "4.1") @Override @@ -1833,8 +1842,16 @@ public void close(Duration timeout) { * {@link ConsumerConfig#REQUEST_TIMEOUT_MS_CONFIG} for these requests to complete during the close operation. * Note that the execution time of callbacks (such as {@link OffsetCommitCallback} and * {@link ConsumerRebalanceListener}) does not consume time from the close timeout. + *
+ * This close operation will attempt all shutdown steps even if one of them fails. + * It logs all encountered errors, continues to execute the next steps, and finally throws the first error found. * * @param option see {@link CloseOptions}; cannot be {@code null} + * @throws IllegalArgumentException If the {@code option} timeout is negative + * @throws WakeupException if {@link #wakeup()} is called before or while this function is called + * @throws InterruptException if the calling thread is interrupted before or while this function is called + * @throws KafkaException for any other error during close + * (e.g., errors thrown from rebalance callbacks or commit callbacks from previous asynchronous commits) */ @Override public void close(CloseOptions option) { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java index 655788e846945..7f3bad2e318f2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java @@ -679,10 +679,13 @@ public void unregisterMetricFromSubscription(KafkaMetric metric) { * Close the consumer, waiting for up to the default timeout of 30 seconds for any needed cleanup. * This will commit acknowledgements if possible within the default timeout. * See {@link #close(Duration)} for details. Note that {@link #wakeup()} cannot be used to interrupt close. + *
+ * This close operation will attempt all shutdown steps even if one of them fails. + * It logs all encountered errors, continues to execute the next steps, and finally throws the first error found. * - * @throws WakeupException if {@link #wakeup()} is called before or while this method is called + * @throws WakeupException if {@link #wakeup()} is called before or while this method is called * @throws InterruptException if the thread is interrupted before or while this method is called - * @throws KafkaException for any other error during close + * @throws KafkaException for any other error during close */ @Override public void close() { @@ -701,14 +704,16 @@ public void close() { * Even if a larger timeout is specified, the consumer will not wait longer than * {@link ConsumerConfig#REQUEST_TIMEOUT_MS_CONFIG} for these requests to complete during the close operation. * Note that the execution time of callbacks (such as {@link AcknowledgementCommitCallback}) do not consume time from the close timeout. + *
+ * This close operation will attempt all shutdown steps even if one of them fails. + * It logs all encountered errors, continues to execute the next steps, and finally throws the first error found. * * @param timeout The maximum time to wait for consumer to close gracefully. The value must be * non-negative. Specifying a timeout of zero means do not wait for pending requests to complete. - * * @throws IllegalArgumentException if the {@code timeout} is negative - * @throws WakeupException if {@link #wakeup()} is called before or while this method is called - * @throws InterruptException if the thread is interrupted before or while this method is called - * @throws KafkaException for any other error during close + * @throws WakeupException if {@link #wakeup()} is called before or while this method is called + * @throws InterruptException if the thread is interrupted before or while this method is called + * @throws KafkaException for any other error during close */ @Override public void close(Duration timeout) {