/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package main
17
18import (
19 "context"
20 "errors"
21 "fmt"
22 "github.com/opencord/voltha-go/adapters"
23 com "github.com/opencord/voltha-go/adapters/common"
24 ac "github.com/opencord/voltha-go/adapters/simulated_onu/adaptercore"
25 "github.com/opencord/voltha-go/adapters/simulated_onu/config"
26 "github.com/opencord/voltha-go/common/log"
27 "github.com/opencord/voltha-go/db/kvstore"
28 "github.com/opencord/voltha-go/kafka"
29 ic "github.com/opencord/voltha-go/protos/inter_container"
30 "github.com/opencord/voltha-go/protos/voltha"
31 "os"
32 "os/signal"
33 "strconv"
34 "syscall"
35 "time"
36)
37
38type adapter struct {
39 instanceId string
40 config *config.AdapterFlags
41 iAdapter adapters.IAdapter
42 kafkaClient kafka.Client
43 kvClient kvstore.Client
44 kip *kafka.InterContainerProxy
45 coreProxy *com.CoreProxy
46 halted bool
47 exitChannel chan int
48 receiverChannels []<-chan *ic.InterContainerMessage
49}
50
51func init() {
52 log.AddPackage(log.JSON, log.DebugLevel, nil)
53}
54
55func newAdapter(cf *config.AdapterFlags) *adapter {
56 var a adapter
57 a.instanceId = cf.InstanceID
58 a.config = cf
59 a.halted = false
60 a.exitChannel = make(chan int, 1)
61 a.receiverChannels = make([]<-chan *ic.InterContainerMessage, 0)
62 return &a
63}
64
65func (a *adapter) start(ctx context.Context) {
66 log.Info("Starting Core Adapter components")
67 var err error
68
69 // Setup KV Client
70 log.Debugw("create-kv-client", log.Fields{"kvstore": a.config.KVStoreType})
71 if err := a.setKVClient(); err != nil {
72 log.Fatal("error-setting-kv-client")
73 }
74
75 // Setup Kafka Client
76 if a.kafkaClient, err = newKafkaClient("sarama", a.config.KafkaAdapterHost, a.config.KafkaAdapterPort); err != nil {
77 log.Fatal("Unsupported-common-client")
78 }
79
80 // Start the common InterContainer Proxy - retry indefinitely
81 if a.kip, err = a.startInterContainerProxy(-1); err != nil {
82 log.Fatal("error-starting-inter-container-proxy")
83 }
84
85 // Create the core proxy to handle requests to the Core
86 a.coreProxy = com.NewCoreProxy(a.kip, a.config.Topic, a.config.CoreTopic)
87
88 // Create the simulated OLT adapter
89 if a.iAdapter, err = a.startSimulatedONU(ctx, a.kip, a.coreProxy); err != nil {
90 log.Fatal("error-starting-inter-container-proxy")
91 }
92
93 // Register the core request handler
94 if err = a.setupRequestHandler(a.instanceId, a.iAdapter); err != nil {
95 log.Fatal("error-setting-core-request-handler")
96 }
97
98 // Register this adapter to the Core - retry indefinitely
99 if err = a.registerWithCore(-1); err != nil {
100 log.Fatal("error-registering-with-core")
101 }
102}
103
104func (rw *adapter) stop() {
105 // Stop leadership tracking
106 rw.halted = true
107
108 // send exit signal
109 rw.exitChannel <- 0
110
111 // Cleanup - applies only if we had a kvClient
112 if rw.kvClient != nil {
113 // Release all reservations
114 if err := rw.kvClient.ReleaseAllReservations(); err != nil {
115 log.Infow("fail-to-release-all-reservations", log.Fields{"error": err})
116 }
117 // Close the DB connection
118 rw.kvClient.Close()
119 }
120
121 // TODO: More cleanup
122}
123
124func newKVClient(storeType string, address string, timeout int) (kvstore.Client, error) {
125
126 log.Infow("kv-store-type", log.Fields{"store": storeType})
127 switch storeType {
128 case "consul":
129 return kvstore.NewConsulClient(address, timeout)
130 case "etcd":
131 return kvstore.NewEtcdClient(address, timeout)
132 }
133 return nil, errors.New("unsupported-kv-store")
134}
135
136func newKafkaClient(clientType string, host string, port int) (kafka.Client, error) {
137
138 log.Infow("common-client-type", log.Fields{"client": clientType})
139 switch clientType {
140 case "sarama":
141 return kafka.NewSaramaClient(
142 kafka.Host(host),
143 kafka.Port(port),
144 kafka.ProducerReturnOnErrors(true),
145 kafka.ProducerReturnOnSuccess(true),
146 kafka.ProducerMaxRetries(6),
147 kafka.ProducerRetryBackoff(time.Millisecond*30)), nil
148 }
149 return nil, errors.New("unsupported-client-type")
150}
151
152func (a *adapter) setKVClient() error {
153 addr := a.config.KVStoreHost + ":" + strconv.Itoa(a.config.KVStorePort)
154 client, err := newKVClient(a.config.KVStoreType, addr, a.config.KVStoreTimeout)
155 if err != nil {
156 a.kvClient = nil
157 log.Error(err)
158 return err
159 }
160 a.kvClient = client
161 return nil
162}
163
// toString converts value to a string. It accepts a []byte (converted with a
// plain string conversion) or a string (returned as is). Any other dynamic
// type, including a nil interface, yields an "unexpected-type-<T>" error.
func toString(value interface{}) (string, error) {
	// Use the variable bound by the type switch instead of repeating the
	// type assertion on value in every case.
	switch t := value.(type) {
	case []byte:
		return string(t), nil
	case string:
		return t, nil
	default:
		return "", fmt.Errorf("unexpected-type-%T", t)
	}
}
174
175func (a *adapter) startInterContainerProxy(retries int) (*kafka.InterContainerProxy, error) {
176 log.Infow("starting-intercontainer-messaging-proxy", log.Fields{"host": a.config.KafkaAdapterHost,
177 "port": a.config.KafkaAdapterPort, "topic": a.config.Topic})
178 var err error
179 var kip *kafka.InterContainerProxy
180 if kip, err = kafka.NewInterContainerProxy(
181 kafka.InterContainerHost(a.config.KafkaAdapterHost),
182 kafka.InterContainerPort(a.config.KafkaAdapterPort),
183 kafka.MsgClient(a.kafkaClient),
184 kafka.DefaultTopic(&kafka.Topic{Name: a.config.Topic})); err != nil {
185 log.Errorw("fail-to-create-common-proxy", log.Fields{"error": err})
186 return nil, err
187 }
188 count := 0
189 for {
190 if err = kip.Start(); err != nil {
191 log.Warnw("error-starting-messaging-proxy", log.Fields{"error": err})
192 if retries == count {
193 return nil, err
194 }
195 count = +1
196 // Take a nap before retrying
197 time.Sleep(2 * time.Second)
198 } else {
199 break
200 }
201 }
202
203 log.Info("common-messaging-proxy-created")
204 return kip, nil
205}
206
207func (a *adapter) startSimulatedONU(ctx context.Context, kip *kafka.InterContainerProxy, cp *com.CoreProxy) (*ac.SimulatedONU, error) {
208 log.Info("starting-simulated-onu")
209 var err error
210 sOLT := ac.NewSimulatedONU(ctx, a.kip, cp)
211
212 if err = sOLT.Start(ctx); err != nil {
213 log.Fatalw("error-starting-messaging-proxy", log.Fields{"error": err})
214 return nil, err
215 }
216
217 log.Info("simulated-olt-started")
218 return sOLT, nil
219}
220
221func (a *adapter) setupRequestHandler(coreInstanceId string, iadapter adapters.IAdapter) error {
222 log.Info("setting-request-handler")
223 requestProxy := com.NewRequestHandlerProxy(coreInstanceId, iadapter)
224 if err := a.kip.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: a.config.Topic}, requestProxy); err != nil {
225 log.Errorw("adaptercore-request-handler-setup-failed", log.Fields{"error": err})
226 return err
227
228 }
229 log.Info("request-handler-setup-done")
230 return nil
231}
232func (a *adapter) registerWithCore(retries int) error {
233 log.Info("registering-with-core")
234 adapterDescription := &voltha.Adapter{Id: "simulated_onu", Vendor: "simulation Enterprise Inc"}
235 types := []*voltha.DeviceType{{Id: "simulated_onu", Adapter: "simulated_onu"}}
236 deviceTypes := &voltha.DeviceTypes{Items: types}
237 count := 0
238 for {
239 if err := a.coreProxy.RegisterAdapter(nil, adapterDescription, deviceTypes); err != nil {
240 log.Warnw("registering-with-core-failed", log.Fields{"error": err})
241 if retries == count {
242 return err
243 }
244 count += 1
245 // Take a nap before retrying
246 time.Sleep(2 * time.Second)
247 } else {
248 break
249 }
250 }
251 log.Info("registered-with-core")
252 return nil
253}
254
255func waitForExit() int {
256 signalChannel := make(chan os.Signal, 1)
257 signal.Notify(signalChannel,
258 syscall.SIGHUP,
259 syscall.SIGINT,
260 syscall.SIGTERM,
261 syscall.SIGQUIT)
262
263 exitChannel := make(chan int)
264
265 go func() {
266 s := <-signalChannel
267 switch s {
268 case syscall.SIGHUP,
269 syscall.SIGINT,
270 syscall.SIGTERM,
271 syscall.SIGQUIT:
272 log.Infow("closing-signal-received", log.Fields{"signal": s})
273 exitChannel <- 0
274 default:
275 log.Infow("unexpected-signal-received", log.Fields{"signal": s})
276 exitChannel <- 1
277 }
278 }()
279
280 code := <-exitChannel
281 return code
282}
283
// printBanner writes the ASCII-art banner to standard output, one line at a
// time.
func printBanner() {
	banner := []string{
		" _ _ _ _ ",
		" ___(_)_ __ ___ _ _| | __ _| |_ ___ __| | ___ _ __ _ _ ",
		"/ __| | '_ ` _ \\| | | | |/ _` | __/ _ \\/ _` | / _ \\| '_ \\| | | | ",
		"\\__ \\ | | | | | | |_| | | (_| | || __/ (_| | | (_) | | | | |_| | ",
		"|___/_|_| |_| |_|\\__,_|_|\\__,_|\\__\\___|\\__,_|___\\___/|_| |_|\\__,_| ",
		" |_____| ",
		" ",
	}
	for _, line := range banner {
		fmt.Println(line)
	}
}
293
294func main() {
295 start := time.Now()
296
297 cf := config.NewAdapterFlags()
298 cf.ParseCommandArguments()
299
300 //// Setup logging
301
302 //Setup default logger - applies for packages that do not have specific logger set
303 if _, err := log.SetDefaultLogger(log.JSON, cf.LogLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
304 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
305 }
306
307 // Update all loggers (provisionned via init) with a common field
308 if err := log.UpdateAllLoggers(log.Fields{"instanceId": cf.InstanceID}); err != nil {
309 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
310 }
311
312 log.SetPackageLogLevel("github.com/opencord/voltha-go/adapters/common", log.DebugLevel)
313
314 defer log.CleanUp()
315
316 // Print banner if specified
317 if cf.Banner {
318 printBanner()
319 }
320
321 log.Infow("config", log.Fields{"config": *cf})
322
323 ctx, cancel := context.WithCancel(context.Background())
324 defer cancel()
325
326 ad := newAdapter(cf)
327 go ad.start(ctx)
328
329 code := waitForExit()
330 log.Infow("received-a-closing-signal", log.Fields{"code": code})
331
332 // Cleanup before leaving
333 ad.stop()
334
335 elapsed := time.Since(start)
336 log.Infow("runtime", log.Fields{"instanceId": ad.config.InstanceID, "time": elapsed / time.Second})
337}