blob: 742916c301fedccec9bf11ba5bfa7dc64ac70d78 [file] [log] [blame]
/*
* Copyright 2018-2024 Open Networking Foundation (ONF) and the ONF Contributors

* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at

* http://www.apache.org/licenses/LICENSE-2.0

* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
 */
16package kvstore
17
18import (
19 "context"
20 "errors"
21 "fmt"
22 "strings"
23 "sync"
24 "time"
25
26 "github.com/go-redis/redis/v8"
27 "github.com/opencord/voltha-lib-go/v7/pkg/log"
28)
29
30type RedisClient struct {
31 redisAPI *redis.Client
32 keyReservations map[string]time.Duration
33 watchedChannels sync.Map
34 writeLock sync.Mutex
35 keyReservationsLock sync.RWMutex
36}
37
38func NewRedisClient(addr string, timeout time.Duration, useSentinel bool) (*RedisClient, error) {
39 var r *redis.Client
40 if !useSentinel {
41 r = redis.NewClient(&redis.Options{Addr: addr})
42 } else {
43 // Redis Master-Replicas with Sentinel, sentinel masterSet config
44 // should be set to sebaRedis
45 r = redis.NewFailoverClient(&redis.FailoverOptions{
46 MasterName: "sebaRedis",
47 SentinelAddrs: []string{addr},
48 })
49 }
50
51 reservations := make(map[string]time.Duration)
52 return &RedisClient{redisAPI: r, keyReservations: reservations}, nil
53}
54
55func (c *RedisClient) Get(ctx context.Context, key string) (*KVPair, error) {
56
57 val, err := c.redisAPI.Get(ctx, key).Result()
58 valBytes, _ := ToByte(val)
59 if err != nil {
60 return nil, nil
61 }
62 return NewKVPair(key, valBytes, "", 0, 0), nil
63}
64
65func (c *RedisClient) Put(ctx context.Context, key string, value interface{}) error {
66
67 // Validate that we can convert value to a string as etcd API expects a string
68 var val string
69 var er error
70 if val, er = ToString(value); er != nil {
71 return fmt.Errorf("unexpected-type-%T", value)
72 }
73
74 // Check if there is already a lease for this key - if there is then use it, otherwise a PUT will make
75 // that KV key permanent instead of automatically removing it after a lease expiration
76 setErr := c.redisAPI.Set(ctx, key, val, 0)
77 err := setErr.Err()
78
79 if err != nil {
80 switch setErr.Err() {
81 case context.Canceled:
82 logger.Warnw(ctx, "context-cancelled", log.Fields{"error": err})
83 case context.DeadlineExceeded:
84 logger.Warnw(ctx, "context-deadline-exceeded", log.Fields{"error": err})
85 default:
86 logger.Warnw(ctx, "bad-endpoints", log.Fields{"error": err})
87 }
88 return err
89 }
90 return nil
91}
92
93func (c *RedisClient) scanAllKeysWithPrefix(ctx context.Context, key string) ([]string, error) {
94 var err error
95 allkeys := []string{}
96 cont := true
97 cursor := uint64(0)
98 matchPrefix := key + "*"
99
100 for cont {
101 // search in the first 10000 entries starting from the point indicated by the cursor
102 logger.Debugw(ctx, "redis-scan", log.Fields{"matchPrefix": matchPrefix, "cursor": cursor})
103 var keys []string
104 keys, cursor, err = c.redisAPI.Scan(context.Background(), cursor, matchPrefix, 10000).Result()
105 if err != nil {
106 return nil, err
107 }
108 if cursor == 0 {
109 // all data searched. break the loop
110 logger.Debugw(ctx, "redis-scan-ended", log.Fields{"matchPrefix": matchPrefix, "cursor": cursor})
111 cont = false
112 }
113 if len(keys) == 0 {
114 // no matched data found in this cycle. Continue to search
115 logger.Debugw(ctx, "redis-scan-no-data-found-continue", log.Fields{"matchPrefix": matchPrefix, "cursor": cursor})
116 continue
117 }
118 allkeys = append(allkeys, keys...)
119 }
120 return allkeys, nil
121}
122
123func (c *RedisClient) List(ctx context.Context, key string) (map[string]*KVPair, error) {
124 var err error
125 var keys []string
126 m := make(map[string]*KVPair)
127 var values []interface{}
128
129 if keys, err = c.scanAllKeysWithPrefix(ctx, key); err != nil {
130 return nil, err
131 }
132
133 if len(keys) != 0 {
134 values, err = c.redisAPI.MGet(ctx, keys...).Result()
135 if err != nil {
136 return nil, err
137 }
138 }
139 for i, key := range keys {
140 if valBytes, err := ToByte(values[i]); err == nil {
141 m[key] = NewKVPair(key, interface{}(valBytes), "", 0, 0)
142 }
143 }
144 return m, nil
145}
146
147func (c *RedisClient) Delete(ctx context.Context, key string) error {
148 // delete the key
149 if _, err := c.redisAPI.Del(ctx, key).Result(); err != nil {
150 logger.Errorw(ctx, "failed-to-delete-key", log.Fields{"key": key, "error": err})
151 return err
152 }
153 logger.Debugw(ctx, "key(s)-deleted", log.Fields{"key": key})
154 return nil
155}
156
157func (c *RedisClient) DeleteWithPrefix(ctx context.Context, prefixKey string) error {
158 var keys []string
159 var err error
160 if keys, err = c.scanAllKeysWithPrefix(ctx, prefixKey); err != nil {
161 return err
162 }
163 if len(keys) == 0 {
164 logger.Warn(ctx, "nothing-to-delete-from-kv", log.Fields{"key": prefixKey})
165 return nil
166 }
167 //call delete for keys
168 entryCount := int64(0)
169 start := 0
170 pageSize := 5000
171 length := len(keys)
172 for start < length {
173 end := start + pageSize
174 if end >= length {
175 end = length
176 }
177 keysToDelete := keys[start:end]
178 count := int64(0)
179 if count, err = c.redisAPI.Del(ctx, keysToDelete...).Result(); err != nil {
180 logger.Errorw(ctx, "DeleteWithPrefix method failed", log.Fields{"prefixKey": prefixKey, "numOfMatchedKeys": len(keysToDelete), "err": err})
181 return err
182 }
183 entryCount += count
184 start = end
185 }
186 logger.Debugf(ctx, "%d entries matching with the key prefix %s have been deleted successfully", entryCount, prefixKey)
187 return nil
188}
189
190func (c *RedisClient) Reserve(ctx context.Context, key string, value interface{}, ttl time.Duration) (interface{}, error) {
191 var val string
192 var er error
193 if val, er = ToString(value); er != nil {
194 return nil, fmt.Errorf("unexpected-type%T", value)
195 }
196
197 // SetNX -- Only set the key if it does not already exist.
198 c.redisAPI.SetNX(ctx, key, value, ttl)
199
200 // Check if set is successful
201 redisVal := c.redisAPI.Get(ctx, key).Val()
202 if redisVal == "" {
203 println("NULL")
204 return nil, nil
205 }
206
207 if val == redisVal {
208 // set is successful, return new reservation value
209 c.keyReservationsLock.Lock()
210 c.keyReservations[key] = ttl
211 c.keyReservationsLock.Unlock()
212 bytes, _ := ToByte(val)
213 return bytes, nil
214 } else {
215 // set is not successful, return existing reservation value
216 bytes, _ := ToByte(redisVal)
217 return bytes, nil
218 }
219
220}
221
222func (c *RedisClient) ReleaseReservation(ctx context.Context, key string) error {
223
224 redisVal := c.redisAPI.Get(ctx, key).Val()
225 if redisVal == "" {
226 return nil
227 }
228
229 // Override SetNX value with no TTL
230 _, err := c.redisAPI.Set(ctx, key, redisVal, 0).Result()
231 if err != nil {
232 delete(c.keyReservations, key)
233 } else {
234 return err
235 }
236 return nil
237
238}
239
240func (c *RedisClient) ReleaseAllReservations(ctx context.Context) error {
241 c.writeLock.Lock()
242 defer c.writeLock.Unlock()
243 for key := range c.keyReservations {
244 err := c.ReleaseReservation(ctx, key)
245 if err != nil {
246 logger.Errorw(ctx, "cannot-release-reservation", log.Fields{"key": key, "error": err})
247 return err
248 }
249 }
250 return nil
251}
252
253func (c *RedisClient) RenewReservation(ctx context.Context, key string) error {
254 c.writeLock.Lock()
255 defer c.writeLock.Unlock()
256
257 // Verify the key was reserved
258 ttl, ok := c.keyReservations[key]
259 if !ok {
260 return errors.New("key-not-reserved. Key not found")
261 }
262
263 redisVal := c.redisAPI.Get(ctx, key).Val()
264 if redisVal != "" {
265 c.redisAPI.Set(ctx, key, redisVal, ttl)
266 }
267 return nil
268}
269
270func (c *RedisClient) listenForKeyChange(ctx context.Context, redisCh <-chan *redis.Message, ch chan<- *Event, cancel context.CancelFunc) {
271 logger.Debug(ctx, "start-listening-on-channel ...")
272 defer cancel()
273 defer close(ch)
274 for msg := range redisCh {
275 words := strings.Split(msg.Channel, ":")
276 key := words[1]
277 msgType := getMessageType(msg.Payload)
278 var valBytes []byte
279 if msgType == PUT {
280 ev, _ := c.Get(ctx, key)
281 valBytes, _ = ToByte(ev.Value)
282 }
283 ch <- NewEvent(getMessageType(msg.Payload), []byte(key), valBytes, 0)
284 }
285 logger.Debug(ctx, "stop-listening-on-channel ...")
286}
287
288func getMessageType(msg string) int {
289 isPut := strings.HasSuffix(msg, "set")
290 isDel := strings.HasSuffix(msg, "del")
291 if isPut {
292 return PUT
293 } else if isDel {
294 return DELETE
295 } else {
296 return UNKNOWN
297 }
298}
299
300func (c *RedisClient) addChannelMap(key string, channelMap map[chan *Event]*redis.PubSub) []map[chan *Event]*redis.PubSub {
301
302 var channels interface{}
303 var exists bool
304
305 if channels, exists = c.watchedChannels.Load(key); exists {
306 channels = append(channels.([]map[chan *Event]*redis.PubSub), channelMap)
307 } else {
308 channels = []map[chan *Event]*redis.PubSub{channelMap}
309 }
310 c.watchedChannels.Store(key, channels)
311
312 return channels.([]map[chan *Event]*redis.PubSub)
313}
314
315func (c *RedisClient) removeChannelMap(key string, pos int) []map[chan *Event]*redis.PubSub {
316 var channels interface{}
317 var exists bool
318
319 if channels, exists = c.watchedChannels.Load(key); exists {
320 channels = append(channels.([]map[chan *Event]*redis.PubSub)[:pos], channels.([]map[chan *Event]*redis.PubSub)[pos+1:]...)
321 c.watchedChannels.Store(key, channels)
322 }
323
324 return channels.([]map[chan *Event]*redis.PubSub)
325}
326
327func (c *RedisClient) getChannelMaps(key string) ([]map[chan *Event]*redis.PubSub, bool) {
328 var channels interface{}
329 var exists bool
330
331 channels, exists = c.watchedChannels.Load(key)
332
333 if channels == nil {
334 return nil, exists
335 }
336
337 return channels.([]map[chan *Event]*redis.PubSub), exists
338}
339
340func (c *RedisClient) Watch(ctx context.Context, key string, withPrefix bool) chan *Event {
341
342 ctx, cancel := context.WithCancel(ctx)
343
344 var subscribePath string
345 subscribePath = "__key*__:" + key
346 if withPrefix {
347 subscribePath += "*"
348 }
349 pubsub := c.redisAPI.PSubscribe(ctx, subscribePath)
350 redisCh := pubsub.Channel()
351
352 // Create new channel
353 ch := make(chan *Event, maxClientChannelBufferSize)
354
355 // Keep track of the created channels so they can be closed when required
356 channelMap := make(map[chan *Event]*redis.PubSub)
357 channelMap[ch] = pubsub
358
359 channelMaps := c.addChannelMap(key, channelMap)
360 logger.Debugw(ctx, "watched-channels", log.Fields{"len": len(channelMaps)})
361
362 // Launch a go routine to listen for updates
363 go c.listenForKeyChange(ctx, redisCh, ch, cancel)
364 return ch
365}
366
367func (c *RedisClient) CloseWatch(ctx context.Context, key string, ch chan *Event) {
368 // Get the array of channels mapping
369 var watchedChannels []map[chan *Event]*redis.PubSub
370 var ok bool
371
372 if watchedChannels, ok = c.getChannelMaps(key); !ok {
373 logger.Warnw(ctx, "key-has-no-watched-channels", log.Fields{"key": key})
374 return
375 }
376 // Look for the channels
377 var pos = -1
378 for i, chMap := range watchedChannels {
379 if t, ok := chMap[ch]; ok {
380 logger.Debug(ctx, "channel-found")
381 // Close the Redis watcher before the client channel. This should close the etcd channel as well
382 if err := t.Close(); err != nil {
383 logger.Errorw(ctx, "watcher-cannot-be-closed", log.Fields{"key": key, "error": err})
384 }
385 pos = i
386 break
387 }
388 }
389
390 channelMaps, _ := c.getChannelMaps(key)
391 // Remove that entry if present
392 if pos >= 0 {
393 channelMaps = c.removeChannelMap(key, pos)
394 }
395 logger.Infow(ctx, "watcher-channel-exiting", log.Fields{"key": key, "channel": channelMaps})
396}
397func (c *RedisClient) AcquireLock(ctx context.Context, lockName string, timeout time.Duration) error {
398 return nil
399}
400
401func (c *RedisClient) ReleaseLock(lockName string) error {
402 return nil
403}
404
405func (c *RedisClient) IsConnectionUp(ctx context.Context) bool {
406 if _, err := c.redisAPI.Set(ctx, "connection-check", "1", 0).Result(); err != nil {
407 return false
408 }
409 return true
410
411}
412
413func (c *RedisClient) Close(ctx context.Context) {
414 if err := c.redisAPI.Close(); err != nil {
415 logger.Errorw(ctx, "error-closing-client", log.Fields{"error": err})
416 }
417}