blob: f324939caaa2ed2319531b1b4dba6c704e112e9d [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package main
17
18import (
19 "context"
20 "errors"
21 "fmt"
22 "github.com/opencord/voltha-go/adapters"
23 com "github.com/opencord/voltha-go/adapters/common"
24 "github.com/opencord/voltha-go/common/log"
Vignesh Ethiraje872b172019-09-26 10:01:47 +000025 "github.com/opencord/voltha-go/common/version"
Scott Baker2d897982019-09-24 11:50:08 -070026 "github.com/opencord/voltha-go/db/kvstore"
27 "github.com/opencord/voltha-go/kafka"
28 ic "github.com/opencord/voltha-protos/go/inter_container"
29 "github.com/opencord/voltha-protos/go/voltha"
30 ac "github.com/opencord/voltha-simolt-adapter/internal/pkg/adaptercore"
31 "github.com/opencord/voltha-simolt-adapter/internal/pkg/config"
32 "os"
33 "os/signal"
34 "strconv"
35 "syscall"
36 "time"
37)
38
39type adapter struct {
40 instanceId string
41 config *config.AdapterFlags
42 iAdapter adapters.IAdapter
43 kafkaClient kafka.Client
44 kvClient kvstore.Client
45 kip *kafka.InterContainerProxy
46 coreProxy *com.CoreProxy
47 halted bool
48 exitChannel chan int
49 receiverChannels []<-chan *ic.InterContainerMessage
50}
51
52func init() {
53 log.AddPackage(log.JSON, log.DebugLevel, nil)
54}
55
56func newAdapter(cf *config.AdapterFlags) *adapter {
57 var a adapter
58 a.instanceId = cf.InstanceID
59 a.config = cf
60 a.halted = false
61 a.exitChannel = make(chan int, 1)
62 a.receiverChannels = make([]<-chan *ic.InterContainerMessage, 0)
63 return &a
64}
65
66func (a *adapter) start(ctx context.Context) {
67 log.Info("Starting Core Adapter components")
68 var err error
69
70 // Setup KV Client
71 log.Debugw("create-kv-client", log.Fields{"kvstore": a.config.KVStoreType})
72 if err := a.setKVClient(); err != nil {
73 log.Fatal("error-setting-kv-client")
74 }
75
76 // Setup Kafka Client
77 if a.kafkaClient, err = newKafkaClient("sarama", a.config.KafkaAdapterHost, a.config.KafkaAdapterPort); err != nil {
78 log.Fatal("Unsupported-common-client")
79 }
80
81 // Start the common InterContainer Proxy - retries indefinitely
82 if a.kip, err = a.startInterContainerProxy(-1); err != nil {
83 log.Fatal("error-starting-inter-container-proxy")
84 }
85
86 // Create the core proxy to handle requests to the Core
87 a.coreProxy = com.NewCoreProxy(a.kip, a.config.Topic, a.config.CoreTopic)
88
89 // Create the simulated OLT adapter
90 if a.iAdapter, err = a.startSimulatedOLT(ctx, a.kip, a.coreProxy, a.config.OnuNumber); err != nil {
91 log.Fatal("error-starting-inter-container-proxy")
92 }
93
94 // Register the core request handler
95 if err = a.setupRequestHandler(a.instanceId, a.iAdapter, a.coreProxy); err != nil {
96 log.Fatal("error-setting-core-request-handler")
97 }
98
99 // Register this adapter to the Core - retries indefinitely
100 if err = a.registerWithCore(-1); err != nil {
101 log.Fatal("error-registering-with-core")
102 }
103}
104
105func (rw *adapter) stop() {
106 // Stop leadership tracking
107 rw.halted = true
108
109 // send exit signal
110 rw.exitChannel <- 0
111
112 // Cleanup - applies only if we had a kvClient
113 if rw.kvClient != nil {
114 // Release all reservations
115 if err := rw.kvClient.ReleaseAllReservations(); err != nil {
116 log.Infow("fail-to-release-all-reservations", log.Fields{"error": err})
117 }
118 // Close the DB connection
119 rw.kvClient.Close()
120 }
121
122 // TODO: More cleanup
123}
124
125func newKVClient(storeType string, address string, timeout int) (kvstore.Client, error) {
126
127 log.Infow("kv-store-type", log.Fields{"store": storeType})
128 switch storeType {
129 case "consul":
130 return kvstore.NewConsulClient(address, timeout)
131 case "etcd":
132 return kvstore.NewEtcdClient(address, timeout)
133 }
134 return nil, errors.New("unsupported-kv-store")
135}
136
137func newKafkaClient(clientType string, host string, port int) (kafka.Client, error) {
138
139 log.Infow("common-client-type", log.Fields{"client": clientType})
140 switch clientType {
141 case "sarama":
142 return kafka.NewSaramaClient(
143 kafka.Host(host),
144 kafka.Port(port),
145 kafka.ProducerReturnOnErrors(true),
146 kafka.ProducerReturnOnSuccess(true),
147 kafka.ProducerMaxRetries(6),
148 kafka.ProducerRetryBackoff(time.Millisecond*30)), nil
149 }
150 return nil, errors.New("unsupported-client-type")
151}
152
153func (a *adapter) setKVClient() error {
154 addr := a.config.KVStoreHost + ":" + strconv.Itoa(a.config.KVStorePort)
155 client, err := newKVClient(a.config.KVStoreType, addr, a.config.KVStoreTimeout)
156 if err != nil {
157 a.kvClient = nil
158 log.Error(err)
159 return err
160 }
161 a.kvClient = client
162 return nil
163}
164
// toString converts value to a string. A []byte is converted byte-for-byte
// and a string is returned as is; every other dynamic type (including a nil
// value) produces an empty string and an "unexpected-type-<type>" error.
func toString(value interface{}) (string, error) {
	if raw, ok := value.([]byte); ok {
		return string(raw), nil
	}
	if text, ok := value.(string); ok {
		return text, nil
	}
	return "", fmt.Errorf("unexpected-type-%T", value)
}
175
176func (a *adapter) startInterContainerProxy(retries int) (*kafka.InterContainerProxy, error) {
177 log.Infow("starting-intercontainer-messaging-proxy", log.Fields{"host": a.config.KafkaAdapterHost,
178 "port": a.config.KafkaAdapterPort, "topic": a.config.Topic})
179 var err error
180 var kip *kafka.InterContainerProxy
181 if kip, err = kafka.NewInterContainerProxy(
182 kafka.InterContainerHost(a.config.KafkaAdapterHost),
183 kafka.InterContainerPort(a.config.KafkaAdapterPort),
184 kafka.MsgClient(a.kafkaClient),
185 kafka.DefaultTopic(&kafka.Topic{Name: a.config.Topic})); err != nil {
186 log.Errorw("fail-to-create-common-proxy", log.Fields{"error": err})
187 return nil, err
188 }
189 count := 0
190 for {
191 if err = kip.Start(); err != nil {
192 log.Warnw("error-starting-messaging-proxy", log.Fields{"error": err})
193 if retries == count {
194 return nil, err
195 }
196 count = +1
197 // Take a nap before retrying
198 time.Sleep(2 * time.Second)
199 } else {
200 break
201 }
202 }
203
204 log.Info("common-messaging-proxy-created")
205 return kip, nil
206}
207
208func (a *adapter) startSimulatedOLT(ctx context.Context, kip *kafka.InterContainerProxy, cp *com.CoreProxy, onuNumber int) (*ac.SimulatedOLT, error) {
209 log.Info("starting-simulated-olt")
210 var err error
211 sOLT := ac.NewSimulatedOLT(ctx, a.kip, cp, onuNumber)
212
213 if err = sOLT.Start(ctx); err != nil {
214 log.Fatalw("error-starting-messaging-proxy", log.Fields{"error": err})
215 return nil, err
216 }
217
218 log.Info("simulated-olt-started")
219 return sOLT, nil
220}
221
222func (a *adapter) setupRequestHandler(coreInstanceId string, iadapter adapters.IAdapter, coreProxy *com.CoreProxy) error {
223 log.Info("setting-request-handler")
224 requestProxy := com.NewRequestHandlerProxy(coreInstanceId, iadapter, coreProxy)
225 if err := a.kip.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: a.config.Topic}, requestProxy); err != nil {
226 log.Errorw("request-handler-setup-failed", log.Fields{"error": err})
227 return err
228
229 }
230 log.Info("request-handler-setup-done")
231 return nil
232}
233
234func (a *adapter) registerWithCore(retries int) error {
235 log.Info("registering-with-core")
Vignesh Ethiraje872b172019-09-26 10:01:47 +0000236 adapterDescription := &voltha.Adapter{
237 Id: "simulated_olt",
238 Vendor: "Open Networking Foundation",
239 Version: version.VersionInfo.Version,
240 }
Scott Baker2d897982019-09-24 11:50:08 -0700241 types := []*voltha.DeviceType{{Id: "simulated_olt", Adapter: "simulated_olt", AcceptsAddRemoveFlowUpdates: true}}
242 deviceTypes := &voltha.DeviceTypes{Items: types}
243 count := 0
244 for {
245 if err := a.coreProxy.RegisterAdapter(nil, adapterDescription, deviceTypes); err != nil {
246 log.Warnw("registering-with-core-failed", log.Fields{"error": err})
247 if retries == count {
248 return err
249 }
250 count += 1
251 // Take a nap before retrying
252 time.Sleep(2 * time.Second)
253 } else {
254 break
255 }
256 }
257 log.Info("registered-with-core")
258 return nil
259}
260
261func waitForExit() int {
262 signalChannel := make(chan os.Signal, 1)
263 signal.Notify(signalChannel,
264 syscall.SIGHUP,
265 syscall.SIGINT,
266 syscall.SIGTERM,
267 syscall.SIGQUIT)
268
269 exitChannel := make(chan int)
270
271 go func() {
272 s := <-signalChannel
273 switch s {
274 case syscall.SIGHUP,
275 syscall.SIGINT,
276 syscall.SIGTERM,
277 syscall.SIGQUIT:
278 log.Infow("closing-signal-received", log.Fields{"signal": s})
279 exitChannel <- 0
280 default:
281 log.Infow("unexpected-signal-received", log.Fields{"signal": s})
282 exitChannel <- 1
283 }
284 }()
285
286 code := <-exitChannel
287 return code
288}
289
// printBanner writes the adapter's ASCII-art banner to standard output, one
// line at a time.
func printBanner() {
	banner := []string{
		" ____ _ _ _ _ ___ _ _____ ",
		"/ ___|(_)_ __ ___ _ _| | __ _| |_ ___ __| |/ _ \\| | |_ _| ",
		"\\___ \\| | '_ ` _ \\| | | | |/ _` | __/ _ \\/ _` | | | | | | | ",
		" ___) | | | | | | | |_| | | (_| | || __/ (_| | |_| | |___| | ",
		"|____/|_|_| |_| |_|\\__,_|_|\\__,_|\\__\\___|\\__,_|\\___/|_____|_| ",
		" ",
	}
	for _, row := range banner {
		fmt.Println(row)
	}
}
298
299func main() {
300 start := time.Now()
301
302 cf := config.NewAdapterFlags()
303 cf.ParseCommandArguments()
304
305 //// Setup logging
306
307 //Setup default logger - applies for packages that do not have specific logger set
308 if _, err := log.SetDefaultLogger(log.JSON, cf.LogLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
309 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
310 }
311
312 // Update all loggers (provisionned via init) with a common field
313 if err := log.UpdateAllLoggers(log.Fields{"instanceId": cf.InstanceID}); err != nil {
314 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
315 }
316
317 log.SetPackageLogLevel("github.com/opencord/voltha-go/adapters/common", log.DebugLevel)
318
319 defer log.CleanUp()
320
321 // Print banner if specified
322 if cf.Banner {
323 printBanner()
324 }
325
326 log.Infow("config", log.Fields{"config": *cf})
327
328 ctx, cancel := context.WithCancel(context.Background())
329 defer cancel()
330
331 ad := newAdapter(cf)
332 go ad.start(ctx)
333
334 code := waitForExit()
335 log.Infow("received-a-closing-signal", log.Fields{"code": code})
336
337 // Cleanup before leaving
338 ad.stop()
339
340 elapsed := time.Since(start)
341 log.Infow("run-time", log.Fields{"instanceId": ad.config.InstanceID, "time": elapsed / time.Second})
342}