diff --git a/control-plane/pkg/reconciler/base/receiver_condition_set.go b/control-plane/pkg/reconciler/base/receiver_condition_set.go index 4091d27f31..910855a960 100644 --- a/control-plane/pkg/reconciler/base/receiver_condition_set.go +++ b/control-plane/pkg/reconciler/base/receiver_condition_set.go @@ -166,6 +166,16 @@ func (manager *StatusConditionManager) FailedToCreateTopic(topic string, err err return fmt.Errorf("failed to create topic: %s: %w", topic, err) } +func (manager *StatusConditionManager) IsTopicNotReady() bool { + + condition := manager.Object.GetStatus().GetCondition(ConditionTopicReady) + + if condition == nil || condition.Status != corev1.ConditionTrue { + return true + } + return false +} + func (manager *StatusConditionManager) TopicReady(topic string) { if owner, ok := manager.Object.GetStatus().Annotations[TopicOwnerAnnotation]; ok { diff --git a/control-plane/pkg/reconciler/broker/broker.go b/control-plane/pkg/reconciler/broker/broker.go index 8c582f8961..ab512dc0da 100644 --- a/control-plane/pkg/reconciler/broker/broker.go +++ b/control-plane/pkg/reconciler/broker/broker.go @@ -339,9 +339,11 @@ func (r *Reconciler) reconcileBrokerTopic(ctx context.Context, broker *eventing. } } - topic, err := kafka.CreateTopicIfDoesntExist(kafkaClusterAdminClient, logger, topicName, topicConfig) - if err != nil { - return "", statusConditionManager.FailedToCreateTopic(topic, err) + if statusConditionManager.IsTopicNotReady() { + topic, err := kafka.CreateTopicIfDoesntExist(kafkaClusterAdminClient, logger, topicName, topicConfig) + if err != nil { + return "", statusConditionManager.FailedToCreateTopic(topic, err) + } } } diff --git a/control-plane/pkg/reconciler/channel/channel.go b/control-plane/pkg/reconciler/channel/channel.go index 1ea73f1510..c57f7911b7 100644 --- a/control-plane/pkg/reconciler/channel/channel.go +++ b/control-plane/pkg/reconciler/channel/channel.go @@ -200,12 +200,15 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta defer kafkaClusterAdminClient.Close() // create the topic - topic, err := kafka.CreateTopicIfDoesntExist(kafkaClusterAdminClient, logger, topicName, topicConfig) - if err != nil { - return statusConditionManager.FailedToCreateTopic(topic, err) + var topic string + if statusConditionManager.IsTopicNotReady() { + topic, err := kafka.CreateTopicIfDoesntExist(kafkaClusterAdminClient, logger, topicName, topicConfig) + if err != nil { + return statusConditionManager.FailedToCreateTopic(topic, err) + } + logger.Debug("Topic created", zap.Any("topic", topic)) + statusConditionManager.TopicReady(topic) } - logger.Debug("Topic created", zap.Any("topic", topic)) - statusConditionManager.TopicReady(topic) // Get data plane config map. contractConfigMap, err := r.GetOrCreateDataPlaneConfigMap(ctx) diff --git a/control-plane/pkg/reconciler/sink/kafka_sink.go b/control-plane/pkg/reconciler/sink/kafka_sink.go index 825dc00906..c80c378063 100644 --- a/control-plane/pkg/reconciler/sink/kafka_sink.go +++ b/control-plane/pkg/reconciler/sink/kafka_sink.go @@ -138,9 +138,11 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink) topicConfig := topicConfigFromSinkSpec(&ks.Spec) - topic, err := kafka.CreateTopicIfDoesntExist(kafkaClusterAdminClient, logger, ks.Spec.Topic, topicConfig) - if err != nil { - return statusConditionManager.FailedToCreateTopic(topic, err) + if statusConditionManager.IsTopicNotReady() { + topic, err := kafka.CreateTopicIfDoesntExist(kafkaClusterAdminClient, logger, ks.Spec.Topic, topicConfig) + if err != nil { + return statusConditionManager.FailedToCreateTopic(topic, err) + } } } else {