blob: d2544dd32ecfb117f9af267de885e098bf29cb48 [file] [log] [blame]
divyadesai81bb7ba2020-03-11 11:45:23 +00001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kvstore
17
18import (
19 "bytes"
20 "context"
21 "errors"
22 log "github.com/opencord/voltha-lib-go/v3/pkg/log"
23 "sync"
24 "time"
25 //log "ciena.com/coordinator/common"
26 consulapi "github.com/hashicorp/consul/api"
27)
28
29type channelContextMap struct {
30 ctx context.Context
31 channel chan *Event
32 cancel context.CancelFunc
33}
34
35// ConsulClient represents the consul KV store client
36type ConsulClient struct {
37 session *consulapi.Session
38 sessionID string
39 consul *consulapi.Client
40 doneCh *chan int
41 keyReservations map[string]interface{}
42 watchedChannelsContext map[string][]*channelContextMap
43 writeLock sync.Mutex
44}
45
46// NewConsulClient returns a new client for the Consul KV store
Neha Sharma87d43d72020-04-08 14:10:40 +000047func NewConsulClient(addr string, timeout time.Duration) (*ConsulClient, error) {
divyadesai81bb7ba2020-03-11 11:45:23 +000048
49 config := consulapi.DefaultConfig()
50 config.Address = addr
Neha Sharma87d43d72020-04-08 14:10:40 +000051 config.WaitTime = timeout
divyadesai81bb7ba2020-03-11 11:45:23 +000052 consul, err := consulapi.NewClient(config)
53 if err != nil {
54 logger.Error(err)
55 return nil, err
56 }
57
58 doneCh := make(chan int, 1)
59 wChannelsContext := make(map[string][]*channelContextMap)
60 reservations := make(map[string]interface{})
61 return &ConsulClient{consul: consul, doneCh: &doneCh, watchedChannelsContext: wChannelsContext, keyReservations: reservations}, nil
62}
63
64// IsConnectionUp returns whether the connection to the Consul KV store is up
65func (c *ConsulClient) IsConnectionUp(ctx context.Context) bool {
66 logger.Error("Unimplemented function")
67 return false
68}
69
70// List returns an array of key-value pairs with key as a prefix. Timeout defines how long the function will
71// wait for a response
72func (c *ConsulClient) List(ctx context.Context, key string) (map[string]*KVPair, error) {
73
74 deadline, _ := ctx.Deadline()
75 kv := c.consul.KV()
76 var queryOptions consulapi.QueryOptions
Neha Sharma87d43d72020-04-08 14:10:40 +000077 // Substract current time from deadline to get the waitTime duration
78 queryOptions.WaitTime = time.Until(deadline)
79
divyadesai81bb7ba2020-03-11 11:45:23 +000080 // For now we ignore meta data
81 kvps, _, err := kv.List(key, &queryOptions)
82 if err != nil {
83 logger.Error(err)
84 return nil, err
85 }
86 m := make(map[string]*KVPair)
87 for _, kvp := range kvps {
88 m[string(kvp.Key)] = NewKVPair(string(kvp.Key), kvp.Value, string(kvp.Session), 0, -1)
89 }
90 return m, nil
91}
92
93// Get returns a key-value pair for a given key. Timeout defines how long the function will
94// wait for a response
95func (c *ConsulClient) Get(ctx context.Context, key string) (*KVPair, error) {
96
97 deadline, _ := ctx.Deadline()
98 kv := c.consul.KV()
99 var queryOptions consulapi.QueryOptions
Neha Sharma87d43d72020-04-08 14:10:40 +0000100 // Substract current time from deadline to get the waitTime duration
101 queryOptions.WaitTime = time.Until(deadline)
102
divyadesai81bb7ba2020-03-11 11:45:23 +0000103 // For now we ignore meta data
104 kvp, _, err := kv.Get(key, &queryOptions)
105 if err != nil {
106 logger.Error(err)
107 return nil, err
108 }
109 if kvp != nil {
110 return NewKVPair(string(kvp.Key), kvp.Value, string(kvp.Session), 0, -1), nil
111 }
112
113 return nil, nil
114}
115
116// Put writes a key-value pair to the KV store. Value can only be a string or []byte since the consul API
117// accepts only a []byte as a value for a put operation. Timeout defines how long the function will
118// wait for a response
119func (c *ConsulClient) Put(ctx context.Context, key string, value interface{}) error {
120
121 // Validate that we can create a byte array from the value as consul API expects a byte array
122 var val []byte
123 var er error
124 if val, er = ToByte(value); er != nil {
125 logger.Error(er)
126 return er
127 }
128
129 // Create a key value pair
130 kvp := consulapi.KVPair{Key: key, Value: val}
131 kv := c.consul.KV()
132 var writeOptions consulapi.WriteOptions
133 c.writeLock.Lock()
134 defer c.writeLock.Unlock()
135 _, err := kv.Put(&kvp, &writeOptions)
136 if err != nil {
137 logger.Error(err)
138 return err
139 }
140 return nil
141}
142
143// Delete removes a key from the KV store. Timeout defines how long the function will
144// wait for a response
145func (c *ConsulClient) Delete(ctx context.Context, key string) error {
146 kv := c.consul.KV()
147 var writeOptions consulapi.WriteOptions
148 c.writeLock.Lock()
149 defer c.writeLock.Unlock()
150 _, err := kv.Delete(key, &writeOptions)
151 if err != nil {
152 logger.Error(err)
153 return err
154 }
155 return nil
156}
157
158func (c *ConsulClient) deleteSession() {
159 if c.sessionID != "" {
160 logger.Debug("cleaning-up-session")
161 session := c.consul.Session()
162 _, err := session.Destroy(c.sessionID, nil)
163 if err != nil {
164 logger.Errorw("error-cleaning-session", log.Fields{"session": c.sessionID, "error": err})
165 }
166 }
167 c.sessionID = ""
168 c.session = nil
169}
170
Neha Sharma87d43d72020-04-08 14:10:40 +0000171func (c *ConsulClient) createSession(ttl time.Duration, retries int) (*consulapi.Session, string, error) {
divyadesai81bb7ba2020-03-11 11:45:23 +0000172 session := c.consul.Session()
173 entry := &consulapi.SessionEntry{
174 Behavior: consulapi.SessionBehaviorDelete,
Neha Sharma87d43d72020-04-08 14:10:40 +0000175 TTL: ttl.String(),
divyadesai81bb7ba2020-03-11 11:45:23 +0000176 }
177
178 for {
179 id, meta, err := session.Create(entry, nil)
180 if err != nil {
181 logger.Errorw("create-session-error", log.Fields{"error": err})
182 if retries == 0 {
183 return nil, "", err
184 }
185 } else if meta.RequestTime == 0 {
186 logger.Errorw("create-session-bad-meta-data", log.Fields{"meta-data": meta})
187 if retries == 0 {
188 return nil, "", errors.New("bad-meta-data")
189 }
190 } else if id == "" {
191 logger.Error("create-session-nil-id")
192 if retries == 0 {
193 return nil, "", errors.New("ID-nil")
194 }
195 } else {
196 return session, id, nil
197 }
198 // If retry param is -1 we will retry indefinitely
199 if retries > 0 {
200 retries--
201 }
202 logger.Debug("retrying-session-create-after-a-second-delay")
203 time.Sleep(time.Duration(1) * time.Second)
204 }
205}
206
207// Helper function to verify mostly whether the content of two interface types are the same. Focus is []byte and
208// string types
209func isEqual(val1 interface{}, val2 interface{}) bool {
210 b1, err := ToByte(val1)
211 b2, er := ToByte(val2)
212 if err == nil && er == nil {
213 return bytes.Equal(b1, b2)
214 }
215 return val1 == val2
216}
217
218// Reserve is invoked to acquire a key and set it to a given value. Value can only be a string or []byte since
219// the consul API accepts only a []byte. Timeout defines how long the function will wait for a response. TTL
220// defines how long that reservation is valid. When TTL expires the key is unreserved by the KV store itself.
221// If the key is acquired then the value returned will be the value passed in. If the key is already acquired
222// then the value assigned to that key will be returned.
Neha Sharma87d43d72020-04-08 14:10:40 +0000223func (c *ConsulClient) Reserve(ctx context.Context, key string, value interface{}, ttl time.Duration) (interface{}, error) {
divyadesai81bb7ba2020-03-11 11:45:23 +0000224
225 // Validate that we can create a byte array from the value as consul API expects a byte array
226 var val []byte
227 var er error
228 if val, er = ToByte(value); er != nil {
229 logger.Error(er)
230 return nil, er
231 }
232
233 // Cleanup any existing session and recreate new ones. A key is reserved against a session
234 if c.sessionID != "" {
235 c.deleteSession()
236 }
237
238 // Clear session if reservation is not successful
239 reservationSuccessful := false
240 defer func() {
241 if !reservationSuccessful {
242 logger.Debug("deleting-session")
243 c.deleteSession()
244 }
245 }()
246
247 session, sessionID, err := c.createSession(ttl, -1)
248 if err != nil {
249 logger.Errorw("no-session-created", log.Fields{"error": err})
250 return "", errors.New("no-session-created")
251 }
252 logger.Debugw("session-created", log.Fields{"session-id": sessionID})
253 c.sessionID = sessionID
254 c.session = session
255
256 // Try to grap the Key using the session
257 kv := c.consul.KV()
258 kvp := consulapi.KVPair{Key: key, Value: val, Session: c.sessionID}
259 result, _, err := kv.Acquire(&kvp, nil)
260 if err != nil {
261 logger.Errorw("error-acquiring-keys", log.Fields{"error": err})
262 return nil, err
263 }
264
265 logger.Debugw("key-acquired", log.Fields{"key": key, "status": result})
266
267 // Irrespective whether we were successful in acquiring the key, let's read it back and see if it's us.
268 m, err := c.Get(ctx, key)
269 if err != nil {
270 return nil, err
271 }
272 if m != nil {
273 logger.Debugw("response-received", log.Fields{"key": m.Key, "m.value": string(m.Value.([]byte)), "value": value})
274 if m.Key == key && isEqual(m.Value, value) {
275 // My reservation is successful - register it. For now, support is only for 1 reservation per key
276 // per session.
277 reservationSuccessful = true
278 c.writeLock.Lock()
279 c.keyReservations[key] = m.Value
280 c.writeLock.Unlock()
281 return m.Value, nil
282 }
283 // My reservation has failed. Return the owner of that key
284 return m.Value, nil
285 }
286 return nil, nil
287}
288
289// ReleaseAllReservations releases all key reservations previously made (using Reserve API)
290func (c *ConsulClient) ReleaseAllReservations(ctx context.Context) error {
291 kv := c.consul.KV()
292 var kvp consulapi.KVPair
293 var result bool
294 var err error
295
296 c.writeLock.Lock()
297 defer c.writeLock.Unlock()
298
299 for key, value := range c.keyReservations {
300 kvp = consulapi.KVPair{Key: key, Value: value.([]byte), Session: c.sessionID}
301 result, _, err = kv.Release(&kvp, nil)
302 if err != nil {
303 logger.Errorw("cannot-release-reservation", log.Fields{"key": key, "error": err})
304 return err
305 }
306 if !result {
307 logger.Errorw("cannot-release-reservation", log.Fields{"key": key})
308 }
309 delete(c.keyReservations, key)
310 }
311 return nil
312}
313
314// ReleaseReservation releases reservation for a specific key.
315func (c *ConsulClient) ReleaseReservation(ctx context.Context, key string) error {
316 var ok bool
317 var reservedValue interface{}
318 c.writeLock.Lock()
319 defer c.writeLock.Unlock()
320 if reservedValue, ok = c.keyReservations[key]; !ok {
321 return errors.New("key-not-reserved:" + key)
322 }
323 // Release the reservation
324 kv := c.consul.KV()
325 kvp := consulapi.KVPair{Key: key, Value: reservedValue.([]byte), Session: c.sessionID}
326
327 result, _, er := kv.Release(&kvp, nil)
328 if er != nil {
329 return er
330 }
331 // Remove that key entry on success
332 if result {
333 delete(c.keyReservations, key)
334 return nil
335 }
336 return errors.New("key-cannot-be-unreserved")
337}
338
339// RenewReservation renews a reservation. A reservation will go stale after the specified TTL (Time To Live)
340// period specified when reserving the key
341func (c *ConsulClient) RenewReservation(ctx context.Context, key string) error {
342 // In the case of Consul, renew reservation of a reserve key only require renewing the client session.
343
344 c.writeLock.Lock()
345 defer c.writeLock.Unlock()
346
347 // Verify the key was reserved
348 if _, ok := c.keyReservations[key]; !ok {
349 return errors.New("key-not-reserved")
350 }
351
352 if c.session == nil {
353 return errors.New("no-session-exist")
354 }
355
356 var writeOptions consulapi.WriteOptions
357 if _, _, err := c.session.Renew(c.sessionID, &writeOptions); err != nil {
358 return err
359 }
360 return nil
361}
362
363// Watch provides the watch capability on a given key. It returns a channel onto which the callee needs to
364// listen to receive Events.
365func (c *ConsulClient) Watch(ctx context.Context, key string, withPrefix bool) chan *Event {
366
367 // Create a new channel
368 ch := make(chan *Event, maxClientChannelBufferSize)
369
370 // Create a context to track this request
371 watchContext, cFunc := context.WithCancel(context.Background())
372
373 // Save the channel and context reference for later
374 c.writeLock.Lock()
375 defer c.writeLock.Unlock()
376 ccm := channelContextMap{channel: ch, ctx: watchContext, cancel: cFunc}
377 c.watchedChannelsContext[key] = append(c.watchedChannelsContext[key], &ccm)
378
379 // Launch a go routine to listen for updates
380 go c.listenForKeyChange(watchContext, key, ch)
381
382 return ch
383}
384
385// CloseWatch closes a specific watch. Both the key and the channel are required when closing a watch as there
386// may be multiple listeners on the same key. The previously created channel serves as a key
387func (c *ConsulClient) CloseWatch(key string, ch chan *Event) {
388 // First close the context
389 var ok bool
390 var watchedChannelsContexts []*channelContextMap
391 c.writeLock.Lock()
392 defer c.writeLock.Unlock()
393 if watchedChannelsContexts, ok = c.watchedChannelsContext[key]; !ok {
394 logger.Errorw("key-has-no-watched-context-or-channel", log.Fields{"key": key})
395 return
396 }
397 // Look for the channels
398 var pos = -1
399 for i, chCtxMap := range watchedChannelsContexts {
400 if chCtxMap.channel == ch {
401 logger.Debug("channel-found")
402 chCtxMap.cancel()
403 //close the channel
404 close(ch)
405 pos = i
406 break
407 }
408 }
409 // Remove that entry if present
410 if pos >= 0 {
411 c.watchedChannelsContext[key] = append(c.watchedChannelsContext[key][:pos], c.watchedChannelsContext[key][pos+1:]...)
412 }
413 logger.Debugw("watched-channel-exiting", log.Fields{"key": key, "channel": c.watchedChannelsContext[key]})
414}
415
416func (c *ConsulClient) isKVEqual(kv1 *consulapi.KVPair, kv2 *consulapi.KVPair) bool {
417 if (kv1 == nil) && (kv2 == nil) {
418 return true
419 } else if (kv1 == nil) || (kv2 == nil) {
420 return false
421 }
422 // Both the KV should be non-null here
423 if kv1.Key != kv2.Key ||
424 !bytes.Equal(kv1.Value, kv2.Value) ||
425 kv1.Session != kv2.Session ||
426 kv1.LockIndex != kv2.LockIndex ||
427 kv1.ModifyIndex != kv2.ModifyIndex {
428 return false
429 }
430 return true
431}
432
433func (c *ConsulClient) listenForKeyChange(watchContext context.Context, key string, ch chan *Event) {
434 logger.Debugw("start-watching-channel", log.Fields{"key": key, "channel": ch})
435
436 defer c.CloseWatch(key, ch)
divyadesai81bb7ba2020-03-11 11:45:23 +0000437 kv := c.consul.KV()
438 var queryOptions consulapi.QueryOptions
Neha Sharma87d43d72020-04-08 14:10:40 +0000439 queryOptions.WaitTime = defaultKVGetTimeout
divyadesai81bb7ba2020-03-11 11:45:23 +0000440
441 // Get the existing value, if any
442 previousKVPair, meta, err := kv.Get(key, &queryOptions)
443 if err != nil {
444 logger.Debug(err)
445 }
446 lastIndex := meta.LastIndex
447
448 // Wait for change. Push any change onto the channel and keep waiting for new update
449 //var waitOptions consulapi.QueryOptions
450 var pair *consulapi.KVPair
451 //watchContext, _ := context.WithCancel(context.Background())
452 waitOptions := queryOptions.WithContext(watchContext)
453 for {
454 //waitOptions = consulapi.QueryOptions{WaitIndex: lastIndex}
455 waitOptions.WaitIndex = lastIndex
456 pair, meta, err = kv.Get(key, waitOptions)
457 select {
458 case <-watchContext.Done():
459 logger.Debug("done-event-received-exiting")
460 return
461 default:
462 if err != nil {
463 logger.Warnw("error-from-watch", log.Fields{"error": err})
464 ch <- NewEvent(CONNECTIONDOWN, key, []byte(""), -1)
465 } else {
466 logger.Debugw("index-state", log.Fields{"lastindex": lastIndex, "newindex": meta.LastIndex, "key": key})
467 }
468 }
469 if err != nil {
470 logger.Debug(err)
471 // On error, block for 10 milliseconds to prevent endless loop
472 time.Sleep(10 * time.Millisecond)
473 } else if meta.LastIndex <= lastIndex {
474 logger.Info("no-index-change-or-negative")
475 } else {
476 logger.Debugw("update-received", log.Fields{"pair": pair})
477 if pair == nil {
478 ch <- NewEvent(DELETE, key, []byte(""), -1)
479 } else if !c.isKVEqual(pair, previousKVPair) {
480 // Push the change onto the channel if the data has changed
481 // For now just assume it's a PUT change
482 logger.Debugw("pair-details", log.Fields{"session": pair.Session, "key": pair.Key, "value": pair.Value})
483 ch <- NewEvent(PUT, pair.Key, pair.Value, -1)
484 }
485 previousKVPair = pair
486 lastIndex = meta.LastIndex
487 }
488 }
489}
490
491// Close closes the KV store client
492func (c *ConsulClient) Close() {
493 var writeOptions consulapi.WriteOptions
494 // Inform any goroutine it's time to say goodbye.
495 c.writeLock.Lock()
496 defer c.writeLock.Unlock()
497 if c.doneCh != nil {
498 close(*c.doneCh)
499 }
500
501 // Clear the sessionID
502 if _, err := c.consul.Session().Destroy(c.sessionID, &writeOptions); err != nil {
503 logger.Errorw("error-closing-client", log.Fields{"error": err})
504 }
505}
506
Neha Sharma87d43d72020-04-08 14:10:40 +0000507func (c *ConsulClient) AcquireLock(ctx context.Context, lockName string, timeout time.Duration) error {
divyadesai81bb7ba2020-03-11 11:45:23 +0000508 return nil
509}
510
511func (c *ConsulClient) ReleaseLock(lockName string) error {
512 return nil
513}