VOL-580: Update envoy to support either consul or etcd in a kubernetes cluster

Envoy is refactored such that it interworks with either consul or etcd. The refactoring
should not have changed any of envoy's behaviour with consul. The update was tested with
only the vcli, envoy, vcore, and KV-store (consul or etcd) containers:
- tested with consul on a kubernetes 3-node cluster
- tested with etcd on a kubernetes 3-node cluster
- tested with consul on a 3-node docker swarm
- container and node failure testing not performed

This update enables some experimentation of envoy in the kubernetes environment.
There is more work to be done.  Future updates could:
- replace the current polling of etcd's assignment key with an asynchronous technique
- make use of data consistency options in etcd get requests
- make greater use of object orientation

Change-Id: Ia22893cdf331967eee4d13e060fd2c5ba7adb321
diff --git a/envoy/go/envoyd/envoyd.go b/envoy/go/envoyd/envoyd.go
index a64271a..6523c90 100644
--- a/envoy/go/envoyd/envoyd.go
+++ b/envoy/go/envoyd/envoyd.go
@@ -1,6 +1,7 @@
 package main // import "ciena.com/envoyd"
 
 import (
+	"context"
 	"os"
 	"os/exec"
 	"fmt"
@@ -16,6 +17,7 @@
 	"flag"
 	"bufio"
 	consulapi "github.com/hashicorp/consul/api"
+	etcdapi "github.com/coreos/etcd/clientv3"
 )
 
 // DATA STRUCTURES
@@ -34,6 +36,9 @@
 	consulapi *consulapi.Client
 }
 
+type KvConnectFunc func(string, string) (error)
+type KvMonitorFunc func()
+
 type EnvoyConfigVars struct {
 	VolthaVip string
 	VolthaRR []string
@@ -69,10 +74,16 @@
 	vcorePort string
 	envoyGrpcPort string
 	consulPort string
+	kvStore string
+	kvSvcName string
+	kvPort string
+	kvConnect map[string]KvConnectFunc
+	kvMonitor map[string]KvMonitorFunc
 	retries int
 	waitTime int
 	// Runtime variables
 	consul * consulapi.Client
+	etcd * etcdapi.Client
 	vcoreHostIpName string
 	vcoreIdName string
 	vc []VolthaClusterEntry
@@ -101,6 +112,9 @@
 		consulSvcName: "consul",
 		vcorePort: "50556",
 		consulPort: "8500",
+		kvStore: "consul",
+		kvSvcName: "consul",
+		kvPort: "8500",
 		retries: 10,
 		waitTime: 2,
 		// Runtime variables
@@ -110,6 +124,12 @@
 		restartEpoch: 0,
 	}
 	ec = &envCtrl
+	ec.kvConnect = make(map[string]KvConnectFunc)
+	ec.kvConnect["consul"] = ec.consulConnect
+	ec.kvConnect["etcd"] = ec.etcdConnect
+	ec.kvMonitor = make(map[string]KvMonitorFunc)
+	ec.kvMonitor["consul"] = ec.monitorConsulKey
+	ec.kvMonitor["etcd"] = ec.monitorEtcdKey
 	return
 }
 
@@ -142,6 +162,19 @@
 	return
 }
 
+func (ec * EnvoyControl) etcdConnect(serviceName string, port string) (err error) {
+	// Fire up an etcd client to access the kv store
+	cfg := etcdapi.Config {
+		Endpoints: []string{serviceName + ":" + port},
+		DialTimeout: 5 * time.Second,
+	}
+	ec.etcd, err = etcdapi.New(cfg)
+	if err != nil {
+		log.Fatal("Failed to create etcd client, aborting...")
+		return
+	}
+	return
+}
 
 //NewConsul returns a Client interface for given consulapi address
 func NewConsulClient(addr string) (*client, error) {
@@ -355,7 +388,7 @@
 	return
 }
 
