3 changes: 0 additions & 3 deletions docs/async-fake-aiokafka-consumer.md
@@ -39,9 +39,6 @@ The `FakeAIOKafkaConsumer` class is a mock implementation of aiokafka's AIOKafka
#### `unsubscribe(self)`
- **Description:** Resets subscribed topics.

#### `_get_key(self, topic, partition) -> str`
- **Description:** Generates `consumer_store` lookup key from topic/partition.

#### `getone(self)`
- **Description:** Gets the next available message from subscribed topics. Updates `consumer_store` as messages are consumed.

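For orientation, here is a minimal usage sketch of the documented `getone()`/`unsubscribe()` flow against the fake consumer. The import paths, the `KafkaStore.create_partition()` helper, and the producer lifecycle calls are assumptions inferred from the test fixtures further down in this diff, not something the diff itself establishes:

```python
import asyncio

from mockafka.aiokafka import FakeAIOKafkaConsumer, FakeAIOKafkaProducer  # assumed import path
from mockafka.kafka_store import KafkaStore  # assumed import path


async def main() -> None:
    # The fake consumer only subscribes to topics that already exist in the
    # shared in-memory store, so create the topic/partition first (mirrors
    # the create_partition fixture used in the tests below).
    KafkaStore().create_partition(topic="orders", partitions=1)  # assumed helper

    producer = FakeAIOKafkaProducer()
    await producer.start()  # lifecycle calls assumed to mirror aiokafka's interface
    await producer.send(topic="orders", partition=0, key=b"k0", value=b"v0")
    await producer.stop()

    # getone() returns the next available record and advances the consumer's
    # internal consumer_store for that topic/partition.
    async with FakeAIOKafkaConsumer("orders") as consumer:
        record = await consumer.getone()
        assert record is not None and record.value == b"v0"

        # unsubscribe() resets the subscribed topics.
        consumer.unsubscribe()


asyncio.run(main())
```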
40 changes: 27 additions & 13 deletions mockafka/aiokafka/aiokafka_consumer.py
@@ -5,13 +5,15 @@
import random
import re
import warnings
from collections.abc import Iterable, Iterator, Set
from collections.abc import Iterable, Iterator, Set, Mapping
from typing import Any, Optional

from aiokafka.abc import ConsumerRebalanceListener # type: ignore[import-untyped]
from aiokafka.errors import ConsumerStoppedError # type: ignore[import-untyped]
from aiokafka.util import commit_structure_validate # type: ignore[import-untyped]
from aiokafka.structs import ( # type: ignore[import-untyped]
ConsumerRecord,
OffsetAndMetadata,
TopicPartition,
)
from typing_extensions import Self
@@ -77,7 +79,6 @@ class FakeAIOKafkaConsumer:
- subscribe(): Subscribe to topics by name.
- subscription(): Get subscribed topics.
- unsubscribe(): Reset subscribed topics.
- _get_key(): Generate consumer_store lookup key from topic/partition.
- getone(): Get next available message from subscribed topics.
Updates consumer_store as messages are consumed.
- getmany(): Get next available messages from subscribed topics.
@@ -86,7 +87,7 @@

def __init__(self, *topics: str, **kwargs: Any) -> None:
self.kafka = KafkaStore()
self.consumer_store: dict[str, int] = {}
self.consumer_store: dict[TopicPartition, int] = {}
self.subscribed_topic = [x for x in topics if self.kafka.is_topic_exist(x)]
self._is_closed = True

@@ -98,18 +99,34 @@ async def stop(self) -> None:
self.consumer_store = {}
self._is_closed = True

async def commit(self):
for item in self.consumer_store:
topic, partition = item.split("*")
async def commit(
self,
offsets: dict[TopicPartition, int | tuple[int, str] | OffsetAndMetadata] | None = None,
):
validated = commit_structure_validate(offsets or self.consumer_store)
simple_offsets: dict[TopicPartition, int]
simple_offsets = {
tp: offset
for tp, (offset, _) in validated.items()
}

for tp, offset in simple_offsets.items():
topic, partition = tp
if (
self.kafka.get_partition_first_offset(topic, partition)
<= self.consumer_store[item]
<= offset
):
self.kafka.set_first_offset(
topic=topic, partition=partition, value=self.consumer_store[item]
topic=topic, partition=partition, value=offset
)

self.consumer_store = {}
# If we commit to the same level we've read, we no longer need to
# track our own offset, as it now matches Kafka's tracking.
#
# Otherwise keep our own tracking, since we've read ahead of what
# we're committing.
if offset == self.consumer_store[tp]:
self.consumer_store.pop(tp)

async def topics(self) -> set[str]:
return set(self.kafka.topic_list())
@@ -160,9 +177,6 @@ def subscription(self) -> Set[str]:
def unsubscribe(self) -> None:
self.subscribed_topic = []

def _get_key(self, topic, partition) -> str:
return f"{topic}*{partition}"

def _fetch_one(self, topic: str, partition: int) -> Optional[ConsumerRecord[bytes, bytes]]:

first_offset = self.kafka.get_partition_first_offset(
@@ -175,7 +189,7 @@ def _fetch_one(self, topic: str, partition: int) -> Optional[ConsumerRecord[byte
# Topic partition is empty
return None

topic_key = self._get_key(topic, partition)
topic_key = TopicPartition(topic, partition)

consumer_amount = self.consumer_store.setdefault(topic_key, first_offset)
if consumer_amount == next_offset:
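To illustrate the widened `commit()` signature, here is a short sketch that reads from two partitions and then commits an explicit offset for only one of them. It assumes the topic has already been created and produced to (as in the tests below); the consumer import path is likewise an assumption:

```python
from aiokafka.structs import TopicPartition

from mockafka.aiokafka import FakeAIOKafkaConsumer  # assumed import path


async def commit_partition_zero_only(topic: str) -> None:
    # Assumes `topic` exists with at least two partitions and has messages.
    consumer = FakeAIOKafkaConsumer(topic)
    await consumer.start()

    # Drain whatever is available; consumer_store now records the read
    # position per TopicPartition.
    await consumer.getmany(TopicPartition(topic, 0), TopicPartition(topic, 1))

    # Commit an explicit offset for partition 0 only. Plain ints,
    # (offset, metadata) tuples and OffsetAndMetadata instances are all
    # accepted, since the dict is run through aiokafka's
    # commit_structure_validate().
    await consumer.commit({TopicPartition(topic, 0): 1})

    # Calling commit() with no argument still commits the consumer's own
    # read positions, as before.
    await consumer.commit()

    await consumer.stop()
```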
105 changes: 98 additions & 7 deletions tests/test_aiokafka/test_aiokafka_consumer.py
@@ -48,9 +48,6 @@ async def produce_message(self):
topic=self.test_topic, partition=0, key=b"test1", value=b"test1"
)

async def test_consume(self):
await self.test_poll_with_commit()

async def test_start(self):
# check consumer store is empty
await self.consumer.start()
@@ -81,6 +78,15 @@ async def test_poll_without_commit(self):
self.assertIsNone(await self.consumer.getone())
self.assertIsNone(await self.consumer.getone())

await self.consumer.stop()

# We didn't commit, so we should see the same messages again
async with FakeAIOKafkaConsumer(self.test_topic) as new_consumer:
message = await new_consumer.getone()
self.assertEqual(message.value, b"test")
message = await new_consumer.getone()
self.assertEqual(message.value, b"test1")

async def test_partition_specific_poll_without_commit(self):
self.create_topic()
await self.produce_message()
@@ -97,6 +103,16 @@ async def test_partition_specific_poll_without_commit(self):
)
self.assertEqual(message.value, b"test")

await self.consumer.stop()

# We didn't commit, so we should see the same results again
async with FakeAIOKafkaConsumer(self.test_topic) as new_consumer:
message = await new_consumer.getone(TopicPartition(self.test_topic, 0))
self.assertEqual(message.value, b"test")

message = await new_consumer.getone(TopicPartition(self.test_topic, 2))
self.assertIsNone(message)

async def test_poll_with_commit(self):
self.create_topic()
await self.produce_message()
@@ -107,12 +123,87 @@ async def test_poll_with_commit(self):
await self.consumer.commit()
self.assertEqual(message.value, b"test")

message = await self.consumer.getone()
await self.consumer.commit()
self.assertEqual(message.value, b"test1")
# We committed, so a new consumer should see the next message in the topic
async with FakeAIOKafkaConsumer(self.test_topic) as new_consumer:
message = await new_consumer.getone()
self.assertEqual(message.value, b"test1")
await new_consumer.commit()

# Back on the original consumer, we should now see nothing, since both
# messages have been consumed
self.assertIsNone(await self.consumer.getone())
self.assertIsNone(await self.consumer.getone())

async def test_partition_specific_poll_with_commit(self):
topic_a = "topic-a"
topic_b = "topic-b"

self.kafka.create_partition(topic=topic_a, partitions=2)
self.kafka.create_partition(topic=topic_b, partitions=2)

await self.producer.send(
topic=topic_a, partition=0, key=b"a0.0", value=b"a0.0"
)
await self.producer.send(
topic=topic_a, partition=0, key=b"a0.1", value=b"a0.1"
)
await self.producer.send(
topic=topic_a, partition=1, key=b"a1.0", value=b"a1.0"
)
await self.producer.send(
topic=topic_b, partition=0, key=b"b0.0", value=b"b0.0"
)
await self.producer.send(
topic=topic_b, partition=0, key=b"b0.1", value=b"b0.1"
)

self.consumer.subscribe(topics=[topic_a, topic_b])
await self.consumer.start()

await self.consumer.getmany(
TopicPartition(topic_a, 0),
TopicPartition(topic_a, 1),
)
await self.consumer.getone(
TopicPartition(topic_b, 0),
)

# Only commit the first message from each partition on topic a -- this
# leaves one message uncommitted on partition 0 and shouldn't affect
# either our logical or committed position on topic b.
await self.consumer.commit({
TopicPartition(topic_a, 0): 1,
TopicPartition(topic_a, 1): 1,
})

result = await self.consumer.getmany()
simple_result = {
tp: [x.value for x in msgs]
for tp, msgs in result.items()
}

assert simple_result == {
# Topic A is fully consumed, even though not fully committed, so no
# further results here.

# One more message on topic B partition 0
TopicPartition(topic_b, 0): [b"b0.1"],
}, "Wrong result after commit"

# Only part of what was read is committed, so a fresh consumer should
# resume from the committed offsets
async with FakeAIOKafkaConsumer(topic_a, topic_b) as new_consumer:
new_result = await new_consumer.getmany()
simple_new_result = {
tp: [x.value for x in msgs]
for tp, msgs in new_result.items()
}

assert simple_new_result == {
# Topic A partition 0 wasn't fully committed -- the remaining
# message should be here.
TopicPartition(topic_a, 0): [b"a0.1"],
# Topic B wasn't committed -- all messages appear
TopicPartition(topic_b, 0): [b"b0.0", b"b0.1"],
}, "Wrong result from fresh consumer"

async def test_getmany_without_commit(self):
self.create_topic()
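Finally, a sketch of the offset bookkeeping these tests rely on: after reading ahead and committing only part of it, the partition stays in `consumer_store` (now keyed by `TopicPartition` rather than the old `"topic*partition"` string); committing the full read position drops it. Topic setup and the import path are assumed, as above:

```python
from aiokafka.structs import TopicPartition

from mockafka.aiokafka import FakeAIOKafkaConsumer  # assumed import path


async def inspect_bookkeeping(topic: str) -> None:
    # Assumes two messages were produced to partition 0 of `topic` beforehand.
    consumer = FakeAIOKafkaConsumer(topic)
    await consumer.start()

    tp = TopicPartition(topic, 0)
    await consumer.getmany(tp)  # read position is now offset 2

    # Commit only the first message: we've read ahead of the commit, so the
    # partition remains tracked locally.
    await consumer.commit({tp: 1})
    assert tp in consumer.consumer_store

    # Committing the full read position makes local tracking redundant and
    # the entry is dropped.
    await consumer.commit()
    assert tp not in consumer.consumer_store

    await consumer.stop()
```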