From 60280162bb40100edf924e6a1b218a45eb939f9f Mon Sep 17 00:00:00 2001 From: Munir Khakhi Date: Fri, 7 Jul 2023 13:22:45 +0530 Subject: [PATCH 1/5] Moved to lde packages --- README.md | 5 +---- composer.json | 4 ++-- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 66656aa..88f8a08 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,5 @@ # Laravel PubSub Queue -[![Travis](https://img.shields.io/travis/munir131/laravel-pubsub-queue/5.8?style=for-the-badge)](https://github.com/munir131/laravel-pubsub-queue) -[![StyleCI](https://styleci.io/repos/131718560/shield)](https://styleci.io/repos/131718560) - This package is a Laravel 5.8 queue driver that use the [Google PubSub](https://github.com/GoogleCloudPlatform/google-cloud-php-pubsub) service. ## Installation @@ -10,7 +7,7 @@ This package is a Laravel 5.8 queue driver that use the [Google PubSub](https:// You can easily install this package with [Composer](https://getcomposer.org) by running this command : ```bash -composer require munir131/laravel-pubsub-queue +composer require lde/laravel-pubsub-queue ``` If you disabled package discovery, you can still manually register this package by adding the following line to the providers of your `config/app.php` file : diff --git a/composer.json b/composer.json index 7117dbf..18abab8 100644 --- a/composer.json +++ b/composer.json @@ -1,8 +1,8 @@ { - "name": "munir131/laravel-pubsub-queue", + "name": "lde/laravel-pubsub-queue", "description": "Queue driver for Google Cloud Pub/Sub.", "keywords": [ - "munir131", + "lde", "laravel", "queue", "gcp", From e17b54e836addc2279fd9fc96b0347ce81ba4405 Mon Sep 17 00:00:00 2001 From: disha-kothari Date: Mon, 8 Jan 2024 16:52:24 +0530 Subject: [PATCH 2/5] refactor:job release with delay functionality --- src/Jobs/PubSubJob.php | 9 ++++++++- src/PubSubQueue.php | 35 ++++++++++++++++++++++++++++++++++- 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/src/Jobs/PubSubJob.php b/src/Jobs/PubSubJob.php index cff14b9..9604172 100644 --- a/src/Jobs/PubSubJob.php +++ b/src/Jobs/PubSubJob.php @@ -130,7 +130,14 @@ public function release($delay = 0) $attempts = $this->attempts(); - $this->pubsub->acknowledgeAndPublish( + // $this->pubsub->acknowledgeAndPublish( + // $this->job, + // $this->queue, + // ['attempts' => $attempts], + // $delay + // ); + + $this->pubsub->republish( $this->job, $this->queue, ['attempts' => $attempts], diff --git a/src/PubSubQueue.php b/src/PubSubQueue.php index 8450c56..e1b7335 100644 --- a/src/PubSubQueue.php +++ b/src/PubSubQueue.php @@ -166,7 +166,18 @@ public function pop($subscriber = null) 'maxMessages' => 1, ]); - if (!empty($messages) && count($messages) > 0) { + if (empty($messages) || count($messages) < 1) { + return; + } + + $available_at = $messages[0]->attribute('available_at'); + if ($available_at && $available_at > time()) { + return; + } + + $this->acknowledge($messages[0], $queue); + + if (! empty($messages) && count($messages) > 0) { return new PubSubJob( $this->container, $this, @@ -222,6 +233,7 @@ public function acknowledge(Message $message, $queue = null) */ public function acknowledgeAndPublish(Message $message, $subscriberName = null, $options = [], $delay = 0) { + // added republish method - not used for now if (isset($options['attempts'])) { $options['attempts'] = (string) $options['attempts']; } @@ -241,6 +253,27 @@ public function acknowledgeAndPublish(Message $message, $subscriberName = null, ]); } + /** + * Republish a message onto the queue. + * + * @param \Google\Cloud\PubSub\Message $message + * @param string $queue + * @return mixed + */ + public function republish(Message $message, $queue = null, $options = [], $delay = 0) + { + $topic = $this->getTopic($this->getQueue($queue)); + + $options = array_merge([ + 'available_at' => (string) $this->availableAt($delay), + ], $options); + + return $topic->publish([ + 'data' => $message->data(), + 'attributes' => $options, + ]); + } + /** * Create a payload string from the given job and data. * From 2e23742eaef9ed9e07da8d130a5a705487ead50f Mon Sep 17 00:00:00 2001 From: disha-kothari Date: Wed, 10 Jan 2024 14:02:56 +0530 Subject: [PATCH 3/5] change available_time value based on current time with delay --- src/PubSubQueue.php | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/PubSubQueue.php b/src/PubSubQueue.php index e1b7335..88dff92 100644 --- a/src/PubSubQueue.php +++ b/src/PubSubQueue.php @@ -146,7 +146,7 @@ public function later($delay, $job, $data = '', $subscriber = null) return $this->pushRaw( $this->createPayload($job, $data), $subscriber, - ['available_at' => (string) $this->availableAt($delay)] + ['available_at' => (string) time() + $delay] ); } @@ -243,8 +243,11 @@ public function acknowledgeAndPublish(Message $message, $subscriberName = null, $subscription->acknowledge($message); + // $options = array_merge([ + // 'available_at' => (string) $this->availableAt($delay), + // ], $options); $options = array_merge([ - 'available_at' => (string) $this->availableAt($delay), + 'available_at' => (string) time() + $delay, ], $options); return $topic->publish([ @@ -265,7 +268,7 @@ public function republish(Message $message, $queue = null, $options = [], $delay $topic = $this->getTopic($this->getQueue($queue)); $options = array_merge([ - 'available_at' => (string) $this->availableAt($delay), + 'available_at' => (string) time() + $delay, ], $options); return $topic->publish([ From 059784c5ada644043959e8e09ab35dfcffcc3451 Mon Sep 17 00:00:00 2001 From: disha-kothari Date: Wed, 10 Jan 2024 18:18:00 +0530 Subject: [PATCH 4/5] re-change available_time based on available time function --- src/PubSubQueue.php | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/PubSubQueue.php b/src/PubSubQueue.php index 88dff92..b002084 100644 --- a/src/PubSubQueue.php +++ b/src/PubSubQueue.php @@ -146,7 +146,7 @@ public function later($delay, $job, $data = '', $subscriber = null) return $this->pushRaw( $this->createPayload($job, $data), $subscriber, - ['available_at' => (string) time() + $delay] + ['available_at' => (string) $this->availableAt($delay)] ); } @@ -243,13 +243,11 @@ public function acknowledgeAndPublish(Message $message, $subscriberName = null, $subscription->acknowledge($message); - // $options = array_merge([ - // 'available_at' => (string) $this->availableAt($delay), - // ], $options); $options = array_merge([ - 'available_at' => (string) time() + $delay, + 'available_at' => (string) $this->availableAt($delay), ], $options); + return $topic->publish([ 'data' => $message->data(), 'attributes' => $options, @@ -268,7 +266,7 @@ public function republish(Message $message, $queue = null, $options = [], $delay $topic = $this->getTopic($this->getQueue($queue)); $options = array_merge([ - 'available_at' => (string) time() + $delay, + 'available_at' => (string) $this->availableAt($delay), ], $options); return $topic->publish([ From f919ab77edd8cfd0fa53125171088b951a4510be Mon Sep 17 00:00:00 2001 From: disha-kothari Date: Thu, 11 Jan 2024 18:31:57 +0530 Subject: [PATCH 5/5] Convert attempt attributes to string and acknowledge message with passing only message --- src/Jobs/PubSubJob.php | 2 +- src/PubSubQueue.php | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Jobs/PubSubJob.php b/src/Jobs/PubSubJob.php index 9604172..d5d0ae4 100644 --- a/src/Jobs/PubSubJob.php +++ b/src/Jobs/PubSubJob.php @@ -140,7 +140,7 @@ public function release($delay = 0) $this->pubsub->republish( $this->job, $this->queue, - ['attempts' => $attempts], + ['attempts' => (string) $attempts], $delay ); } diff --git a/src/PubSubQueue.php b/src/PubSubQueue.php index b002084..9367f89 100644 --- a/src/PubSubQueue.php +++ b/src/PubSubQueue.php @@ -175,7 +175,7 @@ public function pop($subscriber = null) return; } - $this->acknowledge($messages[0], $queue); + $this->acknowledge($messages[0]); if (! empty($messages) && count($messages) > 0) { return new PubSubJob(