blob: 7ba8ef3a509ba124750f774db7e9364c23d28d21 [file] [log] [blame]
Scott Bakereee8dd82019-09-24 12:52:34 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package main
17
18import (
19 "context"
20 "errors"
21 "fmt"
Matteo Scandolo18f5eb12020-04-17 10:34:25 -070022 "github.com/opencord/voltha-lib-go/v3/pkg/adapters"
23 com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
24 "github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
25 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
26 "github.com/opencord/voltha-lib-go/v3/pkg/log"
27 "github.com/opencord/voltha-lib-go/v3/pkg/probe"
28 "github.com/opencord/voltha-lib-go/v3/pkg/version"
29 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
30 "github.com/opencord/voltha-protos/v3/go/voltha"
Scott Baker70c16092019-09-30 17:08:54 -070031 ac "github.com/opencord/voltha-simonu-adapter/internal/pkg/adaptercore"
32 "github.com/opencord/voltha-simonu-adapter/internal/pkg/config"
Scott Bakereee8dd82019-09-24 12:52:34 -070033 "os"
34 "os/signal"
35 "strconv"
36 "syscall"
37 "time"
38)
39
40type adapter struct {
41 instanceId string
42 config *config.AdapterFlags
43 iAdapter adapters.IAdapter
44 kafkaClient kafka.Client
45 kvClient kvstore.Client
Matteo Scandolo18f5eb12020-04-17 10:34:25 -070046 kip kafka.InterContainerProxy
Scott Bakereee8dd82019-09-24 12:52:34 -070047 coreProxy *com.CoreProxy
48 halted bool
49 exitChannel chan int
50 receiverChannels []<-chan *ic.InterContainerMessage
51}
52
53func init() {
54 log.AddPackage(log.JSON, log.DebugLevel, nil)
55}
56
57func newAdapter(cf *config.AdapterFlags) *adapter {
58 var a adapter
59 a.instanceId = cf.InstanceID
60 a.config = cf
61 a.halted = false
62 a.exitChannel = make(chan int, 1)
63 a.receiverChannels = make([]<-chan *ic.InterContainerMessage, 0)
64 return &a
65}
66
67func (a *adapter) start(ctx context.Context) {
68 log.Info("Starting Core Adapter components")
69 var err error
70
Vignesh Ethiraj531322b2019-10-14 14:07:19 +000071 // If the context has a probe then fetch it and register our services
72 var p *probe.Probe
73 if value := ctx.Value(probe.ProbeContextKey); value != nil {
74 if _, ok := value.(*probe.Probe); ok {
75 p = value.(*probe.Probe)
76 p.RegisterService(
77 "message-bus",
78 "kv-store",
79 "container-proxy",
80 "core-request-handler",
81 "register-with-core",
82 )
83 }
84 }
85
Scott Bakereee8dd82019-09-24 12:52:34 -070086 // Setup KV Client
87 log.Debugw("create-kv-client", log.Fields{"kvstore": a.config.KVStoreType})
88 if err := a.setKVClient(); err != nil {
89 log.Fatal("error-setting-kv-client")
90 }
91
Vignesh Ethiraj531322b2019-10-14 14:07:19 +000092 if p != nil {
93 p.UpdateStatus("kv-store", probe.ServiceStatusRunning)
94 }
95
Scott Bakereee8dd82019-09-24 12:52:34 -070096 // Setup Kafka Client
97 if a.kafkaClient, err = newKafkaClient("sarama", a.config.KafkaAdapterHost, a.config.KafkaAdapterPort); err != nil {
98 log.Fatal("Unsupported-common-client")
99 }
100
Vignesh Ethiraj531322b2019-10-14 14:07:19 +0000101 if p != nil {
102 p.UpdateStatus("message-bus", probe.ServiceStatusRunning)
103 }
104
Scott Bakereee8dd82019-09-24 12:52:34 -0700105 // Start the common InterContainer Proxy - retry indefinitely
Vignesh Ethiraj531322b2019-10-14 14:07:19 +0000106 if a.kip, err = a.startInterContainerProxy(ctx, -1); err != nil {
Scott Bakereee8dd82019-09-24 12:52:34 -0700107 log.Fatal("error-starting-inter-container-proxy")
108 }
109
110 // Create the core proxy to handle requests to the Core
111 a.coreProxy = com.NewCoreProxy(a.kip, a.config.Topic, a.config.CoreTopic)
112
113 // Create the simulated OLT adapter
114 if a.iAdapter, err = a.startSimulatedONU(ctx, a.kip, a.coreProxy); err != nil {
115 log.Fatal("error-starting-inter-container-proxy")
116 }
117
118 // Register the core request handler
Vignesh Ethiraj531322b2019-10-14 14:07:19 +0000119 if err = a.setupRequestHandler(ctx, a.instanceId, a.iAdapter, a.coreProxy); err != nil {
Scott Bakereee8dd82019-09-24 12:52:34 -0700120 log.Fatal("error-setting-core-request-handler")
121 }
122
123 // Register this adapter to the Core - retry indefinitely
Vignesh Ethiraj531322b2019-10-14 14:07:19 +0000124 if err = a.registerWithCore(ctx, -1); err != nil {
Scott Bakereee8dd82019-09-24 12:52:34 -0700125 log.Fatal("error-registering-with-core")
126 }
127}
128
Matteo Scandolo18f5eb12020-04-17 10:34:25 -0700129func (rw *adapter) stop(ctx context.Context) {
Scott Bakereee8dd82019-09-24 12:52:34 -0700130 // Stop leadership tracking
131 rw.halted = true
132
133 // send exit signal
134 rw.exitChannel <- 0
135
136 // Cleanup - applies only if we had a kvClient
137 if rw.kvClient != nil {
138 // Release all reservations
Matteo Scandolo18f5eb12020-04-17 10:34:25 -0700139 if err := rw.kvClient.ReleaseAllReservations(ctx); err != nil {
Scott Bakereee8dd82019-09-24 12:52:34 -0700140 log.Infow("fail-to-release-all-reservations", log.Fields{"error": err})
141 }
142 // Close the DB connection
143 rw.kvClient.Close()
144 }
145
146 // TODO: More cleanup
147}
148
149func newKVClient(storeType string, address string, timeout int) (kvstore.Client, error) {
150
151 log.Infow("kv-store-type", log.Fields{"store": storeType})
152 switch storeType {
153 case "consul":
154 return kvstore.NewConsulClient(address, timeout)
155 case "etcd":
156 return kvstore.NewEtcdClient(address, timeout)
157 }
158 return nil, errors.New("unsupported-kv-store")
159}
160
161func newKafkaClient(clientType string, host string, port int) (kafka.Client, error) {
162
163 log.Infow("common-client-type", log.Fields{"client": clientType})
164 switch clientType {
165 case "sarama":
166 return kafka.NewSaramaClient(
167 kafka.Host(host),
168 kafka.Port(port),
169 kafka.ProducerReturnOnErrors(true),
170 kafka.ProducerReturnOnSuccess(true),
171 kafka.ProducerMaxRetries(6),
172 kafka.ProducerRetryBackoff(time.Millisecond*30)), nil
173 }
174 return nil, errors.New("unsupported-client-type")
175}
176
177func (a *adapter) setKVClient() error {
178 addr := a.config.KVStoreHost + ":" + strconv.Itoa(a.config.KVStorePort)
179 client, err := newKVClient(a.config.KVStoreType, addr, a.config.KVStoreTimeout)
180 if err != nil {
181 a.kvClient = nil
182 log.Error(err)
183 return err
184 }
185 a.kvClient = client
186 return nil
187}
188
189func toString(value interface{}) (string, error) {
190 switch t := value.(type) {
191 case []byte:
192 return string(value.([]byte)), nil
193 case string:
194 return value.(string), nil
195 default:
196 return "", fmt.Errorf("unexpected-type-%T", t)
197 }
198}
199
Matteo Scandolo18f5eb12020-04-17 10:34:25 -0700200func (a *adapter) startInterContainerProxy(ctx context.Context, retries int) (kafka.InterContainerProxy, error) {
Scott Bakereee8dd82019-09-24 12:52:34 -0700201 log.Infow("starting-intercontainer-messaging-proxy", log.Fields{"host": a.config.KafkaAdapterHost,
202 "port": a.config.KafkaAdapterPort, "topic": a.config.Topic})
203 var err error
Matteo Scandolo18f5eb12020-04-17 10:34:25 -0700204 kip := kafka.NewInterContainerProxy(
Scott Bakereee8dd82019-09-24 12:52:34 -0700205 kafka.InterContainerHost(a.config.KafkaAdapterHost),
206 kafka.InterContainerPort(a.config.KafkaAdapterPort),
207 kafka.MsgClient(a.kafkaClient),
Matteo Scandolo18f5eb12020-04-17 10:34:25 -0700208 kafka.DefaultTopic(&kafka.Topic{Name: a.config.Topic}))
Scott Bakereee8dd82019-09-24 12:52:34 -0700209 count := 0
210 for {
211 if err = kip.Start(); err != nil {
212 log.Warnw("error-starting-messaging-proxy", log.Fields{"error": err})
213 if retries == count {
214 return nil, err
215 }
216 count = +1
217 // Take a nap before retrying
218 time.Sleep(2 * time.Second)
219 } else {
220 break
221 }
222 }
223
Vignesh Ethiraj531322b2019-10-14 14:07:19 +0000224 probe.UpdateStatusFromContext(ctx, "container-proxy", probe.ServiceStatusRunning)
Scott Bakereee8dd82019-09-24 12:52:34 -0700225 log.Info("common-messaging-proxy-created")
226 return kip, nil
227}
228
Matteo Scandolo18f5eb12020-04-17 10:34:25 -0700229func (a *adapter) startSimulatedONU(ctx context.Context, kip kafka.InterContainerProxy, cp *com.CoreProxy) (*ac.SimulatedONU, error) {
Scott Bakereee8dd82019-09-24 12:52:34 -0700230 log.Info("starting-simulated-onu")
231 var err error
232 sOLT := ac.NewSimulatedONU(ctx, a.kip, cp)
233
234 if err = sOLT.Start(ctx); err != nil {
235 log.Fatalw("error-starting-messaging-proxy", log.Fields{"error": err})
236 return nil, err
237 }
238
239 log.Info("simulated-olt-started")
240 return sOLT, nil
241}
242
Vignesh Ethiraj531322b2019-10-14 14:07:19 +0000243func (a *adapter) setupRequestHandler(ctx context.Context, coreInstanceId string, iadapter adapters.IAdapter, coreProxy *com.CoreProxy) error {
Scott Bakereee8dd82019-09-24 12:52:34 -0700244 log.Info("setting-request-handler")
245 requestProxy := com.NewRequestHandlerProxy(coreInstanceId, iadapter, coreProxy)
246 if err := a.kip.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: a.config.Topic}, requestProxy); err != nil {
247 log.Errorw("adaptercore-request-handler-setup-failed", log.Fields{"error": err})
248 return err
249
250 }
Vignesh Ethiraj531322b2019-10-14 14:07:19 +0000251 probe.UpdateStatusFromContext(ctx, "core-request-handler", probe.ServiceStatusRunning)
Scott Bakereee8dd82019-09-24 12:52:34 -0700252 log.Info("request-handler-setup-done")
253 return nil
254}
Vignesh Ethiraj531322b2019-10-14 14:07:19 +0000255func (a *adapter) registerWithCore(ctx context.Context, retries int) error {
Scott Bakereee8dd82019-09-24 12:52:34 -0700256 log.Info("registering-with-core")
Vignesh Ethiraj893bd8d2019-09-26 10:07:49 +0000257 adapterDescription := &voltha.Adapter{
Matteo Scandolo18f5eb12020-04-17 10:34:25 -0700258 Id: "simulated_onu_1",
Vignesh Ethiraj893bd8d2019-09-26 10:07:49 +0000259 Vendor: "Open Networking Foundation",
260 Version: version.VersionInfo.Version,
Matteo Scandolo18f5eb12020-04-17 10:34:25 -0700261 Type: "simulated_onu",
262 // TODO add parameters to deploy multiple replicas
263 CurrentReplica: 1,
264 TotalReplicas: 1,
265 Endpoint: "simulated_onu",
Vignesh Ethiraj893bd8d2019-09-26 10:07:49 +0000266 }
Scott Bakereee8dd82019-09-24 12:52:34 -0700267 types := []*voltha.DeviceType{{Id: "simulated_onu", Adapter: "simulated_onu"}}
268 deviceTypes := &voltha.DeviceTypes{Items: types}
269 count := 0
270 for {
271 if err := a.coreProxy.RegisterAdapter(nil, adapterDescription, deviceTypes); err != nil {
272 log.Warnw("registering-with-core-failed", log.Fields{"error": err})
273 if retries == count {
274 return err
275 }
276 count += 1
277 // Take a nap before retrying
278 time.Sleep(2 * time.Second)
279 } else {
280 break
281 }
282 }
Vignesh Ethiraj531322b2019-10-14 14:07:19 +0000283 probe.UpdateStatusFromContext(ctx, "register-with-core", probe.ServiceStatusRunning)
Scott Bakereee8dd82019-09-24 12:52:34 -0700284 log.Info("registered-with-core")
285 return nil
286}
287
288func waitForExit() int {
289 signalChannel := make(chan os.Signal, 1)
290 signal.Notify(signalChannel,
291 syscall.SIGHUP,
292 syscall.SIGINT,
293 syscall.SIGTERM,
294 syscall.SIGQUIT)
295
296 exitChannel := make(chan int)
297
298 go func() {
299 s := <-signalChannel
300 switch s {
301 case syscall.SIGHUP,
302 syscall.SIGINT,
303 syscall.SIGTERM,
304 syscall.SIGQUIT:
305 log.Infow("closing-signal-received", log.Fields{"signal": s})
306 exitChannel <- 0
307 default:
308 log.Infow("unexpected-signal-received", log.Fields{"signal": s})
309 exitChannel <- 1
310 }
311 }()
312
313 code := <-exitChannel
314 return code
315}
316
317func printBanner() {
318 fmt.Println(" _ _ _ _ ")
319 fmt.Println(" ___(_)_ __ ___ _ _| | __ _| |_ ___ __| | ___ _ __ _ _ ")
320 fmt.Println("/ __| | '_ ` _ \\| | | | |/ _` | __/ _ \\/ _` | / _ \\| '_ \\| | | | ")
321 fmt.Println("\\__ \\ | | | | | | |_| | | (_| | || __/ (_| | | (_) | | | | |_| | ")
322 fmt.Println("|___/_|_| |_| |_|\\__,_|_|\\__,_|\\__\\___|\\__,_|___\\___/|_| |_|\\__,_| ")
323 fmt.Println(" |_____| ")
324 fmt.Println(" ")
325}
326
327func main() {
328 start := time.Now()
329
330 cf := config.NewAdapterFlags()
331 cf.ParseCommandArguments()
332
David Bainbridge69ef4bd2019-10-22 21:48:37 +0000333 if cf.PrintVersion {
334 fmt.Println(version.VersionInfo.String(""))
335 return
336 }
337
Scott Bakereee8dd82019-09-24 12:52:34 -0700338 //// Setup logging
Matteo Scandolo18f5eb12020-04-17 10:34:25 -0700339 logLevel, err := log.StringToLogLevel(cf.LogLevel)
340 if err != nil {
341 log.Fatalf("Cannot setup logging, %s", err)
342 }
Scott Bakereee8dd82019-09-24 12:52:34 -0700343
344 //Setup default logger - applies for packages that do not have specific logger set
Matteo Scandolo18f5eb12020-04-17 10:34:25 -0700345 if _, err := log.SetDefaultLogger(log.JSON, logLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
Scott Bakereee8dd82019-09-24 12:52:34 -0700346 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
347 }
348
349 // Update all loggers (provisionned via init) with a common field
350 if err := log.UpdateAllLoggers(log.Fields{"instanceId": cf.InstanceID}); err != nil {
351 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
352 }
353
Matteo Scandolo18f5eb12020-04-17 10:34:25 -0700354 log.SetPackageLogLevel("github.com/opencord/voltha-lib-go/v3/pkg/adapters/common", log.DebugLevel)
Scott Bakereee8dd82019-09-24 12:52:34 -0700355
356 defer log.CleanUp()
357
358 // Print banner if specified
359 if cf.Banner {
360 printBanner()
361 }
362
363 log.Infow("config", log.Fields{"config": *cf})
364
365 ctx, cancel := context.WithCancel(context.Background())
366 defer cancel()
367
368 ad := newAdapter(cf)
Vignesh Ethiraj531322b2019-10-14 14:07:19 +0000369
370 p := &probe.Probe{}
371 go p.ListenAndServe(fmt.Sprintf("%s:%d", ad.config.ProbeHost, ad.config.ProbePort))
372 probeCtx := context.WithValue(ctx, probe.ProbeContextKey, p)
373 go ad.start(probeCtx)
Scott Bakereee8dd82019-09-24 12:52:34 -0700374
375 code := waitForExit()
376 log.Infow("received-a-closing-signal", log.Fields{"code": code})
377
378 // Cleanup before leaving
Matteo Scandolo18f5eb12020-04-17 10:34:25 -0700379 ad.stop(ctx)
Scott Bakereee8dd82019-09-24 12:52:34 -0700380
381 elapsed := time.Since(start)
382 log.Infow("runtime", log.Fields{"instanceId": ad.config.InstanceID, "time": elapsed / time.Second})
383}