blob: 7a3a0d559e444ed35c4d1d340c9f80f558e307eb [file] [log] [blame]
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001/*
2 * Copyright 2020-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//Package main -> this is the entry point of the OpenAdapter
18package main
19
20import (
21 "context"
22 "errors"
23 "fmt"
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -070024 "github.com/opencord/voltha-lib-go/v3/pkg/db"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000025 "os"
26 "os/signal"
27 "strconv"
28 "syscall"
29 "time"
30
31 "github.com/opencord/voltha-lib-go/v3/pkg/adapters"
32 "github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
33 com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
34 "github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
35 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
36 "github.com/opencord/voltha-lib-go/v3/pkg/log"
37 "github.com/opencord/voltha-lib-go/v3/pkg/probe"
38 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
39 "github.com/opencord/voltha-protos/v3/go/voltha"
40
41 ac "test.internal/openadapter/adaptercoreont"
42 "test.internal/openadapter/config"
43 "test.internal/openadapter/config/version"
44)
45
46type adapter struct {
47 defaultAppName string
48 instanceID string
49 config *config.AdapterFlags
50 iAdapter adapters.IAdapter // from Voltha interface adapters
51 kafkaClient kafka.Client
52 kvClient kvstore.Client
53 kip kafka.InterContainerProxy
54 coreProxy adapterif.CoreProxy
55 adapterProxy adapterif.AdapterProxy
56 eventProxy adapterif.EventProxy
57 halted bool
58 exitChannel chan int
59 receiverChannels []<-chan *ic.InterContainerMessage //from inter-container
60}
61
62//package single start function (run at first package instantiation)
63func init() {
64 _, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
65}
66
67func newAdapter(cf *config.AdapterFlags) *adapter {
68 var a adapter
69 a.instanceID = cf.InstanceID
70 a.config = cf
71 a.halted = false
72 a.exitChannel = make(chan int, 1)
73 a.receiverChannels = make([]<-chan *ic.InterContainerMessage, 0)
74 return &a
75}
76
77func (a *adapter) start(ctx context.Context) error {
78 log.Info("Starting Core Adapter components")
79 var err error
80
81 var p *probe.Probe
82 if value := ctx.Value(probe.ProbeContextKey); value != nil {
83 if _, ok := value.(*probe.Probe); ok {
84 p = value.(*probe.Probe)
85 p.RegisterService(
86 "message-bus",
87 "kv-store",
88 "container-proxy",
89 "core-request-handler",
90 "register-with-core",
91 )
92 }
93 }
94
95 // Setup KV Client
96 log.Debugw("create-kv-client", log.Fields{"kvstore": a.config.KVStoreType})
97 if err = a.setKVClient(); err != nil {
98 log.Fatal("error-setting-kv-client")
99 }
100
101 if p != nil {
102 p.UpdateStatus("kv-store", probe.ServiceStatusRunning)
103 }
104
105 // Setup Kafka Client
106 if a.kafkaClient, err = newKafkaClient("sarama", a.config.KafkaAdapterHost, a.config.KafkaAdapterPort); err != nil {
107 log.Fatal("Unsupported-common-client")
108 }
109
110 if p != nil {
111 p.UpdateStatus("message-bus", probe.ServiceStatusRunning)
112 }
113
114 // Start the common InterContainer Proxy - retries as per program arguments or indefinitely per default
115 if a.kip, err = a.startInterContainerProxy(ctx, a.config.KafkaReconnectRetries); err != nil {
116 log.Fatal("error-starting-inter-container-proxy")
117 //aborting the complete processing here (does notmake sense after set Retry number [else use -1 for infinite])
118 return err
119 }
120
121 // Create the core proxy to handle requests to the Core
122 a.coreProxy = com.NewCoreProxy(a.kip, a.config.Topic, a.config.CoreTopic)
123
124 // Create the adaptor proxy to handle request between olt and onu
125 //a.adapterProxy = com.NewAdapterProxy(a.kip, "brcm_openomci_onu", a.config.CoreTopic)
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700126 backend := &db.Backend{
127 Client: a.kvClient,
128 StoreType: a.config.KVStoreType,
129 Host: a.config.KVStoreHost,
130 Port: a.config.KVStorePort,
131 Timeout: a.config.KVStoreTimeout,
132 PathPrefix: "service/voltha",
133 }
134 a.adapterProxy = com.NewAdapterProxy(a.kip, "openolt", a.config.CoreTopic, backend)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000135
136 // Create the event proxy to post events to KAFKA
137 a.eventProxy = com.NewEventProxy(com.MsgClient(a.kafkaClient), com.MsgTopic(kafka.Topic{Name: a.config.EventTopic}))
138
139 // Create the open ONU interface adapter
140 if a.iAdapter, err = a.startVolthaInterfaceAdapter(ctx, a.kip, a.coreProxy, a.adapterProxy, a.eventProxy,
141 a.config); err != nil {
142 log.Fatal("error-starting-VolthaInterfaceAdapter for OpenOnt")
143 }
144
145 // Register the core request handler
146 if err = a.setupRequestHandler(ctx, a.instanceID, a.iAdapter); err != nil {
147 log.Fatal("error-setting-core-request-handler")
148 }
149
150 // Register this adapter to the Core - retries indefinitely
151 if err = a.registerWithCore(ctx, -1); err != nil {
152 log.Fatal("error-registering-with-core")
153 }
154
155 // check the readiness and liveliness and update the probe status
156 a.checkServicesReadiness(ctx)
157 return err
158}
159
160func (a *adapter) stop(ctx context.Context) {
161 // Stop leadership tracking
162 a.halted = true
163
164 // send exit signal
165 a.exitChannel <- 0
166
167 // Cleanup - applies only if we had a kvClient
168 if a.kvClient != nil {
169 // Release all reservations
170 if err := a.kvClient.ReleaseAllReservations(ctx); err != nil {
171 log.Infow("fail-to-release-all-reservations", log.Fields{"error": err})
172 }
173 // Close the DB connection
174 a.kvClient.Close()
175 }
176
177 // TODO: More cleanup
178}
179
180// #############################################
181// Adapter Utility methods ##### begin #########
182
183func (a *adapter) getAdapterInstance() string {
184 return a.instanceID
185}
186
187func newKVClient(storeType, address string, timeout int) (kvstore.Client, error) {
188
189 log.Infow("kv-store-type", log.Fields{"store": storeType})
190 switch storeType {
191 case "consul":
192 return kvstore.NewConsulClient(address, timeout)
193 case "etcd":
194 return kvstore.NewEtcdClient(address, timeout)
195 }
196 return nil, errors.New("unsupported-kv-store")
197}
198
199func newKafkaClient(clientType, host string, port int) (kafka.Client, error) {
200
201 log.Infow("common-client-type", log.Fields{"client": clientType})
202 switch clientType {
203 case "sarama":
204 return kafka.NewSaramaClient(
205 kafka.Host(host),
206 kafka.Port(port),
207 kafka.ProducerReturnOnErrors(true),
208 kafka.ProducerReturnOnSuccess(true),
209 kafka.ProducerMaxRetries(6),
210 kafka.ProducerRetryBackoff(time.Millisecond*30),
211 kafka.MetadatMaxRetries(15)), nil
212 }
213
214 return nil, errors.New("unsupported-client-type")
215}
216
217func (a *adapter) setKVClient() error {
218 addr := a.config.KVStoreHost + ":" + strconv.Itoa(a.config.KVStorePort)
219 client, err := newKVClient(a.config.KVStoreType, addr, a.config.KVStoreTimeout)
220 if err != nil {
221 a.kvClient = nil
222 log.Error(err)
223 return err
224 }
225 a.kvClient = client
226 return nil
227}
228
229func (a *adapter) startInterContainerProxy(ctx context.Context, retries int) (kafka.InterContainerProxy, error) {
230 log.Infow("starting-intercontainer-messaging-proxy", log.Fields{"host": a.config.KafkaAdapterHost,
231 "port": a.config.KafkaAdapterPort, "topic": a.config.Topic})
232 var err error
233 kip := kafka.NewInterContainerProxy(
234 kafka.InterContainerHost(a.config.KafkaAdapterHost),
235 kafka.InterContainerPort(a.config.KafkaAdapterPort),
236 kafka.MsgClient(a.kafkaClient),
237 kafka.DefaultTopic(&kafka.Topic{Name: a.config.Topic}))
238 var count uint8 = 0
239 for {
240 if err = kip.Start(); err != nil {
241 log.Warnw("error-starting-messaging-proxy", log.Fields{"error": err, "retry": retries, "count": count})
242 if retries == int(count) {
243 return nil, err
244 }
245 count++
246 // Take a nap before retrying
247 time.Sleep(2 * time.Second)
248 } else {
249 break
250 }
251 }
252 probe.UpdateStatusFromContext(ctx, "container-proxy", probe.ServiceStatusRunning)
253 log.Info("common-messaging-proxy-created")
254 return kip, nil
255}
256
257/*
258func (a *adapter) startOpenOLT(ctx context.Context, kip kafka.InterContainerProxy,
259 cp adapterif.CoreProxy, ap adapterif.AdapterProxy, ep adapterif.EventProxy,
260 cfg *config.AdapterFlags) (*ac.OpenOLT, error) {
261 log.Info("starting-open-olt")
262 var err error
263 sOLT := ac.NewOpenOLT(ctx, a.kip, cp, ap, ep, cfg)
264
265 if err = sOLT.Start(ctx); err != nil {
266 log.Fatalw("error-starting-messaging-proxy", log.Fields{"error": err})
267 return nil, err
268 }
269
270 log.Info("open-olt-started")
271 return sOLT, nil
272}
273func (a *adapter) startVolthaInterfaceAdapter(ctx context.Context, kip kafka.InterContainerProxy,
274 cp adapterif.CoreProxy, ap adapterif.AdapterProxy, ep adapterif.EventProxy,
275 cfg *config.AdapterFlags) (adapters.IAdapter, error) {
276*/
277
278func (a *adapter) startVolthaInterfaceAdapter(ctx context.Context, kip kafka.InterContainerProxy,
279 cp adapterif.CoreProxy, ap adapterif.AdapterProxy, ep adapterif.EventProxy,
280 cfg *config.AdapterFlags) (*ac.OpenONUAC, error) {
281 var err error
282 sAcONU := ac.NewOpenONUAC(ctx, a.kip, cp, ap, ep, cfg)
283
284 if err = sAcONU.Start(ctx); err != nil {
285 log.Fatalw("error-starting-messaging-proxy", log.Fields{"error": err})
286 return nil, err
287 }
288
289 log.Info("open-ont-adaptercore-started")
290 return sAcONU, nil
291}
292
293func (a *adapter) setupRequestHandler(ctx context.Context, coreInstanceID string, iadapter adapters.IAdapter) error {
294 log.Info("setting-request-handler")
295 requestProxy := com.NewRequestHandlerProxy(coreInstanceID, iadapter, a.coreProxy)
296 if err := a.kip.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: a.config.Topic}, requestProxy); err != nil {
297 log.Errorw("request-handler-setup-failed", log.Fields{"error": err})
298 return err
299
300 }
301 probe.UpdateStatusFromContext(ctx, "core-request-handler", probe.ServiceStatusRunning)
302 log.Info("request-handler-setup-done")
303 return nil
304}
305
306func (a *adapter) registerWithCore(ctx context.Context, retries int) error {
307 log.Info("registering-with-core")
308 /*
309 adapterDescription := &voltha.Adapter{Id: "openolt", // Unique name for the device type
310 Vendor: "VOLTHA OpenOLT",
311 Version: version.VersionInfo.Version}
312 types := []*voltha.DeviceType{{Id: "openolt",
313 Adapter: "openolt", // Name of the adapter that handles device type
314 AcceptsBulkFlowUpdate: false, // Currently openolt adapter does not support bulk flow handling
315 AcceptsAddRemoveFlowUpdates: true}}
316 */
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700317 adapterDescription := &voltha.Adapter{
318 Id: "brcm_openomci_onu", // Unique name for the device type ->exact type required for OLT comm????
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000319 Vendor: "VOLTHA OpenONUGo",
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700320 Version: version.VersionInfo.Version,
321 Endpoint: "brcm_openomci_onu",
322 Type: "brcm_openomci_onu",
323 CurrentReplica: 1,
324 TotalReplicas: 1,
325 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000326 types := []*voltha.DeviceType{{Id: "brcm_openomci_onu",
327 VendorIds: []string{"OPEN", "ALCL", "BRCM", "TWSH", "ALPH", "ISKT", "SFAA", "BBSM", "SCOM"},
328 Adapter: "brcm_openomci_onu", // Name of the adapter that handles device type
329 AcceptsBulkFlowUpdate: false, // Currently openolt adapter does not support bulk flow handling
330 AcceptsAddRemoveFlowUpdates: true}}
331 deviceTypes := &voltha.DeviceTypes{Items: types}
332 count := 0
333 for {
334 if err := a.coreProxy.RegisterAdapter(context.TODO(), adapterDescription, deviceTypes); err != nil {
335 log.Warnw("registering-with-core-failed", log.Fields{"error": err})
336 if retries == count {
337 return err
338 }
339 count++
340 // Take a nap before retrying
341 time.Sleep(2 * time.Second)
342 } else {
343 break
344 }
345 }
346 probe.UpdateStatusFromContext(ctx, "register-with-core", probe.ServiceStatusRunning)
347 log.Info("registered-with-core")
348 return nil
349}
350
351/**
352This function checks the liveliness and readiness of the kakfa and kv-client services
353and update the status in the probe.
354*/
355func (a *adapter) checkServicesReadiness(ctx context.Context) {
356 // checks the kafka readiness
357 go a.checkKafkaReadiness(ctx)
358
359 // checks the kv-store readiness
360 go a.checkKvStoreReadiness(ctx)
361}
362
363/**
364This function checks the liveliness and readiness of the kv-store service
365and update the status in the probe.
366*/
367func (a *adapter) checkKvStoreReadiness(ctx context.Context) {
368 // dividing the live probe interval by 2 to get updated status every 30s
369 timeout := a.config.LiveProbeInterval / 2
370 kvStoreChannel := make(chan bool, 1)
371
372 // Default false to check the liveliness.
373 kvStoreChannel <- false
374 for {
375 timeoutTimer := time.NewTimer(timeout)
376 select {
377 case liveliness := <-kvStoreChannel:
378 if !liveliness {
379 // kv-store not reachable or down, updating the status to not ready state
380 probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
381 timeout = a.config.NotLiveProbeInterval
382 } else {
383 // kv-store is reachable , updating the status to running state
384 probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusRunning)
385 timeout = a.config.LiveProbeInterval / 2
386 }
387 // Check if the timer has expired or not
388 if !timeoutTimer.Stop() {
389 <-timeoutTimer.C
390 }
391 case <-timeoutTimer.C:
392 // Check the status of the kv-store
393 log.Info("kv-store liveliness-recheck")
394 if a.kvClient.IsConnectionUp(ctx) {
395 kvStoreChannel <- true
396 } else {
397 kvStoreChannel <- false
398 }
399 }
400 }
401}
402
403/**
404This function checks the liveliness and readiness of the kafka service
405and update the status in the probe.
406*/
407func (a *adapter) checkKafkaReadiness(ctx context.Context) {
408 livelinessChannel := a.kafkaClient.EnableLivenessChannel(true)
409 healthinessChannel := a.kafkaClient.EnableHealthinessChannel(true)
410 timeout := a.config.LiveProbeInterval
411 for {
412 timeoutTimer := time.NewTimer(timeout)
413
414 select {
415 case healthiness := <-healthinessChannel:
416 if !healthiness {
417 // log.Fatal will call os.Exit(1) to terminate
418 log.Fatal("Kafka service has become unhealthy")
419 }
420 case liveliness := <-livelinessChannel:
421 if !liveliness {
422 // kafka not reachable or down, updating the status to not ready state
423 probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
424 timeout = a.config.NotLiveProbeInterval
425 } else {
426 // kafka is reachable , updating the status to running state
427 probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
428 timeout = a.config.LiveProbeInterval
429 }
430 // Check if the timer has expired or not
431 if !timeoutTimer.Stop() {
432 <-timeoutTimer.C
433 }
434 case <-timeoutTimer.C:
435 log.Info("kafka-proxy-liveness-recheck")
436 // send the liveness probe in a goroutine; we don't want to deadlock ourselves as
437 // the liveness probe may wait (and block) writing to our channel.
438 err := a.kafkaClient.SendLiveness()
439 if err != nil {
440 // Catch possible error case if sending liveness after Sarama has been stopped.
441 log.Warnw("error-kafka-send-liveness", log.Fields{"error": err})
442 }
443 }
444 }
445}
446
447// Adapter Utility methods ##### end #########
448// #############################################
449
450func printVersion(appName string) {
451 fmt.Println(appName)
452 fmt.Println(version.VersionInfo.String(" "))
453}
454
// printBanner writes the ASCII-art start-up banner to standard output, one
// banner row per line.
func printBanner() {
	bannerRows := [...]string{
		" ____ ____ ___ ___ _ ",
		" / __ \\ / __ \\| \\ \\ | | | | |",
		" | | | |_ __ ___ _ __ | | | | |\\ \\ | | | | | ____ ____",
		" | | | | '_ \\ / _ \\ '_ \\ | | | | | \\ \\ | | | | | / '_ \\ / _' \\",
		" | |__| | |_) | __/| | | || |__| | | \\ \\| | \\__/ || (__) | (__) |",
		" \\___ /| .__/ \\___|_| |_| \\____/|_| \\___|______| \\.___ |\\___./",
		" | | __| |",
		" |_| |____/",
		" ",
	}
	for _, row := range bannerRows {
		fmt.Println(row)
	}
}
466
467func waitForExit(ctx context.Context) int {
468 signalChannel := make(chan os.Signal, 1)
469 signal.Notify(signalChannel,
470 syscall.SIGHUP,
471 syscall.SIGINT,
472 syscall.SIGTERM,
473 syscall.SIGQUIT)
474
475 exitChannel := make(chan int)
476
477 go func() {
478 select {
479 case <-ctx.Done():
480 log.Infow("Adapter run aborted due to internal errors", log.Fields{"context": "done"})
481 exitChannel <- 2
482 case s := <-signalChannel:
483 switch s {
484 case syscall.SIGHUP,
485 syscall.SIGINT,
486 syscall.SIGTERM,
487 syscall.SIGQUIT:
488 log.Infow("closing-signal-received", log.Fields{"signal": s})
489 exitChannel <- 0
490 default:
491 log.Infow("unexpected-signal-received", log.Fields{"signal": s})
492 exitChannel <- 1
493 }
494 }
495 }()
496
497 code := <-exitChannel
498 return code
499}
500
501func main() {
502 start := time.Now()
503
504 cf := config.NewAdapterFlags()
505 defaultAppName := cf.InstanceID + "_" + version.GetCodeVersion()
506 cf.ParseCommandArguments()
507
508 // Setup logging
509
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700510 loglevel, err := log.StringToLogLevel(cf.LogLevel)
511 if err != nil {
512 log.Fatalf("Cannot setup logging, %s", err)
513 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000514
515 // Setup default logger - applies for packages that do not have specific logger set
516 if _, err := log.SetDefaultLogger(log.JSON, loglevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
517 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
518 }
519
520 // Update all loggers (provisionned via init) with a common field
521 if err := log.UpdateAllLoggers(log.Fields{"instanceId": cf.InstanceID}); err != nil {
522 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
523 }
524
525 log.SetAllLogLevel(loglevel)
526
527 defer log.CleanUp()
528
529 // Print version / build information and exit
530 if cf.DisplayVersionOnly {
531 printVersion(defaultAppName)
532 return
533 } else {
534 log.Infow("config", log.Fields{"StartName": defaultAppName})
535 log.Infow("config", log.Fields{"BuildVersion": version.VersionInfo.String(" ")})
536 log.Infow("config", log.Fields{"Arguments": os.Args[1:]})
537 }
538
539 // Print banner if specified
540 if cf.Banner {
541 printBanner()
542 }
543
544 log.Infow("config", log.Fields{"config": *cf})
545
546 ctx, cancel := context.WithCancel(context.Background())
547 //defer cancel()
548
549 ad := newAdapter(cf)
550
551 p := &probe.Probe{}
552 log.Infow("resources", log.Fields{"Context": ctx, "Adapter": ad.getAdapterInstance(), "ProbeCoreState": p.GetStatus("register-with-core")})
553
554 go p.ListenAndServe(fmt.Sprintf("%s:%d", ad.config.ProbeHost, ad.config.ProbePort))
555 log.Infow("probeState", log.Fields{"ProbeCoreState": p.GetStatus("register-with-core")})
556
557 probeCtx := context.WithValue(ctx, probe.ProbeContextKey, p)
558
559 go func() {
560 err := ad.start(probeCtx)
561 // If this operation returns an error
562 // cancel all operations using this context
563 if err != nil {
564 cancel()
565 }
566 }()
567
568 code := waitForExit(ctx)
569 log.Infow("received-a-closing-signal", log.Fields{"code": code})
570
571 // Cleanup before leaving
572 ad.stop(ctx)
573
574 elapsed := time.Since(start)
575 log.Infow("run-time", log.Fields{"Name": "openadapter", "time": elapsed / time.Microsecond})
576 //log.Infow("run-time", log.Fields{"instanceId": ad.config.InstanceID, "time": elapsed / time.Second})
577}