blob: d4b8b05fd02f82e92309c175bdd8bfe62f136ab1 [file] [log] [blame]
Sergio Slobodrianbe829272017-07-17 14:45:45 -04001package main // import "ciena.com/envoyd"
2
3import (
Richard Jankowski15274592017-12-12 15:52:37 -05004 "context"
Sergio Slobodrianbe829272017-07-17 14:45:45 -04005 "os"
6 "os/exec"
7 "fmt"
8 "log"
9 "strconv"
10 "time"
11 "net"
12 "io/ioutil"
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -040013 "io"
Sergio Slobodrianbe829272017-07-17 14:45:45 -040014 "text/template"
15 "encoding/json"
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -040016 "sync"
17 "flag"
18 "bufio"
Sergio Slobodrianbe829272017-07-17 14:45:45 -040019 consulapi "github.com/hashicorp/consul/api"
Richard Jankowski15274592017-12-12 15:52:37 -050020 etcdapi "github.com/coreos/etcd/clientv3"
Sergio Slobodrianbe829272017-07-17 14:45:45 -040021)
22
23// DATA STRUCTURES
24
// Client provides an interface for getting data out of Consul.
//
// NOTE(review): the Service method declared here returns ([]string, error),
// which differs from the Service method defined on the client type in this
// file -- confirm which signature is intended.
type Client interface {
	// Service gets a service from consulapi.
	Service(string, string) ([]string, error)

	// Register registers a service with the local agent.
	Register(string, int) error

	// DeRegister deregisters a service with the local agent.
	DeRegister(string) error
}
34
35type client struct {
36 consulapi *consulapi.Client
37}
38
// Per-KV-store hooks. NewEnvoyControl registers one of each under the store
// names "consul" and "etcd".
type (
	// KvConnectFunc connects to a KV store. The registered implementations
	// take a service name and a port, in that order.
	KvConnectFunc func(string, string) error

	// KvMonitorFunc watches the KV store for changes.
	KvMonitorFunc func()
)
41
// EnvoyConfigVars carries the values that are substituted into the envoy
// configuration template.
type EnvoyConfigVars struct {
	VolthaVip string   // "<address>:<port>" of the voltha core service
	VolthaRR  []string // one "<host>:<port>" entry per voltha core
	vcorePort string

	// Ports of the envoy front-ends.
	HttpPort, HttpsPort, GrpcPort string
}
50
// VolthaClusterEntry describes one voltha core parsed from the assignment
// record: the record key it was found under, its id and its host.
type VolthaClusterEntry struct {
	Prefix, Id, Host string
}
56
Sergio Slobodrianbe829272017-07-17 14:45:45 -040057// This struct is not used yet
58// TODO: Update the daemon to use this structure to for a
59// more object oriented implementation
60type EnvoyControl struct {
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -040061 // Command line parameters
62 assignmentKey string
63 envoyConfigTemplate string
Sergio Slobodrian6570c742017-08-07 23:11:33 -040064 envoyConfigTemplateBoth string
65 envoyConfigTemplateNoHttps string
66 envoyConfigTemplateNoHttp string
67 envoyHttpPort string
68 envoyHttpsPort string
69 httpDisabled bool
70 httpsDisabled bool
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -040071 envoyConfig string
72 vcoreSvcName string
73 consulSvcName string
74 vcorePort string
Sergio Slobodrian6570c742017-08-07 23:11:33 -040075 envoyGrpcPort string
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -040076 consulPort string
Richard Jankowski15274592017-12-12 15:52:37 -050077 kvStore string
78 kvSvcName string
79 kvPort string
80 kvConnect map[string]KvConnectFunc
81 kvMonitor map[string]KvMonitorFunc
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -040082 retries int
Sergio Slobodrianbe829272017-07-17 14:45:45 -040083 waitTime int
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -040084 // Runtime variables
85 consul * consulapi.Client
Richard Jankowski15274592017-12-12 15:52:37 -050086 etcd * etcdapi.Client
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -040087 vcoreHostIpName string
88 vcoreIdName string
Sergio Slobodrianbe829272017-07-17 14:45:45 -040089 vc []VolthaClusterEntry
Sergio Slobodrianbe829272017-07-17 14:45:45 -040090 ipAddrs map[string][]string
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -040091 restartEpoch int
92 reLock sync.Mutex // Exclusive access to the restartEpoch
Sergio Slobodrianbe829272017-07-17 14:45:45 -040093}
94
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -040095
96func NewEnvoyControl() (ec * EnvoyControl) {
97 var envCtrl EnvoyControl = EnvoyControl { // Default values
98 // Command line parameters
99 assignmentKey: "service/voltha/data/core/assignment",
100 envoyConfigTemplate: "/envoy/voltha-grpc-proxy.template.json",
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400101 envoyConfigTemplateBoth: "/envoy/voltha-grpc-proxy.template.json",
102 envoyConfigTemplateNoHttps: "/envoy/voltha-grpc-proxy-no-https.template.json",
103 envoyConfigTemplateNoHttp: "/envoy/voltha-grpc-proxy-no-http.template.json",
104 envoyHttpsPort: "8443",
105 envoyHttpPort: "8882",
106 envoyGrpcPort: "50555",
107 httpDisabled: false,
108 httpsDisabled: false,
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400109 envoyConfig: "/envoy/voltha-grpc-proxy.json",
110 //envoyLogFile: "/envoy/voltha_access_log.log",
111 vcoreSvcName: "vcore",
112 consulSvcName: "consul",
113 vcorePort: "50556",
114 consulPort: "8500",
Richard Jankowski15274592017-12-12 15:52:37 -0500115 kvStore: "consul",
116 kvSvcName: "consul",
117 kvPort: "8500",
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400118 retries: 10,
119 waitTime: 2,
120 // Runtime variables
121 vcoreHostIpName: "host",
122 vcoreIdName: "id",
123 ipAddrs: make(map[string][]string),
124 restartEpoch: 0,
125 }
126 ec = &envCtrl
Richard Jankowski15274592017-12-12 15:52:37 -0500127 ec.kvConnect = make(map[string]KvConnectFunc)
128 ec.kvConnect["consul"] = ec.consulConnect
129 ec.kvConnect["etcd"] = ec.etcdConnect
130 ec.kvMonitor = make(map[string]KvMonitorFunc)
131 ec.kvMonitor["consul"] = ec.monitorConsulKey
132 ec.kvMonitor["etcd"] = ec.monitorEtcdKey
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400133 return
134}
135
136func (ec * EnvoyControl) resolveServiceAddress(serviceName string) (err error) {
137 for i := 0; i < ec.retries; i++ {
138 ec.ipAddrs[serviceName], err = net.LookupHost(serviceName)
139 if err != nil {
140 log.Printf("%s name resolution failed %d time(s) retrying...", serviceName, i+1)
141 } else {
142 //fmt.Printf("%s address = %s\n",serviceName, addrs[0])
143 break
144 }
145 time.Sleep(time.Duration(ec.waitTime) * time.Second)
146 }
147 if err != nil {
Richard Jankowskid4454382018-02-08 16:21:43 -0500148 log.Printf("%s name resolution failed %d times giving up", serviceName, ec.retries)
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400149 }
150 return
151}
152
153func (ec * EnvoyControl) consulConnect(serviceName string, port string) (err error) {
154 // Fire up a consul client and get the kv store
155 cConfig := consulapi.DefaultNonPooledConfig()
156 cConfig.Address = ec.ipAddrs[serviceName][0] + ":" + port
157 ec.consul, err = consulapi.NewClient(cConfig)
158 if err != nil {
159 log.Fatal("error creating consul client aborting")
160 return
161 }
162 return
163}
164
Richard Jankowski15274592017-12-12 15:52:37 -0500165func (ec * EnvoyControl) etcdConnect(serviceName string, port string) (err error) {
166 // Fire up an etcd client to access the kv store
167 cfg := etcdapi.Config {
168 Endpoints: []string{serviceName + ":" + port},
169 DialTimeout: 5 * time.Second,
170 }
171 ec.etcd, err = etcdapi.New(cfg)
172 if err != nil {
173 log.Fatal("Failed to create etcd client, aborting...")
174 return
175 }
176 return
177}
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400178
179//NewConsul returns a Client interface for given consulapi address
180func NewConsulClient(addr string) (*client, error) {
181 config := consulapi.DefaultConfig()
182 config.Address = addr
183 c, err := consulapi.NewClient(config)
184 if err != nil {
185 return nil, err
186 }
187 return &client{consulapi: c}, nil
188}
189
190// Register a service with consulapi local agent
191func (c *client) Register(name string, port int) error {
192 reg := &consulapi.AgentServiceRegistration{
193 ID: name,
194 Name: name,
195 Port: port,
196 }
197 return c.consulapi.Agent().ServiceRegister(reg)
198}
199
200// DeRegister a service with consulapi local agent
201func (c *client) DeRegister(id string) error {
202 return c.consulapi.Agent().ServiceDeregister(id)
203}
204
205// Service return a service
206func (c *client) Service(service, tag string) ([]*consulapi.ServiceEntry, *consulapi.QueryMeta, error) {
207 passingOnly := true
208 addrs, meta, err := c.consulapi.Health().Service(service, tag, passingOnly, nil)
209 if len(addrs) == 0 && err == nil {
210 return nil, nil, fmt.Errorf("service ( %s ) was not found", service)
211 }
212 if err != nil {
213 return nil, nil, err
214 }
215 return addrs, meta, nil
216}
217
218// Starts envoy with the current restartEpoch
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400219func (ec * EnvoyControl) startEnvoy() {
220 var curEpoch int
221 var err error
222 var count int
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400223
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400224 ec.reLock.Lock() // Make sure we've got exclusive access to the variable
225 cmd := exec.Command("/usr/local/bin/envoy", "--restart-epoch", strconv.Itoa(ec.restartEpoch),
226 "--config-path", ec.envoyConfig, "--parent-shutdown-time-s", "10")
227
228 curEpoch = ec.restartEpoch
229 ec.restartEpoch += 1
230 ec.reLock.Unlock() // Done, release the lock.
231
232 stderr, err := cmd.StderrPipe()
233 if err != nil {
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400234 log.Fatal("Couldn't attach to stderr running envoy command: %s", err.Error())
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400235 }
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400236 stdout, err := cmd.StdoutPipe()
237 if err != nil {
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400238 log.Fatal("Couldn't attach to stdout running envoy command: %s", err.Error())
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400239 }
240 so := bufio.NewReader(stdout)
241 se := bufio.NewReader(stderr)
242
243 if err = cmd.Start(); err != nil {
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400244 log.Fatal("Error starting envoy: %s", err.Error())
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400245 }
246 log.Printf("Envoy(%d) started", curEpoch)
247 soEof := false
248 seEof := false
249
250 data := make([]byte, 80)
251 log.Printf("Log forwarding for envoy(%d) started", curEpoch)
252 for {
253 data = make([]byte, 80)
254 count, err = so.Read(data)
255 log.Printf("ENVOY_LOG(%d): %s", curEpoch, string(data))
256 if err == io.EOF {
257 if seEof == true {
258 break
259 } else {
260 soEof = true
261 }
262 } else if err != nil {
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400263 log.Fatal("Attempt to read envoy standard out failed: %s", err.Error())
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400264 } else if count > 0 {
265 log.Printf("ENVOY_LOG(%d)(%d): %s",curEpoch,count,string(data))
266 }
267 data = make([]byte, 80)
268 count, err = se.Read(data)
269 if err == io.EOF {
270 if soEof == true {
271 break
272 } else {
273 seEof = true
274 }
275 } else if err != nil {
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400276 log.Fatal("Attempt to read envoy standard err failed: %s", err.Error())
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400277 } else if count > 0 {
278 log.Printf("ENVOY_LOG(%d)(%d): %s",curEpoch,count,string(data))
279 }
280 }
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400281 log.Printf("Waiting on envoy %d to exit", curEpoch)
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400282 if err = cmd.Wait(); err != nil {
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400283 log.Fatal("Envoy %d exited with an unexpected exit code: %s", curEpoch, err.Error())
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400284 }
285 log.Printf("Envoy %d exited", curEpoch)
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400286 // Check if this was the primary envoy, if so
287 // something went terribly wrong, panic to force
288 // forcefully exit.
289 ec.reLock.Lock()
290 if ec.restartEpoch == (curEpoch + 1) {
291 ec.reLock.Unlock()
292 log.Fatal("Last running envoy exited, aborting!")
293 panic("This should never happen")
294 }
295 ec.reLock.Unlock()
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400296
297}
298
299// This function will use the provided templete file to generate
300// the targetfile substituting
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400301func (ec * EnvoyControl) updateEnvoyConfig(ecv * EnvoyConfigVars) (err error) {
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400302 var firstRun bool = true
Sergio Slobodrian19628742017-09-05 19:52:54 -0400303 var firstRun2 bool = true
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400304 f := func() (bool) {
305 var rtrn bool = firstRun
306 firstRun = false
307 return rtrn
308 }
Sergio Slobodrian19628742017-09-05 19:52:54 -0400309 g := func() (bool) {
310 var rtrn bool = firstRun2
311 firstRun2 = false
312 return rtrn
313 }
314 var funcs = template.FuncMap{"isFirst": f, "isFirst2": g}
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400315 // Slurp up the template file.
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400316 tplt, err := ioutil.ReadFile(ec.envoyConfigTemplate)
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400317 if err != nil {
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400318 log.Fatal("ERROR reading the template file, aborting: %s", err.Error())
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400319 }
320 //fmt.Println(string(tplt))
321 configTemplate, err := template.New("config").Funcs(funcs).Parse(string(tplt));
322 if err != nil {
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400323 log.Fatal("Unexpected error loading the Envoy template, aborting: %s", err.Error())
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400324 }
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400325 outFile,err := os.Create(ec.envoyConfig)
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400326 if err != nil {
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400327 log.Fatal("Unexpected error opening the Envoy config file for write, aborting: %s", err.Error())
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400328 }
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400329 if err = configTemplate.Execute(outFile, ecv); err != nil {
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400330 log.Fatal("Unexpected error executing the Envoy config template, aborting: %s", err.Error())
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400331 }
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400332 //cfgFile, err := ioutil.ReadFile(ec.envoyConfig)
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400333 //if err != nil {
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400334 // log.Fatal("ERROR reading the config file, aborting: %s", err.Error())
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400335 // panic(err)
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400336 //}
337 //fmt.Println(string(cfgFile))
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400338 return
339}
340
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400341func (ec * EnvoyControl) parseAssignment(jsonString []byte) (vCluster []VolthaClusterEntry, err error) {
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400342 var f interface{}
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400343 var vc VolthaClusterEntry
344 //var isErr bool
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400345
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400346 log.Printf("Parsing %s", string(jsonString))
347 //err = json.Unmarshal(jsonString, &f)
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400348 err = json.Unmarshal(jsonString, &f)
349 if err != nil {
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400350 log.Fatal("Unable to parse json record %s : %s", jsonString, err.Error())
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400351 } else {
352 m := f.(map[string]interface{})
353 for k, v := range m {
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400354 isErr := false
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400355 vc.Prefix = k
356 //log.Printf("Processing key %s\n", k)
357 switch vv := v.(type) {
358 case map[string]interface{}:
359 for i, u := range vv {
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400360 //log.Printf("Processing key %sn", i)
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400361 switch uu := u.(type) {
362 case string:
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400363 if i == ec.vcoreHostIpName {
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400364 vc.Host = uu
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400365 } else if i == ec.vcoreIdName {
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400366 vc.Id = uu
367 } else {
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400368 log.Printf("WARNING: unexpected descriptor,%s", i)
369 isErr = true
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400370 }
371 default:
372 log.Printf("WARNING: unexpected type, ")
373 log.Println(i, u)
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400374 isErr = true
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400375 }
376 }
377 default:
378 log.Printf("WARNING: unexpected type, ")
379 log.Println(k, v)
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400380 isErr = true
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400381 }
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400382 if ! isErr {
383 vCluster = append(vCluster, vc)
384 }
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400385 }
386 }
387 log.Println("Parsing complete")
388 return
389}
390
Richard Jankowski15274592017-12-12 15:52:37 -0500391func (ec * EnvoyControl) prepareEnvoyConfig(keyValue []byte, ecv * EnvoyConfigVars) (err error) {
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400392 var vCluster []VolthaClusterEntry
393
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400394 ecv.HttpPort = ec.envoyHttpPort
395 ecv.HttpsPort = ec.envoyHttpsPort
396 ecv.GrpcPort = ec.envoyGrpcPort
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400397 ecv.VolthaVip = ec.ipAddrs[ec.vcoreSvcName][0] + ":" + ec.vcorePort
398
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400399 // Extract all values from the KV record
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400400 // In the future, the values should all be compared to what we currently have
Richard Jankowski15274592017-12-12 15:52:37 -0500401 vCluster, err = ec.parseAssignment(keyValue)
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400402 if err == nil {
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400403 ec.vc = vCluster // For future use to determine if there's been a real change
404 //templateValues["VolthaRR"] = []string{}
405 ecv.VolthaRR = []string{}
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400406 for i := range vCluster {
407 //log.Printf("Processing %s\n", vCluster[i].Host)
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400408 //templateValues["VolthaRR"] = append(templateValues["VolthaRR"].([]string), vCluster[i].Host)
409 ecv.VolthaRR = append(ecv.VolthaRR, vCluster[i].Host + ":" + ec.vcorePort)
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400410 }
411 } else {
Richard Jankowski15274592017-12-12 15:52:37 -0500412 log.Fatal("Couldn't parse the KV record %s: %s", string(keyValue), err.Error())
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400413 }
414 return
415}
416
Richard Jankowski15274592017-12-12 15:52:37 -0500417func (ec * EnvoyControl) runEnvoy(keyValue []byte) {
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400418 var err error
419 var ecv EnvoyConfigVars
420
Richard Jankowski15274592017-12-12 15:52:37 -0500421 if err = ec.prepareEnvoyConfig(keyValue, &ecv); err != nil {
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400422 log.Fatal("Error preparing envoy config variables, aborting: %s", err.Error())
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400423 }
424
425 // Now that we have the data loaded, update the envoy config and start envoy
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400426 ec.updateEnvoyConfig(&ecv)
427 go ec.startEnvoy()
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400428}
429
Richard Jankowski15274592017-12-12 15:52:37 -0500430func (ec * EnvoyControl) readConsulKey(key string, qo * consulapi.QueryOptions) (value []byte, meta * consulapi.QueryMeta, err error) {
431
432 var kvp *consulapi.KVPair
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400433
434 kv := ec.consul.KV()
435 // Get the initial values of the assignment key which contains individual
436 // voltha core IP addresses. This may be empty until voltha populates it
437 // so it must be checked
438 kvp, meta, err = kv.Get(ec.assignmentKey, qo)
439 for i := 0; i < ec.retries; i++ {
440 if err != nil {
441 fmt.Println(err)
442 log.Printf("Unable to read assignment consul key, retry %d", i+1)
443 time.Sleep(time.Duration(ec.waitTime) * time.Second)
444 kvp, meta, err = kv.Get(ec.assignmentKey, qo)
445 } else if kvp != nil && len(string(kvp.Value)) > 10 {
446 // A valid read, return
Richard Jankowski15274592017-12-12 15:52:37 -0500447 value = kvp.Value
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400448 break
449 } else {
450 log.Printf("Voltha assignment key invalid, retry %d", i+1)
451 time.Sleep(time.Duration(ec.waitTime) * time.Second)
452 kvp, meta, err = kv.Get(ec.assignmentKey, qo)
453 }
454 if i == ec.retries {
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400455 log.Fatal("Failed to read the assignment key after %d retries, aborting: %s", ec.retries, err.Error())
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400456 }
457 }
458 return
459}
460
Richard Jankowski15274592017-12-12 15:52:37 -0500461func (ec * EnvoyControl) readEtcdKey(key string) (value []byte, index int64, err error) {
462 // Get the initial values of the assignment key which contains individual
463 // voltha core IP addresses. This may be empty until voltha populates it
464 // so it must be checked
465 resp, err := ec.etcd.Get(context.Background(), ec.assignmentKey)
466 for i := 0; i < ec.retries; i++ {
467 if err != nil {
468 fmt.Println(err)
469 log.Printf("Unable to read assignment etcd key, retry %d", i+1)
470 time.Sleep(time.Duration(ec.waitTime) * time.Second)
471 resp, err = ec.etcd.Get(context.Background(), ec.assignmentKey)
472 } else if resp != nil && len(resp.Kvs) > 0 && len(resp.Kvs[0].Value) > 10 {
473 // A valid read, return
474 kv := resp.Kvs[0]
475 value = kv.Value
476 index = kv.ModRevision
477 break
478 } else {
479 log.Printf("Voltha assignment key from etcd invalid, retry %d", i+1)
480 time.Sleep(time.Duration(ec.waitTime) * time.Second)
481 resp, err = ec.etcd.Get(context.Background(), ec.assignmentKey)
482 }
483 if i == ec.retries {
484 log.Fatal("Failed to read assignment key from etcd after %d retries, aborting: %s", ec.retries, err.Error())
485 }
486 }
487 return
488}
489
490func (ec * EnvoyControl) monitorConsulKey() {
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400491 var err error
492 var qo consulapi.QueryOptions
493
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400494 // Get the initial values of the assignment key which contains individual
495 // voltha core IP addresses. This may be empty until voltha populates it
496 // so it must be checked
Richard Jankowski15274592017-12-12 15:52:37 -0500497 log.Printf("Monitoring consul key")
498 val, meta, err := ec.readConsulKey(ec.assignmentKey, nil)
499 log.Printf("Starting Envoy, initial index = %d", meta.LastIndex)
500 ec.runEnvoy(val)
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400501
502 for {
503 qo.WaitIndex = meta.LastIndex
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400504 qo.RequireConsistent = true
Sergio Slobodrian19628742017-09-05 19:52:54 -0400505 //qo.AllowStale = true
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400506 for {
507 if qo.WaitIndex != meta.LastIndex {
508 break
509 }
Richard Jankowski15274592017-12-12 15:52:37 -0500510 val, meta, err = ec.readConsulKey(ec.assignmentKey, &qo)
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400511 if err != nil {
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400512 log.Fatal("Unable to read assignment consul key: %s\n", err.Error())
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400513 } else {
Richard Jankowski15274592017-12-12 15:52:37 -0500514 log.Println(string(val))
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400515 log.Printf("meta.LastIndex = %d", meta.LastIndex)
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400516 }
517 }
518 // Fell through, the index has changed thus the key has changed
519
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400520 log.Printf("Starting Envoy")
Richard Jankowski15274592017-12-12 15:52:37 -0500521 ec.runEnvoy(val)
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400522 log.Printf("meta.LastIndex = %d", meta.LastIndex)
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400523 }
524}
525
Richard Jankowski15274592017-12-12 15:52:37 -0500526func (ec * EnvoyControl) monitorEtcdKey() {
527 var err error
528
Richard Jankowski15274592017-12-12 15:52:37 -0500529 // Get the initial values of the assignment key which contains individual
530 // voltha core IP addresses. This may be empty until voltha populates it
531 // so it must be checked
Richard Jankowski15274592017-12-12 15:52:37 -0500532
Richard Jankowski9fb5b082018-05-16 17:58:00 -0400533 log.Printf("Monitoring etcd key %s", ec.assignmentKey)
534 val, index, err := ec.readEtcdKey(ec.assignmentKey)
535 if err == nil {
536 lastIndex := index
537 log.Printf("Starting Envoy, initial index = %d", lastIndex)
Richard Jankowski15274592017-12-12 15:52:37 -0500538 ec.runEnvoy(val)
Richard Jankowski9fb5b082018-05-16 17:58:00 -0400539 }
540
541 rch := ec.etcd.Watch(context.Background(), ec.assignmentKey)
542 for resp := range rch {
543 for _, ev := range resp.Events {
544 val = ev.Kv.Value
545 log.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
546 if ev.Type == etcdapi.EventTypePut {
547 log.Printf("Starting Envoy")
548 ec.runEnvoy(val)
549 }
550 }
Richard Jankowski15274592017-12-12 15:52:37 -0500551 }
552}
553
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400554func (ec * EnvoyControl) ParseCommandArguments() {
555 flag.StringVar(&(ec.assignmentKey), "assignment-key", ec.assignmentKey,
556 "The key for the voltha assignment value in consul")
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400557
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400558 flag.StringVar(&( ec.envoyConfigTemplate),"envoy-cfg-template", ec.envoyConfigTemplate,
559 "The path to envoy's configuration template")
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400560
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400561 flag.StringVar(&( ec.envoyConfigTemplateBoth),"envoy-cfg-template-both", ec.envoyConfigTemplateBoth,
562 "The path to envoy's configuration template for both http and https")
563
564 flag.StringVar(&( ec.envoyConfigTemplateNoHttps),"envoy-cfg-template-no-https", ec.envoyConfigTemplateNoHttps,
565 "The path to envoy's configuration template with no https")
566
567 flag.StringVar(&( ec.envoyConfigTemplateNoHttp),"envoy-cfg-template-no-http", ec.envoyConfigTemplateNoHttp,
568 "The path to envoy's configuration template with no http")
569
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400570 flag.StringVar(&(ec.envoyConfig), "envoy-config", ec.envoyConfig,
571 "The path to envoy's configuration file" )
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400572
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400573 flag.StringVar(&(ec.vcoreSvcName), "vcore-svc-name", ec.vcoreSvcName,
574 "The service name of the voltha core service")
575
576 flag.StringVar(&(ec.consulSvcName),"consul-svc-nme", ec.consulSvcName,
577 "The service name of the consul service")
578
579 flag.StringVar(&(ec.vcorePort), "vcore-port", ec.vcorePort,
580 "The port where the vcore's GRPC service can be found")
581
582 flag.StringVar(&(ec.consulPort), "consul-port", ec.consulPort,
583 "The port where the consul service api can be found")
584
Richard Jankowski15274592017-12-12 15:52:37 -0500585 flag.StringVar(&(ec.kvStore), "kv", ec.kvStore,
586 "The KV store: consul or etcd")
587
588 flag.StringVar(&(ec.kvSvcName), "kv-svc-name", ec.kvSvcName,
589 "The name of the KV store service")
590
591 flag.StringVar(&(ec.kvPort), "kv-port", ec.kvPort,
592 "The port where the KV service api can be found")
593
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400594 flag.StringVar(&(ec.envoyHttpPort), "http-port", ec.envoyHttpPort,
595 "The port where the http front-end is served ")
596
597 flag.StringVar(&(ec.envoyHttpsPort), "https-port", ec.envoyHttpsPort,
598 "The port where the https front-end is served ")
599
600 flag.StringVar(&(ec.envoyGrpcPort), "grpc-port", ec.envoyGrpcPort,
601 "The port where the grpc front-end is served ")
602
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400603 flag.IntVar(&(ec.retries), "retries", ec.retries,
604 "The number of times to retry name lookups and connect requests before failing")
605
606 flag.IntVar(&(ec.waitTime), "wait-time", ec.waitTime,
607 "The number of seconds to wait between retries")
608
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400609 flag.BoolVar(&(ec.httpDisabled), "disable-http", ec.httpDisabled,
610 "Disables the http front-end")
611
612 flag.BoolVar(&(ec.httpsDisabled), "disable-https", ec.httpsDisabled,
613 "Disables ths https front-end")
614
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400615 flag.Parse()
616}
617
618func (ec * EnvoyControl) Initialize() (err error) {
Richard Jankowskid4454382018-02-08 16:21:43 -0500619 // Resolve KV store's virtual ip address
620 if err = ec.resolveServiceAddress(ec.kvSvcName); err != nil {
621 log.Fatal("Can't proceed without KV store's vIP address: %s", err.Error())
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400622 }
623
624 // Resolve voltha's virtual ip address
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400625 if err = ec.resolveServiceAddress(ec.vcoreSvcName); err != nil {
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400626 log.Fatal("Can't proceed without voltha's vIP address: %s", err.Error())
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400627 }
628
Richard Jankowski15274592017-12-12 15:52:37 -0500629 if err = ec.kvConnect[ec.kvStore](ec.kvSvcName, ec.kvPort); err != nil {
630 log.Fatal("Failed to create KV client, aborting: %s", err.Error())
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400631 }
632
633 if ec.httpDisabled == true && ec.httpsDisabled == true {
Richard Jankowskid4454382018-02-08 16:21:43 -0500634 log.Printf("Cowardly refusing to disable both http and https, leaving them both enabled\n")
Sergio Slobodrian6570c742017-08-07 23:11:33 -0400635 } else if ec.httpDisabled == true {
636 log.Printf("Diasabling http\n")
637 ec.envoyConfigTemplate = ec.envoyConfigTemplateNoHttp
638 } else if ec.httpsDisabled == true {
639 log.Printf("Diasabling https\n")
640 ec.envoyConfigTemplate = ec.envoyConfigTemplateNoHttps
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400641 }
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400642
643 return
644}
645
646func main() {
647
648 var err error
649 var ec * EnvoyControl
650
651 ec = NewEnvoyControl()
652 ec.ParseCommandArguments()
Richard Jankowski15274592017-12-12 15:52:37 -0500653 if ec.kvStore != "etcd" {
654 ec.kvStore = "consul"
655 }
656 log.Printf("KV-store %s at %s:%s", ec.kvStore, ec.kvSvcName, ec.kvPort)
657
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400658 if err = ec.Initialize(); err != nil {
Richard Jankowskid4454382018-02-08 16:21:43 -0500659 log.Fatal("Envoy control initialization failed, aborting: %s", err.Error())
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400660 }
661
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400662
663 // Start envoy and monitor changes to the KV store to reload
Richard Jankowski15274592017-12-12 15:52:37 -0500664 // consul's config. This never returns unless something crashes.
665 ec.kvMonitor[ec.kvStore]()
Sergio Slobodrian8dec8de2017-07-20 12:45:07 -0400666 log.Fatal("Monitor returned, this shouldn't happen")
Sergio Slobodrianbe829272017-07-17 14:45:45 -0400667}