Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions control-plane/pkg/reconciler/base/receiver_condition_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,16 @@ func (manager *StatusConditionManager) FailedToCreateTopic(topic string, err err
return fmt.Errorf("failed to create topic: %s: %w", topic, err)
}

func (manager *StatusConditionManager) IsTopicNotReady() bool {

condition := manager.Object.GetStatus().GetCondition(ConditionTopicReady)

if condition == nil || condition.Status != corev1.ConditionTrue {
return true
}
return false
}

func (manager *StatusConditionManager) TopicReady(topic string) {

if owner, ok := manager.Object.GetStatus().Annotations[TopicOwnerAnnotation]; ok {
Expand Down
8 changes: 5 additions & 3 deletions control-plane/pkg/reconciler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,9 +339,11 @@ func (r *Reconciler) reconcileBrokerTopic(ctx context.Context, broker *eventing.
}
}

topic, err := kafka.CreateTopicIfDoesntExist(kafkaClusterAdminClient, logger, topicName, topicConfig)
if err != nil {
return "", statusConditionManager.FailedToCreateTopic(topic, err)
if statusConditionManager.IsTopicNotReady() {
topic, err := kafka.CreateTopicIfDoesntExist(kafkaClusterAdminClient, logger, topicName, topicConfig)
if err != nil {
return "", statusConditionManager.FailedToCreateTopic(topic, err)
}
Comment on lines +342 to +346
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed a huge reduction of "create topic" requests, with this PR

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a way to work around this would be to update the CreateTopicIfDoesntExist function to check if the topic exists before sending a create topic request?

As far as I can tell, the reason for so many requests is that the create topic method doesn't actually check if the topic exists before sending the request, so we could add that logic here:

createTopicError := admin.CreateTopic(topic, &config.TopicDetail, false)
if err, ok := createTopicError.(*sarama.TopicError); ok && err.Err == sarama.ErrTopicAlreadyExists {
return topic, nil
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the list topics would still send a request to kafka brokers

}
}

Expand Down
13 changes: 8 additions & 5 deletions control-plane/pkg/reconciler/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,12 +200,15 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta
defer kafkaClusterAdminClient.Close()

// create the topic
topic, err := kafka.CreateTopicIfDoesntExist(kafkaClusterAdminClient, logger, topicName, topicConfig)
if err != nil {
return statusConditionManager.FailedToCreateTopic(topic, err)
var topic string
if statusConditionManager.IsTopicNotReady() {
topic, err := kafka.CreateTopicIfDoesntExist(kafkaClusterAdminClient, logger, topicName, topicConfig)
if err != nil {
return statusConditionManager.FailedToCreateTopic(topic, err)
}
logger.Debug("Topic created", zap.Any("topic", topic))
statusConditionManager.TopicReady(topic)
}
logger.Debug("Topic created", zap.Any("topic", topic))
statusConditionManager.TopicReady(topic)

// Get data plane config map.
contractConfigMap, err := r.GetOrCreateDataPlaneConfigMap(ctx)
Expand Down
8 changes: 5 additions & 3 deletions control-plane/pkg/reconciler/sink/kafka_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,11 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink)

topicConfig := topicConfigFromSinkSpec(&ks.Spec)

topic, err := kafka.CreateTopicIfDoesntExist(kafkaClusterAdminClient, logger, ks.Spec.Topic, topicConfig)
if err != nil {
return statusConditionManager.FailedToCreateTopic(topic, err)
if statusConditionManager.IsTopicNotReady() {
topic, err := kafka.CreateTopicIfDoesntExist(kafkaClusterAdminClient, logger, ks.Spec.Topic, topicConfig)
if err != nil {
return statusConditionManager.FailedToCreateTopic(topic, err)
}
}
} else {

Expand Down
Loading