Skip to content

support per-record observations in batch listeners #3944

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,28 @@ The `record` property in both observation contexts contains the `ConsumerRecord`
The sender and receiver contexts `remoteServiceName` properties are set to the Kafka `clusterId` property; this is retrieved by a `KafkaAdmin`.
If, for some reason - perhaps lack of admin permissions, you cannot retrieve the cluster id, starting with version 3.1, you can set a manual `clusterId` on the `KafkaAdmin` and inject it into ``KafkaTemplate``s and listener containers.
When it is `null` (default), the admin will invoke the `describeCluster` admin operation to retrieve it from the broker.

[[batch-listener-obs]]
=== Batch Listener Observations

When using a batch listener, by default, no observations are created, even if a `ObservationRegistry` is present.
This is because the scope of an observation is tied to the thread, and with a batch listener, there is no one-to-one mapping between an observation and a record.

To enable per-record observations in a batch listener, set the container factory property `recordObservationsInBatch` to `true`.

[source,java]
----
@Bean
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {

ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.getContainerProperties().setRecordObservationsInBatch(true);
return factory;
}
----

When this property is `true`, an observation will be created for each record in the batch, but the observation is not propagated to the listener method.
This allows you to have visibility into the processing of each record, even within a batch context.
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,9 @@ For details, see xref:kafka/receiving-messages/rebalance-listeners.adoc#new-reba

The `DefaultKafkaHeaderMapper` and `SimpleKafkaHeaderMapper` support multi-value header mapping for Kafka records.
More details are available in xref:kafka/headers.adoc#multi-value-header[Support multi-value header mapping].

[[x40-batch-observability]]
=== Per-Record Observation in Batch Listeners

It is now possible to get an observation for each record when using a batch listener.
See xref:kafka/micrometer.adoc#batch-listener-obs[Observability for Batch Listeners] for more information.
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,8 @@ public enum EOSMode {

private boolean restartAfterAuthExceptions;

private boolean recordObservationsInBatch;

/**
* Create properties for a container that will subscribe to the specified topics.
* @param topics the topics.
Expand Down Expand Up @@ -1091,6 +1093,27 @@ public void setRestartAfterAuthExceptions(boolean restartAfterAuthExceptions) {
this.restartAfterAuthExceptions = restartAfterAuthExceptions;
}

/**
* When true, and a batch listener is configured with observation enabled, an observation
* will be started for each record in the batch.
* @return recordObservationsInBatch.
* @since 4.0
*/
public boolean isRecordObservationsInBatch() {
return this.recordObservationsInBatch;
}

/**
* Set whether to enable individual record observations in a batch.
* When true, and a batch listener is configured with observation enabled, an observation
* will be started for each record in the batch. Default false.
* @param recordObservationsInBatch true to enable individual record observations.
* @since 4.0
*/
public void setRecordObservationsInBatch(boolean recordObservationsInBatch) {
this.recordObservationsInBatch = recordObservationsInBatch;
}

@Override
public String toString() {
return "ContainerProperties ["
Expand Down Expand Up @@ -1141,6 +1164,7 @@ public String toString() {
? "\n observationRegistry=" + this.observationRegistry
: "")
+ "\n restartAfterAuthExceptions=" + this.restartAfterAuthExceptions
+ "\n recordObservationsInBatch=" + this.recordObservationsInBatch
+ "\n]";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -898,7 +898,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
this.isBatchListener = true;
this.wantsFullRecords = this.batchListener.wantsPollResult();
this.pollThreadStateProcessor = setUpPollProcessor(true);
this.observationEnabled = false;
this.observationEnabled = this.containerProperties.isObservationEnabled() && this.containerProperties.isRecordObservationsInBatch();
}
else if (listener instanceof MessageListener) {
this.listener = (MessageListener<K, V>) listener;
Expand Down Expand Up @@ -2423,6 +2423,21 @@ private void ackBatch(final ConsumerRecords<K, V> records) throws InterruptedExc
}
}

private void invokeBatchWithIndividualRecordObservation(List<ConsumerRecord<K, V>> recordList) {
// Create individual observations for each record without scopes
for (ConsumerRecord<K, V> record : recordList) {
Observation observation = KafkaListenerObservation.LISTENER_OBSERVATION.observation(
this.containerProperties.getObservationConvention(),
DefaultKafkaListenerObservationConvention.INSTANCE,
() -> new KafkaRecordReceiverContext(record, getListenerId(), getClientId(), this.consumerGroupId,
this::clusterId),
this.observationRegistry);
observation.observe(() -> {
this.logger.debug(() -> "Observing record in batch: " + KafkaUtils.format(record));
});
}
}

private void invokeBatchOnMessageWithRecordsOrList(final ConsumerRecords<K, V> recordsArg,
List<ConsumerRecord<K, V>> recordListArg) {

Expand All @@ -2443,7 +2458,13 @@ private void invokeBatchOnMessageWithRecordsOrList(final ConsumerRecords<K, V> r
}
}
Object sample = startMicrometerSample();


try {
if (this.observationEnabled) {
invokeBatchWithIndividualRecordObservation(recordList);
}

if (this.wantsFullRecords) {
Objects.requireNonNull(this.batchListener).onMessage(records, // NOSONAR
this.isAnyManualAck
Expand Down
Loading