4
4
"context"
5
5
"errors"
6
6
"reflect"
7
+ "regexp"
8
+ "strconv"
7
9
"strings"
8
10
"sync"
9
11
"time"
@@ -42,6 +44,8 @@ func NewTask(manager *pgqueue.Manager, threads uint) (server.Task, error) {
42
44
self .callbacks = make (map [string ]server.PGCallback , 100 )
43
45
self .decoder = marshaler .NewDecoder ("json" ,
44
46
convertPtr ,
47
+ convertPGTime ,
48
+ convertPGDuration ,
45
49
convertFloatToIntUint ,
46
50
marshaler .ConvertTime ,
47
51
marshaler .ConvertDuration ,
@@ -61,7 +65,7 @@ func (task *task) Run(parent context.Context) error {
61
65
62
66
// Create a cancelable context
63
67
ctx , cancel := context .WithCancel (context .Background ())
64
- ctx = ref .WithPath (ref .WithLog (ctx , ref .Log (parent )), ref .Path (parent )... )
68
+ ctx = ref .WithPath (ref .WithProvider (ctx , ref .Provider (parent )), ref .Path (parent )... )
65
69
66
70
// Ticker loop
67
71
tickerch := make (chan * schema.Ticker )
@@ -147,7 +151,9 @@ FOR_LOOP:
147
151
}
148
152
n += len (tasks )
149
153
}
150
- ref .Log (ctx ).With ("ticker" , evt ).Debug (parent , "removed " , n , " tasks from queue" )
154
+ if n > 0 {
155
+ ref .Log (ctx ).With ("ticker" , evt ).Debug (parent , "removed " , n , " tasks from queue" )
156
+ }
151
157
}
152
158
}
153
159
}
@@ -167,6 +173,11 @@ func (t *task) Conn() pg.PoolConn {
167
173
return t .manager .Conn ()
168
174
}
169
175
176
+ // Namespace returns the namespace of the queue.
177
+ func (t * task ) Namespace () string {
178
+ return t .manager .Namespace ()
179
+ }
180
+
170
181
// RegisterTicker registers a periodic task (ticker) with a callback function.
171
182
// It returns the metadata of the registered ticker.
172
183
func (t * task ) RegisterTicker (ctx context.Context , meta schema.TickerMeta , fn server.PGCallback ) (* schema.Ticker , error ) {
@@ -255,7 +266,8 @@ func (t *task) tryTask(ctx context.Context, taskpool *pgqueue.TaskPool, task *sc
255
266
taskpool .RunTask (ctx , task , t .getTaskCallback (task ), func (err error ) {
256
267
var status string
257
268
delta := time .Since (now ).Truncate (time .Millisecond )
258
- if _ , err_ := t .manager .ReleaseTask (context .TODO (), task .Id , err == nil , err , & status ); err_ != nil {
269
+ child := ref .WithPath (ref .WithProvider (context .TODO (), ref .Provider (ctx )), ref .Path (ctx )... )
270
+ if _ , err_ := t .manager .ReleaseTask (child , task .Id , err == nil , err , & status ); err_ != nil {
259
271
err = errors .Join (err , err_ )
260
272
}
261
273
switch {
@@ -293,16 +305,75 @@ func joinName(parts ...string) string {
293
305
return strings .Join (parts , namespaceSeparator )
294
306
}
295
307
296
- func splitName (name string , n int ) []string {
297
- return strings .SplitN (name , namespaceSeparator , n )
298
- }
299
-
300
308
// //////////////////////////////////////////////////////////////////////////////
301
309
// PRIVATE METHODS
310
+
302
311
var (
303
- nilValue = reflect .ValueOf (nil )
312
+ nilValue = reflect .ValueOf (nil )
313
+ timeType = reflect .TypeOf (time.Time {})
314
+ durationType = reflect .TypeOf (time .Duration (0 ))
315
+ rePostgresDuration = regexp .MustCompile (`^(\d+):(\d+):(\d+)$` )
304
316
)
305
317
318
+ // convertPGTime returns time from postgres format
319
+ func convertPGTime (src reflect.Value , dest reflect.Type ) (reflect.Value , error ) {
320
+ // Pass value through
321
+ if src .Type () == dest {
322
+ return src , nil
323
+ }
324
+
325
+ if dest == timeType || dest .Kind () == reflect .Ptr && dest .Elem () == timeType {
326
+ var v reflect.Value
327
+
328
+ // Convert time 2025-05-03T17:29:32.329803 => time.Time
329
+ if t , err := time .Parse ("2006-01-02T15:04:05.999999999" , src .String ()); err == nil {
330
+ v = reflect .ValueOf (t )
331
+ } else if t , err := time .Parse ("2006-01-02T15:04:05.999999999Z" , src .String ()); err == nil {
332
+ v = reflect .ValueOf (t )
333
+ }
334
+
335
+ // Return value
336
+ if v .IsValid () {
337
+ if dest .Kind () == reflect .Ptr {
338
+ value := reflect .New (dest .Elem ())
339
+ value .Elem ().Set (v )
340
+ return value , nil
341
+ } else {
342
+ return v , nil
343
+ }
344
+ }
345
+ }
346
+
347
+ // Skip
348
+ return nilValue , nil
349
+ }
350
+
351
+ // convertPGDuration returns duration from postgres format
352
+ func convertPGDuration (src reflect.Value , dest reflect.Type ) (reflect.Value , error ) {
353
+ // Pass value through
354
+ if src .Type () == dest {
355
+ return src , nil
356
+ }
357
+
358
+ if dest == durationType {
359
+ // Convert 00:00:00 => time.Duration
360
+ if parts := rePostgresDuration .FindStringSubmatch (src .String ()); len (parts ) == 4 {
361
+ if hours , err := strconv .ParseUint (parts [1 ], 10 , 64 ); err != nil {
362
+ return nilValue , err
363
+ } else if minutes , err := strconv .ParseUint (parts [2 ], 10 , 64 ); err != nil {
364
+ return nilValue , err
365
+ } else if seconds , err := strconv .ParseUint (parts [3 ], 10 , 64 ); err != nil {
366
+ return nilValue , err
367
+ } else {
368
+ return reflect .ValueOf (time .Duration (hours )* time .Hour + time .Duration (minutes )* time .Minute + time .Duration (seconds )* time .Second ), nil
369
+ }
370
+ }
371
+ }
372
+
373
+ // Skip
374
+ return nilValue , nil
375
+ }
376
+
306
377
// convertPtr returns value if pointer
307
378
func convertPtr (src reflect.Value , dest reflect.Type ) (reflect.Value , error ) {
308
379
// Pass value through
0 commit comments