blob: 6766539ffac59824dde969e8ec54e9e9f9ed4ed3 [file] [log] [blame]
package main // import "ciena.com/envoyd"
import (
"os"
"os/exec"
"fmt"
"log"
"strconv"
"time"
"net"
"io/ioutil"
"text/template"
"encoding/json"
consulapi "github.com/hashicorp/consul/api"
)
// DATA STRUCTURES
type ConfigVars struct {
VolthaVip string
VolthaRR []string
}
type VolthaClusterEntry struct {
Prefix string
Id string
Host string
}
//Client provides an interface for getting data out of Consul
type Client interface {
// Get a Service from consulapi
Service(string, string) ([]string, error)
// Register a service with local agent
Register(string, int) error
// Deregister a service with local agent
DeRegister(string) error
}
type client struct {
consulapi *consulapi.Client
}
// This struct is not used yet
// TODO: Update the daemon to use this structure to for a
// more object oriented implementation
type EnvoyControl struct {
retrys int
waitTime int
cv ConfigVars
vc []VolthaClusterEntry
meta * consulapi.QueryMeta
kvp * consulapi.KVPair
ipAddrs map[string][]string
}
// CONSTANTS
var assignmentKey string = "service/voltha/data/core/assignment"
var vcoreHostIpName string = "host"
var vcoreIdName string = "id"
var restartEpoch int = 0
var volthaPort string = "50556" // This will be passed inas an option.
var consulPort string = "8500" // This will be passed in as an option.
//NewConsul returns a Client interface for given consulapi address
func NewConsulClient(addr string) (*client, error) {
config := consulapi.DefaultConfig()
config.Address = addr
c, err := consulapi.NewClient(config)
if err != nil {
return nil, err
}
return &client{consulapi: c}, nil
}
// Register a service with consulapi local agent
func (c *client) Register(name string, port int) error {
reg := &consulapi.AgentServiceRegistration{
ID: name,
Name: name,
Port: port,
}
return c.consulapi.Agent().ServiceRegister(reg)
}
// DeRegister a service with consulapi local agent
func (c *client) DeRegister(id string) error {
return c.consulapi.Agent().ServiceDeregister(id)
}
// Service return a service
func (c *client) Service(service, tag string) ([]*consulapi.ServiceEntry, *consulapi.QueryMeta, error) {
passingOnly := true
addrs, meta, err := c.consulapi.Health().Service(service, tag, passingOnly, nil)
if len(addrs) == 0 && err == nil {
return nil, nil, fmt.Errorf("service ( %s ) was not found", service)
}
if err != nil {
return nil, nil, err
}
return addrs, meta, nil
}
// Starts envoy with the current restartEpoch
func startEnvoy(cfg_file string) {
cmd := exec.Command("/usr/local/bin/envoy", "--restart-epoch", strconv.Itoa(restartEpoch),
"--config-path", cfg_file)
curEpoch := restartEpoch
restartEpoch += 1
if err := cmd.Start(); err != nil {
log.Fatal(err)
panic(err)
}
log.Printf("Waiting on envoy %d to exit", curEpoch)
if err := cmd.Wait(); err != nil {
log.Fatal(err, "Unexpected exit code")
}
log.Printf("Envoy %d exited", curEpoch)
}
// This function will use the provided templete file to generate
// the targetfile substituting
func updateEnvoyConfig(templateFile string, targetFile string, cv ConfigVars) {
var firstRun bool = true
f := func() (bool) {
var rtrn bool = firstRun
firstRun = false
return rtrn
}
var funcs = template.FuncMap{"isFirst": f}
// Slurp up the template file.
tplt, err := ioutil.ReadFile(templateFile)
if err != nil {
panic("ERROR reading the template file, aborting")
}
//fmt.Println(string(tplt))
configTemplate, err := template.New("config").Funcs(funcs).Parse(string(tplt));
if err != nil {
panic(err)
}
outFile,err := os.Create(targetFile)
if err != nil {
panic(err)
}
if err := configTemplate.Execute(outFile, cv); err != nil {
panic(err)
}
//cfgFile, err := ioutil.ReadFile(targetFile)
//if err != nil {
// panic("ERROR reading the config file, aborting")
//}
//fmt.Println(string(cfgFile))
}
func getServiceAddr(serviceName string, retrys int, waitTime int) (addrs []string, err error) {
for i := 0; i < retrys; i++ {
addrs,err = net.LookupHost(serviceName)
if err != nil {
log.Printf("%s name resolution failed %d time(s) retrying...\n", serviceName, i+1)
} else {
//fmt.Printf("%s address = %s\n",serviceName, addrs[0])
break
}
time.Sleep(time.Duration(waitTime) * time.Second)
}
if err != nil {
log.Printf("%s name resolution failed %d times gving up\n", serviceName, retrys)
}
return
}
func parseAssignment(jsonString []byte) (vCluster []VolthaClusterEntry, err error) {
var vc VolthaClusterEntry
var f interface{}
log.Printf("Parsing %s\n", string(jsonString))
err = json.Unmarshal(jsonString, &f)
if err != nil {
log.Fatal("Unable to parse json record %s", jsonString)
panic(err)
} else {
m := f.(map[string]interface{})
for k, v := range m {
vc.Prefix = k
//log.Printf("Processing key %s\n", k)
switch vv := v.(type) {
case map[string]interface{}:
for i, u := range vv {
//log.Printf("Processing key %s\n", i)
switch uu := u.(type) {
case string:
if i == vcoreHostIpName {
vc.Host = uu
} else if i == vcoreIdName {
vc.Id = uu
} else {
log.Printf("WARNING: unexpected descriptor,%s\n", i)
}
default:
log.Printf("WARNING: unexpected type, ")
log.Println(i, u)
}
}
default:
log.Printf("WARNING: unexpected type, ")
log.Println(k, v)
}
vCluster = append(vCluster, vc)
}
}
log.Println("Parsing complete")
return
}
func runEnvoy(meta * consulapi.QueryMeta, kvp * consulapi.KVPair, values * map[string]interface{}, cv ConfigVars,
templatePath string, configPath string) {
var err error
var vCluster []VolthaClusterEntry
// Extract all values from the KV record
vCluster, err = parseAssignment([]byte(kvp.Value))
if err == nil {
(*values)["volthaRR"] = []string{}
for i := range vCluster {
//log.Printf("Processing %s\n", vCluster[i].Host)
(*values)["volthaRR"] = append((*values)["volthaRR"].([]string), vCluster[i].Host)
cv.VolthaRR = append(cv.VolthaRR, vCluster[i].Host + ":" + volthaPort)
}
} else {
log.Fatal("Couldn't parse the KV record %s\n", string(kvp.Value))
panic(err)
}
// Now that we have the data loaded, update the envoy config and start envoy
updateEnvoyConfig(templatePath, configPath, cv)
go startEnvoy(configPath)
log.Printf("meta.LastIndex = %d\n", meta.LastIndex)
}
func runMonitorEnvoy(kv * consulapi.KV, values * map[string]interface{}, cv ConfigVars,
templatePath string, configPath string) {
var err error
var qo consulapi.QueryOptions
// Get the initial values of the assignment key which contains individual
// voltha core IP addresses. This may be empty until voltha populates it
// so it must be checked
kvp, meta, err := kv.Get(assignmentKey, nil)
for i := 0; i < 10; i++ {
if err != nil {
fmt.Println(err)
log.Printf("Unable to read assignment consul key, retry %d\n", i+1)
time.Sleep(time.Duration(2) * time.Second)
kvp, meta, err = kv.Get(assignmentKey, nil)
} else if kvp != nil && len(string(kvp.Value)) > 10 {
log.Printf("Starting Envoy")
runEnvoy(meta, kvp, values, cv, templatePath, configPath)
break
} else {
log.Printf("Voltha assignment key invalid, retry %d\n", i+1)
time.Sleep(time.Duration(2) * time.Second)
kvp, meta, err = kv.Get(assignmentKey, nil)
}
}
for {
qo.WaitIndex = meta.LastIndex
for {
if qo.WaitIndex != meta.LastIndex {
break
}
kvp, meta, err = kv.Get(assignmentKey, &qo)
if err != nil {
log.Fatal("Unable to read assignment consul key")
panic(err)
} else {
log.Println(string(kvp.Value))
log.Printf("meta.LastIndex = %d\n", meta.LastIndex)
}
}
// Fell through, the index has changed thus the key has changed
runEnvoy(meta, kvp, values, cv, templatePath, configPath)
}
}
func main() {
var err error
var addrs []string
var cv ConfigVars // Template variables.
var consul * consulapi.Client
var values map[string]interface{} // Key values map
values = make(map[string]interface{})
// Resolve consul's virtual ip address
addrs, err = getServiceAddr("consul", 10, 2)
if err == nil {
values["consulvip"] = addrs[0]
log.Printf("Consul's address = %s\n",addrs[0])
} else {
log.Fatal("Can't proceed without consul's vIP address")
panic(err)
}
// Resolve voltha's virtual ip address
addrs,err = getServiceAddr("vcore", 10, 2)
if err == nil {
log.Printf("Voltha address = %s\n",addrs[0])
// Config var for the template
cv.VolthaVip = addrs[0] + ":" + volthaPort
values["volthavip"] = addrs[0]
} else {
log.Fatal("Can't proceed without voltha's vIP address")
panic(err)
}
// Fire up a consul client and get the kv store
config := consulapi.DefaultConfig()
config.Address = values["consulvip"].(string) + ":" + consulPort
consul, err = consulapi.NewClient(config)
if err != nil {
log.Fatal("error creating consul client aborting")
panic(err)
}
kv := consul.KV()
// Start envoy and monitor changes to the KV store to reload
// consul's config. This never returns unless somethign crashes.
runMonitorEnvoy(kv, &values, cv, "/envoy/voltha-grpc-proxy.template.json", "/envoy/voltha-grpc-proxy.json")
}