VOL-65 A complete re-factor of envoyd to use a more object oriented approach
also added command line options and defaults for most of them. Also
added envoy log forwarding. Updated the compose file to leverage the
newly added command line options.
Change-Id: Ib07d6a4dbe923ede24c8ce25b6b679c5957f9c53
diff --git a/compose/docker-compose-envoy-swarm.yml b/compose/docker-compose-envoy-swarm.yml
index a109af9..4aba002 100644
--- a/compose/docker-compose-envoy-swarm.yml
+++ b/compose/docker-compose-envoy-swarm.yml
@@ -17,8 +17,10 @@
DOCKER_HOST_IP: "${DOCKER_HOST_IP}"
entrypoint:
- /usr/local/bin/envoyd
- - -t /envoy/voltha-grpc-proxy.template.json
- - -c /envoy/voltha-grpc-proxy.json
+ - -envoy-cfg-template
+ - "/envoy/voltha-grpc-proxy.template.json"
+ - -envoy-config
+ - "/envoy/voltha-grpc-proxy.json"
networks:
- voltha-net
ports:
diff --git a/envoy/go/envoyd/envoyd.go b/envoy/go/envoyd/envoyd.go
index 6766539..57bd446 100644
--- a/envoy/go/envoyd/envoyd.go
+++ b/envoy/go/envoyd/envoyd.go
@@ -9,24 +9,17 @@
"time"
"net"
"io/ioutil"
+ "io"
"text/template"
"encoding/json"
+ "sync"
+ "flag"
+ "bufio"
consulapi "github.com/hashicorp/consul/api"
)
// DATA STRUCTURES
-type ConfigVars struct {
- VolthaVip string
- VolthaRR []string
-}
-
-type VolthaClusterEntry struct {
- Prefix string
- Id string
- Host string
-}
-
//Client provides an interface for getting data out of Consul
type Client interface {
// Get a Service from consulapi
@@ -41,26 +34,94 @@
consulapi *consulapi.Client
}
+type EnvoyConfigVars struct {
+ VolthaVip string
+ VolthaRR []string
+}
+
+type VolthaClusterEntry struct {
+ Prefix string
+ Id string
+ Host string
+}
+
// This struct is not used yet
// TODO: Update the daemon to use this structure to for a
// more object oriented implementation
type EnvoyControl struct {
- retrys int
+ // Command line parameters
+ assignmentKey string
+ envoyConfigTemplate string
+ envoyConfig string
+ vcoreSvcName string
+ consulSvcName string
+ vcorePort string
+ consulPort string
+ retries int
waitTime int
- cv ConfigVars
+ // Runtime variables
+ consul * consulapi.Client
+ vcoreHostIpName string
+ vcoreIdName string
vc []VolthaClusterEntry
- meta * consulapi.QueryMeta
- kvp * consulapi.KVPair
ipAddrs map[string][]string
+ restartEpoch int
+ reLock sync.Mutex // Exclusive access to the restartEpoch
}
-// CONSTANTS
-var assignmentKey string = "service/voltha/data/core/assignment"
-var vcoreHostIpName string = "host"
-var vcoreIdName string = "id"
-var restartEpoch int = 0
-var volthaPort string = "50556" // This will be passed inas an option.
-var consulPort string = "8500" // This will be passed in as an option.
+
+func NewEnvoyControl() (ec * EnvoyControl) {
+ var envCtrl EnvoyControl = EnvoyControl { // Default values
+ // Command line parameters
+ assignmentKey: "service/voltha/data/core/assignment",
+ envoyConfigTemplate: "/envoy/voltha-grpc-proxy.template.json",
+ envoyConfig: "/envoy/voltha-grpc-proxy.json",
+ //envoyLogFile: "/envoy/voltha_access_log.log",
+ vcoreSvcName: "vcore",
+ consulSvcName: "consul",
+ vcorePort: "50556",
+ consulPort: "8500",
+ retries: 10,
+ waitTime: 2,
+ // Runtime variables
+ vcoreHostIpName: "host",
+ vcoreIdName: "id",
+ ipAddrs: make(map[string][]string),
+ restartEpoch: 0,
+ }
+ ec = &envCtrl
+ return
+}
+
+func (ec * EnvoyControl) resolveServiceAddress(serviceName string) (err error) {
+ for i := 0; i < ec.retries; i++ {
+ ec.ipAddrs[serviceName], err = net.LookupHost(serviceName)
+ if err != nil {
+ log.Printf("%s name resolution failed %d time(s) retrying...", serviceName, i+1)
+ } else {
+ //fmt.Printf("%s address = %s\n",serviceName, addrs[0])
+ break
+ }
+ time.Sleep(time.Duration(ec.waitTime) * time.Second)
+ }
+ if err != nil {
+ log.Printf("%s name resolution failed %d times gving up", serviceName, ec.retries)
+ }
+ return
+}
+
+func (ec * EnvoyControl) consulConnect(serviceName string, port string) (err error) {
+ // Fire up a consul client and get the kv store
+ cConfig := consulapi.DefaultNonPooledConfig()
+ cConfig.Address = ec.ipAddrs[serviceName][0] + ":" + port
+ ec.consul, err = consulapi.NewClient(cConfig)
+ if err != nil {
+ log.Fatal("error creating consul client aborting")
+ return
+ }
+ return
+}
+
//NewConsul returns a Client interface for given consulapi address
func NewConsulClient(addr string) (*client, error) {
@@ -102,27 +163,95 @@
}
// Starts envoy with the current restartEpoch
-func startEnvoy(cfg_file string) {
- cmd := exec.Command("/usr/local/bin/envoy", "--restart-epoch", strconv.Itoa(restartEpoch),
- "--config-path", cfg_file)
+func (ec * EnvoyControl) startEnvoy() {
+ var curEpoch int
+ var err error
+ var count int
- curEpoch := restartEpoch
- restartEpoch += 1
- if err := cmd.Start(); err != nil {
- log.Fatal(err)
+ ec.reLock.Lock() // Make sure we've got exclusive access to the variable
+ cmd := exec.Command("/usr/local/bin/envoy", "--restart-epoch", strconv.Itoa(ec.restartEpoch),
+ "--config-path", ec.envoyConfig, "--parent-shutdown-time-s", "10")
+
+ curEpoch = ec.restartEpoch
+ ec.restartEpoch += 1
+ ec.reLock.Unlock() // Done, release the lock.
+
+ stderr, err := cmd.StderrPipe()
+ if err != nil {
+ log.Printf("Couldn't attach to stderr running envoy command")
panic(err)
}
+ stdout, err := cmd.StdoutPipe()
+ if err != nil {
+ log.Printf("Couldn't attach to stdout running envoy command")
+ panic(err)
+ }
+ so := bufio.NewReader(stdout)
+ se := bufio.NewReader(stderr)
+
+ if err = cmd.Start(); err != nil {
+ log.Fatal("Error starting envoy")
+ panic(err)
+ }
+ log.Printf("Envoy(%d) started", curEpoch)
+ soEof := false
+ seEof := false
+
+ data := make([]byte, 80)
+ log.Printf("Log forwarding for envoy(%d) started", curEpoch)
+ for {
+ data = make([]byte, 80)
+ count, err = so.Read(data)
+ log.Printf("ENVOY_LOG(%d): %s", curEpoch, string(data))
+ if err == io.EOF {
+ if seEof == true {
+ break
+ } else {
+ soEof = true
+ }
+ } else if err != nil {
+ log.Printf("Attempt to read envoy standard out failed")
+ panic(err)
+ } else if count > 0 {
+ log.Printf("ENVOY_LOG(%d)(%d): %s",curEpoch,count,string(data))
+ }
+ data = make([]byte, 80)
+ count, err = se.Read(data)
+ if err == io.EOF {
+ if soEof == true {
+ break
+ } else {
+ seEof = true
+ }
+ } else if err != nil {
+ log.Fatal("Attempt to read envoy standard err failed")
+ panic(err)
+ } else if count > 0 {
+ log.Printf("ENVOY_LOG(%d)(%d): %s",curEpoch,count,string(data))
+ }
+ }
log.Printf("Waiting on envoy %d to exit", curEpoch)
- if err := cmd.Wait(); err != nil {
- log.Fatal(err, "Unexpected exit code")
+ if err = cmd.Wait(); err != nil {
+ log.Printf("Envoy %d exited with an unexpected exit code", curEpoch)
+ panic(err)
}
log.Printf("Envoy %d exited", curEpoch)
+ // Check if this was the primary envoy, if so
+ // something went terribly wrong, panic to force
+ // forcefully exit.
+ ec.reLock.Lock()
+ if ec.restartEpoch == (curEpoch + 1) {
+ ec.reLock.Unlock()
+ log.Fatal("Last running envoy exited, aborting!")
+ panic("This should never happen")
+ }
+ ec.reLock.Unlock()
}
// This function will use the provided templete file to generate
// the targetfile substituting
-func updateEnvoyConfig(templateFile string, targetFile string, cv ConfigVars) {
+func (ec * EnvoyControl) updateEnvoyConfig(ecv * EnvoyConfigVars) (err error) {
var firstRun bool = true
f := func() (bool) {
var rtrn bool = firstRun
@@ -131,51 +260,42 @@
}
var funcs = template.FuncMap{"isFirst": f}
// Slurp up the template file.
- tplt, err := ioutil.ReadFile(templateFile)
+ tplt, err := ioutil.ReadFile(ec.envoyConfigTemplate)
if err != nil {
- panic("ERROR reading the template file, aborting")
+ log.Fatal("ERROR reading the template file, aborting")
+ panic(err)
}
//fmt.Println(string(tplt))
configTemplate, err := template.New("config").Funcs(funcs).Parse(string(tplt));
if err != nil {
+ log.Fatal("Unexpected error loading the Envoy template, aborting")
panic(err)
}
- outFile,err := os.Create(targetFile)
+ outFile,err := os.Create(ec.envoyConfig)
if err != nil {
+ log.Fatal("Unexpected error opening the Envoy config file for write, aborting")
panic(err)
}
- if err := configTemplate.Execute(outFile, cv); err != nil {
+ if err = configTemplate.Execute(outFile, ecv); err != nil {
+ log.Fatal("Unexpected error executing the Envoy config template, aborting")
panic(err)
}
- //cfgFile, err := ioutil.ReadFile(targetFile)
+ //cfgFile, err := ioutil.ReadFile(ec.envoyConfig)
//if err != nil {
- // panic("ERROR reading the config file, aborting")
+ // log.Fatal("ERROR reading the config file, aborting")
+ // panic(err)
//}
//fmt.Println(string(cfgFile))
-}
-
-func getServiceAddr(serviceName string, retrys int, waitTime int) (addrs []string, err error) {
- for i := 0; i < retrys; i++ {
- addrs,err = net.LookupHost(serviceName)
- if err != nil {
- log.Printf("%s name resolution failed %d time(s) retrying...\n", serviceName, i+1)
- } else {
- //fmt.Printf("%s address = %s\n",serviceName, addrs[0])
- break
- }
- time.Sleep(time.Duration(waitTime) * time.Second)
- }
- if err != nil {
- log.Printf("%s name resolution failed %d times gving up\n", serviceName, retrys)
- }
return
}
-func parseAssignment(jsonString []byte) (vCluster []VolthaClusterEntry, err error) {
- var vc VolthaClusterEntry
+func (ec * EnvoyControl) parseAssignment(jsonString []byte) (vCluster []VolthaClusterEntry, err error) {
var f interface{}
+ var vc VolthaClusterEntry
+ //var isErr bool
- log.Printf("Parsing %s\n", string(jsonString))
+ log.Printf("Parsing %s", string(jsonString))
+ //err = json.Unmarshal(jsonString, &f)
err = json.Unmarshal(jsonString, &f)
if err != nil {
log.Fatal("Unable to parse json record %s", jsonString)
@@ -183,153 +303,212 @@
} else {
m := f.(map[string]interface{})
for k, v := range m {
+ isErr := false
vc.Prefix = k
//log.Printf("Processing key %s\n", k)
switch vv := v.(type) {
case map[string]interface{}:
for i, u := range vv {
- //log.Printf("Processing key %s\n", i)
+ //log.Printf("Processing key %sn", i)
switch uu := u.(type) {
case string:
- if i == vcoreHostIpName {
+ if i == ec.vcoreHostIpName {
vc.Host = uu
- } else if i == vcoreIdName {
+ } else if i == ec.vcoreIdName {
vc.Id = uu
} else {
- log.Printf("WARNING: unexpected descriptor,%s\n", i)
+ log.Printf("WARNING: unexpected descriptor,%s", i)
+ isErr = true
}
default:
log.Printf("WARNING: unexpected type, ")
log.Println(i, u)
+ isErr = true
}
}
default:
log.Printf("WARNING: unexpected type, ")
log.Println(k, v)
+ isErr = true
}
- vCluster = append(vCluster, vc)
+ if ! isErr {
+ vCluster = append(vCluster, vc)
+ }
}
}
log.Println("Parsing complete")
return
}
-func runEnvoy(meta * consulapi.QueryMeta, kvp * consulapi.KVPair, values * map[string]interface{}, cv ConfigVars,
- templatePath string, configPath string) {
- var err error
+func (ec * EnvoyControl) prepareEnvoyConfig(kvp * consulapi.KVPair, ecv * EnvoyConfigVars) (err error) {
var vCluster []VolthaClusterEntry
+ ecv.VolthaVip = ec.ipAddrs[ec.vcoreSvcName][0] + ":" + ec.vcorePort
+
// Extract all values from the KV record
- vCluster, err = parseAssignment([]byte(kvp.Value))
+ // In the future, the values should all be compared to what we currently have
+ vCluster, err = ec.parseAssignment([]byte(kvp.Value))
if err == nil {
- (*values)["volthaRR"] = []string{}
+ ec.vc = vCluster // For future use to determine if there's been a real change
+ //templateValues["VolthaRR"] = []string{}
+ ecv.VolthaRR = []string{}
for i := range vCluster {
//log.Printf("Processing %s\n", vCluster[i].Host)
- (*values)["volthaRR"] = append((*values)["volthaRR"].([]string), vCluster[i].Host)
- cv.VolthaRR = append(cv.VolthaRR, vCluster[i].Host + ":" + volthaPort)
+ //templateValues["VolthaRR"] = append(templateValues["VolthaRR"].([]string), vCluster[i].Host)
+ ecv.VolthaRR = append(ecv.VolthaRR, vCluster[i].Host + ":" + ec.vcorePort)
}
} else {
- log.Fatal("Couldn't parse the KV record %s\n", string(kvp.Value))
+ log.Fatal("Couldn't parse the KV record %s", string(kvp.Value))
+ panic(err)
+ }
+ return
+}
+
+func (ec * EnvoyControl) runEnvoy(kvp * consulapi.KVPair) {
+ var err error
+ var ecv EnvoyConfigVars
+
+ if err = ec.prepareEnvoyConfig(kvp, &ecv); err != nil {
+ log.Fatal("Error preparing envoy config variables, aborting")
panic(err)
}
// Now that we have the data loaded, update the envoy config and start envoy
- updateEnvoyConfig(templatePath, configPath, cv)
- go startEnvoy(configPath)
- log.Printf("meta.LastIndex = %d\n", meta.LastIndex)
+ ec.updateEnvoyConfig(&ecv)
+ go ec.startEnvoy()
}
-func runMonitorEnvoy(kv * consulapi.KV, values * map[string]interface{}, cv ConfigVars,
- templatePath string, configPath string) {
+func (ec * EnvoyControl) readConsulKey(key string, qo * consulapi.QueryOptions) (kvp * consulapi.KVPair, meta * consulapi.QueryMeta, err error) {
+
+ kv := ec.consul.KV()
+ // Get the initial values of the assignment key which contains individual
+ // voltha core IP addresses. This may be empty until voltha populates it
+ // so it must be checked
+ kvp, meta, err = kv.Get(ec.assignmentKey, qo)
+ for i := 0; i < ec.retries; i++ {
+ if err != nil {
+ fmt.Println(err)
+ log.Printf("Unable to read assignment consul key, retry %d", i+1)
+ time.Sleep(time.Duration(ec.waitTime) * time.Second)
+ kvp, meta, err = kv.Get(ec.assignmentKey, qo)
+ } else if kvp != nil && len(string(kvp.Value)) > 10 {
+ // A valid read, return
+ break
+ } else {
+ log.Printf("Voltha assignment key invalid, retry %d", i+1)
+ time.Sleep(time.Duration(ec.waitTime) * time.Second)
+ kvp, meta, err = kv.Get(ec.assignmentKey, qo)
+ }
+ if i == ec.retries {
+ log.Printf("Failed to read the assignment key after %d retries, aborting", ec.retries)
+ panic(err)
+ }
+ }
+ return
+}
+
+func (ec * EnvoyControl) runMonitorEnvoy() {
var err error
var qo consulapi.QueryOptions
-
// Get the initial values of the assignment key which contains individual
// voltha core IP addresses. This may be empty until voltha populates it
// so it must be checked
- kvp, meta, err := kv.Get(assignmentKey, nil)
- for i := 0; i < 10; i++ {
- if err != nil {
- fmt.Println(err)
- log.Printf("Unable to read assignment consul key, retry %d\n", i+1)
- time.Sleep(time.Duration(2) * time.Second)
- kvp, meta, err = kv.Get(assignmentKey, nil)
- } else if kvp != nil && len(string(kvp.Value)) > 10 {
- log.Printf("Starting Envoy")
- runEnvoy(meta, kvp, values, cv, templatePath, configPath)
- break
- } else {
- log.Printf("Voltha assignment key invalid, retry %d\n", i+1)
- time.Sleep(time.Duration(2) * time.Second)
- kvp, meta, err = kv.Get(assignmentKey, nil)
- }
- }
+ kvp, meta, err := ec.readConsulKey(ec.assignmentKey, nil)
+ log.Printf("Starting Envoy")
+ ec.runEnvoy(kvp)
for {
qo.WaitIndex = meta.LastIndex
+ qo.RequireConsistent = true
for {
if qo.WaitIndex != meta.LastIndex {
break
}
- kvp, meta, err = kv.Get(assignmentKey, &qo)
+ kvp, meta, err = ec.readConsulKey(ec.assignmentKey, &qo)
if err != nil {
- log.Fatal("Unable to read assignment consul key")
+ log.Printf("Unable to read assignment consul key")
panic(err)
} else {
log.Println(string(kvp.Value))
- log.Printf("meta.LastIndex = %d\n", meta.LastIndex)
+ log.Printf("meta.LastIndex = %d", meta.LastIndex)
}
}
// Fell through, the index has changed thus the key has changed
- runEnvoy(meta, kvp, values, cv, templatePath, configPath)
+ log.Printf("Starting Envoy")
+ ec.runEnvoy(kvp)
+ log.Printf("meta.LastIndex = %d", meta.LastIndex)
}
}
-func main() {
+func (ec * EnvoyControl) ParseCommandArguments() {
+ flag.StringVar(&(ec.assignmentKey), "assignment-key", ec.assignmentKey,
+ "The key for the voltha assignment value in consul")
- var err error
- var addrs []string
- var cv ConfigVars // Template variables.
- var consul * consulapi.Client
- var values map[string]interface{} // Key values map
+ flag.StringVar(&( ec.envoyConfigTemplate),"envoy-cfg-template", ec.envoyConfigTemplate,
+ "The path to envoy's configuration template")
- values = make(map[string]interface{})
+ flag.StringVar(&(ec.envoyConfig), "envoy-config", ec.envoyConfig,
+ "The path to envoy's configuration file" )
+ flag.StringVar(&(ec.vcoreSvcName), "vcore-svc-name", ec.vcoreSvcName,
+ "The service name of the voltha core service")
+
+ flag.StringVar(&(ec.consulSvcName),"consul-svc-nme", ec.consulSvcName,
+ "The service name of the consul service")
+
+ flag.StringVar(&(ec.vcorePort), "vcore-port", ec.vcorePort,
+ "The port where the vcore's GRPC service can be found")
+
+ flag.StringVar(&(ec.consulPort), "consul-port", ec.consulPort,
+ "The port where the consul service api can be found")
+
+ flag.IntVar(&(ec.retries), "retries", ec.retries,
+ "The number of times to retry name lookups and connect requests before failing")
+
+ flag.IntVar(&(ec.waitTime), "wait-time", ec.waitTime,
+ "The number of seconds to wait between retries")
+
+ flag.Parse()
+}
+
+func (ec * EnvoyControl) Initialize() (err error) {
// Resolve consul's virtual ip address
- addrs, err = getServiceAddr("consul", 10, 2)
- if err == nil {
- values["consulvip"] = addrs[0]
- log.Printf("Consul's address = %s\n",addrs[0])
- } else {
+ if err = ec.resolveServiceAddress(ec.consulSvcName); err != nil {
log.Fatal("Can't proceed without consul's vIP address")
panic(err)
}
// Resolve voltha's virtual ip address
- addrs,err = getServiceAddr("vcore", 10, 2)
- if err == nil {
- log.Printf("Voltha address = %s\n",addrs[0])
- // Config var for the template
- cv.VolthaVip = addrs[0] + ":" + volthaPort
- values["volthavip"] = addrs[0]
- } else {
+ if err = ec.resolveServiceAddress(ec.vcoreSvcName); err != nil {
log.Fatal("Can't proceed without voltha's vIP address")
panic(err)
}
- // Fire up a consul client and get the kv store
- config := consulapi.DefaultConfig()
- config.Address = values["consulvip"].(string) + ":" + consulPort
- consul, err = consulapi.NewClient(config)
- if err != nil {
+ if err = ec.consulConnect(ec.consulSvcName, ec.consulPort); err != nil {
log.Fatal("error creating consul client aborting")
panic(err)
}
- kv := consul.KV()
+
+ return
+}
+
+func main() {
+
+ var err error
+ var ec * EnvoyControl
+
+ ec = NewEnvoyControl()
+ ec.ParseCommandArguments()
+ if err = ec.Initialize(); err != nil {
+ log.Fatal("Envoy control initialization failed, aboring")
+ panic(err)
+ }
+
// Start envoy and monitor changes to the KV store to reload
// consul's config. This never returns unless somethign crashes.
- runMonitorEnvoy(kv, &values, cv, "/envoy/voltha-grpc-proxy.template.json", "/envoy/voltha-grpc-proxy.json")
+ ec.runMonitorEnvoy()
+ log.Fatal("Monitor returned, this shouldn't happen")
}