@amotl amotl commented Jul 13, 2025

Pitch

Loading data from Kafka into a database destination works well, but we found no options to decode the Kafka event value and break it out, so that only the value is relayed into the target database, without any metadata.

Observation

For example, Kafka Connect provides configuration options for similar use cases, as in the following fragment.

"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",

Solution

This patch slightly builds upon and expands the existing default_msg_processor implementation to accept a few more options which resolve our problem.

Details

  • Accept a bunch of decoding options per KafkaDecodingOptions
  • Provide a bunch of output formatting options per KafkaEvent
  • Tie both elements together using KafkaEventProcessor

The machinery is effectively the same as before, but provides a few more options: type decoding for the Kafka event's key/value slots (key_type and value_type), a selection mechanism to limit the output to specific fields only (include), a small projection mechanism to optionally drill down into a specific field (select), and an option to select the output format (format).

In combination, those decoding options allow users to relay JSON-encoded Kafka event values directly into a destination table, without any metadata wrappings. Currently, the output formatter provides three different variants out of the box (standard_v1, standard_v2, flexible) 1. More variants can be added in the future, as other users or use cases may have different requirements in the same area.
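
To illustrate how type decoding, `include`, and `select` could interact, here is a minimal sketch. It is not the code from this patch: `DecodingOptions`, `decode_slot`, and `process` are hypothetical names, and the precedence of `select` over `include` is an assumption made for the example.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class DecodingOptions:
    """Hypothetical stand-in for the decoding options described above."""

    key_type: Optional[str] = None    # e.g. "json"
    value_type: Optional[str] = None  # e.g. "json"
    include: List[str] = field(default_factory=list)
    select: Optional[str] = None


def decode_slot(raw: Optional[bytes], slot_type: Optional[str]) -> Any:
    """Decode a raw key/value slot; this sketch only knows about JSON."""
    if raw is None:
        return None
    if slot_type == "json":
        return json.loads(raw)
    return raw.decode("utf-8")


def process(key: Optional[bytes], value: Optional[bytes], opts: DecodingOptions) -> Any:
    """Decode one event and shape the output according to the options."""
    event: Dict[str, Any] = {
        "key": decode_slot(key, opts.key_type),
        "value": decode_slot(value, opts.value_type),
    }
    if opts.select:
        # Projection: drill down into one field and return it unwrapped.
        return event[opts.select]
    if opts.include:
        # Selection: keep only the listed top-level fields.
        return {name: event[name] for name in opts.include if name in event}
    return event


record = process(
    b"sensor-1",
    b'{"sensor_id": 1, "reading": 42.42}',
    DecodingOptions(value_type="json", select="value"),
)
print(record)
```

With `value_type=json` and `select=value`, the decoded value is returned on its own, which is the "no metadata wrappings" case described above.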

Most importantly, the decoding unit is very compact, so relevant tasks do NOT require a corresponding transformation unit down the pipeline, to keep the whole ensemble lean, in the very spirit of ingestr.

Preview

uv pip install --upgrade 'ingestr @ git+https://github.com/crate-workbench/ingestr.git@kafka-decoder'

Example

docker run --rm --name=kafka \
  --publish=9092:9092 docker.io/apache/kafka:latest
echo '{"sensor_id":1,"ts":"2025-06-01 10:00","reading":42.42}' | \
  kcat -P -b localhost -t demo
echo '{"sensor_id":2,"ts":"2025-06-01 11:00","reading":451.00}' | \
  kcat -P -b localhost -t demo
ingestr ingest --yes \
  --source-uri "kafka://?bootstrap_servers=localhost:9092&group_id=test&value_type=json&select=value" \
  --source-table "demo" \
  --dest-uri "duckdb:///kafka.duckdb" \
  --dest-table "demo.kafka"
duckdb kafka.duckdb 'SELECT * FROM demo.kafka WHERE sensor_id>1;'
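
For readers wondering how the options travel in the source URI above: they are plain query parameters. The sketch below uses only the Python standard library to split them out; how ingestr itself parses the URI is not shown in this PR description, so treat this as an illustration rather than the actual code path.

```python
from urllib.parse import parse_qs, urlparse

uri = (
    "kafka://?bootstrap_servers=localhost:9092"
    "&group_id=test&value_type=json&select=value"
)

parsed = urlparse(uri)

# parse_qs returns a list per parameter; each option appears once here,
# so keep only the first item.
params = {name: values[0] for name, values in parse_qs(parsed.query).items()}

print(parsed.scheme)
print(params)
```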

Backlog

  • Add software tests for non-standard decoding and output formatting options
  • Add docs and improve inline comments

Footnotes

  1. The standard_v2 output format is intended to resolve issue #289, "Naming things: Rename _kafka_msg_id to _kafka__msg_id".

@amotl amotl force-pushed the kafka-decoder branch 4 times, most recently from bee01aa to 7e91ef5 July 14, 2025 22:34
@amotl amotl marked this pull request as ready for review July 14, 2025 22:51
```sh
ingestr ingest \
--source-uri 'kafka://?bootstrap_servers=localhost:9092&group_id=test&value_type=json&select=value' \
```
Contributor

would the same work without the select but the format set to flexible?

Contributor Author

I think using select implicitly sets the format to flexible.

Comment on lines +35 to +36
When using the `include` or `select` option, the decoder will automatically
select the `flexible` output format.
Contributor Author

@karakanb: Does this statement help on the question you had about the flexible output format?
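
The rule stated in the docs excerpt above could be sketched as follows. `resolve_format` is a hypothetical helper, not a function from this patch, and the `standard_v1` fallback is an assumption based on the deprecation warning discussed further down.

```python
from typing import List, Optional


def resolve_format(fmt: Optional[str], include: List[str], select: Optional[str]) -> str:
    """Pick the output format; `include` or `select` imply `flexible`."""
    if include or select:
        return "flexible"
    # Assumption: without an explicit choice, the current default applies.
    return fmt or "standard_v1"


print(resolve_format(None, [], "value"))
print(resolve_format("standard_v2", [], None))
```

Under this reading, `select=value` alone is sufficient, and adding `format=flexible` on top would be redundant.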

Comment on lines +126 to +134
if options.format == "standard_v1":
    UserWarning(
        "Future versions of ingestr will use the `standard_v2` output format. "
        "To retain compatibility, make sure to start using `format=standard_v1` early."
    )
    return {
        "_kafka": standard_payload,
        "_kafka_msg_id": message_id,
    }
Contributor Author

This section might need further refinement.

a) Is the user warning emitted at all? Let's check that using a test case.
b) What if the user decides to deliberately continue using standard_v1? Can we have a mechanism to suppress the user warning in this case?
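
Regarding both points, a minimal sketch using only the standard `warnings` module: instantiating `UserWarning(...)` merely creates an exception object and emits nothing, whereas `warnings.warn(...)` does emit. `build_payload` is a hypothetical stand-in for the branch quoted above, and the message is shortened.

```python
import warnings


def build_payload(fmt: str) -> dict:
    """Hypothetical stand-in for the formatter branch under review."""
    if fmt == "standard_v1":
        warnings.warn(
            "Future versions of ingestr will use the `standard_v2` output format.",
            UserWarning,
            stacklevel=2,
        )
    return {"format": fmt}


# a) Check emission: a bare UserWarning(...) records nothing, warn() does.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    UserWarning("only instantiated, never emitted")
    assert caught == []
    build_payload("standard_v1")
    assert len(caught) == 1
    assert issubclass(caught[0].category, UserWarning)

# b) Deliberate opt-out: users can silence the warning with a filter
#    that matches the start of the message.
with warnings.catch_warnings(record=True) as silenced:
    warnings.simplefilter("always")
    warnings.filterwarnings("ignore", message="Future versions of ingestr")
    build_payload("standard_v1")
    assert silenced == []
```

A dedicated warning subclass would make the filter in b) more robust than matching on message text, but that is a design choice for the patch, not something shown here.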

- Accept a bunch of decoding options per `KafkaDecodingOptions`
- Provide a bunch of output formatting options per `KafkaEvent`
- Tie both elements together using `KafkaEventProcessor`

The machinery is effectively the same as before, but provides a few
more options: type decoding for the Kafka event's key/value slots,
a selection mechanism to limit the output to specific fields only, and
a small projection mechanism to optionally drill down into a specific
field.

In combination, those decoding options allow users to relay
JSON-encoded Kafka event values directly into a destination table,
without any metadata wrappings.

The output formatter provides three different variants out of the box.
More variants can be added in the future, as other users or use cases
may have different requirements in the same area.

Most importantly, the decoding unit is very compact, so relevant tasks
don't need a corresponding transformation unit down the pipeline, to
keep the whole ensemble lean, in the very spirit of ingestr.
@amotl amotl requested a review from karakanb August 3, 2025 21:05
- `value_type`: The data type of the Kafka event `value` field. Possible values: `json`.
- `format`: The output format/layout. Possible values: `standard_v1`, `standard_v2`, `flexible`.
- `include`: Which fields to include in the output, comma-separated.
- `select`: Which field to select (pick) into the output.
Contributor Author

@amotl amotl Aug 3, 2025

Todo: Add a remark to the select explanation, a compressed version of:

Always use select=value to select the value [sic!] of the Kafka event. This name has been chosen deliberately to adhere to Kafka jargon, even though ingestr's internal event layout relays this field as data.
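
One way to picture the naming remark above is a small alias lookup between the user-facing name and the internal key. This is a hypothetical sketch: `FIELD_ALIASES` and `select_field` do not come from this patch, and only the `value` to `data` relation is taken from the comment.

```python
from typing import Any, Dict

# Hypothetical mapping from user-facing (Kafka jargon) names to internal keys.
FIELD_ALIASES: Dict[str, str] = {"value": "data"}


def select_field(event: Dict[str, Any], name: str) -> Any:
    """Return the field addressed by `name`, translating aliases first."""
    return event[FIELD_ALIASES.get(name, name)]


internal_event = {"key": "sensor-1", "data": {"sensor_id": 1, "reading": 42.42}}
print(select_field(internal_event, "value"))
```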
