blob: fc135f69be18f057c17ab7b58a26d6999ff5641f [file] [log] [blame]
khenaidoobf6e7bb2018-08-14 22:27:29 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
khenaidoocfee5f42018-07-19 22:47:38 -040016package main
17
18import (
khenaidoo5c11af72018-07-20 17:21:05 -040019 "context"
20 "errors"
khenaidoocfee5f42018-07-19 22:47:38 -040021 "fmt"
khenaidoobf6e7bb2018-08-14 22:27:29 -040022 grpcserver "github.com/opencord/voltha-go/common/grpc"
khenaidoo5c11af72018-07-20 17:21:05 -040023 "github.com/opencord/voltha-go/common/log"
David K. Bainbridgef430cd52019-05-28 15:00:35 -070024 "github.com/opencord/voltha-go/common/version"
khenaidoo5c11af72018-07-20 17:21:05 -040025 "github.com/opencord/voltha-go/db/kvstore"
khenaidoo43c82122018-11-22 18:38:28 -050026 "github.com/opencord/voltha-go/kafka"
khenaidoo5c11af72018-07-20 17:21:05 -040027 "github.com/opencord/voltha-go/rw_core/config"
khenaidoob9203542018-09-17 22:56:37 -040028 c "github.com/opencord/voltha-go/rw_core/core"
khenaidoo2c6a0992019-04-29 13:46:56 -040029 ic "github.com/opencord/voltha-protos/go/inter_container"
khenaidoocfee5f42018-07-19 22:47:38 -040030 "os"
31 "os/signal"
khenaidoocfee5f42018-07-19 22:47:38 -040032 "strconv"
khenaidoocfee5f42018-07-19 22:47:38 -040033 "syscall"
khenaidoo5c11af72018-07-20 17:21:05 -040034 "time"
khenaidoocfee5f42018-07-19 22:47:38 -040035)
36
37type rwCore struct {
khenaidoo5c11af72018-07-20 17:21:05 -040038 kvClient kvstore.Client
39 config *config.RWCoreFlags
40 halted bool
41 exitChannel chan int
khenaidoob9203542018-09-17 22:56:37 -040042 //kmp *kafka.KafkaMessagingProxy
khenaidoo43c82122018-11-22 18:38:28 -050043 grpcServer *grpcserver.GrpcServer
44 kafkaClient kafka.Client
45 core *c.Core
khenaidooabad44c2018-08-03 16:58:35 -040046 //For test
khenaidoo79232702018-12-04 11:00:41 -050047 receiverChannels []<-chan *ic.InterContainerMessage
khenaidoocfee5f42018-07-19 22:47:38 -040048}
49
khenaidoob9203542018-09-17 22:56:37 -040050func init() {
khenaidoo2c6f1672018-09-20 23:14:41 -040051 log.AddPackage(log.JSON, log.DebugLevel, nil)
khenaidoob9203542018-09-17 22:56:37 -040052}
53
khenaidoocfee5f42018-07-19 22:47:38 -040054func newKVClient(storeType string, address string, timeout int) (kvstore.Client, error) {
55
khenaidoo5c11af72018-07-20 17:21:05 -040056 log.Infow("kv-store-type", log.Fields{"store": storeType})
khenaidoocfee5f42018-07-19 22:47:38 -040057 switch storeType {
58 case "consul":
59 return kvstore.NewConsulClient(address, timeout)
60 case "etcd":
61 return kvstore.NewEtcdClient(address, timeout)
62 }
63 return nil, errors.New("unsupported-kv-store")
64}
65
khenaidooca301322019-01-09 23:06:32 -050066func newKafkaClient(clientType string, host string, port int, instanceID string) (kafka.Client, error) {
khenaidoo43c82122018-11-22 18:38:28 -050067
68 log.Infow("kafka-client-type", log.Fields{"client": clientType})
69 switch clientType {
70 case "sarama":
71 return kafka.NewSaramaClient(
72 kafka.Host(host),
khenaidoo90847922018-12-03 14:47:51 -050073 kafka.Port(port),
khenaidooca301322019-01-09 23:06:32 -050074 kafka.ConsumerType(kafka.GroupCustomer),
khenaidoo90847922018-12-03 14:47:51 -050075 kafka.ProducerReturnOnErrors(true),
76 kafka.ProducerReturnOnSuccess(true),
77 kafka.ProducerMaxRetries(6),
khenaidooca301322019-01-09 23:06:32 -050078 kafka.NumPartitions(3),
79 kafka.ConsumerGroupName(instanceID),
80 kafka.ConsumerGroupPrefix(instanceID),
khenaidoo54e0ddf2019-02-27 16:21:33 -050081 kafka.AutoCreateTopic(true),
khenaidooca301322019-01-09 23:06:32 -050082 kafka.ProducerFlushFrequency(5),
khenaidoo79232702018-12-04 11:00:41 -050083 kafka.ProducerRetryBackoff(time.Millisecond*30)), nil
khenaidoo43c82122018-11-22 18:38:28 -050084 }
85 return nil, errors.New("unsupported-client-type")
86}
87
khenaidoocfee5f42018-07-19 22:47:38 -040088func newRWCore(cf *config.RWCoreFlags) *rwCore {
89 var rwCore rwCore
90 rwCore.config = cf
91 rwCore.halted = false
92 rwCore.exitChannel = make(chan int, 1)
khenaidoo79232702018-12-04 11:00:41 -050093 rwCore.receiverChannels = make([]<-chan *ic.InterContainerMessage, 0)
khenaidoocfee5f42018-07-19 22:47:38 -040094 return &rwCore
95}
96
khenaidoob9203542018-09-17 22:56:37 -040097func (rw *rwCore) setKVClient() error {
98 addr := rw.config.KVStoreHost + ":" + strconv.Itoa(rw.config.KVStorePort)
99 client, err := newKVClient(rw.config.KVStoreType, addr, rw.config.KVStoreTimeout)
khenaidoocfee5f42018-07-19 22:47:38 -0400100 if err != nil {
Richard Jankowskie4d77662018-10-17 13:53:21 -0400101 rw.kvClient = nil
khenaidoocfee5f42018-07-19 22:47:38 -0400102 log.Error(err)
103 return err
104 }
khenaidoob9203542018-09-17 22:56:37 -0400105 rw.kvClient = client
khenaidoocfee5f42018-07-19 22:47:38 -0400106 return nil
107}
108
// toString converts value to a string.
//
// A []byte is converted with string(...), a string is returned unchanged.
// Any other dynamic type (including a nil interface) produces an empty
// string and an "unexpected-type-<T>" error naming that type.
func toString(value interface{}) (string, error) {
	// The type switch already binds v with the concrete type of each case,
	// so no second type assertion on value is needed.
	switch v := value.(type) {
	case []byte:
		return string(v), nil
	case string:
		return v, nil
	default:
		return "", fmt.Errorf("unexpected-type-%T", v)
	}
}
119
khenaidoob9203542018-09-17 22:56:37 -0400120func (rw *rwCore) start(ctx context.Context) {
khenaidoo5c11af72018-07-20 17:21:05 -0400121 log.Info("Starting RW Core components")
khenaidoob9203542018-09-17 22:56:37 -0400122
khenaidoo90847922018-12-03 14:47:51 -0500123 // Setup KV Client
Richard Jankowskie4d77662018-10-17 13:53:21 -0400124 log.Debugw("create-kv-client", log.Fields{"kvstore": rw.config.KVStoreType})
125 err := rw.setKVClient()
126 if err == nil {
127 // Setup KV transaction context
khenaidoo9cdc1a62019-01-24 21:57:40 -0500128 txnPrefix := rw.config.KVStoreDataPrefix + "/transactions/"
129 if err = c.SetTransactionContext(rw.config.InstanceID,
130 txnPrefix,
Richard Jankowskie4d77662018-10-17 13:53:21 -0400131 rw.kvClient,
132 rw.config.KVStoreTimeout,
Richard Jankowski199fd862019-03-18 14:49:51 -0400133 rw.config.KVTxnKeyDelTime,
khenaidoo1ce37ad2019-03-24 22:07:24 -0400134 1); err != nil {
khenaidoo9cdc1a62019-01-24 21:57:40 -0500135 log.Fatal("creating-transaction-context-failed")
136 }
Richard Jankowskie4d77662018-10-17 13:53:21 -0400137 }
138
khenaidoo90847922018-12-03 14:47:51 -0500139 // Setup Kafka Client
khenaidooca301322019-01-09 23:06:32 -0500140 if rw.kafkaClient, err = newKafkaClient("sarama", rw.config.KafkaAdapterHost, rw.config.KafkaAdapterPort, rw.config.InstanceID); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500141 log.Fatal("Unsupported-kafka-client")
142 }
143
khenaidoob9203542018-09-17 22:56:37 -0400144 // Create the core service
khenaidoo43c82122018-11-22 18:38:28 -0500145 rw.core = c.NewCore(rw.config.InstanceID, rw.config, rw.kvClient, rw.kafkaClient)
khenaidoob9203542018-09-17 22:56:37 -0400146
147 // start the core
148 rw.core.Start(ctx)
khenaidoocfee5f42018-07-19 22:47:38 -0400149}
150
khenaidoob9203542018-09-17 22:56:37 -0400151func (rw *rwCore) stop() {
khenaidoocfee5f42018-07-19 22:47:38 -0400152 // Stop leadership tracking
khenaidoob9203542018-09-17 22:56:37 -0400153 rw.halted = true
khenaidoocfee5f42018-07-19 22:47:38 -0400154
155 // send exit signal
khenaidoob9203542018-09-17 22:56:37 -0400156 rw.exitChannel <- 0
khenaidoocfee5f42018-07-19 22:47:38 -0400157
158 // Cleanup - applies only if we had a kvClient
khenaidoob9203542018-09-17 22:56:37 -0400159 if rw.kvClient != nil {
khenaidoocfee5f42018-07-19 22:47:38 -0400160 // Release all reservations
khenaidoob9203542018-09-17 22:56:37 -0400161 if err := rw.kvClient.ReleaseAllReservations(); err != nil {
khenaidoo5c11af72018-07-20 17:21:05 -0400162 log.Infow("fail-to-release-all-reservations", log.Fields{"error": err})
khenaidoocfee5f42018-07-19 22:47:38 -0400163 }
164 // Close the DB connection
khenaidoob9203542018-09-17 22:56:37 -0400165 rw.kvClient.Close()
khenaidoocfee5f42018-07-19 22:47:38 -0400166 }
khenaidoo43c82122018-11-22 18:38:28 -0500167
168 rw.core.Stop(nil)
169
170 //if rw.kafkaClient != nil {
171 // rw.kafkaClient.Stop()
172 //}
khenaidoocfee5f42018-07-19 22:47:38 -0400173}
174
175func waitForExit() int {
176 signalChannel := make(chan os.Signal, 1)
177 signal.Notify(signalChannel,
178 syscall.SIGHUP,
179 syscall.SIGINT,
180 syscall.SIGTERM,
181 syscall.SIGQUIT)
182
183 exitChannel := make(chan int)
184
185 go func() {
186 s := <-signalChannel
187 switch s {
188 case syscall.SIGHUP,
189 syscall.SIGINT,
190 syscall.SIGTERM,
191 syscall.SIGQUIT:
khenaidoo5c11af72018-07-20 17:21:05 -0400192 log.Infow("closing-signal-received", log.Fields{"signal": s})
khenaidoocfee5f42018-07-19 22:47:38 -0400193 exitChannel <- 0
194 default:
khenaidoo5c11af72018-07-20 17:21:05 -0400195 log.Infow("unexpected-signal-received", log.Fields{"signal": s})
khenaidoocfee5f42018-07-19 22:47:38 -0400196 exitChannel <- 1
197 }
198 }()
199
200 code := <-exitChannel
201 return code
202}
203
// printBanner writes the RW Core ASCII-art banner to standard output, one
// row per line, framed by a blank-looking row above and below.
func printBanner() {
	rows := []string{
		" ",
		" ______ ______ ",
		"| _ \\ \\ / / ___|___ _ __ ___ ",
		"| |_) \\ \\ /\\ / / | / _ \\| '__/ _ \\ ",
		"| _ < \\ V V /| |__| (_) | | | __/ ",
		"|_| \\_\\ \\_/\\_/ \\____\\___/|_| \\___| ",
		" ",
	}
	for _, row := range rows {
		fmt.Println(row)
	}
}
213
David K. Bainbridgef430cd52019-05-28 15:00:35 -0700214func printVersion() {
215 fmt.Println("VOLTHA Read-Write Core")
216 fmt.Println(version.VersionInfo.String(" "))
217}
218
khenaidoocfee5f42018-07-19 22:47:38 -0400219func main() {
220 start := time.Now()
221
222 cf := config.NewRWCoreFlags()
223 cf.ParseCommandArguments()
224
khenaidoo2c6a0992019-04-29 13:46:56 -0400225 // Setup logging
khenaidoob9203542018-09-17 22:56:37 -0400226
227 //Setup default logger - applies for packages that do not have specific logger set
228 if _, err := log.SetDefaultLogger(log.JSON, cf.LogLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
khenaidoocfee5f42018-07-19 22:47:38 -0400229 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
230 }
khenaidoob9203542018-09-17 22:56:37 -0400231
232 // Update all loggers (provisionned via init) with a common field
233 if err := log.UpdateAllLoggers(log.Fields{"instanceId": cf.InstanceID}); err != nil {
234 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
235 }
236
khenaidoo2c6a0992019-04-29 13:46:56 -0400237 //log.SetAllLogLevel(log.ErrorLevel)
238
khenaidoob9203542018-09-17 22:56:37 -0400239 log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
khenaidoo2c6a0992019-04-29 13:46:56 -0400240 log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/flow_decomposition", log.DebugLevel)
241 log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/graph", log.DebugLevel)
khenaidoo1ce37ad2019-03-24 22:07:24 -0400242 //log.SetPackageLogLevel("github.com/opencord/voltha-go/kafka", log.DebugLevel)
243 //log.SetPackageLogLevel("github.com/opencord/voltha-go/db/model", log.DebugLevel)
khenaidoob9203542018-09-17 22:56:37 -0400244
khenaidoocfee5f42018-07-19 22:47:38 -0400245 defer log.CleanUp()
246
David K. Bainbridgef430cd52019-05-28 15:00:35 -0700247 // Print verison / build information and exit
248 if cf.DisplayVersionOnly {
249 printVersion()
250 return
251 }
252
khenaidoo5c11af72018-07-20 17:21:05 -0400253 // Print banner if specified
254 if cf.Banner {
255 printBanner()
256 }
257
258 log.Infow("rw-core-config", log.Fields{"config": *cf})
259
260 ctx, cancel := context.WithCancel(context.Background())
261 defer cancel()
khenaidoocfee5f42018-07-19 22:47:38 -0400262
khenaidoob9203542018-09-17 22:56:37 -0400263 rw := newRWCore(cf)
264 go rw.start(ctx)
khenaidoocfee5f42018-07-19 22:47:38 -0400265
266 code := waitForExit()
khenaidoo5c11af72018-07-20 17:21:05 -0400267 log.Infow("received-a-closing-signal", log.Fields{"code": code})
khenaidoocfee5f42018-07-19 22:47:38 -0400268
269 // Cleanup before leaving
khenaidoob9203542018-09-17 22:56:37 -0400270 rw.stop()
khenaidoocfee5f42018-07-19 22:47:38 -0400271
272 elapsed := time.Since(start)
khenaidoob9203542018-09-17 22:56:37 -0400273 log.Infow("rw-core-run-time", log.Fields{"core": rw.config.InstanceID, "time": elapsed / time.Second})
khenaidoocfee5f42018-07-19 22:47:38 -0400274}