Skip to content

Commit e7847ea

Browse files
committed
Added support for delay message
1 parent 04fedb1 commit e7847ea

File tree

2 files changed

+39
-0
lines changed

2 files changed

+39
-0
lines changed

src/PubSubQueue.php

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ public function pop($subscriber = null)
165165
'returnImmediately' => true,
166166
'maxMessages' => 1,
167167
]);
168+
$messages = $this->checkAvailabilty($messages);
168169

169170
if (!empty($messages) && count($messages) > 0) {
170171
return new PubSubJob(
@@ -178,6 +179,18 @@ public function pop($subscriber = null)
178179
}
179180
}
180181

182+
public function checkAvailabilty($messages) {
183+
if (!empty($messages) && count($messages) > 0) {
184+
$availableAt = $messages[0]->attribute('available_at');
185+
$now = (new \DateTime('now'))->getTimestamp();
186+
if (!$availableAt) {
187+
return $messages;
188+
} else if ($availableAt < $now) {
189+
return $messages;
190+
}
191+
}
192+
}
193+
181194
/**
182195
* Push an array of jobs onto the queue.
183196
*

tests/Unit/PubSubQueueTests.php

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,4 +298,30 @@ public function testGetPubSub()
298298
{
299299
$this->assertTrue($this->queue->getPubSub() instanceof PubSubClient);
300300
}
301+
302+
public function testDelayedMessage()
303+
{
304+
$this->subscription->method('pull')
305+
->willReturn([$this->message]);
306+
307+
$this->message->method('attribute')
308+
->with($this->equalTo('available_at'))
309+
->willReturn((new \DateTime('now +1seconds'))->getTimestamp());
310+
311+
$this->topic->method('subscription')
312+
->willReturn($this->subscription);
313+
314+
$this->topic->method('exists')
315+
->willReturn(true);
316+
317+
$this->queue->method('getTopic')
318+
->willReturn($this->topic);
319+
320+
$this->queue->setContainer($this->createMock(Container::class));
321+
322+
$this->assertTrue(is_null($this->queue->pop('test')));
323+
sleep(2);
324+
$this->assertTrue($this->queue->pop('test') instanceof PubSubJob);
325+
326+
}
301327
}

0 commit comments

Comments
 (0)