blob: 29c4460bda0504d6729458aae89d5ad703b344a3 [file] [log] [blame]
Brian O'Connor6a37ea92017-08-03 22:45:59 -07001// Copyright 2016 Open Networking Foundation
David K. Bainbridgedf9df632016-07-07 18:47:46 -07002//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
David K. Bainbridgef0da8732016-06-01 16:15:37 -070014package main
15
16import (
David K. Bainbridgef0da8732016-06-01 16:15:37 -070017 "os/exec"
David K. Bainbridge068e87d2016-06-30 13:53:19 -070018 "time"
David K. Bainbridgef0da8732016-06-01 16:15:37 -070019)
20
21type WorkRequest struct {
22 Info *RequestInfo
23 Script string
24 Role string
25}
26
27type Worker struct {
28 ID int
29 Work chan WorkRequest
30 StatusChan chan StatusMsg
31 WorkerQueue chan chan WorkRequest
32 QuitChan chan bool
33}
34
35type StatusMsg struct {
David K. Bainbridge068e87d2016-06-30 13:53:19 -070036 Request *WorkRequest `json:"request"`
37 Worker int `json:"worker"`
38 Status TaskStatus `json:"status"`
39 Message string `json:"message"`
40 Timestamp int64 `json:"timestamp"`
David K. Bainbridgef0da8732016-06-01 16:15:37 -070041}
42
43func NewWorker(id int, workerQueue chan chan WorkRequest, statusChan chan StatusMsg) Worker {
44 // Create, and return the worker.
45 worker := Worker{
46 ID: id,
47 Work: make(chan WorkRequest),
48 StatusChan: statusChan,
49 WorkerQueue: workerQueue,
David K. Bainbridge38501582016-06-01 18:15:45 -070050 QuitChan: make(chan bool),
51 }
David K. Bainbridgef0da8732016-06-01 16:15:37 -070052
53 return worker
54}
55
56func (w *Worker) Start() {
57 go func() {
58 for {
59 // Add ourselves into the worker queue.
60 w.WorkerQueue <- w.Work
61
62 select {
63 case work := <-w.Work:
64 // Receive a work request.
David K. Bainbridge068e87d2016-06-30 13:53:19 -070065 w.StatusChan <- StatusMsg{&work, w.ID, Running, "", time.Now().Unix()}
David K. Bainbridgea9c2e0a2016-07-01 18:33:50 -070066 log.Debugf("RUN: %s %s %s %s %s %s",
David K. Bainbridge38501582016-06-01 18:15:45 -070067 work.Script, work.Info.Id, work.Info.Name,
68 work.Info.Ip, work.Info.Mac, work.Role)
David K. Bainbridgef0da8732016-06-01 16:15:37 -070069 err := exec.Command(work.Script, work.Info.Id, work.Info.Name,
70 work.Info.Ip, work.Info.Mac, work.Role).Run()
71 if err != nil {
David K. Bainbridge068e87d2016-06-30 13:53:19 -070072 w.StatusChan <- StatusMsg{&work, w.ID, Failed, err.Error(),
73 time.Now().Unix()}
David K. Bainbridgef0da8732016-06-01 16:15:37 -070074 } else {
David K. Bainbridge068e87d2016-06-30 13:53:19 -070075 w.StatusChan <- StatusMsg{&work, w.ID, Complete, "",
76 time.Now().Unix()}
David K. Bainbridgef0da8732016-06-01 16:15:37 -070077 }
78 case <-w.QuitChan:
79 // We have been asked to stop.
David K. Bainbridgea9c2e0a2016-07-01 18:33:50 -070080 log.Infof("worker%d stopping\n", w.ID)
David K. Bainbridgef0da8732016-06-01 16:15:37 -070081 return
82 }
83 }
84 }()
85}
86
87func (w *Worker) Stop() {
88 go func() {
89 w.QuitChan <- true
90 }()
91}
92
93type Dispatcher struct {
94 Storage Storage
95 WorkQueue chan WorkRequest
96 WorkerQueue chan chan WorkRequest
97 StatusChan chan StatusMsg
98 QuitChan chan bool
99 NumWorkers int
100}
101
102func NewDispatcher(numWorkers int, storage Storage) *Dispatcher {
103 d := Dispatcher{
104 Storage: storage,
105 WorkQueue: make(chan WorkRequest, 100),
106 StatusChan: make(chan StatusMsg, 100),
107 NumWorkers: numWorkers,
108 WorkerQueue: make(chan chan WorkRequest, numWorkers),
109 QuitChan: make(chan bool),
110 }
111
112 return &d
113}
114
David K. Bainbridge38501582016-06-01 18:15:45 -0700115func (d *Dispatcher) Dispatch(info *RequestInfo, role string, script string) error {
David K. Bainbridgef0da8732016-06-01 16:15:37 -0700116 d.WorkQueue <- WorkRequest{
David K. Bainbridge38501582016-06-01 18:15:45 -0700117 Info: info,
118 Script: script,
119 Role: role,
David K. Bainbridgef0da8732016-06-01 16:15:37 -0700120 }
121 return nil
122}
123
124func (d *Dispatcher) Start() {
125 // Now, create all of our workers.
126 for i := 0; i < d.NumWorkers; i++ {
David K. Bainbridgea9c2e0a2016-07-01 18:33:50 -0700127 log.Infof("Creating worker %d", i)
David K. Bainbridgef0da8732016-06-01 16:15:37 -0700128 worker := NewWorker(i, d.WorkerQueue, d.StatusChan)
129 worker.Start()
130 }
131
132 go func() {
133 for {
134 select {
135 case work := <-d.WorkQueue:
David K. Bainbridgea9c2e0a2016-07-01 18:33:50 -0700136 log.Debugf("Received work requeust")
David K. Bainbridge068e87d2016-06-30 13:53:19 -0700137 d.StatusChan <- StatusMsg{&work, -1, Pending, "", time.Now().Unix()}
David K. Bainbridgef0da8732016-06-01 16:15:37 -0700138 go func() {
David K. Bainbridgef0da8732016-06-01 16:15:37 -0700139 worker := <-d.WorkerQueue
140
David K. Bainbridgea9c2e0a2016-07-01 18:33:50 -0700141 log.Debugf("Dispatching work request")
David K. Bainbridgef0da8732016-06-01 16:15:37 -0700142 worker <- work
143 }()
144 case update := <-d.StatusChan:
David K. Bainbridge546cdc32016-06-29 15:30:22 -0700145 err := d.Storage.Put(update.Request.Info.Id, update)
146 if err != nil {
David K. Bainbridgea9c2e0a2016-07-01 18:33:50 -0700147 log.Errorf("Unable to update storage with status for '%s' : %s",
David K. Bainbridge546cdc32016-06-29 15:30:22 -0700148 update.Request.Info.Id, err)
149 } else {
David K. Bainbridgea9c2e0a2016-07-01 18:33:50 -0700150 log.Debugf("Storage updated for '%s'", update.Request.Info.Id)
David K. Bainbridge546cdc32016-06-29 15:30:22 -0700151 }
David K. Bainbridgef0da8732016-06-01 16:15:37 -0700152 case <-d.QuitChan:
David K. Bainbridgea9c2e0a2016-07-01 18:33:50 -0700153 log.Infof("Stopping dispatcher")
David K. Bainbridgef0da8732016-06-01 16:15:37 -0700154 return
155 }
156 }
157 }()
158}
159
160func (d *Dispatcher) Stop() {
161 go func() {
162 d.QuitChan <- true
163 }()
164}