blob: 7650ad3c3a25e6e33509513ad1fe0d1744c9636d [file] [log] [blame]
cuilin20187b2a8c32019-03-26 19:52:28 -07001/*
cbabu116b73f2019-12-10 17:56:32 +05302* Copyright 2018-present Open Networking Foundation
cuilin20187b2a8c32019-03-26 19:52:28 -07003
cbabu116b73f2019-12-10 17:56:32 +05304* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
cuilin20187b2a8c32019-03-26 19:52:28 -07007
cbabu116b73f2019-12-10 17:56:32 +05308* http://www.apache.org/licenses/LICENSE-2.0
cuilin20187b2a8c32019-03-26 19:52:28 -07009
cbabu116b73f2019-12-10 17:56:32 +053010* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
cuilin20187b2a8c32019-03-26 19:52:28 -070015 */
Girish Gowdru6a80bbd2019-07-02 07:36:09 -070016
17//Package main invokes the application
cuilin20187b2a8c32019-03-26 19:52:28 -070018package main
19
20import (
21 "context"
22 "errors"
23 "fmt"
kdarapu381c6902019-07-31 18:23:16 +053024 "os"
25 "os/signal"
kdarapu381c6902019-07-31 18:23:16 +053026 "syscall"
27 "time"
28
Esin Karamanccb714b2019-11-29 15:02:06 +000029 "github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
kdarapu381c6902019-07-31 18:23:16 +053030
Esin Karamanccb714b2019-11-29 15:02:06 +000031 "github.com/opencord/voltha-lib-go/v3/pkg/adapters"
32 com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
divyadesaia37f78b2020-02-07 12:41:22 +000033 conf "github.com/opencord/voltha-lib-go/v3/pkg/config"
Esin Karamanccb714b2019-11-29 15:02:06 +000034 "github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
35 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
36 "github.com/opencord/voltha-lib-go/v3/pkg/log"
37 "github.com/opencord/voltha-lib-go/v3/pkg/probe"
Scott Bakerdbd960e2020-02-28 08:57:51 -080038 "github.com/opencord/voltha-lib-go/v3/pkg/version"
39 "github.com/opencord/voltha-openolt-adapter/internal/pkg/config"
40 ac "github.com/opencord/voltha-openolt-adapter/internal/pkg/core"
Esin Karamanccb714b2019-11-29 15:02:06 +000041 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
42 "github.com/opencord/voltha-protos/v3/go/voltha"
cuilin20187b2a8c32019-03-26 19:52:28 -070043)
44
45type adapter struct {
Girish Gowdru6a80bbd2019-07-02 07:36:09 -070046 instanceID string
cuilin20187b2a8c32019-03-26 19:52:28 -070047 config *config.AdapterFlags
48 iAdapter adapters.IAdapter
49 kafkaClient kafka.Client
50 kvClient kvstore.Client
npujarec5762e2020-01-01 14:08:48 +053051 kip kafka.InterContainerProxy
kdarapu381c6902019-07-31 18:23:16 +053052 coreProxy adapterif.CoreProxy
53 adapterProxy adapterif.AdapterProxy
54 eventProxy adapterif.EventProxy
cuilin20187b2a8c32019-03-26 19:52:28 -070055 halted bool
56 exitChannel chan int
57 receiverChannels []<-chan *ic.InterContainerMessage
58}
59
cuilin20187b2a8c32019-03-26 19:52:28 -070060func newAdapter(cf *config.AdapterFlags) *adapter {
61 var a adapter
Girish Gowdru6a80bbd2019-07-02 07:36:09 -070062 a.instanceID = cf.InstanceID
cuilin20187b2a8c32019-03-26 19:52:28 -070063 a.config = cf
64 a.halted = false
65 a.exitChannel = make(chan int, 1)
66 a.receiverChannels = make([]<-chan *ic.InterContainerMessage, 0)
67 return &a
68}
69
70func (a *adapter) start(ctx context.Context) {
Girish Kumar2ad402b2020-03-20 19:45:12 +000071 logger.Info("Starting Core Adapter components")
cuilin20187b2a8c32019-03-26 19:52:28 -070072 var err error
73
Rohan Agrawal828bf4e2019-10-22 10:13:19 +000074 var p *probe.Probe
75 if value := ctx.Value(probe.ProbeContextKey); value != nil {
76 if _, ok := value.(*probe.Probe); ok {
77 p = value.(*probe.Probe)
78 p.RegisterService(
79 "message-bus",
80 "kv-store",
81 "container-proxy",
82 "core-request-handler",
83 "register-with-core",
84 )
85 }
86 }
87
cuilin20187b2a8c32019-03-26 19:52:28 -070088 // Setup KV Client
Girish Kumar2ad402b2020-03-20 19:45:12 +000089 logger.Debugw("create-kv-client", log.Fields{"kvstore": a.config.KVStoreType})
Girish Gowdru6a80bbd2019-07-02 07:36:09 -070090 if err = a.setKVClient(); err != nil {
Girish Kumar2ad402b2020-03-20 19:45:12 +000091 logger.Fatalw("error-setting-kv-client", log.Fields{"error": err})
cuilin20187b2a8c32019-03-26 19:52:28 -070092 }
93
Rohan Agrawal828bf4e2019-10-22 10:13:19 +000094 if p != nil {
95 p.UpdateStatus("kv-store", probe.ServiceStatusRunning)
96 }
97
divyadesaia37f78b2020-02-07 12:41:22 +000098 // Setup Log Config
Neha Sharma3f221ae2020-04-29 19:02:12 +000099 cm := conf.NewConfigManager(a.kvClient, a.config.KVStoreType, a.config.KVStoreAddress, a.config.KVStoreTimeout)
divyadesaid26f6b12020-03-19 06:30:28 +0000100 go conf.StartLogLevelConfigProcessing(cm, ctx)
divyadesaia37f78b2020-02-07 12:41:22 +0000101
cuilin20187b2a8c32019-03-26 19:52:28 -0700102 // Setup Kafka Client
Neha Sharma3f221ae2020-04-29 19:02:12 +0000103 if a.kafkaClient, err = newKafkaClient("sarama", a.config.KafkaAdapterAddress); err != nil {
Girish Kumar2ad402b2020-03-20 19:45:12 +0000104 logger.Fatalw("Unsupported-common-client", log.Fields{"error": err})
cuilin20187b2a8c32019-03-26 19:52:28 -0700105 }
106
Rohan Agrawal828bf4e2019-10-22 10:13:19 +0000107 if p != nil {
108 p.UpdateStatus("message-bus", probe.ServiceStatusRunning)
109 }
110
Matteo Scandolo3ad5d2b2020-04-02 17:02:04 -0700111 // setup endpointManager
112
cuilin20187b2a8c32019-03-26 19:52:28 -0700113 // Start the common InterContainer Proxy - retries indefinitely
Rohan Agrawal828bf4e2019-10-22 10:13:19 +0000114 if a.kip, err = a.startInterContainerProxy(ctx, -1); err != nil {
Girish Kumar2ad402b2020-03-20 19:45:12 +0000115 logger.Fatal("error-starting-inter-container-proxy")
cuilin20187b2a8c32019-03-26 19:52:28 -0700116 }
117
118 // Create the core proxy to handle requests to the Core
119 a.coreProxy = com.NewCoreProxy(a.kip, a.config.Topic, a.config.CoreTopic)
120
121 // Create the adaptor proxy to handle request between olt and onu
Matteo Scandolo3ad5d2b2020-04-02 17:02:04 -0700122 a.adapterProxy = com.NewAdapterProxy(a.kip, "brcm_openomci_onu", a.config.CoreTopic, cm.Backend)
cuilin20187b2a8c32019-03-26 19:52:28 -0700123
Devmalya Paulfb990a52019-07-09 10:01:49 -0400124 // Create the event proxy to post events to KAFKA
125 a.eventProxy = com.NewEventProxy(com.MsgClient(a.kafkaClient), com.MsgTopic(kafka.Topic{Name: a.config.EventTopic}))
126
cuilin20187b2a8c32019-03-26 19:52:28 -0700127 // Create the open OLT adapter
Girish Kumarf26e4882020-03-05 06:49:10 +0000128 if a.iAdapter, err = a.startOpenOLT(ctx, a.kip, a.coreProxy, a.adapterProxy, a.eventProxy, a.config); err != nil {
Girish Kumar2ad402b2020-03-20 19:45:12 +0000129 logger.Fatalw("error-starting-openolt", log.Fields{"error": err})
cuilin20187b2a8c32019-03-26 19:52:28 -0700130 }
131
132 // Register the core request handler
Rohan Agrawal828bf4e2019-10-22 10:13:19 +0000133 if err = a.setupRequestHandler(ctx, a.instanceID, a.iAdapter); err != nil {
Girish Kumar2ad402b2020-03-20 19:45:12 +0000134 logger.Fatalw("error-setting-core-request-handler", log.Fields{"error": err})
cuilin20187b2a8c32019-03-26 19:52:28 -0700135 }
136
Girish Gowdru6a80bbd2019-07-02 07:36:09 -0700137 // Register this adapter to the Core - retries indefinitely
Rohan Agrawal828bf4e2019-10-22 10:13:19 +0000138 if err = a.registerWithCore(ctx, -1); err != nil {
Girish Kumar2ad402b2020-03-20 19:45:12 +0000139 logger.Fatal("error-registering-with-core")
cuilin20187b2a8c32019-03-26 19:52:28 -0700140 }
cbabu95f21522019-11-13 14:25:18 +0100141
cbabu116b73f2019-12-10 17:56:32 +0530142 // check the readiness and liveliness and update the probe status
143 a.checkServicesReadiness(ctx)
cbabu95f21522019-11-13 14:25:18 +0100144}
145
146/**
147This function checks the liveliness and readiness of the kakfa and kv-client services
148and update the status in the probe.
149*/
cbabu116b73f2019-12-10 17:56:32 +0530150func (a *adapter) checkServicesReadiness(ctx context.Context) {
151 // checks the kafka readiness
152 go a.checkKafkaReadiness(ctx)
153
154 // checks the kv-store readiness
155 go a.checkKvStoreReadiness(ctx)
156}
157
158/**
159This function checks the liveliness and readiness of the kv-store service
160and update the status in the probe.
161*/
162func (a *adapter) checkKvStoreReadiness(ctx context.Context) {
163 // dividing the live probe interval by 2 to get updated status every 30s
164 timeout := a.config.LiveProbeInterval / 2
165 kvStoreChannel := make(chan bool, 1)
166
167 // Default false to check the liveliness.
168 kvStoreChannel <- false
cbabu95f21522019-11-13 14:25:18 +0100169 for {
cbabu116b73f2019-12-10 17:56:32 +0530170 timeoutTimer := time.NewTimer(timeout)
171 select {
172 case liveliness := <-kvStoreChannel:
173 if !liveliness {
174 // kv-store not reachable or down, updating the status to not ready state
175 probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
176 timeout = a.config.NotLiveProbeInterval
177 } else {
178 // kv-store is reachable , updating the status to running state
179 probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusRunning)
180 timeout = a.config.LiveProbeInterval / 2
181 }
182 // Check if the timer has expired or not
183 if !timeoutTimer.Stop() {
184 <-timeoutTimer.C
185 }
186 case <-timeoutTimer.C:
Girish Kumarbeadc112020-02-26 18:41:02 +0000187 // Check the status of the kv-store. Use timeout of 2 seconds to avoid forever blocking
Girish Kumar2ad402b2020-03-20 19:45:12 +0000188 logger.Info("kv-store liveliness-recheck")
Girish Kumarbeadc112020-02-26 18:41:02 +0000189 timeoutCtx, cancelFunc := context.WithTimeout(ctx, 2*time.Second)
190
191 kvStoreChannel <- a.kvClient.IsConnectionUp(timeoutCtx)
192 // Cleanup cancel func resources
193 cancelFunc()
cbabu95f21522019-11-13 14:25:18 +0100194 }
cbabu116b73f2019-12-10 17:56:32 +0530195 }
196}
197
198/**
199This function checks the liveliness and readiness of the kafka service
200and update the status in the probe.
201*/
202func (a *adapter) checkKafkaReadiness(ctx context.Context) {
203 livelinessChannel := a.kafkaClient.EnableLivenessChannel(true)
Scott Baker86fce9a2019-12-12 09:47:17 -0800204 healthinessChannel := a.kafkaClient.EnableHealthinessChannel(true)
cbabu116b73f2019-12-10 17:56:32 +0530205 timeout := a.config.LiveProbeInterval
Scott Bakere701b862020-02-20 16:19:16 -0800206 failed := false
cbabu116b73f2019-12-10 17:56:32 +0530207 for {
208 timeoutTimer := time.NewTimer(timeout)
209
210 select {
Scott Baker86fce9a2019-12-12 09:47:17 -0800211 case healthiness := <-healthinessChannel:
212 if !healthiness {
Scott Bakere701b862020-02-20 16:19:16 -0800213 // This will eventually cause K8s to restart the container, and will do
214 // so in a way that allows cleanup to continue, rather than an immediate
215 // panic and exit here.
216 probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusFailed)
217 failed = true
218 }
219 // Check if the timer has expired or not
220 if !timeoutTimer.Stop() {
221 <-timeoutTimer.C
Scott Baker86fce9a2019-12-12 09:47:17 -0800222 }
cbabu116b73f2019-12-10 17:56:32 +0530223 case liveliness := <-livelinessChannel:
Scott Bakere701b862020-02-20 16:19:16 -0800224 if failed {
225 // Failures of the message bus are permanent and can't ever be recovered from,
226 // so make sure we never inadvertently reset a failed state back to unready.
227 } else if !liveliness {
cbabu116b73f2019-12-10 17:56:32 +0530228 // kafka not reachable or down, updating the status to not ready state
229 probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
230 timeout = a.config.NotLiveProbeInterval
231 } else {
232 // kafka is reachable , updating the status to running state
233 probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
234 timeout = a.config.LiveProbeInterval
235 }
236 // Check if the timer has expired or not
237 if !timeoutTimer.Stop() {
238 <-timeoutTimer.C
239 }
240 case <-timeoutTimer.C:
Girish Kumar2ad402b2020-03-20 19:45:12 +0000241 logger.Info("kafka-proxy-liveness-recheck")
cbabu116b73f2019-12-10 17:56:32 +0530242 // send the liveness probe in a goroutine; we don't want to deadlock ourselves as
243 // the liveness probe may wait (and block) writing to our channel.
244 err := a.kafkaClient.SendLiveness()
245 if err != nil {
246 // Catch possible error case if sending liveness after Sarama has been stopped.
Girish Kumar2ad402b2020-03-20 19:45:12 +0000247 logger.Warnw("error-kafka-send-liveness", log.Fields{"error": err})
cbabu116b73f2019-12-10 17:56:32 +0530248 }
cbabu95f21522019-11-13 14:25:18 +0100249 }
cbabu95f21522019-11-13 14:25:18 +0100250 }
cuilin20187b2a8c32019-03-26 19:52:28 -0700251}
252
npujarec5762e2020-01-01 14:08:48 +0530253func (a *adapter) stop(ctx context.Context) {
cuilin20187b2a8c32019-03-26 19:52:28 -0700254 // Stop leadership tracking
Girish Gowdru6a80bbd2019-07-02 07:36:09 -0700255 a.halted = true
cuilin20187b2a8c32019-03-26 19:52:28 -0700256
257 // send exit signal
Girish Gowdru6a80bbd2019-07-02 07:36:09 -0700258 a.exitChannel <- 0
cuilin20187b2a8c32019-03-26 19:52:28 -0700259
260 // Cleanup - applies only if we had a kvClient
Girish Gowdru6a80bbd2019-07-02 07:36:09 -0700261 if a.kvClient != nil {
cuilin20187b2a8c32019-03-26 19:52:28 -0700262 // Release all reservations
npujarec5762e2020-01-01 14:08:48 +0530263 if err := a.kvClient.ReleaseAllReservations(ctx); err != nil {
Girish Kumar2ad402b2020-03-20 19:45:12 +0000264 logger.Infow("fail-to-release-all-reservations", log.Fields{"error": err})
cuilin20187b2a8c32019-03-26 19:52:28 -0700265 }
266 // Close the DB connection
Girish Gowdru6a80bbd2019-07-02 07:36:09 -0700267 a.kvClient.Close()
cuilin20187b2a8c32019-03-26 19:52:28 -0700268 }
269
Scott Bakere701b862020-02-20 16:19:16 -0800270 if a.kip != nil {
271 a.kip.Stop()
272 }
273
cuilin20187b2a8c32019-03-26 19:52:28 -0700274 // TODO: More cleanup
275}
276
Neha Sharmacc656962020-04-14 14:26:11 +0000277func newKVClient(storeType, address string, timeout time.Duration) (kvstore.Client, error) {
cuilin20187b2a8c32019-03-26 19:52:28 -0700278
Girish Kumar2ad402b2020-03-20 19:45:12 +0000279 logger.Infow("kv-store-type", log.Fields{"store": storeType})
cuilin20187b2a8c32019-03-26 19:52:28 -0700280 switch storeType {
281 case "consul":
282 return kvstore.NewConsulClient(address, timeout)
283 case "etcd":
Scott Bakered4a8e72020-04-17 11:10:20 -0700284 return kvstore.NewEtcdClient(address, timeout, log.FatalLevel)
cuilin20187b2a8c32019-03-26 19:52:28 -0700285 }
286 return nil, errors.New("unsupported-kv-store")
287}
288
Neha Sharma3f221ae2020-04-29 19:02:12 +0000289func newKafkaClient(clientType, address string) (kafka.Client, error) {
cuilin20187b2a8c32019-03-26 19:52:28 -0700290
Girish Kumar2ad402b2020-03-20 19:45:12 +0000291 logger.Infow("common-client-type", log.Fields{"client": clientType})
cuilin20187b2a8c32019-03-26 19:52:28 -0700292 switch clientType {
293 case "sarama":
294 return kafka.NewSaramaClient(
Neha Sharma3f221ae2020-04-29 19:02:12 +0000295 kafka.Address(address),
cuilin20187b2a8c32019-03-26 19:52:28 -0700296 kafka.ProducerReturnOnErrors(true),
297 kafka.ProducerReturnOnSuccess(true),
298 kafka.ProducerMaxRetries(6),
Abhilash S.L3b494632019-07-16 15:51:09 +0530299 kafka.ProducerRetryBackoff(time.Millisecond*30),
300 kafka.MetadatMaxRetries(15)), nil
cuilin20187b2a8c32019-03-26 19:52:28 -0700301 }
Girish Gowdru6a80bbd2019-07-02 07:36:09 -0700302
cuilin20187b2a8c32019-03-26 19:52:28 -0700303 return nil, errors.New("unsupported-client-type")
304}
305
306func (a *adapter) setKVClient() error {
Neha Sharma3f221ae2020-04-29 19:02:12 +0000307 client, err := newKVClient(a.config.KVStoreType, a.config.KVStoreAddress, a.config.KVStoreTimeout)
cuilin20187b2a8c32019-03-26 19:52:28 -0700308 if err != nil {
309 a.kvClient = nil
cuilin20187b2a8c32019-03-26 19:52:28 -0700310 return err
311 }
312 a.kvClient = client
divyadesaia37f78b2020-02-07 12:41:22 +0000313
cuilin20187b2a8c32019-03-26 19:52:28 -0700314 return nil
315}
316
npujarec5762e2020-01-01 14:08:48 +0530317func (a *adapter) startInterContainerProxy(ctx context.Context, retries int) (kafka.InterContainerProxy, error) {
Neha Sharma3f221ae2020-04-29 19:02:12 +0000318 logger.Infow("starting-intercontainer-messaging-proxy", log.Fields{"address": a.config.KafkaAdapterAddress,
319 "topic": a.config.Topic})
cuilin20187b2a8c32019-03-26 19:52:28 -0700320 var err error
npujarec5762e2020-01-01 14:08:48 +0530321 kip := kafka.NewInterContainerProxy(
Neha Sharma3f221ae2020-04-29 19:02:12 +0000322 kafka.InterContainerAddress(a.config.KafkaAdapterAddress),
cuilin20187b2a8c32019-03-26 19:52:28 -0700323 kafka.MsgClient(a.kafkaClient),
npujarec5762e2020-01-01 14:08:48 +0530324 kafka.DefaultTopic(&kafka.Topic{Name: a.config.Topic}))
cuilin20187b2a8c32019-03-26 19:52:28 -0700325 count := 0
326 for {
327 if err = kip.Start(); err != nil {
Girish Kumar2ad402b2020-03-20 19:45:12 +0000328 logger.Warnw("error-starting-messaging-proxy", log.Fields{"error": err})
cuilin20187b2a8c32019-03-26 19:52:28 -0700329 if retries == count {
330 return nil, err
331 }
332 count = +1
Girish Gowdru6a80bbd2019-07-02 07:36:09 -0700333 // Take a nap before retrying
cuilin20187b2a8c32019-03-26 19:52:28 -0700334 time.Sleep(2 * time.Second)
335 } else {
336 break
337 }
338 }
Rohan Agrawal828bf4e2019-10-22 10:13:19 +0000339 probe.UpdateStatusFromContext(ctx, "container-proxy", probe.ServiceStatusRunning)
Girish Kumar2ad402b2020-03-20 19:45:12 +0000340 logger.Info("common-messaging-proxy-created")
cuilin20187b2a8c32019-03-26 19:52:28 -0700341 return kip, nil
342}
343
npujarec5762e2020-01-01 14:08:48 +0530344func (a *adapter) startOpenOLT(ctx context.Context, kip kafka.InterContainerProxy,
Abhilash Laxmeshwarf9942e92020-01-07 15:32:44 +0530345 cp adapterif.CoreProxy, ap adapterif.AdapterProxy, ep adapterif.EventProxy,
346 cfg *config.AdapterFlags) (*ac.OpenOLT, error) {
Girish Kumar2ad402b2020-03-20 19:45:12 +0000347 logger.Info("starting-open-olt")
cuilin20187b2a8c32019-03-26 19:52:28 -0700348 var err error
Abhilash Laxmeshwarf9942e92020-01-07 15:32:44 +0530349 sOLT := ac.NewOpenOLT(ctx, a.kip, cp, ap, ep, cfg)
cuilin20187b2a8c32019-03-26 19:52:28 -0700350
351 if err = sOLT.Start(ctx); err != nil {
cuilin20187b2a8c32019-03-26 19:52:28 -0700352 return nil, err
353 }
354
Girish Kumar2ad402b2020-03-20 19:45:12 +0000355 logger.Info("open-olt-started")
cuilin20187b2a8c32019-03-26 19:52:28 -0700356 return sOLT, nil
357}
358
Rohan Agrawal828bf4e2019-10-22 10:13:19 +0000359func (a *adapter) setupRequestHandler(ctx context.Context, coreInstanceID string, iadapter adapters.IAdapter) error {
Girish Kumar2ad402b2020-03-20 19:45:12 +0000360 logger.Info("setting-request-handler")
Girish Gowdru6a80bbd2019-07-02 07:36:09 -0700361 requestProxy := com.NewRequestHandlerProxy(coreInstanceID, iadapter, a.coreProxy)
cuilin20187b2a8c32019-03-26 19:52:28 -0700362 if err := a.kip.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: a.config.Topic}, requestProxy); err != nil {
cuilin20187b2a8c32019-03-26 19:52:28 -0700363 return err
364
365 }
Rohan Agrawal828bf4e2019-10-22 10:13:19 +0000366 probe.UpdateStatusFromContext(ctx, "core-request-handler", probe.ServiceStatusRunning)
Girish Kumar2ad402b2020-03-20 19:45:12 +0000367 logger.Info("request-handler-setup-done")
cuilin20187b2a8c32019-03-26 19:52:28 -0700368 return nil
369}
370
Rohan Agrawal828bf4e2019-10-22 10:13:19 +0000371func (a *adapter) registerWithCore(ctx context.Context, retries int) error {
Matteo Scandolo3ad5d2b2020-04-02 17:02:04 -0700372 adapterID := fmt.Sprintf("openolt_%d", a.config.CurrentReplica)
373 logger.Infow("registering-with-core", log.Fields{
374 "adapterID": adapterID,
375 "currentReplica": a.config.CurrentReplica,
376 "totalReplicas": a.config.TotalReplicas,
377 })
378 adapterDescription := &voltha.Adapter{
379 Id: adapterID, // Unique name for the device type
Matt Jeanneretf880eb62019-07-16 20:08:03 -0400380 Vendor: "VOLTHA OpenOLT",
Matteo Scandolo3ad5d2b2020-04-02 17:02:04 -0700381 Version: version.VersionInfo.Version,
382 // TODO once we'll be ready to support multiple versions of the OpenOLT adapter
383 // the Endpoint will have to change to `openolt_<currentReplica`>
serkant.uluderya5e3528d2020-05-22 19:31:07 -0700384 Endpoint: a.config.Topic,
Matteo Scandolo3ad5d2b2020-04-02 17:02:04 -0700385 Type: "openolt",
386 CurrentReplica: int32(a.config.CurrentReplica),
387 TotalReplicas: int32(a.config.TotalReplicas),
388 }
389 types := []*voltha.DeviceType{{
390 Id: "openolt",
391 Adapter: "openolt", // Type of the adapter that handles device type
Girish Gowdru0c588b22019-04-23 23:24:56 -0400392 AcceptsBulkFlowUpdate: false, // Currently openolt adapter does not support bulk flow handling
393 AcceptsAddRemoveFlowUpdates: true}}
cuilin20187b2a8c32019-03-26 19:52:28 -0700394 deviceTypes := &voltha.DeviceTypes{Items: types}
395 count := 0
396 for {
Girish Gowdru6a80bbd2019-07-02 07:36:09 -0700397 if err := a.coreProxy.RegisterAdapter(context.TODO(), adapterDescription, deviceTypes); err != nil {
Girish Kumar2ad402b2020-03-20 19:45:12 +0000398 logger.Warnw("registering-with-core-failed", log.Fields{"error": err})
cuilin20187b2a8c32019-03-26 19:52:28 -0700399 if retries == count {
400 return err
401 }
Girish Gowdru6a80bbd2019-07-02 07:36:09 -0700402 count++
403 // Take a nap before retrying
cuilin20187b2a8c32019-03-26 19:52:28 -0700404 time.Sleep(2 * time.Second)
405 } else {
406 break
407 }
408 }
Rohan Agrawal828bf4e2019-10-22 10:13:19 +0000409 probe.UpdateStatusFromContext(ctx, "register-with-core", probe.ServiceStatusRunning)
Girish Kumar2ad402b2020-03-20 19:45:12 +0000410 logger.Info("registered-with-core")
cuilin20187b2a8c32019-03-26 19:52:28 -0700411 return nil
412}
413
414func waitForExit() int {
415 signalChannel := make(chan os.Signal, 1)
416 signal.Notify(signalChannel,
417 syscall.SIGHUP,
418 syscall.SIGINT,
419 syscall.SIGTERM,
420 syscall.SIGQUIT)
421
422 exitChannel := make(chan int)
423
424 go func() {
425 s := <-signalChannel
426 switch s {
427 case syscall.SIGHUP,
428 syscall.SIGINT,
429 syscall.SIGTERM,
430 syscall.SIGQUIT:
Girish Kumar2ad402b2020-03-20 19:45:12 +0000431 logger.Infow("closing-signal-received", log.Fields{"signal": s})
cuilin20187b2a8c32019-03-26 19:52:28 -0700432 exitChannel <- 0
433 default:
Girish Kumar2ad402b2020-03-20 19:45:12 +0000434 logger.Infow("unexpected-signal-received", log.Fields{"signal": s})
cuilin20187b2a8c32019-03-26 19:52:28 -0700435 exitChannel <- 1
436 }
437 }()
438
439 code := <-exitChannel
440 return code
441}
442
// printBanner writes the OpenOLT ASCII-art banner to standard output, one row
// per line.
func printBanner() {
	rows := []string{
		` ____ ____ _ _______ `,
		` / _ \ / __ \| | |__ __|`,
		` | | | |_ __ ___ _ __ | | | | | | | `,
		` | | | | '_ \ / _ \ '_ \ | | | | | | | `,
		` | |__| | |_) | __/ | | || |__| | |____| | `,
		` \____/| .__/ \___|_| |_| \____/|______|_| `,
		` | | `,
		` |_| `,
		` `,
	}
	for _, row := range rows {
		fmt.Println(row)
	}
}
454
Matt Jeanneretf880eb62019-07-16 20:08:03 -0400455func printVersion() {
456 fmt.Println("VOLTHA OpenOLT Adapter")
457 fmt.Println(version.VersionInfo.String(" "))
458}
459
cuilin20187b2a8c32019-03-26 19:52:28 -0700460func main() {
461 start := time.Now()
462
463 cf := config.NewAdapterFlags()
464 cf.ParseCommandArguments()
465
Girish Gowdru6a80bbd2019-07-02 07:36:09 -0700466 // Setup logging
cuilin20187b2a8c32019-03-26 19:52:28 -0700467
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000468 logLevel, err := log.StringToLogLevel(cf.LogLevel)
469 if err != nil {
Girish Kumar2ad402b2020-03-20 19:45:12 +0000470 logger.Fatalf("Cannot setup logging, %s", err)
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000471 }
Rohan Agrawal2488f192020-01-31 09:26:55 +0000472
Girish Gowdru6a80bbd2019-07-02 07:36:09 -0700473 // Setup default logger - applies for packages that do not have specific logger set
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000474 if _, err := log.SetDefaultLogger(log.JSON, logLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
cuilin20187b2a8c32019-03-26 19:52:28 -0700475 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
476 }
477
478 // Update all loggers (provisionned via init) with a common field
Hardik Windlassb9c869b2019-10-10 08:34:32 +0000479 if err := log.UpdateAllLoggers(log.Fields{"instanceId": cf.InstanceID}); err != nil {
cuilin20187b2a8c32019-03-26 19:52:28 -0700480 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
481 }
482
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000483 log.SetAllLogLevel(logLevel)
Rohan Agrawal93bced32020-02-11 10:16:01 +0000484
Matteo Scandolo8f2b9572020-02-28 15:35:23 -0800485 realMain()
486
cuilin20187b2a8c32019-03-26 19:52:28 -0700487 defer log.CleanUp()
488
Matt Jeanneretf880eb62019-07-16 20:08:03 -0400489 // Print version / build information and exit
490 if cf.DisplayVersionOnly {
491 printVersion()
492 return
493 }
494
cuilin20187b2a8c32019-03-26 19:52:28 -0700495 // Print banner if specified
496 if cf.Banner {
497 printBanner()
498 }
499
Girish Kumar2ad402b2020-03-20 19:45:12 +0000500 logger.Infow("config", log.Fields{"config": *cf})
cuilin20187b2a8c32019-03-26 19:52:28 -0700501
502 ctx, cancel := context.WithCancel(context.Background())
503 defer cancel()
504
505 ad := newAdapter(cf)
Rohan Agrawal828bf4e2019-10-22 10:13:19 +0000506
507 p := &probe.Probe{}
Neha Sharma3f221ae2020-04-29 19:02:12 +0000508 go p.ListenAndServe(ad.config.ProbeAddress)
Rohan Agrawal828bf4e2019-10-22 10:13:19 +0000509
510 probeCtx := context.WithValue(ctx, probe.ProbeContextKey, p)
511
512 go ad.start(probeCtx)
cuilin20187b2a8c32019-03-26 19:52:28 -0700513
514 code := waitForExit()
Girish Kumar2ad402b2020-03-20 19:45:12 +0000515 logger.Infow("received-a-closing-signal", log.Fields{"code": code})
cuilin20187b2a8c32019-03-26 19:52:28 -0700516
517 // Cleanup before leaving
npujarec5762e2020-01-01 14:08:48 +0530518 ad.stop(ctx)
cuilin20187b2a8c32019-03-26 19:52:28 -0700519
520 elapsed := time.Since(start)
Girish Kumar2ad402b2020-03-20 19:45:12 +0000521 logger.Infow("run-time", log.Fields{"instanceId": ad.config.InstanceID, "time": elapsed / time.Second})
cuilin20187b2a8c32019-03-26 19:52:28 -0700522}