Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
},
"require-dev": {
"phpunit/phpunit": "^10.5|^11.5.3",
"orchestra/testbench": "^7.16|^8.0|^9.0|^10.0",
"orchestra/testbench": "^9.0|^10.0",
"predis/predis": "^1",
"rector/rector": "^0.19.8",
"rector/rector": "^2.1",
"laravel/pint": "dev-main"
},
"minimum-stability": "dev",
Expand Down
3 changes: 1 addition & 2 deletions rector.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
return static function (RectorConfig $rectorConfig): void {
$rectorConfig->paths([
__DIR__.'/config',
__DIR__.'/dev',
__DIR__.'/src',
__DIR__.'/tests',
]);
Expand All @@ -17,6 +16,6 @@

// define sets of rules
$rectorConfig->sets([
LevelSetList::UP_TO_PHP_81,
LevelSetList::UP_TO_PHP_83,
]);
};
2 changes: 1 addition & 1 deletion src/Commit/RetryableCommitter.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

class RetryableCommitter implements Committer
{
private const RETRYABLE_ERRORS = [
private const array RETRYABLE_ERRORS = [
RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
Expand Down
8 changes: 4 additions & 4 deletions src/Config/Config.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@

class Config
{
final public const SASL_PLAINTEXT = 'SASL_PLAINTEXT';
final public const string SASL_PLAINTEXT = 'SASL_PLAINTEXT';

final public const SASL_SSL = 'SASL_SSL';
final public const string SASL_SSL = 'SASL_SSL';

final public const PRODUCER_ONLY_CONFIG_OPTIONS = [
final public const array PRODUCER_ONLY_CONFIG_OPTIONS = [
'transactional.id',
'transaction.timeout.ms',
'enable.idempotence',
Expand All @@ -36,7 +36,7 @@ class Config
'sticky.partitioning.linger.ms',
];

final public const CONSUMER_ONLY_CONFIG_OPTIONS = [
final public const array CONSUMER_ONLY_CONFIG_OPTIONS = [
'partition.assignment.strategy',
'session.timeout.ms',
'heartbeat.interval.ms',
Expand Down
14 changes: 7 additions & 7 deletions src/Consumers/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,23 @@

class Consumer implements MessageConsumer
{
private const IGNORABLE_CONSUMER_ERRORS = [
private const array IGNORABLE_CONSUMER_ERRORS = [
RD_KAFKA_RESP_ERR__PARTITION_EOF,
RD_KAFKA_RESP_ERR__TRANSPORT,
RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
RD_KAFKA_RESP_ERR__TIMED_OUT,
];

private const CONSUME_STOP_EOF_ERRORS = [
private const array CONSUME_STOP_EOF_ERRORS = [
RD_KAFKA_RESP_ERR__PARTITION_EOF,
RD_KAFKA_RESP_ERR__TIMED_OUT,
];

private const TIMEOUT_ERRORS = [
private const array TIMEOUT_ERRORS = [
RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
];

private const IGNORABLE_COMMIT_ERRORS = [
private const array IGNORABLE_COMMIT_ERRORS = [
RD_KAFKA_RESP_ERR__NO_OFFSET,
];

Expand All @@ -73,9 +73,9 @@ class Consumer implements MessageConsumer

private bool $stopRequested = false;

private ?Closure $whenStopConsuming;
private readonly ?Closure $whenStopConsuming;

private Dispatcher $dispatcher;
private readonly Dispatcher $dispatcher;

public function __construct(private readonly Config $config, private readonly MessageDeserializer $deserializer, ?CommitterFactory $committerFactory = null)
{
Expand Down Expand Up @@ -390,7 +390,7 @@ private function buildHeadersForDlq(Message $message, ?Throwable $throwable = nu

$throwableHeaders['kafka_throwable_message'] = $throwable->getMessage();
$throwableHeaders['kafka_throwable_code'] = $throwable->getCode();
$throwableHeaders['kafka_throwable_class_name'] = get_class($throwable);
$throwableHeaders['kafka_throwable_class_name'] = $throwable::class;

return array_merge($message->headers ?? [], $throwableHeaders);
}
Expand Down
8 changes: 4 additions & 4 deletions src/Events/CouldNotPublishMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@

use Throwable;

final class CouldNotPublishMessage
final readonly class CouldNotPublishMessage
{
public function __construct(
public readonly int $errorCode,
public readonly string $message,
public readonly Throwable $throwable,
public int $errorCode,
public string $message,
public Throwable $throwable,
) {}
}
4 changes: 2 additions & 2 deletions src/Events/MessageConsumed.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@

use Junges\Kafka\Contracts\ConsumerMessage;

final class MessageConsumed
final readonly class MessageConsumed
{
public function __construct(
public readonly ConsumerMessage $message
public ConsumerMessage $message
) {}

public function getMessageIdentifier(): string
Expand Down
4 changes: 2 additions & 2 deletions src/Events/MessagePublished.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@

use Junges\Kafka\Contracts\ProducerMessage;

final class MessagePublished
final readonly class MessagePublished
{
public function __construct(
public readonly ProducerMessage $message,
public ProducerMessage $message,
) {}

public function getMessageIdentifier(): string
Expand Down
4 changes: 2 additions & 2 deletions src/Events/PublishingMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@

use Junges\Kafka\Contracts\ProducerMessage;

final class PublishingMessage
final readonly class PublishingMessage
{
public function __construct(
public readonly ProducerMessage $message,
public ProducerMessage $message,
) {}

public function getMessageIdentifier(): string
Expand Down
2 changes: 1 addition & 1 deletion src/Exceptions/SchemaRegistryException.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@

class SchemaRegistryException extends LaravelKafkaException
{
final public const SCHEMA_MAPPING_NOT_FOUND = 'There is no schema mapping topic: %s, type: %s';
final public const string SCHEMA_MAPPING_NOT_FOUND = 'There is no schema mapping topic: %s, type: %s';
}
4 changes: 2 additions & 2 deletions src/Exceptions/Serializers/AvroSerializerException.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

class AvroSerializerException extends LaravelKafkaException
{
final public const NO_SCHEMA_FOR_TOPIC_MESSAGE = 'There is no %s avro schema defined for the topic %s';
final public const string NO_SCHEMA_FOR_TOPIC_MESSAGE = 'There is no %s avro schema defined for the topic %s';

final public const UNABLE_TO_LOAD_DEFINITION_MESSAGE = 'Was unable to load definition for schema %s';
final public const string UNABLE_TO_LOAD_DEFINITION_MESSAGE = 'Was unable to load definition for schema %s';
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

final class TransactionFatalErrorException extends LaravelKafkaException
{
private const FATAL_EXCEPTION_MESSAGE = 'Transaction failed with a fatal error. You must create a new producer as this one can not be used anymore. [%s]';
private const string FATAL_EXCEPTION_MESSAGE = 'Transaction failed with a fatal error. You must create a new producer as this one can not be used anymore. [%s]';

public static function new(KafkaErrorException $baseException): self
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

final class TransactionShouldBeAbortedException extends LaravelKafkaException
{
private const ABORTABLE_EXCEPTION_MESSAGE = 'Transaction failed. You must abort your current transaction and start a new one. [%s]';
private const string ABORTABLE_EXCEPTION_MESSAGE = 'Transaction failed. You must abort your current transaction and start a new one. [%s]';

public static function new(KafkaErrorException $baseException): self
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

final class TransactionShouldBeRetriedException extends LaravelKafkaException
{
private const RETRIABLE_EXCEPTION_MESSAGE = 'This transaction failed, but can be retried. [%s]';
private const string RETRIABLE_EXCEPTION_MESSAGE = 'This transaction failed, but can be retried. [%s]';

public static function new(KafkaErrorException $baseException): self
{
Expand Down
2 changes: 2 additions & 0 deletions src/Message/Message.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use JetBrains\PhpStorm\Pure;
use Junges\Kafka\AbstractMessage;
use Junges\Kafka\Contracts\ProducerMessage;
use Override;

class Message extends AbstractMessage implements Arrayable, ProducerMessage
{
Expand Down Expand Up @@ -81,6 +82,7 @@ public function withHeader(string $key, string|int|float $value): ProducerMessag
return $this;
}

#[Override]
public function getHeaders(): ?array
{
// Here we insert an uuid to be used to uniquely identify this message. If the
Expand Down
2 changes: 2 additions & 0 deletions src/Providers/LaravelKafkaServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use Junges\Kafka\Message\Deserializers\JsonDeserializer;
use Junges\Kafka\Message\Message;
use Junges\Kafka\Message\Serializers\JsonSerializer;
use Override;

class LaravelKafkaServiceProvider extends ServiceProvider
{
Expand All @@ -32,6 +33,7 @@ public function boot(): void
}
}

#[Override]
public function register(): void
{
$this->app->bind(MessageSerializer::class, fn () => new JsonSerializer);
Expand Down
3 changes: 3 additions & 0 deletions src/Support/InfiniteTimer.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@

namespace Junges\Kafka\Support;

use Override;

class InfiniteTimer extends Timer
{
#[Override]
public function isTimedOut(): bool
{
return false;
Expand Down
3 changes: 3 additions & 0 deletions src/Support/Testing/Fakes/BuilderFake.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
use Junges\Kafka\Consumers\CallableConsumer;
use Junges\Kafka\Contracts\ConsumerBuilder as ConsumerBuilderContract;
use Junges\Kafka\Contracts\MessageConsumer;
use Override;

class BuilderFake extends Builder implements ConsumerBuilderContract
{
/** @var \Junges\Kafka\Contracts\ConsumerMessage[] */
private array $messages = [];

/** {@inheritDoc} */
#[Override]
public static function create(?string $brokers, array $topics = [], ?string $groupId = null): self
{
return new self(
Expand All @@ -32,6 +34,7 @@ public function setMessages(array $messages): self
}

/** Build the Kafka consumer. */
#[Override]
public function build(): MessageConsumer
{
$config = new Config(
Expand Down
8 changes: 2 additions & 6 deletions tests/Commit/KafkaCommitterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ public function it_can_commit(): void
->shouldReceive('commit')->once()
->andReturnSelf();

$this->app->bind(KafkaConsumer::class, function () use ($kafkaConsumer) {
return $kafkaConsumer->getMock();
});
$this->app->bind(KafkaConsumer::class, fn () => $kafkaConsumer->getMock());

$config = new Config(
broker: 'broker',
Expand Down Expand Up @@ -54,9 +52,7 @@ public function it_can_commit_to_dlq(): void
->shouldReceive('commit')->once()
->andReturnSelf();

$this->app->bind(KafkaConsumer::class, function () use ($kafkaConsumer) {
return $kafkaConsumer->getMock();
});
$this->app->bind(KafkaConsumer::class, fn () => $kafkaConsumer->getMock());

$config = new Config(
broker: 'broker',
Expand Down
2 changes: 1 addition & 1 deletion tests/Commit/RetryableCommitterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public function it_should_progressively_wait_for_the_next_retry(): void

try {
$retryableCommitter->commitMessage(new Message, true);
} catch (RdKafkaException $exception) {
} catch (RdKafkaException) {
}

$expectedSleeps = [1e6, 2e6, 4e6, 8e6, 16e6, 32e6];
Expand Down
2 changes: 1 addition & 1 deletion tests/Commit/SeekToCurrentErrorCommitterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,6 @@ public function it_passes_dlq_commits(): void

$seekToCurrentErrorCommitter = new SeekToCurrentErrorCommitter($mockedKafkaConsumer, $mockedCommitter);

$seekToCurrentErrorCommitter->commitDlq(new Message, true);
$seekToCurrentErrorCommitter->commitDlq(new Message);
}
}
2 changes: 2 additions & 0 deletions tests/Console/Consumers/OptionsTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
use Junges\Kafka\Console\Commands\KafkaConsumer\Options;
use Junges\Kafka\Tests\Fakes\FakeHandler;
use Junges\Kafka\Tests\LaravelKafkaTestCase;
use Override;
use PHPUnit\Framework\Attributes\Test;

class OptionsTest extends LaravelKafkaTestCase
{
private array $config;

#[Override]
protected function setUp(): void
{
parent::setUp();
Expand Down
6 changes: 2 additions & 4 deletions tests/Consumers/ConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ public function commitAsync(mixed $messageOrOffsets = null): void
}
};

$customCommitterFactory = new class($customCommitter) implements CommitterFactory
$customCommitterFactory = new readonly class($customCommitter) implements CommitterFactory
{
public function __construct(private \Junges\Kafka\Contracts\Committer $committer) {}

Expand Down Expand Up @@ -701,8 +701,6 @@ private function mockConsumerWithMessageAndPartitions(Message $message, array $p
->andReturn($partitions)
->getMock();

$this->app->bind(KafkaConsumer::class, function () use ($mockedKafkaConsumer) {
return $mockedKafkaConsumer;
});
$this->app->bind(KafkaConsumer::class, fn () => $mockedKafkaConsumer);
}
}
4 changes: 2 additions & 2 deletions tests/Consumers/ManualCommitTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ public function it_handles_commit_errors_gracefully(): void
function (ConsumerMessage $message, Consumer $consumer) use (&$exceptionThrown) {
try {
$consumer->commit($message);
} catch (Throwable $e) {
} catch (Throwable) {
$exceptionThrown = true;
}
},
Expand Down Expand Up @@ -495,7 +495,7 @@ public function it_ignores_no_offset_commit_errors(): void
function (ConsumerMessage $message, Consumer $consumer) use (&$noExceptionThrown) {
try {
$consumer->commit($message);
} catch (\RdKafka\Exception $e) {
} catch (\RdKafka\Exception) {
$noExceptionThrown = false;
}
},
Expand Down
10 changes: 1 addition & 9 deletions tests/FailingCommitter.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,13 @@

final class FailingCommitter implements Committer
{
private int $timesToFail;

private Exception $failure;

private int $timesTriedToCommitMessage = 0;

private int $timesTriedToCommitDlq = 0;

private int $commitCount = 0;

public function __construct(Exception $failure, int $timesToFail)
{
$this->failure = $failure;
$this->timesToFail = $timesToFail;
}
public function __construct(private readonly Exception $failure, private readonly int $timesToFail) {}

/**
* @throws Exception
Expand Down
Loading
Loading