blob: 29c4460bda0504d6729458aae89d5ad703b344a3 [file] [log] [blame]
// Copyright 2016 Open Networking Foundation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"os/exec"
"time"
)
type WorkRequest struct {
Info *RequestInfo
Script string
Role string
}
type Worker struct {
ID int
Work chan WorkRequest
StatusChan chan StatusMsg
WorkerQueue chan chan WorkRequest
QuitChan chan bool
}
type StatusMsg struct {
Request *WorkRequest `json:"request"`
Worker int `json:"worker"`
Status TaskStatus `json:"status"`
Message string `json:"message"`
Timestamp int64 `json:"timestamp"`
}
func NewWorker(id int, workerQueue chan chan WorkRequest, statusChan chan StatusMsg) Worker {
// Create, and return the worker.
worker := Worker{
ID: id,
Work: make(chan WorkRequest),
StatusChan: statusChan,
WorkerQueue: workerQueue,
QuitChan: make(chan bool),
}
return worker
}
func (w *Worker) Start() {
go func() {
for {
// Add ourselves into the worker queue.
w.WorkerQueue <- w.Work
select {
case work := <-w.Work:
// Receive a work request.
w.StatusChan <- StatusMsg{&work, w.ID, Running, "", time.Now().Unix()}
log.Debugf("RUN: %s %s %s %s %s %s",
work.Script, work.Info.Id, work.Info.Name,
work.Info.Ip, work.Info.Mac, work.Role)
err := exec.Command(work.Script, work.Info.Id, work.Info.Name,
work.Info.Ip, work.Info.Mac, work.Role).Run()
if err != nil {
w.StatusChan <- StatusMsg{&work, w.ID, Failed, err.Error(),
time.Now().Unix()}
} else {
w.StatusChan <- StatusMsg{&work, w.ID, Complete, "",
time.Now().Unix()}
}
case <-w.QuitChan:
// We have been asked to stop.
log.Infof("worker%d stopping\n", w.ID)
return
}
}
}()
}
func (w *Worker) Stop() {
go func() {
w.QuitChan <- true
}()
}
type Dispatcher struct {
Storage Storage
WorkQueue chan WorkRequest
WorkerQueue chan chan WorkRequest
StatusChan chan StatusMsg
QuitChan chan bool
NumWorkers int
}
func NewDispatcher(numWorkers int, storage Storage) *Dispatcher {
d := Dispatcher{
Storage: storage,
WorkQueue: make(chan WorkRequest, 100),
StatusChan: make(chan StatusMsg, 100),
NumWorkers: numWorkers,
WorkerQueue: make(chan chan WorkRequest, numWorkers),
QuitChan: make(chan bool),
}
return &d
}
func (d *Dispatcher) Dispatch(info *RequestInfo, role string, script string) error {
d.WorkQueue <- WorkRequest{
Info: info,
Script: script,
Role: role,
}
return nil
}
func (d *Dispatcher) Start() {
// Now, create all of our workers.
for i := 0; i < d.NumWorkers; i++ {
log.Infof("Creating worker %d", i)
worker := NewWorker(i, d.WorkerQueue, d.StatusChan)
worker.Start()
}
go func() {
for {
select {
case work := <-d.WorkQueue:
log.Debugf("Received work requeust")
d.StatusChan <- StatusMsg{&work, -1, Pending, "", time.Now().Unix()}
go func() {
worker := <-d.WorkerQueue
log.Debugf("Dispatching work request")
worker <- work
}()
case update := <-d.StatusChan:
err := d.Storage.Put(update.Request.Info.Id, update)
if err != nil {
log.Errorf("Unable to update storage with status for '%s' : %s",
update.Request.Info.Id, err)
} else {
log.Debugf("Storage updated for '%s'", update.Request.Info.Id)
}
case <-d.QuitChan:
log.Infof("Stopping dispatcher")
return
}
}
}()
}
func (d *Dispatcher) Stop() {
go func() {
d.QuitChan <- true
}()
}