-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathworker.go
More file actions
74 lines (62 loc) · 1.5 KB
/
worker.go
File metadata and controls
74 lines (62 loc) · 1.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package workerpool
import (
"sync"
)
// Task is a single unit of work for a Pool: a function paired with the
// payload it is invoked with.
type Task struct {
	// Payload is the argument handed to F when the task runs.
	Payload interface{}
	// F is the function executed for this task.
	F Work
}
// Work is the signature of the function a Task executes: it receives the
// task's payload and returns a result value.
type Work func(interface{}) interface{}
// Run invokes t.F with t.Payload and sends the returned value on p.RetChan.
//
// The send blocks until p.RetChan has free buffer space or a receiver is
// ready, so results must be drained from p.RetChan while tasks are running.
func (t *Task) Run(p *Pool) {
	p.RetChan <- t.F(t.Payload)
}
// Pool runs submitted tasks on a fixed number of worker goroutines.
type Pool struct {
	// concurrency is the number of worker goroutines Start launches.
	concurrency int
	// tasksChan queues tasks waiting to be picked up by a worker.
	tasksChan chan *Task
	// RetChan carries the value returned by each executed task.
	RetChan chan interface{}
	// wg tracks the running workers so Close can wait for them to return.
	wg sync.WaitGroup
}
// NewPool returns a Pool configured to run concurrency workers once Start is
// called. The task queue and the result channel are each buffered to twice
// the concurrency.
//
// A concurrency below 1 is raised to 1: with zero workers no submitted task
// would ever be processed, and a negative value would make the channel
// allocation panic.
func NewPool(concurrency int) *Pool {
	if concurrency < 1 {
		concurrency = 1
	}
	return &Pool{
		concurrency: concurrency,
		tasksChan:   make(chan *Task, concurrency*2),
		RetChan:     make(chan interface{}, concurrency*2),
	}
}
// Start launches p.concurrency worker goroutines. Each worker is registered
// with the pool's WaitGroup before its goroutine is started.
func (p *Pool) Start() {
	for remaining := p.concurrency; remaining > 0; remaining-- {
		p.wg.Add(1)
		go p.work()
	}
}
// NewTask wraps f and payload in a Task and queues it for the workers.
//
// The call blocks while the task queue is full. It must not be called after
// Close, because sending on the closed task queue panics.
func (p *Pool) NewTask(f Work, payload interface{}) {
	p.tasksChan <- &Task{F: f, Payload: payload}
}
// Close shuts the pool down and blocks until every worker has returned.
//
// It closes the task queue, waits for the workers to finish the tasks that
// are still queued, and then closes RetChan so that a consumer ranging over
// the results terminates. Workers block while sending results, so RetChan
// must keep being drained or Close may not return.
func (p *Pool) Close() {
	// No more tasks: workers leave their loop once the queue is drained.
	close(p.tasksChan)
	// Once all workers are done, no further result can be sent.
	p.wg.Wait()
	close(p.RetChan)
}
// work is the loop run by each worker goroutine: it runs queued tasks until
// the task queue is closed and drained, then marks the worker as finished on
// the pool's WaitGroup.
func (p *Pool) work() {
	defer p.wg.Done()
	for task := range p.tasksChan {
		task.Run(p)
	}
}