/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package kvstore
17
18import (
19 "bytes"
20 "context"
21 "errors"
22 log "github.com/opencord/voltha-lib-go/v3/pkg/log"
23 "sync"
24 "time"
25 //log "ciena.com/coordinator/common"
26 consulapi "github.com/hashicorp/consul/api"
27)
28
29type channelContextMap struct {
30 ctx context.Context
31 channel chan *Event
32 cancel context.CancelFunc
33}
34
35// ConsulClient represents the consul KV store client
36type ConsulClient struct {
37 session *consulapi.Session
38 sessionID string
39 consul *consulapi.Client
40 doneCh *chan int
41 keyReservations map[string]interface{}
42 watchedChannelsContext map[string][]*channelContextMap
43 writeLock sync.Mutex
44}
45
46// NewConsulClient returns a new client for the Consul KV store
47func NewConsulClient(addr string, timeout int) (*ConsulClient, error) {
48
49 duration := GetDuration(timeout)
50
51 config := consulapi.DefaultConfig()
52 config.Address = addr
53 config.WaitTime = duration
54 consul, err := consulapi.NewClient(config)
55 if err != nil {
56 logger.Error(err)
57 return nil, err
58 }
59
60 doneCh := make(chan int, 1)
61 wChannelsContext := make(map[string][]*channelContextMap)
62 reservations := make(map[string]interface{})
63 return &ConsulClient{consul: consul, doneCh: &doneCh, watchedChannelsContext: wChannelsContext, keyReservations: reservations}, nil
64}
65
66// IsConnectionUp returns whether the connection to the Consul KV store is up
67func (c *ConsulClient) IsConnectionUp(ctx context.Context) bool {
68 logger.Error("Unimplemented function")
69 return false
70}
71
72// List returns an array of key-value pairs with key as a prefix. Timeout defines how long the function will
73// wait for a response
74func (c *ConsulClient) List(ctx context.Context, key string) (map[string]*KVPair, error) {
75
76 deadline, _ := ctx.Deadline()
77 kv := c.consul.KV()
78 var queryOptions consulapi.QueryOptions
79 queryOptions.WaitTime = GetDuration(deadline.Second())
80 // For now we ignore meta data
81 kvps, _, err := kv.List(key, &queryOptions)
82 if err != nil {
83 logger.Error(err)
84 return nil, err
85 }
86 m := make(map[string]*KVPair)
87 for _, kvp := range kvps {
88 m[string(kvp.Key)] = NewKVPair(string(kvp.Key), kvp.Value, string(kvp.Session), 0, -1)
89 }
90 return m, nil
91}
92
93// Get returns a key-value pair for a given key. Timeout defines how long the function will
94// wait for a response
95func (c *ConsulClient) Get(ctx context.Context, key string) (*KVPair, error) {
96
97 deadline, _ := ctx.Deadline()
98 kv := c.consul.KV()
99 var queryOptions consulapi.QueryOptions
100 queryOptions.WaitTime = GetDuration(deadline.Second())
101 // For now we ignore meta data
102 kvp, _, err := kv.Get(key, &queryOptions)
103 if err != nil {
104 logger.Error(err)
105 return nil, err
106 }
107 if kvp != nil {
108 return NewKVPair(string(kvp.Key), kvp.Value, string(kvp.Session), 0, -1), nil
109 }
110
111 return nil, nil
112}
113
114// Put writes a key-value pair to the KV store. Value can only be a string or []byte since the consul API
115// accepts only a []byte as a value for a put operation. Timeout defines how long the function will
116// wait for a response
117func (c *ConsulClient) Put(ctx context.Context, key string, value interface{}) error {
118
119 // Validate that we can create a byte array from the value as consul API expects a byte array
120 var val []byte
121 var er error
122 if val, er = ToByte(value); er != nil {
123 logger.Error(er)
124 return er
125 }
126
127 // Create a key value pair
128 kvp := consulapi.KVPair{Key: key, Value: val}
129 kv := c.consul.KV()
130 var writeOptions consulapi.WriteOptions
131 c.writeLock.Lock()
132 defer c.writeLock.Unlock()
133 _, err := kv.Put(&kvp, &writeOptions)
134 if err != nil {
135 logger.Error(err)
136 return err
137 }
138 return nil
139}
140
141// Delete removes a key from the KV store. Timeout defines how long the function will
142// wait for a response
143func (c *ConsulClient) Delete(ctx context.Context, key string) error {
144 kv := c.consul.KV()
145 var writeOptions consulapi.WriteOptions
146 c.writeLock.Lock()
147 defer c.writeLock.Unlock()
148 _, err := kv.Delete(key, &writeOptions)
149 if err != nil {
150 logger.Error(err)
151 return err
152 }
153 return nil
154}
155
156func (c *ConsulClient) deleteSession() {
157 if c.sessionID != "" {
158 logger.Debug("cleaning-up-session")
159 session := c.consul.Session()
160 _, err := session.Destroy(c.sessionID, nil)
161 if err != nil {
162 logger.Errorw("error-cleaning-session", log.Fields{"session": c.sessionID, "error": err})
163 }
164 }
165 c.sessionID = ""
166 c.session = nil
167}
168
169func (c *ConsulClient) createSession(ttl int64, retries int) (*consulapi.Session, string, error) {
170 session := c.consul.Session()
171 entry := &consulapi.SessionEntry{
172 Behavior: consulapi.SessionBehaviorDelete,
173 TTL: "10s", // strconv.FormatInt(ttl, 10) + "s", // disable ttl
174 }
175
176 for {
177 id, meta, err := session.Create(entry, nil)
178 if err != nil {
179 logger.Errorw("create-session-error", log.Fields{"error": err})
180 if retries == 0 {
181 return nil, "", err
182 }
183 } else if meta.RequestTime == 0 {
184 logger.Errorw("create-session-bad-meta-data", log.Fields{"meta-data": meta})
185 if retries == 0 {
186 return nil, "", errors.New("bad-meta-data")
187 }
188 } else if id == "" {
189 logger.Error("create-session-nil-id")
190 if retries == 0 {
191 return nil, "", errors.New("ID-nil")
192 }
193 } else {
194 return session, id, nil
195 }
196 // If retry param is -1 we will retry indefinitely
197 if retries > 0 {
198 retries--
199 }
200 logger.Debug("retrying-session-create-after-a-second-delay")
201 time.Sleep(time.Duration(1) * time.Second)
202 }
203}
204
205// Helper function to verify mostly whether the content of two interface types are the same. Focus is []byte and
206// string types
207func isEqual(val1 interface{}, val2 interface{}) bool {
208 b1, err := ToByte(val1)
209 b2, er := ToByte(val2)
210 if err == nil && er == nil {
211 return bytes.Equal(b1, b2)
212 }
213 return val1 == val2
214}
215
216// Reserve is invoked to acquire a key and set it to a given value. Value can only be a string or []byte since
217// the consul API accepts only a []byte. Timeout defines how long the function will wait for a response. TTL
218// defines how long that reservation is valid. When TTL expires the key is unreserved by the KV store itself.
219// If the key is acquired then the value returned will be the value passed in. If the key is already acquired
220// then the value assigned to that key will be returned.
221func (c *ConsulClient) Reserve(ctx context.Context, key string, value interface{}, ttl int64) (interface{}, error) {
222
223 // Validate that we can create a byte array from the value as consul API expects a byte array
224 var val []byte
225 var er error
226 if val, er = ToByte(value); er != nil {
227 logger.Error(er)
228 return nil, er
229 }
230
231 // Cleanup any existing session and recreate new ones. A key is reserved against a session
232 if c.sessionID != "" {
233 c.deleteSession()
234 }
235
236 // Clear session if reservation is not successful
237 reservationSuccessful := false
238 defer func() {
239 if !reservationSuccessful {
240 logger.Debug("deleting-session")
241 c.deleteSession()
242 }
243 }()
244
245 session, sessionID, err := c.createSession(ttl, -1)
246 if err != nil {
247 logger.Errorw("no-session-created", log.Fields{"error": err})
248 return "", errors.New("no-session-created")
249 }
250 logger.Debugw("session-created", log.Fields{"session-id": sessionID})
251 c.sessionID = sessionID
252 c.session = session
253
254 // Try to grap the Key using the session
255 kv := c.consul.KV()
256 kvp := consulapi.KVPair{Key: key, Value: val, Session: c.sessionID}
257 result, _, err := kv.Acquire(&kvp, nil)
258 if err != nil {
259 logger.Errorw("error-acquiring-keys", log.Fields{"error": err})
260 return nil, err
261 }
262
263 logger.Debugw("key-acquired", log.Fields{"key": key, "status": result})
264
265 // Irrespective whether we were successful in acquiring the key, let's read it back and see if it's us.
266 m, err := c.Get(ctx, key)
267 if err != nil {
268 return nil, err
269 }
270 if m != nil {
271 logger.Debugw("response-received", log.Fields{"key": m.Key, "m.value": string(m.Value.([]byte)), "value": value})
272 if m.Key == key && isEqual(m.Value, value) {
273 // My reservation is successful - register it. For now, support is only for 1 reservation per key
274 // per session.
275 reservationSuccessful = true
276 c.writeLock.Lock()
277 c.keyReservations[key] = m.Value
278 c.writeLock.Unlock()
279 return m.Value, nil
280 }
281 // My reservation has failed. Return the owner of that key
282 return m.Value, nil
283 }
284 return nil, nil
285}
286
287// ReleaseAllReservations releases all key reservations previously made (using Reserve API)
288func (c *ConsulClient) ReleaseAllReservations(ctx context.Context) error {
289 kv := c.consul.KV()
290 var kvp consulapi.KVPair
291 var result bool
292 var err error
293
294 c.writeLock.Lock()
295 defer c.writeLock.Unlock()
296
297 for key, value := range c.keyReservations {
298 kvp = consulapi.KVPair{Key: key, Value: value.([]byte), Session: c.sessionID}
299 result, _, err = kv.Release(&kvp, nil)
300 if err != nil {
301 logger.Errorw("cannot-release-reservation", log.Fields{"key": key, "error": err})
302 return err
303 }
304 if !result {
305 logger.Errorw("cannot-release-reservation", log.Fields{"key": key})
306 }
307 delete(c.keyReservations, key)
308 }
309 return nil
310}
311
312// ReleaseReservation releases reservation for a specific key.
313func (c *ConsulClient) ReleaseReservation(ctx context.Context, key string) error {
314 var ok bool
315 var reservedValue interface{}
316 c.writeLock.Lock()
317 defer c.writeLock.Unlock()
318 if reservedValue, ok = c.keyReservations[key]; !ok {
319 return errors.New("key-not-reserved:" + key)
320 }
321 // Release the reservation
322 kv := c.consul.KV()
323 kvp := consulapi.KVPair{Key: key, Value: reservedValue.([]byte), Session: c.sessionID}
324
325 result, _, er := kv.Release(&kvp, nil)
326 if er != nil {
327 return er
328 }
329 // Remove that key entry on success
330 if result {
331 delete(c.keyReservations, key)
332 return nil
333 }
334 return errors.New("key-cannot-be-unreserved")
335}
336
337// RenewReservation renews a reservation. A reservation will go stale after the specified TTL (Time To Live)
338// period specified when reserving the key
339func (c *ConsulClient) RenewReservation(ctx context.Context, key string) error {
340 // In the case of Consul, renew reservation of a reserve key only require renewing the client session.
341
342 c.writeLock.Lock()
343 defer c.writeLock.Unlock()
344
345 // Verify the key was reserved
346 if _, ok := c.keyReservations[key]; !ok {
347 return errors.New("key-not-reserved")
348 }
349
350 if c.session == nil {
351 return errors.New("no-session-exist")
352 }
353
354 var writeOptions consulapi.WriteOptions
355 if _, _, err := c.session.Renew(c.sessionID, &writeOptions); err != nil {
356 return err
357 }
358 return nil
359}
360
361// Watch provides the watch capability on a given key. It returns a channel onto which the callee needs to
362// listen to receive Events.
363func (c *ConsulClient) Watch(ctx context.Context, key string, withPrefix bool) chan *Event {
364
365 // Create a new channel
366 ch := make(chan *Event, maxClientChannelBufferSize)
367
368 // Create a context to track this request
369 watchContext, cFunc := context.WithCancel(context.Background())
370
371 // Save the channel and context reference for later
372 c.writeLock.Lock()
373 defer c.writeLock.Unlock()
374 ccm := channelContextMap{channel: ch, ctx: watchContext, cancel: cFunc}
375 c.watchedChannelsContext[key] = append(c.watchedChannelsContext[key], &ccm)
376
377 // Launch a go routine to listen for updates
378 go c.listenForKeyChange(watchContext, key, ch)
379
380 return ch
381}
382
383// CloseWatch closes a specific watch. Both the key and the channel are required when closing a watch as there
384// may be multiple listeners on the same key. The previously created channel serves as a key
385func (c *ConsulClient) CloseWatch(key string, ch chan *Event) {
386 // First close the context
387 var ok bool
388 var watchedChannelsContexts []*channelContextMap
389 c.writeLock.Lock()
390 defer c.writeLock.Unlock()
391 if watchedChannelsContexts, ok = c.watchedChannelsContext[key]; !ok {
392 logger.Errorw("key-has-no-watched-context-or-channel", log.Fields{"key": key})
393 return
394 }
395 // Look for the channels
396 var pos = -1
397 for i, chCtxMap := range watchedChannelsContexts {
398 if chCtxMap.channel == ch {
399 logger.Debug("channel-found")
400 chCtxMap.cancel()
401 //close the channel
402 close(ch)
403 pos = i
404 break
405 }
406 }
407 // Remove that entry if present
408 if pos >= 0 {
409 c.watchedChannelsContext[key] = append(c.watchedChannelsContext[key][:pos], c.watchedChannelsContext[key][pos+1:]...)
410 }
411 logger.Debugw("watched-channel-exiting", log.Fields{"key": key, "channel": c.watchedChannelsContext[key]})
412}
413
414func (c *ConsulClient) isKVEqual(kv1 *consulapi.KVPair, kv2 *consulapi.KVPair) bool {
415 if (kv1 == nil) && (kv2 == nil) {
416 return true
417 } else if (kv1 == nil) || (kv2 == nil) {
418 return false
419 }
420 // Both the KV should be non-null here
421 if kv1.Key != kv2.Key ||
422 !bytes.Equal(kv1.Value, kv2.Value) ||
423 kv1.Session != kv2.Session ||
424 kv1.LockIndex != kv2.LockIndex ||
425 kv1.ModifyIndex != kv2.ModifyIndex {
426 return false
427 }
428 return true
429}
430
431func (c *ConsulClient) listenForKeyChange(watchContext context.Context, key string, ch chan *Event) {
432 logger.Debugw("start-watching-channel", log.Fields{"key": key, "channel": ch})
433
434 defer c.CloseWatch(key, ch)
435 duration := GetDuration(defaultKVGetTimeout)
436 kv := c.consul.KV()
437 var queryOptions consulapi.QueryOptions
438 queryOptions.WaitTime = duration
439
440 // Get the existing value, if any
441 previousKVPair, meta, err := kv.Get(key, &queryOptions)
442 if err != nil {
443 logger.Debug(err)
444 }
445 lastIndex := meta.LastIndex
446
447 // Wait for change. Push any change onto the channel and keep waiting for new update
448 //var waitOptions consulapi.QueryOptions
449 var pair *consulapi.KVPair
450 //watchContext, _ := context.WithCancel(context.Background())
451 waitOptions := queryOptions.WithContext(watchContext)
452 for {
453 //waitOptions = consulapi.QueryOptions{WaitIndex: lastIndex}
454 waitOptions.WaitIndex = lastIndex
455 pair, meta, err = kv.Get(key, waitOptions)
456 select {
457 case <-watchContext.Done():
458 logger.Debug("done-event-received-exiting")
459 return
460 default:
461 if err != nil {
462 logger.Warnw("error-from-watch", log.Fields{"error": err})
463 ch <- NewEvent(CONNECTIONDOWN, key, []byte(""), -1)
464 } else {
465 logger.Debugw("index-state", log.Fields{"lastindex": lastIndex, "newindex": meta.LastIndex, "key": key})
466 }
467 }
468 if err != nil {
469 logger.Debug(err)
470 // On error, block for 10 milliseconds to prevent endless loop
471 time.Sleep(10 * time.Millisecond)
472 } else if meta.LastIndex <= lastIndex {
473 logger.Info("no-index-change-or-negative")
474 } else {
475 logger.Debugw("update-received", log.Fields{"pair": pair})
476 if pair == nil {
477 ch <- NewEvent(DELETE, key, []byte(""), -1)
478 } else if !c.isKVEqual(pair, previousKVPair) {
479 // Push the change onto the channel if the data has changed
480 // For now just assume it's a PUT change
481 logger.Debugw("pair-details", log.Fields{"session": pair.Session, "key": pair.Key, "value": pair.Value})
482 ch <- NewEvent(PUT, pair.Key, pair.Value, -1)
483 }
484 previousKVPair = pair
485 lastIndex = meta.LastIndex
486 }
487 }
488}
489
490// Close closes the KV store client
491func (c *ConsulClient) Close() {
492 var writeOptions consulapi.WriteOptions
493 // Inform any goroutine it's time to say goodbye.
494 c.writeLock.Lock()
495 defer c.writeLock.Unlock()
496 if c.doneCh != nil {
497 close(*c.doneCh)
498 }
499
500 // Clear the sessionID
501 if _, err := c.consul.Session().Destroy(c.sessionID, &writeOptions); err != nil {
502 logger.Errorw("error-closing-client", log.Fields{"error": err})
503 }
504}
505
506func (c *ConsulClient) AcquireLock(ctx context.Context, lockName string, timeout int) error {
507 return nil
508}
509
510func (c *ConsulClient) ReleaseLock(lockName string) error {
511 return nil
512}