blob: eaa65cb143fa24b0585299d4a51b00ae1bcbbb39 [file] [log] [blame]
David K. Bainbridgef0da8732016-06-01 16:15:37 -07001package main
2
3import (
David K. Bainbridgef0da8732016-06-01 16:15:37 -07004 "os/exec"
David K. Bainbridge068e87d2016-06-30 13:53:19 -07005 "time"
David K. Bainbridgef0da8732016-06-01 16:15:37 -07006)
7
8type WorkRequest struct {
9 Info *RequestInfo
10 Script string
11 Role string
12}
13
14type Worker struct {
15 ID int
16 Work chan WorkRequest
17 StatusChan chan StatusMsg
18 WorkerQueue chan chan WorkRequest
19 QuitChan chan bool
20}
21
22type StatusMsg struct {
David K. Bainbridge068e87d2016-06-30 13:53:19 -070023 Request *WorkRequest `json:"request"`
24 Worker int `json:"worker"`
25 Status TaskStatus `json:"status"`
26 Message string `json:"message"`
27 Timestamp int64 `json:"timestamp"`
David K. Bainbridgef0da8732016-06-01 16:15:37 -070028}
29
30func NewWorker(id int, workerQueue chan chan WorkRequest, statusChan chan StatusMsg) Worker {
31 // Create, and return the worker.
32 worker := Worker{
33 ID: id,
34 Work: make(chan WorkRequest),
35 StatusChan: statusChan,
36 WorkerQueue: workerQueue,
David K. Bainbridge38501582016-06-01 18:15:45 -070037 QuitChan: make(chan bool),
38 }
David K. Bainbridgef0da8732016-06-01 16:15:37 -070039
40 return worker
41}
42
43func (w *Worker) Start() {
44 go func() {
45 for {
46 // Add ourselves into the worker queue.
47 w.WorkerQueue <- w.Work
48
49 select {
50 case work := <-w.Work:
51 // Receive a work request.
David K. Bainbridge068e87d2016-06-30 13:53:19 -070052 w.StatusChan <- StatusMsg{&work, w.ID, Running, "", time.Now().Unix()}
David K. Bainbridgea9c2e0a2016-07-01 18:33:50 -070053 log.Debugf("RUN: %s %s %s %s %s %s",
David K. Bainbridge38501582016-06-01 18:15:45 -070054 work.Script, work.Info.Id, work.Info.Name,
55 work.Info.Ip, work.Info.Mac, work.Role)
David K. Bainbridgef0da8732016-06-01 16:15:37 -070056 err := exec.Command(work.Script, work.Info.Id, work.Info.Name,
57 work.Info.Ip, work.Info.Mac, work.Role).Run()
58 if err != nil {
David K. Bainbridge068e87d2016-06-30 13:53:19 -070059 w.StatusChan <- StatusMsg{&work, w.ID, Failed, err.Error(),
60 time.Now().Unix()}
David K. Bainbridgef0da8732016-06-01 16:15:37 -070061 } else {
David K. Bainbridge068e87d2016-06-30 13:53:19 -070062 w.StatusChan <- StatusMsg{&work, w.ID, Complete, "",
63 time.Now().Unix()}
David K. Bainbridgef0da8732016-06-01 16:15:37 -070064 }
65 case <-w.QuitChan:
66 // We have been asked to stop.
David K. Bainbridgea9c2e0a2016-07-01 18:33:50 -070067 log.Infof("worker%d stopping\n", w.ID)
David K. Bainbridgef0da8732016-06-01 16:15:37 -070068 return
69 }
70 }
71 }()
72}
73
74func (w *Worker) Stop() {
75 go func() {
76 w.QuitChan <- true
77 }()
78}
79
80type Dispatcher struct {
81 Storage Storage
82 WorkQueue chan WorkRequest
83 WorkerQueue chan chan WorkRequest
84 StatusChan chan StatusMsg
85 QuitChan chan bool
86 NumWorkers int
87}
88
89func NewDispatcher(numWorkers int, storage Storage) *Dispatcher {
90 d := Dispatcher{
91 Storage: storage,
92 WorkQueue: make(chan WorkRequest, 100),
93 StatusChan: make(chan StatusMsg, 100),
94 NumWorkers: numWorkers,
95 WorkerQueue: make(chan chan WorkRequest, numWorkers),
96 QuitChan: make(chan bool),
97 }
98
99 return &d
100}
101
David K. Bainbridge38501582016-06-01 18:15:45 -0700102func (d *Dispatcher) Dispatch(info *RequestInfo, role string, script string) error {
David K. Bainbridgef0da8732016-06-01 16:15:37 -0700103 d.WorkQueue <- WorkRequest{
David K. Bainbridge38501582016-06-01 18:15:45 -0700104 Info: info,
105 Script: script,
106 Role: role,
David K. Bainbridgef0da8732016-06-01 16:15:37 -0700107 }
108 return nil
109}
110
111func (d *Dispatcher) Start() {
112 // Now, create all of our workers.
113 for i := 0; i < d.NumWorkers; i++ {
David K. Bainbridgea9c2e0a2016-07-01 18:33:50 -0700114 log.Infof("Creating worker %d", i)
David K. Bainbridgef0da8732016-06-01 16:15:37 -0700115 worker := NewWorker(i, d.WorkerQueue, d.StatusChan)
116 worker.Start()
117 }
118
119 go func() {
120 for {
121 select {
122 case work := <-d.WorkQueue:
David K. Bainbridgea9c2e0a2016-07-01 18:33:50 -0700123 log.Debugf("Received work requeust")
David K. Bainbridge068e87d2016-06-30 13:53:19 -0700124 d.StatusChan <- StatusMsg{&work, -1, Pending, "", time.Now().Unix()}
David K. Bainbridgef0da8732016-06-01 16:15:37 -0700125 go func() {
David K. Bainbridgef0da8732016-06-01 16:15:37 -0700126 worker := <-d.WorkerQueue
127
David K. Bainbridgea9c2e0a2016-07-01 18:33:50 -0700128 log.Debugf("Dispatching work request")
David K. Bainbridgef0da8732016-06-01 16:15:37 -0700129 worker <- work
130 }()
131 case update := <-d.StatusChan:
David K. Bainbridge546cdc32016-06-29 15:30:22 -0700132 err := d.Storage.Put(update.Request.Info.Id, update)
133 if err != nil {
David K. Bainbridgea9c2e0a2016-07-01 18:33:50 -0700134 log.Errorf("Unable to update storage with status for '%s' : %s",
David K. Bainbridge546cdc32016-06-29 15:30:22 -0700135 update.Request.Info.Id, err)
136 } else {
David K. Bainbridgea9c2e0a2016-07-01 18:33:50 -0700137 log.Debugf("Storage updated for '%s'", update.Request.Info.Id)
David K. Bainbridge546cdc32016-06-29 15:30:22 -0700138 }
David K. Bainbridgef0da8732016-06-01 16:15:37 -0700139 case <-d.QuitChan:
David K. Bainbridgea9c2e0a2016-07-01 18:33:50 -0700140 log.Infof("Stopping dispatcher")
David K. Bainbridgef0da8732016-06-01 16:15:37 -0700141 return
142 }
143 }
144 }()
145}
146
147func (d *Dispatcher) Stop() {
148 go func() {
149 d.QuitChan <- true
150 }()
151}