blob: caad35b40d94b554f656d2f70e40ba09d78c03ca [file] [log] [blame]
Takahiro Suzuki241c10e2020-12-17 20:17:57 +09001/*
2 * Copyright 2020-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//Package main -> this is the entry point of the OpenOnuAdapter
18package main
19
20import (
21 "context"
22 "errors"
23 "fmt"
24 "io/ioutil"
25 "os"
26 "os/signal"
27 "strconv"
28 "syscall"
29 "time"
30
31 "github.com/opencord/voltha-lib-go/v3/pkg/adapters"
32 "github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
33 com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
34 conf "github.com/opencord/voltha-lib-go/v3/pkg/config"
35 "github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
36 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
37 "github.com/opencord/voltha-lib-go/v3/pkg/log"
38 "github.com/opencord/voltha-lib-go/v3/pkg/probe"
39 "github.com/opencord/voltha-lib-go/v3/pkg/version"
40 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
41 "github.com/opencord/voltha-protos/v3/go/voltha"
42
43 "test.internal/openadapter/internal/pkg/config"
44 ac "test.internal/openadapter/internal/pkg/onuadaptercore"
45)
46
47type adapter struct {
48 //defaultAppName string
49 instanceID string
50 config *config.AdapterFlags
51 iAdapter adapters.IAdapter // from Voltha interface adapters
52 kafkaClient kafka.Client
53 kvClient kvstore.Client
54 kip kafka.InterContainerProxy
55 coreProxy adapterif.CoreProxy
56 adapterProxy adapterif.AdapterProxy
57 eventProxy adapterif.EventProxy
58 halted bool
59 exitChannel chan int
60 receiverChannels []<-chan *ic.InterContainerMessage //from inter-container
61}
62
63func newAdapter(cf *config.AdapterFlags) *adapter {
64 var a adapter
65 a.instanceID = cf.InstanceID
66 a.config = cf
67 a.halted = false
68 a.exitChannel = make(chan int, 1)
69 a.receiverChannels = make([]<-chan *ic.InterContainerMessage, 0)
70 return &a
71}
72
73func (a *adapter) start(ctx context.Context) error {
74 logger.Info("Starting Core Adapter components")
75 var err error
76
77 var p *probe.Probe
78 if value := ctx.Value(probe.ProbeContextKey); value != nil {
79 if _, ok := value.(*probe.Probe); ok {
80 p = value.(*probe.Probe)
81 p.RegisterService(
82 "message-bus",
83 "kv-store",
84 "container-proxy",
85 "core-request-handler",
86 "register-with-core",
87 )
88 }
89 }
90
91 // Setup KV Client
92 logger.Debugw("create-kv-client", log.Fields{"kvstore": a.config.KVStoreType})
93 if err = a.setKVClient(); err != nil {
94 logger.Fatalw("error-setting-kv-client", log.Fields{"error": err})
95 }
96
97 if p != nil {
98 p.UpdateStatus("kv-store", probe.ServiceStatusRunning)
99 }
100
101 // Setup Log Config
102 /* address config update acc. to [VOL-2736] */
103 addr := a.config.KVStoreHost + ":" + strconv.Itoa(a.config.KVStorePort)
104 cm := conf.NewConfigManager(a.kvClient, a.config.KVStoreType, addr, a.config.KVStoreTimeout)
105 go conf.StartLogLevelConfigProcessing(cm, ctx)
106
107 // Setup Kafka Client
108 if a.kafkaClient, err = newKafkaClient("sarama", a.config.KafkaAdapterHost, a.config.KafkaAdapterPort); err != nil {
109 logger.Fatalw("Unsupported-common-client", log.Fields{"error": err})
110 }
111
112 if p != nil {
113 p.UpdateStatus("message-bus", probe.ServiceStatusRunning)
114 }
115
116 // Start the common InterContainer Proxy - retries as per program arguments or indefinitely per default
117 if a.kip, err = a.startInterContainerProxy(ctx, a.config.KafkaReconnectRetries); err != nil {
118 logger.Fatalw("error-starting-inter-container-proxy", log.Fields{"error": err})
119 //aborting the complete processing here (does notmake sense after set Retry number [else use -1 for infinite])
120 return err
121 }
122
123 // Create the core proxy to handle requests to the Core
124 a.coreProxy = com.NewCoreProxy(a.kip, a.config.Topic, a.config.CoreTopic)
125
126 // Create the adaptor proxy to handle request between olt and onu
127 a.adapterProxy = com.NewAdapterProxy(a.kip, "eponolt", a.config.CoreTopic, cm.Backend)
128
129 // Create the event proxy to post events to KAFKA
130 a.eventProxy = com.NewEventProxy(com.MsgClient(a.kafkaClient), com.MsgTopic(kafka.Topic{Name: a.config.EventTopic}))
131
132 // Create the open ONU interface adapter
133 if a.iAdapter, err = a.startVolthaInterfaceAdapter(ctx, a.kip, a.coreProxy, a.adapterProxy, a.eventProxy,
134 a.config); err != nil {
135 logger.Fatalw("error-starting-volthaInterfaceAdapter for OpenOnt", log.Fields{"error": err})
136 }
137
138 // Register the core request handler
139 if err = a.setupRequestHandler(ctx, a.instanceID, a.iAdapter); err != nil {
140 logger.Fatalw("error-setting-core-request-handler", log.Fields{"error": err})
141 }
142
143 // Register this adapter to the Core - retries indefinitely
144 if err = a.registerWithCore(ctx, -1); err != nil {
145 logger.Fatalw("error-registering-with-core", log.Fields{"error": err})
146 }
147
148 // check the readiness and liveliness and update the probe status
149 a.checkServicesReadiness(ctx)
150 return err
151}
152
153func (a *adapter) stop(ctx context.Context) {
154 // Stop leadership tracking
155 a.halted = true
156
157 // send exit signal
158 a.exitChannel <- 0
159
160 // Cleanup - applies only if we had a kvClient
161 if a.kvClient != nil {
162 // Release all reservations
163 if err := a.kvClient.ReleaseAllReservations(ctx); err != nil {
164 logger.Infow("fail-to-release-all-reservations", log.Fields{"error": err})
165 }
166 // Close the DB connection
167 a.kvClient.Close()
168 }
169
170 if a.kip != nil {
171 a.kip.Stop()
172 }
173}
174
175// #############################################
176// Adapter Utility methods ##### begin #########
177
178func newKVClient(storeType, address string, timeout time.Duration) (kvstore.Client, error) {
179 logger.Infow("kv-store-type", log.Fields{"store": storeType})
180 switch storeType {
181 case "consul":
182 return kvstore.NewConsulClient(address, timeout)
183 case "etcd":
184 return kvstore.NewEtcdClient(address, timeout, log.FatalLevel)
185 }
186 return nil, errors.New("unsupported-kv-store")
187}
188
189func newKafkaClient(clientType, host string, port int) (kafka.Client, error) {
190
191 logger.Infow("common-client-type", log.Fields{"client": clientType})
192 /* address config update acc. to [VOL-2736] */
193 addr := host + ":" + strconv.Itoa(port)
194
195 switch clientType {
196 case "sarama":
197 return kafka.NewSaramaClient(
198 kafka.Address(addr),
199 kafka.ProducerReturnOnErrors(true),
200 kafka.ProducerReturnOnSuccess(true),
201 kafka.ProducerMaxRetries(6),
202 kafka.ProducerRetryBackoff(time.Millisecond*30),
203 kafka.MetadatMaxRetries(15)), nil
204 }
205
206 return nil, errors.New("unsupported-client-type")
207}
208
209func (a *adapter) setKVClient() error {
210 addr := a.config.KVStoreHost + ":" + strconv.Itoa(a.config.KVStorePort)
211 client, err := newKVClient(a.config.KVStoreType, addr, a.config.KVStoreTimeout)
212 if err != nil {
213 a.kvClient = nil
214 logger.Errorw("error-starting-KVClient", log.Fields{"error": err})
215 return err
216 }
217 a.kvClient = client
218 return nil
219}
220
221func (a *adapter) startInterContainerProxy(ctx context.Context, retries int) (kafka.InterContainerProxy, error) {
222 logger.Infow("starting-intercontainer-messaging-proxy", log.Fields{"host": a.config.KafkaAdapterHost,
223 "port": a.config.KafkaAdapterPort, "topic": a.config.Topic})
224 var err error
225 /* address config update acc. to [VOL-2736] */
226 addr := a.config.KafkaAdapterHost + ":" + strconv.Itoa(a.config.KafkaAdapterPort)
227 kip := kafka.NewInterContainerProxy(
228 kafka.InterContainerAddress(addr),
229 kafka.MsgClient(a.kafkaClient),
230 kafka.DefaultTopic(&kafka.Topic{Name: a.config.Topic}))
231 count := 0
232 for {
233 if err = kip.Start(); err != nil {
234 logger.Warnw("error-starting-messaging-proxy", log.Fields{"error": err, "retry": retries, "count": count})
235 if retries == count {
236 return nil, err
237 }
238 count++
239 // Take a nap before retrying
240 time.Sleep(2 * time.Second)
241 } else {
242 break
243 }
244 }
245 probe.UpdateStatusFromContext(ctx, "container-proxy", probe.ServiceStatusRunning)
246 logger.Info("common-messaging-proxy-created")
247 return kip, nil
248}
249
250func (a *adapter) startVolthaInterfaceAdapter(ctx context.Context, kip kafka.InterContainerProxy,
251 cp adapterif.CoreProxy, ap adapterif.AdapterProxy, ep adapterif.EventProxy,
252 cfg *config.AdapterFlags) (*ac.OpenONUAC, error) {
253 var err error
254 sAcONU := ac.NewOpenONUAC(ctx, a.kip, cp, ap, ep, a.kvClient, cfg)
255
256 if err = sAcONU.Start(ctx); err != nil {
257 logger.Fatalw("error-starting-EPONOnuAdapterCore", log.Fields{"error": err})
258 return nil, err
259 }
260
261 logger.Info("open-ont-OpenOnuAdapterCore-started")
262 return sAcONU, nil
263}
264
265func (a *adapter) setupRequestHandler(ctx context.Context, coreInstanceID string, iadapter adapters.IAdapter) error {
266 logger.Info("setting-request-handler")
267 requestProxy := com.NewRequestHandlerProxy(coreInstanceID, iadapter, a.coreProxy)
268 if err := a.kip.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: a.config.Topic}, requestProxy); err != nil {
269 logger.Errorw("request-handler-setup-failed", log.Fields{"error": err})
270 return err
271
272 }
273 probe.UpdateStatusFromContext(ctx, "core-request-handler", probe.ServiceStatusRunning)
274 logger.Info("request-handler-setup-done")
275 return nil
276}
277
278func (a *adapter) registerWithCore(ctx context.Context, retries int) error {
279 adapterID := fmt.Sprintf("epononu_%d", a.config.CurrentReplica)
280 logger.Infow("registering-with-core", log.Fields{
281 "adapterID": adapterID,
282 "currentReplica": a.config.CurrentReplica,
283 "totalReplicas": a.config.TotalReplicas,
284 })
285 adapterDescription := &voltha.Adapter{
286 Id: adapterID, // Unique name for the device type ->exact type required for OLT comm????
287 Vendor: "VOLTHA EPON ONU",
288 Version: version.VersionInfo.Version,
289 // TODO once we'll be ready to support multiple versions of the adapter
290 // the Endpoint will have to change to `epononu_<currentReplica`>
291 Endpoint: "epononu",
292 Type: "epononu",
293 CurrentReplica: int32(a.config.CurrentReplica),
294 TotalReplicas: int32(a.config.TotalReplicas),
295 }
296 types := []*voltha.DeviceType{{Id: "epononu",
297 VendorIds: []string{"OPEN", "ALCL", "BRCM", "TWSH", "ALPH", "ISKT", "SFAA", "BBSM", "SCOM", "FEC"},
298 Adapter: "epononu", // Name of the adapter that handles device type
299 AcceptsBulkFlowUpdate: false, // Currently openolt adapter does not support bulk flow handling
300 AcceptsAddRemoveFlowUpdates: true}}
301 deviceTypes := &voltha.DeviceTypes{Items: types}
302 count := 0
303 for {
304 if err := a.coreProxy.RegisterAdapter(context.TODO(), adapterDescription, deviceTypes); err != nil {
305 logger.Warnw("registering-with-core-failed", log.Fields{"error": err})
306 if retries == count {
307 return err
308 }
309 count++
310 // Take a nap before retrying
311 time.Sleep(2 * time.Second)
312 } else {
313 break
314 }
315 }
316 probe.UpdateStatusFromContext(ctx, "register-with-core", probe.ServiceStatusRunning)
317 logger.Info("registered-with-core")
318 return nil
319}
320
321/**
322This function checks the liveliness and readiness of the kakfa and kv-client services
323and update the status in the probe.
324*/
325func (a *adapter) checkServicesReadiness(ctx context.Context) {
326 // checks the kafka readiness
327 go a.checkKafkaReadiness(ctx)
328
329 // checks the kv-store readiness
330 go a.checkKvStoreReadiness(ctx)
331}
332
333/**
334This function checks the liveliness and readiness of the kv-store service
335and update the status in the probe.
336*/
337func (a *adapter) checkKvStoreReadiness(ctx context.Context) {
338 // dividing the live probe interval by 2 to get updated status every 30s
339 timeout := a.config.LiveProbeInterval / 2
340 kvStoreChannel := make(chan bool, 1)
341
342 // Default false to check the liveliness.
343 kvStoreChannel <- false
344 for {
345 timeoutTimer := time.NewTimer(timeout)
346 select {
347 case liveliness := <-kvStoreChannel:
348 if !liveliness {
349 // kv-store not reachable or down, updating the status to not ready state
350 probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
351 timeout = a.config.NotLiveProbeInterval
352 } else {
353 // kv-store is reachable , updating the status to running state
354 probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusRunning)
355 timeout = a.config.LiveProbeInterval / 2
356 }
357 // Check if the timer has expired or not
358 if !timeoutTimer.Stop() {
359 <-timeoutTimer.C
360 }
361 case <-timeoutTimer.C:
362 // Check the status of the kv-store
363 logger.Info("kv-store liveliness-recheck")
364 if a.kvClient.IsConnectionUp(ctx) {
365 kvStoreChannel <- true
366 } else {
367 kvStoreChannel <- false
368 }
369 }
370 }
371}
372
373/**
374This function checks the liveliness and readiness of the kafka service
375and update the status in the probe.
376*/
377func (a *adapter) checkKafkaReadiness(ctx context.Context) {
378 livelinessChannel := a.kafkaClient.EnableLivenessChannel(true)
379 healthinessChannel := a.kafkaClient.EnableHealthinessChannel(true)
380 timeout := a.config.LiveProbeInterval
381 for {
382 timeoutTimer := time.NewTimer(timeout)
383
384 select {
385 case healthiness := <-healthinessChannel:
386 if !healthiness {
387 // logger.Fatal will call os.Exit(1) to terminate
388 logger.Fatal("Kafka service has become unhealthy")
389 }
390 case liveliness := <-livelinessChannel:
391 if !liveliness {
392 // kafka not reachable or down, updating the status to not ready state
393 probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
394 timeout = a.config.NotLiveProbeInterval
395 } else {
396 // kafka is reachable , updating the status to running state
397 probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
398 timeout = a.config.LiveProbeInterval
399 }
400 // Check if the timer has expired or not
401 if !timeoutTimer.Stop() {
402 <-timeoutTimer.C
403 }
404 case <-timeoutTimer.C:
405 logger.Info("kafka-proxy-liveness-recheck")
406 // send the liveness probe in a goroutine; we don't want to deadlock ourselves as
407 // the liveness probe may wait (and block) writing to our channel.
408 err := a.kafkaClient.SendLiveness()
409 if err != nil {
410 // Catch possible error case if sending liveness after Sarama has been stopped.
411 logger.Warnw("error-kafka-send-liveness", log.Fields{"error": err})
412 }
413 }
414 }
415}
416
417// Adapter Utility methods ##### end #########
418// #############################################
419
420func getVerifiedCodeVersion() string {
421 if version.VersionInfo.Version == "unknown-version" {
422 content, err := ioutil.ReadFile("VERSION")
423 if err == nil {
424 return (string(content))
425 }
426 logger.Error("'VERSION'-file not readable")
427 }
428 return version.VersionInfo.Version
429}
430
431func printVersion(appName string) {
432 fmt.Println(appName)
433 fmt.Println(version.VersionInfo.String(" "))
434}
435
436func printBanner() {
437 fmt.Println(" ____ ____ ___ ___ _ ")
438 fmt.Println(" / __ \\ / __ \\| \\ \\ | | | | |")
439 fmt.Println(" | | | |_ __ ___ _ __ | | | | |\\ \\ | | | | | ____ ____")
440 fmt.Println(" | | | | '_ \\ / _ \\ '_ \\ | | | | | \\ \\ | | | | | / '_ \\ / _' \\")
441 fmt.Println(" | |__| | |_) | __/| | | || |__| | | \\ \\| | \\__/ || (__) | (__) |")
442 fmt.Println(" \\___ /| .__/ \\___|_| |_| \\____/|_| \\___|______| \\.___ |\\___./")
443 fmt.Println(" | | __| |")
444 fmt.Println(" |_| |____/")
445 fmt.Println(" ")
446}
447
448func waitForExit(ctx context.Context) int {
449 signalChannel := make(chan os.Signal, 1)
450 signal.Notify(signalChannel,
451 syscall.SIGHUP,
452 syscall.SIGINT,
453 syscall.SIGTERM,
454 syscall.SIGQUIT)
455
456 exitChannel := make(chan int)
457
458 go func() {
459 select {
460 case <-ctx.Done():
461 logger.Infow("Adapter run aborted due to internal errors", log.Fields{"context": "done"})
462 exitChannel <- 2
463 case s := <-signalChannel:
464 switch s {
465 case syscall.SIGHUP,
466 syscall.SIGINT,
467 syscall.SIGTERM,
468 syscall.SIGQUIT:
469 logger.Infow("closing-signal-received", log.Fields{"signal": s})
470 exitChannel <- 0
471 default:
472 logger.Infow("unexpected-signal-received", log.Fields{"signal": s})
473 exitChannel <- 1
474 }
475 }
476 }()
477
478 code := <-exitChannel
479 return code
480}
481
482func main() {
483 start := time.Now()
484
485 cf := config.NewAdapterFlags()
486 defaultAppName := cf.InstanceID + "_" + getVerifiedCodeVersion()
487 cf.ParseCommandArguments()
488
489 // Setup logging
490
491 logLevel, err := log.StringToLogLevel(cf.LogLevel)
492 if err != nil {
493 logger.Fatalf("Cannot setup logging, %s", err)
494 }
495
496 // Setup default logger - applies for packages that do not have specific logger set
497 if _, err := log.SetDefaultLogger(log.JSON, logLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
498 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
499 }
500
501 // Update all loggers (provisioned via init) with a common field
502 if err := log.UpdateAllLoggers(log.Fields{"instanceId": cf.InstanceID}); err != nil {
503 logger.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
504 }
505
506 log.SetAllLogLevel(logLevel)
507
508 realMain() //fatal on httpListen(0,6060) ...
509
510 defer func() {
511 _ = log.CleanUp()
512 }()
513 // Print version / build information and exit
514 if cf.DisplayVersionOnly {
515 printVersion(defaultAppName)
516 return
517 }
518 logger.Infow("config", log.Fields{"StartName": defaultAppName})
519 logger.Infow("config", log.Fields{"BuildVersion": version.VersionInfo.String(" ")})
520 logger.Infow("config", log.Fields{"Arguments": os.Args[1:]})
521
522 // Print banner if specified
523 if cf.Banner {
524 printBanner()
525 }
526
527 logger.Infow("config", log.Fields{"config": *cf})
528
529 ctx, cancel := context.WithCancel(context.Background())
530 defer cancel()
531
532 ad := newAdapter(cf)
533
534 p := &probe.Probe{}
535 logger.Infow("resources", log.Fields{"Context": ctx, "Adapter": ad.instanceID, "ProbeCoreState": p.GetStatus("register-with-core")})
536
537 go p.ListenAndServe(fmt.Sprintf("%s:%d", ad.config.ProbeHost, ad.config.ProbePort))
538 logger.Infow("probeState", log.Fields{"ProbeCoreState": p.GetStatus("register-with-core")})
539
540 probeCtx := context.WithValue(ctx, probe.ProbeContextKey, p)
541
542 go func() {
543 err := ad.start(probeCtx)
544 // If this operation returns an error
545 // cancel all operations using this context
546 if err != nil {
547 cancel()
548 }
549 }()
550
551 code := waitForExit(ctx)
552 logger.Infow("received-a-closing-signal", log.Fields{"code": code})
553
554 // Cleanup before leaving
555 ad.stop(ctx)
556
557 elapsed := time.Since(start)
558 logger.Infow("run-time", log.Fields{"Name": "openadapter", "time": elapsed / time.Microsecond})
559 //logger.Infow("run-time", log.Fields{"instanceId": ad.config.InstanceID, "time": elapsed / time.Second})
560}