blob: 44e16e081d97c597f92e7ba132a54c8e3bac0c6e [file] [log] [blame]
David K. Bainbridgef0da8732016-06-01 16:15:37 -07001package main
2
import (
	"errors"
	"log"
	"os/exec"
)
7
8type WorkRequest struct {
9 Info *RequestInfo
10 Script string
11 Role string
12}
13
14type Worker struct {
15 ID int
16 Work chan WorkRequest
17 StatusChan chan StatusMsg
18 WorkerQueue chan chan WorkRequest
19 QuitChan chan bool
20}
21
22type StatusMsg struct {
23 Request *WorkRequest
24 Worker int
25 Status TaskStatus
26 Message string
27}
28
29func NewWorker(id int, workerQueue chan chan WorkRequest, statusChan chan StatusMsg) Worker {
30 // Create, and return the worker.
31 worker := Worker{
32 ID: id,
33 Work: make(chan WorkRequest),
34 StatusChan: statusChan,
35 WorkerQueue: workerQueue,
36 QuitChan: make(chan bool)}
37
38 return worker
39}
40
41func (w *Worker) Start() {
42 go func() {
43 for {
44 // Add ourselves into the worker queue.
45 w.WorkerQueue <- w.Work
46
47 select {
48 case work := <-w.Work:
49 // Receive a work request.
50 w.StatusChan <- StatusMsg{&work, w.ID, Running, ""}
51 err := exec.Command(work.Script, work.Info.Id, work.Info.Name,
52 work.Info.Ip, work.Info.Mac, work.Role).Run()
53 if err != nil {
54 w.StatusChan <- StatusMsg{&work, w.ID, Failed, err.Error()}
55 } else {
56 w.StatusChan <- StatusMsg{&work, w.ID, Complete, ""}
57 }
58 case <-w.QuitChan:
59 // We have been asked to stop.
60 log.Printf("worker%d stopping\n", w.ID)
61 return
62 }
63 }
64 }()
65}
66
67func (w *Worker) Stop() {
68 go func() {
69 w.QuitChan <- true
70 }()
71}
72
73type Dispatcher struct {
74 Storage Storage
75 WorkQueue chan WorkRequest
76 WorkerQueue chan chan WorkRequest
77 StatusChan chan StatusMsg
78 QuitChan chan bool
79 NumWorkers int
80}
81
82func NewDispatcher(numWorkers int, storage Storage) *Dispatcher {
83 d := Dispatcher{
84 Storage: storage,
85 WorkQueue: make(chan WorkRequest, 100),
86 StatusChan: make(chan StatusMsg, 100),
87 NumWorkers: numWorkers,
88 WorkerQueue: make(chan chan WorkRequest, numWorkers),
89 QuitChan: make(chan bool),
90 }
91
92 return &d
93}
94
95func (d *Dispatcher) Dispatch(info *RequestInfo, role string) error {
96 d.WorkQueue <- WorkRequest{
97 Info: info,
98 Role: role,
99 }
100 return nil
101}
102
103func (d *Dispatcher) Start() {
104 // Now, create all of our workers.
105 for i := 0; i < d.NumWorkers; i++ {
106 log.Printf("Creating worker %d", i)
107 worker := NewWorker(i, d.WorkerQueue, d.StatusChan)
108 worker.Start()
109 }
110
111 go func() {
112 for {
113 select {
114 case work := <-d.WorkQueue:
115 log.Println("Received work requeust")
116 go func() {
117 d.StatusChan <- StatusMsg{&work, -1, Pending, ""}
118 worker := <-d.WorkerQueue
119
120 log.Println("Dispatching work request")
121 worker <- work
122 }()
123 case update := <-d.StatusChan:
124 d.Storage.Put(update.Request.Info.Id, update)
125 case <-d.QuitChan:
126 log.Println("[info] Stopping dispatcher")
127 return
128 }
129 }
130 }()
131}
132
133func (d *Dispatcher) Stop() {
134 go func() {
135 d.QuitChan <- true
136 }()
137}