diff --git a/README.md b/README.md index 66656aa..88f8a08 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,5 @@ # Laravel PubSub Queue -[![Travis](https://img.shields.io/travis/munir131/laravel-pubsub-queue/5.8?style=for-the-badge)](https://github.com/munir131/laravel-pubsub-queue) -[![StyleCI](https://styleci.io/repos/131718560/shield)](https://styleci.io/repos/131718560) - This package is a Laravel 5.8 queue driver that use the [Google PubSub](https://github.com/GoogleCloudPlatform/google-cloud-php-pubsub) service. ## Installation @@ -10,7 +7,7 @@ This package is a Laravel 5.8 queue driver that use the [Google PubSub](https:// You can easily install this package with [Composer](https://getcomposer.org) by running this command : ```bash -composer require munir131/laravel-pubsub-queue +composer require lde/laravel-pubsub-queue ``` If you disabled package discovery, you can still manually register this package by adding the following line to the providers of your `config/app.php` file : diff --git a/composer.json b/composer.json index 7117dbf..18abab8 100644 --- a/composer.json +++ b/composer.json @@ -1,8 +1,8 @@ { - "name": "munir131/laravel-pubsub-queue", + "name": "lde/laravel-pubsub-queue", "description": "Queue driver for Google Cloud Pub/Sub.", "keywords": [ - "munir131", + "lde", "laravel", "queue", "gcp", diff --git a/src/Jobs/PubSubJob.php b/src/Jobs/PubSubJob.php index cff14b9..d5d0ae4 100644 --- a/src/Jobs/PubSubJob.php +++ b/src/Jobs/PubSubJob.php @@ -130,10 +130,17 @@ public function release($delay = 0) $attempts = $this->attempts(); - $this->pubsub->acknowledgeAndPublish( + // $this->pubsub->acknowledgeAndPublish( + // $this->job, + // $this->queue, + // ['attempts' => $attempts], + // $delay + // ); + + $this->pubsub->republish( $this->job, $this->queue, - ['attempts' => $attempts], + ['attempts' => (string) $attempts], $delay ); } diff --git a/src/PubSubQueue.php b/src/PubSubQueue.php index 8450c56..9367f89 100644 --- a/src/PubSubQueue.php +++ b/src/PubSubQueue.php @@ -146,7 +146,7 @@ public function later($delay, $job, $data = '', $subscriber = null) return $this->pushRaw( $this->createPayload($job, $data), $subscriber, - ['available_at' => (string) $this->availableAt($delay)] + ['available_at' => (string) $this->availableAt($delay)] ); } @@ -166,7 +166,18 @@ public function pop($subscriber = null) 'maxMessages' => 1, ]); - if (!empty($messages) && count($messages) > 0) { + if (empty($messages) || count($messages) < 1) { + return; + } + + $available_at = $messages[0]->attribute('available_at'); + if ($available_at && $available_at > time()) { + return; + } + + $this->acknowledge($messages[0]); + + if (! empty($messages) && count($messages) > 0) { return new PubSubJob( $this->container, $this, @@ -222,6 +233,7 @@ public function acknowledge(Message $message, $queue = null) */ public function acknowledgeAndPublish(Message $message, $subscriberName = null, $options = [], $delay = 0) { + // added republish method - not used for now if (isset($options['attempts'])) { $options['attempts'] = (string) $options['attempts']; } @@ -235,6 +247,28 @@ public function acknowledgeAndPublish(Message $message, $subscriberName = null, 'available_at' => (string) $this->availableAt($delay), ], $options); + + return $topic->publish([ + 'data' => $message->data(), + 'attributes' => $options, + ]); + } + + /** + * Republish a message onto the queue. + * + * @param \Google\Cloud\PubSub\Message $message + * @param string $queue + * @return mixed + */ + public function republish(Message $message, $queue = null, $options = [], $delay = 0) + { + $topic = $this->getTopic($this->getQueue($queue)); + + $options = array_merge([ + 'available_at' => (string) $this->availableAt($delay), + ], $options); + return $topic->publish([ 'data' => $message->data(), 'attributes' => $options,