Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidRegularExpression;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Deserializer;
Expand Down Expand Up @@ -1772,10 +1773,14 @@ public void enforceRebalance() {
* If auto-commit is enabled, this will commit the current offsets if possible within the default
* timeout. See {@link #close(CloseOptions)} for details. Note that {@link #wakeup()}
* cannot be used to interrupt close.
* <p>
* This close operation will attempt all shutdown steps even if one of them fails.
* It logs all encountered errors, continues to execute the next steps, and finally throws the first error found.
*
* @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted
* before or while this function is called
* @throws org.apache.kafka.common.KafkaException for any other error during close
* @throws WakeupException if {@link #wakeup()} is called before or while this function is called
* @throws InterruptException if the calling thread is interrupted before or while this function is called
* @throws KafkaException for any other error during close
* (e.g., errors thrown from rebalance callbacks or commit callbacks from previous asynchronous commits)
*/
@Override
public void close() {
Expand All @@ -1799,13 +1804,17 @@ public void close() {
* {@link ConsumerConfig#REQUEST_TIMEOUT_MS_CONFIG} for these requests to complete during the close operation.
* Note that the execution time of callbacks (such as {@link OffsetCommitCallback} and
* {@link ConsumerRebalanceListener}) does not consume time from the close timeout.
* <p>
* This close operation will attempt all shutdown steps even if one of them fails.
* It logs all encountered errors, continues to execute the next steps, and finally throws the first error found.
*
* @param timeout The maximum time to wait for consumer to close gracefully. The value must be
* non-negative. Specifying a timeout of zero means do not wait for pending requests to complete.
*
* @throws IllegalArgumentException If the {@code timeout} is negative.
* @throws InterruptException If the thread is interrupted before or while this function is called
* @throws org.apache.kafka.common.KafkaException for any other error during close
* @throws WakeupException if {@link #wakeup()} is called before or while this function is called
* @throws InterruptException if the calling thread is interrupted before or while this function is called
* @throws KafkaException for any other error during close
* (e.g., errors thrown from rebalance callbacks or commit callbacks from previous asynchronous commits)
*/
@Deprecated(since = "4.1")
@Override
Expand Down Expand Up @@ -1833,8 +1842,16 @@ public void close(Duration timeout) {
* {@link ConsumerConfig#REQUEST_TIMEOUT_MS_CONFIG} for these requests to complete during the close operation.
* Note that the execution time of callbacks (such as {@link OffsetCommitCallback} and
* {@link ConsumerRebalanceListener}) does not consume time from the close timeout.
* <p>
* This close operation will attempt all shutdown steps even if one of them fails.
* It logs all encountered errors, continues to execute the next steps, and finally throws the first error found.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also list the exceptions, as we do for other close methods?

Copy link
Contributor

@frankvicky frankvicky Sep 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, makes sense to me to align these comments.

*
* @param option see {@link CloseOptions}; cannot be {@code null}
* @throws IllegalArgumentException If the {@code option} timeout is negative
* @throws WakeupException if {@link #wakeup()} is called before or while this function is called
* @throws InterruptException if the calling thread is interrupted before or while this function is called
* @throws KafkaException for any other error during close
* (e.g., errors thrown from rebalance callbacks or commit callbacks from previous asynchronous commits)
*/
@Override
public void close(CloseOptions option) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -679,10 +679,13 @@ public void unregisterMetricFromSubscription(KafkaMetric metric) {
* Close the consumer, waiting for up to the default timeout of 30 seconds for any needed cleanup.
* This will commit acknowledgements if possible within the default timeout.
* See {@link #close(Duration)} for details. Note that {@link #wakeup()} cannot be used to interrupt close.
* <p>
* This close operation will attempt all shutdown steps even if one of them fails.
* It logs all encountered errors, continues to execute the next steps, and finally throws the first error found.
*
* @throws WakeupException if {@link #wakeup()} is called before or while this method is called
* @throws WakeupException if {@link #wakeup()} is called before or while this method is called
* @throws InterruptException if the thread is interrupted before or while this method is called
* @throws KafkaException for any other error during close
* @throws KafkaException for any other error during close
*/
@Override
public void close() {
Expand All @@ -701,14 +704,16 @@ public void close() {
* Even if a larger timeout is specified, the consumer will not wait longer than
* {@link ConsumerConfig#REQUEST_TIMEOUT_MS_CONFIG} for these requests to complete during the close operation.
* Note that the execution time of callbacks (such as {@link AcknowledgementCommitCallback}) do not consume time from the close timeout.
* <p>
* This close operation will attempt all shutdown steps even if one of them fails.
* It logs all encountered errors, continues to execute the next steps, and finally throws the first error found.
*
* @param timeout The maximum time to wait for consumer to close gracefully. The value must be
* non-negative. Specifying a timeout of zero means do not wait for pending requests to complete.
*
* @throws IllegalArgumentException if the {@code timeout} is negative
* @throws WakeupException if {@link #wakeup()} is called before or while this method is called
* @throws InterruptException if the thread is interrupted before or while this method is called
* @throws KafkaException for any other error during close
* @throws WakeupException if {@link #wakeup()} is called before or while this method is called
* @throws InterruptException if the thread is interrupted before or while this method is called
* @throws KafkaException for any other error during close
*/
@Override
public void close(Duration timeout) {
Expand Down
Loading