-
Notifications
You must be signed in to change notification settings - Fork 3.2k
Fix: rd_kafka_consume_batch incorrectly advances offset by 2 on EOF messages #5213
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
…a_q_serve_rkmessages function
|
🎉 All Contributor License Agreements have been signed. Ready to merge. |
…h_queue This commit introduces a new test, do_test_consume_batch_eof_position, to verify that the consumer position is correctly updated when EOF messages are received with enable.partition.eof set to true. The test ensures that the consumer position advances by 1 instead of 2, addressing a previously identified bug. The test includes setup for producing messages, consuming them, and validating the final consumer position after EOF is reached.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR fixes a bug in rd_kafka_consume_batch() where EOF messages incorrectly advance the consumer position by 2 instead of 1 when enable.partition.eof=true, causing rd_kafka_position() to return an incorrect offset.
- Added a check to prevent EOF messages from updating consumer position in
rd_kafka_q_serve_rkmessages() - Added comprehensive regression test to verify correct consumer position after EOF messages
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| src/rdkafka_queue.c | Fixed offset advancement logic to exclude EOF messages from position updates |
| tests/0137-barrier_batch_consume.c | Added regression test to verify EOF message position handling |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
This commit modifies the comment in the do_test_consume_batch_eof_position function to clarify that the position value is being extracted from the partition list after reaching EOF. This change enhances code readability and understanding of the test's purpose.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated no new comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Summary
This PR fixes a bug in
rd_kafka_consume_batch()where EOF messages incorrectly advance the consumer position by 2 instead of 1 whenenable.partition.eof=true. The bug causesrd_kafka_position()to return last_offset + 2 instead of the correct last_offset + 1 after consuming an EOF message in batch mode.Motivation
Root Cause
In
src/rdkafka_queue.c, therd_kafka_q_serve_rkmessages()function updates the consumer position for all messages in a batch, including EOF messages. EOF messages have error codeRD_KAFKA_RESP_ERR__PARTITION_EOFbut are not control messages, so they were incorrectly incrementing the offset.Changes
File:
src/rdkafka_queue.cAdded a check to prevent EOF messages from updating the consumer position.
Regression subtest:
tests/0137-barrier_batch_consume.cNew Subtest
do_test_consume_batch_eof_position()is addedTest Flow
rd_kafka_consume_batch_queue()withenable.partition.eof=truerd_kafka_position()returnslast_offset + 1(notlast_offset + 2)