Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/python-app.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ jobs:

- name: Typecheck with mypy
run: |
poetry run mypy mockafka tests
poetry run mypy

- name: Test with pytest
run: |
Expand Down
14 changes: 8 additions & 6 deletions mockafka/admin_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations

from typing import Any

from confluent_kafka.cimpl import ( # type: ignore[import-untyped]
NewPartitions,
NewTopic,
Expand Down Expand Up @@ -35,7 +37,7 @@ class FakeAdminClientImpl:
store (not implemented).
"""

def __init__(self, clean: bool = False, *args, **kwargs):
def __init__(self, clean: bool = False, *args: Any, **kwargs) -> None:
"""
Initialize the FakeAdminClientImpl.

Expand All @@ -49,7 +51,7 @@ def create_partitions(self, partitions: list[NewPartitions]) -> dict[str, NewPar
Create partitions in the in-memory Kafka store.

Parameters:
- partitions (List[NewPartitions]): List of partition objects to be created.
- partitions (List[NewPartitions]): List of partition objects to be created.

Returns:
- dict[str, NewPartitions]: Dictionary of created partitions.
Expand All @@ -60,7 +62,7 @@ def create_partitions(self, partitions: list[NewPartitions]) -> dict[str, NewPar
result[partition.topic] = partition
return result

def create_partition(self, partition: NewPartitions):
def create_partition(self, partition: NewPartitions) -> None:
"""
Create a single partition in the in-memory Kafka store.

Expand All @@ -71,7 +73,7 @@ def create_partition(self, partition: NewPartitions):
topic=partition.topic, partitions=partition.new_total_count
)

def create_topics(self, topics: list[NewTopic]):
def create_topics(self, topics: list[NewTopic]) -> None:
"""
Create topics in the in-memory Kafka store.

Expand All @@ -81,7 +83,7 @@ def create_topics(self, topics: list[NewTopic]):
for topic in topics:
self.create_topic(topic=topic)

def create_topic(self, topic: NewTopic):
def create_topic(self, topic: NewTopic) -> None:
"""
Create a single topic in the in-memory Kafka store.

Expand All @@ -106,7 +108,7 @@ def delete_topics(
for topic in topics:
self.delete_topic(topic=topic)

def delete_topic(self, topic: NewTopic):
def delete_topic(self, topic: NewTopic) -> None:
"""
Delete a single topic from the in-memory Kafka store.

Expand Down
23 changes: 13 additions & 10 deletions mockafka/aiokafka/aiokafka_admin_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from typing import Dict
from typing import Any, Dict

from aiokafka.admin import NewPartitions, NewTopic # type: ignore[import-untyped]

Expand Down Expand Up @@ -31,13 +31,13 @@ class FakeAIOKafkaAdmin:
Calls _create_partition() for each topic.
"""

def __init__(self, clean: bool = False, *args, **kwargs):
def __init__(self, clean: bool = False, *args: Any, **kwargs: Any) -> None:
self.kafka = KafkaStore(clean=clean)

async def close(self):
async def close(self) -> None:
pass

async def start(self):
async def start(self) -> None:
pass

async def _create_topic(self, topic: NewTopic) -> None:
Expand All @@ -46,23 +46,26 @@ async def _create_topic(self, topic: NewTopic) -> None:
topic=topic.name, partition_count=topic.num_partitions
)

async def _remove_topic(self, topic: str):
async def _remove_topic(self, topic: str) -> None:
self.kafka.remove_topic(topic=topic)

async def create_topics(self, new_topics: list[NewTopic], *args, **kwargs):
async def create_topics(self, new_topics: list[NewTopic], *args: Any, **kwargs: Any) -> None:
for topic in new_topics:
await self._create_topic(topic=topic)

async def delete_topics(self, topics: list[str], **kwargs) -> None:
async def delete_topics(self, topics: list[str], **kwargs: Any) -> None:
for topic in topics:
await self._remove_topic(topic=topic)

async def _create_partition(self, topic: str, partition_count: int):
async def _create_partition(self, topic: str, partition_count: int) -> None:
self.kafka.create_partition(topic=topic, partitions=partition_count)

async def create_partitions(
self, topic_partitions: Dict[str, NewPartitions], *args, **kwargs
):
self,
topic_partitions: Dict[str, NewPartitions],
*args: Any,
**kwargs: Any,
) -> None:
for topic, partition in topic_partitions.items():
await self._create_partition(
topic=topic, partition_count=partition.total_count
Expand Down
2 changes: 1 addition & 1 deletion mockafka/aiokafka/aiokafka_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ def subscription(self) -> Set[str]:
def unsubscribe(self) -> None:
self.subscribed_topic = []

def _get_key(self, topic, partition) -> str:
def _get_key(self, topic: str, partition: int) -> str:
return f"{topic}*{partition}"

def _fetch_one(self, topic: str, partition: int) -> Optional[ConsumerRecord[bytes, bytes]]:
Expand Down
8 changes: 4 additions & 4 deletions mockafka/aiokafka/aiokafka_producer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

import asyncio
from typing import Optional
from typing import Any, Optional

from aiokafka.util import create_future # type: ignore[import-untyped]
from typing_extensions import LiteralString, Self
Expand Down Expand Up @@ -39,7 +39,7 @@ class FakeAIOKafkaProducer:
- send_and_wait(): Call send().
"""

def __init__(self, *args, **kwargs) -> None:
def __init__(self, *args: Any, **kwargs: Any) -> None:
self.kafka = KafkaStore()

async def _produce(
Expand Down Expand Up @@ -88,7 +88,7 @@ async def send(
headers=headers,
timestamp_ms=timestamp_ms,
)
future = create_future()
future: asyncio.Future[None] = create_future()
future.set_result(None)
return future

Expand All @@ -99,7 +99,7 @@ async def send_and_wait(
key: Optional[bytes] = None,
partition: int = 0,
timestamp_ms: Optional[int] = None,
headers=None,
headers: Optional[list[tuple[str, Optional[bytes]]]] = None,
) -> None:
future = await self.send(
topic=topic,
Expand Down
6 changes: 3 additions & 3 deletions mockafka/broker_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@ class BrokerMetadata(object):
This class is typically not user instantiated.
"""

def __init__(self):
def __init__(self) -> None:
self.id = 1
"""Broker id"""
self.host = "fakebroker"
"""Broker hostname"""
self.port = 9091
"""Broker port"""

def __repr__(self):
def __repr__(self) -> str:
return "BrokerMetadata({}, {}:{})".format(self.id, self.host, self.port)

def __str__(self):
def __str__(self) -> str:
return "{}:{}/{}".format(self.host, self.port, self.id)
4 changes: 2 additions & 2 deletions mockafka/cluster_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ def __init__(self, topic: str | None = None):
self.orig_broker_id = -1
self.orig_broker_name = None

def __repr__(self):
def __repr__(self) -> str:
return "ClusterMetadata({})".format(self.cluster_id)

def __str__(self):
def __str__(self) -> str:
return str(self.cluster_id)


Expand Down
29 changes: 15 additions & 14 deletions mockafka/kafka_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from __future__ import annotations

from copy import deepcopy
from typing import Any

from confluent_kafka import KafkaException # type: ignore[import-untyped]

Expand All @@ -32,9 +33,9 @@


class SingletonMeta(type):
_instances: dict[type[SingletonMeta], SingletonMeta] = {}
_instances: dict[SingletonMeta, Any] = {}

def __call__(cls, *args, **kwargs):
def __call__(cls, *args: Any, **kwargs: Any) -> Any:
if cls not in cls._instances or "clean" in kwargs.keys():
instance = super().__call__(*args, **kwargs)
cls._instances[cls] = instance
Expand All @@ -49,7 +50,7 @@ class KafkaStore(metaclass=SingletonMeta):
FIRST_OFFSET = "first_offset"
NEXT_OFFSET = "next_offset"

def __init__(self, clean: bool = False):
def __init__(self, clean: bool = False) -> None:
if clean:
mock_topics.clear()
offset_store.clear()
Expand All @@ -70,13 +71,13 @@ def get_number_of_partition(topic: str) -> int:
return len(mock_topics[topic].keys())

@staticmethod
def create_topic(topic: str):
def create_topic(topic: str) -> None:
if mock_topics.get(topic, None) is not None:
raise KafkaException(f"{topic} exist is fake kafka")

mock_topics[topic] = {}

def create_partition(self, topic: str, partitions: int):
def create_partition(self, topic: str, partitions: int) -> None:
if not self.is_topic_exist(topic=topic):
self.create_topic(topic=topic)

Expand All @@ -92,7 +93,7 @@ def create_partition(self, topic: str, partitions: int):
else:
raise KafkaException("can not decrease partition of topic")

def remove_topic(self, topic: str):
def remove_topic(self, topic: str) -> None:
if not self.is_topic_exist(topic=topic):
return

Expand All @@ -103,22 +104,22 @@ def remove_topic(self, topic: str):
if topic in offset_key:
offset_store.pop(offset_key)

def set_first_offset(self, topic: str, partition: int, value: int):
def set_first_offset(self, topic: str, partition: int, value: int) -> None:
offset_store_key = self.get_offset_store_key(topic=topic, partition=partition)
first_offset = self.get_partition_first_offset(topic=topic, partition=partition)
next_offset = self.get_partition_next_offset(topic=topic, partition=partition)

if first_offset < value <= next_offset:
offset_store[offset_store_key][self.FIRST_OFFSET] = value

def _add_next_offset(self, topic: str, partition: int):
def _add_next_offset(self, topic: str, partition: int) -> None:
offset_store_key = self.get_offset_store_key(topic=topic, partition=partition)
offset_store[offset_store_key][self.NEXT_OFFSET] += 1

def get_offset_store_key(self, topic: str, partition: int):
def get_offset_store_key(self, topic: str, partition: int) -> str:
return f"{topic}*{partition}"

def produce(self, message: Message, topic: str, partition: int):
def produce(self, message: Message, topic: str, partition: int) -> None:
if not topic:
return

Expand Down Expand Up @@ -174,15 +175,15 @@ def number_of_message_in_topic(self, topic: str) -> int:

return count_of_messages

def clear_topic_messages(self, topic: str):
def clear_topic_messages(self, topic: str) -> None:
for partition in self.partition_list(topic=topic):
self.clear_partition_messages(topic=topic, partition=partition)

@staticmethod
def clear_partition_messages(topic: str, partition: int):
def clear_partition_messages(topic: str, partition: int) -> None:
mock_topics[topic][partition] = []

def reset_offset(self, topic: str, strategy: str = "latest"):
def reset_offset(self, topic: str, strategy: str = "latest") -> None:
for partition in self.partition_list(topic=topic):
key = self.get_offset_store_key(topic, partition)

Expand All @@ -195,6 +196,6 @@ def reset_offset(self, topic: str, strategy: str = "latest"):
offset_store[key][self.FIRST_OFFSET] = 0

@staticmethod
def fresh():
def fresh() -> None:
mock_topics.clear()
offset_store.clear()
6 changes: 3 additions & 3 deletions mockafka/topic_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@ class TopicMetadata(object):
# Sphinx issue where it tries to reference the same instance variable
# on other classes which raises a warning/error.

def __init__(self, topic_name: str, partition_num: Collection[int] = ()):
def __init__(self, topic_name: str, partition_num: Collection[int] = ()) -> None:
self.topic = topic_name
"""Topic name"""
self.partitions = {num: PartitionMetadata(id=num) for num in partition_num}
"""Map of partitions indexed by partition id. Value is a PartitionMetadata object."""
self.error = None
"""Topic error, or None. Value is a KafkaError object."""

def __str__(self):
def __str__(self) -> str:
return self.topic

def __len__(self):
def __len__(self) -> int:
return len(self.partitions)
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ profile = "black"
float_to_top = true

[tool.mypy]
files = ["mockafka", "tests"]

warn_unused_ignores = true

no_implicit_optional = true
Expand Down
4 changes: 0 additions & 4 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,3 @@ per-file-ignores =
mockafka/decorators/typing.py: A005

noqa-require-code = true


[mypy]
enable_error_code = ignore-without-code