blob: c06627551a6906359d2ea909014535f5b8ffc78a [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
npujar1d86a522019-11-14 17:11:16 +053016
khenaidoocfee5f42018-07-19 22:47:38 -040017package main
18
19import (
khenaidoo5c11af72018-07-20 17:21:05 -040020 "context"
21 "errors"
khenaidoocfee5f42018-07-19 22:47:38 -040022 "fmt"
23 "os"
24 "os/signal"
khenaidoocfee5f42018-07-19 22:47:38 -040025 "strconv"
khenaidoocfee5f42018-07-19 22:47:38 -040026 "syscall"
khenaidoo5c11af72018-07-20 17:21:05 -040027 "time"
npujar1d86a522019-11-14 17:11:16 +053028
29 "github.com/opencord/voltha-go/rw_core/config"
30 c "github.com/opencord/voltha-go/rw_core/core"
31 "github.com/opencord/voltha-go/rw_core/utils"
32 "github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore"
33 "github.com/opencord/voltha-lib-go/v2/pkg/kafka"
34 "github.com/opencord/voltha-lib-go/v2/pkg/log"
35 "github.com/opencord/voltha-lib-go/v2/pkg/probe"
36 "github.com/opencord/voltha-lib-go/v2/pkg/version"
37 ic "github.com/opencord/voltha-protos/v2/go/inter_container"
khenaidoocfee5f42018-07-19 22:47:38 -040038)
39
40type rwCore struct {
khenaidoo5c11af72018-07-20 17:21:05 -040041 kvClient kvstore.Client
42 config *config.RWCoreFlags
43 halted bool
44 exitChannel chan int
khenaidoob9203542018-09-17 22:56:37 -040045 //kmp *kafka.KafkaMessagingProxy
khenaidoo43c82122018-11-22 18:38:28 -050046 kafkaClient kafka.Client
47 core *c.Core
khenaidooabad44c2018-08-03 16:58:35 -040048 //For test
khenaidoo79232702018-12-04 11:00:41 -050049 receiverChannels []<-chan *ic.InterContainerMessage
khenaidoocfee5f42018-07-19 22:47:38 -040050}
51
khenaidoob9203542018-09-17 22:56:37 -040052func init() {
npujar1d86a522019-11-14 17:11:16 +053053 _, err := log.AddPackage(log.JSON, log.DebugLevel, nil)
54 if err != nil {
55 log.Errorw("unable-to-register-package-to-the-log-map", log.Fields{"error": err})
56 }
khenaidoob9203542018-09-17 22:56:37 -040057}
58
khenaidoocfee5f42018-07-19 22:47:38 -040059func newKVClient(storeType string, address string, timeout int) (kvstore.Client, error) {
60
khenaidoo5c11af72018-07-20 17:21:05 -040061 log.Infow("kv-store-type", log.Fields{"store": storeType})
khenaidoocfee5f42018-07-19 22:47:38 -040062 switch storeType {
63 case "consul":
64 return kvstore.NewConsulClient(address, timeout)
65 case "etcd":
66 return kvstore.NewEtcdClient(address, timeout)
67 }
68 return nil, errors.New("unsupported-kv-store")
69}
70
Scott Bakeree6a0872019-10-29 15:59:52 -070071func newKafkaClient(clientType string, host string, port int, instanceID string, livenessChannelInterval time.Duration) (kafka.Client, error) {
khenaidoo43c82122018-11-22 18:38:28 -050072
73 log.Infow("kafka-client-type", log.Fields{"client": clientType})
74 switch clientType {
75 case "sarama":
76 return kafka.NewSaramaClient(
77 kafka.Host(host),
khenaidoo90847922018-12-03 14:47:51 -050078 kafka.Port(port),
khenaidooca301322019-01-09 23:06:32 -050079 kafka.ConsumerType(kafka.GroupCustomer),
khenaidoo90847922018-12-03 14:47:51 -050080 kafka.ProducerReturnOnErrors(true),
81 kafka.ProducerReturnOnSuccess(true),
82 kafka.ProducerMaxRetries(6),
khenaidooca301322019-01-09 23:06:32 -050083 kafka.NumPartitions(3),
84 kafka.ConsumerGroupName(instanceID),
85 kafka.ConsumerGroupPrefix(instanceID),
khenaidoo54e0ddf2019-02-27 16:21:33 -050086 kafka.AutoCreateTopic(true),
khenaidooca301322019-01-09 23:06:32 -050087 kafka.ProducerFlushFrequency(5),
Scott Bakeree6a0872019-10-29 15:59:52 -070088 kafka.ProducerRetryBackoff(time.Millisecond*30),
89 kafka.LivenessChannelInterval(livenessChannelInterval),
90 ), nil
khenaidoo43c82122018-11-22 18:38:28 -050091 }
92 return nil, errors.New("unsupported-client-type")
93}
94
khenaidoocfee5f42018-07-19 22:47:38 -040095func newRWCore(cf *config.RWCoreFlags) *rwCore {
96 var rwCore rwCore
97 rwCore.config = cf
98 rwCore.halted = false
99 rwCore.exitChannel = make(chan int, 1)
khenaidoo79232702018-12-04 11:00:41 -0500100 rwCore.receiverChannels = make([]<-chan *ic.InterContainerMessage, 0)
khenaidoocfee5f42018-07-19 22:47:38 -0400101 return &rwCore
102}
103
khenaidoob9203542018-09-17 22:56:37 -0400104func (rw *rwCore) setKVClient() error {
105 addr := rw.config.KVStoreHost + ":" + strconv.Itoa(rw.config.KVStorePort)
106 client, err := newKVClient(rw.config.KVStoreType, addr, rw.config.KVStoreTimeout)
khenaidoocfee5f42018-07-19 22:47:38 -0400107 if err != nil {
Richard Jankowskie4d77662018-10-17 13:53:21 -0400108 rw.kvClient = nil
khenaidoocfee5f42018-07-19 22:47:38 -0400109 log.Error(err)
110 return err
111 }
khenaidoob9203542018-09-17 22:56:37 -0400112 rw.kvClient = client
khenaidoocfee5f42018-07-19 22:47:38 -0400113 return nil
114}
115
npujar1d86a522019-11-14 17:11:16 +0530116func (rw *rwCore) start(ctx context.Context, instanceID string) {
khenaidoo5c11af72018-07-20 17:21:05 -0400117 log.Info("Starting RW Core components")
khenaidoob9203542018-09-17 22:56:37 -0400118
khenaidoo90847922018-12-03 14:47:51 -0500119 // Setup KV Client
Richard Jankowskie4d77662018-10-17 13:53:21 -0400120 log.Debugw("create-kv-client", log.Fields{"kvstore": rw.config.KVStoreType})
121 err := rw.setKVClient()
122 if err == nil {
123 // Setup KV transaction context
khenaidoo9cdc1a62019-01-24 21:57:40 -0500124 txnPrefix := rw.config.KVStoreDataPrefix + "/transactions/"
npujar1d86a522019-11-14 17:11:16 +0530125 if err = c.SetTransactionContext(instanceID,
khenaidoo9cdc1a62019-01-24 21:57:40 -0500126 txnPrefix,
Richard Jankowskie4d77662018-10-17 13:53:21 -0400127 rw.kvClient,
khenaidoo09771ef2019-10-11 14:25:02 -0400128 rw.config.KVStoreTimeout); err != nil {
khenaidoo9cdc1a62019-01-24 21:57:40 -0500129 log.Fatal("creating-transaction-context-failed")
130 }
Richard Jankowskie4d77662018-10-17 13:53:21 -0400131 }
132
khenaidoo90847922018-12-03 14:47:51 -0500133 // Setup Kafka Client
Scott Bakeree6a0872019-10-29 15:59:52 -0700134 if rw.kafkaClient, err = newKafkaClient("sarama",
135 rw.config.KafkaAdapterHost,
136 rw.config.KafkaAdapterPort,
npujar1d86a522019-11-14 17:11:16 +0530137 instanceID,
Girish Kumar4d3887d2019-11-22 14:22:05 +0000138 rw.config.LiveProbeInterval/2); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500139 log.Fatal("Unsupported-kafka-client")
140 }
141
khenaidoob9203542018-09-17 22:56:37 -0400142 // Create the core service
npujar1d86a522019-11-14 17:11:16 +0530143 rw.core = c.NewCore(instanceID, rw.config, rw.kvClient, rw.kafkaClient)
khenaidoob9203542018-09-17 22:56:37 -0400144
145 // start the core
146 rw.core.Start(ctx)
khenaidoocfee5f42018-07-19 22:47:38 -0400147}
148
David K. Bainbridgeb4a9ab02019-09-20 15:12:16 -0700149func (rw *rwCore) stop(ctx context.Context) {
khenaidoocfee5f42018-07-19 22:47:38 -0400150 // Stop leadership tracking
khenaidoob9203542018-09-17 22:56:37 -0400151 rw.halted = true
khenaidoocfee5f42018-07-19 22:47:38 -0400152
153 // send exit signal
khenaidoob9203542018-09-17 22:56:37 -0400154 rw.exitChannel <- 0
khenaidoocfee5f42018-07-19 22:47:38 -0400155
156 // Cleanup - applies only if we had a kvClient
khenaidoob9203542018-09-17 22:56:37 -0400157 if rw.kvClient != nil {
khenaidoocfee5f42018-07-19 22:47:38 -0400158 // Release all reservations
khenaidoob9203542018-09-17 22:56:37 -0400159 if err := rw.kvClient.ReleaseAllReservations(); err != nil {
khenaidoo5c11af72018-07-20 17:21:05 -0400160 log.Infow("fail-to-release-all-reservations", log.Fields{"error": err})
khenaidoocfee5f42018-07-19 22:47:38 -0400161 }
162 // Close the DB connection
khenaidoob9203542018-09-17 22:56:37 -0400163 rw.kvClient.Close()
khenaidoocfee5f42018-07-19 22:47:38 -0400164 }
khenaidoo43c82122018-11-22 18:38:28 -0500165
David K. Bainbridgeb4a9ab02019-09-20 15:12:16 -0700166 rw.core.Stop(ctx)
khenaidoo43c82122018-11-22 18:38:28 -0500167
168 //if rw.kafkaClient != nil {
169 // rw.kafkaClient.Stop()
170 //}
khenaidoocfee5f42018-07-19 22:47:38 -0400171}
172
173func waitForExit() int {
174 signalChannel := make(chan os.Signal, 1)
175 signal.Notify(signalChannel,
176 syscall.SIGHUP,
177 syscall.SIGINT,
178 syscall.SIGTERM,
179 syscall.SIGQUIT)
180
181 exitChannel := make(chan int)
182
183 go func() {
184 s := <-signalChannel
185 switch s {
186 case syscall.SIGHUP,
187 syscall.SIGINT,
188 syscall.SIGTERM,
189 syscall.SIGQUIT:
khenaidoo5c11af72018-07-20 17:21:05 -0400190 log.Infow("closing-signal-received", log.Fields{"signal": s})
khenaidoocfee5f42018-07-19 22:47:38 -0400191 exitChannel <- 0
192 default:
khenaidoo5c11af72018-07-20 17:21:05 -0400193 log.Infow("unexpected-signal-received", log.Fields{"signal": s})
khenaidoocfee5f42018-07-19 22:47:38 -0400194 exitChannel <- 1
195 }
196 }()
197
198 code := <-exitChannel
199 return code
200}
201
// printBanner writes the RW Core ASCII-art banner to standard output, one row
// per line.
func printBanner() {
	rows := []string{
		" ",
		" ______ ______ ",
		"| _ \\ \\ / / ___|___ _ __ ___ ",
		"| |_) \\ \\ /\\ / / | / _ \\| '__/ _ \\ ",
		"| _ < \\ V V /| |__| (_) | | | __/ ",
		"|_| \\_\\ \\_/\\_/ \\____\\___/|_| \\___| ",
		" ",
	}
	for _, row := range rows {
		fmt.Println(row)
	}
}
211
David K. Bainbridgef430cd52019-05-28 15:00:35 -0700212func printVersion() {
213 fmt.Println("VOLTHA Read-Write Core")
214 fmt.Println(version.VersionInfo.String(" "))
215}
216
khenaidoocfee5f42018-07-19 22:47:38 -0400217func main() {
218 start := time.Now()
219
220 cf := config.NewRWCoreFlags()
221 cf.ParseCommandArguments()
222
khenaidoo631fe542019-05-31 15:44:43 -0400223 // Set the instance ID as the hostname
npujar1d86a522019-11-14 17:11:16 +0530224 var instanceID string
khenaidoo631fe542019-05-31 15:44:43 -0400225 hostName := utils.GetHostName()
226 if len(hostName) > 0 {
npujar1d86a522019-11-14 17:11:16 +0530227 instanceID = hostName
khenaidoo631fe542019-05-31 15:44:43 -0400228 } else {
229 log.Fatal("HOSTNAME not set")
230 }
khenaidoob9203542018-09-17 22:56:37 -0400231
232 //Setup default logger - applies for packages that do not have specific logger set
npujar1d86a522019-11-14 17:11:16 +0530233 if _, err := log.SetDefaultLogger(log.JSON, cf.LogLevel, log.Fields{"instanceId": instanceID}); err != nil {
khenaidoocfee5f42018-07-19 22:47:38 -0400234 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
235 }
khenaidoob9203542018-09-17 22:56:37 -0400236
khenaidoo631fe542019-05-31 15:44:43 -0400237 // Update all loggers (provisioned via init) with a common field
npujar1d86a522019-11-14 17:11:16 +0530238 if err := log.UpdateAllLoggers(log.Fields{"instanceId": instanceID}); err != nil {
khenaidoob9203542018-09-17 22:56:37 -0400239 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
240 }
241
khenaidoo631fe542019-05-31 15:44:43 -0400242 // Update all loggers to log level specified as input parameter
243 log.SetAllLogLevel(cf.LogLevel)
khenaidoo2c6a0992019-04-29 13:46:56 -0400244
khenaidoo631fe542019-05-31 15:44:43 -0400245 //log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
khenaidoob9203542018-09-17 22:56:37 -0400246
npujar1d86a522019-11-14 17:11:16 +0530247 defer func() {
248 err := log.CleanUp()
249 if err != nil {
250 log.Errorw("unable-to-flush-any-buffered-log-entries", log.Fields{"error": err})
251 }
252 }()
khenaidoocfee5f42018-07-19 22:47:38 -0400253
khenaidoo631fe542019-05-31 15:44:43 -0400254 // Print version / build information and exit
David K. Bainbridgef430cd52019-05-28 15:00:35 -0700255 if cf.DisplayVersionOnly {
256 printVersion()
257 return
258 }
259
khenaidoo5c11af72018-07-20 17:21:05 -0400260 // Print banner if specified
261 if cf.Banner {
262 printBanner()
263 }
264
265 log.Infow("rw-core-config", log.Fields{"config": *cf})
266
David K. Bainbridgeb4a9ab02019-09-20 15:12:16 -0700267 // Create the core
268 rw := newRWCore(cf)
269
270 // Create a context adding the status update channel
khenaidoo5c11af72018-07-20 17:21:05 -0400271 ctx, cancel := context.WithCancel(context.Background())
272 defer cancel()
khenaidoocfee5f42018-07-19 22:47:38 -0400273
David K. Bainbridgeb4a9ab02019-09-20 15:12:16 -0700274 /*
275 * Create and start the liveness and readiness container management probes. This
276 * is done in the main function so just in case the main starts multiple other
277 * objects there can be a single probe end point for the process.
278 */
279 p := &probe.Probe{}
Kent Hagermanc4618832019-10-07 12:24:36 -0400280 go p.ListenAndServe(fmt.Sprintf("%s:%d", rw.config.ProbeHost, rw.config.ProbePort))
David K. Bainbridgeb4a9ab02019-09-20 15:12:16 -0700281
282 // Add the probe to the context to pass to all the services started
283 probeCtx := context.WithValue(ctx, probe.ProbeContextKey, p)
284
285 // Start the core
npujar1d86a522019-11-14 17:11:16 +0530286 go rw.start(probeCtx, instanceID)
khenaidoocfee5f42018-07-19 22:47:38 -0400287
288 code := waitForExit()
khenaidoo5c11af72018-07-20 17:21:05 -0400289 log.Infow("received-a-closing-signal", log.Fields{"code": code})
khenaidoocfee5f42018-07-19 22:47:38 -0400290
291 // Cleanup before leaving
David K. Bainbridgeb4a9ab02019-09-20 15:12:16 -0700292 rw.stop(probeCtx)
khenaidoocfee5f42018-07-19 22:47:38 -0400293
294 elapsed := time.Since(start)
npujar1d86a522019-11-14 17:11:16 +0530295 log.Infow("rw-core-run-time", log.Fields{"core": instanceID, "time": elapsed / time.Second})
khenaidoocfee5f42018-07-19 22:47:38 -0400296}