blob: fed57a4833ea7d28c9ed97aee5d9599437c95b0d [file] [log] [blame]
David K. Bainbridgef0da8732016-06-01 16:15:37 -07001package main
2
3import (
4 "log"
5 "os/exec"
6)
7
8type WorkRequest struct {
9 Info *RequestInfo
10 Script string
11 Role string
12}
13
14type Worker struct {
15 ID int
16 Work chan WorkRequest
17 StatusChan chan StatusMsg
18 WorkerQueue chan chan WorkRequest
19 QuitChan chan bool
20}
21
22type StatusMsg struct {
David K. Bainbridge97ee8052016-06-14 00:52:07 -070023 Request *WorkRequest `json:"request"`
24 Worker int `json:"worker"`
25 Status TaskStatus `json:"status"`
26 Message string `json:"message"`
David K. Bainbridgef0da8732016-06-01 16:15:37 -070027}
28
29func NewWorker(id int, workerQueue chan chan WorkRequest, statusChan chan StatusMsg) Worker {
30 // Create, and return the worker.
31 worker := Worker{
32 ID: id,
33 Work: make(chan WorkRequest),
34 StatusChan: statusChan,
35 WorkerQueue: workerQueue,
David K. Bainbridge38501582016-06-01 18:15:45 -070036 QuitChan: make(chan bool),
37 }
David K. Bainbridgef0da8732016-06-01 16:15:37 -070038
39 return worker
40}
41
42func (w *Worker) Start() {
43 go func() {
44 for {
45 // Add ourselves into the worker queue.
46 w.WorkerQueue <- w.Work
47
48 select {
49 case work := <-w.Work:
50 // Receive a work request.
51 w.StatusChan <- StatusMsg{&work, w.ID, Running, ""}
David K. Bainbridge97ee8052016-06-14 00:52:07 -070052 log.Printf("[debug] RUN: %s %s %s %s %s %s",
David K. Bainbridge38501582016-06-01 18:15:45 -070053 work.Script, work.Info.Id, work.Info.Name,
54 work.Info.Ip, work.Info.Mac, work.Role)
David K. Bainbridgef0da8732016-06-01 16:15:37 -070055 err := exec.Command(work.Script, work.Info.Id, work.Info.Name,
56 work.Info.Ip, work.Info.Mac, work.Role).Run()
57 if err != nil {
58 w.StatusChan <- StatusMsg{&work, w.ID, Failed, err.Error()}
59 } else {
60 w.StatusChan <- StatusMsg{&work, w.ID, Complete, ""}
61 }
62 case <-w.QuitChan:
63 // We have been asked to stop.
64 log.Printf("worker%d stopping\n", w.ID)
65 return
66 }
67 }
68 }()
69}
70
71func (w *Worker) Stop() {
72 go func() {
73 w.QuitChan <- true
74 }()
75}
76
77type Dispatcher struct {
78 Storage Storage
79 WorkQueue chan WorkRequest
80 WorkerQueue chan chan WorkRequest
81 StatusChan chan StatusMsg
82 QuitChan chan bool
83 NumWorkers int
84}
85
86func NewDispatcher(numWorkers int, storage Storage) *Dispatcher {
87 d := Dispatcher{
88 Storage: storage,
89 WorkQueue: make(chan WorkRequest, 100),
90 StatusChan: make(chan StatusMsg, 100),
91 NumWorkers: numWorkers,
92 WorkerQueue: make(chan chan WorkRequest, numWorkers),
93 QuitChan: make(chan bool),
94 }
95
96 return &d
97}
98
David K. Bainbridge38501582016-06-01 18:15:45 -070099func (d *Dispatcher) Dispatch(info *RequestInfo, role string, script string) error {
David K. Bainbridgef0da8732016-06-01 16:15:37 -0700100 d.WorkQueue <- WorkRequest{
David K. Bainbridge38501582016-06-01 18:15:45 -0700101 Info: info,
102 Script: script,
103 Role: role,
David K. Bainbridgef0da8732016-06-01 16:15:37 -0700104 }
105 return nil
106}
107
108func (d *Dispatcher) Start() {
109 // Now, create all of our workers.
110 for i := 0; i < d.NumWorkers; i++ {
111 log.Printf("Creating worker %d", i)
112 worker := NewWorker(i, d.WorkerQueue, d.StatusChan)
113 worker.Start()
114 }
115
116 go func() {
117 for {
118 select {
119 case work := <-d.WorkQueue:
David K. Bainbridge97ee8052016-06-14 00:52:07 -0700120 log.Println("[debug] Received work requeust")
David K. Bainbridgef0da8732016-06-01 16:15:37 -0700121 go func() {
122 d.StatusChan <- StatusMsg{&work, -1, Pending, ""}
123 worker := <-d.WorkerQueue
124
David K. Bainbridge97ee8052016-06-14 00:52:07 -0700125 log.Println("[debug] Dispatching work request")
David K. Bainbridgef0da8732016-06-01 16:15:37 -0700126 worker <- work
127 }()
128 case update := <-d.StatusChan:
David K. Bainbridge546cdc32016-06-29 15:30:22 -0700129 err := d.Storage.Put(update.Request.Info.Id, update)
130 if err != nil {
131 log.Printf("[error] Unable to update storage with status for '%s' : %s",
132 update.Request.Info.Id, err)
133 } else {
134 log.Printf("[debug] Storage updated for '%s'", update.Request.Info.Id)
135 }
David K. Bainbridgef0da8732016-06-01 16:15:37 -0700136 case <-d.QuitChan:
137 log.Println("[info] Stopping dispatcher")
138 return
139 }
140 }
141 }()
142}
143
144func (d *Dispatcher) Stop() {
145 go func() {
146 d.QuitChan <- true
147 }()
148}