-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathprocess.go
More file actions
132 lines (117 loc) · 2.8 KB
/
process.go
File metadata and controls
132 lines (117 loc) · 2.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package process
import (
	"bufio"
	"fmt"
	"io"
	"os/exec"
	"sync"
)
/******************
* PROCESS MANAGER *
*******************/
// Processor runs external commands and relays what happens to a Driver.
//
// All communication between the goroutines that read the command's output
// and the goroutine that invokes the driver goes through the channels below.
type Processor struct {
	// stdout and stderr carry one line of command output per message.
	stdout, stderr chan string
	// success and failure signal how the command finished; done is sent by
	// the driver loop once it has handled that final signal.
	success, failure, done chan bool
	// driver receives every event produced while a command runs.
	driver Driver
}
// Driver is the set of callbacks a Processor is configured with. Each method
// handles one kind of event raised while a command is running.
type Driver interface {
	// Initialize is called once, before any other callback, with the task
	// string that was passed to Run.
	Initialize(task string)
	// HandleNothing is called each time the driver loop polls and finds no
	// pending event.
	HandleNothing()
	// HandleOut is called with each line read from the command's stdout.
	HandleOut(line string)
	// HandleErr is called with each line read from the command's stderr.
	HandleErr(line string)
	// HandleSuccess is called when the run is signalled as successful.
	HandleSuccess()
	// HandleFailure is called when the run is signalled as failed.
	HandleFailure()
}
// New builds a Processor that reports to driver. Every internal channel is
// created unbuffered.
func New(driver Driver) *Processor {
	return &Processor{
		stdout:  make(chan string),
		stderr:  make(chan string),
		success: make(chan bool),
		failure: make(chan bool),
		done:    make(chan bool),
		driver:  driver,
	}
}
// Run executes the external command cmdName with cmdArgs and streams its
// output, line by line, to the driver.
//
// The driver is attached only after the command has started successfully, so
// a failure to create the pipes or to start the command returns an error
// without calling any driver method and without leaving a goroutine behind.
//
// Once the command has finished, the driver receives exactly one completion
// signal: success when the command exited with status zero, failure
// otherwise. Run waits for the driver to handle that signal before returning.
//
// Run returns nil when the command ran to completion, including when it
// exited with a non-zero status (that outcome is reported through the driver
// only). It returns an error when the command could not be set up or started,
// when waiting on it failed for a reason other than its exit status, or when
// reading one of its output streams failed (for example because a line
// exceeded the scanner's maximum token size).
func (p *Processor) Run(task, cmdName string, cmdArgs ...string) error {
	cmd := exec.Command(cmdName, cmdArgs...)

	// route stdout and stderr through pipes so they can be read line by line
	cmdOut, err := cmd.StdoutPipe()
	if err != nil {
		return fmt.Errorf("error creating pipe for stdout: %w", err)
	}
	cmdErr, err := cmd.StderrPipe()
	if err != nil {
		return fmt.Errorf("error creating pipe for stderr: %w", err)
	}

	if err := cmd.Start(); err != nil {
		return fmt.Errorf("error starting command: %w", err)
	}

	// From here on the driver loop is running and must always be given a
	// success or failure signal, otherwise it would poll forever.
	go p.attachDriver(task)

	// one reader goroutine per pipe; each records its own read error
	var (
		wg             sync.WaitGroup
		outErr, errErr error
	)
	wg.Add(2)
	go func() {
		defer wg.Done()
		outErr = forwardLines(cmdOut, p.stdout)
	}()
	go func() {
		defer wg.Done()
		errErr = forwardLines(cmdErr, p.stderr)
	}()

	// All reads from the pipes must finish before cmd.Wait is called, because
	// Wait closes the pipes.
	wg.Wait()

	waitErr := cmd.Wait()
	if waitErr == nil {
		p.success <- true
	} else {
		p.failure <- true
	}

	// wait for the driver to handle the completion signal
	<-p.done

	if waitErr != nil {
		// A non-zero exit status has already been reported through the
		// driver; any other wait error is returned to the caller.
		if _, exited := waitErr.(*exec.ExitError); !exited {
			return fmt.Errorf("error waiting on command: %w", waitErr)
		}
	}
	if outErr != nil {
		return fmt.Errorf("error reading stdout: %w", outErr)
	}
	if errErr != nil {
		return fmt.Errorf("error reading stderr: %w", errErr)
	}
	return nil
}

// forwardLines sends each line read from r to lines and returns the scanner's
// error, if any. When scanning stops early it keeps reading and discarding the
// rest of r so the process writing to the pipe cannot block on a full pipe.
func forwardLines(r io.Reader, lines chan<- string) error {
	scanner := bufio.NewScanner(r)
	for scanner.Scan() {
		lines <- scanner.Text()
	}
	if err := scanner.Err(); err != nil {
		// best effort: the scan error is what gets reported
		_, _ = io.Copy(io.Discard, r)
		return err
	}
	return nil
}
// attachDriver is the driver's event loop. It initializes the driver with
// task, then dispatches every message arriving on the processor's channels to
// the matching driver callback. Whenever no message is pending it calls
// HandleNothing and polls again. The loop ends after a success or failure
// signal has been handled, at which point it reports completion on p.done.
func (p *Processor) attachDriver(task string) {
	p.driver.Initialize(task)

	for finished := false; !finished; {
		select {
		case line := <-p.stdout:
			p.driver.HandleOut(line)
		case line := <-p.stderr:
			p.driver.HandleErr(line)
		case <-p.success:
			p.driver.HandleSuccess()
			finished = true
		case <-p.failure:
			p.driver.HandleFailure()
			finished = true
		default:
			// nothing pending right now
			p.driver.HandleNothing()
		}
	}

	// blocks until the caller receives the completion notice
	p.done <- true
}