| package main // import "ciena.com/envoyd" |
| |
| import ( |
| "os" |
| "os/exec" |
| "fmt" |
| "log" |
| "strconv" |
| "time" |
| "net" |
| "io/ioutil" |
| "io" |
| "text/template" |
| "encoding/json" |
| "sync" |
| "flag" |
| "bufio" |
| consulapi "github.com/hashicorp/consul/api" |
| ) |
| |
| // DATA STRUCTURES |
| |
| //Client provides an interface for getting data out of Consul |
| type Client interface { |
| // Get a Service from consulapi |
| Service(string, string) ([]string, error) |
| // Register a service with local agent |
| Register(string, int) error |
| // Deregister a service with local agent |
| DeRegister(string) error |
| } |
| |
| type client struct { |
| consulapi *consulapi.Client |
| } |
| |
| type EnvoyConfigVars struct { |
| VolthaVip string |
| VolthaRR []string |
| } |
| |
| type VolthaClusterEntry struct { |
| Prefix string |
| Id string |
| Host string |
| } |
| |
| // This struct is not used yet |
| // TODO: Update the daemon to use this structure to for a |
| // more object oriented implementation |
| type EnvoyControl struct { |
| // Command line parameters |
| assignmentKey string |
| envoyConfigTemplate string |
| envoyConfig string |
| vcoreSvcName string |
| consulSvcName string |
| vcorePort string |
| consulPort string |
| retries int |
| waitTime int |
| // Runtime variables |
| consul * consulapi.Client |
| vcoreHostIpName string |
| vcoreIdName string |
| vc []VolthaClusterEntry |
| ipAddrs map[string][]string |
| restartEpoch int |
| reLock sync.Mutex // Exclusive access to the restartEpoch |
| } |
| |
| |
| func NewEnvoyControl() (ec * EnvoyControl) { |
| var envCtrl EnvoyControl = EnvoyControl { // Default values |
| // Command line parameters |
| assignmentKey: "service/voltha/data/core/assignment", |
| envoyConfigTemplate: "/envoy/voltha-grpc-proxy.template.json", |
| envoyConfig: "/envoy/voltha-grpc-proxy.json", |
| //envoyLogFile: "/envoy/voltha_access_log.log", |
| vcoreSvcName: "vcore", |
| consulSvcName: "consul", |
| vcorePort: "50556", |
| consulPort: "8500", |
| retries: 10, |
| waitTime: 2, |
| // Runtime variables |
| vcoreHostIpName: "host", |
| vcoreIdName: "id", |
| ipAddrs: make(map[string][]string), |
| restartEpoch: 0, |
| } |
| ec = &envCtrl |
| return |
| } |
| |
| func (ec * EnvoyControl) resolveServiceAddress(serviceName string) (err error) { |
| for i := 0; i < ec.retries; i++ { |
| ec.ipAddrs[serviceName], err = net.LookupHost(serviceName) |
| if err != nil { |
| log.Printf("%s name resolution failed %d time(s) retrying...", serviceName, i+1) |
| } else { |
| //fmt.Printf("%s address = %s\n",serviceName, addrs[0]) |
| break |
| } |
| time.Sleep(time.Duration(ec.waitTime) * time.Second) |
| } |
| if err != nil { |
| log.Printf("%s name resolution failed %d times gving up", serviceName, ec.retries) |
| } |
| return |
| } |
| |
| func (ec * EnvoyControl) consulConnect(serviceName string, port string) (err error) { |
| // Fire up a consul client and get the kv store |
| cConfig := consulapi.DefaultNonPooledConfig() |
| cConfig.Address = ec.ipAddrs[serviceName][0] + ":" + port |
| ec.consul, err = consulapi.NewClient(cConfig) |
| if err != nil { |
| log.Fatal("error creating consul client aborting") |
| return |
| } |
| return |
| } |
| |
| |
| //NewConsul returns a Client interface for given consulapi address |
| func NewConsulClient(addr string) (*client, error) { |
| config := consulapi.DefaultConfig() |
| config.Address = addr |
| c, err := consulapi.NewClient(config) |
| if err != nil { |
| return nil, err |
| } |
| return &client{consulapi: c}, nil |
| } |
| |
| // Register a service with consulapi local agent |
| func (c *client) Register(name string, port int) error { |
| reg := &consulapi.AgentServiceRegistration{ |
| ID: name, |
| Name: name, |
| Port: port, |
| } |
| return c.consulapi.Agent().ServiceRegister(reg) |
| } |
| |
| // DeRegister a service with consulapi local agent |
| func (c *client) DeRegister(id string) error { |
| return c.consulapi.Agent().ServiceDeregister(id) |
| } |
| |
| // Service return a service |
| func (c *client) Service(service, tag string) ([]*consulapi.ServiceEntry, *consulapi.QueryMeta, error) { |
| passingOnly := true |
| addrs, meta, err := c.consulapi.Health().Service(service, tag, passingOnly, nil) |
| if len(addrs) == 0 && err == nil { |
| return nil, nil, fmt.Errorf("service ( %s ) was not found", service) |
| } |
| if err != nil { |
| return nil, nil, err |
| } |
| return addrs, meta, nil |
| } |
| |
| // Starts envoy with the current restartEpoch |
| func (ec * EnvoyControl) startEnvoy() { |
| var curEpoch int |
| var err error |
| var count int |
| |
| ec.reLock.Lock() // Make sure we've got exclusive access to the variable |
| cmd := exec.Command("/usr/local/bin/envoy", "--restart-epoch", strconv.Itoa(ec.restartEpoch), |
| "--config-path", ec.envoyConfig, "--parent-shutdown-time-s", "10") |
| |
| curEpoch = ec.restartEpoch |
| ec.restartEpoch += 1 |
| ec.reLock.Unlock() // Done, release the lock. |
| |
| stderr, err := cmd.StderrPipe() |
| if err != nil { |
| log.Printf("Couldn't attach to stderr running envoy command") |
| panic(err) |
| } |
| stdout, err := cmd.StdoutPipe() |
| if err != nil { |
| log.Printf("Couldn't attach to stdout running envoy command") |
| panic(err) |
| } |
| so := bufio.NewReader(stdout) |
| se := bufio.NewReader(stderr) |
| |
| if err = cmd.Start(); err != nil { |
| log.Fatal("Error starting envoy") |
| panic(err) |
| } |
| log.Printf("Envoy(%d) started", curEpoch) |
| soEof := false |
| seEof := false |
| |
| data := make([]byte, 80) |
| log.Printf("Log forwarding for envoy(%d) started", curEpoch) |
| for { |
| data = make([]byte, 80) |
| count, err = so.Read(data) |
| log.Printf("ENVOY_LOG(%d): %s", curEpoch, string(data)) |
| if err == io.EOF { |
| if seEof == true { |
| break |
| } else { |
| soEof = true |
| } |
| } else if err != nil { |
| log.Printf("Attempt to read envoy standard out failed") |
| panic(err) |
| } else if count > 0 { |
| log.Printf("ENVOY_LOG(%d)(%d): %s",curEpoch,count,string(data)) |
| } |
| data = make([]byte, 80) |
| count, err = se.Read(data) |
| if err == io.EOF { |
| if soEof == true { |
| break |
| } else { |
| seEof = true |
| } |
| } else if err != nil { |
| log.Fatal("Attempt to read envoy standard err failed") |
| panic(err) |
| } else if count > 0 { |
| log.Printf("ENVOY_LOG(%d)(%d): %s",curEpoch,count,string(data)) |
| } |
| } |
| log.Printf("Waiting on envoy %d to exit", curEpoch) |
| if err = cmd.Wait(); err != nil { |
| log.Printf("Envoy %d exited with an unexpected exit code", curEpoch) |
| panic(err) |
| } |
| log.Printf("Envoy %d exited", curEpoch) |
| // Check if this was the primary envoy, if so |
| // something went terribly wrong, panic to force |
| // forcefully exit. |
| ec.reLock.Lock() |
| if ec.restartEpoch == (curEpoch + 1) { |
| ec.reLock.Unlock() |
| log.Fatal("Last running envoy exited, aborting!") |
| panic("This should never happen") |
| } |
| ec.reLock.Unlock() |
| |
| } |
| |
| // This function will use the provided templete file to generate |
| // the targetfile substituting |
| func (ec * EnvoyControl) updateEnvoyConfig(ecv * EnvoyConfigVars) (err error) { |
| var firstRun bool = true |
| f := func() (bool) { |
| var rtrn bool = firstRun |
| firstRun = false |
| return rtrn |
| } |
| var funcs = template.FuncMap{"isFirst": f} |
| // Slurp up the template file. |
| tplt, err := ioutil.ReadFile(ec.envoyConfigTemplate) |
| if err != nil { |
| log.Fatal("ERROR reading the template file, aborting") |
| panic(err) |
| } |
| //fmt.Println(string(tplt)) |
| configTemplate, err := template.New("config").Funcs(funcs).Parse(string(tplt)); |
| if err != nil { |
| log.Fatal("Unexpected error loading the Envoy template, aborting") |
| panic(err) |
| } |
| outFile,err := os.Create(ec.envoyConfig) |
| if err != nil { |
| log.Fatal("Unexpected error opening the Envoy config file for write, aborting") |
| panic(err) |
| } |
| if err = configTemplate.Execute(outFile, ecv); err != nil { |
| log.Fatal("Unexpected error executing the Envoy config template, aborting") |
| panic(err) |
| } |
| //cfgFile, err := ioutil.ReadFile(ec.envoyConfig) |
| //if err != nil { |
| // log.Fatal("ERROR reading the config file, aborting") |
| // panic(err) |
| //} |
| //fmt.Println(string(cfgFile)) |
| return |
| } |
| |
| func (ec * EnvoyControl) parseAssignment(jsonString []byte) (vCluster []VolthaClusterEntry, err error) { |
| var f interface{} |
| var vc VolthaClusterEntry |
| //var isErr bool |
| |
| log.Printf("Parsing %s", string(jsonString)) |
| //err = json.Unmarshal(jsonString, &f) |
| err = json.Unmarshal(jsonString, &f) |
| if err != nil { |
| log.Fatal("Unable to parse json record %s", jsonString) |
| panic(err) |
| } else { |
| m := f.(map[string]interface{}) |
| for k, v := range m { |
| isErr := false |
| vc.Prefix = k |
| //log.Printf("Processing key %s\n", k) |
| switch vv := v.(type) { |
| case map[string]interface{}: |
| for i, u := range vv { |
| //log.Printf("Processing key %sn", i) |
| switch uu := u.(type) { |
| case string: |
| if i == ec.vcoreHostIpName { |
| vc.Host = uu |
| } else if i == ec.vcoreIdName { |
| vc.Id = uu |
| } else { |
| log.Printf("WARNING: unexpected descriptor,%s", i) |
| isErr = true |
| } |
| default: |
| log.Printf("WARNING: unexpected type, ") |
| log.Println(i, u) |
| isErr = true |
| } |
| } |
| default: |
| log.Printf("WARNING: unexpected type, ") |
| log.Println(k, v) |
| isErr = true |
| } |
| if ! isErr { |
| vCluster = append(vCluster, vc) |
| } |
| } |
| } |
| log.Println("Parsing complete") |
| return |
| } |
| |
| func (ec * EnvoyControl) prepareEnvoyConfig(kvp * consulapi.KVPair, ecv * EnvoyConfigVars) (err error) { |
| var vCluster []VolthaClusterEntry |
| |
| ecv.VolthaVip = ec.ipAddrs[ec.vcoreSvcName][0] + ":" + ec.vcorePort |
| |
| // Extract all values from the KV record |
| // In the future, the values should all be compared to what we currently have |
| vCluster, err = ec.parseAssignment([]byte(kvp.Value)) |
| if err == nil { |
| ec.vc = vCluster // For future use to determine if there's been a real change |
| //templateValues["VolthaRR"] = []string{} |
| ecv.VolthaRR = []string{} |
| for i := range vCluster { |
| //log.Printf("Processing %s\n", vCluster[i].Host) |
| //templateValues["VolthaRR"] = append(templateValues["VolthaRR"].([]string), vCluster[i].Host) |
| ecv.VolthaRR = append(ecv.VolthaRR, vCluster[i].Host + ":" + ec.vcorePort) |
| } |
| } else { |
| log.Fatal("Couldn't parse the KV record %s", string(kvp.Value)) |
| panic(err) |
| } |
| return |
| } |
| |
| func (ec * EnvoyControl) runEnvoy(kvp * consulapi.KVPair) { |
| var err error |
| var ecv EnvoyConfigVars |
| |
| if err = ec.prepareEnvoyConfig(kvp, &ecv); err != nil { |
| log.Fatal("Error preparing envoy config variables, aborting") |
| panic(err) |
| } |
| |
| // Now that we have the data loaded, update the envoy config and start envoy |
| ec.updateEnvoyConfig(&ecv) |
| go ec.startEnvoy() |
| } |
| |
| func (ec * EnvoyControl) readConsulKey(key string, qo * consulapi.QueryOptions) (kvp * consulapi.KVPair, meta * consulapi.QueryMeta, err error) { |
| |
| kv := ec.consul.KV() |
| // Get the initial values of the assignment key which contains individual |
| // voltha core IP addresses. This may be empty until voltha populates it |
| // so it must be checked |
| kvp, meta, err = kv.Get(ec.assignmentKey, qo) |
| for i := 0; i < ec.retries; i++ { |
| if err != nil { |
| fmt.Println(err) |
| log.Printf("Unable to read assignment consul key, retry %d", i+1) |
| time.Sleep(time.Duration(ec.waitTime) * time.Second) |
| kvp, meta, err = kv.Get(ec.assignmentKey, qo) |
| } else if kvp != nil && len(string(kvp.Value)) > 10 { |
| // A valid read, return |
| break |
| } else { |
| log.Printf("Voltha assignment key invalid, retry %d", i+1) |
| time.Sleep(time.Duration(ec.waitTime) * time.Second) |
| kvp, meta, err = kv.Get(ec.assignmentKey, qo) |
| } |
| if i == ec.retries { |
| log.Printf("Failed to read the assignment key after %d retries, aborting", ec.retries) |
| panic(err) |
| } |
| } |
| return |
| } |
| |
| func (ec * EnvoyControl) runMonitorEnvoy() { |
| var err error |
| var qo consulapi.QueryOptions |
| |
| // Get the initial values of the assignment key which contains individual |
| // voltha core IP addresses. This may be empty until voltha populates it |
| // so it must be checked |
| kvp, meta, err := ec.readConsulKey(ec.assignmentKey, nil) |
| log.Printf("Starting Envoy") |
| ec.runEnvoy(kvp) |
| |
| for { |
| qo.WaitIndex = meta.LastIndex |
| qo.RequireConsistent = true |
| for { |
| if qo.WaitIndex != meta.LastIndex { |
| break |
| } |
| kvp, meta, err = ec.readConsulKey(ec.assignmentKey, &qo) |
| if err != nil { |
| log.Printf("Unable to read assignment consul key") |
| panic(err) |
| } else { |
| log.Println(string(kvp.Value)) |
| log.Printf("meta.LastIndex = %d", meta.LastIndex) |
| } |
| } |
| // Fell through, the index has changed thus the key has changed |
| |
| log.Printf("Starting Envoy") |
| ec.runEnvoy(kvp) |
| log.Printf("meta.LastIndex = %d", meta.LastIndex) |
| } |
| } |
| |
| func (ec * EnvoyControl) ParseCommandArguments() { |
| flag.StringVar(&(ec.assignmentKey), "assignment-key", ec.assignmentKey, |
| "The key for the voltha assignment value in consul") |
| |
| flag.StringVar(&( ec.envoyConfigTemplate),"envoy-cfg-template", ec.envoyConfigTemplate, |
| "The path to envoy's configuration template") |
| |
| flag.StringVar(&(ec.envoyConfig), "envoy-config", ec.envoyConfig, |
| "The path to envoy's configuration file" ) |
| |
| flag.StringVar(&(ec.vcoreSvcName), "vcore-svc-name", ec.vcoreSvcName, |
| "The service name of the voltha core service") |
| |
| flag.StringVar(&(ec.consulSvcName),"consul-svc-nme", ec.consulSvcName, |
| "The service name of the consul service") |
| |
| flag.StringVar(&(ec.vcorePort), "vcore-port", ec.vcorePort, |
| "The port where the vcore's GRPC service can be found") |
| |
| flag.StringVar(&(ec.consulPort), "consul-port", ec.consulPort, |
| "The port where the consul service api can be found") |
| |
| flag.IntVar(&(ec.retries), "retries", ec.retries, |
| "The number of times to retry name lookups and connect requests before failing") |
| |
| flag.IntVar(&(ec.waitTime), "wait-time", ec.waitTime, |
| "The number of seconds to wait between retries") |
| |
| flag.Parse() |
| } |
| |
| func (ec * EnvoyControl) Initialize() (err error) { |
| // Resolve consul's virtual ip address |
| if err = ec.resolveServiceAddress(ec.consulSvcName); err != nil { |
| log.Fatal("Can't proceed without consul's vIP address") |
| panic(err) |
| } |
| |
| // Resolve voltha's virtual ip address |
| if err = ec.resolveServiceAddress(ec.vcoreSvcName); err != nil { |
| log.Fatal("Can't proceed without voltha's vIP address") |
| panic(err) |
| } |
| |
| if err = ec.consulConnect(ec.consulSvcName, ec.consulPort); err != nil { |
| log.Fatal("error creating consul client aborting") |
| panic(err) |
| } |
| |
| return |
| } |
| |
| func main() { |
| |
| var err error |
| var ec * EnvoyControl |
| |
| ec = NewEnvoyControl() |
| ec.ParseCommandArguments() |
| if err = ec.Initialize(); err != nil { |
| log.Fatal("Envoy control initialization failed, aboring") |
| panic(err) |
| } |
| |
| |
| // Start envoy and monitor changes to the KV store to reload |
| // consul's config. This never returns unless somethign crashes. |
| ec.runMonitorEnvoy() |
| log.Fatal("Monitor returned, this shouldn't happen") |
| } |