blob: fb16458d094d1349075944cf341857e37b8d3411 [file] [log] [blame]
David K. Bainbridgef0da8732016-06-01 16:15:37 -07001package main
2
import (
	"errors"
	"log"
	"os/exec"
)
7
8type WorkRequest struct {
9 Info *RequestInfo
10 Script string
11 Role string
12}
13
14type Worker struct {
15 ID int
16 Work chan WorkRequest
17 StatusChan chan StatusMsg
18 WorkerQueue chan chan WorkRequest
19 QuitChan chan bool
20}
21
22type StatusMsg struct {
23 Request *WorkRequest
24 Worker int
25 Status TaskStatus
26 Message string
27}
28
29func NewWorker(id int, workerQueue chan chan WorkRequest, statusChan chan StatusMsg) Worker {
30 // Create, and return the worker.
31 worker := Worker{
32 ID: id,
33 Work: make(chan WorkRequest),
34 StatusChan: statusChan,
35 WorkerQueue: workerQueue,
David K. Bainbridge38501582016-06-01 18:15:45 -070036 QuitChan: make(chan bool),
37 }
David K. Bainbridgef0da8732016-06-01 16:15:37 -070038
39 return worker
40}
41
42func (w *Worker) Start() {
43 go func() {
44 for {
45 // Add ourselves into the worker queue.
46 w.WorkerQueue <- w.Work
47
48 select {
49 case work := <-w.Work:
50 // Receive a work request.
51 w.StatusChan <- StatusMsg{&work, w.ID, Running, ""}
David K. Bainbridge38501582016-06-01 18:15:45 -070052 log.Printf("RUN: %s %s %s %s %s %s",
53 work.Script, work.Info.Id, work.Info.Name,
54 work.Info.Ip, work.Info.Mac, work.Role)
David K. Bainbridgef0da8732016-06-01 16:15:37 -070055 err := exec.Command(work.Script, work.Info.Id, work.Info.Name,
56 work.Info.Ip, work.Info.Mac, work.Role).Run()
57 if err != nil {
58 w.StatusChan <- StatusMsg{&work, w.ID, Failed, err.Error()}
59 } else {
60 w.StatusChan <- StatusMsg{&work, w.ID, Complete, ""}
61 }
62 case <-w.QuitChan:
63 // We have been asked to stop.
64 log.Printf("worker%d stopping\n", w.ID)
65 return
66 }
67 }
68 }()
69}
70
71func (w *Worker) Stop() {
72 go func() {
73 w.QuitChan <- true
74 }()
75}
76
77type Dispatcher struct {
78 Storage Storage
79 WorkQueue chan WorkRequest
80 WorkerQueue chan chan WorkRequest
81 StatusChan chan StatusMsg
82 QuitChan chan bool
83 NumWorkers int
84}
85
86func NewDispatcher(numWorkers int, storage Storage) *Dispatcher {
87 d := Dispatcher{
88 Storage: storage,
89 WorkQueue: make(chan WorkRequest, 100),
90 StatusChan: make(chan StatusMsg, 100),
91 NumWorkers: numWorkers,
92 WorkerQueue: make(chan chan WorkRequest, numWorkers),
93 QuitChan: make(chan bool),
94 }
95
96 return &d
97}
98
David K. Bainbridge38501582016-06-01 18:15:45 -070099func (d *Dispatcher) Dispatch(info *RequestInfo, role string, script string) error {
David K. Bainbridgef0da8732016-06-01 16:15:37 -0700100 d.WorkQueue <- WorkRequest{
David K. Bainbridge38501582016-06-01 18:15:45 -0700101 Info: info,
102 Script: script,
103 Role: role,
David K. Bainbridgef0da8732016-06-01 16:15:37 -0700104 }
105 return nil
106}
107
108func (d *Dispatcher) Start() {
109 // Now, create all of our workers.
110 for i := 0; i < d.NumWorkers; i++ {
111 log.Printf("Creating worker %d", i)
112 worker := NewWorker(i, d.WorkerQueue, d.StatusChan)
113 worker.Start()
114 }
115
116 go func() {
117 for {
118 select {
119 case work := <-d.WorkQueue:
120 log.Println("Received work requeust")
121 go func() {
122 d.StatusChan <- StatusMsg{&work, -1, Pending, ""}
123 worker := <-d.WorkerQueue
124
125 log.Println("Dispatching work request")
126 worker <- work
127 }()
128 case update := <-d.StatusChan:
129 d.Storage.Put(update.Request.Info.Id, update)
130 case <-d.QuitChan:
131 log.Println("[info] Stopping dispatcher")
132 return
133 }
134 }
135 }()
136}
137
138func (d *Dispatcher) Stop() {
139 go func() {
140 d.QuitChan <- true
141 }()
142}