VOL-1509 : Partial fix for merging issue
- Changed channel map in etcd to a sync.Map
- Changed graph boundaryPorts to sync.Map
- Added logic to check if proxy access is currently reserved
- Changed watch logic to exit when proxy access in progress
- Fixed UpdateAllChildren method
- Commented out the Drop operation again in node.go
Change-Id: I8a61798e907be0ff6b0785dcc70721708308611d
diff --git a/db/kvstore/etcdclient.go b/db/kvstore/etcdclient.go
index 2caa990..639ea75 100644
--- a/db/kvstore/etcdclient.go
+++ b/db/kvstore/etcdclient.go
@@ -28,15 +28,14 @@
// EtcdClient represents the Etcd KV store client
type EtcdClient struct {
- ectdAPI *v3Client.Client
- leaderRev v3Client.Client
- keyReservations map[string]*v3Client.LeaseID
- watchedChannels map[string][]map[chan *Event]v3Client.Watcher
- writeLock sync.Mutex
- lockToMutexMap map[string]*v3Concurrency.Mutex
+ ectdAPI *v3Client.Client
+ leaderRev v3Client.Client
+ keyReservations map[string]*v3Client.LeaseID
+ watchedChannels sync.Map
+ writeLock sync.Mutex
+ lockToMutexMap map[string]*v3Concurrency.Mutex
lockToSessionMap map[string]*v3Concurrency.Session
- lockToMutexLock sync.Mutex
-
+ lockToMutexLock sync.Mutex
}
// NewEtcdClient returns a new client for the Etcd KV store
@@ -51,12 +50,13 @@
log.Error(err)
return nil, err
}
- wc := make(map[string][]map[chan *Event]v3Client.Watcher)
+
reservations := make(map[string]*v3Client.LeaseID)
lockMutexMap := make(map[string]*v3Concurrency.Mutex)
lockSessionMap := make(map[string]*v3Concurrency.Session)
- return &EtcdClient{ectdAPI: c, watchedChannels: wc, keyReservations:reservations, lockToMutexMap:lockMutexMap, lockToSessionMap:lockSessionMap}, nil
+ return &EtcdClient{ectdAPI: c, keyReservations: reservations, lockToMutexMap: lockMutexMap,
+ lockToSessionMap: lockSessionMap}, nil
}
// List returns an array of key-value pairs with key as a prefix. Timeout defines how long the function will
@@ -69,7 +69,7 @@
// DO NOT lock by default; otherwise lock per instructed value
if len(lock) > 0 && lock[0] {
session, _ := v3Concurrency.NewSession(c.ectdAPI, v3Concurrency.WithContext(ctx))
- mu := v3Concurrency.NewMutex(session, "/lock" + key)
+ mu := v3Concurrency.NewMutex(session, "/lock"+key)
mu.Lock(context.Background())
defer mu.Unlock(context.Background())
defer session.Close()
@@ -96,9 +96,9 @@
ctx, cancel := context.WithTimeout(context.Background(), duration)
// Lock by default; otherwise lock per instructed value
- if len(lock) == 0 || lock[0] {
+ if len(lock) > 0 && lock[0] {
session, _ := v3Concurrency.NewSession(c.ectdAPI, v3Concurrency.WithContext(ctx))
- mu := v3Concurrency.NewMutex(session, "/lock" + key)
+ mu := v3Concurrency.NewMutex(session, "/lock"+key)
mu.Lock(context.Background())
defer mu.Unlock(context.Background())
defer session.Close()
@@ -136,7 +136,7 @@
// Lock by default; otherwise lock per instructed value
if len(lock) == 0 || lock[0] {
session, _ := v3Concurrency.NewSession(c.ectdAPI, v3Concurrency.WithContext(ctx))
- mu := v3Concurrency.NewMutex(session, "/lock" + key)
+ mu := v3Concurrency.NewMutex(session, "/lock"+key)
mu.Lock(context.Background())
defer mu.Unlock(context.Background())
defer session.Close()
@@ -173,7 +173,7 @@
// Lock by default; otherwise lock per instructed value
if len(lock) == 0 || lock[0] {
session, _ := v3Concurrency.NewSession(c.ectdAPI, v3Concurrency.WithContext(ctx))
- mu := v3Concurrency.NewMutex(session, "/lock" + key)
+ mu := v3Concurrency.NewMutex(session, "/lock"+key)
mu.Lock(context.Background())
defer mu.Unlock(context.Background())
defer session.Close()
@@ -364,9 +364,10 @@
channelMap[ch] = w
//c.writeLock.Lock()
//defer c.writeLock.Unlock()
- c.watchedChannels[key] = append(c.watchedChannels[key], channelMap)
- log.Debugw("watched-channels", log.Fields{"channels": c.watchedChannels[key]})
+ channelMaps := c.addChannelMap(key, channelMap)
+
+ log.Debugw("watched-channels", log.Fields{"channels": channelMaps})
// Launch a go routine to listen for updates
go c.listenForKeyChange(channel, ch)
@@ -374,6 +375,41 @@
}
+func (c *EtcdClient) addChannelMap(key string, channelMap map[chan *Event]v3Client.Watcher) []map[chan *Event]v3Client.Watcher {
+ var channels interface{}
+ var exists bool
+
+ if channels, exists = c.watchedChannels.Load(key); exists {
+ channels = append(channels.([]map[chan *Event]v3Client.Watcher), channelMap)
+ } else {
+ channels = []map[chan *Event]v3Client.Watcher{channelMap}
+ }
+ c.watchedChannels.Store(key, channels)
+
+ return channels.([]map[chan *Event]v3Client.Watcher)
+}
+
+func (c *EtcdClient) removeChannelMap(key string, pos int) []map[chan *Event]v3Client.Watcher {
+ var channels interface{}
+ var exists bool
+
+ if channels, exists = c.watchedChannels.Load(key); exists {
+ channels = append(channels.([]map[chan *Event]v3Client.Watcher)[:pos], channels.([]map[chan *Event]v3Client.Watcher)[pos+1:]...)
+ c.watchedChannels.Store(key, channels)
+ }
+
+ return channels.([]map[chan *Event]v3Client.Watcher)
+}
+
+func (c *EtcdClient) getChannelMaps(key string) ([]map[chan *Event]v3Client.Watcher, bool) {
+ var channels interface{}
+ var exists bool
+
+ channels, exists = c.watchedChannels.Load(key)
+
+ return channels.([]map[chan *Event]v3Client.Watcher), exists
+}
+
// CloseWatch closes a specific watch. Both the key and the channel are required when closing a watch as there
// may be multiple listeners on the same key. The previously created channel serves as a key
func (c *EtcdClient) CloseWatch(key string, ch chan *Event) {
@@ -383,7 +419,7 @@
c.writeLock.Lock()
defer c.writeLock.Unlock()
- if watchedChannels, ok = c.watchedChannels[key]; !ok {
+ if watchedChannels, ok = c.getChannelMaps(key); !ok {
log.Warnw("key-has-no-watched-channels", log.Fields{"key": key})
return
}
@@ -401,11 +437,13 @@
break
}
}
+
+ channelMaps, _ := c.getChannelMaps(key)
// Remove that entry if present
if pos >= 0 {
- c.watchedChannels[key] = append(c.watchedChannels[key][:pos], c.watchedChannels[key][pos+1:]...)
+ channelMaps = c.removeChannelMap(key, pos)
}
- log.Infow("watcher-channel-exiting", log.Fields{"key": key, "channel": c.watchedChannels[key]})
+ log.Infow("watcher-channel-exiting", log.Fields{"key": key, "channel": channelMaps})
}
func (c *EtcdClient) listenForKeyChange(channel v3Client.WatchChan, ch chan<- *Event) {
@@ -466,12 +504,11 @@
return lock, session
}
-
-func (c *EtcdClient) AcquireLock(lockName string, timeout int) error {
+func (c *EtcdClient) AcquireLock(lockName string, timeout int) error {
duration := GetDuration(timeout)
ctx, cancel := context.WithTimeout(context.Background(), duration)
session, _ := v3Concurrency.NewSession(c.ectdAPI, v3Concurrency.WithContext(ctx))
- mu := v3Concurrency.NewMutex(session, "/devicelock_" + lockName)
+ mu := v3Concurrency.NewMutex(session, "/devicelock_"+lockName)
if err := mu.Lock(context.Background()); err != nil {
return err
}
@@ -480,7 +517,7 @@
return nil
}
-func (c *EtcdClient) ReleaseLock(lockName string) error {
+func (c *EtcdClient) ReleaseLock(lockName string) error {
lock, session := c.getLock(lockName)
var err error
if lock != nil {
@@ -496,4 +533,4 @@
c.deleteLockName(lockName)
return err
-}
\ No newline at end of file
+}