blob: 6766539ffac59824dde969e8ec54e9e9f9ed4ed3 [file] [log] [blame]
Sergio Slobodrianbe829272017-07-17 14:45:45 -04001package main // import "ciena.com/envoyd"
2
3import (
4 "os"
5 "os/exec"
6 "fmt"
7 "log"
8 "strconv"
9 "time"
10 "net"
11 "io/ioutil"
12 "text/template"
13 "encoding/json"
14 consulapi "github.com/hashicorp/consul/api"
15)
16
17// DATA STRUCTURES
18
19type ConfigVars struct {
20 VolthaVip string
21 VolthaRR []string
22}
23
24type VolthaClusterEntry struct {
25 Prefix string
26 Id string
27 Host string
28}
29
30//Client provides an interface for getting data out of Consul
31type Client interface {
32// Get a Service from consulapi
33 Service(string, string) ([]string, error)
34// Register a service with local agent
35 Register(string, int) error
36// Deregister a service with local agent
37 DeRegister(string) error
38}
39
40type client struct {
41 consulapi *consulapi.Client
42}
43
44// This struct is not used yet
45// TODO: Update the daemon to use this structure to for a
46// more object oriented implementation
47type EnvoyControl struct {
48 retrys int
49 waitTime int
50 cv ConfigVars
51 vc []VolthaClusterEntry
52 meta * consulapi.QueryMeta
53 kvp * consulapi.KVPair
54 ipAddrs map[string][]string
55}
56
57// CONSTANTS
58var assignmentKey string = "service/voltha/data/core/assignment"
59var vcoreHostIpName string = "host"
60var vcoreIdName string = "id"
61var restartEpoch int = 0
62var volthaPort string = "50556" // This will be passed inas an option.
63var consulPort string = "8500" // This will be passed in as an option.
64
65//NewConsul returns a Client interface for given consulapi address
66func NewConsulClient(addr string) (*client, error) {
67 config := consulapi.DefaultConfig()
68 config.Address = addr
69 c, err := consulapi.NewClient(config)
70 if err != nil {
71 return nil, err
72 }
73 return &client{consulapi: c}, nil
74}
75
76// Register a service with consulapi local agent
77func (c *client) Register(name string, port int) error {
78 reg := &consulapi.AgentServiceRegistration{
79 ID: name,
80 Name: name,
81 Port: port,
82 }
83 return c.consulapi.Agent().ServiceRegister(reg)
84}
85
86// DeRegister a service with consulapi local agent
87func (c *client) DeRegister(id string) error {
88 return c.consulapi.Agent().ServiceDeregister(id)
89}
90
91// Service return a service
92func (c *client) Service(service, tag string) ([]*consulapi.ServiceEntry, *consulapi.QueryMeta, error) {
93 passingOnly := true
94 addrs, meta, err := c.consulapi.Health().Service(service, tag, passingOnly, nil)
95 if len(addrs) == 0 && err == nil {
96 return nil, nil, fmt.Errorf("service ( %s ) was not found", service)
97 }
98 if err != nil {
99 return nil, nil, err
100 }
101 return addrs, meta, nil
102}
103
104// Starts envoy with the current restartEpoch
105func startEnvoy(cfg_file string) {
106 cmd := exec.Command("/usr/local/bin/envoy", "--restart-epoch", strconv.Itoa(restartEpoch),
107 "--config-path", cfg_file)
108
109 curEpoch := restartEpoch
110 restartEpoch += 1
111 if err := cmd.Start(); err != nil {
112 log.Fatal(err)
113 panic(err)
114 }
115 log.Printf("Waiting on envoy %d to exit", curEpoch)
116 if err := cmd.Wait(); err != nil {
117 log.Fatal(err, "Unexpected exit code")
118 }
119 log.Printf("Envoy %d exited", curEpoch)
120
121}
122
123// This function will use the provided templete file to generate
124// the targetfile substituting
125func updateEnvoyConfig(templateFile string, targetFile string, cv ConfigVars) {
126 var firstRun bool = true
127 f := func() (bool) {
128 var rtrn bool = firstRun
129 firstRun = false
130 return rtrn
131 }
132 var funcs = template.FuncMap{"isFirst": f}
133 // Slurp up the template file.
134 tplt, err := ioutil.ReadFile(templateFile)
135 if err != nil {
136 panic("ERROR reading the template file, aborting")
137 }
138 //fmt.Println(string(tplt))
139 configTemplate, err := template.New("config").Funcs(funcs).Parse(string(tplt));
140 if err != nil {
141 panic(err)
142 }
143 outFile,err := os.Create(targetFile)
144 if err != nil {
145 panic(err)
146 }
147 if err := configTemplate.Execute(outFile, cv); err != nil {
148 panic(err)
149 }
150 //cfgFile, err := ioutil.ReadFile(targetFile)
151 //if err != nil {
152 // panic("ERROR reading the config file, aborting")
153 //}
154 //fmt.Println(string(cfgFile))
155}
156
157func getServiceAddr(serviceName string, retrys int, waitTime int) (addrs []string, err error) {
158 for i := 0; i < retrys; i++ {
159 addrs,err = net.LookupHost(serviceName)
160 if err != nil {
161 log.Printf("%s name resolution failed %d time(s) retrying...\n", serviceName, i+1)
162 } else {
163 //fmt.Printf("%s address = %s\n",serviceName, addrs[0])
164 break
165 }
166 time.Sleep(time.Duration(waitTime) * time.Second)
167 }
168 if err != nil {
169 log.Printf("%s name resolution failed %d times gving up\n", serviceName, retrys)
170 }
171 return
172}
173
174func parseAssignment(jsonString []byte) (vCluster []VolthaClusterEntry, err error) {
175 var vc VolthaClusterEntry
176 var f interface{}
177
178 log.Printf("Parsing %s\n", string(jsonString))
179 err = json.Unmarshal(jsonString, &f)
180 if err != nil {
181 log.Fatal("Unable to parse json record %s", jsonString)
182 panic(err)
183 } else {
184 m := f.(map[string]interface{})
185 for k, v := range m {
186 vc.Prefix = k
187 //log.Printf("Processing key %s\n", k)
188 switch vv := v.(type) {
189 case map[string]interface{}:
190 for i, u := range vv {
191 //log.Printf("Processing key %s\n", i)
192 switch uu := u.(type) {
193 case string:
194 if i == vcoreHostIpName {
195 vc.Host = uu
196 } else if i == vcoreIdName {
197 vc.Id = uu
198 } else {
199 log.Printf("WARNING: unexpected descriptor,%s\n", i)
200 }
201 default:
202 log.Printf("WARNING: unexpected type, ")
203 log.Println(i, u)
204 }
205 }
206 default:
207 log.Printf("WARNING: unexpected type, ")
208 log.Println(k, v)
209 }
210 vCluster = append(vCluster, vc)
211 }
212 }
213 log.Println("Parsing complete")
214 return
215}
216
217func runEnvoy(meta * consulapi.QueryMeta, kvp * consulapi.KVPair, values * map[string]interface{}, cv ConfigVars,
218 templatePath string, configPath string) {
219 var err error
220 var vCluster []VolthaClusterEntry
221
222 // Extract all values from the KV record
223 vCluster, err = parseAssignment([]byte(kvp.Value))
224 if err == nil {
225 (*values)["volthaRR"] = []string{}
226 for i := range vCluster {
227 //log.Printf("Processing %s\n", vCluster[i].Host)
228 (*values)["volthaRR"] = append((*values)["volthaRR"].([]string), vCluster[i].Host)
229 cv.VolthaRR = append(cv.VolthaRR, vCluster[i].Host + ":" + volthaPort)
230 }
231 } else {
232 log.Fatal("Couldn't parse the KV record %s\n", string(kvp.Value))
233 panic(err)
234 }
235
236 // Now that we have the data loaded, update the envoy config and start envoy
237 updateEnvoyConfig(templatePath, configPath, cv)
238 go startEnvoy(configPath)
239 log.Printf("meta.LastIndex = %d\n", meta.LastIndex)
240}
241
242func runMonitorEnvoy(kv * consulapi.KV, values * map[string]interface{}, cv ConfigVars,
243 templatePath string, configPath string) {
244 var err error
245 var qo consulapi.QueryOptions
246
247
248 // Get the initial values of the assignment key which contains individual
249 // voltha core IP addresses. This may be empty until voltha populates it
250 // so it must be checked
251 kvp, meta, err := kv.Get(assignmentKey, nil)
252 for i := 0; i < 10; i++ {
253 if err != nil {
254 fmt.Println(err)
255 log.Printf("Unable to read assignment consul key, retry %d\n", i+1)
256 time.Sleep(time.Duration(2) * time.Second)
257 kvp, meta, err = kv.Get(assignmentKey, nil)
258 } else if kvp != nil && len(string(kvp.Value)) > 10 {
259 log.Printf("Starting Envoy")
260 runEnvoy(meta, kvp, values, cv, templatePath, configPath)
261 break
262 } else {
263 log.Printf("Voltha assignment key invalid, retry %d\n", i+1)
264 time.Sleep(time.Duration(2) * time.Second)
265 kvp, meta, err = kv.Get(assignmentKey, nil)
266 }
267 }
268
269 for {
270 qo.WaitIndex = meta.LastIndex
271 for {
272 if qo.WaitIndex != meta.LastIndex {
273 break
274 }
275 kvp, meta, err = kv.Get(assignmentKey, &qo)
276 if err != nil {
277 log.Fatal("Unable to read assignment consul key")
278 panic(err)
279 } else {
280 log.Println(string(kvp.Value))
281 log.Printf("meta.LastIndex = %d\n", meta.LastIndex)
282 }
283 }
284 // Fell through, the index has changed thus the key has changed
285
286 runEnvoy(meta, kvp, values, cv, templatePath, configPath)
287 }
288}
289
290func main() {
291
292 var err error
293 var addrs []string
294 var cv ConfigVars // Template variables.
295 var consul * consulapi.Client
296 var values map[string]interface{} // Key values map
297
298 values = make(map[string]interface{})
299
300 // Resolve consul's virtual ip address
301 addrs, err = getServiceAddr("consul", 10, 2)
302 if err == nil {
303 values["consulvip"] = addrs[0]
304 log.Printf("Consul's address = %s\n",addrs[0])
305 } else {
306 log.Fatal("Can't proceed without consul's vIP address")
307 panic(err)
308 }
309
310 // Resolve voltha's virtual ip address
311 addrs,err = getServiceAddr("vcore", 10, 2)
312 if err == nil {
313 log.Printf("Voltha address = %s\n",addrs[0])
314 // Config var for the template
315 cv.VolthaVip = addrs[0] + ":" + volthaPort
316 values["volthavip"] = addrs[0]
317 } else {
318 log.Fatal("Can't proceed without voltha's vIP address")
319 panic(err)
320 }
321
322 // Fire up a consul client and get the kv store
323 config := consulapi.DefaultConfig()
324 config.Address = values["consulvip"].(string) + ":" + consulPort
325 consul, err = consulapi.NewClient(config)
326 if err != nil {
327 log.Fatal("error creating consul client aborting")
328 panic(err)
329 }
330 kv := consul.KV()
331
332 // Start envoy and monitor changes to the KV store to reload
333 // consul's config. This never returns unless somethign crashes.
334 runMonitorEnvoy(kv, &values, cv, "/envoy/voltha-grpc-proxy.template.json", "/envoy/voltha-grpc-proxy.json")
335}