update provisioner to support consul based storage

Change-Id: Iaa649396d23bbf28a4d99a188c9846aea1310cd0
diff --git a/provisioner/Godeps/Godeps.json b/provisioner/Godeps/Godeps.json
index 0b4660b..a96e9e3 100644
--- a/provisioner/Godeps/Godeps.json
+++ b/provisioner/Godeps/Godeps.json
@@ -14,6 +14,11 @@
 			"Rev": "9fa818a44c2bf1396a17f9d5a3c0f6dd39d2ff8e"
 		},
 		{
+                        "ImportPath": "github.com/hashicorp/consul",
+                        "Comment": "v0.6.4-341-g22938ab",
+                        "Rev": "22938ab8b3ca069e490a4897a8ec13308ac61605"
+                },
+		{
 			"ImportPath": "github.com/kelseyhightower/envconfig",
 			"Comment": "1.1.0-17-g91921eb",
 			"Rev": "91921eb4cf999321cdbeebdba5a03555800d493b"
diff --git a/provisioner/consul_storage.go b/provisioner/consul_storage.go
new file mode 100644
index 0000000..7592ac0
--- /dev/null
+++ b/provisioner/consul_storage.go
@@ -0,0 +1,87 @@
+package main
+
+import (
+	"encoding/json"
+	consul "github.com/hashicorp/consul/api"
+	"log"
+	"net/url"
+)
+
+const (
+	PREFIX = "cord/provisioner/"
+)
+
+type ConsulStorage struct {
+	client *consul.Client
+	kv     *consul.KV
+}
+
+func NewConsulStorage(spec string) (*ConsulStorage, error) {
+	conn, err := url.Parse(spec)
+	if err != nil {
+		return nil, err
+	}
+
+	cfg := consul.Config{
+		Address: conn.Host,
+		Scheme:  "http",
+	}
+
+	log.Printf("%+v", cfg)
+
+	client, err := consul.NewClient(&cfg)
+	if err != nil {
+		return nil, err
+	}
+	return &ConsulStorage{
+		client: client,
+		kv:     client.KV(),
+	}, nil
+}
+
+func (s *ConsulStorage) Put(id string, update StatusMsg) error {
+	data, err := json.Marshal(update)
+	if err != nil {
+		return err
+	}
+	_, err = s.kv.Put(&consul.KVPair{
+		Key:   PREFIX + id,
+		Value: data,
+	}, nil)
+	return err
+}
+
+func (s *ConsulStorage) Get(id string) (*StatusMsg, error) {
+	pair, _, err := s.kv.Get(PREFIX+id, nil)
+	if err != nil {
+		return nil, err
+	}
+
+	if pair == nil {
+		return nil, nil
+	}
+
+	var record StatusMsg
+	err = json.Unmarshal([]byte(pair.Value), &record)
+	if err != nil {
+		return nil, err
+	}
+	return &record, nil
+}
+
+func (s *ConsulStorage) List() ([]StatusMsg, error) {
+	pairs, _, err := s.kv.List(PREFIX, nil)
+	if err != nil {
+		return nil, err
+	}
+	result := make([]StatusMsg, len(pairs))
+	i := 0
+	for _, pair := range pairs {
+		err = json.Unmarshal([]byte(pair.Value), &(result[i]))
+		if err != nil {
+			return nil, err
+		}
+		i += 1
+	}
+	return result, nil
+}
diff --git a/provisioner/dispatcher.go b/provisioner/dispatcher.go
index bf6ebbf..fed57a4 100644
--- a/provisioner/dispatcher.go
+++ b/provisioner/dispatcher.go
@@ -126,7 +126,13 @@
 					worker <- work
 				}()
 			case update := <-d.StatusChan:
-				d.Storage.Put(update.Request.Info.Id, update)
+				err := d.Storage.Put(update.Request.Info.Id, update)
+				if err != nil {
+					log.Printf("[error] Unable to update storage with status for '%s' : %s",
+						update.Request.Info.Id, err)
+				} else {
+					log.Printf("[debug] Storage updated for '%s'", update.Request.Info.Id)
+				}
 			case <-d.QuitChan:
 				log.Println("[info] Stopping dispatcher")
 				return
diff --git a/provisioner/handlers.go b/provisioner/handlers.go
index 32812da..5bc6e57 100644
--- a/provisioner/handlers.go
+++ b/provisioner/handlers.go
@@ -65,7 +65,7 @@
 	}
 
 	if !c.validateData(&info) {
-		log.Printf("[errpr] Provisioning request not valid for '%s'", info.Name)
+		log.Printf("[error] Provisioning request not valid for '%s'", info.Name)
 		w.WriteHeader(http.StatusBadRequest)
 		return
 	}
@@ -84,7 +84,7 @@
 	}
 	err = c.dispatcher.Dispatch(&info, role, script)
 	if err != nil {
-		log.Printf("[errpr] unable to dispatch provisioning request for node '%s' : %s", info.Name, err)
+		log.Printf("[error] unable to dispatch provisioning request for node '%s' : %s", info.Name, err)
 		http.Error(w, err.Error(), http.StatusInternalServerError)
 		return
 	}
diff --git a/provisioner/provisioner.go b/provisioner/provisioner.go
index 362c06d..559e23c 100644
--- a/provisioner/provisioner.go
+++ b/provisioner/provisioner.go
@@ -14,6 +14,7 @@
 	RoleSelectorURL string `default:"" envconfig:"role_selector_url"`
 	DefaultRole     string `default:"compute-node" envconfig:"default_role"`
 	Script          string `default:"do-ansible"`
+	StorageURL      string `default:"memory:" envconfig:"storage_url"`
 }
 
 type Context struct {
@@ -36,11 +37,16 @@
 	    Port:            %d
 	    RoleSelectorURL: %s
 	    DefaultRole:     %s
-	    Script:          %s`,
+	    Script:          %s
+	    StorageURL:      %s`,
 		context.config.Listen, context.config.Port, context.config.RoleSelectorURL,
-		context.config.DefaultRole, context.config.Script)
+		context.config.DefaultRole, context.config.Script, context.config.StorageURL)
 
-	context.storage = NewMemoryStorage()
+	context.storage, err = NewStorage(context.config.StorageURL)
+	if err != nil {
+		log.Fatalf("[error] Unable to connect to specified storage '%s' : %s",
+			context.config.StorageURL, err)
+	}
 
 	router := mux.NewRouter()
 	router.HandleFunc("/provision/", context.ProvisionRequestHandler).Methods("POST")
diff --git a/provisioner/storage.go b/provisioner/storage.go
index 92c5339..b12b7ef 100644
--- a/provisioner/storage.go
+++ b/provisioner/storage.go
@@ -1,11 +1,33 @@
 package main
 
+import (
+	"fmt"
+	"net/url"
+	"strings"
+)
+
 type Storage interface {
 	Put(id string, update StatusMsg) error
 	Get(id string) (*StatusMsg, error)
 	List() ([]StatusMsg, error)
 }
 
+func NewStorage(spec string) (Storage, error) {
+	conn, err := url.Parse(spec)
+	if err != nil {
+		return nil, err
+	}
+
+	switch strings.ToUpper(conn.Scheme) {
+	case "MEMORY":
+		return NewMemoryStorage(), nil
+	case "CONSUL":
+		return NewConsulStorage(spec)
+	default:
+		return nil, fmt.Errorf("Unknown storage scheme specified, '%s'", conn.Scheme)
+	}
+}
+
 type MemoryStorage struct {
 	Data map[string]StatusMsg
 }