blob: d3fce9962ade4569e2f31452d0fef9ca7df28128 [file] [log] [blame]
Elia Battistonc8d0d462022-02-22 16:30:51 +01001/*
2* Copyright 2022-present Open Networking Foundation
3
4* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
7
8* http://www.apache.org/licenses/LICENSE-2.0
9
10* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
15 */
16
17package main
18
19import (
20 "context"
21 "fmt"
22 "os"
23 "os/signal"
24 "syscall"
25 "time"
26
27 "github.com/opencord/voltha-lib-go/v7/pkg/log"
28 "github.com/opencord/voltha-lib-go/v7/pkg/probe"
29 "github.com/opencord/voltha-lib-go/v7/pkg/version"
Elia Battistonac8d23f2022-03-14 17:54:56 +010030 "github.com/opencord/voltha-northbound-bbf-adapter/internal/clients"
Elia Battistonc8d0d462022-02-22 16:30:51 +010031 "github.com/opencord/voltha-northbound-bbf-adapter/internal/config"
Elia Battistone1cecb22022-03-21 10:05:25 +010032 "github.com/opencord/voltha-northbound-bbf-adapter/internal/core"
Elia Battistonac8d23f2022-03-14 17:54:56 +010033 "github.com/opencord/voltha-northbound-bbf-adapter/internal/sysrepo"
Elia Battistonc8d0d462022-02-22 16:30:51 +010034)
35
Elia Battistonbe9edc12022-03-09 11:35:58 +010036//String for readiness probe services
Elia Battistonc8d0d462022-02-22 16:30:51 +010037const (
38 bbfAdapterService = "bbf-adapter-service"
Elia Battistonac8d23f2022-03-14 17:54:56 +010039 sysrepoService = "sysrepo"
Elia Battistonc8d0d462022-02-22 16:30:51 +010040)
41
Elia Battistonbe9edc12022-03-09 11:35:58 +010042type bbfAdapter struct {
43 conf *config.BBFAdapterConfig
44 volthaNbiClient *clients.VolthaNbiClient
45 oltAppClient *clients.OltAppClient
Elia Battistonac8d23f2022-03-14 17:54:56 +010046 sysrepoPlugin *sysrepo.SysrepoPlugin
Elia Battiston4750d3c2022-07-14 13:24:56 +000047 kafkaConsumer *clients.KafkaConsumer
Elia Battistonbe9edc12022-03-09 11:35:58 +010048}
49
50func newBbfAdapter(conf *config.BBFAdapterConfig) *bbfAdapter {
51 return &bbfAdapter{
52 conf: conf,
53 }
54}
55
Elia Battistone1cecb22022-03-21 10:05:25 +010056func (a *bbfAdapter) start(ctx context.Context) {
Elia Battistonbe9edc12022-03-09 11:35:58 +010057 var err error
58
59 //Connect to the voltha northbound api
60 a.volthaNbiClient = clients.NewVolthaNbiClient(a.conf.VolthaNbiEndpoint)
61 if err = a.volthaNbiClient.Connect(ctx, a.conf.TlsEnabled, a.conf.TlsVerify); err != nil {
62 logger.Fatalw(ctx, "failed-to-open-voltha-nbi-grpc-connection", log.Fields{"err": err})
63 } else {
64 probe.UpdateStatusFromContext(ctx, a.conf.VolthaNbiEndpoint, probe.ServiceStatusRunning)
65 }
66
67 //Check if the REST APIs of the olt app are reachable
68 a.oltAppClient = clients.NewOltAppClient(a.conf.OnosRestEndpoint, a.conf.OnosUser, a.conf.OnosPassword)
69 if err := a.oltAppClient.CheckConnection(ctx); err != nil {
70 logger.Fatalw(ctx, "failed-to-connect-to-onos-olt-app-api", log.Fields{"err": err})
71 } else {
72 probe.UpdateStatusFromContext(ctx, a.conf.OnosRestEndpoint, probe.ServiceStatusRunning)
73 }
74
Elia Battistone1cecb22022-03-21 10:05:25 +010075 //Create the global adapter that will be used by callbacks
76 core.AdapterInstance = core.NewVolthaYangAdapter(a.volthaNbiClient, a.oltAppClient)
77
Elia Battistonac8d23f2022-03-14 17:54:56 +010078 //Load sysrepo plugin
Elia Battiston589addb2022-04-04 16:40:01 +020079 a.sysrepoPlugin, err = sysrepo.StartNewPlugin(ctx, a.conf.SchemaMountFilePath)
Elia Battistonac8d23f2022-03-14 17:54:56 +010080 if err != nil {
81 logger.Fatalw(ctx, "failed-to-start-sysrepo-plugin", log.Fields{"err": err})
82 } else {
83 probe.UpdateStatusFromContext(ctx, sysrepoService, probe.ServiceStatusRunning)
84 }
85
Elia Battiston4750d3c2022-07-14 13:24:56 +000086 //Set up the Kafka consumer
87 a.kafkaConsumer = clients.NewKafkaConsumer(a.conf.KafkaClusterAddress)
88 if err := a.kafkaConsumer.Start(ctx, a.sysrepoPlugin.ManageVolthaEvent); err != nil {
89 logger.Fatalw(ctx, "failed-to-start-kafka-consumer", log.Fields{"err": err})
90 } else {
91 probe.UpdateStatusFromContext(ctx, a.conf.KafkaClusterAddress, probe.ServiceStatusRunning)
92 }
93
Elia Battistonbe9edc12022-03-09 11:35:58 +010094 //Set the service as running, making the adapter finally ready
95 probe.UpdateStatusFromContext(ctx, bbfAdapterService, probe.ServiceStatusRunning)
96 logger.Info(ctx, "bbf-adapter-ready")
Elia Battistonbe9edc12022-03-09 11:35:58 +010097}
98
99//Close all connections of the adapter
Elia Battistonac8d23f2022-03-14 17:54:56 +0100100func (a *bbfAdapter) cleanup(ctx context.Context) {
Elia Battistone1cecb22022-03-21 10:05:25 +0100101 core.AdapterInstance = nil
102
Elia Battiston4750d3c2022-07-14 13:24:56 +0000103 if err := a.kafkaConsumer.Stop(); err != nil {
104 logger.Errorw(ctx, "failed-to-stop-kafka-consumer", log.Fields{"err": err})
105 }
106
Elia Battistonac8d23f2022-03-14 17:54:56 +0100107 a.volthaNbiClient.Close(ctx)
Elia Battistone1cecb22022-03-21 10:05:25 +0100108
Elia Battistonac8d23f2022-03-14 17:54:56 +0100109 err := a.sysrepoPlugin.Stop(ctx)
110 if err != nil {
111 logger.Errorw(ctx, "failed-to-stop-sysrepo-plugin", log.Fields{"err": err})
112 }
Elia Battistone1cecb22022-03-21 10:05:25 +0100113
114 probe.UpdateStatusFromContext(ctx, bbfAdapterService, probe.ServiceStatusStopped)
Elia Battistonbe9edc12022-03-09 11:35:58 +0100115}
116
Elia Battistonc8d0d462022-02-22 16:30:51 +0100117func printBanner() {
118 fmt.Println(" ____ ____ ______ _ _ ")
119 fmt.Println(" | _ \\| _ \\| ____| /\\ | | | | ")
120 fmt.Println(" | |_) | |_) | |__ / \\ __| | __ _ _ __ | |_ ___ _ __ ")
121 fmt.Println(" | _ <| _ <| __| / /\\ \\ / _` |/ _` | '_ \\| __/ _ \\ '__|")
122 fmt.Println(" | |_) | |_) | | / ____ \\ (_| | (_| | |_) | || __/ | ")
123 fmt.Println(" |____/|____/|_| /_/ \\_\\__,_|\\__,_| .__/ \\__\\___|_| ")
124 fmt.Println(" | | ")
125 fmt.Println(" |_| ")
126}
127
128func printVersion() {
129 fmt.Println("VOLTHA Northbound BBF Adapter")
130 fmt.Println(version.VersionInfo.String(" "))
131}
132
133func waitForExit(ctx context.Context) int {
134 signalChannel := make(chan os.Signal, 1)
135 signal.Notify(signalChannel,
136 syscall.SIGHUP,
137 syscall.SIGINT,
138 syscall.SIGTERM,
139 syscall.SIGQUIT)
140
141 exitChannel := make(chan int)
142
143 go func() {
144 s := <-signalChannel
145 switch s {
146 case syscall.SIGHUP,
147 syscall.SIGINT,
148 syscall.SIGTERM,
149 syscall.SIGQUIT:
150 logger.Infow(ctx, "closing-signal-received", log.Fields{"signal": s})
151 exitChannel <- 0
152 default:
153 logger.Infow(ctx, "unexpected-signal-received", log.Fields{"signal": s})
154 exitChannel <- 1
155 }
156 }()
157
158 code := <-exitChannel
159 return code
160}
161
162func main() {
Elia Battistonbe9edc12022-03-09 11:35:58 +0100163 ctx, cancelCtx := context.WithCancel(context.Background())
164
Elia Battistonc8d0d462022-02-22 16:30:51 +0100165 start := time.Now()
166
167 conf := config.LoadConfig(ctx)
168
169 //Logging
170 logLevel, err := log.StringToLogLevel(conf.LogLevel)
171 if err != nil {
172 logger.Fatalf(ctx, "Cannot setup logging, %s", err)
173 }
174
175 // Setup default logger - applies for packages that do not have specific logger set
176 if _, err := log.SetDefaultLogger(log.JSON, logLevel, log.Fields{}); err != nil {
177 logger.With(log.Fields{"error": err}).Fatal(ctx, "Cannot setup logging")
178 }
179
180 // Update all loggers (provisionned via init) with a common field
181 if err := log.UpdateAllLoggers(log.Fields{}); err != nil {
182 logger.With(log.Fields{"error": err}).Fatal(ctx, "Cannot setup logging")
183 }
184
185 log.SetAllLogLevel(logLevel)
186
187 defer func() {
188 err := log.CleanUp()
189 if err != nil {
190 logger.Errorw(context.Background(), "unable-to-flush-any-buffered-log-entries", log.Fields{"error": err})
191 }
192 }()
193
194 // Print version and exit
195 if conf.PrintVersion {
196 printVersion()
197 return
198 }
199
200 // Print banner if specified
201 if conf.PrintBanner {
202 printBanner()
203 }
204
205 logger.Infow(ctx, "config", log.Fields{"config": *conf})
206
207 p := &probe.Probe{}
208 go p.ListenAndServe(ctx, conf.ProbeAddress)
209
Elia Battistonbe9edc12022-03-09 11:35:58 +0100210 //Register all services that will need to be initialized before considering the adapter ready
Elia Battistonc8d0d462022-02-22 16:30:51 +0100211 probeCtx := context.WithValue(ctx, probe.ProbeContextKey, p)
Elia Battistonbe9edc12022-03-09 11:35:58 +0100212 p.RegisterService(
213 ctx,
214 bbfAdapterService,
215 conf.VolthaNbiEndpoint,
216 conf.OnosRestEndpoint,
Elia Battiston4750d3c2022-07-14 13:24:56 +0000217 conf.KafkaClusterAddress,
Elia Battistonac8d23f2022-03-14 17:54:56 +0100218 sysrepoService,
Elia Battistonbe9edc12022-03-09 11:35:58 +0100219 )
220
Elia Battistonc8d0d462022-02-22 16:30:51 +0100221 closer, err := log.GetGlobalLFM().InitTracingAndLogCorrelation(conf.TraceEnabled, conf.TraceAgentAddress, conf.LogCorrelationEnabled)
222 if err != nil {
223 logger.Warnw(ctx, "unable-to-initialize-tracing-and-log-correlation-module", log.Fields{"error": err})
224 } else {
225 defer log.TerminateTracing(closer)
226 }
227
Elia Battistonbe9edc12022-03-09 11:35:58 +0100228 adapter := newBbfAdapter(conf)
Elia Battistonc8d0d462022-02-22 16:30:51 +0100229
Elia Battistonbe9edc12022-03-09 11:35:58 +0100230 //Run the adapter
Elia Battistone1cecb22022-03-21 10:05:25 +0100231 adapter.start(probeCtx)
Elia Battistonac8d23f2022-03-14 17:54:56 +0100232 defer adapter.cleanup(probeCtx)
Elia Battistonc8d0d462022-02-22 16:30:51 +0100233
Elia Battistonbe9edc12022-03-09 11:35:58 +0100234 //Wait a signal to stop execution
Elia Battistonc8d0d462022-02-22 16:30:51 +0100235 code := waitForExit(ctx)
236 logger.Infow(ctx, "received-a-closing-signal", log.Fields{"code": code})
237
Elia Battistonbe9edc12022-03-09 11:35:58 +0100238 //Stop everything that waits for the context to be done
239 cancelCtx()
Elia Battistonbe9edc12022-03-09 11:35:58 +0100240
Elia Battistonc8d0d462022-02-22 16:30:51 +0100241 elapsed := time.Since(start)
242 logger.Infow(ctx, "run-time", log.Fields{"time": elapsed.Seconds()})
243}