-
-
Notifications
You must be signed in to change notification settings - Fork 65
Closed
Description
Requirement:
Forward messages to the corresponding topic based on the value of the message header tag field.
my_topic ==> dev_my_topic
echo "hello there" | kcat -b localhost:9192 -P -t my_topic -H "tag=dev" -d all
code
@Override
public CompletionStage<RequestFilterResult> onProduceRequest(short apiVersion, RequestHeaderData header, ProduceRequestData request, FilterContext context) {
if (!config.isEnabled()) {
return context.forwardRequest(header, request);
}
request.topicData().forEach(topic -> setTopicName(context, topic::setName, topic));
return context.forwardRequest(header, request);
}
@Override
public CompletionStage<ResponseFilterResult> onProduceResponse(short apiVersion, ResponseHeaderData header, ProduceResponseData response,
FilterContext context) {
response.responses().forEach(topic -> reductionTopicName(context, topic::setName, topic));
return context.forwardResponse(header, response);
}
The problem is that
kcat keeps sending messages in a loop, but dev_my_topic has no messages. What else needs to be done to forward messages to dev_my_topic
Metadata
Metadata
Assignees
Labels
No labels