blob: d448e19aef880c78ae3dd82d70ecda8608deaf57 [file] [log] [blame]
David K. Bainbridgef0da8732016-06-01 16:15:37 -07001package main
2
3import (
4 "log"
5 "os/exec"
David K. Bainbridge068e87d2016-06-30 13:53:19 -07006 "time"
David K. Bainbridgef0da8732016-06-01 16:15:37 -07007)
8
9type WorkRequest struct {
10 Info *RequestInfo
11 Script string
12 Role string
13}
14
15type Worker struct {
16 ID int
17 Work chan WorkRequest
18 StatusChan chan StatusMsg
19 WorkerQueue chan chan WorkRequest
20 QuitChan chan bool
21}
22
23type StatusMsg struct {
David K. Bainbridge068e87d2016-06-30 13:53:19 -070024 Request *WorkRequest `json:"request"`
25 Worker int `json:"worker"`
26 Status TaskStatus `json:"status"`
27 Message string `json:"message"`
28 Timestamp int64 `json:"timestamp"`
David K. Bainbridgef0da8732016-06-01 16:15:37 -070029}
30
31func NewWorker(id int, workerQueue chan chan WorkRequest, statusChan chan StatusMsg) Worker {
32 // Create, and return the worker.
33 worker := Worker{
34 ID: id,
35 Work: make(chan WorkRequest),
36 StatusChan: statusChan,
37 WorkerQueue: workerQueue,
David K. Bainbridge38501582016-06-01 18:15:45 -070038 QuitChan: make(chan bool),
39 }
David K. Bainbridgef0da8732016-06-01 16:15:37 -070040
41 return worker
42}
43
44func (w *Worker) Start() {
45 go func() {
46 for {
47 // Add ourselves into the worker queue.
48 w.WorkerQueue <- w.Work
49
50 select {
51 case work := <-w.Work:
52 // Receive a work request.
David K. Bainbridge068e87d2016-06-30 13:53:19 -070053 w.StatusChan <- StatusMsg{&work, w.ID, Running, "", time.Now().Unix()}
David K. Bainbridge97ee8052016-06-14 00:52:07 -070054 log.Printf("[debug] RUN: %s %s %s %s %s %s",
David K. Bainbridge38501582016-06-01 18:15:45 -070055 work.Script, work.Info.Id, work.Info.Name,
56 work.Info.Ip, work.Info.Mac, work.Role)
David K. Bainbridgef0da8732016-06-01 16:15:37 -070057 err := exec.Command(work.Script, work.Info.Id, work.Info.Name,
58 work.Info.Ip, work.Info.Mac, work.Role).Run()
59 if err != nil {
David K. Bainbridge068e87d2016-06-30 13:53:19 -070060 w.StatusChan <- StatusMsg{&work, w.ID, Failed, err.Error(),
61 time.Now().Unix()}
David K. Bainbridgef0da8732016-06-01 16:15:37 -070062 } else {
David K. Bainbridge068e87d2016-06-30 13:53:19 -070063 w.StatusChan <- StatusMsg{&work, w.ID, Complete, "",
64 time.Now().Unix()}
David K. Bainbridgef0da8732016-06-01 16:15:37 -070065 }
66 case <-w.QuitChan:
67 // We have been asked to stop.
68 log.Printf("worker%d stopping\n", w.ID)
69 return
70 }
71 }
72 }()
73}
74
75func (w *Worker) Stop() {
76 go func() {
77 w.QuitChan <- true
78 }()
79}
80
81type Dispatcher struct {
82 Storage Storage
83 WorkQueue chan WorkRequest
84 WorkerQueue chan chan WorkRequest
85 StatusChan chan StatusMsg
86 QuitChan chan bool
87 NumWorkers int
88}
89
90func NewDispatcher(numWorkers int, storage Storage) *Dispatcher {
91 d := Dispatcher{
92 Storage: storage,
93 WorkQueue: make(chan WorkRequest, 100),
94 StatusChan: make(chan StatusMsg, 100),
95 NumWorkers: numWorkers,
96 WorkerQueue: make(chan chan WorkRequest, numWorkers),
97 QuitChan: make(chan bool),
98 }
99
100 return &d
101}
102
David K. Bainbridge38501582016-06-01 18:15:45 -0700103func (d *Dispatcher) Dispatch(info *RequestInfo, role string, script string) error {
David K. Bainbridgef0da8732016-06-01 16:15:37 -0700104 d.WorkQueue <- WorkRequest{
David K. Bainbridge38501582016-06-01 18:15:45 -0700105 Info: info,
106 Script: script,
107 Role: role,
David K. Bainbridgef0da8732016-06-01 16:15:37 -0700108 }
109 return nil
110}
111
112func (d *Dispatcher) Start() {
113 // Now, create all of our workers.
114 for i := 0; i < d.NumWorkers; i++ {
115 log.Printf("Creating worker %d", i)
116 worker := NewWorker(i, d.WorkerQueue, d.StatusChan)
117 worker.Start()
118 }
119
120 go func() {
121 for {
122 select {
123 case work := <-d.WorkQueue:
David K. Bainbridge97ee8052016-06-14 00:52:07 -0700124 log.Println("[debug] Received work requeust")
David K. Bainbridge068e87d2016-06-30 13:53:19 -0700125 d.StatusChan <- StatusMsg{&work, -1, Pending, "", time.Now().Unix()}
David K. Bainbridgef0da8732016-06-01 16:15:37 -0700126 go func() {
David K. Bainbridgef0da8732016-06-01 16:15:37 -0700127 worker := <-d.WorkerQueue
128
David K. Bainbridge97ee8052016-06-14 00:52:07 -0700129 log.Println("[debug] Dispatching work request")
David K. Bainbridgef0da8732016-06-01 16:15:37 -0700130 worker <- work
131 }()
132 case update := <-d.StatusChan:
David K. Bainbridge546cdc32016-06-29 15:30:22 -0700133 err := d.Storage.Put(update.Request.Info.Id, update)
134 if err != nil {
135 log.Printf("[error] Unable to update storage with status for '%s' : %s",
136 update.Request.Info.Id, err)
137 } else {
138 log.Printf("[debug] Storage updated for '%s'", update.Request.Info.Id)
139 }
David K. Bainbridgef0da8732016-06-01 16:15:37 -0700140 case <-d.QuitChan:
141 log.Println("[info] Stopping dispatcher")
142 return
143 }
144 }
145 }()
146}
147
148func (d *Dispatcher) Stop() {
149 go func() {
150 d.QuitChan <- true
151 }()
152}