blob: cd5dbe933df0e14e64c35186948f868dbc500fd0 [file] [log] [blame]
khenaidoobf6e7bb2018-08-14 22:27:29 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
khenaidoocfee5f42018-07-19 22:47:38 -040016package main
17
18import (
khenaidoo5c11af72018-07-20 17:21:05 -040019 "context"
20 "errors"
khenaidoocfee5f42018-07-19 22:47:38 -040021 "fmt"
khenaidoobf6e7bb2018-08-14 22:27:29 -040022 grpcserver "github.com/opencord/voltha-go/common/grpc"
khenaidoo5c11af72018-07-20 17:21:05 -040023 "github.com/opencord/voltha-go/common/log"
24 "github.com/opencord/voltha-go/db/kvstore"
khenaidooabad44c2018-08-03 16:58:35 -040025 ca "github.com/opencord/voltha-go/protos/core_adapter"
khenaidoo5c11af72018-07-20 17:21:05 -040026 "github.com/opencord/voltha-go/rw_core/config"
khenaidoob9203542018-09-17 22:56:37 -040027 c "github.com/opencord/voltha-go/rw_core/core"
khenaidoocfee5f42018-07-19 22:47:38 -040028 "os"
29 "os/signal"
khenaidoocfee5f42018-07-19 22:47:38 -040030 "strconv"
khenaidoocfee5f42018-07-19 22:47:38 -040031 "syscall"
khenaidoo5c11af72018-07-20 17:21:05 -040032 "time"
khenaidoocfee5f42018-07-19 22:47:38 -040033)
34
35type rwCore struct {
khenaidoo5c11af72018-07-20 17:21:05 -040036 kvClient kvstore.Client
37 config *config.RWCoreFlags
38 halted bool
39 exitChannel chan int
khenaidoob9203542018-09-17 22:56:37 -040040 //kmp *kafka.KafkaMessagingProxy
41 grpcServer *grpcserver.GrpcServer
42 core *c.Core
khenaidooabad44c2018-08-03 16:58:35 -040043 //For test
44 receiverChannels []<-chan *ca.InterContainerMessage
khenaidoocfee5f42018-07-19 22:47:38 -040045}
46
khenaidoob9203542018-09-17 22:56:37 -040047func init() {
khenaidoo2c6f1672018-09-20 23:14:41 -040048 log.AddPackage(log.JSON, log.DebugLevel, nil)
khenaidoob9203542018-09-17 22:56:37 -040049}
50
khenaidoocfee5f42018-07-19 22:47:38 -040051func newKVClient(storeType string, address string, timeout int) (kvstore.Client, error) {
52
khenaidoo5c11af72018-07-20 17:21:05 -040053 log.Infow("kv-store-type", log.Fields{"store": storeType})
khenaidoocfee5f42018-07-19 22:47:38 -040054 switch storeType {
55 case "consul":
56 return kvstore.NewConsulClient(address, timeout)
57 case "etcd":
58 return kvstore.NewEtcdClient(address, timeout)
59 }
60 return nil, errors.New("unsupported-kv-store")
61}
62
63func newRWCore(cf *config.RWCoreFlags) *rwCore {
64 var rwCore rwCore
65 rwCore.config = cf
66 rwCore.halted = false
67 rwCore.exitChannel = make(chan int, 1)
khenaidooabad44c2018-08-03 16:58:35 -040068 rwCore.receiverChannels = make([]<-chan *ca.InterContainerMessage, 0)
khenaidoocfee5f42018-07-19 22:47:38 -040069 return &rwCore
70}
71
khenaidoob9203542018-09-17 22:56:37 -040072func (rw *rwCore) setKVClient() error {
73 addr := rw.config.KVStoreHost + ":" + strconv.Itoa(rw.config.KVStorePort)
74 client, err := newKVClient(rw.config.KVStoreType, addr, rw.config.KVStoreTimeout)
khenaidoocfee5f42018-07-19 22:47:38 -040075 if err != nil {
Richard Jankowskie4d77662018-10-17 13:53:21 -040076 rw.kvClient = nil
khenaidoocfee5f42018-07-19 22:47:38 -040077 log.Error(err)
78 return err
79 }
khenaidoob9203542018-09-17 22:56:37 -040080 rw.kvClient = client
khenaidoocfee5f42018-07-19 22:47:38 -040081 return nil
82}
83
// toString converts value to a string. A string is returned as-is and a
// []byte is converted to its string form; any other dynamic type (including
// a nil interface) produces an "unexpected-type-<T>" error and an empty string.
func toString(value interface{}) (string, error) {
	if s, ok := value.(string); ok {
		return s, nil
	}
	if b, ok := value.([]byte); ok {
		return string(b), nil
	}
	return "", fmt.Errorf("unexpected-type-%T", value)
}
94
khenaidoob9203542018-09-17 22:56:37 -040095//func (rw *rwCore) createGRPCService(context.Context) {
96// // create an insecure gserver server
97// rw.grpcServer = grpcserver.NewGrpcServer(rw.config.GrpcHost, rw.config.GrpcPort, nil, false)
98// log.Info("grpc-server-created")
99//}
khenaidoobf6e7bb2018-08-14 22:27:29 -0400100
khenaidoob9203542018-09-17 22:56:37 -0400101//func (rw *rwCore) startKafkaMessagingProxy(ctx context.Context) error {
102// log.Infow("starting-kafka-messaging-proxy", log.Fields{"host":rw.config.KafkaAdapterHost,
103// "port":rw.config.KafkaAdapterPort, "topic":rw.config.CoreTopic})
104// var err error
105// if rw.kmp, err = kafka.NewKafkaMessagingProxy(
106// kafka.KafkaHost(rw.config.KafkaAdapterHost),
107// kafka.KafkaPort(rw.config.KafkaAdapterPort),
108// kafka.DefaultTopic(&kafka.Topic{Name: rw.config.CoreTopic})); err != nil {
109// log.Errorw("fail-to-create-kafka-proxy", log.Fields{"error": err})
110// return err
111// }
112// if err = rw.kmp.Start(); err != nil {
113// log.Fatalw("error-starting-messaging-proxy", log.Fields{"error": err})
114// return err
115// }
116//
117// requestProxy := &c.RequestHandlerProxy{}
118// rw.kmp.SubscribeWithTarget(kafka.Topic{Name: rw.config.CoreTopic}, requestProxy)
119//
120// log.Info("started-kafka-messaging-proxy")
121// return nil
122//}
khenaidoobf6e7bb2018-08-14 22:27:29 -0400123
khenaidoob9203542018-09-17 22:56:37 -0400124func (rw *rwCore) start(ctx context.Context) {
khenaidoo5c11af72018-07-20 17:21:05 -0400125 log.Info("Starting RW Core components")
khenaidoob9203542018-09-17 22:56:37 -0400126
127 //// Setup GRPC Server
128 //rw.createGRPCService(ctx)
129
130 //// Setup Kafka messaging services
131 //if err := rw.startKafkaMessagingProxy(ctx); err != nil {
132 // log.Fatalw("failed-to-start-kafka-proxy", log.Fields{"err":err})
133 //}
134
Richard Jankowskie4d77662018-10-17 13:53:21 -0400135 // Setup KV Client
136 log.Debugw("create-kv-client", log.Fields{"kvstore": rw.config.KVStoreType})
137 err := rw.setKVClient()
138 if err == nil {
139 // Setup KV transaction context
140 c.SetTransactionContext(rw.config.InstanceID,
141 "service/voltha/transactions/",
142 rw.kvClient,
143 rw.config.KVStoreTimeout,
144 rw.config.KVTxnKeyDelTime)
145 }
146
khenaidoob9203542018-09-17 22:56:37 -0400147 // Create the core service
Richard Jankowskie4d77662018-10-17 13:53:21 -0400148 rw.core = c.NewCore(rw.config.InstanceID, rw.config, rw.kvClient)
khenaidoob9203542018-09-17 22:56:37 -0400149
150 // start the core
151 rw.core.Start(ctx)
khenaidoocfee5f42018-07-19 22:47:38 -0400152}
153
khenaidoob9203542018-09-17 22:56:37 -0400154func (rw *rwCore) stop() {
khenaidoocfee5f42018-07-19 22:47:38 -0400155 // Stop leadership tracking
khenaidoob9203542018-09-17 22:56:37 -0400156 rw.halted = true
khenaidoocfee5f42018-07-19 22:47:38 -0400157
khenaidoob9203542018-09-17 22:56:37 -0400158 //// Stop the Kafka messaging service
159 //if rw.kmp != nil {
160 // rw.kmp.Stop()
161 //}
khenaidooabad44c2018-08-03 16:58:35 -0400162
khenaidoocfee5f42018-07-19 22:47:38 -0400163 // send exit signal
khenaidoob9203542018-09-17 22:56:37 -0400164 rw.exitChannel <- 0
khenaidoocfee5f42018-07-19 22:47:38 -0400165
166 // Cleanup - applies only if we had a kvClient
khenaidoob9203542018-09-17 22:56:37 -0400167 if rw.kvClient != nil {
khenaidoocfee5f42018-07-19 22:47:38 -0400168 // Release all reservations
khenaidoob9203542018-09-17 22:56:37 -0400169 if err := rw.kvClient.ReleaseAllReservations(); err != nil {
khenaidoo5c11af72018-07-20 17:21:05 -0400170 log.Infow("fail-to-release-all-reservations", log.Fields{"error": err})
khenaidoocfee5f42018-07-19 22:47:38 -0400171 }
172 // Close the DB connection
khenaidoob9203542018-09-17 22:56:37 -0400173 rw.kvClient.Close()
khenaidoocfee5f42018-07-19 22:47:38 -0400174 }
175}
176
177func waitForExit() int {
178 signalChannel := make(chan os.Signal, 1)
179 signal.Notify(signalChannel,
180 syscall.SIGHUP,
181 syscall.SIGINT,
182 syscall.SIGTERM,
183 syscall.SIGQUIT)
184
185 exitChannel := make(chan int)
186
187 go func() {
188 s := <-signalChannel
189 switch s {
190 case syscall.SIGHUP,
191 syscall.SIGINT,
192 syscall.SIGTERM,
193 syscall.SIGQUIT:
khenaidoo5c11af72018-07-20 17:21:05 -0400194 log.Infow("closing-signal-received", log.Fields{"signal": s})
khenaidoocfee5f42018-07-19 22:47:38 -0400195 exitChannel <- 0
196 default:
khenaidoo5c11af72018-07-20 17:21:05 -0400197 log.Infow("unexpected-signal-received", log.Fields{"signal": s})
khenaidoocfee5f42018-07-19 22:47:38 -0400198 exitChannel <- 1
199 }
200 }()
201
202 code := <-exitChannel
203 return code
204}
205
// printBanner writes the RW Core ASCII-art banner to standard output, one
// line per row, framed by a blank-looking line above and below.
func printBanner() {
	rows := []string{
		" ",
		" ______ ______ ",
		"| _ \\ \\ / / ___|___ _ __ ___ ",
		"| |_) \\ \\ /\\ / / | / _ \\| '__/ _ \\ ",
		"| _ < \\ V V /| |__| (_) | | | __/ ",
		"|_| \\_\\ \\_/\\_/ \\____\\___/|_| \\___| ",
		" ",
	}
	for _, row := range rows {
		fmt.Println(row)
	}
}
215
khenaidoocfee5f42018-07-19 22:47:38 -0400216func main() {
217 start := time.Now()
218
219 cf := config.NewRWCoreFlags()
220 cf.ParseCommandArguments()
221
khenaidoob9203542018-09-17 22:56:37 -0400222 //// Setup logging
223
224 //Setup default logger - applies for packages that do not have specific logger set
225 if _, err := log.SetDefaultLogger(log.JSON, cf.LogLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
khenaidoocfee5f42018-07-19 22:47:38 -0400226 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
227 }
khenaidoob9203542018-09-17 22:56:37 -0400228
229 // Update all loggers (provisionned via init) with a common field
230 if err := log.UpdateAllLoggers(log.Fields{"instanceId": cf.InstanceID}); err != nil {
231 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
232 }
233
234 log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
khenaidoo92e62c52018-10-03 14:02:54 -0400235 log.SetPackageLogLevel("github.com/opencord/voltha-go/kafka", log.WarnLevel)
khenaidoob9203542018-09-17 22:56:37 -0400236
khenaidoocfee5f42018-07-19 22:47:38 -0400237 defer log.CleanUp()
238
khenaidoo5c11af72018-07-20 17:21:05 -0400239 // Print banner if specified
240 if cf.Banner {
241 printBanner()
242 }
243
244 log.Infow("rw-core-config", log.Fields{"config": *cf})
245
246 ctx, cancel := context.WithCancel(context.Background())
247 defer cancel()
khenaidoocfee5f42018-07-19 22:47:38 -0400248
khenaidoob9203542018-09-17 22:56:37 -0400249 rw := newRWCore(cf)
250 go rw.start(ctx)
khenaidoocfee5f42018-07-19 22:47:38 -0400251
252 code := waitForExit()
khenaidoo5c11af72018-07-20 17:21:05 -0400253 log.Infow("received-a-closing-signal", log.Fields{"code": code})
khenaidoocfee5f42018-07-19 22:47:38 -0400254
255 // Cleanup before leaving
khenaidoob9203542018-09-17 22:56:37 -0400256 rw.stop()
khenaidoocfee5f42018-07-19 22:47:38 -0400257
258 elapsed := time.Since(start)
khenaidoob9203542018-09-17 22:56:37 -0400259 log.Infow("rw-core-run-time", log.Fields{"core": rw.config.InstanceID, "time": elapsed / time.Second})
khenaidoocfee5f42018-07-19 22:47:38 -0400260}