File tree 2 files changed +18
-1
lines changed
2 files changed +18
-1
lines changed Original file line number Diff line number Diff line change @@ -70,14 +70,26 @@ $consumer->consume();
70
70
** Consuming algorithm:**
71
71
-------------------------------
72
72
73
- ????
73
+ ``` $consumer->consume(); ``` - base realization of consumer.
74
74
75
+ If the message table does not exist, it will be created.
75
76
77
+ Next, will start endless loop ``` while(true) ``` to get the next message from the queue.
78
+ if there are no messages, there will be a sustained seconds pause.
76
79
80
+ When the message is received, it will be processed. Job has priority over the processor.
77
81
82
+ If an uncaught error occurs, it will be caught and increment first processing attempt.
83
+
84
+ After several unsuccessful attempts, the message will status ` \Simple\Queue\Status::FAILURE ` .
85
+
86
+ If there are no handlers for the message, the message will status ` \Simple\Queue\Status::UNDEFINED_HANDLER ` .
87
+
88
+ > Messages are processed with statuses: ` \Simple\Queue\Status::NEW ` and ` \Simple\Queue\Status::REDELIVERED `
78
89
79
90
<br >
80
91
92
+
81
93
** Custom example for consuming:**
82
94
-------------------------------
83
95
Original file line number Diff line number Diff line change @@ -204,6 +204,9 @@ public function consume(array $queues = []): void
204
204
while (true ) {
205
205
if ($ message = $ this ->fetchMessage ($ queues )) {
206
206
try {
207
+
208
+ // TODO: set IN_PROCESS status
209
+
207
210
$ this ->processing ($ message );
208
211
} catch (Throwable $ throwable ) {
209
212
if ($ message ->getAttempts () >= $ this ->config ->getNumberOfAttemptsBeforeFailure ()) {
@@ -223,7 +226,9 @@ public function consume(array $queues = []): void
223
226
// maybe lucky later
224
227
}
225
228
}
229
+ continue ;
226
230
}
231
+ sleep (1 );
227
232
}
228
233
}
229
234
You can’t perform that action at this time.
0 commit comments