Skip to content

Commit c398311

Browse files
authored
Update kafka-go settings to be compatible to latest Kafka Helm Charts (#459)
* Update kafka-go settings to be compatible to latest Kafka Helm Charts Signed-off-by: Arrobo, Gabriel <[email protected]> * Address Copilot's comments Signed-off-by: Arrobo, Gabriel <[email protected]> --------- Signed-off-by: Arrobo, Gabriel <[email protected]>
1 parent e39a827 commit c398311

File tree

2 files changed

+23
-24
lines changed

2 files changed

+23
-24
lines changed

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
3.0.1-dev
1+
3.0.1

metrics/kafka.go

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
//
33
// SPDX-License-Identifier: Apache-2.0
44

5-
// Kafka metric Producer
65
package metrics
76

87
import (
@@ -25,35 +24,38 @@ var StatWriter Writer
2524

2625
func InitialiseKafkaStream(config *factory.Configuration) error {
2726
if !*config.KafkaInfo.EnableKafka {
28-
logger.KafkaLog.Info("Kafka disabled")
27+
logger.KafkaLog.Warnln("Kafka is disabled")
2928
return nil
3029
}
3130

32-
brokerUrl := "sd-core-kafka-headless:9092"
31+
brokerUrl := "kafka:9092"
3332
topicName := "sdcore-data-source-smf"
3433

3534
if config.KafkaInfo.BrokerUri != "" && config.KafkaInfo.BrokerPort != 0 {
3635
brokerUrl = fmt.Sprintf("%s:%d", config.KafkaInfo.BrokerUri, config.KafkaInfo.BrokerPort)
3736
}
3837

39-
logger.KafkaLog.Debugf("initialise kafka broker url [%v]", brokerUrl)
38+
logger.KafkaLog.Debugf("initialise kafka broker url: %s", brokerUrl)
4039

4140
if config.KafkaInfo.Topic != "" {
4241
topicName = config.KafkaInfo.Topic
4342
}
4443

45-
logger.KafkaLog.Debugf("initialise kafka Topic [%v]", config.KafkaInfo.Topic)
44+
logger.KafkaLog.Debugf("initialise kafka Topic: %s", topicName)
4645

4746
producer := kafka.Writer{
48-
Addr: kafka.TCP(brokerUrl),
49-
Topic: topicName,
50-
Balancer: &kafka.LeastBytes{},
51-
BatchTimeout: 10 * time.Millisecond,
47+
Addr: kafka.TCP(brokerUrl),
48+
Topic: topicName,
49+
AllowAutoTopicCreation: true,
50+
Balancer: &kafka.LeastBytes{},
51+
BatchTimeout: 10 * time.Millisecond,
5252
}
5353

5454
StatWriter = Writer{
5555
kafkaWriter: &producer,
5656
}
57+
58+
logger.KafkaLog.Debugf("initialising kafka stream with url[%s], topic[%s]", brokerUrl, topicName)
5759
return nil
5860
}
5961

@@ -67,26 +69,25 @@ func (writer Writer) SendMessage(message []byte) error {
6769
}
6870
msg := kafka.Message{Value: message}
6971
if err := writer.kafkaWriter.WriteMessages(context.Background(), msg); err != nil {
70-
logger.KafkaLog.Errorf("kafka send message write error: [%v] ", err.Error())
72+
logger.KafkaLog.Errorf("kafka send message write error: %s", err.Error())
7173
return err
7274
}
7375
return nil
7476
}
7577

76-
// caller should make sure kafka is enabled.
7778
func (writer Writer) PublishPduSessEvent(ctxt mi.CoreSubscriber, op mi.SubscriberOp) error {
7879
smKafkaEvt := mi.MetricEvent{
7980
EventType: mi.CSubscriberEvt,
8081
SubscriberData: mi.CoreSubscriberData{Subscriber: ctxt, Operation: op},
8182
}
8283
if msg, err := json.Marshal(smKafkaEvt); err != nil {
83-
logger.KafkaLog.Errorf("publishing pdu sess event marshal error [%v] ", err.Error())
84+
logger.KafkaLog.Errorf("publishing pdu sess event marshal error: %s", err.Error())
8485
return err
8586
} else {
86-
logger.KafkaLog.Debugf("publishing pdu sess event[%s] ", msg)
87+
logger.KafkaLog.Debugf("publishing pdu sess event[%s]", string(msg))
8788
err := StatWriter.SendMessage(msg)
8889
if err != nil {
89-
logger.KafkaLog.Errorf("publishing pdu sess event error [%v] ", err.Error())
90+
logger.KafkaLog.Errorf("publishing pdu sess event error: %s", err.Error())
9091
}
9192
}
9293
return nil
@@ -105,28 +106,26 @@ func PublishMsgEvent(msgType mi.SmfMsgType) error {
105106
}
106107
smKafkaMsgEvt := mi.MetricEvent{EventType: mi.CMsgTypeEvt, MsgType: mi.CoreMsgType{MsgType: msgType.String(), SourceNfId: nfInstanceId}}
107108
if msg, err := json.Marshal(smKafkaMsgEvt); err != nil {
108-
logger.KafkaLog.Errorf("publishing msg event marshal error [%v] ", err.Error())
109+
logger.KafkaLog.Errorf("publishing msg event marshal error: %s", err.Error())
109110
return err
110111
} else {
111-
logger.KafkaLog.Debugf("publishing msg event[%s] ", msg)
112+
logger.KafkaLog.Debugf("publishing msg event: %s", string(msg))
112113
err := StatWriter.SendMessage(msg)
113114
if err != nil {
114-
logger.KafkaLog.Errorf("publishing msg event error [%v] ", err.Error())
115+
logger.KafkaLog.Errorf("publishing msg event error: %s", err.Error())
115116
}
116117
}
117118
return nil
118119
}
119120

120-
// caller should make sure kafka is enabled.
121121
func (writer Writer) PublishNfStatusEvent(msgEvent mi.MetricEvent) error {
122122
if msg, err := json.Marshal(msgEvent); err != nil {
123-
logger.KafkaLog.Errorf("publishing nf status marshal error [%v] ", err.Error())
123+
logger.KafkaLog.Errorf("publishing nf status marshal error: %s", err.Error())
124124
return err
125125
} else {
126-
logger.KafkaLog.Debugf("publishing nf status event[%s] ", msg)
127-
err := StatWriter.SendMessage(msg)
128-
if err != nil {
129-
logger.KafkaLog.Errorf("publishing nf status event error [%v] ", err.Error())
126+
logger.KafkaLog.Debugf("publishing nf status event: %s", string(msg))
127+
if err := StatWriter.SendMessage(msg); err != nil {
128+
logger.KafkaLog.Errorf("publishing nf status event error: %s", err.Error())
130129
}
131130
}
132131
return nil

0 commit comments

Comments (0)