blob: b2547c20bd5dafa0b94d188e0f105e2264328b3e [file] [log] [blame]
khenaidoobf6e7bb2018-08-14 22:27:29 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Stephane Barbariedc5022d2018-11-19 15:21:44 -050016
sbarbari17d7e222019-11-05 10:02:29 -050017package db
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040018
19import (
Scott Baker973f8ba2019-11-19 15:00:22 -080020 "context"
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040021 "errors"
22 "fmt"
Scott Baker807addd2019-10-24 15:16:21 -070023 "github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore"
24 "github.com/opencord/voltha-lib-go/v2/pkg/log"
Scott Baker973f8ba2019-11-19 15:00:22 -080025 "google.golang.org/grpc/codes"
26 "google.golang.org/grpc/status"
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040027 "strconv"
Stephane Barbariedc5022d2018-11-19 15:21:44 -050028 "sync"
Scott Baker973f8ba2019-11-19 15:00:22 -080029 "time"
30)
31
32const (
33 // Default Minimal Interval for posting alive state of backend kvstore on Liveness Channel
34 DefaultLivenessChannelInterval = time.Second * 30
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040035)
36
Stephane Barbariedc5022d2018-11-19 15:21:44 -050037// Backend structure holds details for accessing the kv store
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040038type Backend struct {
Stephane Barbariedc5022d2018-11-19 15:21:44 -050039 sync.RWMutex
Scott Baker973f8ba2019-11-19 15:00:22 -080040 Client kvstore.Client
41 StoreType string
42 Host string
43 Port int
44 Timeout int
45 PathPrefix string
46 alive bool // Is this backend connection alive?
47 liveness chan bool // channel to post alive state
48 LivenessChannelInterval time.Duration // regularly push alive state beyond this interval
49 lastLivenessTime time.Time // Instant of last alive state push
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040050}
51
Stephane Barbariedc5022d2018-11-19 15:21:44 -050052// NewBackend creates a new instance of a Backend structure
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040053func NewBackend(storeType string, host string, port int, timeout int, pathPrefix string) *Backend {
54 var err error
55
56 b := &Backend{
Scott Baker973f8ba2019-11-19 15:00:22 -080057 StoreType: storeType,
58 Host: host,
59 Port: port,
60 Timeout: timeout,
61 LivenessChannelInterval: DefaultLivenessChannelInterval,
62 PathPrefix: pathPrefix,
63 alive: false, // connection considered down at start
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040064 }
65
66 address := host + ":" + strconv.Itoa(port)
67 if b.Client, err = b.newClient(address, timeout); err != nil {
Stephane Barbariee0a4c792019-01-16 11:26:29 -050068 log.Errorw("failed-to-create-kv-client",
69 log.Fields{
70 "type": storeType, "host": host, "port": port,
71 "timeout": timeout, "prefix": pathPrefix,
72 "error": err.Error(),
73 })
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040074 }
75
76 return b
77}
78
79func (b *Backend) newClient(address string, timeout int) (kvstore.Client, error) {
80 switch b.StoreType {
81 case "consul":
82 return kvstore.NewConsulClient(address, timeout)
83 case "etcd":
84 return kvstore.NewEtcdClient(address, timeout)
85 }
Stephane Barbariee0a4c792019-01-16 11:26:29 -050086 return nil, errors.New("unsupported-kv-store")
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040087}
88
89func (b *Backend) makePath(key string) string {
Stephane Barbarieec0919b2018-09-05 14:14:29 -040090 path := fmt.Sprintf("%s/%s", b.PathPrefix, key)
Stephane Barbarieec0919b2018-09-05 14:14:29 -040091 return path
92}
Stephane Barbariedc5022d2018-11-19 15:21:44 -050093
Scott Baker973f8ba2019-11-19 15:00:22 -080094func (b *Backend) updateLiveness(alive bool) {
95 // Periodically push stream of liveness data to the channel,
96 // so that in a live state, the core does not timeout and
97 // send a forced liveness message. Push alive state if the
98 // last push to channel was beyond livenessChannelInterval
99 if b.liveness != nil {
100
101 if b.alive != alive {
102 log.Debug("update-liveness-channel-reason-change")
103 b.liveness <- alive
104 b.lastLivenessTime = time.Now()
105 } else if time.Now().Sub(b.lastLivenessTime) > b.LivenessChannelInterval {
106 log.Debug("update-liveness-channel-reason-interval")
107 b.liveness <- alive
108 b.lastLivenessTime = time.Now()
109 }
110 }
111
112 // Emit log message only for alive state change
113 if b.alive != alive {
114 log.Debugw("change-kvstore-alive-status", log.Fields{"alive": alive})
115 b.alive = alive
116 }
117}
118
119// Perform a dummy Key Lookup on kvstore to test Connection Liveness and
120// post on Liveness channel
121func (b *Backend) PerformLivenessCheck(timeout int) bool {
122 alive := b.Client.IsConnectionUp(timeout)
123 log.Debugw("kvstore-liveness-check-result", log.Fields{"alive": alive})
124
125 b.updateLiveness(alive)
126 return alive
127}
128
129// Enable the liveness monitor channel. This channel will report
130// a "true" or "false" on every kvstore operation which indicates whether
131// or not the connection is still Live. This channel is then picked up
132// by the service (i.e. rw_core / ro_core) to update readiness status
133// and/or take other actions.
134func (b *Backend) EnableLivenessChannel() chan bool {
135 log.Debug("enable-kvstore-liveness-channel")
136
137 if b.liveness == nil {
138 log.Debug("create-kvstore-liveness-channel")
139
140 // Channel size of 10 to avoid any possibility of blocking in Load conditions
141 b.liveness = make(chan bool, 10)
142
143 // Post initial alive state
144 b.liveness <- b.alive
145 b.lastLivenessTime = time.Now()
146 }
147
148 return b.liveness
149}
150
151// Extract Alive status of Kvstore based on type of error
152func (b *Backend) isErrorIndicatingAliveKvstore(err error) bool {
153 // Alive unless observed an error indicating so
154 alive := true
155
156 if err != nil {
157
158 // timeout indicates kvstore not reachable/alive
159 if err == context.DeadlineExceeded {
160 alive = false
161 }
162
163 // Need to analyze client-specific errors based on backend type
164 if b.StoreType == "etcd" {
165
166 // For etcd backend, consider not-alive only for errors indicating
167 // timedout request or unavailable/corrupted cluster. For all remaining
168 // error codes listed in https://godoc.org/google.golang.org/grpc/codes#Code,
169 // we would not infer a not-alive backend because such a error may also
170 // occur due to bad client requests or sequence of operations
171 switch status.Code(err) {
172 case codes.DeadlineExceeded:
173 fallthrough
174 case codes.Unavailable:
175 fallthrough
176 case codes.DataLoss:
177 alive = false
178 }
179
180 //} else {
181 // TODO: Implement for consul backend; would it be needed ever?
182 }
183 }
184
185 return alive
186}
187
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500188// List retrieves one or more items that match the specified key
sbarbari17d7e222019-11-05 10:02:29 -0500189func (b *Backend) List(key string) (map[string]*kvstore.KVPair, error) {
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500190 b.Lock()
191 defer b.Unlock()
192
193 formattedPath := b.makePath(key)
sbarbari17d7e222019-11-05 10:02:29 -0500194 log.Debugw("listing-key", log.Fields{"key": key, "path": formattedPath})
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500195
Scott Baker973f8ba2019-11-19 15:00:22 -0800196 pair, err := b.Client.List(formattedPath, b.Timeout)
197
198 b.updateLiveness(b.isErrorIndicatingAliveKvstore(err))
199
200 return pair, err
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400201}
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500202
203// Get retrieves an item that matches the specified key
sbarbari17d7e222019-11-05 10:02:29 -0500204func (b *Backend) Get(key string) (*kvstore.KVPair, error) {
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500205 b.Lock()
206 defer b.Unlock()
207
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400208 formattedPath := b.makePath(key)
sbarbari17d7e222019-11-05 10:02:29 -0500209 log.Debugw("getting-key", log.Fields{"key": key, "path": formattedPath})
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500210
Scott Baker973f8ba2019-11-19 15:00:22 -0800211 pair, err := b.Client.Get(formattedPath, b.Timeout)
212
213 b.updateLiveness(b.isErrorIndicatingAliveKvstore(err))
214
215 return pair, err
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400216}
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500217
218// Put stores an item value under the specifed key
sbarbari17d7e222019-11-05 10:02:29 -0500219func (b *Backend) Put(key string, value interface{}) error {
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500220 b.Lock()
221 defer b.Unlock()
222
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400223 formattedPath := b.makePath(key)
sbarbari17d7e222019-11-05 10:02:29 -0500224 log.Debugw("putting-key", log.Fields{"key": key, "value": string(value.([]byte)), "path": formattedPath})
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500225
Scott Baker973f8ba2019-11-19 15:00:22 -0800226 err := b.Client.Put(formattedPath, value, b.Timeout)
227
228 b.updateLiveness(b.isErrorIndicatingAliveKvstore(err))
229
230 return err
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400231}
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500232
233// Delete removes an item under the specified key
sbarbari17d7e222019-11-05 10:02:29 -0500234func (b *Backend) Delete(key string) error {
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500235 b.Lock()
236 defer b.Unlock()
237
238 formattedPath := b.makePath(key)
sbarbari17d7e222019-11-05 10:02:29 -0500239 log.Debugw("deleting-key", log.Fields{"key": key, "path": formattedPath})
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500240
Scott Baker973f8ba2019-11-19 15:00:22 -0800241 err := b.Client.Delete(formattedPath, b.Timeout)
242
243 b.updateLiveness(b.isErrorIndicatingAliveKvstore(err))
244
245 return err
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400246}
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500247
248// CreateWatch starts watching events for the specified key
249func (b *Backend) CreateWatch(key string) chan *kvstore.Event {
250 b.Lock()
251 defer b.Unlock()
252
253 formattedPath := b.makePath(key)
254 log.Debugw("creating-key-watch", log.Fields{"key": key, "path": formattedPath})
255
256 return b.Client.Watch(formattedPath)
257}
258
259// DeleteWatch stops watching events for the specified key
260func (b *Backend) DeleteWatch(key string, ch chan *kvstore.Event) {
261 b.Lock()
262 defer b.Unlock()
263
264 formattedPath := b.makePath(key)
265 log.Debugw("deleting-key-watch", log.Fields{"key": key, "path": formattedPath})
266
267 b.Client.CloseWatch(formattedPath, ch)
268}