blob: c86cd45eee59356e70f11393764e04c65d4938c3 [file] [log] [blame]
Stephane Barbariea75791c2019-01-24 10:58:06 -05001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package main
17
18import (
19 "context"
20 "errors"
21 "fmt"
22 grpcserver "github.com/opencord/voltha-go/common/grpc"
23 "github.com/opencord/voltha-go/common/log"
24 "github.com/opencord/voltha-go/db/kvstore"
25 ic "github.com/opencord/voltha-go/protos/inter_container"
26 "github.com/opencord/voltha-go/ro_core/config"
27 c "github.com/opencord/voltha-go/ro_core/core"
28 "os"
29 "os/signal"
30 "strconv"
31 "syscall"
32 "time"
33)
34
35type roCore struct {
36 kvClient kvstore.Client
37 config *config.ROCoreFlags
38 halted bool
39 exitChannel chan int
40 grpcServer *grpcserver.GrpcServer
41 core *c.Core
42 //For test
43 receiverChannels []<-chan *ic.InterContainerMessage
44}
45
46func init() {
47 log.AddPackage(log.JSON, log.DebugLevel, nil)
48}
49
50func newKVClient(storeType string, address string, timeout int) (kvstore.Client, error) {
51
52 log.Infow("kv-store-type", log.Fields{"store": storeType})
53 switch storeType {
54 case "consul":
55 return kvstore.NewConsulClient(address, timeout)
56 case "etcd":
57 return kvstore.NewEtcdClient(address, timeout)
58 }
59 return nil, errors.New("unsupported-kv-store")
60}
61
62func newROCore(cf *config.ROCoreFlags) *roCore {
63 var roCore roCore
64 roCore.config = cf
65 roCore.halted = false
66 roCore.exitChannel = make(chan int, 1)
67 roCore.receiverChannels = make([]<-chan *ic.InterContainerMessage, 0)
68 return &roCore
69}
70
71func (ro *roCore) setKVClient() error {
72 addr := ro.config.KVStoreHost + ":" + strconv.Itoa(ro.config.KVStorePort)
73 client, err := newKVClient(ro.config.KVStoreType, addr, ro.config.KVStoreTimeout)
74 if err != nil {
75 ro.kvClient = nil
76 log.Error(err)
77 return err
78 }
79 ro.kvClient = client
80 return nil
81}
82
// toString converts value to a string.
//
// A []byte is converted with string(...), a string is returned as is, and
// any other dynamic type (including a nil interface) produces an empty
// string together with an "unexpected-type-<T>" error naming that type.
func toString(value interface{}) (string, error) {
	// Use the variable bound by the type switch instead of asserting the
	// type a second time inside each case.
	switch v := value.(type) {
	case []byte:
		return string(v), nil
	case string:
		return v, nil
	default:
		return "", fmt.Errorf("unexpected-type-%T", v)
	}
}
93
94func (ro *roCore) start(ctx context.Context) {
95 log.Info("Starting RW Core components")
96
97 // Setup KV Client
98 log.Debugw("create-kv-client", log.Fields{"kvstore": ro.config.KVStoreType})
99 ro.setKVClient()
100
101 // Create the core service
102 ro.core = c.NewCore(ro.config.InstanceID, ro.config, ro.kvClient)
103
104 // start the core
105 ro.core.Start(ctx)
106}
107
108func (ro *roCore) stop() {
109 // Stop leadership tracking
110 ro.halted = true
111
112 // send exit signal
113 ro.exitChannel <- 0
114
115 // Cleanup - applies only if we had a kvClient
116 if ro.kvClient != nil {
117 // Release all reservations
118 if err := ro.kvClient.ReleaseAllReservations(); err != nil {
119 log.Infow("fail-to-release-all-reservations", log.Fields{"error": err})
120 }
121 // Close the DB connection
122 ro.kvClient.Close()
123 }
124
125 ro.core.Stop(nil)
126}
127
128func waitForExit() int {
129 signalChannel := make(chan os.Signal, 1)
130 signal.Notify(signalChannel,
131 syscall.SIGHUP,
132 syscall.SIGINT,
133 syscall.SIGTERM,
134 syscall.SIGQUIT)
135
136 exitChannel := make(chan int)
137
138 go func() {
139 s := <-signalChannel
140 switch s {
141 case syscall.SIGHUP,
142 syscall.SIGINT,
143 syscall.SIGTERM,
144 syscall.SIGQUIT:
145 log.Infow("closing-signal-received", log.Fields{"signal": s})
146 exitChannel <- 0
147 default:
148 log.Infow("unexpected-signal-received", log.Fields{"signal": s})
149 exitChannel <- 1
150 }
151 }()
152
153 code := <-exitChannel
154 return code
155}
156
// printBanner writes the ASCII-art start-up banner to standard output, one
// line per Println call.
func printBanner() {
	banner := []string{
		" ",
		" ______ ______ ",
		"| _ \\ \\ / / ___|___ _ __ ___ ",
		"| |_) \\ \\ /\\ / / | / _ \\| '__/ _ \\ ",
		"| _ < \\ V V /| |__| (_) | | | __/ ",
		"|_| \\_\\ \\_/\\_/ \\____\\___/|_| \\___| ",
		" ",
	}
	for _, row := range banner {
		fmt.Println(row)
	}
}
166
167func main() {
168 start := time.Now()
169
170 cf := config.NewROCoreFlags()
171 cf.ParseCommandArguments()
172
173 //// Setup logging
174
175 //Setup default logger - applies for packages that do not have specific logger set
176 if _, err := log.SetDefaultLogger(log.JSON, cf.LogLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
177 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
178 }
179
180 // Update all loggers (provisionned via init) with a common field
181 if err := log.UpdateAllLoggers(log.Fields{"instanceId": cf.InstanceID}); err != nil {
182 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
183 }
184
185 log.SetPackageLogLevel("github.com/opencord/voltha-go/ro_core/core", log.DebugLevel)
186
187 defer log.CleanUp()
188
189 // Print banner if specified
190 if cf.Banner {
191 printBanner()
192 }
193
194 log.Infow("ro-core-config", log.Fields{"config": *cf})
195
196 ctx, cancel := context.WithCancel(context.Background())
197 defer cancel()
198
199 ro := newROCore(cf)
200 go ro.start(ctx)
201
202 code := waitForExit()
203 log.Infow("received-a-closing-signal", log.Fields{"code": code})
204
205 // Cleanup before leaving
206 ro.stop()
207
208 elapsed := time.Since(start)
209 log.Infow("ro-core-run-time", log.Fields{"core": ro.config.InstanceID, "time": elapsed / time.Second})
210}