blob: 6523c90cc64eade56b5b5db43bee21c81d52200e [file] [log] [blame]
Sergio Slobodrianbe829272017-07-17 14:45:45 -04001package main // import "ciena.com/envoyd"
2
3import (
Richard Jankowski15274592017-12-12 15:52:37 -05004 "context"
Sergio Slobodrianbe829272017-07-17 14:45:45 -04005 "os"
6 "os/exec"
7 "fmt"
8 "log"
9 "strconv"
10 "time"
11 "net"
12 "io/ioutil"
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -040013 "io"
Sergio Slobodrianbe829272017-07-17 14:45:45 -040014 "text/template"
15 "encoding/json"
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -040016 "sync"
17 "flag"
18 "bufio"
Sergio Slobodrianbe829272017-07-17 14:45:45 -040019 consulapi "github.com/hashicorp/consul/api"
Richard Jankowski15274592017-12-12 15:52:37 -050020 etcdapi "github.com/coreos/etcd/clientv3"
Sergio Slobodrianbe829272017-07-17 14:45:45 -040021)
22
23// DATA STRUCTURES
24
Sergio Slobodrianbe829272017-07-17 14:45:45 -040025//Client provides an interface for getting data out of Consul
26type Client interface {
27// Get a Service from consulapi
28 Service(string, string) ([]string, error)
29// Register a service with local agent
30 Register(string, int) error
31// Deregister a service with local agent
32 DeRegister(string) error
33}
34
35type client struct {
36 consulapi *consulapi.Client
37}
38
Richard Jankowski15274592017-12-12 15:52:37 -050039type KvConnectFunc func(string, string) (error)
40type KvMonitorFunc func()
41
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -040042type EnvoyConfigVars struct {
43 VolthaVip string
44 VolthaRR []string
Sergio Slobodrian6570c742017-08-07 23:11:33 -040045 vcorePort string
46 HttpPort string
47 HttpsPort string
48 GrpcPort string
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -040049}
50
51type VolthaClusterEntry struct {
52 Prefix string
53 Id string
54 Host string
55}
56
Sergio Slobodrianbe829272017-07-17 14:45:45 -040057// This struct is not used yet
58// TODO: Update the daemon to use this structure to for a
59// more object oriented implementation
60type EnvoyControl struct {
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -040061 // Command line parameters
62 assignmentKey string
63 envoyConfigTemplate string
Sergio Slobodrian6570c742017-08-07 23:11:33 -040064 envoyConfigTemplateBoth string
65 envoyConfigTemplateNoHttps string
66 envoyConfigTemplateNoHttp string
67 envoyHttpPort string
68 envoyHttpsPort string
69 httpDisabled bool
70 httpsDisabled bool
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -040071 envoyConfig string
72 vcoreSvcName string
73 consulSvcName string
74 vcorePort string
Sergio Slobodrian6570c742017-08-07 23:11:33 -040075 envoyGrpcPort string
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -040076 consulPort string
Richard Jankowski15274592017-12-12 15:52:37 -050077 kvStore string
78 kvSvcName string
79 kvPort string
80 kvConnect map[string]KvConnectFunc
81 kvMonitor map[string]KvMonitorFunc
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -040082 retries int
Sergio Slobodrianbe829272017-07-17 14:45:45 -040083 waitTime int
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -040084 // Runtime variables
85 consul * consulapi.Client
Richard Jankowski15274592017-12-12 15:52:37 -050086 etcd * etcdapi.Client
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -040087 vcoreHostIpName string
88 vcoreIdName string
Sergio Slobodrianbe829272017-07-17 14:45:45 -040089 vc []VolthaClusterEntry
Sergio Slobodrianbe829272017-07-17 14:45:45 -040090 ipAddrs map[string][]string
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -040091 restartEpoch int
92 reLock sync.Mutex // Exclusive access to the restartEpoch
Sergio Slobodrianbe829272017-07-17 14:45:45 -040093}
94
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -040095
96func NewEnvoyControl() (ec * EnvoyControl) {
97 var envCtrl EnvoyControl = EnvoyControl { // Default values
98 // Command line parameters
99 assignmentKey: "service/voltha/data/core/assignment",
100 envoyConfigTemplate: "/envoy/voltha-grpc-proxy.template.json",
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400101 envoyConfigTemplateBoth: "/envoy/voltha-grpc-proxy.template.json",
102 envoyConfigTemplateNoHttps: "/envoy/voltha-grpc-proxy-no-https.template.json",
103 envoyConfigTemplateNoHttp: "/envoy/voltha-grpc-proxy-no-http.template.json",
104 envoyHttpsPort: "8443",
105 envoyHttpPort: "8882",
106 envoyGrpcPort: "50555",
107 httpDisabled: false,
108 httpsDisabled: false,
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400109 envoyConfig: "/envoy/voltha-grpc-proxy.json",
110 //envoyLogFile: "/envoy/voltha_access_log.log",
111 vcoreSvcName: "vcore",
112 consulSvcName: "consul",
113 vcorePort: "50556",
114 consulPort: "8500",
Richard Jankowski15274592017-12-12 15:52:37 -0500115 kvStore: "consul",
116 kvSvcName: "consul",
117 kvPort: "8500",
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400118 retries: 10,
119 waitTime: 2,
120 // Runtime variables
121 vcoreHostIpName: "host",
122 vcoreIdName: "id",
123 ipAddrs: make(map[string][]string),
124 restartEpoch: 0,
125 }
126 ec = &envCtrl
Richard Jankowski15274592017-12-12 15:52:37 -0500127 ec.kvConnect = make(map[string]KvConnectFunc)
128 ec.kvConnect["consul"] = ec.consulConnect
129 ec.kvConnect["etcd"] = ec.etcdConnect
130 ec.kvMonitor = make(map[string]KvMonitorFunc)
131 ec.kvMonitor["consul"] = ec.monitorConsulKey
132 ec.kvMonitor["etcd"] = ec.monitorEtcdKey
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400133 return
134}
135
136func (ec * EnvoyControl) resolveServiceAddress(serviceName string) (err error) {
137 for i := 0; i < ec.retries; i++ {
138 ec.ipAddrs[serviceName], err = net.LookupHost(serviceName)
139 if err != nil {
140 log.Printf("%s name resolution failed %d time(s) retrying...", serviceName, i+1)
141 } else {
142 //fmt.Printf("%s address = %s\n",serviceName, addrs[0])
143 break
144 }
145 time.Sleep(time.Duration(ec.waitTime) * time.Second)
146 }
147 if err != nil {
148 log.Printf("%s name resolution failed %d times gving up", serviceName, ec.retries)
149 }
150 return
151}
152
153func (ec * EnvoyControl) consulConnect(serviceName string, port string) (err error) {
154 // Fire up a consul client and get the kv store
155 cConfig := consulapi.DefaultNonPooledConfig()
156 cConfig.Address = ec.ipAddrs[serviceName][0] + ":" + port
157 ec.consul, err = consulapi.NewClient(cConfig)
158 if err != nil {
159 log.Fatal("error creating consul client aborting")
160 return
161 }
162 return
163}
164
Richard Jankowski15274592017-12-12 15:52:37 -0500165func (ec * EnvoyControl) etcdConnect(serviceName string, port string) (err error) {
166 // Fire up an etcd client to access the kv store
167 cfg := etcdapi.Config {
168 Endpoints: []string{serviceName + ":" + port},
169 DialTimeout: 5 * time.Second,
170 }
171 ec.etcd, err = etcdapi.New(cfg)
172 if err != nil {
173 log.Fatal("Failed to create etcd client, aborting...")
174 return
175 }
176 return
177}
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400178
179//NewConsul returns a Client interface for given consulapi address
180func NewConsulClient(addr string) (*client, error) {
181 config := consulapi.DefaultConfig()
182 config.Address = addr
183 c, err := consulapi.NewClient(config)
184 if err != nil {
185 return nil, err
186 }
187 return &client{consulapi: c}, nil
188}
189
190// Register a service with consulapi local agent
191func (c *client) Register(name string, port int) error {
192 reg := &consulapi.AgentServiceRegistration{
193 ID: name,
194 Name: name,
195 Port: port,
196 }
197 return c.consulapi.Agent().ServiceRegister(reg)
198}
199
200// DeRegister a service with consulapi local agent
201func (c *client) DeRegister(id string) error {
202 return c.consulapi.Agent().ServiceDeregister(id)
203}
204
205// Service return a service
206func (c *client) Service(service, tag string) ([]*consulapi.ServiceEntry, *consulapi.QueryMeta, error) {
207 passingOnly := true
208 addrs, meta, err := c.consulapi.Health().Service(service, tag, passingOnly, nil)
209 if len(addrs) == 0 && err == nil {
210 return nil, nil, fmt.Errorf("service ( %s ) was not found", service)
211 }
212 if err != nil {
213 return nil, nil, err
214 }
215 return addrs, meta, nil
216}
217
218// Starts envoy with the current restartEpoch
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400219func (ec * EnvoyControl) startEnvoy() {
220 var curEpoch int
221 var err error
222 var count int
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400223
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400224 ec.reLock.Lock() // Make sure we've got exclusive access to the variable
225 cmd := exec.Command("/usr/local/bin/envoy", "--restart-epoch", strconv.Itoa(ec.restartEpoch),
226 "--config-path", ec.envoyConfig, "--parent-shutdown-time-s", "10")
227
228 curEpoch = ec.restartEpoch
229 ec.restartEpoch += 1
230 ec.reLock.Unlock() // Done, release the lock.
231
232 stderr, err := cmd.StderrPipe()
233 if err != nil {
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400234 log.Fatal("Couldn't attach to stderr running envoy command: %s", err.Error())
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400235 }
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400236 stdout, err := cmd.StdoutPipe()
237 if err != nil {
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400238 log.Fatal("Couldn't attach to stdout running envoy command: %s", err.Error())
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400239 }
240 so := bufio.NewReader(stdout)
241 se := bufio.NewReader(stderr)
242
243 if err = cmd.Start(); err != nil {
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400244 log.Fatal("Error starting envoy: %s", err.Error())
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400245 }
246 log.Printf("Envoy(%d) started", curEpoch)
247 soEof := false
248 seEof := false
249
250 data := make([]byte, 80)
251 log.Printf("Log forwarding for envoy(%d) started", curEpoch)
252 for {
253 data = make([]byte, 80)
254 count, err = so.Read(data)
255 log.Printf("ENVOY_LOG(%d): %s", curEpoch, string(data))
256 if err == io.EOF {
257 if seEof == true {
258 break
259 } else {
260 soEof = true
261 }
262 } else if err != nil {
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400263 log.Fatal("Attempt to read envoy standard out failed: %s", err.Error())
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400264 } else if count > 0 {
265 log.Printf("ENVOY_LOG(%d)(%d): %s",curEpoch,count,string(data))
266 }
267 data = make([]byte, 80)
268 count, err = se.Read(data)
269 if err == io.EOF {
270 if soEof == true {
271 break
272 } else {
273 seEof = true
274 }
275 } else if err != nil {
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400276 log.Fatal("Attempt to read envoy standard err failed: %s", err.Error())
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400277 } else if count > 0 {
278 log.Printf("ENVOY_LOG(%d)(%d): %s",curEpoch,count,string(data))
279 }
280 }
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400281 log.Printf("Waiting on envoy %d to exit", curEpoch)
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400282 if err = cmd.Wait(); err != nil {
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400283 log.Fatal("Envoy %d exited with an unexpected exit code: %s", curEpoch, err.Error())
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400284 }
285 log.Printf("Envoy %d exited", curEpoch)
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400286 // Check if this was the primary envoy, if so
287 // something went terribly wrong, panic to force
288 // forcefully exit.
289 ec.reLock.Lock()
290 if ec.restartEpoch == (curEpoch + 1) {
291 ec.reLock.Unlock()
292 log.Fatal("Last running envoy exited, aborting!")
293 panic("This should never happen")
294 }
295 ec.reLock.Unlock()
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400296
297}
298
299// This function will use the provided templete file to generate
300// the targetfile substituting
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400301func (ec * EnvoyControl) updateEnvoyConfig(ecv * EnvoyConfigVars) (err error) {
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400302 var firstRun bool = true
Sergio Slobodrian19628742017-09-05 19:52:54 -0400303 var firstRun2 bool = true
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400304 f := func() (bool) {
305 var rtrn bool = firstRun
306 firstRun = false
307 return rtrn
308 }
Sergio Slobodrian19628742017-09-05 19:52:54 -0400309 g := func() (bool) {
310 var rtrn bool = firstRun2
311 firstRun2 = false
312 return rtrn
313 }
314 var funcs = template.FuncMap{"isFirst": f, "isFirst2": g}
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400315 // Slurp up the template file.
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400316 tplt, err := ioutil.ReadFile(ec.envoyConfigTemplate)
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400317 if err != nil {
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400318 log.Fatal("ERROR reading the template file, aborting: %s", err.Error())
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400319 }
320 //fmt.Println(string(tplt))
321 configTemplate, err := template.New("config").Funcs(funcs).Parse(string(tplt));
322 if err != nil {
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400323 log.Fatal("Unexpected error loading the Envoy template, aborting: %s", err.Error())
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400324 }
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400325 outFile,err := os.Create(ec.envoyConfig)
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400326 if err != nil {
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400327 log.Fatal("Unexpected error opening the Envoy config file for write, aborting: %s", err.Error())
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400328 }
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400329 if err = configTemplate.Execute(outFile, ecv); err != nil {
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400330 log.Fatal("Unexpected error executing the Envoy config template, aborting: %s", err.Error())
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400331 }
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400332 //cfgFile, err := ioutil.ReadFile(ec.envoyConfig)
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400333 //if err != nil {
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400334 // log.Fatal("ERROR reading the config file, aborting: %s", err.Error())
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400335 // panic(err)
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400336 //}
337 //fmt.Println(string(cfgFile))
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400338 return
339}
340
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400341func (ec * EnvoyControl) parseAssignment(jsonString []byte) (vCluster []VolthaClusterEntry, err error) {
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400342 var f interface{}
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400343 var vc VolthaClusterEntry
344 //var isErr bool
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400345
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400346 log.Printf("Parsing %s", string(jsonString))
347 //err = json.Unmarshal(jsonString, &f)
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400348 err = json.Unmarshal(jsonString, &f)
349 if err != nil {
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400350 log.Fatal("Unable to parse json record %s : %s", jsonString, err.Error())
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400351 } else {
352 m := f.(map[string]interface{})
353 for k, v := range m {
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400354 isErr := false
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400355 vc.Prefix = k
356 //log.Printf("Processing key %s\n", k)
357 switch vv := v.(type) {
358 case map[string]interface{}:
359 for i, u := range vv {
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400360 //log.Printf("Processing key %sn", i)
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400361 switch uu := u.(type) {
362 case string:
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400363 if i == ec.vcoreHostIpName {
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400364 vc.Host = uu
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400365 } else if i == ec.vcoreIdName {
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400366 vc.Id = uu
367 } else {
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400368 log.Printf("WARNING: unexpected descriptor,%s", i)
369 isErr = true
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400370 }
371 default:
372 log.Printf("WARNING: unexpected type, ")
373 log.Println(i, u)
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400374 isErr = true
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400375 }
376 }
377 default:
378 log.Printf("WARNING: unexpected type, ")
379 log.Println(k, v)
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400380 isErr = true
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400381 }
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400382 if ! isErr {
383 vCluster = append(vCluster, vc)
384 }
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400385 }
386 }
387 log.Println("Parsing complete")
388 return
389}
390
Richard Jankowski15274592017-12-12 15:52:37 -0500391func (ec * EnvoyControl) prepareEnvoyConfig(keyValue []byte, ecv * EnvoyConfigVars) (err error) {
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400392 var vCluster []VolthaClusterEntry
393
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400394 ecv.HttpPort = ec.envoyHttpPort
395 ecv.HttpsPort = ec.envoyHttpsPort
396 ecv.GrpcPort = ec.envoyGrpcPort
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400397 ecv.VolthaVip = ec.ipAddrs[ec.vcoreSvcName][0] + ":" + ec.vcorePort
398
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400399 // Extract all values from the KV record
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400400 // In the future, the values should all be compared to what we currently have
Richard Jankowski15274592017-12-12 15:52:37 -0500401 vCluster, err = ec.parseAssignment(keyValue)
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400402 if err == nil {
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400403 ec.vc = vCluster // For future use to determine if there's been a real change
404 //templateValues["VolthaRR"] = []string{}
405 ecv.VolthaRR = []string{}
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400406 for i := range vCluster {
407 //log.Printf("Processing %s\n", vCluster[i].Host)
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400408 //templateValues["VolthaRR"] = append(templateValues["VolthaRR"].([]string), vCluster[i].Host)
409 ecv.VolthaRR = append(ecv.VolthaRR, vCluster[i].Host + ":" + ec.vcorePort)
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400410 }
411 } else {
Richard Jankowski15274592017-12-12 15:52:37 -0500412 log.Fatal("Couldn't parse the KV record %s: %s", string(keyValue), err.Error())
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400413 }
414 return
415}
416
Richard Jankowski15274592017-12-12 15:52:37 -0500417func (ec * EnvoyControl) runEnvoy(keyValue []byte) {
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400418 var err error
419 var ecv EnvoyConfigVars
420
Richard Jankowski15274592017-12-12 15:52:37 -0500421 if err = ec.prepareEnvoyConfig(keyValue, &ecv); err != nil {
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400422 log.Fatal("Error preparing envoy config variables, aborting: %s", err.Error())
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400423 }
424
425 // Now that we have the data loaded, update the envoy config and start envoy
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400426 ec.updateEnvoyConfig(&ecv)
427 go ec.startEnvoy()
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400428}
429
Richard Jankowski15274592017-12-12 15:52:37 -0500430func (ec * EnvoyControl) readConsulKey(key string, qo * consulapi.QueryOptions) (value []byte, meta * consulapi.QueryMeta, err error) {
431
432 var kvp *consulapi.KVPair
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400433
434 kv := ec.consul.KV()
435 // Get the initial values of the assignment key which contains individual
436 // voltha core IP addresses. This may be empty until voltha populates it
437 // so it must be checked
438 kvp, meta, err = kv.Get(ec.assignmentKey, qo)
439 for i := 0; i < ec.retries; i++ {
440 if err != nil {
441 fmt.Println(err)
442 log.Printf("Unable to read assignment consul key, retry %d", i+1)
443 time.Sleep(time.Duration(ec.waitTime) * time.Second)
444 kvp, meta, err = kv.Get(ec.assignmentKey, qo)
445 } else if kvp != nil && len(string(kvp.Value)) > 10 {
446 // A valid read, return
Richard Jankowski15274592017-12-12 15:52:37 -0500447 value = kvp.Value
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400448 break
449 } else {
450 log.Printf("Voltha assignment key invalid, retry %d", i+1)
451 time.Sleep(time.Duration(ec.waitTime) * time.Second)
452 kvp, meta, err = kv.Get(ec.assignmentKey, qo)
453 }
454 if i == ec.retries {
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400455 log.Fatal("Failed to read the assignment key after %d retries, aborting: %s", ec.retries, err.Error())
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400456 }
457 }
458 return
459}
460
Richard Jankowski15274592017-12-12 15:52:37 -0500461func (ec * EnvoyControl) readEtcdKey(key string) (value []byte, index int64, err error) {
462 // Get the initial values of the assignment key which contains individual
463 // voltha core IP addresses. This may be empty until voltha populates it
464 // so it must be checked
465 resp, err := ec.etcd.Get(context.Background(), ec.assignmentKey)
466 for i := 0; i < ec.retries; i++ {
467 if err != nil {
468 fmt.Println(err)
469 log.Printf("Unable to read assignment etcd key, retry %d", i+1)
470 time.Sleep(time.Duration(ec.waitTime) * time.Second)
471 resp, err = ec.etcd.Get(context.Background(), ec.assignmentKey)
472 } else if resp != nil && len(resp.Kvs) > 0 && len(resp.Kvs[0].Value) > 10 {
473 // A valid read, return
474 kv := resp.Kvs[0]
475 value = kv.Value
476 index = kv.ModRevision
477 break
478 } else {
479 log.Printf("Voltha assignment key from etcd invalid, retry %d", i+1)
480 time.Sleep(time.Duration(ec.waitTime) * time.Second)
481 resp, err = ec.etcd.Get(context.Background(), ec.assignmentKey)
482 }
483 if i == ec.retries {
484 log.Fatal("Failed to read assignment key from etcd after %d retries, aborting: %s", ec.retries, err.Error())
485 }
486 }
487 return
488}
489
490func (ec * EnvoyControl) monitorConsulKey() {
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400491 var err error
492 var qo consulapi.QueryOptions
493
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400494 // Get the initial values of the assignment key which contains individual
495 // voltha core IP addresses. This may be empty until voltha populates it
496 // so it must be checked
Richard Jankowski15274592017-12-12 15:52:37 -0500497 log.Printf("Monitoring consul key")
498 val, meta, err := ec.readConsulKey(ec.assignmentKey, nil)
499 log.Printf("Starting Envoy, initial index = %d", meta.LastIndex)
500 ec.runEnvoy(val)
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400501
502 for {
503 qo.WaitIndex = meta.LastIndex
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400504 qo.RequireConsistent = true
Sergio Slobodrian19628742017-09-05 19:52:54 -0400505 //qo.AllowStale = true
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400506 for {
507 if qo.WaitIndex != meta.LastIndex {
508 break
509 }
Richard Jankowski15274592017-12-12 15:52:37 -0500510 val, meta, err = ec.readConsulKey(ec.assignmentKey, &qo)
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400511 if err != nil {
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400512 log.Fatal("Unable to read assignment consul key: %s\n", err.Error())
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400513 } else {
Richard Jankowski15274592017-12-12 15:52:37 -0500514 log.Println(string(val))
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400515 log.Printf("meta.LastIndex = %d", meta.LastIndex)
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400516 }
517 }
518 // Fell through, the index has changed thus the key has changed
519
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400520 log.Printf("Starting Envoy")
Richard Jankowski15274592017-12-12 15:52:37 -0500521 ec.runEnvoy(val)
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400522 log.Printf("meta.LastIndex = %d", meta.LastIndex)
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400523 }
524}
525
Richard Jankowski15274592017-12-12 15:52:37 -0500526func (ec * EnvoyControl) monitorEtcdKey() {
527 var err error
528
529 // TODO: Check into the use of any data consistency options
530 //
531 // Get the initial values of the assignment key which contains individual
532 // voltha core IP addresses. This may be empty until voltha populates it
533 // so it must be checked
534 log.Printf("Monitoring etcd key")
535 val, index, err := ec.readEtcdKey(ec.assignmentKey)
536 lastIndex := index
537 log.Printf("Starting Envoy, initial index = %d", lastIndex)
538 ec.runEnvoy(val)
539
540 for {
541 for {
542 time.Sleep(60 * time.Second)
543 val, index, err = ec.readEtcdKey(ec.assignmentKey)
544 if err != nil {
545 log.Fatal("Unable to read assignment etcd key: %s\n", err.Error())
546 } else if index != lastIndex {
547 break
548 } else {
549 log.Println(string(val))
550 log.Printf("Last index = %d", index)
551 }
552 }
553 // Fell through, the index has changed thus the key has changed
554 log.Printf("Starting Envoy")
555 ec.runEnvoy(val)
556 log.Printf("Last index = %d", index)
557 lastIndex = index
558 }
559}
560
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400561func (ec * EnvoyControl) ParseCommandArguments() {
562 flag.StringVar(&(ec.assignmentKey), "assignment-key", ec.assignmentKey,
563 "The key for the voltha assignment value in consul")
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400564
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400565 flag.StringVar(&( ec.envoyConfigTemplate),"envoy-cfg-template", ec.envoyConfigTemplate,
566 "The path to envoy's configuration template")
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400567
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400568 flag.StringVar(&( ec.envoyConfigTemplateBoth),"envoy-cfg-template-both", ec.envoyConfigTemplateBoth,
569 "The path to envoy's configuration template for both http and https")
570
571 flag.StringVar(&( ec.envoyConfigTemplateNoHttps),"envoy-cfg-template-no-https", ec.envoyConfigTemplateNoHttps,
572 "The path to envoy's configuration template with no https")
573
574 flag.StringVar(&( ec.envoyConfigTemplateNoHttp),"envoy-cfg-template-no-http", ec.envoyConfigTemplateNoHttp,
575 "The path to envoy's configuration template with no http")
576
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400577 flag.StringVar(&(ec.envoyConfig), "envoy-config", ec.envoyConfig,
578 "The path to envoy's configuration file" )
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400579
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400580 flag.StringVar(&(ec.vcoreSvcName), "vcore-svc-name", ec.vcoreSvcName,
581 "The service name of the voltha core service")
582
583 flag.StringVar(&(ec.consulSvcName),"consul-svc-nme", ec.consulSvcName,
584 "The service name of the consul service")
585
586 flag.StringVar(&(ec.vcorePort), "vcore-port", ec.vcorePort,
587 "The port where the vcore's GRPC service can be found")
588
589 flag.StringVar(&(ec.consulPort), "consul-port", ec.consulPort,
590 "The port where the consul service api can be found")
591
Richard Jankowski15274592017-12-12 15:52:37 -0500592 flag.StringVar(&(ec.kvStore), "kv", ec.kvStore,
593 "The KV store: consul or etcd")
594
595 flag.StringVar(&(ec.kvSvcName), "kv-svc-name", ec.kvSvcName,
596 "The name of the KV store service")
597
598 flag.StringVar(&(ec.kvPort), "kv-port", ec.kvPort,
599 "The port where the KV service api can be found")
600
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400601 flag.StringVar(&(ec.envoyHttpPort), "http-port", ec.envoyHttpPort,
602 "The port where the http front-end is served ")
603
604 flag.StringVar(&(ec.envoyHttpsPort), "https-port", ec.envoyHttpsPort,
605 "The port where the https front-end is served ")
606
607 flag.StringVar(&(ec.envoyGrpcPort), "grpc-port", ec.envoyGrpcPort,
608 "The port where the grpc front-end is served ")
609
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400610 flag.IntVar(&(ec.retries), "retries", ec.retries,
611 "The number of times to retry name lookups and connect requests before failing")
612
613 flag.IntVar(&(ec.waitTime), "wait-time", ec.waitTime,
614 "The number of seconds to wait between retries")
615
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400616 flag.BoolVar(&(ec.httpDisabled), "disable-http", ec.httpDisabled,
617 "Disables the http front-end")
618
619 flag.BoolVar(&(ec.httpsDisabled), "disable-https", ec.httpsDisabled,
620 "Disables ths https front-end")
621
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400622 flag.Parse()
623}
624
625func (ec * EnvoyControl) Initialize() (err error) {
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400626 // Resolve consul's virtual ip address
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400627 if err = ec.resolveServiceAddress(ec.consulSvcName); err != nil {
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400628 log.Fatal("Can't proceed without consul's vIP address: %s", err.Error())
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400629 }
630
631 // Resolve voltha's virtual ip address
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400632 if err = ec.resolveServiceAddress(ec.vcoreSvcName); err != nil {
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400633 log.Fatal("Can't proceed without voltha's vIP address: %s", err.Error())
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400634 }
635
Richard Jankowski15274592017-12-12 15:52:37 -0500636 if err = ec.kvConnect[ec.kvStore](ec.kvSvcName, ec.kvPort); err != nil {
637 log.Fatal("Failed to create KV client, aborting: %s", err.Error())
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400638 }
639
640 if ec.httpDisabled == true && ec.httpsDisabled == true {
641 log.Printf("Cowardly refusing to disable both http and https, leavign them both enabled\n")
642 } else if ec.httpDisabled == true {
643 log.Printf("Diasabling http\n")
644 ec.envoyConfigTemplate = ec.envoyConfigTemplateNoHttp
645 } else if ec.httpsDisabled == true {
646 log.Printf("Diasabling https\n")
647 ec.envoyConfigTemplate = ec.envoyConfigTemplateNoHttps
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400648 }
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400649
650 return
651}
652
653func main() {
654
655 var err error
656 var ec * EnvoyControl
657
658 ec = NewEnvoyControl()
659 ec.ParseCommandArguments()
Richard Jankowski15274592017-12-12 15:52:37 -0500660 if ec.kvStore != "etcd" {
661 ec.kvStore = "consul"
662 }
663 log.Printf("KV-store %s at %s:%s", ec.kvStore, ec.kvSvcName, ec.kvPort)
664
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400665 if err = ec.Initialize(); err != nil {
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400666 log.Fatal("Envoy control initialization failed, aboring: %s", err.Error())
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400667 }
668
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400669
670 // Start envoy and monitor changes to the KV store to reload
Richard Jankowski15274592017-12-12 15:52:37 -0500671 // consul's config. This never returns unless something crashes.
672 ec.kvMonitor[ec.kvStore]()
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400673 log.Fatal("Monitor returned, this shouldn't happen")
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400674}