Skip to content

Commit e89c0b0

Browse files
mateusjungesgithub-actions[bot]
authored andcommitted
Fix styling
1 parent 50fef9a commit e89c0b0

File tree

5 files changed

+57
-38
lines changed

5 files changed

+57
-38
lines changed

src/Consumers/Consumer.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
use RdKafka\KafkaConsumer;
3131
use RdKafka\Message;
3232
use RdKafka\Producer as KafkaProducer;
33-
use RdKafka\TopicPartition;
3433
use Throwable;
3534

3635
class Consumer implements MessageConsumer
@@ -193,6 +192,7 @@ public function commit(mixed $messageOrOffsets = null): void
193192
} catch (\Throwable $throwable) {
194193
if ($throwable->getCode() !== RD_KAFKA_RESP_ERR__NO_OFFSET) {
195194
$this->logger->error($messageOrOffsets, $throwable, 'COMMIT_ERROR');
195+
196196
throw $throwable;
197197
}
198198
}
@@ -206,6 +206,7 @@ public function commitAsync(mixed $message_or_offsets = null): void
206206
} catch (\Throwable $throwable) {
207207
if ($throwable->getCode() !== RD_KAFKA_RESP_ERR__NO_OFFSET) {
208208
$this->logger->error($message_or_offsets, $throwable, 'COMMIT_ERROR');
209+
209210
throw $throwable;
210211
}
211212
}

src/Contracts/Committer.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ public function commitDlq(Message $message): void;
1717
*
1818
* @param mixed $messageOrOffsets Can be:
1919
* - null: Commit offsets for current assignment
20-
* - \RdKafka\Message: Commit offset for a single topic+partition
20+
* - \RdKafka\Message: Commit offset for a single topic+partition
2121
* - \Junges\Kafka\Contracts\ConsumerMessage: Commit offset for a single topic+partition
2222
* - array of \RdKafka\TopicPartition: Commit offsets for provided partitions
2323
*/
@@ -28,7 +28,7 @@ public function commit(mixed $messageOrOffsets = null): void;
2828
*
2929
* @param mixed $messageOrOffsets Can be:
3030
* - null: Commit offsets for current assignment
31-
* - \RdKafka\Message: Commit offset for a single topic+partition
31+
* - \RdKafka\Message: Commit offset for a single topic+partition
3232
* - \Junges\Kafka\Contracts\ConsumerMessage: Commit offset for a single topic+partition
3333
* - array of \RdKafka\TopicPartition: Commit offsets for provided partitions
3434
*/

