
Commit 4a77c6e

Kafka: Expand default_msg_processor into a miniature decoding unit
- Accept decoding options per `KafkaDecodingOptions`
- Provide output formatting options per `KafkaEvent`
- Tie both elements together using `KafkaEventProcessor`

The machinery is effectively the same as before, but provides a few more options: type decoding for the key/value slots of Kafka events, a selection mechanism to limit the output to specific fields, and a small projection mechanism to optionally drill down into a single field. In combination, those decoding options allow users to relay JSON-encoded Kafka event values directly into a destination table, without any metadata wrapping. The output formatter provides three different variants out of the box; more variants can be added in the future, as other users or use cases may have different requirements in this area. Most importantly, the decoding unit is very compact, so relevant tasks do not need a corresponding transformation unit down the pipeline, keeping the whole ensemble lean, in the very spirit of ingestr.
1 parent 442b0ab commit 4a77c6e
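
The option combination behind the headline use case (relaying JSON-encoded values without metadata wrapping) can be sketched as follows. The snippet is illustrative and not part of the commit; the parameter values are hypothetical, while the class and field names come from ingestr/src/kafka/model.py introduced below.

from ingestr.src.kafka.model import KafkaDecodingOptions

# Illustrative only: relay JSON-encoded event values without the metadata envelope.
options = KafkaDecodingOptions(
    value_type="json",  # deserialize the event value with orjson
    format="flexible",  # enable the include/select projection mechanism
    select="value",     # emit only the (deserialized) value, mapped to the "data" slot internally
)

With these options, every consumed message yields just its JSON payload, ready to land in the destination table without the `_kafka` envelope.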

File tree

7 files changed: +238 −41 lines changed

ingestr/src/kafka/__init__.py
ingestr/src/kafka/helpers.py
ingestr/src/kafka/model.py
ingestr/src/sources.py
requirements.in
requirements.txt
requirements_arm64.txt

ingestr/src/kafka/__init__.py

Lines changed: 4 additions & 4 deletions
@@ -16,8 +16,8 @@
 
 from .helpers import (
     KafkaCredentials,
+    KafkaEventProcessor,
     OffsetTracker,
-    default_msg_processor,
 )
 
 
@@ -29,9 +29,7 @@
 def kafka_consumer(
     topics: Union[str, List[str]],
     credentials: Union[KafkaCredentials, Consumer] = dlt.secrets.value,
-    msg_processor: Optional[
-        Callable[[Message], Dict[str, Any]]
-    ] = default_msg_processor,
+    msg_processor: Optional[Callable[[Message], Dict[str, Any]]] = None,
     batch_size: Optional[int] = 3000,
     batch_timeout: Optional[int] = 3,
     start_from: Optional[TAnyDateTime] = None,
@@ -60,6 +58,8 @@ def kafka_consumer(
     Yields:
        Iterable[TDataItem]: Kafka messages.
    """
+    msg_processor = msg_processor or KafkaEventProcessor().process
+
    if not isinstance(topics, list):
        topics = [topics]
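
Callers that do not pass `msg_processor` now fall back to a `KafkaEventProcessor` with default options. A minimal usage sketch, assuming a locally reachable broker; the topic name and credential values are placeholders, and the remaining credential fields are assumed to keep their defaults:

from ingestr.src.kafka import kafka_consumer
from ingestr.src.kafka.helpers import KafkaCredentials, KafkaEventProcessor
from ingestr.src.kafka.model import KafkaDecodingOptions

credentials = KafkaCredentials(bootstrap_servers="localhost:9092", group_id="example-group")

# Default behaviour: equivalent to msg_processor=KafkaEventProcessor().process.
resource = kafka_consumer(topics="example-topic", credentials=credentials)

# Explicitly configured processor, e.g. for JSON-encoded values.
resource = kafka_consumer(
    topics="example-topic",
    credentials=credentials,
    msg_processor=KafkaEventProcessor(KafkaDecodingOptions(value_type="json")).process,
)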

ingestr/src/kafka/helpers.py

Lines changed: 69 additions & 36 deletions
@@ -1,53 +1,86 @@
 from typing import Any, Dict, List, Optional
 
+import orjson
 from confluent_kafka import Consumer, Message, TopicPartition  # type: ignore
 from confluent_kafka.admin import TopicMetadata  # type: ignore
 from dlt import config
 from dlt.common import pendulum
 from dlt.common.configuration import configspec
 from dlt.common.configuration.specs import CredentialsConfiguration
-from dlt.common.time import ensure_pendulum_datetime
 from dlt.common.typing import DictStrAny, TSecretValue
-from dlt.common.utils import digest128
 
+from ingestr.src.kafka.model import KafkaDecodingOptions, KafkaEvent
 
-def default_msg_processor(msg: Message) -> Dict[str, Any]:
-    """Basic Kafka message processor.
 
-    Returns the message value and metadata. Timestamp consists of two values:
-    (type of the timestamp, timestamp). Type represents one of the Python
-    Kafka constants:
-    TIMESTAMP_NOT_AVAILABLE - Timestamps not supported by broker.
-    TIMESTAMP_CREATE_TIME - Message creation time (or source / producer time).
-    TIMESTAMP_LOG_APPEND_TIME - Broker receive time.
-
-    Args:
-        msg (confluent_kafka.Message): A single Kafka message.
+class KafkaEventProcessor:
+    """
+    A processor for Kafka events with processing stages and configuration capabilities.
 
-    Returns:
-        dict: Processed Kafka message.
+    It cycles through "decode", "deserialize" and "format".
     """
-    ts = msg.timestamp()
-    topic = msg.topic()
-    partition = msg.partition()
-    key = msg.key()
-    if key is not None:
-        key = key.decode("utf-8")
-
-    return {
-        "_kafka": {
-            "partition": partition,
-            "topic": topic,
-            "key": key,
-            "offset": msg.offset(),
-            "ts": {
-                "type": ts[0],
-                "value": ensure_pendulum_datetime(ts[1] / 1e3),
-            },
-            "data": msg.value().decode("utf-8"),
-        },
-        "_kafka_msg_id": digest128(topic + str(partition) + str(key)),
-    }
+
+    def __init__(self, options: Optional[KafkaDecodingOptions] = None):
+        self.options = options or KafkaDecodingOptions()
+
+    def process(self, msg: Message) -> Dict[str, Any]:
+        """
+        Process a Kafka event.
+
+        Returns the message value and metadata. Timestamp consists of two values:
+        (type of the timestamp, timestamp). Type represents one of the Python
+        Kafka constants:
+        TIMESTAMP_NOT_AVAILABLE - Timestamps not supported by broker.
+        TIMESTAMP_CREATE_TIME - Message creation time (or source / producer time).
+        TIMESTAMP_LOG_APPEND_TIME - Broker receive time.
+
+        Args:
+            msg (confluent_kafka.Message): A single Kafka message.
+
+        Returns:
+            dict: Processed Kafka message.
+        """
+
+        # Decode.
+        event = self.decode(msg)
+
+        # Deserialize.
+        self.deserialize(event)
+
+        # Format egress message based on input options.
+        return event.to_dict(self.options)
+
+    def decode(self, msg: Message) -> KafkaEvent:
+        """
+        Translate from Confluent library's `Message` instance to `KafkaEvent` instance.
+        """
+        return KafkaEvent(
+            ts=msg.timestamp(),
+            topic=msg.topic(),
+            partition=msg.partition(),
+            offset=msg.offset(),
+            key=msg.key(),
+            value=msg.value(),
+        )
+
+    def deserialize(self, event: KafkaEvent) -> KafkaEvent:
+        """
+        Deserialize event key and value according to decoding options.
+        """
+        if self.options.key_type is not None:
+            if self.options.key_type == "json":
+                event.key = orjson.loads(event.key)
+            else:
+                raise NotImplementedError(
+                    f"Unknown key type: {self.options.key_type}"
+                )
+        if self.options.value_type is not None:
+            if self.options.value_type == "json":
+                event.value = orjson.loads(event.value)
+            else:
+                raise NotImplementedError(
+                    f"Unknown value type: {self.options.value_type}"
+                )
+        return event
 
 
 class OffsetTracker(dict):  # type: ignore
ingestr/src/kafka/model.py

Lines changed: 138 additions & 0 deletions
@@ -0,0 +1,138 @@
+from typing import List, Optional, Tuple, Union
+
+import toolz  # type: ignore[import-untyped]
+from attrs import define
+from dlt.common.time import ensure_pendulum_datetime
+from dlt.common.utils import digest128
+
+
+@define
+class KafkaDecodingOptions:
+    """
+    Options that control decoding of the Kafka event.
+    """
+
+    key_type: Optional[str] = None
+    value_type: Optional[str] = None
+    format: Optional[str] = None
+    include: Optional[List[str]] = None
+    select: Optional[str] = None
+
+    def __attrs_post_init__(self):
+        if self.format is None:
+            self.format = "standard_v1"
+
+    @classmethod
+    def from_params(
+        cls,
+        key_type: List[str],
+        value_type: List[str],
+        format: List[str],
+        include: List[str],
+        select: List[str],
+    ):
+        output_format = None
+        include_fields = None
+        select_field = None
+        if format:
+            output_format = format[0]
+        else:
+            output_format = "standard_v1"
+        if include:
+            output_format = "flexible"
+            include_fields = [field.strip() for field in include[0].split(",")]
+        if select:
+            output_format = "flexible"
+            select_field = select[0]
+        return cls(
+            key_type=key_type and key_type[0] or None,
+            value_type=value_type and value_type[0] or None,
+            format=output_format,
+            include=include_fields,
+            select=select_field,
+        )
+
+
+@define
+class KafkaEvent:
+    """
+    Manage details of a typical Kafka event/message/record.
+
+    https://kafka.apache.org/intro#intro_concepts_and_terms
+    """
+
+    ts: Tuple[int, int]
+    topic: str
+    partition: int
+    offset: int
+    key: Union[bytes, str]
+    value: Union[bytes, str]
+
+    def decode_text(self):
+        if self.key is not None and isinstance(self.key, bytes):
+            self.key = self.key.decode("utf-8")
+        if self.value is not None and isinstance(self.value, bytes):
+            self.value = self.value.decode("utf-8")
+
+    def to_dict(self, options: KafkaDecodingOptions):
+        # TODO: Make decoding from text optional.
+        self.decode_text()
+
+        message_id = digest128(self.topic + str(self.partition) + str(self.key))
+
+        # The standard message layout as defined per dlt and ingestr.
+        standard_payload = {
+            "partition": self.partition,
+            "topic": self.topic,
+            "key": self.key,
+            "offset": self.offset,
+            "ts": {
+                "type": self.ts[0],
+                "value": ensure_pendulum_datetime(self.ts[1] / 1e3),
+            },
+            "data": self.value,
+        }
+
+        # Basic Kafka message processors providing two formats.
+        # Returns the message value and metadata.
+        # The legacy format `standard_v1` uses the field `_kafka_msg_id`,
+        # while the future `standard_v2` format uses `_kafka__msg_id`,
+        # better aligned with all the other fields.
+        #
+        # Currently, as of July 2025, `standard_v1` is used as the
+        # default to not cause any breaking changes.
+
+        if options.format == "standard_v1":
+            UserWarning(
+                "Future versions of ingestr will use the `standard_v2` output format. "
+                "To retain compatibility, make sure to start using `format=standard_v1` early."
+            )
+            return {
+                "_kafka": standard_payload,
+                "_kafka_msg_id": message_id,
+            }
+
+        if options.format == "standard_v2":
+            standard_payload["msg_id"] = message_id
+            return {
+                "_kafka": standard_payload,
+            }
+
+        # Slightly advanced Kafka message processor providing basic means of projections.
+        # include: A list of column names to include.
+        # select: A single column name to select and drill down into.
+        if options.format == "flexible":
+            # TODO: Refactor by caching preparation steps.
+            if options.include:
+                include_keys = [
+                    key == "value" and "data" or key for key in options.include
+                ]
+                return toolz.keyfilter(lambda k: k in include_keys, standard_payload)
+            if options.select:
+                # TODO: Instead of a simple dictionary getter, `jsonpointer` or `jqlang`
+                # can provide easy access to deeper levels of nested data structures.
+                key = options.select.replace("value", "data")
+                return standard_payload.get(key)
+            return standard_payload
+
+        raise NotImplementedError(f"Unknown message processor format: {options.format}")
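
To illustrate the three output variants, here is a hedged sketch that constructs a `KafkaEvent` directly, thereby skipping the processor's deserialization stage (the value is supplied already decoded); all field values are illustrative.

from ingestr.src.kafka.model import KafkaDecodingOptions, KafkaEvent

event = KafkaEvent(
    ts=(1, 1721980800000),
    topic="orders",
    partition=0,
    offset=42,
    key="order-1",
    value={"id": 1, "amount": 9.99},  # as if value_type="json" had already been applied
)

# standard_v1: metadata under "_kafka", message id in the separate "_kafka_msg_id" field.
v1 = event.to_dict(KafkaDecodingOptions(format="standard_v1"))

# standard_v2: the message id moves into the payload ("_kafka__msg_id" after flattening, per the comment above).
v2 = event.to_dict(KafkaDecodingOptions(format="standard_v2"))

# flexible + include: keep selected fields only ("value" is mapped to the "data" slot).
subset = event.to_dict(
    KafkaDecodingOptions(format="flexible", include=["topic", "offset", "value"])
)

# flexible + select: drill down into a single field, here the decoded value itself.
payload = event.to_dict(KafkaDecodingOptions(format="flexible", select="value"))
# payload == {"id": 1, "amount": 9.99}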

ingestr/src/sources.py

Lines changed: 22 additions & 1 deletion
@@ -1180,6 +1180,13 @@ def dlt_source(self, uri: str, table: str, **kwargs):
         batch_size = source_params.get("batch_size", [3000])
         batch_timeout = source_params.get("batch_timeout", [3])
 
+        # Decoding options.
+        key_type = source_params.get("key_type", [])
+        value_type = source_params.get("value_type", [])
+        format = source_params.get("format", [])
+        include = source_params.get("include", [])
+        select = source_params.get("select", [])
+
         if not bootstrap_servers:
             raise ValueError(
                 "bootstrap_servers in the URI is required to connect to kafka"
@@ -1189,8 +1196,21 @@
             raise ValueError("group_id in the URI is required to connect to kafka")
 
         start_date = kwargs.get("interval_start")
+
         from ingestr.src.kafka import kafka_consumer
-        from ingestr.src.kafka.helpers import KafkaCredentials
+        from ingestr.src.kafka.helpers import (
+            KafkaCredentials,
+            KafkaEventProcessor,
+        )
+        from ingestr.src.kafka.model import KafkaDecodingOptions
+
+        options = KafkaDecodingOptions.from_params(
+            key_type=key_type,
+            value_type=value_type,
+            format=format,
+            include=include,
+            select=select,
+        )
 
         return kafka_consumer(
             topics=[table],
@@ -1206,6 +1226,7 @@ def dlt_source(self, uri: str, table: str, **kwargs):
                 sasl_username=sasl_username[0] if len(sasl_username) > 0 else None,  # type: ignore
                 sasl_password=sasl_password[0] if len(sasl_password) > 0 else None,  # type: ignore
             ),
+            msg_processor=KafkaEventProcessor(options=options).process,
             start_from=start_date,
             batch_size=int(batch_size[0]),
             batch_timeout=int(batch_timeout[0]),
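
The new source parameters are read as lists, which matches `urllib.parse.parse_qs` semantics for the source URI's query string. A hedged sketch of that mapping follows; the URI itself is hypothetical, and only the parameter names are taken from the code above.

from urllib.parse import parse_qs, urlparse

from ingestr.src.kafka.model import KafkaDecodingOptions

uri = "kafka://?bootstrap_servers=localhost:9092&group_id=ingestr&value_type=json&select=value"
source_params = parse_qs(urlparse(uri).query)

options = KafkaDecodingOptions.from_params(
    key_type=source_params.get("key_type", []),
    value_type=source_params.get("value_type", []),
    format=source_params.get("format", []),
    include=source_params.get("include", []),
    select=source_params.get("select", []),
)
# options.value_type == "json"; supplying `select` switches the format to "flexible".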

requirements.in

Lines changed: 1 addition & 0 deletions
@@ -56,3 +56,4 @@ google-cloud-spanner==3.54.0
 smartsheet-python-sdk==3.0.5
 paramiko==3.5.1
 python-quickbooks==0.9.2
+toolz==1.0.0

requirements.txt

Lines changed: 2 additions & 0 deletions
@@ -579,6 +579,8 @@ tomlkit==0.13.2
     # via
     #   dlt
     #   snowflake-connector-python
+toolz==1.0.0
+    # via -r requirements.in
 tqdm==4.67.1
     # via -r requirements.in
 typer==0.13.1

requirements_arm64.txt

Lines changed: 2 additions & 0 deletions
@@ -574,6 +574,8 @@ tomlkit==0.13.2
     # via
     #   dlt
     #   snowflake-connector-python
+toolz==1.0.0
+    # via -r requirements.in
 tqdm==4.67.1
     # via -r requirements.in
 typer==0.13.1
