VOL-1910 added techproile mock struct to cover openolt_flowmgr.go files
Change-Id: I0bd44890f02909da870771cc332f2a5de264020b
diff --git a/vendor/github.com/opencord/voltha-go/db/kvstore/etcdclient.go b/vendor/github.com/opencord/voltha-go/db/kvstore/etcdclient.go
index f19f365..3af1ef2 100644
--- a/vendor/github.com/opencord/voltha-go/db/kvstore/etcdclient.go
+++ b/vendor/github.com/opencord/voltha-go/db/kvstore/etcdclient.go
@@ -127,7 +127,15 @@
c.writeLock.Lock()
defer c.writeLock.Unlock()
- _, err := c.ectdAPI.Put(ctx, key, val)
+
+ var err error
+ // Check if there is already a lease for this key - if there is then use it, otherwise a PUT will make
+ // that KV key permanent instead of automatically removing it after a lease expiration
+ if leaseID, ok := c.keyReservations[key]; ok {
+ _, err = c.ectdAPI.Put(ctx, key, val, v3Client.WithLease(*leaseID))
+ } else {
+ _, err = c.ectdAPI.Put(ctx, key, val)
+ }
cancel()
if err != nil {
switch err {
@@ -158,8 +166,8 @@
c.writeLock.Lock()
defer c.writeLock.Unlock()
- // delete the keys
- if _, err := c.ectdAPI.Delete(ctx, key, v3Client.WithPrefix()); err != nil {
+ // delete the key
+ if _, err := c.ectdAPI.Delete(ctx, key); err != nil {
log.Errorw("failed-to-delete-key", log.Fields{"key": key, "error": err})
return err
}
@@ -308,7 +316,8 @@
// listen to receive Events.
func (c *EtcdClient) Watch(key string) chan *Event {
w := v3Client.NewWatcher(c.ectdAPI)
- channel := w.Watch(context.Background(), key, v3Client.WithPrefix())
+ ctx, cancel := context.WithCancel(context.Background())
+ channel := w.Watch(ctx, key)
// Create a new channel
ch := make(chan *Event, maxClientChannelBufferSize)
@@ -316,8 +325,6 @@
// Keep track of the created channels so they can be closed when required
channelMap := make(map[chan *Event]v3Client.Watcher)
channelMap[ch] = w
- //c.writeLock.Lock()
- //defer c.writeLock.Unlock()
channelMaps := c.addChannelMap(key, channelMap)
@@ -325,7 +332,7 @@
// json format.
log.Debugw("watched-channels", log.Fields{"len": len(channelMaps)})
// Launch a go routine to listen for updates
- go c.listenForKeyChange(channel, ch)
+ go c.listenForKeyChange(channel, ch, cancel)
return ch
@@ -392,7 +399,6 @@
if err := t.Close(); err != nil {
log.Errorw("watcher-cannot-be-closed", log.Fields{"key": key, "error": err})
}
- close(ch)
pos = i
break
}
@@ -406,11 +412,12 @@
log.Infow("watcher-channel-exiting", log.Fields{"key": key, "channel": channelMaps})
}
-func (c *EtcdClient) listenForKeyChange(channel v3Client.WatchChan, ch chan<- *Event) {
+func (c *EtcdClient) listenForKeyChange(channel v3Client.WatchChan, ch chan<- *Event, cancel context.CancelFunc) {
log.Debug("start-listening-on-channel ...")
+ defer cancel()
+ defer close(ch)
for resp := range channel {
for _, ev := range resp.Events {
- //log.Debugf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
ch <- NewEvent(getEventType(ev), ev.Kv.Key, ev.Kv.Value, ev.Kv.Version)
}
}