blob: 7c87e2dbe77e4e10772daf29fba207b3bc347267 [file] [log] [blame]
khenaidoobf6e7bb2018-08-14 22:27:29 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
npujar1d86a522019-11-14 17:11:16 +053016
khenaidoocfee5f42018-07-19 22:47:38 -040017package main
18
19import (
khenaidoo5c11af72018-07-20 17:21:05 -040020 "context"
21 "errors"
khenaidoocfee5f42018-07-19 22:47:38 -040022 "fmt"
23 "os"
24 "os/signal"
khenaidoocfee5f42018-07-19 22:47:38 -040025 "strconv"
khenaidoocfee5f42018-07-19 22:47:38 -040026 "syscall"
khenaidoo5c11af72018-07-20 17:21:05 -040027 "time"
npujar1d86a522019-11-14 17:11:16 +053028
29 "github.com/opencord/voltha-go/rw_core/config"
30 c "github.com/opencord/voltha-go/rw_core/core"
31 "github.com/opencord/voltha-go/rw_core/utils"
divyadesaic68c9c02020-02-26 12:48:09 +000032 conf "github.com/opencord/voltha-lib-go/v3/pkg/config"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080033 "github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
34 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
35 "github.com/opencord/voltha-lib-go/v3/pkg/log"
36 "github.com/opencord/voltha-lib-go/v3/pkg/probe"
37 "github.com/opencord/voltha-lib-go/v3/pkg/version"
38 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
khenaidoocfee5f42018-07-19 22:47:38 -040039)
40
41type rwCore struct {
khenaidoo5c11af72018-07-20 17:21:05 -040042 kvClient kvstore.Client
43 config *config.RWCoreFlags
44 halted bool
45 exitChannel chan int
khenaidoob9203542018-09-17 22:56:37 -040046 //kmp *kafka.KafkaMessagingProxy
khenaidoo43c82122018-11-22 18:38:28 -050047 kafkaClient kafka.Client
48 core *c.Core
khenaidooabad44c2018-08-03 16:58:35 -040049 //For test
khenaidoo79232702018-12-04 11:00:41 -050050 receiverChannels []<-chan *ic.InterContainerMessage
khenaidoocfee5f42018-07-19 22:47:38 -040051}
52
53func newKVClient(storeType string, address string, timeout int) (kvstore.Client, error) {
54
Girish Kumarf56a4682020-03-20 20:07:46 +000055 logger.Infow("kv-store-type", log.Fields{"store": storeType})
khenaidoocfee5f42018-07-19 22:47:38 -040056 switch storeType {
57 case "consul":
58 return kvstore.NewConsulClient(address, timeout)
59 case "etcd":
60 return kvstore.NewEtcdClient(address, timeout)
61 }
62 return nil, errors.New("unsupported-kv-store")
63}
64
Scott Bakeree6a0872019-10-29 15:59:52 -070065func newKafkaClient(clientType string, host string, port int, instanceID string, livenessChannelInterval time.Duration) (kafka.Client, error) {
khenaidoo43c82122018-11-22 18:38:28 -050066
Girish Kumarf56a4682020-03-20 20:07:46 +000067 logger.Infow("kafka-client-type", log.Fields{"client": clientType})
khenaidoo43c82122018-11-22 18:38:28 -050068 switch clientType {
69 case "sarama":
70 return kafka.NewSaramaClient(
71 kafka.Host(host),
khenaidoo90847922018-12-03 14:47:51 -050072 kafka.Port(port),
khenaidooca301322019-01-09 23:06:32 -050073 kafka.ConsumerType(kafka.GroupCustomer),
khenaidoo90847922018-12-03 14:47:51 -050074 kafka.ProducerReturnOnErrors(true),
75 kafka.ProducerReturnOnSuccess(true),
76 kafka.ProducerMaxRetries(6),
khenaidooca301322019-01-09 23:06:32 -050077 kafka.NumPartitions(3),
78 kafka.ConsumerGroupName(instanceID),
79 kafka.ConsumerGroupPrefix(instanceID),
khenaidoo54e0ddf2019-02-27 16:21:33 -050080 kafka.AutoCreateTopic(true),
khenaidooca301322019-01-09 23:06:32 -050081 kafka.ProducerFlushFrequency(5),
Scott Bakeree6a0872019-10-29 15:59:52 -070082 kafka.ProducerRetryBackoff(time.Millisecond*30),
83 kafka.LivenessChannelInterval(livenessChannelInterval),
84 ), nil
khenaidoo43c82122018-11-22 18:38:28 -050085 }
86 return nil, errors.New("unsupported-client-type")
87}
88
khenaidoocfee5f42018-07-19 22:47:38 -040089func newRWCore(cf *config.RWCoreFlags) *rwCore {
90 var rwCore rwCore
91 rwCore.config = cf
92 rwCore.halted = false
93 rwCore.exitChannel = make(chan int, 1)
khenaidoo79232702018-12-04 11:00:41 -050094 rwCore.receiverChannels = make([]<-chan *ic.InterContainerMessage, 0)
khenaidoocfee5f42018-07-19 22:47:38 -040095 return &rwCore
96}
97
npujar1d86a522019-11-14 17:11:16 +053098func (rw *rwCore) start(ctx context.Context, instanceID string) {
Girish Kumarf56a4682020-03-20 20:07:46 +000099 logger.Info("Starting RW Core components")
khenaidoob9203542018-09-17 22:56:37 -0400100
khenaidoo90847922018-12-03 14:47:51 -0500101 // Setup KV Client
Girish Kumarf56a4682020-03-20 20:07:46 +0000102 logger.Debugw("create-kv-client", log.Fields{"kvstore": rw.config.KVStoreType})
Kent Hagerman16ce36a2019-12-17 13:40:53 -0500103 var err error
104 if rw.kvClient, err = newKVClient(
105 rw.config.KVStoreType,
106 rw.config.KVStoreHost+":"+strconv.Itoa(rw.config.KVStorePort),
107 rw.config.KVStoreTimeout); err != nil {
Girish Kumarf56a4682020-03-20 20:07:46 +0000108 logger.Fatal(err)
Kent Hagerman16ce36a2019-12-17 13:40:53 -0500109 }
divyadesaic68c9c02020-02-26 12:48:09 +0000110 cm := conf.NewConfigManager(rw.kvClient, rw.config.KVStoreType, rw.config.KVStoreHost, rw.config.KVStorePort, rw.config.KVStoreTimeout)
divyadesai5ff30922020-03-19 05:46:25 +0000111 go conf.StartLogLevelConfigProcessing(cm, ctx)
Kent Hagerman16ce36a2019-12-17 13:40:53 -0500112
113 // Setup KV transaction context
114 if err := c.SetTransactionContext(instanceID,
115 rw.config.KVStoreDataPrefix+"/transactions/",
116 rw.kvClient,
117 rw.config.KVStoreTimeout); err != nil {
Girish Kumarf56a4682020-03-20 20:07:46 +0000118 logger.Fatal("creating-transaction-context-failed")
Richard Jankowskie4d77662018-10-17 13:53:21 -0400119 }
120
khenaidoo90847922018-12-03 14:47:51 -0500121 // Setup Kafka Client
Scott Bakeree6a0872019-10-29 15:59:52 -0700122 if rw.kafkaClient, err = newKafkaClient("sarama",
123 rw.config.KafkaAdapterHost,
124 rw.config.KafkaAdapterPort,
npujar1d86a522019-11-14 17:11:16 +0530125 instanceID,
Girish Kumar4d3887d2019-11-22 14:22:05 +0000126 rw.config.LiveProbeInterval/2); err != nil {
Girish Kumarf56a4682020-03-20 20:07:46 +0000127 logger.Fatal("Unsupported-kafka-client")
khenaidoo43c82122018-11-22 18:38:28 -0500128 }
129
khenaidoob9203542018-09-17 22:56:37 -0400130 // Create the core service
Thomas Lee Se5a44012019-11-07 20:32:24 +0530131 rw.core = c.NewCore(ctx, instanceID, rw.config, rw.kvClient, rw.kafkaClient)
khenaidoob9203542018-09-17 22:56:37 -0400132
133 // start the core
Thomas Lee Se5a44012019-11-07 20:32:24 +0530134 err = rw.core.Start(ctx)
135 if err != nil {
Girish Kumarf56a4682020-03-20 20:07:46 +0000136 logger.Fatalf("failed-to-start-rwcore", log.Fields{"error": err})
Thomas Lee Se5a44012019-11-07 20:32:24 +0530137 }
khenaidoocfee5f42018-07-19 22:47:38 -0400138}
139
David K. Bainbridgeb4a9ab02019-09-20 15:12:16 -0700140func (rw *rwCore) stop(ctx context.Context) {
khenaidoocfee5f42018-07-19 22:47:38 -0400141 // Stop leadership tracking
khenaidoob9203542018-09-17 22:56:37 -0400142 rw.halted = true
khenaidoocfee5f42018-07-19 22:47:38 -0400143
144 // send exit signal
khenaidoob9203542018-09-17 22:56:37 -0400145 rw.exitChannel <- 0
khenaidoocfee5f42018-07-19 22:47:38 -0400146
147 // Cleanup - applies only if we had a kvClient
khenaidoob9203542018-09-17 22:56:37 -0400148 if rw.kvClient != nil {
khenaidoocfee5f42018-07-19 22:47:38 -0400149 // Release all reservations
npujar467fe752020-01-16 20:17:45 +0530150 if err := rw.kvClient.ReleaseAllReservations(ctx); err != nil {
Girish Kumarf56a4682020-03-20 20:07:46 +0000151 logger.Infow("fail-to-release-all-reservations", log.Fields{"error": err})
khenaidoocfee5f42018-07-19 22:47:38 -0400152 }
153 // Close the DB connection
khenaidoob9203542018-09-17 22:56:37 -0400154 rw.kvClient.Close()
khenaidoocfee5f42018-07-19 22:47:38 -0400155 }
khenaidoo43c82122018-11-22 18:38:28 -0500156
David K. Bainbridgeb4a9ab02019-09-20 15:12:16 -0700157 rw.core.Stop(ctx)
khenaidoo43c82122018-11-22 18:38:28 -0500158
159 //if rw.kafkaClient != nil {
160 // rw.kafkaClient.Stop()
161 //}
khenaidoocfee5f42018-07-19 22:47:38 -0400162}
163
164func waitForExit() int {
165 signalChannel := make(chan os.Signal, 1)
166 signal.Notify(signalChannel,
167 syscall.SIGHUP,
168 syscall.SIGINT,
169 syscall.SIGTERM,
170 syscall.SIGQUIT)
171
172 exitChannel := make(chan int)
173
174 go func() {
175 s := <-signalChannel
176 switch s {
177 case syscall.SIGHUP,
178 syscall.SIGINT,
179 syscall.SIGTERM,
180 syscall.SIGQUIT:
Girish Kumarf56a4682020-03-20 20:07:46 +0000181 logger.Infow("closing-signal-received", log.Fields{"signal": s})
khenaidoocfee5f42018-07-19 22:47:38 -0400182 exitChannel <- 0
183 default:
Girish Kumarf56a4682020-03-20 20:07:46 +0000184 logger.Infow("unexpected-signal-received", log.Fields{"signal": s})
khenaidoocfee5f42018-07-19 22:47:38 -0400185 exitChannel <- 1
186 }
187 }()
188
189 code := <-exitChannel
190 return code
191}
192
khenaidoo5c11af72018-07-20 17:21:05 -0400193func printBanner() {
194 fmt.Println(" ")
195 fmt.Println(" ______ ______ ")
196 fmt.Println("| _ \\ \\ / / ___|___ _ __ ___ ")
197 fmt.Println("| |_) \\ \\ /\\ / / | / _ \\| '__/ _ \\ ")
198 fmt.Println("| _ < \\ V V /| |__| (_) | | | __/ ")
199 fmt.Println("|_| \\_\\ \\_/\\_/ \\____\\___/|_| \\___| ")
200 fmt.Println(" ")
201}
202
David K. Bainbridgef430cd52019-05-28 15:00:35 -0700203func printVersion() {
204 fmt.Println("VOLTHA Read-Write Core")
205 fmt.Println(version.VersionInfo.String(" "))
206}
207
khenaidoocfee5f42018-07-19 22:47:38 -0400208func main() {
209 start := time.Now()
210
211 cf := config.NewRWCoreFlags()
212 cf.ParseCommandArguments()
213
khenaidoo631fe542019-05-31 15:44:43 -0400214 // Set the instance ID as the hostname
npujar1d86a522019-11-14 17:11:16 +0530215 var instanceID string
khenaidoo631fe542019-05-31 15:44:43 -0400216 hostName := utils.GetHostName()
217 if len(hostName) > 0 {
npujar1d86a522019-11-14 17:11:16 +0530218 instanceID = hostName
khenaidoo631fe542019-05-31 15:44:43 -0400219 } else {
Girish Kumarf56a4682020-03-20 20:07:46 +0000220 logger.Fatal("HOSTNAME not set")
khenaidoo631fe542019-05-31 15:44:43 -0400221 }
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000222
Don Newton8eca4622020-02-10 16:44:48 -0500223 realMain()
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000224
225 logLevel, err := log.StringToLogLevel(cf.LogLevel)
226 if err != nil {
227 panic(err)
228 }
229
khenaidoob9203542018-09-17 22:56:37 -0400230 //Setup default logger - applies for packages that do not have specific logger set
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000231 if _, err := log.SetDefaultLogger(log.JSON, logLevel, log.Fields{"instanceId": instanceID}); err != nil {
Girish Kumarf56a4682020-03-20 20:07:46 +0000232 logger.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
khenaidoocfee5f42018-07-19 22:47:38 -0400233 }
khenaidoob9203542018-09-17 22:56:37 -0400234
khenaidoo631fe542019-05-31 15:44:43 -0400235 // Update all loggers (provisioned via init) with a common field
npujar1d86a522019-11-14 17:11:16 +0530236 if err := log.UpdateAllLoggers(log.Fields{"instanceId": instanceID}); err != nil {
Girish Kumarf56a4682020-03-20 20:07:46 +0000237 logger.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
khenaidoob9203542018-09-17 22:56:37 -0400238 }
239
khenaidoo631fe542019-05-31 15:44:43 -0400240 // Update all loggers to log level specified as input parameter
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000241 log.SetAllLogLevel(logLevel)
khenaidoo2c6a0992019-04-29 13:46:56 -0400242
khenaidoo631fe542019-05-31 15:44:43 -0400243 //log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
khenaidoob9203542018-09-17 22:56:37 -0400244
npujar1d86a522019-11-14 17:11:16 +0530245 defer func() {
246 err := log.CleanUp()
247 if err != nil {
Girish Kumarf56a4682020-03-20 20:07:46 +0000248 logger.Errorw("unable-to-flush-any-buffered-log-entries", log.Fields{"error": err})
npujar1d86a522019-11-14 17:11:16 +0530249 }
250 }()
khenaidoocfee5f42018-07-19 22:47:38 -0400251
khenaidoo631fe542019-05-31 15:44:43 -0400252 // Print version / build information and exit
David K. Bainbridgef430cd52019-05-28 15:00:35 -0700253 if cf.DisplayVersionOnly {
254 printVersion()
255 return
256 }
257
khenaidoo5c11af72018-07-20 17:21:05 -0400258 // Print banner if specified
259 if cf.Banner {
260 printBanner()
261 }
262
Girish Kumarf56a4682020-03-20 20:07:46 +0000263 logger.Infow("rw-core-config", log.Fields{"config": *cf})
khenaidoo5c11af72018-07-20 17:21:05 -0400264
David K. Bainbridgeb4a9ab02019-09-20 15:12:16 -0700265 // Create the core
266 rw := newRWCore(cf)
267
268 // Create a context adding the status update channel
khenaidoo5c11af72018-07-20 17:21:05 -0400269 ctx, cancel := context.WithCancel(context.Background())
270 defer cancel()
khenaidoocfee5f42018-07-19 22:47:38 -0400271
David K. Bainbridgeb4a9ab02019-09-20 15:12:16 -0700272 /*
273 * Create and start the liveness and readiness container management probes. This
274 * is done in the main function so just in case the main starts multiple other
275 * objects there can be a single probe end point for the process.
276 */
277 p := &probe.Probe{}
Kent Hagermanc4618832019-10-07 12:24:36 -0400278 go p.ListenAndServe(fmt.Sprintf("%s:%d", rw.config.ProbeHost, rw.config.ProbePort))
David K. Bainbridgeb4a9ab02019-09-20 15:12:16 -0700279
280 // Add the probe to the context to pass to all the services started
281 probeCtx := context.WithValue(ctx, probe.ProbeContextKey, p)
282
283 // Start the core
npujar1d86a522019-11-14 17:11:16 +0530284 go rw.start(probeCtx, instanceID)
khenaidoocfee5f42018-07-19 22:47:38 -0400285
286 code := waitForExit()
Girish Kumarf56a4682020-03-20 20:07:46 +0000287 logger.Infow("received-a-closing-signal", log.Fields{"code": code})
khenaidoocfee5f42018-07-19 22:47:38 -0400288
289 // Cleanup before leaving
David K. Bainbridgeb4a9ab02019-09-20 15:12:16 -0700290 rw.stop(probeCtx)
khenaidoocfee5f42018-07-19 22:47:38 -0400291
292 elapsed := time.Since(start)
Girish Kumarf56a4682020-03-20 20:07:46 +0000293 logger.Infow("rw-core-run-time", log.Fields{"core": instanceID, "time": elapsed / time.Second})
khenaidoocfee5f42018-07-19 22:47:38 -0400294}