This rather large update adds the following
- A golang build container used for building golang executables and/or
containers.
- envoyd, a daemon process that creates and updates the envoy config
file based on consul's KV store and forces envoy to reload the config
as it changes.
- Dockerfile(s) and compose files that integrate envoy into the NBI call
chain to load-balance device-to-core assignment.
- Several developer tools that help build and replace specific
containers in a running cluster. This allows the build process to be
separated from the run-time as it will be in production.
- NOTES: A command line needs to be added to envoyd because now the
values are declared at the start of the file. This will be submitted
in a subsequent commit along with a change toward a more object
oriented implementation.
Addressed reviewer comments.
Addressed even more reviewr comments.
Change-Id: Ia2ec825d48d475398e501f396452fb0306673432
diff --git a/envoy/front-proxy/start_service.sh b/envoy/front-proxy/start_service.sh
deleted file mode 100644
index cf98f2c..0000000
--- a/envoy/front-proxy/start_service.sh
+++ /dev/null
@@ -1,2 +0,0 @@
-python /code/service.py &
-envoy -c /etc/service-envoy.json
diff --git a/envoy/front-proxy/voltha-grpc-proxy.template.json b/envoy/front-proxy/voltha-grpc-proxy.template.json
new file mode 100644
index 0000000..8a83155
--- /dev/null
+++ b/envoy/front-proxy/voltha-grpc-proxy.template.json
@@ -0,0 +1,93 @@
+{
+ "listeners": [
+ {
+ "address": "tcp://0.0.0.0:50555",
+ "filters": [
+ {
+ "type": "read",
+ "name": "http_connection_manager",
+ "config": {
+ "codec_type": "http2",
+ "stat_prefix": "ingress_http2",
+ "access_log": [
+ {
+ "path": "/envoy/voltha_access_log.log"
+ }
+ ],
+ "route_config": {
+ "virtual_hosts": [
+ {
+ "name": "backend",
+ "domains": ["*"],
+ "routes": [
+ {
+ "timeout_ms": 0,
+ "prefix": "/voltha.VolthaGlobalService/CreateDevice",
+ "cluster": "voltha-grpc-RR"
+ },
+ {
+ "timeout_ms": 0,
+ "prefix": "/voltha.VolthaGlobalService",
+ "cluster": "voltha-grpc"
+ },
+ {
+ "timeout_ms": 0,
+ "prefix": "/voltha.",
+ "cluster": "voltha-grpc"
+ }
+ ]
+ }
+ ]
+ },
+ "filters": [
+ {
+ "type": "decoder",
+ "name": "router",
+ "config": {}
+ }
+ ]
+ }
+ }
+ ]
+ }
+ ],
+ "admin": {
+ "access_log_path": "/envoy/access.log",
+ "address": "tcp://0.0.0.0:8001"
+ },
+ "cluster_manager": {
+ "clusters": [
+ {
+ "name": "voltha-grpc",
+ "connect_timeout_ms": 250,
+ "type": "static",
+ "lb_type": "round_robin",
+ "features": "http2",
+ "hosts": [
+ {
+ "url": "tcp://{{- .VolthaVip }}"
+ }
+ ]
+ },
+ {
+ "name": "voltha-grpc-RR",
+ "connect_timeout_ms": 250,
+ "type": "static",
+ "lb_type": "round_robin",
+ "features": "http2",
+ "hosts": [
+ {{block "addrlist" .VolthaRR}}
+ {{- range .}}
+ {{- if isFirst}}
+ {{- printf "{\"url\": \"tcp://%s\"}" . }}
+ {{- else }}
+ {{- printf ",{\"url\": \"tcp://%s\"}" . }}
+ {{- end }}
+ {{- end}}
+ {{- end}}
+ ]
+ }
+ ]
+ }
+}
+
diff --git a/envoy/go/envoyd/Dockerfile b/envoy/go/envoyd/Dockerfile
new file mode 100644
index 0000000..75ef7bc
--- /dev/null
+++ b/envoy/go/envoyd/Dockerfile
@@ -0,0 +1,3 @@
+FROM scratch
+COPY vlocate /
+ENTRYPOINT ["/vlocate"]
diff --git a/envoy/go/envoyd/Makefile b/envoy/go/envoyd/Makefile
new file mode 100644
index 0000000..f164409
--- /dev/null
+++ b/envoy/go/envoyd/Makefile
@@ -0,0 +1,3 @@
+
+all:
+ ./build_binary.sh
diff --git a/envoy/go/envoyd/build_binary.sh b/envoy/go/envoyd/build_binary.sh
new file mode 100755
index 0000000..333a1a4
--- /dev/null
+++ b/envoy/go/envoyd/build_binary.sh
@@ -0,0 +1,10 @@
+#!/bin/bash
+
+rm -fr buildreport
+rm -f envoyd
+docker run -v $(pwd):/src go-builder
+#/build.sh
+uid=`id -u`
+gid=`id -g`
+sudo chown -R ${uid}.${gid} buildreport
+sudo chown ${uid}.${gid} envoyd
diff --git a/envoy/go/envoyd/envoyd.go b/envoy/go/envoyd/envoyd.go
new file mode 100644
index 0000000..6766539
--- /dev/null
+++ b/envoy/go/envoyd/envoyd.go
@@ -0,0 +1,335 @@
+package main // import "ciena.com/envoyd"
+
+import (
+ "os"
+ "os/exec"
+ "fmt"
+ "log"
+ "strconv"
+ "time"
+ "net"
+ "io/ioutil"
+ "text/template"
+ "encoding/json"
+ consulapi "github.com/hashicorp/consul/api"
+)
+
+// DATA STRUCTURES
+
+type ConfigVars struct {
+ VolthaVip string
+ VolthaRR []string
+}
+
+type VolthaClusterEntry struct {
+ Prefix string
+ Id string
+ Host string
+}
+
+//Client provides an interface for getting data out of Consul
+type Client interface {
+// Get a Service from consulapi
+ Service(string, string) ([]string, error)
+// Register a service with local agent
+ Register(string, int) error
+// Deregister a service with local agent
+ DeRegister(string) error
+}
+
+type client struct {
+ consulapi *consulapi.Client
+}
+
+// This struct is not used yet
+// TODO: Update the daemon to use this structure to for a
+// more object oriented implementation
+type EnvoyControl struct {
+ retrys int
+ waitTime int
+ cv ConfigVars
+ vc []VolthaClusterEntry
+ meta * consulapi.QueryMeta
+ kvp * consulapi.KVPair
+ ipAddrs map[string][]string
+}
+
+// CONSTANTS
+var assignmentKey string = "service/voltha/data/core/assignment"
+var vcoreHostIpName string = "host"
+var vcoreIdName string = "id"
+var restartEpoch int = 0
+var volthaPort string = "50556" // This will be passed inas an option.
+var consulPort string = "8500" // This will be passed in as an option.
+
+//NewConsul returns a Client interface for given consulapi address
+func NewConsulClient(addr string) (*client, error) {
+ config := consulapi.DefaultConfig()
+ config.Address = addr
+ c, err := consulapi.NewClient(config)
+ if err != nil {
+ return nil, err
+ }
+ return &client{consulapi: c}, nil
+}
+
+// Register a service with consulapi local agent
+func (c *client) Register(name string, port int) error {
+ reg := &consulapi.AgentServiceRegistration{
+ ID: name,
+ Name: name,
+ Port: port,
+ }
+ return c.consulapi.Agent().ServiceRegister(reg)
+}
+
+// DeRegister a service with consulapi local agent
+func (c *client) DeRegister(id string) error {
+ return c.consulapi.Agent().ServiceDeregister(id)
+}
+
+// Service return a service
+func (c *client) Service(service, tag string) ([]*consulapi.ServiceEntry, *consulapi.QueryMeta, error) {
+ passingOnly := true
+ addrs, meta, err := c.consulapi.Health().Service(service, tag, passingOnly, nil)
+ if len(addrs) == 0 && err == nil {
+ return nil, nil, fmt.Errorf("service ( %s ) was not found", service)
+ }
+ if err != nil {
+ return nil, nil, err
+ }
+ return addrs, meta, nil
+}
+
+// Starts envoy with the current restartEpoch
+func startEnvoy(cfg_file string) {
+ cmd := exec.Command("/usr/local/bin/envoy", "--restart-epoch", strconv.Itoa(restartEpoch),
+ "--config-path", cfg_file)
+
+ curEpoch := restartEpoch
+ restartEpoch += 1
+ if err := cmd.Start(); err != nil {
+ log.Fatal(err)
+ panic(err)
+ }
+ log.Printf("Waiting on envoy %d to exit", curEpoch)
+ if err := cmd.Wait(); err != nil {
+ log.Fatal(err, "Unexpected exit code")
+ }
+ log.Printf("Envoy %d exited", curEpoch)
+
+}
+
+// This function will use the provided templete file to generate
+// the targetfile substituting
+func updateEnvoyConfig(templateFile string, targetFile string, cv ConfigVars) {
+ var firstRun bool = true
+ f := func() (bool) {
+ var rtrn bool = firstRun
+ firstRun = false
+ return rtrn
+ }
+ var funcs = template.FuncMap{"isFirst": f}
+ // Slurp up the template file.
+ tplt, err := ioutil.ReadFile(templateFile)
+ if err != nil {
+ panic("ERROR reading the template file, aborting")
+ }
+ //fmt.Println(string(tplt))
+ configTemplate, err := template.New("config").Funcs(funcs).Parse(string(tplt));
+ if err != nil {
+ panic(err)
+ }
+ outFile,err := os.Create(targetFile)
+ if err != nil {
+ panic(err)
+ }
+ if err := configTemplate.Execute(outFile, cv); err != nil {
+ panic(err)
+ }
+ //cfgFile, err := ioutil.ReadFile(targetFile)
+ //if err != nil {
+ // panic("ERROR reading the config file, aborting")
+ //}
+ //fmt.Println(string(cfgFile))
+}
+
+func getServiceAddr(serviceName string, retrys int, waitTime int) (addrs []string, err error) {
+ for i := 0; i < retrys; i++ {
+ addrs,err = net.LookupHost(serviceName)
+ if err != nil {
+ log.Printf("%s name resolution failed %d time(s) retrying...\n", serviceName, i+1)
+ } else {
+ //fmt.Printf("%s address = %s\n",serviceName, addrs[0])
+ break
+ }
+ time.Sleep(time.Duration(waitTime) * time.Second)
+ }
+ if err != nil {
+ log.Printf("%s name resolution failed %d times gving up\n", serviceName, retrys)
+ }
+ return
+}
+
+func parseAssignment(jsonString []byte) (vCluster []VolthaClusterEntry, err error) {
+ var vc VolthaClusterEntry
+ var f interface{}
+
+ log.Printf("Parsing %s\n", string(jsonString))
+ err = json.Unmarshal(jsonString, &f)
+ if err != nil {
+ log.Fatal("Unable to parse json record %s", jsonString)
+ panic(err)
+ } else {
+ m := f.(map[string]interface{})
+ for k, v := range m {
+ vc.Prefix = k
+ //log.Printf("Processing key %s\n", k)
+ switch vv := v.(type) {
+ case map[string]interface{}:
+ for i, u := range vv {
+ //log.Printf("Processing key %s\n", i)
+ switch uu := u.(type) {
+ case string:
+ if i == vcoreHostIpName {
+ vc.Host = uu
+ } else if i == vcoreIdName {
+ vc.Id = uu
+ } else {
+ log.Printf("WARNING: unexpected descriptor,%s\n", i)
+ }
+ default:
+ log.Printf("WARNING: unexpected type, ")
+ log.Println(i, u)
+ }
+ }
+ default:
+ log.Printf("WARNING: unexpected type, ")
+ log.Println(k, v)
+ }
+ vCluster = append(vCluster, vc)
+ }
+ }
+ log.Println("Parsing complete")
+ return
+}
+
+func runEnvoy(meta * consulapi.QueryMeta, kvp * consulapi.KVPair, values * map[string]interface{}, cv ConfigVars,
+ templatePath string, configPath string) {
+ var err error
+ var vCluster []VolthaClusterEntry
+
+ // Extract all values from the KV record
+ vCluster, err = parseAssignment([]byte(kvp.Value))
+ if err == nil {
+ (*values)["volthaRR"] = []string{}
+ for i := range vCluster {
+ //log.Printf("Processing %s\n", vCluster[i].Host)
+ (*values)["volthaRR"] = append((*values)["volthaRR"].([]string), vCluster[i].Host)
+ cv.VolthaRR = append(cv.VolthaRR, vCluster[i].Host + ":" + volthaPort)
+ }
+ } else {
+ log.Fatal("Couldn't parse the KV record %s\n", string(kvp.Value))
+ panic(err)
+ }
+
+ // Now that we have the data loaded, update the envoy config and start envoy
+ updateEnvoyConfig(templatePath, configPath, cv)
+ go startEnvoy(configPath)
+ log.Printf("meta.LastIndex = %d\n", meta.LastIndex)
+}
+
+func runMonitorEnvoy(kv * consulapi.KV, values * map[string]interface{}, cv ConfigVars,
+ templatePath string, configPath string) {
+ var err error
+ var qo consulapi.QueryOptions
+
+
+ // Get the initial values of the assignment key which contains individual
+ // voltha core IP addresses. This may be empty until voltha populates it
+ // so it must be checked
+ kvp, meta, err := kv.Get(assignmentKey, nil)
+ for i := 0; i < 10; i++ {
+ if err != nil {
+ fmt.Println(err)
+ log.Printf("Unable to read assignment consul key, retry %d\n", i+1)
+ time.Sleep(time.Duration(2) * time.Second)
+ kvp, meta, err = kv.Get(assignmentKey, nil)
+ } else if kvp != nil && len(string(kvp.Value)) > 10 {
+ log.Printf("Starting Envoy")
+ runEnvoy(meta, kvp, values, cv, templatePath, configPath)
+ break
+ } else {
+ log.Printf("Voltha assignment key invalid, retry %d\n", i+1)
+ time.Sleep(time.Duration(2) * time.Second)
+ kvp, meta, err = kv.Get(assignmentKey, nil)
+ }
+ }
+
+ for {
+ qo.WaitIndex = meta.LastIndex
+ for {
+ if qo.WaitIndex != meta.LastIndex {
+ break
+ }
+ kvp, meta, err = kv.Get(assignmentKey, &qo)
+ if err != nil {
+ log.Fatal("Unable to read assignment consul key")
+ panic(err)
+ } else {
+ log.Println(string(kvp.Value))
+ log.Printf("meta.LastIndex = %d\n", meta.LastIndex)
+ }
+ }
+ // Fell through, the index has changed thus the key has changed
+
+ runEnvoy(meta, kvp, values, cv, templatePath, configPath)
+ }
+}
+
+func main() {
+
+ var err error
+ var addrs []string
+ var cv ConfigVars // Template variables.
+ var consul * consulapi.Client
+ var values map[string]interface{} // Key values map
+
+ values = make(map[string]interface{})
+
+ // Resolve consul's virtual ip address
+ addrs, err = getServiceAddr("consul", 10, 2)
+ if err == nil {
+ values["consulvip"] = addrs[0]
+ log.Printf("Consul's address = %s\n",addrs[0])
+ } else {
+ log.Fatal("Can't proceed without consul's vIP address")
+ panic(err)
+ }
+
+ // Resolve voltha's virtual ip address
+ addrs,err = getServiceAddr("vcore", 10, 2)
+ if err == nil {
+ log.Printf("Voltha address = %s\n",addrs[0])
+ // Config var for the template
+ cv.VolthaVip = addrs[0] + ":" + volthaPort
+ values["volthavip"] = addrs[0]
+ } else {
+ log.Fatal("Can't proceed without voltha's vIP address")
+ panic(err)
+ }
+
+ // Fire up a consul client and get the kv store
+ config := consulapi.DefaultConfig()
+ config.Address = values["consulvip"].(string) + ":" + consulPort
+ consul, err = consulapi.NewClient(config)
+ if err != nil {
+ log.Fatal("error creating consul client aborting")
+ panic(err)
+ }
+ kv := consul.KV()
+
+ // Start envoy and monitor changes to the KV store to reload
+ // consul's config. This never returns unless somethign crashes.
+ runMonitorEnvoy(kv, &values, cv, "/envoy/voltha-grpc-proxy.template.json", "/envoy/voltha-grpc-proxy.json")
+}
diff --git a/envoy/go/golang-builder b/envoy/go/golang-builder
new file mode 160000
index 0000000..eb418a5
--- /dev/null
+++ b/envoy/go/golang-builder
@@ -0,0 +1 @@
+Subproject commit eb418a5ec83045d17760a41891af636becde72a6
diff --git a/envoy/start_envoy.sh b/envoy/start_envoy.sh
deleted file mode 100755
index 216e59f..0000000
--- a/envoy/start_envoy.sh
+++ /dev/null
@@ -1,29 +0,0 @@
-#!/bin/bash
-
-RESTART_EPOCH=0
-
-
-echo "Staring envoy re-starter"
-
-
-function fork_envoy()
-{
- echo "Forking envoy"
- /usr/local/bin/envoy -l debug -c envoy/front-proxy/voltha-grpc-proxy.json --restart-epoch $RESTART_EPOCH &
- CUR_PID=$!
- RESTART_EPOCH=`expr $RESTART_EPOCH + 1`
- wait
-}
-
-function end_envoy()
-{
- echo "Killing envoy"
- kill -KILL $CUR_PID
-}
-
-trap fork_envoy SIGHUP
-trap end_envoy SIGTERM
-
-fork_envoy
-
-