blob: 2311029e6a4820ba231e9417e623152b36de145a [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
khenaidoocfee5f42018-07-19 22:47:38 -040016package main
17
18import (
khenaidoo5c11af72018-07-20 17:21:05 -040019 "context"
20 "errors"
khenaidoocfee5f42018-07-19 22:47:38 -040021 "fmt"
khenaidoobf6e7bb2018-08-14 22:27:29 -040022 grpcserver "github.com/opencord/voltha-go/common/grpc"
khenaidoo5c11af72018-07-20 17:21:05 -040023 "github.com/opencord/voltha-go/common/log"
David K. Bainbridgeb4a9ab02019-09-20 15:12:16 -070024 "github.com/opencord/voltha-go/common/probe"
David K. Bainbridgef430cd52019-05-28 15:00:35 -070025 "github.com/opencord/voltha-go/common/version"
khenaidoo5c11af72018-07-20 17:21:05 -040026 "github.com/opencord/voltha-go/db/kvstore"
khenaidoo43c82122018-11-22 18:38:28 -050027 "github.com/opencord/voltha-go/kafka"
khenaidoo5c11af72018-07-20 17:21:05 -040028 "github.com/opencord/voltha-go/rw_core/config"
khenaidoob9203542018-09-17 22:56:37 -040029 c "github.com/opencord/voltha-go/rw_core/core"
khenaidoo631fe542019-05-31 15:44:43 -040030 "github.com/opencord/voltha-go/rw_core/utils"
khenaidoo2c6a0992019-04-29 13:46:56 -040031 ic "github.com/opencord/voltha-protos/go/inter_container"
khenaidoocfee5f42018-07-19 22:47:38 -040032 "os"
33 "os/signal"
khenaidoocfee5f42018-07-19 22:47:38 -040034 "strconv"
khenaidoocfee5f42018-07-19 22:47:38 -040035 "syscall"
khenaidoo5c11af72018-07-20 17:21:05 -040036 "time"
khenaidoocfee5f42018-07-19 22:47:38 -040037)
38
39type rwCore struct {
khenaidoo5c11af72018-07-20 17:21:05 -040040 kvClient kvstore.Client
41 config *config.RWCoreFlags
42 halted bool
43 exitChannel chan int
khenaidoob9203542018-09-17 22:56:37 -040044 //kmp *kafka.KafkaMessagingProxy
khenaidoo43c82122018-11-22 18:38:28 -050045 grpcServer *grpcserver.GrpcServer
46 kafkaClient kafka.Client
47 core *c.Core
khenaidooabad44c2018-08-03 16:58:35 -040048 //For test
khenaidoo79232702018-12-04 11:00:41 -050049 receiverChannels []<-chan *ic.InterContainerMessage
khenaidoocfee5f42018-07-19 22:47:38 -040050}
51
khenaidoob9203542018-09-17 22:56:37 -040052func init() {
khenaidoo2c6f1672018-09-20 23:14:41 -040053 log.AddPackage(log.JSON, log.DebugLevel, nil)
khenaidoob9203542018-09-17 22:56:37 -040054}
55
khenaidoocfee5f42018-07-19 22:47:38 -040056func newKVClient(storeType string, address string, timeout int) (kvstore.Client, error) {
57
khenaidoo5c11af72018-07-20 17:21:05 -040058 log.Infow("kv-store-type", log.Fields{"store": storeType})
khenaidoocfee5f42018-07-19 22:47:38 -040059 switch storeType {
60 case "consul":
61 return kvstore.NewConsulClient(address, timeout)
62 case "etcd":
63 return kvstore.NewEtcdClient(address, timeout)
64 }
65 return nil, errors.New("unsupported-kv-store")
66}
67
khenaidooca301322019-01-09 23:06:32 -050068func newKafkaClient(clientType string, host string, port int, instanceID string) (kafka.Client, error) {
khenaidoo43c82122018-11-22 18:38:28 -050069
70 log.Infow("kafka-client-type", log.Fields{"client": clientType})
71 switch clientType {
72 case "sarama":
73 return kafka.NewSaramaClient(
74 kafka.Host(host),
khenaidoo90847922018-12-03 14:47:51 -050075 kafka.Port(port),
khenaidooca301322019-01-09 23:06:32 -050076 kafka.ConsumerType(kafka.GroupCustomer),
khenaidoo90847922018-12-03 14:47:51 -050077 kafka.ProducerReturnOnErrors(true),
78 kafka.ProducerReturnOnSuccess(true),
79 kafka.ProducerMaxRetries(6),
khenaidooca301322019-01-09 23:06:32 -050080 kafka.NumPartitions(3),
81 kafka.ConsumerGroupName(instanceID),
82 kafka.ConsumerGroupPrefix(instanceID),
khenaidoo54e0ddf2019-02-27 16:21:33 -050083 kafka.AutoCreateTopic(true),
khenaidooca301322019-01-09 23:06:32 -050084 kafka.ProducerFlushFrequency(5),
khenaidoo79232702018-12-04 11:00:41 -050085 kafka.ProducerRetryBackoff(time.Millisecond*30)), nil
khenaidoo43c82122018-11-22 18:38:28 -050086 }
87 return nil, errors.New("unsupported-client-type")
88}
89
khenaidoocfee5f42018-07-19 22:47:38 -040090func newRWCore(cf *config.RWCoreFlags) *rwCore {
91 var rwCore rwCore
92 rwCore.config = cf
93 rwCore.halted = false
94 rwCore.exitChannel = make(chan int, 1)
khenaidoo79232702018-12-04 11:00:41 -050095 rwCore.receiverChannels = make([]<-chan *ic.InterContainerMessage, 0)
khenaidoocfee5f42018-07-19 22:47:38 -040096 return &rwCore
97}
98
khenaidoob9203542018-09-17 22:56:37 -040099func (rw *rwCore) setKVClient() error {
100 addr := rw.config.KVStoreHost + ":" + strconv.Itoa(rw.config.KVStorePort)
101 client, err := newKVClient(rw.config.KVStoreType, addr, rw.config.KVStoreTimeout)
khenaidoocfee5f42018-07-19 22:47:38 -0400102 if err != nil {
Richard Jankowskie4d77662018-10-17 13:53:21 -0400103 rw.kvClient = nil
khenaidoocfee5f42018-07-19 22:47:38 -0400104 log.Error(err)
105 return err
106 }
khenaidoob9203542018-09-17 22:56:37 -0400107 rw.kvClient = client
khenaidoocfee5f42018-07-19 22:47:38 -0400108 return nil
109}
110
// toString converts value to a string.
//
// A []byte is converted with string(...), a string is returned unchanged, and
// any other dynamic type (including a nil interface) yields an empty string
// and an "unexpected-type-<type>" error.
func toString(value interface{}) (string, error) {
	// Use the variable bound by the type switch rather than asserting the
	// type a second time in each case.
	switch v := value.(type) {
	case []byte:
		return string(v), nil
	case string:
		return v, nil
	default:
		return "", fmt.Errorf("unexpected-type-%T", v)
	}
}
121
khenaidoo631fe542019-05-31 15:44:43 -0400122func (rw *rwCore) start(ctx context.Context, instanceId string) {
khenaidoo5c11af72018-07-20 17:21:05 -0400123 log.Info("Starting RW Core components")
khenaidoob9203542018-09-17 22:56:37 -0400124
khenaidoo90847922018-12-03 14:47:51 -0500125 // Setup KV Client
Richard Jankowskie4d77662018-10-17 13:53:21 -0400126 log.Debugw("create-kv-client", log.Fields{"kvstore": rw.config.KVStoreType})
127 err := rw.setKVClient()
128 if err == nil {
129 // Setup KV transaction context
khenaidoo9cdc1a62019-01-24 21:57:40 -0500130 txnPrefix := rw.config.KVStoreDataPrefix + "/transactions/"
khenaidoo631fe542019-05-31 15:44:43 -0400131 if err = c.SetTransactionContext(instanceId,
khenaidoo9cdc1a62019-01-24 21:57:40 -0500132 txnPrefix,
Richard Jankowskie4d77662018-10-17 13:53:21 -0400133 rw.kvClient,
134 rw.config.KVStoreTimeout,
Richard Jankowski199fd862019-03-18 14:49:51 -0400135 rw.config.KVTxnKeyDelTime,
khenaidoo1ce37ad2019-03-24 22:07:24 -0400136 1); err != nil {
khenaidoo9cdc1a62019-01-24 21:57:40 -0500137 log.Fatal("creating-transaction-context-failed")
138 }
Richard Jankowskie4d77662018-10-17 13:53:21 -0400139 }
140
khenaidoo90847922018-12-03 14:47:51 -0500141 // Setup Kafka Client
khenaidoo631fe542019-05-31 15:44:43 -0400142 if rw.kafkaClient, err = newKafkaClient("sarama", rw.config.KafkaAdapterHost, rw.config.KafkaAdapterPort, instanceId); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500143 log.Fatal("Unsupported-kafka-client")
144 }
145
khenaidoob9203542018-09-17 22:56:37 -0400146 // Create the core service
khenaidoo631fe542019-05-31 15:44:43 -0400147 rw.core = c.NewCore(instanceId, rw.config, rw.kvClient, rw.kafkaClient)
khenaidoob9203542018-09-17 22:56:37 -0400148
149 // start the core
150 rw.core.Start(ctx)
khenaidoocfee5f42018-07-19 22:47:38 -0400151}
152
David K. Bainbridgeb4a9ab02019-09-20 15:12:16 -0700153func (rw *rwCore) stop(ctx context.Context) {
khenaidoocfee5f42018-07-19 22:47:38 -0400154 // Stop leadership tracking
khenaidoob9203542018-09-17 22:56:37 -0400155 rw.halted = true
khenaidoocfee5f42018-07-19 22:47:38 -0400156
157 // send exit signal
khenaidoob9203542018-09-17 22:56:37 -0400158 rw.exitChannel <- 0
khenaidoocfee5f42018-07-19 22:47:38 -0400159
160 // Cleanup - applies only if we had a kvClient
khenaidoob9203542018-09-17 22:56:37 -0400161 if rw.kvClient != nil {
khenaidoocfee5f42018-07-19 22:47:38 -0400162 // Release all reservations
khenaidoob9203542018-09-17 22:56:37 -0400163 if err := rw.kvClient.ReleaseAllReservations(); err != nil {
khenaidoo5c11af72018-07-20 17:21:05 -0400164 log.Infow("fail-to-release-all-reservations", log.Fields{"error": err})
khenaidoocfee5f42018-07-19 22:47:38 -0400165 }
166 // Close the DB connection
khenaidoob9203542018-09-17 22:56:37 -0400167 rw.kvClient.Close()
khenaidoocfee5f42018-07-19 22:47:38 -0400168 }
khenaidoo43c82122018-11-22 18:38:28 -0500169
David K. Bainbridgeb4a9ab02019-09-20 15:12:16 -0700170 rw.core.Stop(ctx)
khenaidoo43c82122018-11-22 18:38:28 -0500171
172 //if rw.kafkaClient != nil {
173 // rw.kafkaClient.Stop()
174 //}
khenaidoocfee5f42018-07-19 22:47:38 -0400175}
176
177func waitForExit() int {
178 signalChannel := make(chan os.Signal, 1)
179 signal.Notify(signalChannel,
180 syscall.SIGHUP,
181 syscall.SIGINT,
182 syscall.SIGTERM,
183 syscall.SIGQUIT)
184
185 exitChannel := make(chan int)
186
187 go func() {
188 s := <-signalChannel
189 switch s {
190 case syscall.SIGHUP,
191 syscall.SIGINT,
192 syscall.SIGTERM,
193 syscall.SIGQUIT:
khenaidoo5c11af72018-07-20 17:21:05 -0400194 log.Infow("closing-signal-received", log.Fields{"signal": s})
khenaidoocfee5f42018-07-19 22:47:38 -0400195 exitChannel <- 0
196 default:
khenaidoo5c11af72018-07-20 17:21:05 -0400197 log.Infow("unexpected-signal-received", log.Fields{"signal": s})
khenaidoocfee5f42018-07-19 22:47:38 -0400198 exitChannel <- 1
199 }
200 }()
201
202 code := <-exitChannel
203 return code
204}
205
// printBanner writes the RW Core ASCII-art banner to standard output, one
// line at a time.
func printBanner() {
	banner := []string{
		" ",
		" ______ ______ ",
		"| _ \\ \\ / / ___|___ _ __ ___ ",
		"| |_) \\ \\ /\\ / / | / _ \\| '__/ _ \\ ",
		"| _ < \\ V V /| |__| (_) | | | __/ ",
		"|_| \\_\\ \\_/\\_/ \\____\\___/|_| \\___| ",
		" ",
	}
	for _, row := range banner {
		fmt.Println(row)
	}
}
215
David K. Bainbridgef430cd52019-05-28 15:00:35 -0700216func printVersion() {
217 fmt.Println("VOLTHA Read-Write Core")
218 fmt.Println(version.VersionInfo.String(" "))
219}
220
khenaidoocfee5f42018-07-19 22:47:38 -0400221func main() {
222 start := time.Now()
223
224 cf := config.NewRWCoreFlags()
225 cf.ParseCommandArguments()
226
khenaidoo631fe542019-05-31 15:44:43 -0400227 // Set the instance ID as the hostname
228 var instanceId string
229 hostName := utils.GetHostName()
230 if len(hostName) > 0 {
231 instanceId = hostName
232 } else {
233 log.Fatal("HOSTNAME not set")
234 }
khenaidoob9203542018-09-17 22:56:37 -0400235
236 //Setup default logger - applies for packages that do not have specific logger set
khenaidoo631fe542019-05-31 15:44:43 -0400237 if _, err := log.SetDefaultLogger(log.JSON, cf.LogLevel, log.Fields{"instanceId": instanceId}); err != nil {
khenaidoocfee5f42018-07-19 22:47:38 -0400238 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
239 }
khenaidoob9203542018-09-17 22:56:37 -0400240
khenaidoo631fe542019-05-31 15:44:43 -0400241 // Update all loggers (provisioned via init) with a common field
242 if err := log.UpdateAllLoggers(log.Fields{"instanceId": instanceId}); err != nil {
khenaidoob9203542018-09-17 22:56:37 -0400243 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
244 }
245
khenaidoo631fe542019-05-31 15:44:43 -0400246 // Update all loggers to log level specified as input parameter
247 log.SetAllLogLevel(cf.LogLevel)
khenaidoo2c6a0992019-04-29 13:46:56 -0400248
khenaidoo631fe542019-05-31 15:44:43 -0400249 //log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
khenaidoob9203542018-09-17 22:56:37 -0400250
khenaidoocfee5f42018-07-19 22:47:38 -0400251 defer log.CleanUp()
252
khenaidoo631fe542019-05-31 15:44:43 -0400253 // Print version / build information and exit
David K. Bainbridgef430cd52019-05-28 15:00:35 -0700254 if cf.DisplayVersionOnly {
255 printVersion()
256 return
257 }
258
khenaidoo5c11af72018-07-20 17:21:05 -0400259 // Print banner if specified
260 if cf.Banner {
261 printBanner()
262 }
263
264 log.Infow("rw-core-config", log.Fields{"config": *cf})
265
David K. Bainbridgeb4a9ab02019-09-20 15:12:16 -0700266 // Create the core
267 rw := newRWCore(cf)
268
269 // Create a context adding the status update channel
khenaidoo5c11af72018-07-20 17:21:05 -0400270 ctx, cancel := context.WithCancel(context.Background())
271 defer cancel()
khenaidoocfee5f42018-07-19 22:47:38 -0400272
David K. Bainbridgeb4a9ab02019-09-20 15:12:16 -0700273 /*
274 * Create and start the liveness and readiness container management probes. This
275 * is done in the main function so just in case the main starts multiple other
276 * objects there can be a single probe end point for the process.
277 */
278 p := &probe.Probe{}
Kent Hagermanc4618832019-10-07 12:24:36 -0400279 go p.ListenAndServe(fmt.Sprintf("%s:%d", rw.config.ProbeHost, rw.config.ProbePort))
David K. Bainbridgeb4a9ab02019-09-20 15:12:16 -0700280
281 // Add the probe to the context to pass to all the services started
282 probeCtx := context.WithValue(ctx, probe.ProbeContextKey, p)
283
284 // Start the core
285 go rw.start(probeCtx, instanceId)
khenaidoocfee5f42018-07-19 22:47:38 -0400286
287 code := waitForExit()
khenaidoo5c11af72018-07-20 17:21:05 -0400288 log.Infow("received-a-closing-signal", log.Fields{"code": code})
khenaidoocfee5f42018-07-19 22:47:38 -0400289
290 // Cleanup before leaving
David K. Bainbridgeb4a9ab02019-09-20 15:12:16 -0700291 rw.stop(probeCtx)
khenaidoocfee5f42018-07-19 22:47:38 -0400292
293 elapsed := time.Since(start)
khenaidoo631fe542019-05-31 15:44:43 -0400294 log.Infow("rw-core-run-time", log.Fields{"core": instanceId, "time": elapsed / time.Second})
khenaidoocfee5f42018-07-19 22:47:38 -0400295}