blob: a0928c5817525473998f6d8eb8ed3b351a32391f [file] [log] [blame]
khenaidoocfee5f42018-07-19 22:47:38 -04001package main
2
3import (
khenaidoo5c11af72018-07-20 17:21:05 -04004 "context"
5 "errors"
khenaidoocfee5f42018-07-19 22:47:38 -04006 "fmt"
khenaidoo5c11af72018-07-20 17:21:05 -04007 "github.com/opencord/voltha-go/common/log"
8 "github.com/opencord/voltha-go/db/kvstore"
khenaidooabad44c2018-08-03 16:58:35 -04009 "github.com/opencord/voltha-go/kafka"
10 ca "github.com/opencord/voltha-go/protos/core_adapter"
khenaidoo5c11af72018-07-20 17:21:05 -040011 "github.com/opencord/voltha-go/rw_core/config"
khenaidoocfee5f42018-07-19 22:47:38 -040012 "os"
13 "os/signal"
khenaidoocfee5f42018-07-19 22:47:38 -040014 "strconv"
khenaidoocfee5f42018-07-19 22:47:38 -040015 "syscall"
khenaidoo5c11af72018-07-20 17:21:05 -040016 "time"
khenaidoocfee5f42018-07-19 22:47:38 -040017)
18
19type rwCore struct {
khenaidoo5c11af72018-07-20 17:21:05 -040020 kvClient kvstore.Client
21 config *config.RWCoreFlags
22 halted bool
23 exitChannel chan int
khenaidooabad44c2018-08-03 16:58:35 -040024 kmp *kafka.KafkaMessagingProxy
25 //For test
26 receiverChannels []<-chan *ca.InterContainerMessage
khenaidoocfee5f42018-07-19 22:47:38 -040027}
28
29func newKVClient(storeType string, address string, timeout int) (kvstore.Client, error) {
30
khenaidoo5c11af72018-07-20 17:21:05 -040031 log.Infow("kv-store-type", log.Fields{"store": storeType})
khenaidoocfee5f42018-07-19 22:47:38 -040032 switch storeType {
33 case "consul":
34 return kvstore.NewConsulClient(address, timeout)
35 case "etcd":
36 return kvstore.NewEtcdClient(address, timeout)
37 }
38 return nil, errors.New("unsupported-kv-store")
39}
40
41func newRWCore(cf *config.RWCoreFlags) *rwCore {
42 var rwCore rwCore
43 rwCore.config = cf
44 rwCore.halted = false
45 rwCore.exitChannel = make(chan int, 1)
khenaidooabad44c2018-08-03 16:58:35 -040046 rwCore.receiverChannels = make([]<-chan *ca.InterContainerMessage, 0)
khenaidoocfee5f42018-07-19 22:47:38 -040047 return &rwCore
48}
49
50func (core *rwCore) setKVClient() error {
51 addr := core.config.KVStoreHost + ":" + strconv.Itoa(core.config.KVStorePort)
52 client, err := newKVClient(core.config.KVStoreType, addr, core.config.KVStoreTimeout)
53 if err != nil {
54 log.Error(err)
55 return err
56 }
57 core.kvClient = client
58 return nil
59}
60
// toString converts value to a string.
//
// A []byte is converted with string(...); a string is returned as is. Any
// other dynamic type (including a nil interface) yields an empty string and an
// error of the form "unexpected-type-<T>", where <T> is the value's type.
func toString(value interface{}) (string, error) {
	// Use the variable bound by the type switch instead of re-asserting the
	// type on value in every case.
	switch v := value.(type) {
	case []byte:
		return string(v), nil
	case string:
		return v, nil
	default:
		return "", fmt.Errorf("unexpected-type-%T", v)
	}
}
71
khenaidooabad44c2018-08-03 16:58:35 -040072
khenaidoo5c11af72018-07-20 17:21:05 -040073func (core *rwCore) start(ctx context.Context) {
74 log.Info("Starting RW Core components")
75 // Setup GRPC Server
khenaidoocfee5f42018-07-19 22:47:38 -040076
khenaidoo5c11af72018-07-20 17:21:05 -040077 // Setup KV Client
khenaidoocfee5f42018-07-19 22:47:38 -040078
khenaidoo5c11af72018-07-20 17:21:05 -040079 // Setup Kafka messaging services
khenaidooabad44c2018-08-03 16:58:35 -040080 var err error
81 if core.kmp, err = kafka.NewKafkaMessagingProxy(
82 kafka.KafkaHost("10.100.198.220"),
83 kafka.KafkaPort(9092),
84 kafka.DefaultTopic(&kafka.Topic{Name: "Adapter"})); err != nil {
85 log.Errorw("fail-to-create-kafka-proxy", log.Fields{"error": err})
86 return
87 }
88 // Start the kafka messaging service - synchronous call to ensure
89 if err = core.kmp.Start(); err != nil {
90 log.Fatalw("error-starting-messaging-proxy", log.Fields{"error": err})
91 }
khenaidoocfee5f42018-07-19 22:47:38 -040092}
93
khenaidoocfee5f42018-07-19 22:47:38 -040094func (core *rwCore) stop() {
95 // Stop leadership tracking
96 core.halted = true
97
khenaidooabad44c2018-08-03 16:58:35 -040098 // Stop the Kafka messaging service
99 if core.kmp != nil {
100 core.kmp.Stop()
101 }
102
khenaidoocfee5f42018-07-19 22:47:38 -0400103 // send exit signal
104 core.exitChannel <- 0
105
106 // Cleanup - applies only if we had a kvClient
107 if core.kvClient != nil {
108 // Release all reservations
109 if err := core.kvClient.ReleaseAllReservations(); err != nil {
khenaidoo5c11af72018-07-20 17:21:05 -0400110 log.Infow("fail-to-release-all-reservations", log.Fields{"error": err})
khenaidoocfee5f42018-07-19 22:47:38 -0400111 }
112 // Close the DB connection
113 core.kvClient.Close()
114 }
115}
116
117func waitForExit() int {
118 signalChannel := make(chan os.Signal, 1)
119 signal.Notify(signalChannel,
120 syscall.SIGHUP,
121 syscall.SIGINT,
122 syscall.SIGTERM,
123 syscall.SIGQUIT)
124
125 exitChannel := make(chan int)
126
127 go func() {
128 s := <-signalChannel
129 switch s {
130 case syscall.SIGHUP,
131 syscall.SIGINT,
132 syscall.SIGTERM,
133 syscall.SIGQUIT:
khenaidoo5c11af72018-07-20 17:21:05 -0400134 log.Infow("closing-signal-received", log.Fields{"signal": s})
khenaidoocfee5f42018-07-19 22:47:38 -0400135 exitChannel <- 0
136 default:
khenaidoo5c11af72018-07-20 17:21:05 -0400137 log.Infow("unexpected-signal-received", log.Fields{"signal": s})
khenaidoocfee5f42018-07-19 22:47:38 -0400138 exitChannel <- 1
139 }
140 }()
141
142 code := <-exitChannel
143 return code
144}
145
// printBanner writes the RW Core ASCII-art banner to standard output, one row
// per line.
//
// NOTE(review): verify that the spacing inside the rows renders the intended
// artwork.
func printBanner() {
	rows := []string{
		" ",
		" ______ ______ ",
		"| _ \\ \\ / / ___|___ _ __ ___ ",
		"| |_) \\ \\ /\\ / / | / _ \\| '__/ _ \\ ",
		"| _ < \\ V V /| |__| (_) | | | __/ ",
		"|_| \\_\\ \\_/\\_/ \\____\\___/|_| \\___| ",
		" ",
	}
	for _, row := range rows {
		fmt.Println(row)
	}
}
155
khenaidoocfee5f42018-07-19 22:47:38 -0400156func main() {
157 start := time.Now()
158
159 cf := config.NewRWCoreFlags()
160 cf.ParseCommandArguments()
161
162 // Setup logging
khenaidoo5c11af72018-07-20 17:21:05 -0400163 if _, err := log.SetLogger(log.JSON, cf.LogLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
khenaidoocfee5f42018-07-19 22:47:38 -0400164 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
165 }
166 defer log.CleanUp()
167
khenaidoo5c11af72018-07-20 17:21:05 -0400168 // Print banner if specified
169 if cf.Banner {
170 printBanner()
171 }
172
173 log.Infow("rw-core-config", log.Fields{"config": *cf})
174
175 ctx, cancel := context.WithCancel(context.Background())
176 defer cancel()
khenaidoocfee5f42018-07-19 22:47:38 -0400177
178 core := newRWCore(cf)
khenaidoo5c11af72018-07-20 17:21:05 -0400179 go core.start(ctx)
khenaidoocfee5f42018-07-19 22:47:38 -0400180
181 code := waitForExit()
khenaidoo5c11af72018-07-20 17:21:05 -0400182 log.Infow("received-a-closing-signal", log.Fields{"code": code})
khenaidoocfee5f42018-07-19 22:47:38 -0400183
184 // Cleanup before leaving
185 core.stop()
186
187 elapsed := time.Since(start)
khenaidoo5c11af72018-07-20 17:21:05 -0400188 log.Infow("rw-core-run-time", log.Fields{"core": core.config.InstanceID, "time": elapsed / time.Second})
khenaidoocfee5f42018-07-19 22:47:38 -0400189}