blob: fdf39be67d92c7dd0c2c87a4772ce39c3304c6b5 [file] [log] [blame]
khenaidoobf6e7bb2018-08-14 22:27:29 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
khenaidoocfee5f42018-07-19 22:47:38 -040016package kvstore
17
18import (
khenaidoo5c11af72018-07-20 17:21:05 -040019 "bytes"
khenaidoocfee5f42018-07-19 22:47:38 -040020 "context"
21 "errors"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080022 log "github.com/opencord/voltha-lib-go/v3/pkg/log"
khenaidoocfee5f42018-07-19 22:47:38 -040023 "sync"
24 "time"
khenaidoocfee5f42018-07-19 22:47:38 -040025 //log "ciena.com/coordinator/common"
26 consulapi "github.com/hashicorp/consul/api"
27)
28
29type channelContextMap struct {
30 ctx context.Context
31 channel chan *Event
32 cancel context.CancelFunc
33}
34
khenaidoocfee5f42018-07-19 22:47:38 -040035// ConsulClient represents the consul KV store client
36type ConsulClient struct {
37 session *consulapi.Session
38 sessionID string
39 consul *consulapi.Client
40 doneCh *chan int
41 keyReservations map[string]interface{}
42 watchedChannelsContext map[string][]*channelContextMap
43 writeLock sync.Mutex
44}
45
46// NewConsulClient returns a new client for the Consul KV store
47func NewConsulClient(addr string, timeout int) (*ConsulClient, error) {
48
49 duration := GetDuration(timeout)
50
51 config := consulapi.DefaultConfig()
52 config.Address = addr
53 config.WaitTime = duration
54 consul, err := consulapi.NewClient(config)
55 if err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -080056 logger.Error(err)
khenaidoocfee5f42018-07-19 22:47:38 -040057 return nil, err
58 }
59
60 doneCh := make(chan int, 1)
61 wChannelsContext := make(map[string][]*channelContextMap)
62 reservations := make(map[string]interface{})
63 return &ConsulClient{consul: consul, doneCh: &doneCh, watchedChannelsContext: wChannelsContext, keyReservations: reservations}, nil
64}
65
khenaidoob3244212019-08-27 14:32:27 -040066// IsConnectionUp returns whether the connection to the Consul KV store is up
npujar467fe752020-01-16 20:17:45 +053067func (c *ConsulClient) IsConnectionUp(ctx context.Context) bool {
serkant.uluderya2ae470f2020-01-21 11:13:09 -080068 logger.Error("Unimplemented function")
khenaidoob3244212019-08-27 14:32:27 -040069 return false
70}
71
khenaidoocfee5f42018-07-19 22:47:38 -040072// List returns an array of key-value pairs with key as a prefix. Timeout defines how long the function will
73// wait for a response
npujar467fe752020-01-16 20:17:45 +053074func (c *ConsulClient) List(ctx context.Context, key string) (map[string]*KVPair, error) {
khenaidoocfee5f42018-07-19 22:47:38 -040075
npujar467fe752020-01-16 20:17:45 +053076 deadline, _ := ctx.Deadline()
khenaidoocfee5f42018-07-19 22:47:38 -040077 kv := c.consul.KV()
78 var queryOptions consulapi.QueryOptions
npujar467fe752020-01-16 20:17:45 +053079 queryOptions.WaitTime = GetDuration(deadline.Second())
khenaidoocfee5f42018-07-19 22:47:38 -040080 // For now we ignore meta data
81 kvps, _, err := kv.List(key, &queryOptions)
82 if err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -080083 logger.Error(err)
khenaidoocfee5f42018-07-19 22:47:38 -040084 return nil, err
85 }
86 m := make(map[string]*KVPair)
87 for _, kvp := range kvps {
Stephane Barbarieef6650d2019-07-18 12:15:09 -040088 m[string(kvp.Key)] = NewKVPair(string(kvp.Key), kvp.Value, string(kvp.Session), 0, -1)
khenaidoocfee5f42018-07-19 22:47:38 -040089 }
90 return m, nil
91}
92
93// Get returns a key-value pair for a given key. Timeout defines how long the function will
94// wait for a response
npujar467fe752020-01-16 20:17:45 +053095func (c *ConsulClient) Get(ctx context.Context, key string) (*KVPair, error) {
khenaidoocfee5f42018-07-19 22:47:38 -040096
npujar467fe752020-01-16 20:17:45 +053097 deadline, _ := ctx.Deadline()
khenaidoocfee5f42018-07-19 22:47:38 -040098 kv := c.consul.KV()
99 var queryOptions consulapi.QueryOptions
npujar467fe752020-01-16 20:17:45 +0530100 queryOptions.WaitTime = GetDuration(deadline.Second())
khenaidoocfee5f42018-07-19 22:47:38 -0400101 // For now we ignore meta data
102 kvp, _, err := kv.Get(key, &queryOptions)
103 if err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800104 logger.Error(err)
khenaidoocfee5f42018-07-19 22:47:38 -0400105 return nil, err
106 }
107 if kvp != nil {
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400108 return NewKVPair(string(kvp.Key), kvp.Value, string(kvp.Session), 0, -1), nil
khenaidoocfee5f42018-07-19 22:47:38 -0400109 }
110
111 return nil, nil
112}
113
114// Put writes a key-value pair to the KV store. Value can only be a string or []byte since the consul API
115// accepts only a []byte as a value for a put operation. Timeout defines how long the function will
116// wait for a response
npujar467fe752020-01-16 20:17:45 +0530117func (c *ConsulClient) Put(ctx context.Context, key string, value interface{}) error {
khenaidoocfee5f42018-07-19 22:47:38 -0400118
119 // Validate that we can create a byte array from the value as consul API expects a byte array
120 var val []byte
121 var er error
122 if val, er = ToByte(value); er != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800123 logger.Error(er)
khenaidoocfee5f42018-07-19 22:47:38 -0400124 return er
125 }
126
127 // Create a key value pair
128 kvp := consulapi.KVPair{Key: key, Value: val}
129 kv := c.consul.KV()
130 var writeOptions consulapi.WriteOptions
131 c.writeLock.Lock()
132 defer c.writeLock.Unlock()
133 _, err := kv.Put(&kvp, &writeOptions)
134 if err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800135 logger.Error(err)
khenaidoocfee5f42018-07-19 22:47:38 -0400136 return err
137 }
138 return nil
139}
140
141// Delete removes a key from the KV store. Timeout defines how long the function will
142// wait for a response
npujar467fe752020-01-16 20:17:45 +0530143func (c *ConsulClient) Delete(ctx context.Context, key string) error {
khenaidoocfee5f42018-07-19 22:47:38 -0400144 kv := c.consul.KV()
145 var writeOptions consulapi.WriteOptions
146 c.writeLock.Lock()
147 defer c.writeLock.Unlock()
148 _, err := kv.Delete(key, &writeOptions)
149 if err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800150 logger.Error(err)
khenaidoocfee5f42018-07-19 22:47:38 -0400151 return err
152 }
153 return nil
154}
155
156func (c *ConsulClient) deleteSession() {
157 if c.sessionID != "" {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800158 logger.Debug("cleaning-up-session")
khenaidoocfee5f42018-07-19 22:47:38 -0400159 session := c.consul.Session()
160 _, err := session.Destroy(c.sessionID, nil)
161 if err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800162 logger.Errorw("error-cleaning-session", log.Fields{"session": c.sessionID, "error": err})
khenaidoocfee5f42018-07-19 22:47:38 -0400163 }
164 }
165 c.sessionID = ""
166 c.session = nil
167}
168
169func (c *ConsulClient) createSession(ttl int64, retries int) (*consulapi.Session, string, error) {
170 session := c.consul.Session()
171 entry := &consulapi.SessionEntry{
172 Behavior: consulapi.SessionBehaviorDelete,
173 TTL: "10s", // strconv.FormatInt(ttl, 10) + "s", // disable ttl
174 }
175
176 for {
177 id, meta, err := session.Create(entry, nil)
178 if err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800179 logger.Errorw("create-session-error", log.Fields{"error": err})
khenaidoocfee5f42018-07-19 22:47:38 -0400180 if retries == 0 {
181 return nil, "", err
182 }
183 } else if meta.RequestTime == 0 {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800184 logger.Errorw("create-session-bad-meta-data", log.Fields{"meta-data": meta})
khenaidoocfee5f42018-07-19 22:47:38 -0400185 if retries == 0 {
186 return nil, "", errors.New("bad-meta-data")
187 }
188 } else if id == "" {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800189 logger.Error("create-session-nil-id")
khenaidoocfee5f42018-07-19 22:47:38 -0400190 if retries == 0 {
191 return nil, "", errors.New("ID-nil")
192 }
193 } else {
194 return session, id, nil
195 }
196 // If retry param is -1 we will retry indefinitely
197 if retries > 0 {
198 retries--
199 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800200 logger.Debug("retrying-session-create-after-a-second-delay")
khenaidoocfee5f42018-07-19 22:47:38 -0400201 time.Sleep(time.Duration(1) * time.Second)
202 }
203}
204
khenaidoocfee5f42018-07-19 22:47:38 -0400205// Helper function to verify mostly whether the content of two interface types are the same. Focus is []byte and
206// string types
207func isEqual(val1 interface{}, val2 interface{}) bool {
208 b1, err := ToByte(val1)
209 b2, er := ToByte(val2)
210 if err == nil && er == nil {
211 return bytes.Equal(b1, b2)
212 }
213 return val1 == val2
214}
215
216// Reserve is invoked to acquire a key and set it to a given value. Value can only be a string or []byte since
217// the consul API accepts only a []byte. Timeout defines how long the function will wait for a response. TTL
218// defines how long that reservation is valid. When TTL expires the key is unreserved by the KV store itself.
219// If the key is acquired then the value returned will be the value passed in. If the key is already acquired
220// then the value assigned to that key will be returned.
npujar467fe752020-01-16 20:17:45 +0530221func (c *ConsulClient) Reserve(ctx context.Context, key string, value interface{}, ttl int64) (interface{}, error) {
khenaidoocfee5f42018-07-19 22:47:38 -0400222
223 // Validate that we can create a byte array from the value as consul API expects a byte array
224 var val []byte
225 var er error
226 if val, er = ToByte(value); er != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800227 logger.Error(er)
khenaidoocfee5f42018-07-19 22:47:38 -0400228 return nil, er
229 }
230
231 // Cleanup any existing session and recreate new ones. A key is reserved against a session
232 if c.sessionID != "" {
233 c.deleteSession()
234 }
235
236 // Clear session if reservation is not successful
237 reservationSuccessful := false
238 defer func() {
239 if !reservationSuccessful {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800240 logger.Debug("deleting-session")
khenaidoocfee5f42018-07-19 22:47:38 -0400241 c.deleteSession()
242 }
243 }()
244
245 session, sessionID, err := c.createSession(ttl, -1)
246 if err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800247 logger.Errorw("no-session-created", log.Fields{"error": err})
khenaidoocfee5f42018-07-19 22:47:38 -0400248 return "", errors.New("no-session-created")
249 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800250 logger.Debugw("session-created", log.Fields{"session-id": sessionID})
khenaidoocfee5f42018-07-19 22:47:38 -0400251 c.sessionID = sessionID
252 c.session = session
253
254 // Try to grap the Key using the session
255 kv := c.consul.KV()
256 kvp := consulapi.KVPair{Key: key, Value: val, Session: c.sessionID}
257 result, _, err := kv.Acquire(&kvp, nil)
258 if err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800259 logger.Errorw("error-acquiring-keys", log.Fields{"error": err})
khenaidoocfee5f42018-07-19 22:47:38 -0400260 return nil, err
261 }
262
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800263 logger.Debugw("key-acquired", log.Fields{"key": key, "status": result})
khenaidoocfee5f42018-07-19 22:47:38 -0400264
265 // Irrespective whether we were successful in acquiring the key, let's read it back and see if it's us.
npujar467fe752020-01-16 20:17:45 +0530266 m, err := c.Get(ctx, key)
khenaidoocfee5f42018-07-19 22:47:38 -0400267 if err != nil {
268 return nil, err
269 }
270 if m != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800271 logger.Debugw("response-received", log.Fields{"key": m.Key, "m.value": string(m.Value.([]byte)), "value": value})
khenaidoocfee5f42018-07-19 22:47:38 -0400272 if m.Key == key && isEqual(m.Value, value) {
273 // My reservation is successful - register it. For now, support is only for 1 reservation per key
274 // per session.
275 reservationSuccessful = true
276 c.writeLock.Lock()
277 c.keyReservations[key] = m.Value
278 c.writeLock.Unlock()
279 return m.Value, nil
280 }
281 // My reservation has failed. Return the owner of that key
282 return m.Value, nil
283 }
284 return nil, nil
285}
286
287// ReleaseAllReservations releases all key reservations previously made (using Reserve API)
npujar467fe752020-01-16 20:17:45 +0530288func (c *ConsulClient) ReleaseAllReservations(ctx context.Context) error {
khenaidoocfee5f42018-07-19 22:47:38 -0400289 kv := c.consul.KV()
290 var kvp consulapi.KVPair
291 var result bool
292 var err error
293
294 c.writeLock.Lock()
295 defer c.writeLock.Unlock()
296
297 for key, value := range c.keyReservations {
298 kvp = consulapi.KVPair{Key: key, Value: value.([]byte), Session: c.sessionID}
299 result, _, err = kv.Release(&kvp, nil)
300 if err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800301 logger.Errorw("cannot-release-reservation", log.Fields{"key": key, "error": err})
khenaidoocfee5f42018-07-19 22:47:38 -0400302 return err
303 }
304 if !result {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800305 logger.Errorw("cannot-release-reservation", log.Fields{"key": key})
khenaidoocfee5f42018-07-19 22:47:38 -0400306 }
307 delete(c.keyReservations, key)
308 }
309 return nil
310}
311
312// ReleaseReservation releases reservation for a specific key.
npujar467fe752020-01-16 20:17:45 +0530313func (c *ConsulClient) ReleaseReservation(ctx context.Context, key string) error {
khenaidoocfee5f42018-07-19 22:47:38 -0400314 var ok bool
315 var reservedValue interface{}
316 c.writeLock.Lock()
317 defer c.writeLock.Unlock()
318 if reservedValue, ok = c.keyReservations[key]; !ok {
319 return errors.New("key-not-reserved:" + key)
320 }
321 // Release the reservation
322 kv := c.consul.KV()
323 kvp := consulapi.KVPair{Key: key, Value: reservedValue.([]byte), Session: c.sessionID}
324
325 result, _, er := kv.Release(&kvp, nil)
326 if er != nil {
327 return er
328 }
329 // Remove that key entry on success
330 if result {
331 delete(c.keyReservations, key)
332 return nil
333 }
334 return errors.New("key-cannot-be-unreserved")
335}
336
337// RenewReservation renews a reservation. A reservation will go stale after the specified TTL (Time To Live)
338// period specified when reserving the key
npujar467fe752020-01-16 20:17:45 +0530339func (c *ConsulClient) RenewReservation(ctx context.Context, key string) error {
khenaidoocfee5f42018-07-19 22:47:38 -0400340 // In the case of Consul, renew reservation of a reserve key only require renewing the client session.
341
342 c.writeLock.Lock()
343 defer c.writeLock.Unlock()
344
345 // Verify the key was reserved
346 if _, ok := c.keyReservations[key]; !ok {
347 return errors.New("key-not-reserved")
348 }
349
350 if c.session == nil {
351 return errors.New("no-session-exist")
352 }
353
354 var writeOptions consulapi.WriteOptions
355 if _, _, err := c.session.Renew(c.sessionID, &writeOptions); err != nil {
356 return err
357 }
358 return nil
359}
360
361// Watch provides the watch capability on a given key. It returns a channel onto which the callee needs to
362// listen to receive Events.
npujar467fe752020-01-16 20:17:45 +0530363func (c *ConsulClient) Watch(ctx context.Context, key string) chan *Event {
khenaidoocfee5f42018-07-19 22:47:38 -0400364
365 // Create a new channel
366 ch := make(chan *Event, maxClientChannelBufferSize)
367
368 // Create a context to track this request
369 watchContext, cFunc := context.WithCancel(context.Background())
370
371 // Save the channel and context reference for later
372 c.writeLock.Lock()
373 defer c.writeLock.Unlock()
374 ccm := channelContextMap{channel: ch, ctx: watchContext, cancel: cFunc}
375 c.watchedChannelsContext[key] = append(c.watchedChannelsContext[key], &ccm)
376
377 // Launch a go routine to listen for updates
378 go c.listenForKeyChange(watchContext, key, ch)
379
380 return ch
381}
382
383// CloseWatch closes a specific watch. Both the key and the channel are required when closing a watch as there
384// may be multiple listeners on the same key. The previously created channel serves as a key
385func (c *ConsulClient) CloseWatch(key string, ch chan *Event) {
386 // First close the context
387 var ok bool
388 var watchedChannelsContexts []*channelContextMap
389 c.writeLock.Lock()
390 defer c.writeLock.Unlock()
391 if watchedChannelsContexts, ok = c.watchedChannelsContext[key]; !ok {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800392 logger.Errorw("key-has-no-watched-context-or-channel", log.Fields{"key": key})
khenaidoocfee5f42018-07-19 22:47:38 -0400393 return
394 }
395 // Look for the channels
396 var pos = -1
397 for i, chCtxMap := range watchedChannelsContexts {
398 if chCtxMap.channel == ch {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800399 logger.Debug("channel-found")
khenaidoocfee5f42018-07-19 22:47:38 -0400400 chCtxMap.cancel()
401 //close the channel
402 close(ch)
403 pos = i
404 break
405 }
406 }
407 // Remove that entry if present
408 if pos >= 0 {
409 c.watchedChannelsContext[key] = append(c.watchedChannelsContext[key][:pos], c.watchedChannelsContext[key][pos+1:]...)
410 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800411 logger.Debugw("watched-channel-exiting", log.Fields{"key": key, "channel": c.watchedChannelsContext[key]})
khenaidoocfee5f42018-07-19 22:47:38 -0400412}
413
414func (c *ConsulClient) isKVEqual(kv1 *consulapi.KVPair, kv2 *consulapi.KVPair) bool {
415 if (kv1 == nil) && (kv2 == nil) {
416 return true
417 } else if (kv1 == nil) || (kv2 == nil) {
418 return false
419 }
420 // Both the KV should be non-null here
421 if kv1.Key != kv2.Key ||
422 !bytes.Equal(kv1.Value, kv2.Value) ||
423 kv1.Session != kv2.Session ||
424 kv1.LockIndex != kv2.LockIndex ||
425 kv1.ModifyIndex != kv2.ModifyIndex {
426 return false
427 }
428 return true
429}
430
431func (c *ConsulClient) listenForKeyChange(watchContext context.Context, key string, ch chan *Event) {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800432 logger.Debugw("start-watching-channel", log.Fields{"key": key, "channel": ch})
khenaidoocfee5f42018-07-19 22:47:38 -0400433
434 defer c.CloseWatch(key, ch)
435 duration := GetDuration(defaultKVGetTimeout)
436 kv := c.consul.KV()
437 var queryOptions consulapi.QueryOptions
438 queryOptions.WaitTime = duration
439
440 // Get the existing value, if any
441 previousKVPair, meta, err := kv.Get(key, &queryOptions)
442 if err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800443 logger.Debug(err)
khenaidoocfee5f42018-07-19 22:47:38 -0400444 }
445 lastIndex := meta.LastIndex
446
447 // Wait for change. Push any change onto the channel and keep waiting for new update
448 //var waitOptions consulapi.QueryOptions
449 var pair *consulapi.KVPair
450 //watchContext, _ := context.WithCancel(context.Background())
451 waitOptions := queryOptions.WithContext(watchContext)
452 for {
453 //waitOptions = consulapi.QueryOptions{WaitIndex: lastIndex}
454 waitOptions.WaitIndex = lastIndex
455 pair, meta, err = kv.Get(key, waitOptions)
456 select {
457 case <-watchContext.Done():
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800458 logger.Debug("done-event-received-exiting")
khenaidoocfee5f42018-07-19 22:47:38 -0400459 return
460 default:
461 if err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800462 logger.Warnw("error-from-watch", log.Fields{"error": err})
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400463 ch <- NewEvent(CONNECTIONDOWN, key, []byte(""), -1)
khenaidoocfee5f42018-07-19 22:47:38 -0400464 } else {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800465 logger.Debugw("index-state", log.Fields{"lastindex": lastIndex, "newindex": meta.LastIndex, "key": key})
khenaidoocfee5f42018-07-19 22:47:38 -0400466 }
467 }
468 if err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800469 logger.Debug(err)
khenaidoocfee5f42018-07-19 22:47:38 -0400470 // On error, block for 10 milliseconds to prevent endless loop
471 time.Sleep(10 * time.Millisecond)
472 } else if meta.LastIndex <= lastIndex {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800473 logger.Info("no-index-change-or-negative")
khenaidoocfee5f42018-07-19 22:47:38 -0400474 } else {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800475 logger.Debugw("update-received", log.Fields{"pair": pair})
khenaidoocfee5f42018-07-19 22:47:38 -0400476 if pair == nil {
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400477 ch <- NewEvent(DELETE, key, []byte(""), -1)
khenaidoocfee5f42018-07-19 22:47:38 -0400478 } else if !c.isKVEqual(pair, previousKVPair) {
479 // Push the change onto the channel if the data has changed
480 // For now just assume it's a PUT change
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800481 logger.Debugw("pair-details", log.Fields{"session": pair.Session, "key": pair.Key, "value": pair.Value})
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400482 ch <- NewEvent(PUT, pair.Key, pair.Value, -1)
khenaidoocfee5f42018-07-19 22:47:38 -0400483 }
484 previousKVPair = pair
485 lastIndex = meta.LastIndex
486 }
487 }
488}
489
490// Close closes the KV store client
491func (c *ConsulClient) Close() {
492 var writeOptions consulapi.WriteOptions
493 // Inform any goroutine it's time to say goodbye.
494 c.writeLock.Lock()
495 defer c.writeLock.Unlock()
496 if c.doneCh != nil {
497 close(*c.doneCh)
498 }
499
500 // Clear the sessionID
501 if _, err := c.consul.Session().Destroy(c.sessionID, &writeOptions); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800502 logger.Errorw("error-closing-client", log.Fields{"error": err})
khenaidoocfee5f42018-07-19 22:47:38 -0400503 }
504}
khenaidoobdcb8e02019-03-06 16:28:56 -0500505
npujar467fe752020-01-16 20:17:45 +0530506func (c *ConsulClient) AcquireLock(ctx context.Context, lockName string, timeout int) error {
khenaidoobdcb8e02019-03-06 16:28:56 -0500507 return nil
508}
509
khenaidoo2c6a0992019-04-29 13:46:56 -0400510func (c *ConsulClient) ReleaseLock(lockName string) error {
khenaidoobdcb8e02019-03-06 16:28:56 -0500511 return nil
khenaidoo2c6a0992019-04-29 13:46:56 -0400512}