-func (ec * EnvoyControl) prepareEnvoyConfig(kvp * consulapi.KVPair, ecv * EnvoyConfigVars) (err error) {
+func (ec * EnvoyControl) prepareEnvoyConfig(keyValue []byte, ecv * EnvoyConfigVars) (err error) {
 	var vCluster []VolthaClusterEntry
 
 	ecv.HttpPort = ec.envoyHttpPort
@@ -365,7 +398,7 @@
 
 	// Extract all values from the KV record
 	// In the future, the values should all be compared to what we currently have 
-	vCluster, err = ec.parseAssignment([]byte(kvp.Value))
+	vCluster, err = ec.parseAssignment(keyValue)
 	if err == nil {
 		ec.vc = vCluster // For future use to determine if there's been a real change
 		//templateValues["VolthaRR"] = []string{}
@@ -376,16 +409,16 @@
 			ecv.VolthaRR = append(ecv.VolthaRR, vCluster[i].Host + ":" + ec.vcorePort)
 		}
 	} else {
-		log.Fatal("Couldn't parse the KV record %s: %s", string(kvp.Value), err.Error())
+		log.Fatal("Couldn't parse the KV record %s: %s", string(keyValue), err.Error())
 	}
 	return
 }
 
-func (ec * EnvoyControl) runEnvoy(kvp * consulapi.KVPair) {
+func (ec * EnvoyControl) runEnvoy(keyValue []byte) {
 	var err error
 	var ecv EnvoyConfigVars
 
-	if err = ec.prepareEnvoyConfig(kvp, &ecv); err != nil {
+	if err = ec.prepareEnvoyConfig(keyValue, &ecv); err != nil {
 		log.Fatal("Error preparing envoy config variables, aborting: %s", err.Error())
 	}
 
@@ -394,7 +427,9 @@
 	go ec.startEnvoy()
 }
 
-func (ec * EnvoyControl) readConsulKey(key string, qo * consulapi.QueryOptions) (kvp * consulapi.KVPair, meta * consulapi.QueryMeta, err error) {
+func (ec * EnvoyControl) readConsulKey(key string, qo * consulapi.QueryOptions) (value []byte, meta * consulapi.QueryMeta, err error) {
+
+	var kvp *consulapi.KVPair
 
 	kv := ec.consul.KV()
 	// Get the initial values of the assignment key which contains individual
@@ -409,6 +444,7 @@
 			kvp, meta, err = kv.Get(ec.assignmentKey, qo)
 		} else if kvp != nil && len(string(kvp.Value)) > 10 {
 			// A valid read, return
+			value = kvp.Value
 			break
 		} else {
 			log.Printf("Voltha assignment key invalid, retry %d", i+1)
@@ -422,16 +458,46 @@
 	return
 }
 
-func (ec * EnvoyControl) runMonitorEnvoy() {
+func (ec * EnvoyControl) readEtcdKey(key string) (value []byte, index int64, err error) {
+	// Get the initial values of the assignment key which contains individual
+	// voltha core IP addresses. This may be empty until voltha populates it
+	// so it must be checked
+	resp, err := ec.etcd.Get(context.Background(), ec.assignmentKey)
+	for i := 0; i < ec.retries; i++ {
+		if err != nil {
+			fmt.Println(err)
+			log.Printf("Unable to read assignment etcd key, retry %d", i+1)
+			time.Sleep(time.Duration(ec.waitTime) * time.Second)
+			resp, err = ec.etcd.Get(context.Background(), ec.assignmentKey)
+		} else if resp != nil && len(resp.Kvs) > 0 && len(resp.Kvs[0].Value) > 10 {
+			// A valid read, return
+			kv := resp.Kvs[0]
+			value = kv.Value
+			index = kv.ModRevision
+			break
+		} else {
+			log.Printf("Voltha assignment key from etcd invalid, retry %d", i+1)
+			time.Sleep(time.Duration(ec.waitTime) * time.Second)
+			resp, err = ec.etcd.Get(context.Background(), ec.assignmentKey)
+		}
+		if i == ec.retries {
+			log.Fatal("Failed to read assignment key from etcd after %d retries, aborting: %s", ec.retries, err.Error())
+		}
+	}
+	return
+}
+
+func (ec * EnvoyControl) monitorConsulKey() {
 	var err error
 	var qo consulapi.QueryOptions
 
 	// Get the initial values of the assignment key which contains individual
 	// voltha core IP addresses. This may be empty until voltha populates it
 	// so it must be checked
-	kvp, meta, err := ec.readConsulKey(ec.assignmentKey, nil)
-	log.Printf("Starting Envoy")
-	ec.runEnvoy(kvp)
+	log.Printf("Monitoring consul key")
+	val, meta, err := ec.readConsulKey(ec.assignmentKey, nil)
+	log.Printf("Starting Envoy, initial index = %d", meta.LastIndex)
+	ec.runEnvoy(val)
 
 	for {
 		qo.WaitIndex = meta.LastIndex
@@ -441,22 +507,57 @@
 			if qo.WaitIndex != meta.LastIndex {
 				break
 			}
-			kvp, meta, err = ec.readConsulKey(ec.assignmentKey, &qo)
+			val, meta, err = ec.readConsulKey(ec.assignmentKey, &qo)
 			if err != nil {
 				log.Fatal("Unable to read assignment consul key: %s\n", err.Error())
 			} else {
-				log.Println(string(kvp.Value))
+				log.Println(string(val))
 				log.Printf("meta.LastIndex = %d", meta.LastIndex)
 			}
 		}
 		// Fell through, the index has changed thus the key has changed
 
 		log.Printf("Starting Envoy")
-		ec.runEnvoy(kvp)
+		ec.runEnvoy(val)
 		log.Printf("meta.LastIndex = %d", meta.LastIndex)
 	}
 }
 
+func (ec * EnvoyControl) monitorEtcdKey() {
+	var err error
+
+	// TODO: Check into the use of any data consistency options
+	//
+	// Get the initial values of the assignment key which contains individual
+	// voltha core IP addresses. This may be empty until voltha populates it
+	// so it must be checked
+	log.Printf("Monitoring etcd key")
+	val, index, err := ec.readEtcdKey(ec.assignmentKey)
+	lastIndex := index
+	log.Printf("Starting Envoy, initial index = %d", lastIndex)
+	ec.runEnvoy(val)
+
+	for {
+		for {
+			time.Sleep(60 * time.Second)
+			val, index, err = ec.readEtcdKey(ec.assignmentKey)
+			if err != nil {
+				log.Fatal("Unable to read assignment etcd key: %s\n", err.Error())
+			} else if index != lastIndex {
+			       break
+			} else {
+				log.Println(string(val))
+				log.Printf("Last index = %d", index)
+			}
+		}
+		// Fell through, the index has changed thus the key has changed
+		log.Printf("Starting Envoy")
+		ec.runEnvoy(val)
+		log.Printf("Last index = %d", index)
+		lastIndex = index
+	}
+}
+
 func (ec * EnvoyControl) ParseCommandArguments() {
 	flag.StringVar(&(ec.assignmentKey), "assignment-key", ec.assignmentKey,
 				"The key for the voltha assignment value in consul")
@@ -488,6 +589,15 @@
 	flag.StringVar(&(ec.consulPort), "consul-port", ec.consulPort,
 				"The port where the consul service api can be found")
 
+	flag.StringVar(&(ec.kvStore), "kv", ec.kvStore,
+		"The KV store: consul or etcd")
+
+	flag.StringVar(&(ec.kvSvcName), "kv-svc-name", ec.kvSvcName,
+		"The name of the KV store service")
+
+	flag.StringVar(&(ec.kvPort), "kv-port", ec.kvPort,
+		"The port where the KV service api can be found")
+
 	flag.StringVar(&(ec.envoyHttpPort), "http-port", ec.envoyHttpPort,
 				"The port where the http front-end is served ")
 
@@ -523,8 +633,8 @@
 		log.Fatal("Can't proceed without voltha's vIP address: %s", err.Error())
 	}
 
-	if err = ec.consulConnect(ec.consulSvcName, ec.consulPort); err != nil {
-		log.Fatal("error creating consul client aborting: %s", err.Error())
+	if err = ec.kvConnect[ec.kvStore](ec.kvSvcName, ec.kvPort); err != nil {
+		log.Fatal("Failed to create KV client, aborting: %s", err.Error())
 	}
 
 	if ec.httpDisabled == true && ec.httpsDisabled == true {
@@ -547,13 +657,18 @@
 
 	ec = NewEnvoyControl()
 	ec.ParseCommandArguments()
+	if ec.kvStore != "etcd" {
+		ec.kvStore = "consul"
+	}
+	log.Printf("KV-store %s at %s:%s", ec.kvStore, ec.kvSvcName, ec.kvPort)
+
 	if err = ec.Initialize(); err != nil {
 		log.Fatal("Envoy control initialization failed, aboring: %s", err.Error())
 	}
 
 
 	// Start envoy and monitor changes to the KV store to reload
-	// consul's config. This never returns unless somethign crashes.
-	ec.runMonitorEnvoy()
+	// consul's config. This never returns unless something crashes.
+	ec.kvMonitor[ec.kvStore]()
 	log.Fatal("Monitor returned, this shouldn't happen")
 }