update provisioner to support consul based storage
Change-Id: Iaa649396d23bbf28a4d99a188c9846aea1310cd0
diff --git a/provisioner/Godeps/Godeps.json b/provisioner/Godeps/Godeps.json
index 0b4660b..a96e9e3 100644
--- a/provisioner/Godeps/Godeps.json
+++ b/provisioner/Godeps/Godeps.json
@@ -14,6 +14,11 @@
"Rev": "9fa818a44c2bf1396a17f9d5a3c0f6dd39d2ff8e"
},
{
+ "ImportPath": "github.com/hashicorp/consul",
+ "Comment": "v0.6.4-341-g22938ab",
+ "Rev": "22938ab8b3ca069e490a4897a8ec13308ac61605"
+ },
+ {
"ImportPath": "github.com/kelseyhightower/envconfig",
"Comment": "1.1.0-17-g91921eb",
"Rev": "91921eb4cf999321cdbeebdba5a03555800d493b"
diff --git a/provisioner/consul_storage.go b/provisioner/consul_storage.go
new file mode 100644
index 0000000..7592ac0
--- /dev/null
+++ b/provisioner/consul_storage.go
@@ -0,0 +1,87 @@
+package main
+
+import (
+ "encoding/json"
+ consul "github.com/hashicorp/consul/api"
+ "log"
+ "net/url"
+)
+
+const (
+ PREFIX = "cord/provisioner/"
+)
+
+type ConsulStorage struct {
+ client *consul.Client
+ kv *consul.KV
+}
+
+func NewConsulStorage(spec string) (*ConsulStorage, error) {
+ conn, err := url.Parse(spec)
+ if err != nil {
+ return nil, err
+ }
+
+ cfg := consul.Config{
+ Address: conn.Host,
+ Scheme: "http",
+ }
+
+ log.Printf("%+v", cfg)
+
+ client, err := consul.NewClient(&cfg)
+ if err != nil {
+ return nil, err
+ }
+ return &ConsulStorage{
+ client: client,
+ kv: client.KV(),
+ }, nil
+}
+
+func (s *ConsulStorage) Put(id string, update StatusMsg) error {
+ data, err := json.Marshal(update)
+ if err != nil {
+ return err
+ }
+ _, err = s.kv.Put(&consul.KVPair{
+ Key: PREFIX + id,
+ Value: data,
+ }, nil)
+ return err
+}
+
+func (s *ConsulStorage) Get(id string) (*StatusMsg, error) {
+ pair, _, err := s.kv.Get(PREFIX+id, nil)
+ if err != nil {
+ return nil, err
+ }
+
+ if pair == nil {
+ return nil, nil
+ }
+
+ var record StatusMsg
+ err = json.Unmarshal([]byte(pair.Value), &record)
+ if err != nil {
+ return nil, err
+ }
+ return &record, nil
+}
+
+func (s *ConsulStorage) List() ([]StatusMsg, error) {
+ pairs, _, err := s.kv.List(PREFIX, nil)
+ if err != nil {
+ return nil, err
+ }
+ result := make([]StatusMsg, len(pairs))
+ i := 0
+ for _, pair := range pairs {
+ err = json.Unmarshal([]byte(pair.Value), &(result[i]))
+ if err != nil {
+ return nil, err
+ }
+ i += 1
+ }
+ return result, nil
+}
diff --git a/provisioner/dispatcher.go b/provisioner/dispatcher.go
index bf6ebbf..fed57a4 100644
--- a/provisioner/dispatcher.go
+++ b/provisioner/dispatcher.go
@@ -126,7 +126,13 @@
worker <- work
}()
case update := <-d.StatusChan:
- d.Storage.Put(update.Request.Info.Id, update)
+ err := d.Storage.Put(update.Request.Info.Id, update)
+ if err != nil {
+ log.Printf("[error] Unable to update storage with status for '%s' : %s",
+ update.Request.Info.Id, err)
+ } else {
+ log.Printf("[debug] Storage updated for '%s'", update.Request.Info.Id)
+ }
case <-d.QuitChan:
log.Println("[info] Stopping dispatcher")
return
diff --git a/provisioner/handlers.go b/provisioner/handlers.go
index 32812da..5bc6e57 100644
--- a/provisioner/handlers.go
+++ b/provisioner/handlers.go
@@ -65,7 +65,7 @@
}
if !c.validateData(&info) {
- log.Printf("[errpr] Provisioning request not valid for '%s'", info.Name)
+ log.Printf("[error] Provisioning request not valid for '%s'", info.Name)
w.WriteHeader(http.StatusBadRequest)
return
}
@@ -84,7 +84,7 @@
}
err = c.dispatcher.Dispatch(&info, role, script)
if err != nil {
- log.Printf("[errpr] unable to dispatch provisioning request for node '%s' : %s", info.Name, err)
+ log.Printf("[error] unable to dispatch provisioning request for node '%s' : %s", info.Name, err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
diff --git a/provisioner/provisioner.go b/provisioner/provisioner.go
index 362c06d..559e23c 100644
--- a/provisioner/provisioner.go
+++ b/provisioner/provisioner.go
@@ -14,6 +14,7 @@
RoleSelectorURL string `default:"" envconfig:"role_selector_url"`
DefaultRole string `default:"compute-node" envconfig:"default_role"`
Script string `default:"do-ansible"`
+ StorageURL string `default:"memory:" envconfig:"storage_url"`
}
type Context struct {
@@ -36,11 +37,16 @@
Port: %d
RoleSelectorURL: %s
DefaultRole: %s
- Script: %s`,
+ Script: %s
+ StorageURL: %s`,
context.config.Listen, context.config.Port, context.config.RoleSelectorURL,
- context.config.DefaultRole, context.config.Script)
+ context.config.DefaultRole, context.config.Script, context.config.StorageURL)
- context.storage = NewMemoryStorage()
+ context.storage, err = NewStorage(context.config.StorageURL)
+ if err != nil {
+ log.Fatalf("[error] Unable to connect to specified storage '%s' : %s",
+ context.config.StorageURL, err)
+ }
router := mux.NewRouter()
router.HandleFunc("/provision/", context.ProvisionRequestHandler).Methods("POST")
diff --git a/provisioner/storage.go b/provisioner/storage.go
index 92c5339..b12b7ef 100644
--- a/provisioner/storage.go
+++ b/provisioner/storage.go
@@ -1,11 +1,33 @@
package main
+import (
+ "fmt"
+ "net/url"
+ "strings"
+)
+
type Storage interface {
Put(id string, update StatusMsg) error
Get(id string) (*StatusMsg, error)
List() ([]StatusMsg, error)
}
+func NewStorage(spec string) (Storage, error) {
+ conn, err := url.Parse(spec)
+ if err != nil {
+ return nil, err
+ }
+
+ switch strings.ToUpper(conn.Scheme) {
+ case "MEMORY":
+ return NewMemoryStorage(), nil
+ case "CONSUL":
+ return NewConsulStorage(spec)
+ default:
+ return nil, fmt.Errorf("Unknown storage scheme specified, '%s'", conn.Scheme)
+ }
+}
+
type MemoryStorage struct {
Data map[string]StatusMsg
}