blob: 19da3d73858ba2319779895e855a032f020162fc [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17//Package main invokes the application
18package main
19
20import (
21 "context"
22 "errors"
23 "fmt"
24 "os"
25 "os/signal"
26 "syscall"
27 "time"
28
29 "github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
30
31 "github.com/opencord/voltha-lib-go/v3/pkg/adapters"
32 com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
33 conf "github.com/opencord/voltha-lib-go/v3/pkg/config"
34 "github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
35 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
36 "github.com/opencord/voltha-lib-go/v3/pkg/log"
37 "github.com/opencord/voltha-lib-go/v3/pkg/probe"
38 "github.com/opencord/voltha-lib-go/v3/pkg/version"
39 "github.com/opencord/voltha-openolt-adapter/internal/pkg/config"
40 ac "github.com/opencord/voltha-openolt-adapter/internal/pkg/core"
41 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
42 "github.com/opencord/voltha-protos/v3/go/voltha"
43)
44
// adapter aggregates the clients, proxies and runtime state that make up one
// instance of this adapter process.
type adapter struct {
	instanceID   string               // unique ID of this adapter instance (from config)
	config       *config.AdapterFlags // parsed command-line/environment configuration
	iAdapter     adapters.IAdapter    // the OLT adapter implementation created in startOpenOLT
	kafkaClient  kafka.Client         // low-level Kafka client ("sarama")
	kvClient     kvstore.Client       // KV-store client (consul or etcd)
	kip          kafka.InterContainerProxy // inter-container messaging proxy
	coreProxy    adapterif.CoreProxy       // proxy for requests to the VOLTHA core
	adapterProxy adapterif.AdapterProxy    // proxy for olt<->onu adapter requests
	eventProxy   adapterif.EventProxy      // proxy for posting events to Kafka
	halted       bool                      // set true once stop() has run
	exitChannel  chan int                  // signalled (buffered, size 1) by stop()
	receiverChannels []<-chan *ic.InterContainerMessage // registered message receivers
}
59
60func newAdapter(cf *config.AdapterFlags) *adapter {
61 var a adapter
62 a.instanceID = cf.InstanceID
63 a.config = cf
64 a.halted = false
65 a.exitChannel = make(chan int, 1)
66 a.receiverChannels = make([]<-chan *ic.InterContainerMessage, 0)
67 return &a
68}
69
// start brings up all adapter components in dependency order: KV-store
// client, dynamic log configuration, Kafka client, inter-container proxy,
// the core/adapter/event proxies, the OLT adapter itself, and finally the
// registration with the VOLTHA core. Any setup failure is fatal (the process
// exits via logger.Fatal*). Probe service statuses are updated as each
// component comes up.
func (a *adapter) start(ctx context.Context) {
	logger.Info(ctx, "Starting Core Adapter components")
	var err error

	// Pull the probe out of the context (placed there by main, if at all) so
	// the services below can be declared and have their status tracked.
	var p *probe.Probe
	if value := ctx.Value(probe.ProbeContextKey); value != nil {
		if _, ok := value.(*probe.Probe); ok {
			p = value.(*probe.Probe)
			p.RegisterService(
				ctx,
				"message-bus",
				"kv-store",
				"container-proxy",
				"core-request-handler",
				"register-with-core",
			)
		}
	}

	// Setup KV Client
	logger.Debugw(ctx, "create-kv-client", log.Fields{"kvstore": a.config.KVStoreType})
	if err = a.setKVClient(ctx); err != nil {
		logger.Fatalw(ctx, "error-setting-kv-client", log.Fields{"error": err})
	}

	if p != nil {
		p.UpdateStatus(ctx, "kv-store", probe.ServiceStatusRunning)
	}

	// Setup Log Config: watch the KV store for runtime log-level and
	// log-feature changes.
	cm := conf.NewConfigManager(ctx, a.kvClient, a.config.KVStoreType, a.config.KVStoreAddress, a.config.KVStoreTimeout)
	go conf.StartLogLevelConfigProcessing(cm, ctx)
	go conf.StartLogFeaturesConfigProcessing(cm, ctx)

	// Setup Kafka Client
	if a.kafkaClient, err = newKafkaClient(ctx, "sarama", a.config.KafkaAdapterAddress); err != nil {
		logger.Fatalw(ctx, "Unsupported-common-client", log.Fields{"error": err})
	}

	if p != nil {
		p.UpdateStatus(ctx, "message-bus", probe.ServiceStatusRunning)
	}

	// setup endpointManager

	// Start the common InterContainer Proxy - retries indefinitely (-1)
	if a.kip, err = a.startInterContainerProxy(ctx, -1); err != nil {
		logger.Fatal(ctx, "error-starting-inter-container-proxy")
	}

	// Create the core proxy to handle requests to the Core
	a.coreProxy = com.NewCoreProxy(ctx, a.kip, a.config.Topic, a.config.CoreTopic)

	// Create the adaptor proxy to handle request between olt and onu
	a.adapterProxy = com.NewAdapterProxy(ctx, a.kip, a.config.CoreTopic, cm.Backend)

	// Create the event proxy to post events to KAFKA
	a.eventProxy = com.NewEventProxy(com.MsgClient(a.kafkaClient), com.MsgTopic(kafka.Topic{Name: a.config.EventTopic}))

	// Create the open OLT adapter
	if a.iAdapter, err = a.startOpenOLT(ctx, a.kip, a.coreProxy, a.adapterProxy, a.eventProxy, a.config); err != nil {
		logger.Fatalw(ctx, "error-starting-openolt", log.Fields{"error": err})
	}

	// Register the core request handler
	if err = a.setupRequestHandler(ctx, a.instanceID, a.iAdapter); err != nil {
		logger.Fatalw(ctx, "error-setting-core-request-handler", log.Fields{"error": err})
	}

	// Register this adapter to the Core - retries indefinitely (-1)
	if err = a.registerWithCore(ctx, -1); err != nil {
		logger.Fatal(ctx, "error-registering-with-core")
	}

	// check the readiness and liveliness and update the probe status
	a.checkServicesReadiness(ctx)
}
147
148/**
149This function checks the liveliness and readiness of the kakfa and kv-client services
150and update the status in the probe.
151*/
152func (a *adapter) checkServicesReadiness(ctx context.Context) {
153 // checks the kafka readiness
154 go a.checkKafkaReadiness(ctx)
155
156 // checks the kv-store readiness
157 go a.checkKvStoreReadiness(ctx)
158}
159
// checkKvStoreReadiness periodically verifies that the KV store is reachable
// and reflects the result in the "kv-store" probe status. It never returns
// and is intended to run in its own goroutine.
func (a *adapter) checkKvStoreReadiness(ctx context.Context) {
	// dividing the live probe interval by 2 to get updated status every 30s
	timeout := a.config.LiveProbeInterval / 2
	kvStoreChannel := make(chan bool, 1)

	// Seed the (buffered, size-1) channel with false so the first iteration
	// immediately reports "not ready" until a real connectivity check passes.
	kvStoreChannel <- false
	for {
		timeoutTimer := time.NewTimer(timeout)
		select {
		case liveliness := <-kvStoreChannel:
			if !liveliness {
				// kv-store not reachable or down, updating the status to not ready state
				probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
				// re-check more aggressively while the store is down
				timeout = a.config.NotLiveProbeInterval
			} else {
				// kv-store is reachable , updating the status to running state
				probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusRunning)
				timeout = a.config.LiveProbeInterval / 2
			}
			// Stop the timer; if it already fired, drain its channel so a
			// stale expiry cannot leak into the next iteration's select.
			if !timeoutTimer.Stop() {
				<-timeoutTimer.C
			}
		case <-timeoutTimer.C:
			// Check the status of the kv-store. Use timeout of 2 seconds to avoid forever blocking
			logger.Info(ctx, "kv-store liveliness-recheck")
			timeoutCtx, cancelFunc := context.WithTimeout(ctx, 2*time.Second)

			// Feed the result back so the next iteration updates the probe.
			kvStoreChannel <- a.kvClient.IsConnectionUp(timeoutCtx)
			// Cleanup cancel func resources
			cancelFunc()
		}
	}
}
199
// checkKafkaReadiness tracks the Kafka client's liveness and healthiness
// event channels and reflects them in the "message-bus" probe status.
// Healthiness failures are treated as permanent; liveness is re-probed on a
// timer. It never returns and is intended to run in its own goroutine.
func (a *adapter) checkKafkaReadiness(ctx context.Context) {
	livelinessChannel := a.kafkaClient.EnableLivenessChannel(ctx, true)
	healthinessChannel := a.kafkaClient.EnableHealthinessChannel(ctx, true)
	timeout := a.config.LiveProbeInterval
	failed := false
	for {
		timeoutTimer := time.NewTimer(timeout)

		select {
		case healthiness := <-healthinessChannel:
			if !healthiness {
				// This will eventually cause K8s to restart the container, and will do
				// so in a way that allows cleanup to continue, rather than an immediate
				// panic and exit here.
				probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusFailed)
				failed = true
			}
			// Stop the timer; drain it if it already fired so a stale expiry
			// cannot leak into the next iteration's select.
			if !timeoutTimer.Stop() {
				<-timeoutTimer.C
			}
		case liveliness := <-livelinessChannel:
			if failed {
				// Failures of the message bus are permanent and can't ever be recovered from,
				// so make sure we never inadvertently reset a failed state back to unready.
			} else if !liveliness {
				// kafka not reachable or down, updating the status to not ready state
				probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
				timeout = a.config.NotLiveProbeInterval
			} else {
				// kafka is reachable , updating the status to running state
				probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
				timeout = a.config.LiveProbeInterval
			}
			// Stop the timer; drain it if it already fired (see above).
			if !timeoutTimer.Stop() {
				<-timeoutTimer.C
			}
		case <-timeoutTimer.C:
			logger.Info(ctx, "kafka-proxy-liveness-recheck")
			// send the liveness probe in a goroutine; we don't want to deadlock ourselves as
			// the liveness probe may wait (and block) writing to our channel.
			err := a.kafkaClient.SendLiveness(ctx)
			if err != nil {
				// Catch possible error case if sending liveness after Sarama has been stopped.
				logger.Warnw(ctx, "error-kafka-send-liveness", log.Fields{"error": err})
			}
		}
	}
}
254
255func (a *adapter) stop(ctx context.Context) {
256 // Stop leadership tracking
257 a.halted = true
258
259 // send exit signal
260 a.exitChannel <- 0
261
262 // Cleanup - applies only if we had a kvClient
263 if a.kvClient != nil {
264 // Release all reservations
265 if err := a.kvClient.ReleaseAllReservations(ctx); err != nil {
266 logger.Infow(ctx, "fail-to-release-all-reservations", log.Fields{"error": err})
267 }
268 // Close the DB connection
269 a.kvClient.Close(ctx)
270 }
271
272 if a.kip != nil {
273 a.kip.Stop(ctx)
274 }
275
276 // TODO: More cleanup
277}
278
279func newKVClient(ctx context.Context, storeType, address string, timeout time.Duration) (kvstore.Client, error) {
280
281 logger.Infow(ctx, "kv-store-type", log.Fields{"store": storeType})
282 switch storeType {
283 case "consul":
284 return kvstore.NewConsulClient(ctx, address, timeout)
285 case "etcd":
286 return kvstore.NewEtcdClient(ctx, address, timeout, log.FatalLevel)
287 }
288 return nil, errors.New("unsupported-kv-store")
289}
290
291func newKafkaClient(ctx context.Context, clientType, address string) (kafka.Client, error) {
292
293 logger.Infow(ctx, "common-client-type", log.Fields{"client": clientType})
294 switch clientType {
295 case "sarama":
296 return kafka.NewSaramaClient(
297 kafka.Address(address),
298 kafka.ProducerReturnOnErrors(true),
299 kafka.ProducerReturnOnSuccess(true),
300 kafka.ProducerMaxRetries(6),
301 kafka.ProducerRetryBackoff(time.Millisecond*30),
302 kafka.MetadatMaxRetries(15)), nil
303 }
304
305 return nil, errors.New("unsupported-client-type")
306}
307
308func (a *adapter) setKVClient(ctx context.Context) error {
309 client, err := newKVClient(ctx, a.config.KVStoreType, a.config.KVStoreAddress, a.config.KVStoreTimeout)
310 if err != nil {
311 a.kvClient = nil
312 return err
313 }
314 a.kvClient = client
315
316 return nil
317}
318
319func (a *adapter) startInterContainerProxy(ctx context.Context, retries int) (kafka.InterContainerProxy, error) {
320 logger.Infow(ctx, "starting-intercontainer-messaging-proxy", log.Fields{"address": a.config.KafkaAdapterAddress,
321 "topic": a.config.Topic})
322 var err error
323 kip := kafka.NewInterContainerProxy(
324 kafka.InterContainerAddress(a.config.KafkaAdapterAddress),
325 kafka.MsgClient(a.kafkaClient),
326 kafka.DefaultTopic(&kafka.Topic{Name: a.config.Topic}))
327 count := 0
328 for {
329 if err = kip.Start(ctx); err != nil {
330 logger.Warnw(ctx, "error-starting-messaging-proxy", log.Fields{"error": err})
331 if retries == count {
332 return nil, err
333 }
334 count = +1
335 // Take a nap before retrying
336 time.Sleep(2 * time.Second)
337 } else {
338 break
339 }
340 }
341 probe.UpdateStatusFromContext(ctx, "container-proxy", probe.ServiceStatusRunning)
342 logger.Info(ctx, "common-messaging-proxy-created")
343 return kip, nil
344}
345
346func (a *adapter) startOpenOLT(ctx context.Context, kip kafka.InterContainerProxy,
347 cp adapterif.CoreProxy, ap adapterif.AdapterProxy, ep adapterif.EventProxy,
348 cfg *config.AdapterFlags) (*ac.OpenOLT, error) {
349 logger.Info(ctx, "starting-open-olt")
350 var err error
351 sOLT := ac.NewOpenOLT(ctx, a.kip, cp, ap, ep, cfg)
352
353 if err = sOLT.Start(ctx); err != nil {
354 return nil, err
355 }
356
357 logger.Info(ctx, "open-olt-started")
358 return sOLT, nil
359}
360
361func (a *adapter) setupRequestHandler(ctx context.Context, coreInstanceID string, iadapter adapters.IAdapter) error {
362 logger.Info(ctx, "setting-request-handler")
363 requestProxy := com.NewRequestHandlerProxy(coreInstanceID, iadapter, a.coreProxy)
364 if err := a.kip.SubscribeWithRequestHandlerInterface(ctx, kafka.Topic{Name: a.config.Topic}, requestProxy); err != nil {
365 return err
366
367 }
368 probe.UpdateStatusFromContext(ctx, "core-request-handler", probe.ServiceStatusRunning)
369 logger.Info(ctx, "request-handler-setup-done")
370 return nil
371}
372
373func (a *adapter) registerWithCore(ctx context.Context, retries int) error {
374 adapterID := fmt.Sprintf("eponolt_%d", a.config.CurrentReplica)
375 logger.Infow(ctx, "registering-with-core", log.Fields{
376 "adapterID": adapterID,
377 "currentReplica": a.config.CurrentReplica,
378 "totalReplicas": a.config.TotalReplicas,
379 })
380 adapterDescription := &voltha.Adapter{
381 Id: adapterID, // Unique name for the device type
382 Vendor: "VOLTHA EPON OLT",
383 Version: version.VersionInfo.Version,
384 // TODO once we'll be ready to support multiple versions of the OpenOLT adapter
385 // the Endpoint will have to change to `openolt_<currentReplica`>
386 Endpoint: a.config.Topic,
387 Type: "eponolt",
388 CurrentReplica: int32(a.config.CurrentReplica),
389 TotalReplicas: int32(a.config.TotalReplicas),
390 }
391 types := []*voltha.DeviceType{{
392 Id: "eponolt",
393 Adapter: "eponolt", // Type of the adapter that handles device type
394 AcceptsBulkFlowUpdate: false, // Currently openolt adapter does not support bulk flow handling
395 AcceptsAddRemoveFlowUpdates: true}}
396 deviceTypes := &voltha.DeviceTypes{Items: types}
397 count := 0
398 for {
399 if err := a.coreProxy.RegisterAdapter(log.WithSpanFromContext(context.TODO(), ctx), adapterDescription, deviceTypes); err != nil {
400 logger.Warnw(ctx, "registering-with-core-failed", log.Fields{"error": err})
401 if retries == count {
402 return err
403 }
404 count++
405 // Take a nap before retrying
406 time.Sleep(2 * time.Second)
407 } else {
408 break
409 }
410 }
411 probe.UpdateStatusFromContext(ctx, "register-with-core", probe.ServiceStatusRunning)
412 logger.Info(ctx, "registered-with-core")
413 return nil
414}
415
416func waitForExit(ctx context.Context) int {
417 signalChannel := make(chan os.Signal, 1)
418 signal.Notify(signalChannel,
419 syscall.SIGHUP,
420 syscall.SIGINT,
421 syscall.SIGTERM,
422 syscall.SIGQUIT)
423
424 exitChannel := make(chan int)
425
426 go func() {
427 s := <-signalChannel
428 switch s {
429 case syscall.SIGHUP,
430 syscall.SIGINT,
431 syscall.SIGTERM,
432 syscall.SIGQUIT:
433 logger.Infow(ctx, "closing-signal-received", log.Fields{"signal": s})
434 exitChannel <- 0
435 default:
436 logger.Infow(ctx, "unexpected-signal-received", log.Fields{"signal": s})
437 exitChannel <- 1
438 }
439 }()
440
441 code := <-exitChannel
442 return code
443}
444
// printBanner writes the EPON OLT ASCII-art banner to stdout, one line at a
// time.
func printBanner() {
	banner := []string{
		` _____ ____ ____ __ _ ____ _ _______ `,
		` | ____| _ \ / __ \| \ | |/ __ \| | |__ __| `,
		` | |___| |_| | | | | \ | | | | | | | | `,
		` | ___| __/| | | | |\ \| | | | | | | | `,
		` | |___| | | |__| | | \ | |__| | |____| | `,
		` |_____|_| \____/|_| \__|\____/|______|_| `,
	}
	for _, line := range banner {
		fmt.Println(line)
	}
}
453
454
455
456func printVersion() {
457 fmt.Println("VOLTHA OpenOLT Adapter")
458 fmt.Println(version.VersionInfo.String(" "))
459}
460
// main is the process entry point: it parses flags, configures logging,
// starts the adapter and its health probe server, then blocks until a
// termination signal arrives and shuts everything down.
func main() {
	ctx := context.Background()
	start := time.Now()

	cf := config.NewAdapterFlags()
	cf.ParseCommandArguments()

	// Setup logging

	logLevel, err := log.StringToLogLevel(cf.LogLevel)
	if err != nil {
		logger.Fatalf(ctx, "Cannot setup logging, %s", err)
	}

	// Setup default logger - applies for packages that do not have specific logger set
	if _, err := log.SetDefaultLogger(log.JSON, logLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
		logger.With(log.Fields{"error": err}).Fatal(ctx, "Cannot setup logging")
	}

	// Update all loggers (provisionned via init) with a common field
	if err := log.UpdateAllLoggers(log.Fields{"instanceId": cf.InstanceID}); err != nil {
		logger.With(log.Fields{"error": err}).Fatal(ctx, "Cannot setup logging")
	}

	log.SetAllLogLevel(logLevel)

	// NOTE(review): realMain is defined elsewhere in this package and is
	// invoked here, before the version-only/banner handling below and before
	// the deferred log flush is registered — confirm this ordering is
	// intentional.
	realMain()

	// Flush any buffered log entries when main returns.
	defer func() {
		err := log.CleanUp()
		if err != nil {
			logger.Errorw(context.Background(), "unable-to-flush-any-buffered-log-entries", log.Fields{"error": err})
		}
	}()

	// Print version / build information and exit
	if cf.DisplayVersionOnly {
		printVersion()
		return
	}

	// Print banner if specified
	if cf.Banner {
		printBanner()
	}

	logger.Infow(ctx, "config", log.Fields{"config": *cf})

	// Cancelable context shared by the probe server and the adapter.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ad := newAdapter(cf)

	// Start the readiness/liveness probe HTTP server.
	p := &probe.Probe{}
	go p.ListenAndServe(ctx, ad.config.ProbeAddress)

	// Make the probe reachable from the adapter's startup path via context.
	probeCtx := context.WithValue(ctx, probe.ProbeContextKey, p)

	closer, err := log.GetGlobalLFM().InitTracingAndLogCorrelation(cf.TraceEnabled, cf.TraceAgentAddress, cf.LogCorrelationEnabled)
	if err != nil {
		logger.Warnw(ctx, "unable-to-initialize-tracing-and-log-correlation-module", log.Fields{"error": err})
	} else {
		defer log.TerminateTracing(closer)
	}

	go ad.start(probeCtx)

	// Block until a termination signal is received.
	code := waitForExit(ctx)
	logger.Infow(ctx, "received-a-closing-signal", log.Fields{"code": code})

	// Cleanup before leaving
	ad.stop(ctx)

	elapsed := time.Since(start)
	logger.Infow(ctx, "run-time", log.Fields{"instanceId": ad.config.InstanceID, "time": elapsed / time.Second})
}
536}