blob: c2cd841cd9e51cfddea364999ac7470a9b904dd3 [file] [log] [blame]
Scott Baker2c1c4822019-10-16 11:02:41 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kvstore
17
18import (
19 "bytes"
20 "context"
21 "errors"
serkant.uluderyab38671c2019-11-01 09:35:38 -070022 log "github.com/opencord/voltha-lib-go/v3/pkg/log"
Scott Baker2c1c4822019-10-16 11:02:41 -070023 "sync"
24 "time"
25 //log "ciena.com/coordinator/common"
26 consulapi "github.com/hashicorp/consul/api"
27)
28
29type channelContextMap struct {
30 ctx context.Context
31 channel chan *Event
32 cancel context.CancelFunc
33}
34
35// ConsulClient represents the consul KV store client
36type ConsulClient struct {
37 session *consulapi.Session
38 sessionID string
39 consul *consulapi.Client
40 doneCh *chan int
41 keyReservations map[string]interface{}
42 watchedChannelsContext map[string][]*channelContextMap
43 writeLock sync.Mutex
44}
45
46// NewConsulClient returns a new client for the Consul KV store
Neha Sharma94f16a92020-06-26 04:17:55 +000047func NewConsulClient(ctx context.Context, addr string, timeout time.Duration) (*ConsulClient, error) {
Scott Baker2c1c4822019-10-16 11:02:41 -070048 config := consulapi.DefaultConfig()
49 config.Address = addr
Neha Sharma130ac6d2020-04-08 08:46:32 +000050 config.WaitTime = timeout
Scott Baker2c1c4822019-10-16 11:02:41 -070051 consul, err := consulapi.NewClient(config)
52 if err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +000053 logger.Error(ctx, err)
Scott Baker2c1c4822019-10-16 11:02:41 -070054 return nil, err
55 }
56
57 doneCh := make(chan int, 1)
58 wChannelsContext := make(map[string][]*channelContextMap)
59 reservations := make(map[string]interface{})
60 return &ConsulClient{consul: consul, doneCh: &doneCh, watchedChannelsContext: wChannelsContext, keyReservations: reservations}, nil
61}
62
63// IsConnectionUp returns whether the connection to the Consul KV store is up
npujar5bf737f2020-01-16 19:35:25 +053064func (c *ConsulClient) IsConnectionUp(ctx context.Context) bool {
Neha Sharma94f16a92020-06-26 04:17:55 +000065 logger.Error(ctx, "Unimplemented function")
Scott Baker2c1c4822019-10-16 11:02:41 -070066 return false
67}
68
69// List returns an array of key-value pairs with key as a prefix. Timeout defines how long the function will
70// wait for a response
npujar5bf737f2020-01-16 19:35:25 +053071func (c *ConsulClient) List(ctx context.Context, key string) (map[string]*KVPair, error) {
Scott Baker2c1c4822019-10-16 11:02:41 -070072
npujar5bf737f2020-01-16 19:35:25 +053073 deadline, _ := ctx.Deadline()
Scott Baker2c1c4822019-10-16 11:02:41 -070074 kv := c.consul.KV()
75 var queryOptions consulapi.QueryOptions
Neha Sharma130ac6d2020-04-08 08:46:32 +000076 // Substract current time from deadline to get the waitTime duration
77 queryOptions.WaitTime = time.Until(deadline)
78
Scott Baker2c1c4822019-10-16 11:02:41 -070079 // For now we ignore meta data
80 kvps, _, err := kv.List(key, &queryOptions)
81 if err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +000082 logger.Error(ctx, err)
Scott Baker2c1c4822019-10-16 11:02:41 -070083 return nil, err
84 }
85 m := make(map[string]*KVPair)
86 for _, kvp := range kvps {
87 m[string(kvp.Key)] = NewKVPair(string(kvp.Key), kvp.Value, string(kvp.Session), 0, -1)
88 }
89 return m, nil
90}
91
92// Get returns a key-value pair for a given key. Timeout defines how long the function will
93// wait for a response
npujar5bf737f2020-01-16 19:35:25 +053094func (c *ConsulClient) Get(ctx context.Context, key string) (*KVPair, error) {
Scott Baker2c1c4822019-10-16 11:02:41 -070095
npujar5bf737f2020-01-16 19:35:25 +053096 deadline, _ := ctx.Deadline()
Scott Baker2c1c4822019-10-16 11:02:41 -070097 kv := c.consul.KV()
98 var queryOptions consulapi.QueryOptions
Neha Sharma130ac6d2020-04-08 08:46:32 +000099 // Substract current time from deadline to get the waitTime duration
100 queryOptions.WaitTime = time.Until(deadline)
101
Scott Baker2c1c4822019-10-16 11:02:41 -0700102 // For now we ignore meta data
103 kvp, _, err := kv.Get(key, &queryOptions)
104 if err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000105 logger.Error(ctx, err)
Scott Baker2c1c4822019-10-16 11:02:41 -0700106 return nil, err
107 }
108 if kvp != nil {
109 return NewKVPair(string(kvp.Key), kvp.Value, string(kvp.Session), 0, -1), nil
110 }
111
112 return nil, nil
113}
114
115// Put writes a key-value pair to the KV store. Value can only be a string or []byte since the consul API
116// accepts only a []byte as a value for a put operation. Timeout defines how long the function will
117// wait for a response
npujar5bf737f2020-01-16 19:35:25 +0530118func (c *ConsulClient) Put(ctx context.Context, key string, value interface{}) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700119
120 // Validate that we can create a byte array from the value as consul API expects a byte array
121 var val []byte
122 var er error
123 if val, er = ToByte(value); er != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000124 logger.Error(ctx, er)
Scott Baker2c1c4822019-10-16 11:02:41 -0700125 return er
126 }
127
128 // Create a key value pair
129 kvp := consulapi.KVPair{Key: key, Value: val}
130 kv := c.consul.KV()
131 var writeOptions consulapi.WriteOptions
132 c.writeLock.Lock()
133 defer c.writeLock.Unlock()
134 _, err := kv.Put(&kvp, &writeOptions)
135 if err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000136 logger.Error(ctx, err)
Scott Baker2c1c4822019-10-16 11:02:41 -0700137 return err
138 }
139 return nil
140}
141
142// Delete removes a key from the KV store. Timeout defines how long the function will
143// wait for a response
npujar5bf737f2020-01-16 19:35:25 +0530144func (c *ConsulClient) Delete(ctx context.Context, key string) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700145 kv := c.consul.KV()
146 var writeOptions consulapi.WriteOptions
147 c.writeLock.Lock()
148 defer c.writeLock.Unlock()
149 _, err := kv.Delete(key, &writeOptions)
150 if err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000151 logger.Error(ctx, err)
Scott Baker2c1c4822019-10-16 11:02:41 -0700152 return err
153 }
154 return nil
155}
156
Neha Sharma94f16a92020-06-26 04:17:55 +0000157func (c *ConsulClient) deleteSession(ctx context.Context) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700158 if c.sessionID != "" {
Neha Sharma94f16a92020-06-26 04:17:55 +0000159 logger.Debug(ctx, "cleaning-up-session")
Scott Baker2c1c4822019-10-16 11:02:41 -0700160 session := c.consul.Session()
161 _, err := session.Destroy(c.sessionID, nil)
162 if err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000163 logger.Errorw(ctx, "error-cleaning-session", log.Fields{"session": c.sessionID, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700164 }
165 }
166 c.sessionID = ""
167 c.session = nil
168}
169
Neha Sharma94f16a92020-06-26 04:17:55 +0000170func (c *ConsulClient) createSession(ctx context.Context, ttl time.Duration, retries int) (*consulapi.Session, string, error) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700171 session := c.consul.Session()
172 entry := &consulapi.SessionEntry{
173 Behavior: consulapi.SessionBehaviorDelete,
Neha Sharma130ac6d2020-04-08 08:46:32 +0000174 TTL: ttl.String(),
Scott Baker2c1c4822019-10-16 11:02:41 -0700175 }
176
177 for {
178 id, meta, err := session.Create(entry, nil)
179 if err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000180 logger.Errorw(ctx, "create-session-error", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700181 if retries == 0 {
182 return nil, "", err
183 }
184 } else if meta.RequestTime == 0 {
Neha Sharma94f16a92020-06-26 04:17:55 +0000185 logger.Errorw(ctx, "create-session-bad-meta-data", log.Fields{"meta-data": meta})
Scott Baker2c1c4822019-10-16 11:02:41 -0700186 if retries == 0 {
187 return nil, "", errors.New("bad-meta-data")
188 }
189 } else if id == "" {
Neha Sharma94f16a92020-06-26 04:17:55 +0000190 logger.Error(ctx, "create-session-nil-id")
Scott Baker2c1c4822019-10-16 11:02:41 -0700191 if retries == 0 {
192 return nil, "", errors.New("ID-nil")
193 }
194 } else {
195 return session, id, nil
196 }
197 // If retry param is -1 we will retry indefinitely
198 if retries > 0 {
199 retries--
200 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000201 logger.Debug(ctx, "retrying-session-create-after-a-second-delay")
Scott Baker2c1c4822019-10-16 11:02:41 -0700202 time.Sleep(time.Duration(1) * time.Second)
203 }
204}
205
206// Helper function to verify mostly whether the content of two interface types are the same. Focus is []byte and
207// string types
208func isEqual(val1 interface{}, val2 interface{}) bool {
209 b1, err := ToByte(val1)
210 b2, er := ToByte(val2)
211 if err == nil && er == nil {
212 return bytes.Equal(b1, b2)
213 }
214 return val1 == val2
215}
216
217// Reserve is invoked to acquire a key and set it to a given value. Value can only be a string or []byte since
218// the consul API accepts only a []byte. Timeout defines how long the function will wait for a response. TTL
219// defines how long that reservation is valid. When TTL expires the key is unreserved by the KV store itself.
220// If the key is acquired then the value returned will be the value passed in. If the key is already acquired
221// then the value assigned to that key will be returned.
Neha Sharma130ac6d2020-04-08 08:46:32 +0000222func (c *ConsulClient) Reserve(ctx context.Context, key string, value interface{}, ttl time.Duration) (interface{}, error) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700223
224 // Validate that we can create a byte array from the value as consul API expects a byte array
225 var val []byte
226 var er error
227 if val, er = ToByte(value); er != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000228 logger.Error(ctx, er)
Scott Baker2c1c4822019-10-16 11:02:41 -0700229 return nil, er
230 }
231
232 // Cleanup any existing session and recreate new ones. A key is reserved against a session
233 if c.sessionID != "" {
Neha Sharma94f16a92020-06-26 04:17:55 +0000234 c.deleteSession(ctx)
Scott Baker2c1c4822019-10-16 11:02:41 -0700235 }
236
237 // Clear session if reservation is not successful
238 reservationSuccessful := false
239 defer func() {
240 if !reservationSuccessful {
Neha Sharma94f16a92020-06-26 04:17:55 +0000241 logger.Debug(ctx, "deleting-session")
242 c.deleteSession(ctx)
Scott Baker2c1c4822019-10-16 11:02:41 -0700243 }
244 }()
245
Neha Sharma94f16a92020-06-26 04:17:55 +0000246 session, sessionID, err := c.createSession(ctx, ttl, -1)
Scott Baker2c1c4822019-10-16 11:02:41 -0700247 if err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000248 logger.Errorw(ctx, "no-session-created", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700249 return "", errors.New("no-session-created")
250 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000251 logger.Debugw(ctx, "session-created", log.Fields{"session-id": sessionID})
Scott Baker2c1c4822019-10-16 11:02:41 -0700252 c.sessionID = sessionID
253 c.session = session
254
255 // Try to grap the Key using the session
256 kv := c.consul.KV()
257 kvp := consulapi.KVPair{Key: key, Value: val, Session: c.sessionID}
258 result, _, err := kv.Acquire(&kvp, nil)
259 if err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000260 logger.Errorw(ctx, "error-acquiring-keys", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700261 return nil, err
262 }
263
Neha Sharma94f16a92020-06-26 04:17:55 +0000264 logger.Debugw(ctx, "key-acquired", log.Fields{"key": key, "status": result})
Scott Baker2c1c4822019-10-16 11:02:41 -0700265
266 // Irrespective whether we were successful in acquiring the key, let's read it back and see if it's us.
npujar5bf737f2020-01-16 19:35:25 +0530267 m, err := c.Get(ctx, key)
Scott Baker2c1c4822019-10-16 11:02:41 -0700268 if err != nil {
269 return nil, err
270 }
271 if m != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000272 logger.Debugw(ctx, "response-received", log.Fields{"key": m.Key, "m.value": string(m.Value.([]byte)), "value": value})
Scott Baker2c1c4822019-10-16 11:02:41 -0700273 if m.Key == key && isEqual(m.Value, value) {
274 // My reservation is successful - register it. For now, support is only for 1 reservation per key
275 // per session.
276 reservationSuccessful = true
277 c.writeLock.Lock()
278 c.keyReservations[key] = m.Value
279 c.writeLock.Unlock()
280 return m.Value, nil
281 }
282 // My reservation has failed. Return the owner of that key
283 return m.Value, nil
284 }
285 return nil, nil
286}
287
288// ReleaseAllReservations releases all key reservations previously made (using Reserve API)
npujar5bf737f2020-01-16 19:35:25 +0530289func (c *ConsulClient) ReleaseAllReservations(ctx context.Context) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700290 kv := c.consul.KV()
291 var kvp consulapi.KVPair
292 var result bool
293 var err error
294
295 c.writeLock.Lock()
296 defer c.writeLock.Unlock()
297
298 for key, value := range c.keyReservations {
299 kvp = consulapi.KVPair{Key: key, Value: value.([]byte), Session: c.sessionID}
300 result, _, err = kv.Release(&kvp, nil)
301 if err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000302 logger.Errorw(ctx, "cannot-release-reservation", log.Fields{"key": key, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700303 return err
304 }
305 if !result {
Neha Sharma94f16a92020-06-26 04:17:55 +0000306 logger.Errorw(ctx, "cannot-release-reservation", log.Fields{"key": key})
Scott Baker2c1c4822019-10-16 11:02:41 -0700307 }
308 delete(c.keyReservations, key)
309 }
310 return nil
311}
312
313// ReleaseReservation releases reservation for a specific key.
npujar5bf737f2020-01-16 19:35:25 +0530314func (c *ConsulClient) ReleaseReservation(ctx context.Context, key string) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700315 var ok bool
316 var reservedValue interface{}
317 c.writeLock.Lock()
318 defer c.writeLock.Unlock()
319 if reservedValue, ok = c.keyReservations[key]; !ok {
320 return errors.New("key-not-reserved:" + key)
321 }
322 // Release the reservation
323 kv := c.consul.KV()
324 kvp := consulapi.KVPair{Key: key, Value: reservedValue.([]byte), Session: c.sessionID}
325
326 result, _, er := kv.Release(&kvp, nil)
327 if er != nil {
328 return er
329 }
330 // Remove that key entry on success
331 if result {
332 delete(c.keyReservations, key)
333 return nil
334 }
335 return errors.New("key-cannot-be-unreserved")
336}
337
338// RenewReservation renews a reservation. A reservation will go stale after the specified TTL (Time To Live)
339// period specified when reserving the key
npujar5bf737f2020-01-16 19:35:25 +0530340func (c *ConsulClient) RenewReservation(ctx context.Context, key string) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700341 // In the case of Consul, renew reservation of a reserve key only require renewing the client session.
342
343 c.writeLock.Lock()
344 defer c.writeLock.Unlock()
345
346 // Verify the key was reserved
347 if _, ok := c.keyReservations[key]; !ok {
348 return errors.New("key-not-reserved")
349 }
350
351 if c.session == nil {
352 return errors.New("no-session-exist")
353 }
354
355 var writeOptions consulapi.WriteOptions
356 if _, _, err := c.session.Renew(c.sessionID, &writeOptions); err != nil {
357 return err
358 }
359 return nil
360}
361
362// Watch provides the watch capability on a given key. It returns a channel onto which the callee needs to
363// listen to receive Events.
divyadesai8bf96862020-02-07 12:24:26 +0000364func (c *ConsulClient) Watch(ctx context.Context, key string, withPrefix bool) chan *Event {
Scott Baker2c1c4822019-10-16 11:02:41 -0700365
366 // Create a new channel
367 ch := make(chan *Event, maxClientChannelBufferSize)
368
369 // Create a context to track this request
370 watchContext, cFunc := context.WithCancel(context.Background())
371
372 // Save the channel and context reference for later
373 c.writeLock.Lock()
374 defer c.writeLock.Unlock()
375 ccm := channelContextMap{channel: ch, ctx: watchContext, cancel: cFunc}
376 c.watchedChannelsContext[key] = append(c.watchedChannelsContext[key], &ccm)
377
378 // Launch a go routine to listen for updates
379 go c.listenForKeyChange(watchContext, key, ch)
380
381 return ch
382}
383
384// CloseWatch closes a specific watch. Both the key and the channel are required when closing a watch as there
385// may be multiple listeners on the same key. The previously created channel serves as a key
Neha Sharma94f16a92020-06-26 04:17:55 +0000386func (c *ConsulClient) CloseWatch(ctx context.Context, key string, ch chan *Event) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700387 // First close the context
388 var ok bool
389 var watchedChannelsContexts []*channelContextMap
390 c.writeLock.Lock()
391 defer c.writeLock.Unlock()
392 if watchedChannelsContexts, ok = c.watchedChannelsContext[key]; !ok {
Neha Sharma94f16a92020-06-26 04:17:55 +0000393 logger.Errorw(ctx, "key-has-no-watched-context-or-channel", log.Fields{"key": key})
Scott Baker2c1c4822019-10-16 11:02:41 -0700394 return
395 }
396 // Look for the channels
397 var pos = -1
398 for i, chCtxMap := range watchedChannelsContexts {
399 if chCtxMap.channel == ch {
Neha Sharma94f16a92020-06-26 04:17:55 +0000400 logger.Debug(ctx, "channel-found")
Scott Baker2c1c4822019-10-16 11:02:41 -0700401 chCtxMap.cancel()
402 //close the channel
403 close(ch)
404 pos = i
405 break
406 }
407 }
408 // Remove that entry if present
409 if pos >= 0 {
410 c.watchedChannelsContext[key] = append(c.watchedChannelsContext[key][:pos], c.watchedChannelsContext[key][pos+1:]...)
411 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000412 logger.Debugw(ctx, "watched-channel-exiting", log.Fields{"key": key, "channel": c.watchedChannelsContext[key]})
Scott Baker2c1c4822019-10-16 11:02:41 -0700413}
414
415func (c *ConsulClient) isKVEqual(kv1 *consulapi.KVPair, kv2 *consulapi.KVPair) bool {
416 if (kv1 == nil) && (kv2 == nil) {
417 return true
418 } else if (kv1 == nil) || (kv2 == nil) {
419 return false
420 }
421 // Both the KV should be non-null here
422 if kv1.Key != kv2.Key ||
423 !bytes.Equal(kv1.Value, kv2.Value) ||
424 kv1.Session != kv2.Session ||
425 kv1.LockIndex != kv2.LockIndex ||
426 kv1.ModifyIndex != kv2.ModifyIndex {
427 return false
428 }
429 return true
430}
431
Neha Sharma94f16a92020-06-26 04:17:55 +0000432func (c *ConsulClient) listenForKeyChange(ctx context.Context, key string, ch chan *Event) {
433 logger.Debugw(ctx, "start-watching-channel", log.Fields{"key": key, "channel": ch})
Scott Baker2c1c4822019-10-16 11:02:41 -0700434
Neha Sharma94f16a92020-06-26 04:17:55 +0000435 defer c.CloseWatch(ctx, key, ch)
Scott Baker2c1c4822019-10-16 11:02:41 -0700436 kv := c.consul.KV()
437 var queryOptions consulapi.QueryOptions
Neha Sharma130ac6d2020-04-08 08:46:32 +0000438 queryOptions.WaitTime = defaultKVGetTimeout
Scott Baker2c1c4822019-10-16 11:02:41 -0700439
440 // Get the existing value, if any
441 previousKVPair, meta, err := kv.Get(key, &queryOptions)
442 if err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000443 logger.Debug(ctx, err)
Scott Baker2c1c4822019-10-16 11:02:41 -0700444 }
445 lastIndex := meta.LastIndex
446
447 // Wait for change. Push any change onto the channel and keep waiting for new update
448 //var waitOptions consulapi.QueryOptions
449 var pair *consulapi.KVPair
450 //watchContext, _ := context.WithCancel(context.Background())
Neha Sharma94f16a92020-06-26 04:17:55 +0000451 waitOptions := queryOptions.WithContext(ctx)
Scott Baker2c1c4822019-10-16 11:02:41 -0700452 for {
453 //waitOptions = consulapi.QueryOptions{WaitIndex: lastIndex}
454 waitOptions.WaitIndex = lastIndex
455 pair, meta, err = kv.Get(key, waitOptions)
456 select {
Neha Sharma94f16a92020-06-26 04:17:55 +0000457 case <-ctx.Done():
458 logger.Debug(ctx, "done-event-received-exiting")
Scott Baker2c1c4822019-10-16 11:02:41 -0700459 return
460 default:
461 if err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000462 logger.Warnw(ctx, "error-from-watch", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700463 ch <- NewEvent(CONNECTIONDOWN, key, []byte(""), -1)
464 } else {
Neha Sharma94f16a92020-06-26 04:17:55 +0000465 logger.Debugw(ctx, "index-state", log.Fields{"lastindex": lastIndex, "newindex": meta.LastIndex, "key": key})
Scott Baker2c1c4822019-10-16 11:02:41 -0700466 }
467 }
468 if err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000469 logger.Debug(ctx, err)
Scott Baker2c1c4822019-10-16 11:02:41 -0700470 // On error, block for 10 milliseconds to prevent endless loop
471 time.Sleep(10 * time.Millisecond)
472 } else if meta.LastIndex <= lastIndex {
Neha Sharma94f16a92020-06-26 04:17:55 +0000473 logger.Info(ctx, "no-index-change-or-negative")
Scott Baker2c1c4822019-10-16 11:02:41 -0700474 } else {
Neha Sharma94f16a92020-06-26 04:17:55 +0000475 logger.Debugw(ctx, "update-received", log.Fields{"pair": pair})
Scott Baker2c1c4822019-10-16 11:02:41 -0700476 if pair == nil {
477 ch <- NewEvent(DELETE, key, []byte(""), -1)
478 } else if !c.isKVEqual(pair, previousKVPair) {
479 // Push the change onto the channel if the data has changed
480 // For now just assume it's a PUT change
Neha Sharma94f16a92020-06-26 04:17:55 +0000481 logger.Debugw(ctx, "pair-details", log.Fields{"session": pair.Session, "key": pair.Key, "value": pair.Value})
Scott Baker2c1c4822019-10-16 11:02:41 -0700482 ch <- NewEvent(PUT, pair.Key, pair.Value, -1)
483 }
484 previousKVPair = pair
485 lastIndex = meta.LastIndex
486 }
487 }
488}
489
490// Close closes the KV store client
Neha Sharma94f16a92020-06-26 04:17:55 +0000491func (c *ConsulClient) Close(ctx context.Context) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700492 var writeOptions consulapi.WriteOptions
493 // Inform any goroutine it's time to say goodbye.
494 c.writeLock.Lock()
495 defer c.writeLock.Unlock()
496 if c.doneCh != nil {
497 close(*c.doneCh)
498 }
499
500 // Clear the sessionID
501 if _, err := c.consul.Session().Destroy(c.sessionID, &writeOptions); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000502 logger.Errorw(ctx, "error-closing-client", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700503 }
504}
505
Neha Sharma130ac6d2020-04-08 08:46:32 +0000506func (c *ConsulClient) AcquireLock(ctx context.Context, lockName string, timeout time.Duration) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700507 return nil
508}
509
510func (c *ConsulClient) ReleaseLock(lockName string) error {
511 return nil
512}