Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions exporter/kafkaexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,28 @@
SendMessages(msgs []*sarama.ProducerMessage) error
}

type AsyncMessageSender struct {
sarama.AsyncProducer
logger *fflog.FFLogger
}

func (a *AsyncMessageSender) SendMessages(msgs []*sarama.ProducerMessage) error {
for len(msgs) > 0 {
select {
case err := <-a.AsyncProducer.Errors():
a.logger.Warn("Failed to produce message: %w", err)
case a.AsyncProducer.Input() <- msgs[0]:
msgs = msgs[1:]

Check warning on line 33 in exporter/kafkaexporter/exporter.go

View check run for this annotation

Codecov / codecov/patch

exporter/kafkaexporter/exporter.go#L27-L33

Added lines #L27 - L33 were not covered by tests
}
}
return nil

Check warning on line 36 in exporter/kafkaexporter/exporter.go

View check run for this annotation

Codecov / codecov/patch

exporter/kafkaexporter/exporter.go#L36

Added line #L36 was not covered by tests
}

// Settings contains Kafka-specific configurations needed for message creation
type Settings struct {
Topic string `json:"topic"`
Addresses []string `json:"addresses"`
Async bool `json:"async"`
*sarama.Config
}

Expand All @@ -41,13 +59,15 @@
// dialer will create the producer. This field is added for dependency injection during testing as sarama
// has the annoying tendency to dial as soon as a producer is created.
dialer func(addrs []string, config *sarama.Config) (MessageSender, error)

logger *fflog.FFLogger

Check failure on line 63 in exporter/kafkaexporter/exporter.go

View workflow job for this annotation

GitHub Actions / Lint

field `logger` is unused (unused)
}

// Export will produce a message to the Kafka topic. The message's value will contain the event encoded in the
// selected format. Messages are published synchronously and will error immediately on failure.
func (e *Exporter) Export(_ context.Context, _ *fflog.FFLogger, featureEvents []exporter.FeatureEvent) error {
func (e *Exporter) Export(_ context.Context, logger *fflog.FFLogger, featureEvents []exporter.FeatureEvent) error {
if e.sender == nil {
err := e.initializeProducer()
err := e.initializeProducer(logger)
if err != nil {
return fmt.Errorf("writer: %w", err)
}
Expand Down Expand Up @@ -81,7 +101,7 @@

// initializeProducer runs only once and creates a new producer from the dialer. If the config is not populated a new
// one will be created with sensible defaults.
func (e *Exporter) initializeProducer() error {
func (e *Exporter) initializeProducer(logger *fflog.FFLogger) error {
if e.Settings.Config == nil {
e.Settings.Config = sarama.NewConfig()
e.Settings.Config.Producer.Return.Successes = true // Needs to be true for sync producers
Expand All @@ -90,9 +110,16 @@
if e.dialer == nil {
e.dialer = func(addrs []string, config *sarama.Config) (MessageSender, error) {
// Adapter for the function to comply with the MessageSender interface return
if e.Settings.Async {
asyncProducer, err := sarama.NewAsyncProducer(addrs, config)
if err != nil {
return nil, err
}
return &AsyncMessageSender{AsyncProducer: asyncProducer, logger: logger}, nil //TODO Close should be called on shutdown
}

Check warning on line 119 in exporter/kafkaexporter/exporter.go

View check run for this annotation

Codecov / codecov/patch

exporter/kafkaexporter/exporter.go#L118-L119

Added lines #L118 - L119 were not covered by tests
return sarama.NewSyncProducer(addrs, config)
}
}

Check failure on line 122 in exporter/kafkaexporter/exporter.go

View workflow job for this annotation

GitHub Actions / Lint

commentFormatting: put a space between `//` and comment text (gocritic)

var err error
e.sender, err = e.dialer(e.Settings.Addresses, e.Settings.Config)
Expand Down
Loading