package main // import ""
import (
consulapi ""
//Client provides an interface for getting data out of Consul
type Client interface {
// Get a Service from consulapi
Service(string, string) ([]string, error)
// Register a service with local agent
Register(string, int) error
// Deregister a service with local agent
DeRegister(string) error
type client struct {
consulapi *consulapi.Client
type EnvoyConfigVars struct {
VolthaVip string
VolthaRR []string
type VolthaClusterEntry struct {
Prefix string
Id string
Host string
// This struct is not used yet
// TODO: Update the daemon to use this structure to for a
// more object oriented implementation
type EnvoyControl struct {
// Command line parameters
assignmentKey string
envoyConfigTemplate string
envoyConfig string
vcoreSvcName string
consulSvcName string
vcorePort string
consulPort string
retries int
waitTime int
// Runtime variables
consul * consulapi.Client
vcoreHostIpName string
vcoreIdName string
vc []VolthaClusterEntry
ipAddrs map[string][]string
restartEpoch int
reLock sync.Mutex // Exclusive access to the restartEpoch
func NewEnvoyControl() (ec * EnvoyControl) {
var envCtrl EnvoyControl = EnvoyControl { // Default values
// Command line parameters
assignmentKey: "service/voltha/data/core/assignment",
envoyConfigTemplate: "/envoy/voltha-grpc-proxy.template.json",
envoyConfig: "/envoy/voltha-grpc-proxy.json",
//envoyLogFile: "/envoy/voltha_access_log.log",
vcoreSvcName: "vcore",
consulSvcName: "consul",
vcorePort: "50556",
consulPort: "8500",
retries: 10,
waitTime: 2,
// Runtime variables
vcoreHostIpName: "host",
vcoreIdName: "id",
ipAddrs: make(map[string][]string),
restartEpoch: 0,
ec = &envCtrl
func (ec * EnvoyControl) resolveServiceAddress(serviceName string) (err error) {
for i := 0; i < ec.retries; i++ {
ec.ipAddrs[serviceName], err = net.LookupHost(serviceName)
if err != nil {
log.Printf("%s name resolution failed %d time(s) retrying...", serviceName, i+1)
} else {
//fmt.Printf("%s address = %s\n",serviceName, addrs[0])
time.Sleep(time.Duration(ec.waitTime) * time.Second)
if err != nil {
log.Printf("%s name resolution failed %d times gving up", serviceName, ec.retries)
func (ec * EnvoyControl) consulConnect(serviceName string, port string) (err error) {
// Fire up a consul client and get the kv store
cConfig := consulapi.DefaultNonPooledConfig()
cConfig.Address = ec.ipAddrs[serviceName][0] + ":" + port
ec.consul, err = consulapi.NewClient(cConfig)
if err != nil {
log.Fatal("error creating consul client aborting")
//NewConsul returns a Client interface for given consulapi address
func NewConsulClient(addr string) (*client, error) {
config := consulapi.DefaultConfig()
config.Address = addr
c, err := consulapi.NewClient(config)
if err != nil {
return nil, err
return &client{consulapi: c}, nil
// Register a service with consulapi local agent
func (c *client) Register(name string, port int) error {
reg := &consulapi.AgentServiceRegistration{
ID: name,
Name: name,
Port: port,
return c.consulapi.Agent().ServiceRegister(reg)
// DeRegister a service with consulapi local agent
func (c *client) DeRegister(id string) error {
return c.consulapi.Agent().ServiceDeregister(id)
// Service return a service
func (c *client) Service(service, tag string) ([]*consulapi.ServiceEntry, *consulapi.QueryMeta, error) {
passingOnly := true
addrs, meta, err := c.consulapi.Health().Service(service, tag, passingOnly, nil)
if len(addrs) == 0 && err == nil {
return nil, nil, fmt.Errorf("service ( %s ) was not found", service)
if err != nil {
return nil, nil, err
return addrs, meta, nil
// Starts envoy with the current restartEpoch
func (ec * EnvoyControl) startEnvoy() {
var curEpoch int
var err error
var count int
ec.reLock.Lock() // Make sure we've got exclusive access to the variable
cmd := exec.Command("/usr/local/bin/envoy", "--restart-epoch", strconv.Itoa(ec.restartEpoch),
"--config-path", ec.envoyConfig, "--parent-shutdown-time-s", "10")
curEpoch = ec.restartEpoch
ec.restartEpoch += 1
ec.reLock.Unlock() // Done, release the lock.
stderr, err := cmd.StderrPipe()
if err != nil {
log.Printf("Couldn't attach to stderr running envoy command")
stdout, err := cmd.StdoutPipe()
if err != nil {
log.Printf("Couldn't attach to stdout running envoy command")
so := bufio.NewReader(stdout)
se := bufio.NewReader(stderr)
if err = cmd.Start(); err != nil {
log.Fatal("Error starting envoy")
log.Printf("Envoy(%d) started", curEpoch)
soEof := false
seEof := false
data := make([]byte, 80)
log.Printf("Log forwarding for envoy(%d) started", curEpoch)
for {
data = make([]byte, 80)
count, err = so.Read(data)
log.Printf("ENVOY_LOG(%d): %s", curEpoch, string(data))
if err == io.EOF {
if seEof == true {
} else {
soEof = true
} else if err != nil {
log.Printf("Attempt to read envoy standard out failed")
} else if count > 0 {
log.Printf("ENVOY_LOG(%d)(%d): %s",curEpoch,count,string(data))
data = make([]byte, 80)
count, err = se.Read(data)
if err == io.EOF {
if soEof == true {
} else {
seEof = true
} else if err != nil {
log.Fatal("Attempt to read envoy standard err failed")
} else if count > 0 {
log.Printf("ENVOY_LOG(%d)(%d): %s",curEpoch,count,string(data))
log.Printf("Waiting on envoy %d to exit", curEpoch)
if err = cmd.Wait(); err != nil {
log.Printf("Envoy %d exited with an unexpected exit code", curEpoch)
log.Printf("Envoy %d exited", curEpoch)
// Check if this was the primary envoy, if so
// something went terribly wrong, panic to force
// forcefully exit.
if ec.restartEpoch == (curEpoch + 1) {
log.Fatal("Last running envoy exited, aborting!")
panic("This should never happen")
// This function will use the provided templete file to generate
// the targetfile substituting
func (ec * EnvoyControl) updateEnvoyConfig(ecv * EnvoyConfigVars) (err error) {
var firstRun bool = true
f := func() (bool) {
var rtrn bool = firstRun
firstRun = false
return rtrn
var funcs = template.FuncMap{"isFirst": f}
// Slurp up the template file.
tplt, err := ioutil.ReadFile(ec.envoyConfigTemplate)
if err != nil {
log.Fatal("ERROR reading the template file, aborting")
configTemplate, err := template.New("config").Funcs(funcs).Parse(string(tplt));
if err != nil {
log.Fatal("Unexpected error loading the Envoy template, aborting")
outFile,err := os.Create(ec.envoyConfig)
if err != nil {
log.Fatal("Unexpected error opening the Envoy config file for write, aborting")
if err = configTemplate.Execute(outFile, ecv); err != nil {
log.Fatal("Unexpected error executing the Envoy config template, aborting")
//cfgFile, err := ioutil.ReadFile(ec.envoyConfig)
//if err != nil {
// log.Fatal("ERROR reading the config file, aborting")
// panic(err)
func (ec * EnvoyControl) parseAssignment(jsonString []byte) (vCluster []VolthaClusterEntry, err error) {
var f interface{}
var vc VolthaClusterEntry
//var isErr bool
log.Printf("Parsing %s", string(jsonString))
//err = json.Unmarshal(jsonString, &f)
err = json.Unmarshal(jsonString, &f)
if err != nil {
log.Fatal("Unable to parse json record %s", jsonString)
} else {
m := f.(map[string]interface{})
for k, v := range m {
isErr := false
vc.Prefix = k
//log.Printf("Processing key %s\n", k)
switch vv := v.(type) {
case map[string]interface{}:
for i, u := range vv {
//log.Printf("Processing key %sn", i)
switch uu := u.(type) {
case string:
if i == ec.vcoreHostIpName {
vc.Host = uu
} else if i == ec.vcoreIdName {
vc.Id = uu
} else {
log.Printf("WARNING: unexpected descriptor,%s", i)
isErr = true
log.Printf("WARNING: unexpected type, ")
log.Println(i, u)
isErr = true
log.Printf("WARNING: unexpected type, ")
log.Println(k, v)
isErr = true
if ! isErr {
vCluster = append(vCluster, vc)
log.Println("Parsing complete")
func (ec * EnvoyControl) prepareEnvoyConfig(kvp * consulapi.KVPair, ecv * EnvoyConfigVars) (err error) {
var vCluster []VolthaClusterEntry
ecv.VolthaVip = ec.ipAddrs[ec.vcoreSvcName][0] + ":" + ec.vcorePort
// Extract all values from the KV record
// In the future, the values should all be compared to what we currently have
vCluster, err = ec.parseAssignment([]byte(kvp.Value))
if err == nil { = vCluster // For future use to determine if there's been a real change
//templateValues["VolthaRR"] = []string{}
ecv.VolthaRR = []string{}
for i := range vCluster {
//log.Printf("Processing %s\n", vCluster[i].Host)
//templateValues["VolthaRR"] = append(templateValues["VolthaRR"].([]string), vCluster[i].Host)
ecv.VolthaRR = append(ecv.VolthaRR, vCluster[i].Host + ":" + ec.vcorePort)
} else {
log.Fatal("Couldn't parse the KV record %s", string(kvp.Value))
func (ec * EnvoyControl) runEnvoy(kvp * consulapi.KVPair) {
var err error
var ecv EnvoyConfigVars
if err = ec.prepareEnvoyConfig(kvp, &ecv); err != nil {
log.Fatal("Error preparing envoy config variables, aborting")
// Now that we have the data loaded, update the envoy config and start envoy
go ec.startEnvoy()
func (ec * EnvoyControl) readConsulKey(key string, qo * consulapi.QueryOptions) (kvp * consulapi.KVPair, meta * consulapi.QueryMeta, err error) {
kv := ec.consul.KV()
// Get the initial values of the assignment key which contains individual
// voltha core IP addresses. This may be empty until voltha populates it
// so it must be checked
kvp, meta, err = kv.Get(ec.assignmentKey, qo)
for i := 0; i < ec.retries; i++ {
if err != nil {
log.Printf("Unable to read assignment consul key, retry %d", i+1)
time.Sleep(time.Duration(ec.waitTime) * time.Second)
kvp, meta, err = kv.Get(ec.assignmentKey, qo)
} else if kvp != nil && len(string(kvp.Value)) > 10 {
// A valid read, return
} else {
log.Printf("Voltha assignment key invalid, retry %d", i+1)
time.Sleep(time.Duration(ec.waitTime) * time.Second)
kvp, meta, err = kv.Get(ec.assignmentKey, qo)
if i == ec.retries {
log.Printf("Failed to read the assignment key after %d retries, aborting", ec.retries)
func (ec * EnvoyControl) runMonitorEnvoy() {
var err error
var qo consulapi.QueryOptions
// Get the initial values of the assignment key which contains individual
// voltha core IP addresses. This may be empty until voltha populates it
// so it must be checked
kvp, meta, err := ec.readConsulKey(ec.assignmentKey, nil)
log.Printf("Starting Envoy")
for {
qo.WaitIndex = meta.LastIndex
qo.RequireConsistent = true
for {
if qo.WaitIndex != meta.LastIndex {
kvp, meta, err = ec.readConsulKey(ec.assignmentKey, &qo)
if err != nil {
log.Printf("Unable to read assignment consul key")
} else {
log.Printf("meta.LastIndex = %d", meta.LastIndex)
// Fell through, the index has changed thus the key has changed
log.Printf("Starting Envoy")
log.Printf("meta.LastIndex = %d", meta.LastIndex)
func (ec * EnvoyControl) ParseCommandArguments() {
flag.StringVar(&(ec.assignmentKey), "assignment-key", ec.assignmentKey,
"The key for the voltha assignment value in consul")
flag.StringVar(&( ec.envoyConfigTemplate),"envoy-cfg-template", ec.envoyConfigTemplate,
"The path to envoy's configuration template")
flag.StringVar(&(ec.envoyConfig), "envoy-config", ec.envoyConfig,
"The path to envoy's configuration file" )
flag.StringVar(&(ec.vcoreSvcName), "vcore-svc-name", ec.vcoreSvcName,
"The service name of the voltha core service")
flag.StringVar(&(ec.consulSvcName),"consul-svc-nme", ec.consulSvcName,
"The service name of the consul service")
flag.StringVar(&(ec.vcorePort), "vcore-port", ec.vcorePort,
"The port where the vcore's GRPC service can be found")
flag.StringVar(&(ec.consulPort), "consul-port", ec.consulPort,
"The port where the consul service api can be found")
flag.IntVar(&(ec.retries), "retries", ec.retries,
"The number of times to retry name lookups and connect requests before failing")
flag.IntVar(&(ec.waitTime), "wait-time", ec.waitTime,
"The number of seconds to wait between retries")
func (ec * EnvoyControl) Initialize() (err error) {
// Resolve consul's virtual ip address
if err = ec.resolveServiceAddress(ec.consulSvcName); err != nil {
log.Fatal("Can't proceed without consul's vIP address")
// Resolve voltha's virtual ip address
if err = ec.resolveServiceAddress(ec.vcoreSvcName); err != nil {
log.Fatal("Can't proceed without voltha's vIP address")
if err = ec.consulConnect(ec.consulSvcName, ec.consulPort); err != nil {
log.Fatal("error creating consul client aborting")
func main() {
var err error
var ec * EnvoyControl
ec = NewEnvoyControl()
if err = ec.Initialize(); err != nil {
log.Fatal("Envoy control initialization failed, aboring")
// Start envoy and monitor changes to the KV store to reload
// consul's config. This never returns unless somethign crashes.
log.Fatal("Monitor returned, this shouldn't happen")