blob: 2245621c2649b1be748465e8029b69e0f982db1c [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package main
17
18import (
19 "context"
20 "errors"
21 "fmt"
Scott Bakerb61e3332019-10-24 13:36:06 -070022 "github.com/opencord/voltha-lib-go/v2/pkg/adapters"
23 com "github.com/opencord/voltha-lib-go/v2/pkg/adapters/common"
24 "github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore"
25 "github.com/opencord/voltha-lib-go/v2/pkg/kafka"
26 "github.com/opencord/voltha-lib-go/v2/pkg/log"
27 "github.com/opencord/voltha-lib-go/v2/pkg/probe"
28 "github.com/opencord/voltha-lib-go/v2/pkg/version"
Scott Bakerc3337a52019-11-04 09:24:30 -080029 ic "github.com/opencord/voltha-protos/v2/go/inter_container"
30 "github.com/opencord/voltha-protos/v2/go/voltha"
Scott Baker2d897982019-09-24 11:50:08 -070031 ac "github.com/opencord/voltha-simolt-adapter/internal/pkg/adaptercore"
32 "github.com/opencord/voltha-simolt-adapter/internal/pkg/config"
33 "os"
34 "os/signal"
35 "strconv"
36 "syscall"
37 "time"
38)
39
40type adapter struct {
41 instanceId string
42 config *config.AdapterFlags
43 iAdapter adapters.IAdapter
44 kafkaClient kafka.Client
45 kvClient kvstore.Client
46 kip *kafka.InterContainerProxy
47 coreProxy *com.CoreProxy
48 halted bool
49 exitChannel chan int
50 receiverChannels []<-chan *ic.InterContainerMessage
51}
52
53func init() {
54 log.AddPackage(log.JSON, log.DebugLevel, nil)
55}
56
57func newAdapter(cf *config.AdapterFlags) *adapter {
58 var a adapter
59 a.instanceId = cf.InstanceID
60 a.config = cf
61 a.halted = false
62 a.exitChannel = make(chan int, 1)
63 a.receiverChannels = make([]<-chan *ic.InterContainerMessage, 0)
64 return &a
65}
66
67func (a *adapter) start(ctx context.Context) {
68 log.Info("Starting Core Adapter components")
69 var err error
70
Vignesh Ethiraj5620c672019-10-14 13:18:52 +000071 var p *probe.Probe
72 if value := ctx.Value(probe.ProbeContextKey); value != nil {
73 if _, ok := value.(*probe.Probe); ok {
74 p = value.(*probe.Probe)
75 p.RegisterService(
76 "message-bus",
77 "kv-store",
78 "container-proxy",
79 "core-request-handler",
80 "register-with-core",
81 )
82 }
83 }
84
Scott Baker2d897982019-09-24 11:50:08 -070085 // Setup KV Client
86 log.Debugw("create-kv-client", log.Fields{"kvstore": a.config.KVStoreType})
87 if err := a.setKVClient(); err != nil {
88 log.Fatal("error-setting-kv-client")
89 }
90
Vignesh Ethiraj5620c672019-10-14 13:18:52 +000091 if p != nil {
92 p.UpdateStatus("kv-store", probe.ServiceStatusRunning)
93 }
94
Scott Baker2d897982019-09-24 11:50:08 -070095 // Setup Kafka Client
96 if a.kafkaClient, err = newKafkaClient("sarama", a.config.KafkaAdapterHost, a.config.KafkaAdapterPort); err != nil {
97 log.Fatal("Unsupported-common-client")
98 }
99
Vignesh Ethiraj5620c672019-10-14 13:18:52 +0000100 if p != nil {
101 p.UpdateStatus("message-bus", probe.ServiceStatusRunning)
102 }
103
Scott Baker2d897982019-09-24 11:50:08 -0700104 // Start the common InterContainer Proxy - retries indefinitely
Vignesh Ethiraj5620c672019-10-14 13:18:52 +0000105 if a.kip, err = a.startInterContainerProxy(ctx, -1); err != nil {
Scott Baker2d897982019-09-24 11:50:08 -0700106 log.Fatal("error-starting-inter-container-proxy")
107 }
108
109 // Create the core proxy to handle requests to the Core
110 a.coreProxy = com.NewCoreProxy(a.kip, a.config.Topic, a.config.CoreTopic)
111
112 // Create the simulated OLT adapter
113 if a.iAdapter, err = a.startSimulatedOLT(ctx, a.kip, a.coreProxy, a.config.OnuNumber); err != nil {
114 log.Fatal("error-starting-inter-container-proxy")
115 }
116
117 // Register the core request handler
Vignesh Ethiraj5620c672019-10-14 13:18:52 +0000118 if err = a.setupRequestHandler(ctx, a.instanceId, a.iAdapter, a.coreProxy); err != nil {
Scott Baker2d897982019-09-24 11:50:08 -0700119 log.Fatal("error-setting-core-request-handler")
120 }
121
122 // Register this adapter to the Core - retries indefinitely
Vignesh Ethiraj5620c672019-10-14 13:18:52 +0000123 if err = a.registerWithCore(ctx, -1); err != nil {
Scott Baker2d897982019-09-24 11:50:08 -0700124 log.Fatal("error-registering-with-core")
125 }
126}
127
128func (rw *adapter) stop() {
129 // Stop leadership tracking
130 rw.halted = true
131
132 // send exit signal
133 rw.exitChannel <- 0
134
135 // Cleanup - applies only if we had a kvClient
136 if rw.kvClient != nil {
137 // Release all reservations
138 if err := rw.kvClient.ReleaseAllReservations(); err != nil {
139 log.Infow("fail-to-release-all-reservations", log.Fields{"error": err})
140 }
141 // Close the DB connection
142 rw.kvClient.Close()
143 }
144
145 // TODO: More cleanup
146}
147
148func newKVClient(storeType string, address string, timeout int) (kvstore.Client, error) {
149
150 log.Infow("kv-store-type", log.Fields{"store": storeType})
151 switch storeType {
152 case "consul":
153 return kvstore.NewConsulClient(address, timeout)
154 case "etcd":
155 return kvstore.NewEtcdClient(address, timeout)
156 }
157 return nil, errors.New("unsupported-kv-store")
158}
159
160func newKafkaClient(clientType string, host string, port int) (kafka.Client, error) {
161
162 log.Infow("common-client-type", log.Fields{"client": clientType})
163 switch clientType {
164 case "sarama":
165 return kafka.NewSaramaClient(
166 kafka.Host(host),
167 kafka.Port(port),
168 kafka.ProducerReturnOnErrors(true),
169 kafka.ProducerReturnOnSuccess(true),
170 kafka.ProducerMaxRetries(6),
171 kafka.ProducerRetryBackoff(time.Millisecond*30)), nil
172 }
173 return nil, errors.New("unsupported-client-type")
174}
175
176func (a *adapter) setKVClient() error {
177 addr := a.config.KVStoreHost + ":" + strconv.Itoa(a.config.KVStorePort)
178 client, err := newKVClient(a.config.KVStoreType, addr, a.config.KVStoreTimeout)
179 if err != nil {
180 a.kvClient = nil
181 log.Error(err)
182 return err
183 }
184 a.kvClient = client
185 return nil
186}
187
// toString converts value to a string. A []byte is converted byte-for-byte and
// a string is returned unchanged; any other dynamic type (including a nil
// interface) yields an empty string and an "unexpected-type-<T>" error naming
// that type.
func toString(value interface{}) (string, error) {
	// The type switch already binds t with the matched type, so no further
	// assertion on value is needed in the cases below.
	switch t := value.(type) {
	case []byte:
		return string(t), nil
	case string:
		return t, nil
	default:
		return "", fmt.Errorf("unexpected-type-%T", t)
	}
}
198
Vignesh Ethiraj5620c672019-10-14 13:18:52 +0000199func (a *adapter) startInterContainerProxy(ctx context.Context, retries int) (*kafka.InterContainerProxy, error) {
Scott Baker2d897982019-09-24 11:50:08 -0700200 log.Infow("starting-intercontainer-messaging-proxy", log.Fields{"host": a.config.KafkaAdapterHost,
201 "port": a.config.KafkaAdapterPort, "topic": a.config.Topic})
202 var err error
203 var kip *kafka.InterContainerProxy
204 if kip, err = kafka.NewInterContainerProxy(
205 kafka.InterContainerHost(a.config.KafkaAdapterHost),
206 kafka.InterContainerPort(a.config.KafkaAdapterPort),
207 kafka.MsgClient(a.kafkaClient),
208 kafka.DefaultTopic(&kafka.Topic{Name: a.config.Topic})); err != nil {
209 log.Errorw("fail-to-create-common-proxy", log.Fields{"error": err})
210 return nil, err
211 }
212 count := 0
213 for {
214 if err = kip.Start(); err != nil {
215 log.Warnw("error-starting-messaging-proxy", log.Fields{"error": err})
216 if retries == count {
217 return nil, err
218 }
219 count = +1
220 // Take a nap before retrying
221 time.Sleep(2 * time.Second)
222 } else {
223 break
224 }
225 }
226
Vignesh Ethiraj5620c672019-10-14 13:18:52 +0000227 probe.UpdateStatusFromContext(ctx, "container-proxy", probe.ServiceStatusRunning)
Scott Baker2d897982019-09-24 11:50:08 -0700228 log.Info("common-messaging-proxy-created")
229 return kip, nil
230}
231
232func (a *adapter) startSimulatedOLT(ctx context.Context, kip *kafka.InterContainerProxy, cp *com.CoreProxy, onuNumber int) (*ac.SimulatedOLT, error) {
233 log.Info("starting-simulated-olt")
234 var err error
235 sOLT := ac.NewSimulatedOLT(ctx, a.kip, cp, onuNumber)
236
237 if err = sOLT.Start(ctx); err != nil {
238 log.Fatalw("error-starting-messaging-proxy", log.Fields{"error": err})
239 return nil, err
240 }
241
242 log.Info("simulated-olt-started")
243 return sOLT, nil
244}
245
Vignesh Ethiraj5620c672019-10-14 13:18:52 +0000246func (a *adapter) setupRequestHandler(ctx context.Context, coreInstanceId string, iadapter adapters.IAdapter, coreProxy *com.CoreProxy) error {
Scott Baker2d897982019-09-24 11:50:08 -0700247 log.Info("setting-request-handler")
248 requestProxy := com.NewRequestHandlerProxy(coreInstanceId, iadapter, coreProxy)
249 if err := a.kip.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: a.config.Topic}, requestProxy); err != nil {
250 log.Errorw("request-handler-setup-failed", log.Fields{"error": err})
251 return err
252
253 }
Vignesh Ethiraj5620c672019-10-14 13:18:52 +0000254 probe.UpdateStatusFromContext(ctx, "core-request-handler", probe.ServiceStatusRunning)
Scott Baker2d897982019-09-24 11:50:08 -0700255 log.Info("request-handler-setup-done")
256 return nil
257}
258
Vignesh Ethiraj5620c672019-10-14 13:18:52 +0000259func (a *adapter) registerWithCore(ctx context.Context, retries int) error {
Scott Baker2d897982019-09-24 11:50:08 -0700260 log.Info("registering-with-core")
Vignesh Ethiraje872b172019-09-26 10:01:47 +0000261 adapterDescription := &voltha.Adapter{
262 Id: "simulated_olt",
263 Vendor: "Open Networking Foundation",
264 Version: version.VersionInfo.Version,
265 }
Scott Baker2d897982019-09-24 11:50:08 -0700266 types := []*voltha.DeviceType{{Id: "simulated_olt", Adapter: "simulated_olt", AcceptsAddRemoveFlowUpdates: true}}
267 deviceTypes := &voltha.DeviceTypes{Items: types}
268 count := 0
269 for {
270 if err := a.coreProxy.RegisterAdapter(nil, adapterDescription, deviceTypes); err != nil {
271 log.Warnw("registering-with-core-failed", log.Fields{"error": err})
272 if retries == count {
273 return err
274 }
275 count += 1
276 // Take a nap before retrying
277 time.Sleep(2 * time.Second)
278 } else {
279 break
280 }
281 }
Vignesh Ethiraj5620c672019-10-14 13:18:52 +0000282 probe.UpdateStatusFromContext(ctx, "register-with-core", probe.ServiceStatusRunning)
Scott Baker2d897982019-09-24 11:50:08 -0700283 log.Info("registered-with-core")
284 return nil
285}
286
287func waitForExit() int {
288 signalChannel := make(chan os.Signal, 1)
289 signal.Notify(signalChannel,
290 syscall.SIGHUP,
291 syscall.SIGINT,
292 syscall.SIGTERM,
293 syscall.SIGQUIT)
294
295 exitChannel := make(chan int)
296
297 go func() {
298 s := <-signalChannel
299 switch s {
300 case syscall.SIGHUP,
301 syscall.SIGINT,
302 syscall.SIGTERM,
303 syscall.SIGQUIT:
304 log.Infow("closing-signal-received", log.Fields{"signal": s})
305 exitChannel <- 0
306 default:
307 log.Infow("unexpected-signal-received", log.Fields{"signal": s})
308 exitChannel <- 1
309 }
310 }()
311
312 code := <-exitChannel
313 return code
314}
315
// printBanner writes the adapter's ASCII-art banner to standard output, one
// row per line, followed by a line holding a single space.
func printBanner() {
	// NOTE(review): the rows do not line up column-wise as written here;
	// confirm the spacing against the intended banner.
	rows := []string{
		" ____ _ _ _ _ ___ _ _____ ",
		"/ ___|(_)_ __ ___ _ _| | __ _| |_ ___ __| |/ _ \\| | |_ _| ",
		"\\___ \\| | '_ ` _ \\| | | | |/ _` | __/ _ \\/ _` | | | | | | | ",
		" ___) | | | | | | | |_| | | (_| | || __/ (_| | |_| | |___| | ",
		"|____/|_|_| |_| |_|\\__,_|_|\\__,_|\\__\\___|\\__,_|\\___/|_____|_| ",
		" ",
	}
	for _, row := range rows {
		fmt.Println(row)
	}
}
324
325func main() {
326 start := time.Now()
327
328 cf := config.NewAdapterFlags()
329 cf.ParseCommandArguments()
330
David Bainbridge8ea2c9d2019-10-22 21:35:22 +0000331 if cf.PrintVersion {
332 fmt.Println(version.VersionInfo.String(""))
333 return
334 }
335
Scott Baker2d897982019-09-24 11:50:08 -0700336 //// Setup logging
337
338 //Setup default logger - applies for packages that do not have specific logger set
339 if _, err := log.SetDefaultLogger(log.JSON, cf.LogLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
340 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
341 }
342
343 // Update all loggers (provisionned via init) with a common field
344 if err := log.UpdateAllLoggers(log.Fields{"instanceId": cf.InstanceID}); err != nil {
345 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
346 }
347
Scott Bakerb61e3332019-10-24 13:36:06 -0700348 log.SetPackageLogLevel("github.com/opencord/voltha-lib-go/v2/pkg/adapters/common", log.DebugLevel)
Scott Baker2d897982019-09-24 11:50:08 -0700349
350 defer log.CleanUp()
351
352 // Print banner if specified
353 if cf.Banner {
354 printBanner()
355 }
356
357 log.Infow("config", log.Fields{"config": *cf})
358
359 ctx, cancel := context.WithCancel(context.Background())
360 defer cancel()
361
362 ad := newAdapter(cf)
Vignesh Ethiraj5620c672019-10-14 13:18:52 +0000363
364 p := &probe.Probe{}
365 go p.ListenAndServe(fmt.Sprintf("%s:%d", ad.config.ProbeHost, ad.config.ProbePort))
366
367 probeCtx := context.WithValue(ctx, probe.ProbeContextKey, p)
368
369 go ad.start(probeCtx)
Scott Baker2d897982019-09-24 11:50:08 -0700370
371 code := waitForExit()
372 log.Infow("received-a-closing-signal", log.Fields{"code": code})
373
374 // Cleanup before leaving
375 ad.stop()
376
377 elapsed := time.Since(start)
378 log.Infow("run-time", log.Fields{"instanceId": ad.config.InstanceID, "time": elapsed / time.Second})
379}