blob: 6f2ea33336460e9bce20f8511c0562a725680af6 [file] [log] [blame]
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001/*
2 * Copyright 2020-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//Package main -> this is the entry point of the OpenAdapter
18package main
19
20import (
21 "context"
22 "errors"
23 "fmt"
24 "os"
25 "os/signal"
26 "strconv"
27 "syscall"
28 "time"
29
30 "github.com/opencord/voltha-lib-go/v3/pkg/adapters"
31 "github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
32 com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
33 "github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
34 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
35 "github.com/opencord/voltha-lib-go/v3/pkg/log"
36 "github.com/opencord/voltha-lib-go/v3/pkg/probe"
37 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
38 "github.com/opencord/voltha-protos/v3/go/voltha"
39
40 ac "test.internal/openadapter/adaptercoreont"
41 "test.internal/openadapter/config"
42 "test.internal/openadapter/config/version"
43)
44
45type adapter struct {
46 defaultAppName string
47 instanceID string
48 config *config.AdapterFlags
49 iAdapter adapters.IAdapter // from Voltha interface adapters
50 kafkaClient kafka.Client
51 kvClient kvstore.Client
52 kip kafka.InterContainerProxy
53 coreProxy adapterif.CoreProxy
54 adapterProxy adapterif.AdapterProxy
55 eventProxy adapterif.EventProxy
56 halted bool
57 exitChannel chan int
58 receiverChannels []<-chan *ic.InterContainerMessage //from inter-container
59}
60
61//package single start function (run at first package instantiation)
62func init() {
63 _, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
64}
65
66func newAdapter(cf *config.AdapterFlags) *adapter {
67 var a adapter
68 a.instanceID = cf.InstanceID
69 a.config = cf
70 a.halted = false
71 a.exitChannel = make(chan int, 1)
72 a.receiverChannels = make([]<-chan *ic.InterContainerMessage, 0)
73 return &a
74}
75
76func (a *adapter) start(ctx context.Context) error {
77 log.Info("Starting Core Adapter components")
78 var err error
79
80 var p *probe.Probe
81 if value := ctx.Value(probe.ProbeContextKey); value != nil {
82 if _, ok := value.(*probe.Probe); ok {
83 p = value.(*probe.Probe)
84 p.RegisterService(
85 "message-bus",
86 "kv-store",
87 "container-proxy",
88 "core-request-handler",
89 "register-with-core",
90 )
91 }
92 }
93
94 // Setup KV Client
95 log.Debugw("create-kv-client", log.Fields{"kvstore": a.config.KVStoreType})
96 if err = a.setKVClient(); err != nil {
97 log.Fatal("error-setting-kv-client")
98 }
99
100 if p != nil {
101 p.UpdateStatus("kv-store", probe.ServiceStatusRunning)
102 }
103
104 // Setup Kafka Client
105 if a.kafkaClient, err = newKafkaClient("sarama", a.config.KafkaAdapterHost, a.config.KafkaAdapterPort); err != nil {
106 log.Fatal("Unsupported-common-client")
107 }
108
109 if p != nil {
110 p.UpdateStatus("message-bus", probe.ServiceStatusRunning)
111 }
112
113 // Start the common InterContainer Proxy - retries as per program arguments or indefinitely per default
114 if a.kip, err = a.startInterContainerProxy(ctx, a.config.KafkaReconnectRetries); err != nil {
115 log.Fatal("error-starting-inter-container-proxy")
116 //aborting the complete processing here (does notmake sense after set Retry number [else use -1 for infinite])
117 return err
118 }
119
120 // Create the core proxy to handle requests to the Core
121 a.coreProxy = com.NewCoreProxy(a.kip, a.config.Topic, a.config.CoreTopic)
122
123 // Create the adaptor proxy to handle request between olt and onu
124 //a.adapterProxy = com.NewAdapterProxy(a.kip, "brcm_openomci_onu", a.config.CoreTopic)
125 a.adapterProxy = com.NewAdapterProxy(a.kip, "openolt", a.config.CoreTopic)
126
127 // Create the event proxy to post events to KAFKA
128 a.eventProxy = com.NewEventProxy(com.MsgClient(a.kafkaClient), com.MsgTopic(kafka.Topic{Name: a.config.EventTopic}))
129
130 // Create the open ONU interface adapter
131 if a.iAdapter, err = a.startVolthaInterfaceAdapter(ctx, a.kip, a.coreProxy, a.adapterProxy, a.eventProxy,
132 a.config); err != nil {
133 log.Fatal("error-starting-VolthaInterfaceAdapter for OpenOnt")
134 }
135
136 // Register the core request handler
137 if err = a.setupRequestHandler(ctx, a.instanceID, a.iAdapter); err != nil {
138 log.Fatal("error-setting-core-request-handler")
139 }
140
141 // Register this adapter to the Core - retries indefinitely
142 if err = a.registerWithCore(ctx, -1); err != nil {
143 log.Fatal("error-registering-with-core")
144 }
145
146 // check the readiness and liveliness and update the probe status
147 a.checkServicesReadiness(ctx)
148 return err
149}
150
151func (a *adapter) stop(ctx context.Context) {
152 // Stop leadership tracking
153 a.halted = true
154
155 // send exit signal
156 a.exitChannel <- 0
157
158 // Cleanup - applies only if we had a kvClient
159 if a.kvClient != nil {
160 // Release all reservations
161 if err := a.kvClient.ReleaseAllReservations(ctx); err != nil {
162 log.Infow("fail-to-release-all-reservations", log.Fields{"error": err})
163 }
164 // Close the DB connection
165 a.kvClient.Close()
166 }
167
168 // TODO: More cleanup
169}
170
171// #############################################
172// Adapter Utility methods ##### begin #########
173
174func (a *adapter) getAdapterInstance() string {
175 return a.instanceID
176}
177
178func newKVClient(storeType, address string, timeout int) (kvstore.Client, error) {
179
180 log.Infow("kv-store-type", log.Fields{"store": storeType})
181 switch storeType {
182 case "consul":
183 return kvstore.NewConsulClient(address, timeout)
184 case "etcd":
185 return kvstore.NewEtcdClient(address, timeout)
186 }
187 return nil, errors.New("unsupported-kv-store")
188}
189
190func newKafkaClient(clientType, host string, port int) (kafka.Client, error) {
191
192 log.Infow("common-client-type", log.Fields{"client": clientType})
193 switch clientType {
194 case "sarama":
195 return kafka.NewSaramaClient(
196 kafka.Host(host),
197 kafka.Port(port),
198 kafka.ProducerReturnOnErrors(true),
199 kafka.ProducerReturnOnSuccess(true),
200 kafka.ProducerMaxRetries(6),
201 kafka.ProducerRetryBackoff(time.Millisecond*30),
202 kafka.MetadatMaxRetries(15)), nil
203 }
204
205 return nil, errors.New("unsupported-client-type")
206}
207
208func (a *adapter) setKVClient() error {
209 addr := a.config.KVStoreHost + ":" + strconv.Itoa(a.config.KVStorePort)
210 client, err := newKVClient(a.config.KVStoreType, addr, a.config.KVStoreTimeout)
211 if err != nil {
212 a.kvClient = nil
213 log.Error(err)
214 return err
215 }
216 a.kvClient = client
217 return nil
218}
219
220func (a *adapter) startInterContainerProxy(ctx context.Context, retries int) (kafka.InterContainerProxy, error) {
221 log.Infow("starting-intercontainer-messaging-proxy", log.Fields{"host": a.config.KafkaAdapterHost,
222 "port": a.config.KafkaAdapterPort, "topic": a.config.Topic})
223 var err error
224 kip := kafka.NewInterContainerProxy(
225 kafka.InterContainerHost(a.config.KafkaAdapterHost),
226 kafka.InterContainerPort(a.config.KafkaAdapterPort),
227 kafka.MsgClient(a.kafkaClient),
228 kafka.DefaultTopic(&kafka.Topic{Name: a.config.Topic}))
229 var count uint8 = 0
230 for {
231 if err = kip.Start(); err != nil {
232 log.Warnw("error-starting-messaging-proxy", log.Fields{"error": err, "retry": retries, "count": count})
233 if retries == int(count) {
234 return nil, err
235 }
236 count++
237 // Take a nap before retrying
238 time.Sleep(2 * time.Second)
239 } else {
240 break
241 }
242 }
243 probe.UpdateStatusFromContext(ctx, "container-proxy", probe.ServiceStatusRunning)
244 log.Info("common-messaging-proxy-created")
245 return kip, nil
246}
247
248/*
249func (a *adapter) startOpenOLT(ctx context.Context, kip kafka.InterContainerProxy,
250 cp adapterif.CoreProxy, ap adapterif.AdapterProxy, ep adapterif.EventProxy,
251 cfg *config.AdapterFlags) (*ac.OpenOLT, error) {
252 log.Info("starting-open-olt")
253 var err error
254 sOLT := ac.NewOpenOLT(ctx, a.kip, cp, ap, ep, cfg)
255
256 if err = sOLT.Start(ctx); err != nil {
257 log.Fatalw("error-starting-messaging-proxy", log.Fields{"error": err})
258 return nil, err
259 }
260
261 log.Info("open-olt-started")
262 return sOLT, nil
263}
264func (a *adapter) startVolthaInterfaceAdapter(ctx context.Context, kip kafka.InterContainerProxy,
265 cp adapterif.CoreProxy, ap adapterif.AdapterProxy, ep adapterif.EventProxy,
266 cfg *config.AdapterFlags) (adapters.IAdapter, error) {
267*/
268
269func (a *adapter) startVolthaInterfaceAdapter(ctx context.Context, kip kafka.InterContainerProxy,
270 cp adapterif.CoreProxy, ap adapterif.AdapterProxy, ep adapterif.EventProxy,
271 cfg *config.AdapterFlags) (*ac.OpenONUAC, error) {
272 var err error
273 sAcONU := ac.NewOpenONUAC(ctx, a.kip, cp, ap, ep, cfg)
274
275 if err = sAcONU.Start(ctx); err != nil {
276 log.Fatalw("error-starting-messaging-proxy", log.Fields{"error": err})
277 return nil, err
278 }
279
280 log.Info("open-ont-adaptercore-started")
281 return sAcONU, nil
282}
283
284func (a *adapter) setupRequestHandler(ctx context.Context, coreInstanceID string, iadapter adapters.IAdapter) error {
285 log.Info("setting-request-handler")
286 requestProxy := com.NewRequestHandlerProxy(coreInstanceID, iadapter, a.coreProxy)
287 if err := a.kip.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: a.config.Topic}, requestProxy); err != nil {
288 log.Errorw("request-handler-setup-failed", log.Fields{"error": err})
289 return err
290
291 }
292 probe.UpdateStatusFromContext(ctx, "core-request-handler", probe.ServiceStatusRunning)
293 log.Info("request-handler-setup-done")
294 return nil
295}
296
297func (a *adapter) registerWithCore(ctx context.Context, retries int) error {
298 log.Info("registering-with-core")
299 /*
300 adapterDescription := &voltha.Adapter{Id: "openolt", // Unique name for the device type
301 Vendor: "VOLTHA OpenOLT",
302 Version: version.VersionInfo.Version}
303 types := []*voltha.DeviceType{{Id: "openolt",
304 Adapter: "openolt", // Name of the adapter that handles device type
305 AcceptsBulkFlowUpdate: false, // Currently openolt adapter does not support bulk flow handling
306 AcceptsAddRemoveFlowUpdates: true}}
307 */
308 adapterDescription := &voltha.Adapter{Id: "brcm_openomci_onu", // Unique name for the device type ->exact type required for OLT comm????
309 Vendor: "VOLTHA OpenONUGo",
310 Version: version.VersionInfo.Version}
311 types := []*voltha.DeviceType{{Id: "brcm_openomci_onu",
312 VendorIds: []string{"OPEN", "ALCL", "BRCM", "TWSH", "ALPH", "ISKT", "SFAA", "BBSM", "SCOM"},
313 Adapter: "brcm_openomci_onu", // Name of the adapter that handles device type
314 AcceptsBulkFlowUpdate: false, // Currently openolt adapter does not support bulk flow handling
315 AcceptsAddRemoveFlowUpdates: true}}
316 deviceTypes := &voltha.DeviceTypes{Items: types}
317 count := 0
318 for {
319 if err := a.coreProxy.RegisterAdapter(context.TODO(), adapterDescription, deviceTypes); err != nil {
320 log.Warnw("registering-with-core-failed", log.Fields{"error": err})
321 if retries == count {
322 return err
323 }
324 count++
325 // Take a nap before retrying
326 time.Sleep(2 * time.Second)
327 } else {
328 break
329 }
330 }
331 probe.UpdateStatusFromContext(ctx, "register-with-core", probe.ServiceStatusRunning)
332 log.Info("registered-with-core")
333 return nil
334}
335
336/**
337This function checks the liveliness and readiness of the kakfa and kv-client services
338and update the status in the probe.
339*/
340func (a *adapter) checkServicesReadiness(ctx context.Context) {
341 // checks the kafka readiness
342 go a.checkKafkaReadiness(ctx)
343
344 // checks the kv-store readiness
345 go a.checkKvStoreReadiness(ctx)
346}
347
348/**
349This function checks the liveliness and readiness of the kv-store service
350and update the status in the probe.
351*/
352func (a *adapter) checkKvStoreReadiness(ctx context.Context) {
353 // dividing the live probe interval by 2 to get updated status every 30s
354 timeout := a.config.LiveProbeInterval / 2
355 kvStoreChannel := make(chan bool, 1)
356
357 // Default false to check the liveliness.
358 kvStoreChannel <- false
359 for {
360 timeoutTimer := time.NewTimer(timeout)
361 select {
362 case liveliness := <-kvStoreChannel:
363 if !liveliness {
364 // kv-store not reachable or down, updating the status to not ready state
365 probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
366 timeout = a.config.NotLiveProbeInterval
367 } else {
368 // kv-store is reachable , updating the status to running state
369 probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusRunning)
370 timeout = a.config.LiveProbeInterval / 2
371 }
372 // Check if the timer has expired or not
373 if !timeoutTimer.Stop() {
374 <-timeoutTimer.C
375 }
376 case <-timeoutTimer.C:
377 // Check the status of the kv-store
378 log.Info("kv-store liveliness-recheck")
379 if a.kvClient.IsConnectionUp(ctx) {
380 kvStoreChannel <- true
381 } else {
382 kvStoreChannel <- false
383 }
384 }
385 }
386}
387
388/**
389This function checks the liveliness and readiness of the kafka service
390and update the status in the probe.
391*/
392func (a *adapter) checkKafkaReadiness(ctx context.Context) {
393 livelinessChannel := a.kafkaClient.EnableLivenessChannel(true)
394 healthinessChannel := a.kafkaClient.EnableHealthinessChannel(true)
395 timeout := a.config.LiveProbeInterval
396 for {
397 timeoutTimer := time.NewTimer(timeout)
398
399 select {
400 case healthiness := <-healthinessChannel:
401 if !healthiness {
402 // log.Fatal will call os.Exit(1) to terminate
403 log.Fatal("Kafka service has become unhealthy")
404 }
405 case liveliness := <-livelinessChannel:
406 if !liveliness {
407 // kafka not reachable or down, updating the status to not ready state
408 probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
409 timeout = a.config.NotLiveProbeInterval
410 } else {
411 // kafka is reachable , updating the status to running state
412 probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
413 timeout = a.config.LiveProbeInterval
414 }
415 // Check if the timer has expired or not
416 if !timeoutTimer.Stop() {
417 <-timeoutTimer.C
418 }
419 case <-timeoutTimer.C:
420 log.Info("kafka-proxy-liveness-recheck")
421 // send the liveness probe in a goroutine; we don't want to deadlock ourselves as
422 // the liveness probe may wait (and block) writing to our channel.
423 err := a.kafkaClient.SendLiveness()
424 if err != nil {
425 // Catch possible error case if sending liveness after Sarama has been stopped.
426 log.Warnw("error-kafka-send-liveness", log.Fields{"error": err})
427 }
428 }
429 }
430}
431
432// Adapter Utility methods ##### end #########
433// #############################################
434
435func printVersion(appName string) {
436 fmt.Println(appName)
437 fmt.Println(version.VersionInfo.String(" "))
438}
439
// printBanner writes the ASCII-art start-up banner to standard output, one
// line at a time, followed by a line holding a single space.
func printBanner() {
	banner := []string{
		" ____ ____ ___ ___ _ ",
		" / __ \\ / __ \\| \\ \\ | | | | |",
		" | | | |_ __ ___ _ __ | | | | |\\ \\ | | | | | ____ ____",
		" | | | | '_ \\ / _ \\ '_ \\ | | | | | \\ \\ | | | | | / '_ \\ / _' \\",
		" | |__| | |_) | __/| | | || |__| | | \\ \\| | \\__/ || (__) | (__) |",
		" \\___ /| .__/ \\___|_| |_| \\____/|_| \\___|______| \\.___ |\\___./",
		" | | __| |",
		" |_| |____/",
		" ",
	}
	for _, row := range banner {
		fmt.Println(row)
	}
}
451
452func waitForExit(ctx context.Context) int {
453 signalChannel := make(chan os.Signal, 1)
454 signal.Notify(signalChannel,
455 syscall.SIGHUP,
456 syscall.SIGINT,
457 syscall.SIGTERM,
458 syscall.SIGQUIT)
459
460 exitChannel := make(chan int)
461
462 go func() {
463 select {
464 case <-ctx.Done():
465 log.Infow("Adapter run aborted due to internal errors", log.Fields{"context": "done"})
466 exitChannel <- 2
467 case s := <-signalChannel:
468 switch s {
469 case syscall.SIGHUP,
470 syscall.SIGINT,
471 syscall.SIGTERM,
472 syscall.SIGQUIT:
473 log.Infow("closing-signal-received", log.Fields{"signal": s})
474 exitChannel <- 0
475 default:
476 log.Infow("unexpected-signal-received", log.Fields{"signal": s})
477 exitChannel <- 1
478 }
479 }
480 }()
481
482 code := <-exitChannel
483 return code
484}
485
486func main() {
487 start := time.Now()
488
489 cf := config.NewAdapterFlags()
490 defaultAppName := cf.InstanceID + "_" + version.GetCodeVersion()
491 cf.ParseCommandArguments()
492
493 // Setup logging
494
495 loglevel := log.StringToInt(cf.LogLevel)
496
497 // Setup default logger - applies for packages that do not have specific logger set
498 if _, err := log.SetDefaultLogger(log.JSON, loglevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
499 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
500 }
501
502 // Update all loggers (provisionned via init) with a common field
503 if err := log.UpdateAllLoggers(log.Fields{"instanceId": cf.InstanceID}); err != nil {
504 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
505 }
506
507 log.SetAllLogLevel(loglevel)
508
509 defer log.CleanUp()
510
511 // Print version / build information and exit
512 if cf.DisplayVersionOnly {
513 printVersion(defaultAppName)
514 return
515 } else {
516 log.Infow("config", log.Fields{"StartName": defaultAppName})
517 log.Infow("config", log.Fields{"BuildVersion": version.VersionInfo.String(" ")})
518 log.Infow("config", log.Fields{"Arguments": os.Args[1:]})
519 }
520
521 // Print banner if specified
522 if cf.Banner {
523 printBanner()
524 }
525
526 log.Infow("config", log.Fields{"config": *cf})
527
528 ctx, cancel := context.WithCancel(context.Background())
529 //defer cancel()
530
531 ad := newAdapter(cf)
532
533 p := &probe.Probe{}
534 log.Infow("resources", log.Fields{"Context": ctx, "Adapter": ad.getAdapterInstance(), "ProbeCoreState": p.GetStatus("register-with-core")})
535
536 go p.ListenAndServe(fmt.Sprintf("%s:%d", ad.config.ProbeHost, ad.config.ProbePort))
537 log.Infow("probeState", log.Fields{"ProbeCoreState": p.GetStatus("register-with-core")})
538
539 probeCtx := context.WithValue(ctx, probe.ProbeContextKey, p)
540
541 go func() {
542 err := ad.start(probeCtx)
543 // If this operation returns an error
544 // cancel all operations using this context
545 if err != nil {
546 cancel()
547 }
548 }()
549
550 code := waitForExit(ctx)
551 log.Infow("received-a-closing-signal", log.Fields{"code": code})
552
553 // Cleanup before leaving
554 ad.stop(ctx)
555
556 elapsed := time.Since(start)
557 log.Infow("run-time", log.Fields{"Name": "openadapter", "time": elapsed / time.Microsecond})
558 //log.Infow("run-time", log.Fields{"instanceId": ad.config.InstanceID, "time": elapsed / time.Second})
559}