Skip to content

Commit 232d9ad

Browse files
authored
Make publishing async by default (#366)
* Make publishing async by default * Fix tests * Update docs * Linting
1 parent 18b144f commit 232d9ad

File tree

9 files changed

+130
-38
lines changed

9 files changed

+130
-38
lines changed

docs/producing-messages/producing-messages.md

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,24 @@ Kafka::publish('broker')->onTopic('topic-name')
1414
This method returns a `ProducerBuilder` instance, which contains a few methods to configure your kafka producer.
1515
The following lines describes these methods.
1616

17-
If you are going to produce a lot of messages to different topics, please use the `asyncPublish` method on the `Junges\Kafka\Facades\Kafka` class:
17+
The default `publish()` method now uses asynchronous publishing for better performance. Messages are queued and flushed when the application terminates:
1818

1919
```php
2020
use Junges\Kafka\Facades\Kafka;
2121

22-
Kafka::asyncPublish('broker')->onTopic('topic-name')
22+
Kafka::publish('broker')->onTopic('topic-name')
2323
```
2424

25-
The main difference is that the Async Producer is a singleton and will only flush the producer when the application is shutting down, instead of after each send.
26-
This reduces the overhead when you want to send a lot of messages in your request handlers.
25+
The async producer is a singleton and will only flush messages when the application is shutting down, instead of after each send.
26+
This reduces overhead when you want to send a lot of messages in your request handlers.
27+
28+
If you need immediate message flushing (synchronous publishing), use the `publishSync()` method:
29+
30+
```php
31+
use Junges\Kafka\Facades\Kafka;
32+
33+
Kafka::publishSync('broker')->onTopic('topic-name')
34+
```
2735

2836
```+parse
2937
<x-sponsors.request-sponsor/>
@@ -37,6 +45,6 @@ available on the `Kafka` facade (added in v2.2.0). This method will return a fre
3745
use Junges\Kafka\Facades\Kafka;
3846

3947
Kafka::fresh()
40-
->asyncPublish('broker')
48+
->publish('broker')
4149
->onTopic('topic-name')
4250
```

docs/producing-messages/publishing-to-kafka.md

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,19 @@ $producer = Kafka::publish('broker')
1818
$producer->send();
1919
```
2020

21-
If you want to send multiple messages, consider using the async producer instead. The default `send` method is recommended for low-throughput systems only, as it
22-
flushes the producer after every message that is sent.
21+
The `publish()` method uses asynchronous publishing for better performance, batching messages and flushing them when the application terminates.
22+
If you need immediate message flushing, use `publishSync()` instead:
23+
24+
```php
25+
use Junges\Kafka\Facades\Kafka;
26+
27+
// For immediate flush (synchronous)
28+
$producer = Kafka::publishSync('broker')
29+
->onTopic('topic')
30+
->withKafkaKey('kafka-key');
31+
32+
$producer->send();
33+
```
2334

2435
```+parse
2536
<x-sponsors.request-sponsor/>

docs/upgrade-guide.md

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,36 @@ title: Upgrade guide
33
weight: 6
44
---
55

6+
## Upgrade to v3.0 from v2.9
7+
8+
### Breaking Changes
9+
10+
- `publish()` is now asynchronous by default. Messages are queued and flushed when the application terminates for better performance
11+
- Removed `asyncPublish()` and `publishAsync()` methods - use `publish()` for async behavior (default) or `publishSync()` for immediate flushing
12+
- Minimum PHP version raised to 8.3
13+
- Minimum Laravel version raised to 11.0
14+
- **NEW**: Added `publishSync()` method for synchronous message publishing with immediate flush
15+
16+
### Migration Guide
17+
18+
**Before (v2.9):**
19+
```php
20+
// Async publishing
21+
Kafka::asyncPublish()->onTopic('topic')->withBody(['data' => 'value'])->send();
22+
23+
// Sync publishing
24+
Kafka::publish()->onTopic('topic')->withBody(['data' => 'value'])->send();
25+
```
26+
27+
**After (v3.0):**
28+
```php
29+
// Async publishing (default behavior)
30+
Kafka::publish()->onTopic('topic')->withBody(['data' => 'value'])->send();
31+
32+
// Sync publishing (immediate flush)
33+
Kafka::publishSync()->onTopic('topic')->withBody(['data' => 'value'])->send();
34+
```
35+
636
## Upgrade to v2.9 from v2.8
737

838
- **BREAKING CHANGE**: Deprecated producer batch messages feature has been removed (`MessageBatch`, `sendBatch`, `produceBatch`). Use `Kafka::asyncPublish()` instead for better performance

src/Contracts/Manager.php

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,12 @@ interface Manager
77
/** Returns a new fresh instance of the Manager. */
88
public function fresh(): self;
99

10-
/** Creates a new ProducerBuilder instance, setting brokers and topic. */
10+
/** Creates a new async ProducerBuilder instance, setting brokers and topic. */
1111
public function publish(?string $broker = null): MessageProducer;
1212

13+
/** Creates a synchronous ProducerBuilder instance for immediate message flushing. */
14+
public function publishSync(?string $broker = null): MessageProducer;
15+
1316
/** Return a ConsumerBuilder instance. */
1417
public function consumer(array $topics = [], ?string $groupId = null, ?string $brokers = null): ConsumerBuilder;
1518

src/Facades/Kafka.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
/**
1010
* @method static \Junges\Kafka\Contracts\MessageProducer publish(string $broker = null)
11-
* @method static \Junges\Kafka\Contracts\MessageProducer asyncPublish(string $broker = null)
11+
* @method static \Junges\Kafka\Contracts\MessageProducer publishSync(string $broker = null)
1212
* @method static \Junges\Kafka\Factory fresh(string $broker = null)
1313
* @method static \Junges\Kafka\Consumers\Builder consumer(array $topics = [], string $groupId = null, string $brokers = null)
1414
* @method static void assertPublished(ProducerMessage $expectedMessage = null, callable $callback = null)

src/Factory.php

Lines changed: 13 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -28,44 +28,33 @@ public function publish(?string $broker = null): MessageProducer
2828
return Kafka::fake()->publish($broker);
2929
}
3030

31+
if ($this->builder instanceof ProducerBuilder) {
32+
return $this->builder;
33+
}
34+
3135
return new ProducerBuilder(
32-
broker: $broker ?? config('kafka.brokers')
36+
broker: $broker ?? config('kafka.brokers'),
37+
asyncProducer: true,
3338
);
3439
}
3540

36-
/** Returns a fresh factory instance. */
37-
public function fresh(): self
38-
{
39-
return new self;
40-
}
41-
42-
/**
43-
* Creates a new ProducerBuilder instance, optionally setting the brokers.
44-
* The producer will be flushed only when the application terminates,
45-
* and doing SEND does not mean that the message was flushed!
46-
*/
47-
public function asyncPublish(?string $broker = null): MessageProducer
41+
/** Creates a synchronous ProducerBuilder instance for immediate message flushing. */
42+
public function publishSync(?string $broker = null): MessageProducer
4843
{
4944
if ($this->shouldFake) {
5045
return Kafka::fake()->publish($broker);
5146
}
5247

53-
if ($this->builder instanceof ProducerBuilder) {
54-
return $this->builder;
55-
}
56-
57-
$this->builder = new ProducerBuilder(
48+
return new ProducerBuilder(
5849
broker: $broker ?? config('kafka.brokers'),
59-
asyncProducer: true
50+
asyncProducer: false
6051
);
61-
62-
return $this->builder;
6352
}
6453

65-
/** This is an alias for the asyncPublish method. */
66-
public function publishAsync(?string $broker = null): MessageProducer
54+
/** Returns a fresh factory instance. */
55+
public function fresh(): self
6756
{
68-
return $this->asyncPublish($broker);
57+
return new self;
6958
}
7059

7160
/** Return a ConsumerBuilder instance. */

src/Support/Testing/Fakes/KafkaFake.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public function publish(?string $broker = null): ProducerBuilderFake
4646
return $this->makeProducerBuilderFake($broker);
4747
}
4848

49-
public function asyncPublish(?string $broker = null): ProducerBuilderFake
49+
public function publishSync(?string $broker = null): ProducerBuilderFake
5050
{
5151
return $this->publish($broker);
5252
}

tests/KafkaFakeTest.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public function it_stores_multiple_messages(): void
5858
public function it_stores_multiple_messages_when_publishing_async(): void
5959
{
6060
for ($i = 0; $i < 3; $i++) {
61-
$this->fake->asyncPublish()
61+
$this->fake->publish()
6262
->onTopic('topic')
6363
->withBody('test')
6464
->send();

tests/KafkaTest.php

Lines changed: 54 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,57 @@ public function it_can_publish_messages_to_kafka(): void
5656
$this->assertTrue($test);
5757
}
5858

59+
#[Test]
60+
public function it_can_publish_messages_synchronously(): void
61+
{
62+
Event::fake();
63+
64+
$mockedProducerTopic = m::mock(ProducerTopic::class)
65+
->shouldReceive('producev')->twice()
66+
->andReturn(m::self())
67+
->getMock();
68+
69+
$mockedProducer = m::mock(Producer::class)
70+
->shouldReceive('newTopic')->with('test')->twice()->andReturn($mockedProducerTopic)
71+
->shouldReceive('poll')->twice()
72+
->shouldReceive('flush')->twice()
73+
->andReturn(RD_KAFKA_RESP_ERR_NO_ERROR)
74+
->getMock();
75+
76+
$this->app->bind(Producer::class, fn () => $mockedProducer);
77+
78+
$test1 = Kafka::publishSync()
79+
->onTopic('test')
80+
->withConfigOptions([
81+
'metadata.broker.list' => 'broker',
82+
])
83+
->withKafkaKey(Str::uuid()->toString())
84+
->withBodyKey('test', ['test'])
85+
->withHeaders(['custom' => 'header'])
86+
->withDebugEnabled()
87+
->send();
88+
89+
$test2 = Kafka::publishSync()
90+
->onTopic('test')
91+
->withConfigOptions([
92+
'metadata.broker.list' => 'broker',
93+
])
94+
->withKafkaKey(Str::uuid()->toString())
95+
->withBodyKey('test', ['test'])
96+
->withHeaders(['custom' => 'header'])
97+
->withDebugEnabled()
98+
->send();
99+
100+
Event::assertDispatched(MessagePublished::class);
101+
102+
$this->assertTrue($test1);
103+
$this->assertTrue($test2);
104+
105+
Kafka::clearResolvedInstances();
106+
107+
Event::assertDispatched(MessagePublished::class);
108+
}
109+
59110
#[Test]
60111
public function it_can_publish_messages_asynchronously(): void
61112
{
@@ -69,13 +120,13 @@ public function it_can_publish_messages_asynchronously(): void
69120
$mockedProducer = m::mock(Producer::class)
70121
->shouldReceive('newTopic')->with('test')->twice()->andReturn($mockedProducerTopic)
71122
->shouldReceive('poll')->twice()
72-
->shouldReceive('flush')->once()
123+
->shouldReceive('flush')->atLeast()->once()
73124
->andReturn(RD_KAFKA_RESP_ERR_NO_ERROR)
74125
->getMock();
75126

76127
$this->app->bind(Producer::class, fn () => $mockedProducer);
77128

78-
$test1 = Kafka::asyncPublish()
129+
$test1 = Kafka::publish()
79130
->onTopic('test')
80131
->withConfigOptions([
81132
'metadata.broker.list' => 'broker',
@@ -86,7 +137,7 @@ public function it_can_publish_messages_asynchronously(): void
86137
->withDebugEnabled()
87138
->send();
88139

89-
$test2 = Kafka::asyncPublish()
140+
$test2 = Kafka::publish()
90141
->onTopic('test')
91142
->withConfigOptions([
92143
'metadata.broker.list' => 'broker',

0 commit comments

Comments
 (0)