blob: e7669041a0130e402efb1f8a67033467a7f9d6b3 [file] [log] [blame]
khenaidoocfee5f42018-07-19 22:47:38 -04001package main
2
3import (
4 "fmt"
5 "os"
6 "os/signal"
7 "time"
8 "errors"
9 "strconv"
10 "github.com/opencord/voltha-go/db/kvstore"
11 "github.com/opencord/voltha-go/common/log"
12 "github.com/opencord/voltha-go/rw_core/config"
13 "syscall"
14)
15
16type rwCore struct {
17 kvClient kvstore.Client
18 config *config.RWCoreFlags
19 halted bool
20 exitChannel chan int
21}
22
23func newKVClient(storeType string, address string, timeout int) (kvstore.Client, error) {
24
25 log.Infow("kv-store-type", log.Fields{"store":storeType})
26 switch storeType {
27 case "consul":
28 return kvstore.NewConsulClient(address, timeout)
29 case "etcd":
30 return kvstore.NewEtcdClient(address, timeout)
31 }
32 return nil, errors.New("unsupported-kv-store")
33}
34
35func newRWCore(cf *config.RWCoreFlags) *rwCore {
36 var rwCore rwCore
37 rwCore.config = cf
38 rwCore.halted = false
39 rwCore.exitChannel = make(chan int, 1)
40 return &rwCore
41}
42
43func (core *rwCore) setKVClient() error {
44 addr := core.config.KVStoreHost + ":" + strconv.Itoa(core.config.KVStorePort)
45 client, err := newKVClient(core.config.KVStoreType, addr, core.config.KVStoreTimeout)
46 if err != nil {
47 log.Error(err)
48 return err
49 }
50 core.kvClient = client
51 return nil
52}
53
54
// toString converts value to a string.
//
// A string is returned as is and a []byte is converted to a string. Any other
// dynamic type (including a nil interface) yields an empty string and an
// "unexpected-type-<type>" error.
func toString(value interface{}) (string, error) {
	switch v := value.(type) {
	case string:
		return v, nil
	case []byte:
		return string(v), nil
	}
	return "", fmt.Errorf("unexpected-type-%T", value)
}
65
66
67func (core *rwCore) start() {
68 log.Info("core-starting")
69
70 //First set the KV client. Some client immediately tries to connect to the KV store (etcd) while others does
71 // not create the connection until a request to the store is made (consul)
72 tick := time.Tick(kvstore.GetDuration(core.config.KVStoreTimeout))
73 connected := false
74KVStoreConnectLoop:
75 for {
76 if err := core.setKVClient(); err != nil {
77 log.Warn("cannot-create-kv-client-retrying")
78 select {
79 case <-tick:
80 log.Debug("kv-client-retry")
81 continue
82 case <-core.exitChannel:
83 log.Info("exit-request-received")
84 break KVStoreConnectLoop
85 }
86 } else {
87 log.Debug("got-kv-client.")
88 connected = true
89 break
90 }
91 }
92 // Connected is true only if there is a valid KV store connection and no exit request has been received
93 if connected {
94 log.Info("core-started")
95 } else {
96 log.Info("core-ended")
97 }
98}
99
100
101func (core *rwCore) stop() {
102 // Stop leadership tracking
103 core.halted = true
104
105 // send exit signal
106 core.exitChannel <- 0
107
108 // Cleanup - applies only if we had a kvClient
109 if core.kvClient != nil {
110 // Release all reservations
111 if err := core.kvClient.ReleaseAllReservations(); err != nil {
112 log.Infow("fail-to-release-all-reservations", log.Fields{"error":err})
113 }
114 // Close the DB connection
115 core.kvClient.Close()
116 }
117}
118
119func waitForExit() int {
120 signalChannel := make(chan os.Signal, 1)
121 signal.Notify(signalChannel,
122 syscall.SIGHUP,
123 syscall.SIGINT,
124 syscall.SIGTERM,
125 syscall.SIGQUIT)
126
127 exitChannel := make(chan int)
128
129 go func() {
130 s := <-signalChannel
131 switch s {
132 case syscall.SIGHUP,
133 syscall.SIGINT,
134 syscall.SIGTERM,
135 syscall.SIGQUIT:
136 log.Infow("closing-signal-received", log.Fields{"signal":s})
137 exitChannel <- 0
138 default:
139 log.Infow("unexpected-signal-received", log.Fields{"signal":s})
140 exitChannel <- 1
141 }
142 }()
143
144 code := <-exitChannel
145 return code
146}
147
148func main() {
149 start := time.Now()
150
151 cf := config.NewRWCoreFlags()
152 cf.ParseCommandArguments()
153
154 // Setup logging
155 if _, err := log.SetLogger(log.JSON, log.DebugLevel, log.Fields{"instanceId":cf.InstanceID}); err != nil {
156 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
157 }
158 defer log.CleanUp()
159
160 log.Infow("rw-core-config", log.Fields{"config":*cf})
161
162 core := newRWCore(cf)
163 go core.start()
164
165 code := waitForExit()
166 log.Infow("received-a-closing-signal", log.Fields{"code":code})
167
168 // Cleanup before leaving
169 core.stop()
170
171 elapsed := time.Since(start)
172 log.Infow("rw-core-run-time", log.Fields{"core":core.config.InstanceID, "time":elapsed/time.Second})
173}