blob: c3e3999e1d4ca789f96a86508631ecdbc6490119 [file] [log] [blame]
Scott Baker2c1c4822019-10-16 11:02:41 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kvstore
17
18import (
19 "bytes"
20 "context"
21 "errors"
Scott Bakere73f91e2019-10-17 12:58:11 -070022 log "github.com/opencord/voltha-lib-go/pkg/log"
Scott Baker2c1c4822019-10-16 11:02:41 -070023 "sync"
24 "time"
25 //log "ciena.com/coordinator/common"
26 consulapi "github.com/hashicorp/consul/api"
27)
28
29type channelContextMap struct {
30 ctx context.Context
31 channel chan *Event
32 cancel context.CancelFunc
33}
34
35// ConsulClient represents the consul KV store client
36type ConsulClient struct {
37 session *consulapi.Session
38 sessionID string
39 consul *consulapi.Client
40 doneCh *chan int
41 keyReservations map[string]interface{}
42 watchedChannelsContext map[string][]*channelContextMap
43 writeLock sync.Mutex
44}
45
46// NewConsulClient returns a new client for the Consul KV store
47func NewConsulClient(addr string, timeout int) (*ConsulClient, error) {
48
49 duration := GetDuration(timeout)
50
51 config := consulapi.DefaultConfig()
52 config.Address = addr
53 config.WaitTime = duration
54 consul, err := consulapi.NewClient(config)
55 if err != nil {
56 log.Error(err)
57 return nil, err
58 }
59
60 doneCh := make(chan int, 1)
61 wChannelsContext := make(map[string][]*channelContextMap)
62 reservations := make(map[string]interface{})
63 return &ConsulClient{consul: consul, doneCh: &doneCh, watchedChannelsContext: wChannelsContext, keyReservations: reservations}, nil
64}
65
66// IsConnectionUp returns whether the connection to the Consul KV store is up
67func (c *ConsulClient) IsConnectionUp(timeout int) bool {
68 log.Error("Unimplemented function")
69 return false
70}
71
72// List returns an array of key-value pairs with key as a prefix. Timeout defines how long the function will
73// wait for a response
74func (c *ConsulClient) List(key string, timeout int, lock ...bool) (map[string]*KVPair, error) {
75 duration := GetDuration(timeout)
76
77 kv := c.consul.KV()
78 var queryOptions consulapi.QueryOptions
79 queryOptions.WaitTime = duration
80 // For now we ignore meta data
81 kvps, _, err := kv.List(key, &queryOptions)
82 if err != nil {
83 log.Error(err)
84 return nil, err
85 }
86 m := make(map[string]*KVPair)
87 for _, kvp := range kvps {
88 m[string(kvp.Key)] = NewKVPair(string(kvp.Key), kvp.Value, string(kvp.Session), 0, -1)
89 }
90 return m, nil
91}
92
93// Get returns a key-value pair for a given key. Timeout defines how long the function will
94// wait for a response
95func (c *ConsulClient) Get(key string, timeout int, lock ...bool) (*KVPair, error) {
96
97 duration := GetDuration(timeout)
98
99 kv := c.consul.KV()
100 var queryOptions consulapi.QueryOptions
101 queryOptions.WaitTime = duration
102 // For now we ignore meta data
103 kvp, _, err := kv.Get(key, &queryOptions)
104 if err != nil {
105 log.Error(err)
106 return nil, err
107 }
108 if kvp != nil {
109 return NewKVPair(string(kvp.Key), kvp.Value, string(kvp.Session), 0, -1), nil
110 }
111
112 return nil, nil
113}
114
115// Put writes a key-value pair to the KV store. Value can only be a string or []byte since the consul API
116// accepts only a []byte as a value for a put operation. Timeout defines how long the function will
117// wait for a response
118func (c *ConsulClient) Put(key string, value interface{}, timeout int, lock ...bool) error {
119
120 // Validate that we can create a byte array from the value as consul API expects a byte array
121 var val []byte
122 var er error
123 if val, er = ToByte(value); er != nil {
124 log.Error(er)
125 return er
126 }
127
128 // Create a key value pair
129 kvp := consulapi.KVPair{Key: key, Value: val}
130 kv := c.consul.KV()
131 var writeOptions consulapi.WriteOptions
132 c.writeLock.Lock()
133 defer c.writeLock.Unlock()
134 _, err := kv.Put(&kvp, &writeOptions)
135 if err != nil {
136 log.Error(err)
137 return err
138 }
139 return nil
140}
141
142// Delete removes a key from the KV store. Timeout defines how long the function will
143// wait for a response
144func (c *ConsulClient) Delete(key string, timeout int, lock ...bool) error {
145 kv := c.consul.KV()
146 var writeOptions consulapi.WriteOptions
147 c.writeLock.Lock()
148 defer c.writeLock.Unlock()
149 _, err := kv.Delete(key, &writeOptions)
150 if err != nil {
151 log.Error(err)
152 return err
153 }
154 return nil
155}
156
157func (c *ConsulClient) deleteSession() {
158 if c.sessionID != "" {
159 log.Debug("cleaning-up-session")
160 session := c.consul.Session()
161 _, err := session.Destroy(c.sessionID, nil)
162 if err != nil {
163 log.Errorw("error-cleaning-session", log.Fields{"session": c.sessionID, "error": err})
164 }
165 }
166 c.sessionID = ""
167 c.session = nil
168}
169
170func (c *ConsulClient) createSession(ttl int64, retries int) (*consulapi.Session, string, error) {
171 session := c.consul.Session()
172 entry := &consulapi.SessionEntry{
173 Behavior: consulapi.SessionBehaviorDelete,
174 TTL: "10s", // strconv.FormatInt(ttl, 10) + "s", // disable ttl
175 }
176
177 for {
178 id, meta, err := session.Create(entry, nil)
179 if err != nil {
180 log.Errorw("create-session-error", log.Fields{"error": err})
181 if retries == 0 {
182 return nil, "", err
183 }
184 } else if meta.RequestTime == 0 {
185 log.Errorw("create-session-bad-meta-data", log.Fields{"meta-data": meta})
186 if retries == 0 {
187 return nil, "", errors.New("bad-meta-data")
188 }
189 } else if id == "" {
190 log.Error("create-session-nil-id")
191 if retries == 0 {
192 return nil, "", errors.New("ID-nil")
193 }
194 } else {
195 return session, id, nil
196 }
197 // If retry param is -1 we will retry indefinitely
198 if retries > 0 {
199 retries--
200 }
201 log.Debug("retrying-session-create-after-a-second-delay")
202 time.Sleep(time.Duration(1) * time.Second)
203 }
204}
205
206// Helper function to verify mostly whether the content of two interface types are the same. Focus is []byte and
207// string types
208func isEqual(val1 interface{}, val2 interface{}) bool {
209 b1, err := ToByte(val1)
210 b2, er := ToByte(val2)
211 if err == nil && er == nil {
212 return bytes.Equal(b1, b2)
213 }
214 return val1 == val2
215}
216
217// Reserve is invoked to acquire a key and set it to a given value. Value can only be a string or []byte since
218// the consul API accepts only a []byte. Timeout defines how long the function will wait for a response. TTL
219// defines how long that reservation is valid. When TTL expires the key is unreserved by the KV store itself.
220// If the key is acquired then the value returned will be the value passed in. If the key is already acquired
221// then the value assigned to that key will be returned.
222func (c *ConsulClient) Reserve(key string, value interface{}, ttl int64) (interface{}, error) {
223
224 // Validate that we can create a byte array from the value as consul API expects a byte array
225 var val []byte
226 var er error
227 if val, er = ToByte(value); er != nil {
228 log.Error(er)
229 return nil, er
230 }
231
232 // Cleanup any existing session and recreate new ones. A key is reserved against a session
233 if c.sessionID != "" {
234 c.deleteSession()
235 }
236
237 // Clear session if reservation is not successful
238 reservationSuccessful := false
239 defer func() {
240 if !reservationSuccessful {
241 log.Debug("deleting-session")
242 c.deleteSession()
243 }
244 }()
245
246 session, sessionID, err := c.createSession(ttl, -1)
247 if err != nil {
248 log.Errorw("no-session-created", log.Fields{"error": err})
249 return "", errors.New("no-session-created")
250 }
251 log.Debugw("session-created", log.Fields{"session-id": sessionID})
252 c.sessionID = sessionID
253 c.session = session
254
255 // Try to grap the Key using the session
256 kv := c.consul.KV()
257 kvp := consulapi.KVPair{Key: key, Value: val, Session: c.sessionID}
258 result, _, err := kv.Acquire(&kvp, nil)
259 if err != nil {
260 log.Errorw("error-acquiring-keys", log.Fields{"error": err})
261 return nil, err
262 }
263
264 log.Debugw("key-acquired", log.Fields{"key": key, "status": result})
265
266 // Irrespective whether we were successful in acquiring the key, let's read it back and see if it's us.
267 m, err := c.Get(key, defaultKVGetTimeout)
268 if err != nil {
269 return nil, err
270 }
271 if m != nil {
272 log.Debugw("response-received", log.Fields{"key": m.Key, "m.value": string(m.Value.([]byte)), "value": value})
273 if m.Key == key && isEqual(m.Value, value) {
274 // My reservation is successful - register it. For now, support is only for 1 reservation per key
275 // per session.
276 reservationSuccessful = true
277 c.writeLock.Lock()
278 c.keyReservations[key] = m.Value
279 c.writeLock.Unlock()
280 return m.Value, nil
281 }
282 // My reservation has failed. Return the owner of that key
283 return m.Value, nil
284 }
285 return nil, nil
286}
287
288// ReleaseAllReservations releases all key reservations previously made (using Reserve API)
289func (c *ConsulClient) ReleaseAllReservations() error {
290 kv := c.consul.KV()
291 var kvp consulapi.KVPair
292 var result bool
293 var err error
294
295 c.writeLock.Lock()
296 defer c.writeLock.Unlock()
297
298 for key, value := range c.keyReservations {
299 kvp = consulapi.KVPair{Key: key, Value: value.([]byte), Session: c.sessionID}
300 result, _, err = kv.Release(&kvp, nil)
301 if err != nil {
302 log.Errorw("cannot-release-reservation", log.Fields{"key": key, "error": err})
303 return err
304 }
305 if !result {
306 log.Errorw("cannot-release-reservation", log.Fields{"key": key})
307 }
308 delete(c.keyReservations, key)
309 }
310 return nil
311}
312
313// ReleaseReservation releases reservation for a specific key.
314func (c *ConsulClient) ReleaseReservation(key string) error {
315 var ok bool
316 var reservedValue interface{}
317 c.writeLock.Lock()
318 defer c.writeLock.Unlock()
319 if reservedValue, ok = c.keyReservations[key]; !ok {
320 return errors.New("key-not-reserved:" + key)
321 }
322 // Release the reservation
323 kv := c.consul.KV()
324 kvp := consulapi.KVPair{Key: key, Value: reservedValue.([]byte), Session: c.sessionID}
325
326 result, _, er := kv.Release(&kvp, nil)
327 if er != nil {
328 return er
329 }
330 // Remove that key entry on success
331 if result {
332 delete(c.keyReservations, key)
333 return nil
334 }
335 return errors.New("key-cannot-be-unreserved")
336}
337
338// RenewReservation renews a reservation. A reservation will go stale after the specified TTL (Time To Live)
339// period specified when reserving the key
340func (c *ConsulClient) RenewReservation(key string) error {
341 // In the case of Consul, renew reservation of a reserve key only require renewing the client session.
342
343 c.writeLock.Lock()
344 defer c.writeLock.Unlock()
345
346 // Verify the key was reserved
347 if _, ok := c.keyReservations[key]; !ok {
348 return errors.New("key-not-reserved")
349 }
350
351 if c.session == nil {
352 return errors.New("no-session-exist")
353 }
354
355 var writeOptions consulapi.WriteOptions
356 if _, _, err := c.session.Renew(c.sessionID, &writeOptions); err != nil {
357 return err
358 }
359 return nil
360}
361
362// Watch provides the watch capability on a given key. It returns a channel onto which the callee needs to
363// listen to receive Events.
364func (c *ConsulClient) Watch(key string) chan *Event {
365
366 // Create a new channel
367 ch := make(chan *Event, maxClientChannelBufferSize)
368
369 // Create a context to track this request
370 watchContext, cFunc := context.WithCancel(context.Background())
371
372 // Save the channel and context reference for later
373 c.writeLock.Lock()
374 defer c.writeLock.Unlock()
375 ccm := channelContextMap{channel: ch, ctx: watchContext, cancel: cFunc}
376 c.watchedChannelsContext[key] = append(c.watchedChannelsContext[key], &ccm)
377
378 // Launch a go routine to listen for updates
379 go c.listenForKeyChange(watchContext, key, ch)
380
381 return ch
382}
383
384// CloseWatch closes a specific watch. Both the key and the channel are required when closing a watch as there
385// may be multiple listeners on the same key. The previously created channel serves as a key
386func (c *ConsulClient) CloseWatch(key string, ch chan *Event) {
387 // First close the context
388 var ok bool
389 var watchedChannelsContexts []*channelContextMap
390 c.writeLock.Lock()
391 defer c.writeLock.Unlock()
392 if watchedChannelsContexts, ok = c.watchedChannelsContext[key]; !ok {
393 log.Errorw("key-has-no-watched-context-or-channel", log.Fields{"key": key})
394 return
395 }
396 // Look for the channels
397 var pos = -1
398 for i, chCtxMap := range watchedChannelsContexts {
399 if chCtxMap.channel == ch {
400 log.Debug("channel-found")
401 chCtxMap.cancel()
402 //close the channel
403 close(ch)
404 pos = i
405 break
406 }
407 }
408 // Remove that entry if present
409 if pos >= 0 {
410 c.watchedChannelsContext[key] = append(c.watchedChannelsContext[key][:pos], c.watchedChannelsContext[key][pos+1:]...)
411 }
412 log.Debugw("watched-channel-exiting", log.Fields{"key": key, "channel": c.watchedChannelsContext[key]})
413}
414
415func (c *ConsulClient) isKVEqual(kv1 *consulapi.KVPair, kv2 *consulapi.KVPair) bool {
416 if (kv1 == nil) && (kv2 == nil) {
417 return true
418 } else if (kv1 == nil) || (kv2 == nil) {
419 return false
420 }
421 // Both the KV should be non-null here
422 if kv1.Key != kv2.Key ||
423 !bytes.Equal(kv1.Value, kv2.Value) ||
424 kv1.Session != kv2.Session ||
425 kv1.LockIndex != kv2.LockIndex ||
426 kv1.ModifyIndex != kv2.ModifyIndex {
427 return false
428 }
429 return true
430}
431
432func (c *ConsulClient) listenForKeyChange(watchContext context.Context, key string, ch chan *Event) {
433 log.Debugw("start-watching-channel", log.Fields{"key": key, "channel": ch})
434
435 defer c.CloseWatch(key, ch)
436 duration := GetDuration(defaultKVGetTimeout)
437 kv := c.consul.KV()
438 var queryOptions consulapi.QueryOptions
439 queryOptions.WaitTime = duration
440
441 // Get the existing value, if any
442 previousKVPair, meta, err := kv.Get(key, &queryOptions)
443 if err != nil {
444 log.Debug(err)
445 }
446 lastIndex := meta.LastIndex
447
448 // Wait for change. Push any change onto the channel and keep waiting for new update
449 //var waitOptions consulapi.QueryOptions
450 var pair *consulapi.KVPair
451 //watchContext, _ := context.WithCancel(context.Background())
452 waitOptions := queryOptions.WithContext(watchContext)
453 for {
454 //waitOptions = consulapi.QueryOptions{WaitIndex: lastIndex}
455 waitOptions.WaitIndex = lastIndex
456 pair, meta, err = kv.Get(key, waitOptions)
457 select {
458 case <-watchContext.Done():
459 log.Debug("done-event-received-exiting")
460 return
461 default:
462 if err != nil {
463 log.Warnw("error-from-watch", log.Fields{"error": err})
464 ch <- NewEvent(CONNECTIONDOWN, key, []byte(""), -1)
465 } else {
466 log.Debugw("index-state", log.Fields{"lastindex": lastIndex, "newindex": meta.LastIndex, "key": key})
467 }
468 }
469 if err != nil {
470 log.Debug(err)
471 // On error, block for 10 milliseconds to prevent endless loop
472 time.Sleep(10 * time.Millisecond)
473 } else if meta.LastIndex <= lastIndex {
474 log.Info("no-index-change-or-negative")
475 } else {
476 log.Debugw("update-received", log.Fields{"pair": pair})
477 if pair == nil {
478 ch <- NewEvent(DELETE, key, []byte(""), -1)
479 } else if !c.isKVEqual(pair, previousKVPair) {
480 // Push the change onto the channel if the data has changed
481 // For now just assume it's a PUT change
482 log.Debugw("pair-details", log.Fields{"session": pair.Session, "key": pair.Key, "value": pair.Value})
483 ch <- NewEvent(PUT, pair.Key, pair.Value, -1)
484 }
485 previousKVPair = pair
486 lastIndex = meta.LastIndex
487 }
488 }
489}
490
491// Close closes the KV store client
492func (c *ConsulClient) Close() {
493 var writeOptions consulapi.WriteOptions
494 // Inform any goroutine it's time to say goodbye.
495 c.writeLock.Lock()
496 defer c.writeLock.Unlock()
497 if c.doneCh != nil {
498 close(*c.doneCh)
499 }
500
501 // Clear the sessionID
502 if _, err := c.consul.Session().Destroy(c.sessionID, &writeOptions); err != nil {
503 log.Errorw("error-closing-client", log.Fields{"error": err})
504 }
505}
506
507func (c *ConsulClient) AcquireLock(lockName string, timeout int) error {
508 return nil
509}
510
511func (c *ConsulClient) ReleaseLock(lockName string) error {
512 return nil
513}