Updated to add persistence to provisioning and roll that change through the rest of the services
Change-Id: Ia0d5a49dc0d88dbe6780c76483fd2247ad631bdf
diff --git a/provisioner/consul_storage.go b/provisioner/consul_storage.go
index 7592ac0..5701c72 100644
--- a/provisioner/consul_storage.go
+++ b/provisioner/consul_storage.go
@@ -51,6 +51,11 @@
return err
}
+// Delete removes the entry stored under PREFIX+id from the key/value store
+// and hands back whatever error that store call reports.
+func (s *ConsulStorage) Delete(id string) error {
+	key := PREFIX + id
+	_, err := s.kv.Delete(key, nil)
+	return err
+}
+
func (s *ConsulStorage) Get(id string) (*StatusMsg, error) {
pair, _, err := s.kv.Get(PREFIX+id, nil)
if err != nil {
diff --git a/provisioner/dispatcher.go b/provisioner/dispatcher.go
index fed57a4..d448e19 100644
--- a/provisioner/dispatcher.go
+++ b/provisioner/dispatcher.go
@@ -3,6 +3,7 @@
import (
"log"
"os/exec"
+ "time"
)
type WorkRequest struct {
@@ -20,10 +21,11 @@
}
type StatusMsg struct {
- Request *WorkRequest `json:"request"`
- Worker int `json:"worker"`
- Status TaskStatus `json:"status"`
- Message string `json:"message"`
+ Request *WorkRequest `json:"request"`
+ Worker int `json:"worker"`
+ Status TaskStatus `json:"status"`
+ Message string `json:"message"`
+ Timestamp int64 `json:"timestamp"`
}
func NewWorker(id int, workerQueue chan chan WorkRequest, statusChan chan StatusMsg) Worker {
@@ -48,16 +50,18 @@
select {
case work := <-w.Work:
// Receive a work request.
- w.StatusChan <- StatusMsg{&work, w.ID, Running, ""}
+ w.StatusChan <- StatusMsg{&work, w.ID, Running, "", time.Now().Unix()}
log.Printf("[debug] RUN: %s %s %s %s %s %s",
work.Script, work.Info.Id, work.Info.Name,
work.Info.Ip, work.Info.Mac, work.Role)
err := exec.Command(work.Script, work.Info.Id, work.Info.Name,
work.Info.Ip, work.Info.Mac, work.Role).Run()
if err != nil {
- w.StatusChan <- StatusMsg{&work, w.ID, Failed, err.Error()}
+ w.StatusChan <- StatusMsg{&work, w.ID, Failed, err.Error(),
+ time.Now().Unix()}
} else {
- w.StatusChan <- StatusMsg{&work, w.ID, Complete, ""}
+ w.StatusChan <- StatusMsg{&work, w.ID, Complete, "",
+ time.Now().Unix()}
}
case <-w.QuitChan:
// We have been asked to stop.
@@ -118,8 +122,8 @@
select {
case work := <-d.WorkQueue:
log.Println("[debug] Received work requeust")
+ d.StatusChan <- StatusMsg{&work, -1, Pending, "", time.Now().Unix()}
go func() {
- d.StatusChan <- StatusMsg{&work, -1, Pending, ""}
worker := <-d.WorkerQueue
log.Println("[debug] Dispatching work request")
diff --git a/provisioner/handlers.go b/provisioner/handlers.go
index 5bc6e57..7fd53fa 100644
--- a/provisioner/handlers.go
+++ b/provisioner/handlers.go
@@ -102,6 +102,24 @@
w.Write(bytes)
}
+// DeleteStatusHandler handles a request to remove the stored status of a
+// single node. The node is identified by the "nodeid" route variable.
+//
+// Responses:
+//   - 400 Bad Request when "nodeid" is missing or blank
+//   - 500 Internal Server Error, with the error text as the body, when the
+//     storage backend reports a failure
+//   - 200 OK with an empty body otherwise
+func (c *Context) DeleteStatusHandler(w http.ResponseWriter, r *http.Request) {
+	vars := mux.Vars(r)
+	id, ok := vars["nodeid"]
+	if !ok || strings.TrimSpace(id) == "" {
+		w.WriteHeader(http.StatusBadRequest)
+		return
+	}
+
+	if err := c.storage.Delete(id); err != nil {
+		// The failure is logged here and also surfaced to the client.
+		log.Printf("[warn] Error while deleting status for '%s' from storage : %s", id, err)
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+
+	w.WriteHeader(http.StatusOK)
+}
+
func (c *Context) QueryStatusHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
id, ok := vars["nodeid"]
diff --git a/provisioner/provisioner.go b/provisioner/provisioner.go
index 559e23c..41240b5 100644
--- a/provisioner/provisioner.go
+++ b/provisioner/provisioner.go
@@ -52,6 +52,7 @@
router.HandleFunc("/provision/", context.ProvisionRequestHandler).Methods("POST")
router.HandleFunc("/provision/", context.ListRequestsHandler).Methods("GET")
router.HandleFunc("/provision/{nodeid}", context.QueryStatusHandler).Methods("GET")
+ router.HandleFunc("/provision/{nodeid}", context.DeleteStatusHandler).Methods("DELETE")
http.Handle("/", router)
// Start the dispatcher and workers
diff --git a/provisioner/storage.go b/provisioner/storage.go
index b12b7ef..2d2fb6d 100644
--- a/provisioner/storage.go
+++ b/provisioner/storage.go
@@ -9,6 +9,7 @@
type Storage interface {
Put(id string, update StatusMsg) error
Get(id string) (*StatusMsg, error)
+ // Delete removes the entry stored under id.
+ Delete(id string) error
List() ([]StatusMsg, error)
}
@@ -51,6 +52,11 @@
return &m, nil
}
+// Delete removes the entry keyed by id from the in-memory map. It always
+// returns nil: the built-in delete is a no-op when the key is absent.
+// NOTE(review): no locking is visible here — confirm callers serialize
+// access to s.Data.
+func (s *MemoryStorage) Delete(id string) error {
+ delete(s.Data, id)
+ return nil
+}
+
func (s *MemoryStorage) List() ([]StatusMsg, error) {
r := make([]StatusMsg, len(s.Data))
i := 0
diff --git a/provisioner/task.go b/provisioner/task.go
index 4017268..ca0d430 100644
--- a/provisioner/task.go
+++ b/provisioner/task.go
@@ -22,17 +22,3 @@
}
return "INVALID TASK STATUS"
}
-
-type Task struct {
- nodeId string
- status TaskStatus
-}
-
-type TaskQueueEntry struct {
- previous *TaskQueueEntry
- next *TaskQueueEntry
- task *Task
-}
-
-type TaskQueue struct {
-}