Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
# Laravel PubSub Queue

[![Travis](https://img.shields.io/travis/munir131/laravel-pubsub-queue/5.8?style=for-the-badge)](https://github.com/munir131/laravel-pubsub-queue)
[![StyleCI](https://styleci.io/repos/131718560/shield)](https://styleci.io/repos/131718560)

This package is a Laravel 5.8 queue driver that use the [Google PubSub](https://github.com/GoogleCloudPlatform/google-cloud-php-pubsub) service.

## Installation

You can easily install this package with [Composer](https://getcomposer.org) by running this command :

```bash
composer require munir131/laravel-pubsub-queue
composer require lde/laravel-pubsub-queue
```

If you disabled package discovery, you can still manually register this package by adding the following line to the providers of your `config/app.php` file :
Expand Down
4 changes: 2 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
{
"name": "munir131/laravel-pubsub-queue",
"name": "lde/laravel-pubsub-queue",
"description": "Queue driver for Google Cloud Pub/Sub.",
"keywords": [
"munir131",
"lde",
"laravel",
"queue",
"gcp",
Expand Down
11 changes: 9 additions & 2 deletions src/Jobs/PubSubJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,17 @@ public function release($delay = 0)

$attempts = $this->attempts();

$this->pubsub->acknowledgeAndPublish(
// $this->pubsub->acknowledgeAndPublish(
// $this->job,
// $this->queue,
// ['attempts' => $attempts],
// $delay
// );

$this->pubsub->republish(
$this->job,
$this->queue,
['attempts' => $attempts],
['attempts' => (string) $attempts],
$delay
);
}
Expand Down
38 changes: 36 additions & 2 deletions src/PubSubQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public function later($delay, $job, $data = '', $subscriber = null)
return $this->pushRaw(
$this->createPayload($job, $data),
$subscriber,
['available_at' => (string) $this->availableAt($delay)]
['available_at' => (string) $this->availableAt($delay)]
);
}

Expand All @@ -166,7 +166,18 @@ public function pop($subscriber = null)
'maxMessages' => 1,
]);

if (!empty($messages) && count($messages) > 0) {
if (empty($messages) || count($messages) < 1) {
return;
}

$available_at = $messages[0]->attribute('available_at');
if ($available_at && $available_at > time()) {
return;
}

$this->acknowledge($messages[0]);

if (! empty($messages) && count($messages) > 0) {
return new PubSubJob(
$this->container,
$this,
Expand Down Expand Up @@ -222,6 +233,7 @@ public function acknowledge(Message $message, $queue = null)
*/
public function acknowledgeAndPublish(Message $message, $subscriberName = null, $options = [], $delay = 0)
{
// added republish method - not used for now
if (isset($options['attempts'])) {
$options['attempts'] = (string) $options['attempts'];
}
Expand All @@ -235,6 +247,28 @@ public function acknowledgeAndPublish(Message $message, $subscriberName = null,
'available_at' => (string) $this->availableAt($delay),
], $options);


return $topic->publish([
'data' => $message->data(),
'attributes' => $options,
]);
}

/**
* Republish a message onto the queue.
*
* @param \Google\Cloud\PubSub\Message $message
* @param string $queue
* @return mixed
*/
public function republish(Message $message, $queue = null, $options = [], $delay = 0)
{
$topic = $this->getTopic($this->getQueue($queue));

$options = array_merge([
'available_at' => (string) $this->availableAt($delay),
], $options);

return $topic->publish([
'data' => $message->data(),
'attributes' => $options,
Expand Down