blob: 336e7316b5f0c43d536b1fe52553f0482e1f796e [file] [log] [blame]
khenaidoobf6e7bb2018-08-14 22:27:29 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
khenaidoocfee5f42018-07-19 22:47:38 -040016package main
17
18import (
khenaidoo5c11af72018-07-20 17:21:05 -040019 "context"
20 "errors"
khenaidoocfee5f42018-07-19 22:47:38 -040021 "fmt"
khenaidoobf6e7bb2018-08-14 22:27:29 -040022 grpcserver "github.com/opencord/voltha-go/common/grpc"
khenaidoo5c11af72018-07-20 17:21:05 -040023 "github.com/opencord/voltha-go/common/log"
24 "github.com/opencord/voltha-go/db/kvstore"
khenaidoo43c82122018-11-22 18:38:28 -050025 "github.com/opencord/voltha-go/kafka"
khenaidoo79232702018-12-04 11:00:41 -050026 ic "github.com/opencord/voltha-go/protos/inter_container"
khenaidoo5c11af72018-07-20 17:21:05 -040027 "github.com/opencord/voltha-go/rw_core/config"
khenaidoob9203542018-09-17 22:56:37 -040028 c "github.com/opencord/voltha-go/rw_core/core"
khenaidoocfee5f42018-07-19 22:47:38 -040029 "os"
30 "os/signal"
khenaidoocfee5f42018-07-19 22:47:38 -040031 "strconv"
khenaidoocfee5f42018-07-19 22:47:38 -040032 "syscall"
khenaidoo5c11af72018-07-20 17:21:05 -040033 "time"
khenaidoocfee5f42018-07-19 22:47:38 -040034)
35
36type rwCore struct {
khenaidoo5c11af72018-07-20 17:21:05 -040037 kvClient kvstore.Client
38 config *config.RWCoreFlags
39 halted bool
40 exitChannel chan int
khenaidoob9203542018-09-17 22:56:37 -040041 //kmp *kafka.KafkaMessagingProxy
khenaidoo43c82122018-11-22 18:38:28 -050042 grpcServer *grpcserver.GrpcServer
43 kafkaClient kafka.Client
44 core *c.Core
khenaidooabad44c2018-08-03 16:58:35 -040045 //For test
khenaidoo79232702018-12-04 11:00:41 -050046 receiverChannels []<-chan *ic.InterContainerMessage
khenaidoocfee5f42018-07-19 22:47:38 -040047}
48
khenaidoob9203542018-09-17 22:56:37 -040049func init() {
khenaidoo2c6f1672018-09-20 23:14:41 -040050 log.AddPackage(log.JSON, log.DebugLevel, nil)
khenaidoob9203542018-09-17 22:56:37 -040051}
52
khenaidoocfee5f42018-07-19 22:47:38 -040053func newKVClient(storeType string, address string, timeout int) (kvstore.Client, error) {
54
khenaidoo5c11af72018-07-20 17:21:05 -040055 log.Infow("kv-store-type", log.Fields{"store": storeType})
khenaidoocfee5f42018-07-19 22:47:38 -040056 switch storeType {
57 case "consul":
58 return kvstore.NewConsulClient(address, timeout)
59 case "etcd":
60 return kvstore.NewEtcdClient(address, timeout)
61 }
62 return nil, errors.New("unsupported-kv-store")
63}
64
khenaidooca301322019-01-09 23:06:32 -050065func newKafkaClient(clientType string, host string, port int, instanceID string) (kafka.Client, error) {
khenaidoo43c82122018-11-22 18:38:28 -050066
67 log.Infow("kafka-client-type", log.Fields{"client": clientType})
68 switch clientType {
69 case "sarama":
70 return kafka.NewSaramaClient(
71 kafka.Host(host),
khenaidoo90847922018-12-03 14:47:51 -050072 kafka.Port(port),
khenaidooca301322019-01-09 23:06:32 -050073 kafka.ConsumerType(kafka.GroupCustomer),
khenaidoo90847922018-12-03 14:47:51 -050074 kafka.ProducerReturnOnErrors(true),
75 kafka.ProducerReturnOnSuccess(true),
76 kafka.ProducerMaxRetries(6),
khenaidooca301322019-01-09 23:06:32 -050077 kafka.NumPartitions(3),
78 kafka.ConsumerGroupName(instanceID),
79 kafka.ConsumerGroupPrefix(instanceID),
80 kafka.AutoCreateTopic(false),
81 kafka.ProducerFlushFrequency(5),
khenaidoo79232702018-12-04 11:00:41 -050082 kafka.ProducerRetryBackoff(time.Millisecond*30)), nil
khenaidoo43c82122018-11-22 18:38:28 -050083 }
84 return nil, errors.New("unsupported-client-type")
85}
86
khenaidoocfee5f42018-07-19 22:47:38 -040087func newRWCore(cf *config.RWCoreFlags) *rwCore {
88 var rwCore rwCore
89 rwCore.config = cf
90 rwCore.halted = false
91 rwCore.exitChannel = make(chan int, 1)
khenaidoo79232702018-12-04 11:00:41 -050092 rwCore.receiverChannels = make([]<-chan *ic.InterContainerMessage, 0)
khenaidoocfee5f42018-07-19 22:47:38 -040093 return &rwCore
94}
95
khenaidoob9203542018-09-17 22:56:37 -040096func (rw *rwCore) setKVClient() error {
97 addr := rw.config.KVStoreHost + ":" + strconv.Itoa(rw.config.KVStorePort)
98 client, err := newKVClient(rw.config.KVStoreType, addr, rw.config.KVStoreTimeout)
khenaidoocfee5f42018-07-19 22:47:38 -040099 if err != nil {
Richard Jankowskie4d77662018-10-17 13:53:21 -0400100 rw.kvClient = nil
khenaidoocfee5f42018-07-19 22:47:38 -0400101 log.Error(err)
102 return err
103 }
khenaidoob9203542018-09-17 22:56:37 -0400104 rw.kvClient = client
khenaidoocfee5f42018-07-19 22:47:38 -0400105 return nil
106}
107
khenaidoocfee5f42018-07-19 22:47:38 -0400108func toString(value interface{}) (string, error) {
109 switch t := value.(type) {
110 case []byte:
111 return string(value.([]byte)), nil
112 case string:
113 return value.(string), nil
114 default:
115 return "", fmt.Errorf("unexpected-type-%T", t)
116 }
117}
118
khenaidoob9203542018-09-17 22:56:37 -0400119func (rw *rwCore) start(ctx context.Context) {
khenaidoo5c11af72018-07-20 17:21:05 -0400120 log.Info("Starting RW Core components")
khenaidoob9203542018-09-17 22:56:37 -0400121
khenaidoo90847922018-12-03 14:47:51 -0500122 // Setup KV Client
Richard Jankowskie4d77662018-10-17 13:53:21 -0400123 log.Debugw("create-kv-client", log.Fields{"kvstore": rw.config.KVStoreType})
124 err := rw.setKVClient()
125 if err == nil {
126 // Setup KV transaction context
127 c.SetTransactionContext(rw.config.InstanceID,
128 "service/voltha/transactions/",
129 rw.kvClient,
130 rw.config.KVStoreTimeout,
131 rw.config.KVTxnKeyDelTime)
132 }
133
khenaidoo90847922018-12-03 14:47:51 -0500134 // Setup Kafka Client
khenaidooca301322019-01-09 23:06:32 -0500135 if rw.kafkaClient, err = newKafkaClient("sarama", rw.config.KafkaAdapterHost, rw.config.KafkaAdapterPort, rw.config.InstanceID); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500136 log.Fatal("Unsupported-kafka-client")
137 }
138
khenaidoob9203542018-09-17 22:56:37 -0400139 // Create the core service
khenaidoo43c82122018-11-22 18:38:28 -0500140 rw.core = c.NewCore(rw.config.InstanceID, rw.config, rw.kvClient, rw.kafkaClient)
khenaidoob9203542018-09-17 22:56:37 -0400141
142 // start the core
143 rw.core.Start(ctx)
khenaidoocfee5f42018-07-19 22:47:38 -0400144}
145
khenaidoob9203542018-09-17 22:56:37 -0400146func (rw *rwCore) stop() {
khenaidoocfee5f42018-07-19 22:47:38 -0400147 // Stop leadership tracking
khenaidoob9203542018-09-17 22:56:37 -0400148 rw.halted = true
khenaidoocfee5f42018-07-19 22:47:38 -0400149
150 // send exit signal
khenaidoob9203542018-09-17 22:56:37 -0400151 rw.exitChannel <- 0
khenaidoocfee5f42018-07-19 22:47:38 -0400152
153 // Cleanup - applies only if we had a kvClient
khenaidoob9203542018-09-17 22:56:37 -0400154 if rw.kvClient != nil {
khenaidoocfee5f42018-07-19 22:47:38 -0400155 // Release all reservations
khenaidoob9203542018-09-17 22:56:37 -0400156 if err := rw.kvClient.ReleaseAllReservations(); err != nil {
khenaidoo5c11af72018-07-20 17:21:05 -0400157 log.Infow("fail-to-release-all-reservations", log.Fields{"error": err})
khenaidoocfee5f42018-07-19 22:47:38 -0400158 }
159 // Close the DB connection
khenaidoob9203542018-09-17 22:56:37 -0400160 rw.kvClient.Close()
khenaidoocfee5f42018-07-19 22:47:38 -0400161 }
khenaidoo43c82122018-11-22 18:38:28 -0500162
163 rw.core.Stop(nil)
164
165 //if rw.kafkaClient != nil {
166 // rw.kafkaClient.Stop()
167 //}
khenaidoocfee5f42018-07-19 22:47:38 -0400168}
169
170func waitForExit() int {
171 signalChannel := make(chan os.Signal, 1)
172 signal.Notify(signalChannel,
173 syscall.SIGHUP,
174 syscall.SIGINT,
175 syscall.SIGTERM,
176 syscall.SIGQUIT)
177
178 exitChannel := make(chan int)
179
180 go func() {
181 s := <-signalChannel
182 switch s {
183 case syscall.SIGHUP,
184 syscall.SIGINT,
185 syscall.SIGTERM,
186 syscall.SIGQUIT:
khenaidoo5c11af72018-07-20 17:21:05 -0400187 log.Infow("closing-signal-received", log.Fields{"signal": s})
khenaidoocfee5f42018-07-19 22:47:38 -0400188 exitChannel <- 0
189 default:
khenaidoo5c11af72018-07-20 17:21:05 -0400190 log.Infow("unexpected-signal-received", log.Fields{"signal": s})
khenaidoocfee5f42018-07-19 22:47:38 -0400191 exitChannel <- 1
192 }
193 }()
194
195 code := <-exitChannel
196 return code
197}
198
khenaidoo5c11af72018-07-20 17:21:05 -0400199func printBanner() {
200 fmt.Println(" ")
201 fmt.Println(" ______ ______ ")
202 fmt.Println("| _ \\ \\ / / ___|___ _ __ ___ ")
203 fmt.Println("| |_) \\ \\ /\\ / / | / _ \\| '__/ _ \\ ")
204 fmt.Println("| _ < \\ V V /| |__| (_) | | | __/ ")
205 fmt.Println("|_| \\_\\ \\_/\\_/ \\____\\___/|_| \\___| ")
206 fmt.Println(" ")
207}
208
khenaidoocfee5f42018-07-19 22:47:38 -0400209func main() {
210 start := time.Now()
211
212 cf := config.NewRWCoreFlags()
213 cf.ParseCommandArguments()
214
khenaidoob9203542018-09-17 22:56:37 -0400215 //// Setup logging
216
217 //Setup default logger - applies for packages that do not have specific logger set
218 if _, err := log.SetDefaultLogger(log.JSON, cf.LogLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
khenaidoocfee5f42018-07-19 22:47:38 -0400219 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
220 }
khenaidoob9203542018-09-17 22:56:37 -0400221
222 // Update all loggers (provisionned via init) with a common field
223 if err := log.UpdateAllLoggers(log.Fields{"instanceId": cf.InstanceID}); err != nil {
224 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
225 }
226
227 log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
khenaidooca301322019-01-09 23:06:32 -0500228 log.SetPackageLogLevel("github.com/opencord/voltha-go/kafka", log.DebugLevel)
khenaidoob9203542018-09-17 22:56:37 -0400229
khenaidoocfee5f42018-07-19 22:47:38 -0400230 defer log.CleanUp()
231
khenaidoo5c11af72018-07-20 17:21:05 -0400232 // Print banner if specified
233 if cf.Banner {
234 printBanner()
235 }
236
237 log.Infow("rw-core-config", log.Fields{"config": *cf})
238
239 ctx, cancel := context.WithCancel(context.Background())
240 defer cancel()
khenaidoocfee5f42018-07-19 22:47:38 -0400241
khenaidoob9203542018-09-17 22:56:37 -0400242 rw := newRWCore(cf)
243 go rw.start(ctx)
khenaidoocfee5f42018-07-19 22:47:38 -0400244
245 code := waitForExit()
khenaidoo5c11af72018-07-20 17:21:05 -0400246 log.Infow("received-a-closing-signal", log.Fields{"code": code})
khenaidoocfee5f42018-07-19 22:47:38 -0400247
248 // Cleanup before leaving
khenaidoob9203542018-09-17 22:56:37 -0400249 rw.stop()
khenaidoocfee5f42018-07-19 22:47:38 -0400250
251 elapsed := time.Since(start)
khenaidoob9203542018-09-17 22:56:37 -0400252 log.Infow("rw-core-run-time", log.Fields{"core": rw.config.InstanceID, "time": elapsed / time.Second})
khenaidoocfee5f42018-07-19 22:47:38 -0400253}