blob: e4f6baf61931cd41ff86984d521e6db086046417 [file] [log] [blame]
khenaidoobf6e7bb2018-08-14 22:27:29 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
khenaidoocfee5f42018-07-19 22:47:38 -040016package kvstore
17
18import (
khenaidoo5c11af72018-07-20 17:21:05 -040019 "bytes"
khenaidoocfee5f42018-07-19 22:47:38 -040020 "context"
21 "errors"
khenaidoo5c11af72018-07-20 17:21:05 -040022 log "github.com/opencord/voltha-go/common/log"
khenaidoocfee5f42018-07-19 22:47:38 -040023 "sync"
24 "time"
khenaidoocfee5f42018-07-19 22:47:38 -040025 //log "ciena.com/coordinator/common"
26 consulapi "github.com/hashicorp/consul/api"
27)
28
29type channelContextMap struct {
30 ctx context.Context
31 channel chan *Event
32 cancel context.CancelFunc
33}
34
khenaidoocfee5f42018-07-19 22:47:38 -040035// ConsulClient represents the consul KV store client
36type ConsulClient struct {
37 session *consulapi.Session
38 sessionID string
39 consul *consulapi.Client
40 doneCh *chan int
41 keyReservations map[string]interface{}
42 watchedChannelsContext map[string][]*channelContextMap
43 writeLock sync.Mutex
44}
45
46// NewConsulClient returns a new client for the Consul KV store
47func NewConsulClient(addr string, timeout int) (*ConsulClient, error) {
48
49 duration := GetDuration(timeout)
50
51 config := consulapi.DefaultConfig()
52 config.Address = addr
53 config.WaitTime = duration
54 consul, err := consulapi.NewClient(config)
55 if err != nil {
56 log.Error(err)
57 return nil, err
58 }
59
60 doneCh := make(chan int, 1)
61 wChannelsContext := make(map[string][]*channelContextMap)
62 reservations := make(map[string]interface{})
63 return &ConsulClient{consul: consul, doneCh: &doneCh, watchedChannelsContext: wChannelsContext, keyReservations: reservations}, nil
64}
65
66// List returns an array of key-value pairs with key as a prefix. Timeout defines how long the function will
67// wait for a response
68func (c *ConsulClient) List(key string, timeout int) (map[string]*KVPair, error) {
69 duration := GetDuration(timeout)
70
71 kv := c.consul.KV()
72 var queryOptions consulapi.QueryOptions
73 queryOptions.WaitTime = duration
74 // For now we ignore meta data
75 kvps, _, err := kv.List(key, &queryOptions)
76 if err != nil {
77 log.Error(err)
78 return nil, err
79 }
80 m := make(map[string]*KVPair)
81 for _, kvp := range kvps {
82 m[string(kvp.Key)] = NewKVPair(string(kvp.Key), kvp.Value, string(kvp.Session), 0)
83 }
84 return m, nil
85}
86
87// Get returns a key-value pair for a given key. Timeout defines how long the function will
88// wait for a response
89func (c *ConsulClient) Get(key string, timeout int) (*KVPair, error) {
90
91 duration := GetDuration(timeout)
92
93 kv := c.consul.KV()
94 var queryOptions consulapi.QueryOptions
95 queryOptions.WaitTime = duration
96 // For now we ignore meta data
97 kvp, _, err := kv.Get(key, &queryOptions)
98 if err != nil {
99 log.Error(err)
100 return nil, err
101 }
102 if kvp != nil {
103 return NewKVPair(string(kvp.Key), kvp.Value, string(kvp.Session), 0), nil
104 }
105
106 return nil, nil
107}
108
109// Put writes a key-value pair to the KV store. Value can only be a string or []byte since the consul API
110// accepts only a []byte as a value for a put operation. Timeout defines how long the function will
111// wait for a response
112func (c *ConsulClient) Put(key string, value interface{}, timeout int) error {
113
114 // Validate that we can create a byte array from the value as consul API expects a byte array
115 var val []byte
116 var er error
117 if val, er = ToByte(value); er != nil {
118 log.Error(er)
119 return er
120 }
121
122 // Create a key value pair
123 kvp := consulapi.KVPair{Key: key, Value: val}
124 kv := c.consul.KV()
125 var writeOptions consulapi.WriteOptions
126 c.writeLock.Lock()
127 defer c.writeLock.Unlock()
128 _, err := kv.Put(&kvp, &writeOptions)
129 if err != nil {
130 log.Error(err)
131 return err
132 }
133 return nil
134}
135
136// Delete removes a key from the KV store. Timeout defines how long the function will
137// wait for a response
138func (c *ConsulClient) Delete(key string, timeout int) error {
139 kv := c.consul.KV()
140 var writeOptions consulapi.WriteOptions
141 c.writeLock.Lock()
142 defer c.writeLock.Unlock()
143 _, err := kv.Delete(key, &writeOptions)
144 if err != nil {
145 log.Error(err)
146 return err
147 }
148 return nil
149}
150
151func (c *ConsulClient) deleteSession() {
152 if c.sessionID != "" {
153 log.Debug("cleaning-up-session")
154 session := c.consul.Session()
155 _, err := session.Destroy(c.sessionID, nil)
156 if err != nil {
khenaidoo5c11af72018-07-20 17:21:05 -0400157 log.Errorw("error-cleaning-session", log.Fields{"session": c.sessionID, "error": err})
khenaidoocfee5f42018-07-19 22:47:38 -0400158 }
159 }
160 c.sessionID = ""
161 c.session = nil
162}
163
164func (c *ConsulClient) createSession(ttl int64, retries int) (*consulapi.Session, string, error) {
165 session := c.consul.Session()
166 entry := &consulapi.SessionEntry{
167 Behavior: consulapi.SessionBehaviorDelete,
168 TTL: "10s", // strconv.FormatInt(ttl, 10) + "s", // disable ttl
169 }
170
171 for {
172 id, meta, err := session.Create(entry, nil)
173 if err != nil {
khenaidoo5c11af72018-07-20 17:21:05 -0400174 log.Errorw("create-session-error", log.Fields{"error": err})
khenaidoocfee5f42018-07-19 22:47:38 -0400175 if retries == 0 {
176 return nil, "", err
177 }
178 } else if meta.RequestTime == 0 {
khenaidoo5c11af72018-07-20 17:21:05 -0400179 log.Errorw("create-session-bad-meta-data", log.Fields{"meta-data": meta})
khenaidoocfee5f42018-07-19 22:47:38 -0400180 if retries == 0 {
181 return nil, "", errors.New("bad-meta-data")
182 }
183 } else if id == "" {
184 log.Error("create-session-nil-id")
185 if retries == 0 {
186 return nil, "", errors.New("ID-nil")
187 }
188 } else {
189 return session, id, nil
190 }
191 // If retry param is -1 we will retry indefinitely
192 if retries > 0 {
193 retries--
194 }
195 log.Debug("retrying-session-create-after-a-second-delay")
196 time.Sleep(time.Duration(1) * time.Second)
197 }
198}
199
khenaidoocfee5f42018-07-19 22:47:38 -0400200// Helper function to verify mostly whether the content of two interface types are the same. Focus is []byte and
201// string types
202func isEqual(val1 interface{}, val2 interface{}) bool {
203 b1, err := ToByte(val1)
204 b2, er := ToByte(val2)
205 if err == nil && er == nil {
206 return bytes.Equal(b1, b2)
207 }
208 return val1 == val2
209}
210
211// Reserve is invoked to acquire a key and set it to a given value. Value can only be a string or []byte since
212// the consul API accepts only a []byte. Timeout defines how long the function will wait for a response. TTL
213// defines how long that reservation is valid. When TTL expires the key is unreserved by the KV store itself.
214// If the key is acquired then the value returned will be the value passed in. If the key is already acquired
215// then the value assigned to that key will be returned.
216func (c *ConsulClient) Reserve(key string, value interface{}, ttl int64) (interface{}, error) {
217
218 // Validate that we can create a byte array from the value as consul API expects a byte array
219 var val []byte
220 var er error
221 if val, er = ToByte(value); er != nil {
222 log.Error(er)
223 return nil, er
224 }
225
226 // Cleanup any existing session and recreate new ones. A key is reserved against a session
227 if c.sessionID != "" {
228 c.deleteSession()
229 }
230
231 // Clear session if reservation is not successful
232 reservationSuccessful := false
233 defer func() {
234 if !reservationSuccessful {
235 log.Debug("deleting-session")
236 c.deleteSession()
237 }
238 }()
239
240 session, sessionID, err := c.createSession(ttl, -1)
241 if err != nil {
khenaidoo5c11af72018-07-20 17:21:05 -0400242 log.Errorw("no-session-created", log.Fields{"error": err})
khenaidoocfee5f42018-07-19 22:47:38 -0400243 return "", errors.New("no-session-created")
244 }
khenaidoo5c11af72018-07-20 17:21:05 -0400245 log.Debugw("session-created", log.Fields{"session-id": sessionID})
khenaidoocfee5f42018-07-19 22:47:38 -0400246 c.sessionID = sessionID
247 c.session = session
248
249 // Try to grap the Key using the session
250 kv := c.consul.KV()
251 kvp := consulapi.KVPair{Key: key, Value: val, Session: c.sessionID}
252 result, _, err := kv.Acquire(&kvp, nil)
253 if err != nil {
khenaidoo5c11af72018-07-20 17:21:05 -0400254 log.Errorw("error-acquiring-keys", log.Fields{"error": err})
khenaidoocfee5f42018-07-19 22:47:38 -0400255 return nil, err
256 }
257
khenaidoo5c11af72018-07-20 17:21:05 -0400258 log.Debugw("key-acquired", log.Fields{"key": key, "status": result})
khenaidoocfee5f42018-07-19 22:47:38 -0400259
260 // Irrespective whether we were successful in acquiring the key, let's read it back and see if it's us.
261 m, err := c.Get(key, defaultKVGetTimeout)
262 if err != nil {
263 return nil, err
264 }
265 if m != nil {
khenaidoo5c11af72018-07-20 17:21:05 -0400266 log.Debugw("response-received", log.Fields{"key": m.Key, "m.value": string(m.Value.([]byte)), "value": value})
khenaidoocfee5f42018-07-19 22:47:38 -0400267 if m.Key == key && isEqual(m.Value, value) {
268 // My reservation is successful - register it. For now, support is only for 1 reservation per key
269 // per session.
270 reservationSuccessful = true
271 c.writeLock.Lock()
272 c.keyReservations[key] = m.Value
273 c.writeLock.Unlock()
274 return m.Value, nil
275 }
276 // My reservation has failed. Return the owner of that key
277 return m.Value, nil
278 }
279 return nil, nil
280}
281
282// ReleaseAllReservations releases all key reservations previously made (using Reserve API)
283func (c *ConsulClient) ReleaseAllReservations() error {
284 kv := c.consul.KV()
285 var kvp consulapi.KVPair
286 var result bool
287 var err error
288
289 c.writeLock.Lock()
290 defer c.writeLock.Unlock()
291
292 for key, value := range c.keyReservations {
293 kvp = consulapi.KVPair{Key: key, Value: value.([]byte), Session: c.sessionID}
294 result, _, err = kv.Release(&kvp, nil)
295 if err != nil {
khenaidoo5c11af72018-07-20 17:21:05 -0400296 log.Errorw("cannot-release-reservation", log.Fields{"key": key, "error": err})
khenaidoocfee5f42018-07-19 22:47:38 -0400297 return err
298 }
299 if !result {
khenaidoo5c11af72018-07-20 17:21:05 -0400300 log.Errorw("cannot-release-reservation", log.Fields{"key": key})
khenaidoocfee5f42018-07-19 22:47:38 -0400301 }
302 delete(c.keyReservations, key)
303 }
304 return nil
305}
306
307// ReleaseReservation releases reservation for a specific key.
308func (c *ConsulClient) ReleaseReservation(key string) error {
309 var ok bool
310 var reservedValue interface{}
311 c.writeLock.Lock()
312 defer c.writeLock.Unlock()
313 if reservedValue, ok = c.keyReservations[key]; !ok {
314 return errors.New("key-not-reserved:" + key)
315 }
316 // Release the reservation
317 kv := c.consul.KV()
318 kvp := consulapi.KVPair{Key: key, Value: reservedValue.([]byte), Session: c.sessionID}
319
320 result, _, er := kv.Release(&kvp, nil)
321 if er != nil {
322 return er
323 }
324 // Remove that key entry on success
325 if result {
326 delete(c.keyReservations, key)
327 return nil
328 }
329 return errors.New("key-cannot-be-unreserved")
330}
331
332// RenewReservation renews a reservation. A reservation will go stale after the specified TTL (Time To Live)
333// period specified when reserving the key
334func (c *ConsulClient) RenewReservation(key string) error {
335 // In the case of Consul, renew reservation of a reserve key only require renewing the client session.
336
337 c.writeLock.Lock()
338 defer c.writeLock.Unlock()
339
340 // Verify the key was reserved
341 if _, ok := c.keyReservations[key]; !ok {
342 return errors.New("key-not-reserved")
343 }
344
345 if c.session == nil {
346 return errors.New("no-session-exist")
347 }
348
349 var writeOptions consulapi.WriteOptions
350 if _, _, err := c.session.Renew(c.sessionID, &writeOptions); err != nil {
351 return err
352 }
353 return nil
354}
355
356// Watch provides the watch capability on a given key. It returns a channel onto which the callee needs to
357// listen to receive Events.
358func (c *ConsulClient) Watch(key string) chan *Event {
359
360 // Create a new channel
361 ch := make(chan *Event, maxClientChannelBufferSize)
362
363 // Create a context to track this request
364 watchContext, cFunc := context.WithCancel(context.Background())
365
366 // Save the channel and context reference for later
367 c.writeLock.Lock()
368 defer c.writeLock.Unlock()
369 ccm := channelContextMap{channel: ch, ctx: watchContext, cancel: cFunc}
370 c.watchedChannelsContext[key] = append(c.watchedChannelsContext[key], &ccm)
371
372 // Launch a go routine to listen for updates
373 go c.listenForKeyChange(watchContext, key, ch)
374
375 return ch
376}
377
378// CloseWatch closes a specific watch. Both the key and the channel are required when closing a watch as there
379// may be multiple listeners on the same key. The previously created channel serves as a key
380func (c *ConsulClient) CloseWatch(key string, ch chan *Event) {
381 // First close the context
382 var ok bool
383 var watchedChannelsContexts []*channelContextMap
384 c.writeLock.Lock()
385 defer c.writeLock.Unlock()
386 if watchedChannelsContexts, ok = c.watchedChannelsContext[key]; !ok {
khenaidoo5c11af72018-07-20 17:21:05 -0400387 log.Errorw("key-has-no-watched-context-or-channel", log.Fields{"key": key})
khenaidoocfee5f42018-07-19 22:47:38 -0400388 return
389 }
390 // Look for the channels
391 var pos = -1
392 for i, chCtxMap := range watchedChannelsContexts {
393 if chCtxMap.channel == ch {
394 log.Debug("channel-found")
395 chCtxMap.cancel()
396 //close the channel
397 close(ch)
398 pos = i
399 break
400 }
401 }
402 // Remove that entry if present
403 if pos >= 0 {
404 c.watchedChannelsContext[key] = append(c.watchedChannelsContext[key][:pos], c.watchedChannelsContext[key][pos+1:]...)
405 }
khenaidoo5c11af72018-07-20 17:21:05 -0400406 log.Debugw("watched-channel-exiting", log.Fields{"key": key, "channel": c.watchedChannelsContext[key]})
khenaidoocfee5f42018-07-19 22:47:38 -0400407}
408
409func (c *ConsulClient) isKVEqual(kv1 *consulapi.KVPair, kv2 *consulapi.KVPair) bool {
410 if (kv1 == nil) && (kv2 == nil) {
411 return true
412 } else if (kv1 == nil) || (kv2 == nil) {
413 return false
414 }
415 // Both the KV should be non-null here
416 if kv1.Key != kv2.Key ||
417 !bytes.Equal(kv1.Value, kv2.Value) ||
418 kv1.Session != kv2.Session ||
419 kv1.LockIndex != kv2.LockIndex ||
420 kv1.ModifyIndex != kv2.ModifyIndex {
421 return false
422 }
423 return true
424}
425
426func (c *ConsulClient) listenForKeyChange(watchContext context.Context, key string, ch chan *Event) {
khenaidoo5c11af72018-07-20 17:21:05 -0400427 log.Debugw("start-watching-channel", log.Fields{"key": key, "channel": ch})
khenaidoocfee5f42018-07-19 22:47:38 -0400428
429 defer c.CloseWatch(key, ch)
430 duration := GetDuration(defaultKVGetTimeout)
431 kv := c.consul.KV()
432 var queryOptions consulapi.QueryOptions
433 queryOptions.WaitTime = duration
434
435 // Get the existing value, if any
436 previousKVPair, meta, err := kv.Get(key, &queryOptions)
437 if err != nil {
438 log.Debug(err)
439 }
440 lastIndex := meta.LastIndex
441
442 // Wait for change. Push any change onto the channel and keep waiting for new update
443 //var waitOptions consulapi.QueryOptions
444 var pair *consulapi.KVPair
445 //watchContext, _ := context.WithCancel(context.Background())
446 waitOptions := queryOptions.WithContext(watchContext)
447 for {
448 //waitOptions = consulapi.QueryOptions{WaitIndex: lastIndex}
449 waitOptions.WaitIndex = lastIndex
450 pair, meta, err = kv.Get(key, waitOptions)
451 select {
452 case <-watchContext.Done():
453 log.Debug("done-event-received-exiting")
454 return
455 default:
456 if err != nil {
khenaidoo5c11af72018-07-20 17:21:05 -0400457 log.Warnw("error-from-watch", log.Fields{"error": err})
khenaidoocfee5f42018-07-19 22:47:38 -0400458 ch <- NewEvent(CONNECTIONDOWN, key, []byte(""))
459 } else {
khenaidoo5c11af72018-07-20 17:21:05 -0400460 log.Debugw("index-state", log.Fields{"lastindex": lastIndex, "newindex": meta.LastIndex, "key": key})
khenaidoocfee5f42018-07-19 22:47:38 -0400461 }
462 }
463 if err != nil {
464 log.Debug(err)
465 // On error, block for 10 milliseconds to prevent endless loop
466 time.Sleep(10 * time.Millisecond)
467 } else if meta.LastIndex <= lastIndex {
468 log.Info("no-index-change-or-negative")
469 } else {
khenaidoo5c11af72018-07-20 17:21:05 -0400470 log.Debugw("update-received", log.Fields{"pair": pair})
khenaidoocfee5f42018-07-19 22:47:38 -0400471 if pair == nil {
472 ch <- NewEvent(DELETE, key, []byte(""))
473 } else if !c.isKVEqual(pair, previousKVPair) {
474 // Push the change onto the channel if the data has changed
475 // For now just assume it's a PUT change
khenaidoo5c11af72018-07-20 17:21:05 -0400476 log.Debugw("pair-details", log.Fields{"session": pair.Session, "key": pair.Key, "value": pair.Value})
khenaidoocfee5f42018-07-19 22:47:38 -0400477 ch <- NewEvent(PUT, pair.Key, pair.Value)
478 }
479 previousKVPair = pair
480 lastIndex = meta.LastIndex
481 }
482 }
483}
484
485// Close closes the KV store client
486func (c *ConsulClient) Close() {
487 var writeOptions consulapi.WriteOptions
488 // Inform any goroutine it's time to say goodbye.
489 c.writeLock.Lock()
490 defer c.writeLock.Unlock()
491 if c.doneCh != nil {
492 close(*c.doneCh)
493 }
494
495 // Clear the sessionID
496 if _, err := c.consul.Session().Destroy(c.sessionID, &writeOptions); err != nil {
khenaidoo5c11af72018-07-20 17:21:05 -0400497 log.Errorw("error-closing-client", log.Fields{"error": err})
khenaidoocfee5f42018-07-19 22:47:38 -0400498 }
499}