Skip to content

Commit 3b083a5

Browse files
author
Francesco Cosentino
committed
better task handling
1 parent a1b6ccf commit 3b083a5

File tree

7 files changed

+208
-35
lines changed

7 files changed

+208
-35
lines changed

README.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,12 @@ func main() {
132132
task2 := worker.Task{
133133
ID: uuid.New(),
134134
Priority: 5,
135-
Fn: func() interface{} { return "Hello, World from Task 2!" },
135+
// You can pass in parameters to the function and return a value using a closure
136+
Fn: func() interface{} {
137+
return func(a int, b int) interface{} {
138+
return a + b
139+
}(2, 5)
140+
},
136141
}
137142

138143
task3 := worker.Task{

examples/manual/main.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"log"
6+
7+
"github.com/google/uuid"
8+
"github.com/hyp3rd/go-worker"
9+
"github.com/hyp3rd/go-worker/middleware"
10+
)
11+
12+
func main() {
13+
tm := worker.NewTaskManager(1, 10)
14+
// Example of using zap logger from uber
15+
logger := log.Default()
16+
17+
// apply middleware in the same order as you want to execute them
18+
tm = worker.RegisterMiddleware(tm,
19+
// middleware.YourMiddleware,
20+
func(next worker.Service) worker.Service {
21+
return middleware.NewLoggerMiddleware(next, logger)
22+
},
23+
)
24+
25+
task := worker.Task{
26+
ID: uuid.New(),
27+
Priority: 1,
28+
Fn: func() interface{} { return "Hello, World from Task!" },
29+
}
30+
31+
// tm.RegisterTask(task)
32+
33+
res, err := tm.ExecuteTask(task.ID)
34+
if err != nil {
35+
fmt.Println(err)
36+
} else {
37+
fmt.Println(res)
38+
}
39+
}

examples/middleware/main.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package main
22

33
import (
44
"fmt"
5-
"log"
65
"time"
76

87
"github.com/google/uuid"
@@ -12,21 +11,29 @@ import (
1211

1312
func main() {
1413
tm := worker.NewTaskManager(5, 10)
15-
// Example of using zap logger from uber
16-
logger := log.Default()
1714

1815
// apply middleware in the same order as you want to execute them
1916
tm = worker.RegisterMiddleware(tm,
2017
// middleware.YourMiddleware,
2118
func(next worker.Service) worker.Service {
22-
return middleware.NewLoggerMiddleware(next, logger)
19+
return middleware.NewLoggerMiddleware(next, middleware.DefaultLogger())
2320
},
2421
)
2522

2623
task := worker.Task{
2724
ID: uuid.New(),
2825
Priority: 1,
29-
Fn: func() interface{} { return "Hello, World from Task 1!" },
26+
Fn: func() interface{} {
27+
return func(a int, b int) interface{} {
28+
return a + b
29+
}(2, 5)
30+
},
31+
}
32+
33+
// Invalid task, it doesn't have a function
34+
task1 := worker.Task{
35+
ID: uuid.New(),
36+
Priority: 1,
3037
}
3138

3239
task2 := worker.Task{
@@ -55,7 +62,7 @@ func main() {
5562
},
5663
}
5764

58-
tm.RegisterTask(task, task2, task3, task4)
65+
tm.RegisterTask(task, task1, task2, task3, task4)
5966
tm.Start(5)
6067

6168
tm.CancelTask(task3.ID)

middleware/logger.go

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package middleware
22

33
import (
4+
"log"
5+
"os"
6+
"runtime"
47
"time"
58

69
"github.com/google/uuid"
@@ -13,6 +16,17 @@ type Logger interface {
1316
// Errorf(format string, v ...interface{})
1417
}
1518

19+
// this is a safeguard, breaking on compile time in case
20+
// `log.Logger` does not adhere to our `Logger` interface.
21+
// see https://golang.org/doc/faq#guarantee_satisfies_interface
22+
var _ Logger = &log.Logger{}
23+
24+
// DefaultLogger returns a `Logger` implementation
25+
// backed by stdlib's log
26+
func DefaultLogger() *log.Logger {
27+
return log.New(os.Stdout, "", log.LstdFlags)
28+
}
29+
1630
// loggerMiddleware is a middleware that logs the time it takes to execute the next middleware.
1731
// Must implement the `worker.Service` interface.
1832
type loggerMiddleware struct {
@@ -28,21 +42,23 @@ func NewLoggerMiddleware(next worker.Service, logger Logger) worker.Service {
2842
// RegisterTask registers a new task to the worker
2943
func (mw *loggerMiddleware) RegisterTask(tasks ...worker.Task) {
3044
defer func(begin time.Time) {
45+
for _, t := range tasks {
46+
mw.logger.Printf("registering task ID %v with priority: %v", t.ID, t.Priority)
47+
}
3148
mw.logger.Printf("`RegisterTask` took: %s", time.Since(begin))
3249
}(time.Now())
3350

34-
for _, t := range tasks {
35-
mw.logger.Printf("registered task ID %v with priority: %v", t.ID, t.Priority)
36-
}
37-
3851
mw.next.RegisterTask(tasks...)
3952
}
4053

4154
// Start the task manager
4255
func (mw *loggerMiddleware) Start(numWorkers int) {
4356
defer func(begin time.Time) {
44-
mw.logger.Printf("`Start` took: %s", time.Since(begin))
57+
// var numCPU = runtime.GOMAXPROCS(0)
58+
var numCPU = runtime.NumCPU()
59+
mw.logger.Printf("the task manager is running on %v CPUs", numCPU)
4560
mw.logger.Printf("the task manager started with %v workers", numWorkers)
61+
mw.logger.Printf("`Start` took: %s", time.Since(begin))
4662
}(time.Now())
4763

4864
mw.next.Start(numWorkers)
@@ -68,6 +84,21 @@ func (mw *loggerMiddleware) GetTasks() []worker.Task {
6884
return mw.next.GetTasks()
6985
}
7086

87+
// ExecuteTask executes a task given its ID and returns the result
88+
func (mw *loggerMiddleware) ExecuteTask(id uuid.UUID) (res interface{}, err error) {
89+
defer func(begin time.Time) {
90+
mw.logger.Printf("`ExecuteTask` took: %s", time.Since(begin))
91+
}(time.Now())
92+
93+
mw.logger.Printf("executing task ID %v", id)
94+
95+
// if res, err = mw.next.ExecuteTask(id); err != nil {
96+
// mw.logger.Printf("error while executing task ID %v - %v", id, err)
97+
// }
98+
99+
return mw.next.ExecuteTask(id)
100+
}
101+
71102
// CancelAll cancels all tasks
72103
func (mw *loggerMiddleware) CancelAll() {
73104
mw.next.CancelAll()

service.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ type Service interface {
1616
GetTask(id uuid.UUID) (task Task, ok bool)
1717
// GetTasks gets all tasks
1818
GetTasks() []Task
19+
// ExecuteTask executes a task given its ID and returns the result
20+
ExecuteTask(id uuid.UUID) (interface{}, error)
1921
// CancelAll cancels all tasks
2022
CancelAll()
2123
// CancelTask cancels a task by its ID

task.go

Lines changed: 57 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,70 @@ package worker
22

33
import (
44
"context"
5+
"errors"
6+
"fmt"
57
"sync/atomic"
68
"time"
79

810
"github.com/google/uuid"
911
)
1012

13+
var (
14+
// ErrInvalidTaskID is returned when a task has an invalid ID
15+
ErrInvalidTaskID = errors.New("invalid task id")
16+
// ErrInvalidTaskFunc is returned when a task has an invalid function
17+
ErrInvalidTaskFunc = errors.New("invalid task function")
18+
// ErrTaskNotFound is returned when a task is not found
19+
ErrTaskNotFound = errors.New("task not found")
20+
)
21+
22+
// CancelReason is a value used to represent the cancel reason.
23+
type CancelReason uint8
24+
25+
// CancelReason values
26+
// - 1: `ContextDeadlineReached`
27+
// - 2: `RateLimited`
28+
// - 3: `Cancelled`
29+
const (
30+
// ContextDeadlineReached means the context is past its deadline.
31+
ContextDeadlineReached = CancelReason(1)
32+
// RateLimited means the number of concurrent tasks per second exceeded the maximum allowed.
33+
RateLimited = CancelReason(2)
34+
// Cancelled means `CancelTask` was invked and the `Task` was cancelled.
35+
Cancelled = CancelReason(3)
36+
)
37+
38+
// TaskFunc signature of `Task` function
39+
type TaskFunc func() interface{}
40+
1141
// Task represents a function that can be executed by the task manager
1242
type Task struct {
13-
ID uuid.UUID `json:"id"` // ID is the id of the task
14-
Priority int `json:"priority"` // Priority is the priority of the task
15-
Fn func() interface{} `json:"-"` // Fn is the function that will be executed by the task
16-
Ctx context.Context `json:"context"` // Ctx is the context of the task
17-
Cancel context.CancelFunc `json:"-"` // Cancel is the cancel function of the task
18-
Started atomic.Int64 `json:"started"` // Started is the time the task started
19-
Completed atomic.Int64 `json:"completed"` // Completed is the time the task completed
20-
Cancelled atomic.Int64 `json:"cancelled"` // Cancelled is the time the task was cancelled
43+
ID uuid.UUID `json:"id"` // ID is the id of the task
44+
Priority int `json:"priority"` // Priority is the priority of the task
45+
Fn TaskFunc `json:"-"` // Fn is the function that will be executed by the task
46+
Ctx context.Context `json:"context"` // Ctx is the context of the task
47+
Cancel context.CancelFunc `json:"-"` // Cancel is the cancel function of the task
48+
Error atomic.Value `json:"error"` // Error is the error of the task
49+
Started atomic.Int64 `json:"started"` // Started is the time the task started
50+
Completed atomic.Int64 `json:"completed"` // Completed is the time the task completed
51+
Cancelled atomic.Int64 `json:"cancelled"` // Cancelled is the time the task was cancelled
52+
CancelReason CancelReason `json:"cancel_reason"` // CancelReason is the reason the task was cancelled
53+
}
54+
55+
// IsValid returns an error if the task is invalid
56+
func (t Task) IsValid() (err error) {
57+
fmt.Println("validating task")
58+
if t.ID == uuid.Nil {
59+
err = ErrInvalidTaskID
60+
t.Error.Store(err.Error())
61+
return
62+
}
63+
if t.Fn == nil {
64+
err = ErrInvalidTaskFunc
65+
t.Error.Store(err.Error())
66+
return
67+
}
68+
return
2169
}
2270

2371
// setStarted handles the start of a task by setting the start time
@@ -42,7 +90,7 @@ type taskHeap []Task
4290
func (h taskHeap) Len() int { return len(h) }
4391

4492
// Less returns true if the priority of the first task is less than the second task
45-
func (h taskHeap) Less(i, j int) bool { return h[i].Priority > h[j].Priority }
93+
func (h taskHeap) Less(i, j int) bool { return h[i].Priority < h[j].Priority }
4694

4795
// Swap swaps the position of two tasks in the heap
4896
func (h taskHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

0 commit comments

Comments
 (0)