tests/Commit/KafkaCommitterTest.php

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -98,13 +98,14 @@ public function it_allows_manual_commits_in_manual_commit_mode(): void
9898
->withAnyArgs()
9999
->andReturn($message)
100100
->shouldReceive('commit')
101-
->andReturnUsing(function() use (&$commitCalled) {
101+
->andReturnUsing(function () use (&$commitCalled) {
102102
$commitCalled = true;
103+
103104
return null;
104105
})
105106
->getMock();
106107

107-
$this->app->bind(KafkaConsumer::class, fn() => $mockedKafkaConsumer);
108+
$this->app->bind(KafkaConsumer::class, fn () => $mockedKafkaConsumer);
108109
$this->mockProducer();
109110

110111
$handlerCalled = false;
@@ -158,7 +159,7 @@ public function it_disables_auto_commits_in_manual_commit_mode(): void
158159
->never()
159160
->getMock();
160161

161-
$this->app->bind(KafkaConsumer::class, fn() => $mockedKafkaConsumer);
162+
$this->app->bind(KafkaConsumer::class, fn () => $mockedKafkaConsumer);
162163
$this->mockProducer();
163164

164165
$handlerCalled = false;
@@ -209,13 +210,14 @@ public function it_enables_auto_commits_in_auto_commit_mode(): void
209210
->withAnyArgs()
210211
->andReturn($message)
211212
->shouldReceive('commit')
212-
->andReturnUsing(function() use (&$autoCommitCalled) {
213+
->andReturnUsing(function () use (&$autoCommitCalled) {
213214
$autoCommitCalled = true;
215+
214216
return null;
215217
})
216218
->getMock();
217219

218-
$this->app->bind(KafkaConsumer::class, fn() => $mockedKafkaConsumer);
220+
$this->app->bind(KafkaConsumer::class, fn () => $mockedKafkaConsumer);
219221
$this->mockProducer();
220222

221223
$handlerCalled = false;

tests/Consumers/ConsumerTest.php

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,7 @@ public function it_provides_consumer_to_handler_classes(): void
511511
->andReturn($message)
512512
->getMock();
513513

514-
$this->app->bind(KafkaConsumer::class, fn() => $mockedKafkaConsumer);
514+
$this->app->bind(KafkaConsumer::class, fn () => $mockedKafkaConsumer);
515515
$this->mockProducer();
516516

517517
$handlerCalled = false;
@@ -523,7 +523,8 @@ public function __construct(
523523
private bool &$handlerCalledRef,
524524
private bool &$consumerProvidedRef,
525525
private mixed &$messageDataRef
526-
) {}
526+
) {
527+
}
527528

528529
public function __invoke(ConsumerMessage $message, MessageConsumer $consumer): void
529530
{
@@ -534,7 +535,7 @@ public function __invoke(ConsumerMessage $message, MessageConsumer $consumer): v
534535
'partition' => $message->getPartition(),
535536
'offset' => $message->getOffset(),
536537
'has_consumer' => $consumer !== null,
537-
'can_commit' => method_exists($consumer, 'commit')
538+
'can_commit' => method_exists($consumer, 'commit'),
538539
];
539540
}
540541
};
@@ -624,7 +625,7 @@ public function make(KafkaConsumer $kafkaConsumer, Config $config): \Junges\Kafk
624625
->andReturn($message)
625626
->getMock();
626627

627-
$this->app->bind(KafkaConsumer::class, fn() => $mockedKafkaConsumer);
628+
$this->app->bind(KafkaConsumer::class, fn () => $mockedKafkaConsumer);
628629
$this->mockProducer();
629630

630631
$handlerCalled = false;

tests/Consumers/ManualCommitTest.php

Lines changed: 41 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -39,25 +39,29 @@ public function it_can_commit_manually_with_consumer_message(): void
3939
->withAnyArgs()
4040
->andReturn($message)
4141
->shouldReceive('commit')
42-
->with(m::on(function($topicPartitions) use (&$commitCalled, &$committedOffsets) {
42+
->with(m::on(function ($topicPartitions) use (&$commitCalled, &$committedOffsets) {
4343
$commitCalled = true;
44+
4445
if (is_array($topicPartitions) && count($topicPartitions) === 1) {
4546
$tp = $topicPartitions[0];
47+
4648
if ($tp instanceof TopicPartition) {
4749
$committedOffsets = [
4850
'topic' => $tp->getTopic(),
4951
'partition' => $tp->getPartition(),
50-
'offset' => $tp->getOffset()
52+
'offset' => $tp->getOffset(),
5153
];
54+
5255
return true;
5356
}
5457
}
58+
5559
return false;
5660
}))
5761
->andReturn()
5862
->getMock();
5963

60-
$this->app->bind(KafkaConsumer::class, fn() => $mockedKafkaConsumer);
64+
$this->app->bind(KafkaConsumer::class, fn () => $mockedKafkaConsumer);
6165
$this->mockProducer();
6266

6367
$handlerCalled = false;
@@ -94,7 +98,7 @@ function (ConsumerMessage $message, Consumer $consumer) use (&$handlerCalled, &$
9498
$this->assertEquals(6, $committedOffsets['offset']); // offset + 1
9599
}
96100

97-
#[Test]
101+
#[Test]
98102
public function it_can_commit_async_with_consumer_message(): void
99103
{
100104
$message = new Message();
@@ -115,15 +119,16 @@ public function it_can_commit_async_with_consumer_message(): void
115119
->withAnyArgs()
116120
->andReturn($message)
117121
->shouldReceive('commitAsync')
118-
->with(m::on(function($topicPartitions) use (&$commitAsyncCalled) {
122+
->with(m::on(function ($topicPartitions) use (&$commitAsyncCalled) {
119123
$commitAsyncCalled = true;
120-
return is_array($topicPartitions) && count($topicPartitions) === 1
124+
125+
return is_array($topicPartitions) && count($topicPartitions) === 1
121126
&& $topicPartitions[0] instanceof TopicPartition;
122127
}))
123128
->andReturn()
124129
->getMock();
125130

126-
$this->app->bind(KafkaConsumer::class, fn() => $mockedKafkaConsumer);
131+
$this->app->bind(KafkaConsumer::class, fn () => $mockedKafkaConsumer);
127132
$this->mockProducer();
128133

129134
$fakeHandler = new CallableConsumer(
@@ -172,13 +177,14 @@ public function it_can_commit_without_parameters(): void
172177
->andReturn($message)
173178
->shouldReceive('commit')
174179
->with(null)
175-
->andReturnUsing(function() use (&$commitCalled) {
180+
->andReturnUsing(function () use (&$commitCalled) {
176181
$commitCalled = true;
182+
177183
return null;
178184
})
179185
->getMock();
180186

181-
$this->app->bind(KafkaConsumer::class, fn() => $mockedKafkaConsumer);
187+
$this->app->bind(KafkaConsumer::class, fn () => $mockedKafkaConsumer);
182188
$this->mockProducer();
183189

184190
$fakeHandler = new CallableConsumer(
@@ -228,15 +234,16 @@ public function it_can_commit_with_rdkafka_message(): void
228234
->withAnyArgs()
229235
->andReturn($message)
230236
->shouldReceive('commit')
231-
->with(m::on(function($msg) use (&$commitCalled, &$committedMessage, $message) {
237+
->with(m::on(function ($msg) use (&$commitCalled, &$committedMessage, $message) {
232238
$commitCalled = true;
233239
$committedMessage = $msg;
240+
234241
return $msg === $message;
235242
}))
236243
->andReturn()
237244
->getMock();
238245

239-
$this->app->bind(KafkaConsumer::class, fn() => $mockedKafkaConsumer);
246+
$this->app->bind(KafkaConsumer::class, fn () => $mockedKafkaConsumer);
240247
$this->mockProducer();
241248

242249
$fakeHandler = new CallableConsumer(
@@ -291,7 +298,7 @@ public function it_does_not_auto_commit_when_manual_commit_is_enabled(): void
291298
->never() // Should never be called for auto-commit
292299
->getMock();
293300

294-
$this->app->bind(KafkaConsumer::class, fn() => $mockedKafkaConsumer);
301+
$this->app->bind(KafkaConsumer::class, fn () => $mockedKafkaConsumer);
295302
$this->mockProducer();
296303

297304
$fakeHandler = new CallableConsumer(
@@ -347,25 +354,29 @@ public function it_converts_consumer_message_to_topic_partition_correctly(): voi
347354
->withAnyArgs()
348355
->andReturn($dummyMessage)
349356
->shouldReceive('commit')
350-
->with(m::on(function($topicPartitions) use (&$commitCalled, &$topicPartitionData) {
357+
->with(m::on(function ($topicPartitions) use (&$commitCalled, &$topicPartitionData) {
351358
$commitCalled = true;
359+
352360
if (is_array($topicPartitions) && count($topicPartitions) === 1) {
353361
$tp = $topicPartitions[0];
362+
354363
if ($tp instanceof TopicPartition) {
355364
$topicPartitionData = [
356365
'topic' => $tp->getTopic(),
357366
'partition' => $tp->getPartition(),
358-
'offset' => $tp->getOffset()
367+
'offset' => $tp->getOffset(),
359368
];
369+
360370
return true;
361371
}
362372
}
373+
363374
return false;
364375
}))
365376
->andReturn()
366377
->getMock();
367378

368-
$this->app->bind(KafkaConsumer::class, fn() => $mockedKafkaConsumer);
379+
$this->app->bind(KafkaConsumer::class, fn () => $mockedKafkaConsumer);
369380
$this->mockProducer();
370381

371382
$fakeHandler = new CallableConsumer(
@@ -420,7 +431,7 @@ public function it_handles_commit_errors_gracefully(): void
420431
->andThrow(new \RdKafka\Exception('Commit failed', RD_KAFKA_RESP_ERR_INVALID_CONFIG))
421432
->getMock();
422433

423-
$this->app->bind(KafkaConsumer::class, fn() => $mockedKafkaConsumer);
434+
$this->app->bind(KafkaConsumer::class, fn () => $mockedKafkaConsumer);
424435
$this->mockProducer();
425436

426437
$fakeHandler = new CallableConsumer(
@@ -475,7 +486,7 @@ public function it_ignores_no_offset_commit_errors(): void
475486
->andThrow(new \RdKafka\Exception('No offset', RD_KAFKA_RESP_ERR__NO_OFFSET))
476487
->getMock();
477488

478-
$this->app->bind(KafkaConsumer::class, fn() => $mockedKafkaConsumer);
489+
$this->app->bind(KafkaConsumer::class, fn () => $mockedKafkaConsumer);
479490
$this->mockProducer();
480491

481492
$fakeHandler = new CallableConsumer(
@@ -529,13 +540,14 @@ public function it_auto_commits_when_auto_commit_is_enabled(): void
529540
->andReturn($message)
530541
->shouldReceive('commit')
531542
->with($message) // Auto-commit should pass the original message
532-
->andReturnUsing(function() use (&$autoCommitCalled) {
543+
->andReturnUsing(function () use (&$autoCommitCalled) {
533544
$autoCommitCalled = true;
545+
534546
return null;
535547
})
536548
->getMock();
537549

538-
$this->app->bind(KafkaConsumer::class, fn() => $mockedKafkaConsumer);
550+
$this->app->bind(KafkaConsumer::class, fn () => $mockedKafkaConsumer);
539551
$this->mockProducer();
540552

541553
$handlerCalled = false;
@@ -588,13 +600,14 @@ public function it_allows_manual_commit_to_override_auto_commit_behavior(): void
588600
->andReturn($message)
589601
->shouldReceive('commit')
590602
->with(m::type('array')) // Manual commit converts to TopicPartition array
591-
->andReturnUsing(function() use (&$manualCommitCalled) {
603+
->andReturnUsing(function () use (&$manualCommitCalled) {
592604
$manualCommitCalled = true;
605+
593606
return null;
594607
})
595608
->getMock();
596609

597-
$this->app->bind(KafkaConsumer::class, fn() => $mockedKafkaConsumer);
610+
$this->app->bind(KafkaConsumer::class, fn () => $mockedKafkaConsumer);
598611
$this->mockProducer();
599612

600613
$handlerCalled = false;
@@ -649,14 +662,15 @@ public function it_handles_handler_exceptions_differently_in_auto_vs_manual_comm
649662
->never()
650663
->getMock();
651664

652-
$this->app->bind(KafkaConsumer::class, fn() => $mockedKafkaConsumerManualCommit);
665+
$this->app->bind(KafkaConsumer::class, fn () => $mockedKafkaConsumerManualCommit);
653666
$this->mockProducer();
654667

655668
$manualCommitHandlerCalled = false;
656669

657670
$fakeHandlerManualCommit = new CallableConsumer(
658671
function (ConsumerMessage $message, Consumer $consumer) use (&$manualCommitHandlerCalled) {
659672
$manualCommitHandlerCalled = true;
673+
660674
throw new \Exception('Processing failed');
661675
},
662676
[]
@@ -704,16 +718,17 @@ public function it_uses_same_commit_infrastructure_for_both_modes(): void
704718
->withAnyArgs()
705719
->andReturn($message, $message)
706720
->shouldReceive('commit')
707-
->andReturnUsing(function($params) use (&$commitCallsLog) {
721+
->andReturnUsing(function ($params) use (&$commitCallsLog) {
708722
$commitCallsLog[] = [
709723
'type' => 'commit',
710-
'params' => $params
724+
'params' => $params,
711725
];
726+
712727
return null;
713728
})
714729
->getMock();
715730

716-
$this->app->bind(KafkaConsumer::class, fn() => $mockedKafkaConsumer);
731+
$this->app->bind(KafkaConsumer::class, fn () => $mockedKafkaConsumer);
717732
$this->mockProducer();
718733

719734
// Test 1: Auto-commit mode
@@ -763,4 +778,4 @@ function (ConsumerMessage $message, Consumer $consumer) {
763778
$this->assertInstanceOf(Message::class, $commitCallsLog[0]['params']);
764779
$this->assertIsArray($commitCallsLog[1]['params']);
765780
}
766-
}
781+
}

0 commit comments

Comments
 (0)