VOL-1509: Partial fix for merging issue
- Changed the watched-channels map in the etcd client to a sync.Map (load/store pattern sketched below)
- Changed the device graph boundaryPorts map to a sync.Map
- Added logic to check whether proxy access is currently reserved
- Changed the watch logic to exit early when a proxy access is in progress
- Fixed the UpdateAllChildren method
- Commented out the Drop operation in node.go again
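
The sync.Map conversions follow the same load-modify-store pattern; a minimal
standalone sketch (hypothetical names, not code from this change):

    import "sync"

    var watched sync.Map // key -> []string

    func addEntry(key, value string) []string {
        var entries []string
        if existing, ok := watched.Load(key); ok {
            entries = append(existing.([]string), value)
        } else {
            entries = []string{value}
        }
        // Load followed by Store is not atomic; concurrent callers
        // for the same key can overwrite each other's updates.
        watched.Store(key, entries)
        return entries
    }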
Change-Id: I8a61798e907be0ff6b0785dcc70721708308611d
diff --git a/db/kvstore/etcdclient.go b/db/kvstore/etcdclient.go
index 2caa990..639ea75 100644
--- a/db/kvstore/etcdclient.go
+++ b/db/kvstore/etcdclient.go
@@ -28,15 +28,14 @@
// EtcdClient represents the Etcd KV store client
type EtcdClient struct {
- ectdAPI *v3Client.Client
- leaderRev v3Client.Client
- keyReservations map[string]*v3Client.LeaseID
- watchedChannels map[string][]map[chan *Event]v3Client.Watcher
- writeLock sync.Mutex
- lockToMutexMap map[string]*v3Concurrency.Mutex
+ ectdAPI *v3Client.Client
+ leaderRev v3Client.Client
+ keyReservations map[string]*v3Client.LeaseID
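+ // watchedChannels stores a []map[chan *Event]v3Client.Watcher per watched key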
+ watchedChannels sync.Map
+ writeLock sync.Mutex
+ lockToMutexMap map[string]*v3Concurrency.Mutex
lockToSessionMap map[string]*v3Concurrency.Session
- lockToMutexLock sync.Mutex
-
+ lockToMutexLock sync.Mutex
}
// NewEtcdClient returns a new client for the Etcd KV store
@@ -51,12 +50,13 @@
log.Error(err)
return nil, err
}
- wc := make(map[string][]map[chan *Event]v3Client.Watcher)
+
reservations := make(map[string]*v3Client.LeaseID)
lockMutexMap := make(map[string]*v3Concurrency.Mutex)
lockSessionMap := make(map[string]*v3Concurrency.Session)
- return &EtcdClient{ectdAPI: c, watchedChannels: wc, keyReservations:reservations, lockToMutexMap:lockMutexMap, lockToSessionMap:lockSessionMap}, nil
+ return &EtcdClient{ectdAPI: c, keyReservations: reservations, lockToMutexMap: lockMutexMap,
+ lockToSessionMap: lockSessionMap}, nil
}
// List returns an array of key-value pairs with key as a prefix. Timeout defines how long the function will
@@ -69,7 +69,7 @@
// DO NOT lock by default; otherwise lock per instructed value
if len(lock) > 0 && lock[0] {
session, _ := v3Concurrency.NewSession(c.ectdAPI, v3Concurrency.WithContext(ctx))
- mu := v3Concurrency.NewMutex(session, "/lock" + key)
+ mu := v3Concurrency.NewMutex(session, "/lock"+key)
mu.Lock(context.Background())
defer mu.Unlock(context.Background())
defer session.Close()
@@ -96,9 +96,9 @@
ctx, cancel := context.WithTimeout(context.Background(), duration)
- // Lock by default; otherwise lock per instructed value
+ // DO NOT lock by default; otherwise lock per instructed value
- if len(lock) == 0 || lock[0] {
+ if len(lock) > 0 && lock[0] {
session, _ := v3Concurrency.NewSession(c.ectdAPI, v3Concurrency.WithContext(ctx))
- mu := v3Concurrency.NewMutex(session, "/lock" + key)
+ mu := v3Concurrency.NewMutex(session, "/lock"+key)
mu.Lock(context.Background())
defer mu.Unlock(context.Background())
defer session.Close()
@@ -136,7 +136,7 @@
// Lock by default; otherwise lock per instructed value
if len(lock) == 0 || lock[0] {
session, _ := v3Concurrency.NewSession(c.ectdAPI, v3Concurrency.WithContext(ctx))
- mu := v3Concurrency.NewMutex(session, "/lock" + key)
+ mu := v3Concurrency.NewMutex(session, "/lock"+key)
mu.Lock(context.Background())
defer mu.Unlock(context.Background())
defer session.Close()
@@ -173,7 +173,7 @@
// Lock by default; otherwise lock per instructed value
if len(lock) == 0 || lock[0] {
session, _ := v3Concurrency.NewSession(c.ectdAPI, v3Concurrency.WithContext(ctx))
- mu := v3Concurrency.NewMutex(session, "/lock" + key)
+ mu := v3Concurrency.NewMutex(session, "/lock"+key)
mu.Lock(context.Background())
defer mu.Unlock(context.Background())
defer session.Close()
@@ -364,9 +364,10 @@
channelMap[ch] = w
//c.writeLock.Lock()
//defer c.writeLock.Unlock()
- c.watchedChannels[key] = append(c.watchedChannels[key], channelMap)
- log.Debugw("watched-channels", log.Fields{"channels": c.watchedChannels[key]})
+ channelMaps := c.addChannelMap(key, channelMap)
+
+ log.Debugw("watched-channels", log.Fields{"channels": channelMaps})
// Launch a go routine to listen for updates
go c.listenForKeyChange(channel, ch)
@@ -374,6 +375,41 @@
}
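+// addChannelMap appends a channel/watcher pair to the list of watchers for a key
+// and returns the updated list. The Load/Store sequence is not atomic, so
+// concurrent calls for the same key may race.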
+func (c *EtcdClient) addChannelMap(key string, channelMap map[chan *Event]v3Client.Watcher) []map[chan *Event]v3Client.Watcher {
+ var channels interface{}
+ var exists bool
+
+ if channels, exists = c.watchedChannels.Load(key); exists {
+ channels = append(channels.([]map[chan *Event]v3Client.Watcher), channelMap)
+ } else {
+ channels = []map[chan *Event]v3Client.Watcher{channelMap}
+ }
+ c.watchedChannels.Store(key, channels)
+
+ return channels.([]map[chan *Event]v3Client.Watcher)
+}
+
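+// removeChannelMap drops the channel/watcher pair at index pos from the watcher
+// list of a key and stores the shortened list back.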
+func (c *EtcdClient) removeChannelMap(key string, pos int) []map[chan *Event]v3Client.Watcher {
+ var channelMaps []map[chan *Event]v3Client.Watcher
+
+ if channels, exists := c.watchedChannels.Load(key); exists {
+ channelMaps = append(channels.([]map[chan *Event]v3Client.Watcher)[:pos], channels.([]map[chan *Event]v3Client.Watcher)[pos+1:]...)
+ c.watchedChannels.Store(key, channelMaps)
+ }
+
+ return channelMaps
+}
+
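+// getChannelMaps returns the watcher list of a key and whether the key is
+// currently being watched.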
+func (c *EtcdClient) getChannelMaps(key string) ([]map[chan *Event]v3Client.Watcher, bool) {
+ channels, exists := c.watchedChannels.Load(key)
+ if !exists {
+ return nil, false
+ }
+
+ return channels.([]map[chan *Event]v3Client.Watcher), exists
+}
+
// CloseWatch closes a specific watch. Both the key and the channel are required when closing a watch as there
// may be multiple listeners on the same key. The previously created channel serves as a key
func (c *EtcdClient) CloseWatch(key string, ch chan *Event) {
@@ -383,7 +419,7 @@
c.writeLock.Lock()
defer c.writeLock.Unlock()
- if watchedChannels, ok = c.watchedChannels[key]; !ok {
+ if watchedChannels, ok = c.getChannelMaps(key); !ok {
log.Warnw("key-has-no-watched-channels", log.Fields{"key": key})
return
}
@@ -401,11 +437,13 @@
break
}
}
+
+ channelMaps, _ := c.getChannelMaps(key)
// Remove that entry if present
if pos >= 0 {
- c.watchedChannels[key] = append(c.watchedChannels[key][:pos], c.watchedChannels[key][pos+1:]...)
+ channelMaps = c.removeChannelMap(key, pos)
}
- log.Infow("watcher-channel-exiting", log.Fields{"key": key, "channel": c.watchedChannels[key]})
+ log.Infow("watcher-channel-exiting", log.Fields{"key": key, "channel": channelMaps})
}
func (c *EtcdClient) listenForKeyChange(channel v3Client.WatchChan, ch chan<- *Event) {
@@ -466,12 +504,11 @@
return lock, session
}
-
-func (c *EtcdClient) AcquireLock(lockName string, timeout int) error {
+func (c *EtcdClient) AcquireLock(lockName string, timeout int) error {
duration := GetDuration(timeout)
ctx, cancel := context.WithTimeout(context.Background(), duration)
session, _ := v3Concurrency.NewSession(c.ectdAPI, v3Concurrency.WithContext(ctx))
- mu := v3Concurrency.NewMutex(session, "/devicelock_" + lockName)
+ mu := v3Concurrency.NewMutex(session, "/devicelock_"+lockName)
if err := mu.Lock(context.Background()); err != nil {
return err
}
@@ -480,7 +517,7 @@
return nil
}
-func (c *EtcdClient) ReleaseLock(lockName string) error {
+func (c *EtcdClient) ReleaseLock(lockName string) error {
lock, session := c.getLock(lockName)
var err error
if lock != nil {
@@ -496,4 +533,4 @@
c.deleteLockName(lockName)
return err
-}
\ No newline at end of file
+}
diff --git a/db/model/node.go b/db/model/node.go
index 707fe75..aeac883 100644
--- a/db/model/node.go
+++ b/db/model/node.go
@@ -560,7 +560,7 @@
// FIXME VOL-1293: the following statement corrupts the kv when using a subproxy (commenting for now)
// FIXME VOL-1293 cont'd: need to figure out the required conditions otherwise we are not cleaning up entries
- branch.GetLatest().Drop(branch.Txid, false)
+ //branch.GetLatest().Drop(branch.Txid, false)
n.makeLatest(branch, rev, changes)
diff --git a/db/model/non_persisted_revision.go b/db/model/non_persisted_revision.go
index cb8c9ca..1b9325d 100644
--- a/db/model/non_persisted_revision.go
+++ b/db/model/non_persisted_revision.go
@@ -313,14 +313,13 @@
npr.mutex.Lock()
defer npr.mutex.Unlock()
- newRev := &NonPersistedRevision{}
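+ // Reuse the existing revision so its children are rebuilt in place rather than deep-copied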
+ newRev := npr
newRev.Config = npr.Config
newRev.Hash = npr.Hash
newRev.Branch = branch
newRev.Children = make(map[string][]Revision)
- for entryName, childrenEntry := range npr.Children {
- newRev.Children[entryName] = make([]Revision, len(childrenEntry))
- copy(newRev.Children[entryName], childrenEntry)
+ for entryName, childrenEntry := range children {
+ newRev.Children[entryName] = append(newRev.Children[entryName], childrenEntry...)
}
newRev.Finalize(false)
diff --git a/db/model/persisted_revision.go b/db/model/persisted_revision.go
index 0ecb5ef..f335216 100644
--- a/db/model/persisted_revision.go
+++ b/db/model/persisted_revision.go
@@ -40,6 +40,7 @@
mutex sync.RWMutex `json:"-"`
isStored bool
isWatched bool
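+ // watchName is the kv-store key under which the watch for this revision was registered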
+ watchName string
}
// NewPersistedRevision creates a new instance of a PersistentRevision structure
@@ -96,8 +97,9 @@
if pr.events == nil {
pr.events = make(chan *kvstore.Event)
- log.Debugw("setting-watch", log.Fields{"key": key})
+ log.Debugw("setting-watch", log.Fields{"key": key, "revision-hash": pr.GetHash()})
+ pr.watchName = key
pr.events = pr.kvStore.CreateWatch(key)
pr.isWatched = true
@@ -107,7 +109,61 @@
}
}
+//func (pr *PersistedRevision) mergeWithMemory(pacBlock bool) Revision {
+// if pair, err := pr.kvStore.Get(pr.GetHash()); err != nil {
+// log.Debugw("merge-with-memory--error-occurred", log.Fields{"hash": pr.GetHash(), "error": err})
+// } else if pair == nil {
+// log.Debugw("merge-with-memory--no-data-to-merge", log.Fields{"hash": pr.GetHash()})
+// } else {
+// data := reflect.New(reflect.TypeOf(pr.GetData()).Elem()).Interface()
+//
+// if err := proto.Unmarshal(pair.Value.([]byte), data.(proto.Message)); err != nil {
+// log.Errorw("merge-with-memory--unmarshal-failed", log.Fields{"key": pr.GetHash(), "error": err})
+// } else {
+// if pr.GetNode().GetProxy() != nil && pacBlock {
+// var pac *proxyAccessControl
+// var pathLock string
+//
+// pathLock, _ = pr.GetNode().GetProxy().parseForControlledPath(pr.GetNode().GetProxy().getFullPath())
+// log.Debugw("update-in-memory--reserve-and-lock", log.Fields{"key": pr.GetHash(), "path": pathLock})
+//
+// pac = PAC().ReservePath(pr.GetNode().GetProxy().getFullPath(), pr.GetNode().GetProxy(), pathLock)
+// pac.SetProxy(pr.GetNode().GetProxy())
+// pac.lock()
+//
+// defer log.Debugw("update-in-memory--release-and-unlock", log.Fields{"key": pr.GetHash(), "path": pathLock})
+// defer pac.unlock()
+// defer PAC().ReleasePath(pathLock)
+// }
+// //Prepare the transaction
+// branch := pr.GetBranch()
+// //latest := branch.GetLatest()
+// txidBin, _ := uuid.New().MarshalBinary()
+// txid := hex.EncodeToString(txidBin)[:12]
+//
+// makeBranch := func(node *node) *Branch {
+// return node.MakeBranch(txid)
+// }
+//
+// //Apply the update in a transaction branch
+// updatedRev := pr.GetNode().Update("", data, false, txid, makeBranch)
+// updatedRev.SetHash(pr.GetHash())
+//
+// //Merge the transaction branch in memory
+// if mergedRev, _ := pr.GetNode().MergeBranch(txid, false); mergedRev != nil {
+// branch.SetLatest(mergedRev)
+// return mergedRev
+// }
+// }
+// }
+//
+// return nil
+//}
+
func (pr *PersistedRevision) updateInMemory(data interface{}) {
+ pr.mutex.Lock()
+ defer pr.mutex.Unlock()
+
var pac *proxyAccessControl
var pathLock string
@@ -116,8 +172,28 @@
// and prevent simultaneous updates to the object in memory
//
if pr.GetNode().GetProxy() != nil {
- log.Debugw("update-in-memory--reserve-and-lock", log.Fields{"key": pr.GetHash(), "path": pathLock})
pathLock, _ = pr.GetNode().GetProxy().parseForControlledPath(pr.GetNode().GetProxy().getFullPath())
+
+ // If the proxy already has a request in progress, then there is no need to process the watch
+ log.Debugw("update-in-memory--checking-pathlock", log.Fields{"key": pr.GetHash(), "pathLock": pathLock})
+ if PAC().IsReserved(pathLock) {
+ switch pr.GetNode().GetRoot().GetProxy().Operation {
+ case PROXY_ADD:
+ fallthrough
+ case PROXY_REMOVE:
+ fallthrough
+ case PROXY_UPDATE:
+ log.Debugw("update-in-memory--skipping", log.Fields{"key": pr.GetHash(), "path": pr.GetNode().GetProxy().getFullPath()})
+ return
+ default:
+ log.Debugw("update-in-memory--operation", log.Fields{"operation": pr.GetNode().GetRoot().GetProxy().Operation})
+ }
+ } else {
+ log.Debugw("update-in-memory--path-not-locked", log.Fields{"key": pr.GetHash(), "path": pr.GetNode().GetProxy().getFullPath()})
+ }
+
+ log.Debugw("update-in-memory--reserve-and-lock", log.Fields{"key": pr.GetHash(), "path": pathLock})
+
pac = PAC().ReservePath(pr.GetNode().GetProxy().getFullPath(), pr.GetNode().GetProxy(), pathLock)
pac.SetProxy(pr.GetNode().GetProxy())
pac.lock()
@@ -153,9 +229,6 @@
if mergedRev, _ := latest.GetNode().MergeBranch(txid, false); mergedRev != nil {
branch.SetLatest(mergedRev)
}
-
- // The transaction branch must be deleted to free-up memory
- //latest.GetNode().GetRoot().DeleteTxBranch(txid)
}
func (pr *PersistedRevision) startWatching() {
@@ -166,40 +239,40 @@
select {
case event, ok := <-pr.events:
if !ok {
- log.Errorw("event-channel-failure: stopping watch loop", log.Fields{"key": pr.GetHash()})
+ log.Errorw("event-channel-failure: stopping watch loop", log.Fields{"key": pr.GetHash(), "watch": pr.watchName})
break StopWatchLoop
}
- log.Debugw("received-event", log.Fields{"type": event.EventType})
+ log.Debugw("received-event", log.Fields{"type": event.EventType, "watch": pr.watchName})
switch event.EventType {
case kvstore.DELETE:
- log.Debugw("delete-from-memory", log.Fields{"key": pr.GetHash()})
+ log.Debugw("delete-from-memory", log.Fields{"key": pr.GetHash(), "watch": pr.watchName})
pr.Revision.Drop("", true)
break StopWatchLoop
case kvstore.PUT:
- log.Debugw("update-in-memory", log.Fields{"key": pr.GetHash()})
+ log.Debugw("update-in-memory", log.Fields{"key": pr.GetHash(), "watch": pr.watchName})
- if dataPair, err := pr.kvStore.Get(pr.GetHash()); err != nil || dataPair == nil {
- log.Errorw("update-in-memory--key-retrieval-failed", log.Fields{"key": pr.GetHash(), "error": err})
+ if dataPair, err := pr.kvStore.Get(pr.watchName); err != nil || dataPair == nil {
+ log.Errorw("update-in-memory--key-retrieval-failed", log.Fields{"key": pr.GetHash(), "watch": pr.watchName, "error": err})
} else {
data := reflect.New(reflect.TypeOf(pr.GetData()).Elem())
if err := proto.Unmarshal(dataPair.Value.([]byte), data.Interface().(proto.Message)); err != nil {
- log.Errorw("update-in-memory--unmarshal-failed", log.Fields{"key": pr.GetHash(), "error": err})
+ log.Errorw("update-in-memory--unmarshal-failed", log.Fields{"key": pr.GetHash(), "watch": pr.watchName, "error": err})
} else {
pr.updateInMemory(data.Interface())
}
}
default:
- log.Debugw("unhandled-event", log.Fields{"key": pr.GetHash(), "type": event.EventType})
+ log.Debugw("unhandled-event", log.Fields{"key": pr.GetHash(), "watch": pr.watchName, "type": event.EventType})
}
}
}
- log.Debugw("exiting-watch", log.Fields{"key": pr.GetHash()})
+ log.Debugw("exiting-watch", log.Fields{"key": pr.GetHash(), "watch": pr.watchName})
}
func (pr *PersistedRevision) LoadFromPersistence(path string, txid string) []Revision {
diff --git a/db/model/proxy.go b/db/model/proxy.go
index be48ded..86d426a 100644
--- a/db/model/proxy.go
+++ b/db/model/proxy.go
@@ -55,13 +55,14 @@
// Proxy holds the information for a specific location with the data model
type Proxy struct {
sync.RWMutex
- Root *root
- Node *node
- ParentNode *node
- Path string
- FullPath string
- Exclusive bool
- Callbacks map[CallbackType]map[string]*CallbackTuple
+ Root *root
+ Node *node
+ ParentNode *node
+ Path string
+ FullPath string
+ Exclusive bool
+ Callbacks map[CallbackType]map[string]*CallbackTuple
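+ // Operation indicates the operation (add/update/remove) currently in progress on this proxy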
+ Operation ProxyOperation
}
// NewProxy instantiates a new proxy to a specific location
@@ -71,13 +72,13 @@
fullPath = ""
}
p := &Proxy{
- Root: root,
- Node: node,
- ParentNode: parentNode,
- Exclusive: exclusive,
- Path: path,
- FullPath: fullPath,
- Callbacks: callbacks,
+ Root: root,
+ Node: node,
+ ParentNode: parentNode,
+ Exclusive: exclusive,
+ Path: path,
+ FullPath: fullPath,
+ Callbacks: callbacks,
}
return p
}
@@ -136,6 +137,18 @@
delete(p.Callbacks[callbackType], funcHash)
}
+// ProxyOperation is an enumerated value that indicates which operation a proxy is performing
+type ProxyOperation uint8
+
+// Enumerated list of proxy operations
+const (
+ PROXY_GET ProxyOperation = iota
+ PROXY_LIST
+ PROXY_ADD
+ PROXY_UPDATE
+ PROXY_REMOVE
+)
+
// parseForControlledPath verifies if a proxy path matches a pattern
// for locations that need to be access controlled.
func (p *Proxy) parseForControlledPath(path string) (pathLock string, controlled bool) {
@@ -183,7 +196,6 @@
return rv
}
-
// Get will retrieve information from the data model at the specified path location
func (p *Proxy) Get(path string, depth int, deep bool, txid string) interface{} {
var effectivePath string
@@ -228,8 +240,12 @@
pac := PAC().ReservePath(effectivePath, p, pathLock)
defer PAC().ReleasePath(pathLock)
+
+ p.Operation = PROXY_UPDATE
pac.SetProxy(p)
+ log.Debugw("proxy-operation--update", log.Fields{"operation":p.Operation})
+
return pac.Update(fullPath, data, strict, txid, controlled)
}
@@ -257,8 +273,12 @@
pac := PAC().ReservePath(path, p, pathLock)
defer PAC().ReleasePath(pathLock)
+
+ p.Operation = PROXY_ADD
pac.SetProxy(p)
+ log.Debugw("proxy-operation--add", log.Fields{"operation":p.Operation})
+
return pac.Add(fullPath, data, txid, controlled)
}
@@ -284,8 +304,12 @@
pac := PAC().ReservePath(path, p, pathLock)
defer PAC().ReleasePath(pathLock)
+
+ p.Operation = PROXY_ADD
pac.SetProxy(p)
+ log.Debugw("proxy-operation--add", log.Fields{"operation":p.Operation})
+
return pac.Add(fullPath, data, txid, controlled)
}
@@ -311,8 +335,12 @@
pac := PAC().ReservePath(effectivePath, p, pathLock)
defer PAC().ReleasePath(pathLock)
+
+ p.Operation = PROXY_REMOVE
pac.SetProxy(p)
+ log.Debugw("proxy-operation--remove", log.Fields{"operation":p.Operation})
+
return pac.Remove(fullPath, txid, controlled)
}
diff --git a/db/model/proxy_access_control.go b/db/model/proxy_access_control.go
index 295f153..66d3222 100644
--- a/db/model/proxy_access_control.go
+++ b/db/model/proxy_access_control.go
@@ -24,7 +24,7 @@
type singletonProxyAccessControl struct {
sync.RWMutex
- cache sync.Map
+ cache sync.Map
reservedCount int
}
@@ -39,6 +39,17 @@
return instanceProxyAccessControl
}
+// IsReserved will verify if access control is active for a specific path within the model
+func (singleton *singletonProxyAccessControl) IsReserved(pathLock string) bool {
+ singleton.Lock()
+ defer singleton.Unlock()
+
+ _, exists := singleton.cache.Load(pathLock)
+ log.Debugw("is-reserved", log.Fields{"pathLock": pathLock, "exists": exists})
+
+ return exists
+}
+
// ReservePath will apply access control for a specific path within the model
func (singleton *singletonProxyAccessControl) ReservePath(path string, proxy *Proxy, pathLock string) *proxyAccessControl {
singleton.Lock()
@@ -47,7 +58,7 @@
if pac, exists := singleton.cache.Load(pathLock); !exists {
log.Debugf("Creating new PAC entry for path:%s pathLock:%s", path, pathLock)
newPac := NewProxyAccessControl(proxy, pathLock)
- singleton.cache.Store(pathLock,newPac)
+ singleton.cache.Store(pathLock, newPac)
return newPac
} else {
log.Debugf("Re-using existing PAC entry for path:%s pathLock:%s", path, pathLock)
@@ -162,9 +173,9 @@
func (pac *proxyAccessControl) List(path string, depth int, deep bool, txid string, control bool) interface{} {
if control {
pac.lock()
- log.Debugw("locked-access--list", log.Fields{"path":pac.Proxy.getFullPath()})
+ log.Debugw("locked-access--list", log.Fields{"path": path, "fullPath": pac.Proxy.getFullPath()})
defer pac.unlock()
- defer log.Debugw("unlocked-access--list", log.Fields{"path":pac.Proxy.getFullPath()})
+ defer log.Debugw("unlocked-access--list", log.Fields{"path": path, "fullPath": pac.Proxy.getFullPath()})
}
// FIXME: Forcing depth to 0 for now due to problems deep copying the data structure
@@ -177,9 +188,9 @@
func (pac *proxyAccessControl) Get(path string, depth int, deep bool, txid string, control bool) interface{} {
if control {
pac.lock()
- log.Debugw("locked-access--get", log.Fields{"path":pac.Proxy.getFullPath()})
+ log.Debugw("locked-access--get", log.Fields{"path": path, "fullPath": pac.Proxy.getFullPath()})
defer pac.unlock()
- defer log.Debugw("unlocked-access--get", log.Fields{"path":pac.Proxy.getFullPath()})
+ defer log.Debugw("unlocked-access--get", log.Fields{"path": path, "fullPath": pac.Proxy.getFullPath()})
}
// FIXME: Forcing depth to 0 for now due to problems deep copying the data structure
@@ -191,10 +202,11 @@
func (pac *proxyAccessControl) Update(path string, data interface{}, strict bool, txid string, control bool) interface{} {
if control {
pac.lock()
- log.Debugw("locked-access--update", log.Fields{"path":pac.Proxy.getFullPath()})
+ log.Debugw("locked-access--update", log.Fields{"path": path, "fullPath": pac.Proxy.getFullPath()})
defer pac.unlock()
- defer log.Debugw("unlocked-access--update", log.Fields{"path":pac.Proxy.getFullPath()})
+ defer log.Debugw("unlocked-access--update", log.Fields{"path": path, "fullPath": pac.Proxy.getFullPath()})
}
+
result := pac.getProxy().GetRoot().Update(path, data, strict, txid, nil)
if result != nil {
@@ -207,10 +219,11 @@
func (pac *proxyAccessControl) Add(path string, data interface{}, txid string, control bool) interface{} {
if control {
pac.lock()
- log.Debugw("locked-access--add", log.Fields{"path":pac.Proxy.getFullPath()})
+ log.Debugw("locked-access--add", log.Fields{"path": path, "fullPath": pac.Path})
defer pac.unlock()
- defer log.Debugw("unlocked-access--add", log.Fields{"path":pac.Proxy.getFullPath()})
+ defer log.Debugw("unlocked-access--add", log.Fields{"path": path, "fullPath": pac.Proxy.getFullPath()})
}
+
result := pac.getProxy().GetRoot().Add(path, data, txid, nil)
if result != nil {
@@ -223,9 +236,10 @@
func (pac *proxyAccessControl) Remove(path string, txid string, control bool) interface{} {
if control {
pac.lock()
- log.Debugw("locked-access--remove", log.Fields{"path":pac.Proxy.getFullPath()})
+ log.Debugw("locked-access--remove", log.Fields{"path": path, "fullPath": pac.Proxy.getFullPath()})
defer pac.unlock()
- defer log.Debugw("unlocked-access--remove", log.Fields{"path":pac.Proxy.getFullPath()})
+ defer log.Debugw("unlocked-access--remove", log.Fields{"path": path, "fullPath": pac.Proxy.getFullPath()})
}
+
return pac.getProxy().GetRoot().Remove(path, txid, nil)
}
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index f977be0..f87d8ce 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -124,6 +124,9 @@
var help string
+ help = fmt.Sprintf("RW instance id")
+ flag.StringVar(&(cf.InstanceID), "instance-id", default_InstanceID, help)
+
help = fmt.Sprintf("RW core endpoint address")
flag.StringVar(&(cf.RWCoreEndpoint), "vcore-endpoint", default_RWCoreEndpoint, help)
diff --git a/rw_core/graph/device_graph.go b/rw_core/graph/device_graph.go
index 9acda6d..d7192da 100644
--- a/rw_core/graph/device_graph.go
+++ b/rw_core/graph/device_graph.go
@@ -24,6 +24,7 @@
"github.com/opencord/voltha-go/protos/voltha"
"strconv"
"strings"
+ "sync"
)
func init() {
@@ -65,7 +66,7 @@
logicalPorts []*voltha.LogicalPort
RootPorts map[uint32]uint32
Routes map[OFPortLink][]RouteHop
- boundaryPorts map[string]uint32
+ boundaryPorts sync.Map
}
func NewDeviceGraph(getDevice GetDeviceFunc) *DeviceGraph {
@@ -76,6 +77,9 @@
}
func (dg *DeviceGraph) ComputeRoutes(lps []*voltha.LogicalPort) {
+ if dg == nil {
+ return
+ }
dg.logicalPorts = lps
// Set the root ports
dg.RootPorts = make(map[uint32]uint32)
@@ -85,9 +89,14 @@
}
}
// set the boundary ports
- dg.boundaryPorts = make(map[string]uint32)
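+ // sync.Map has no bulk reset, so stale boundary ports are deleted one at a time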
+ dg.boundaryPorts.Range(func(key interface{}, value interface{}) bool {
+ dg.boundaryPorts.Delete(key)
+ return true
+ })
+ //dg.boundaryPorts = sync.Map{}
+
for _, lp := range lps {
- dg.boundaryPorts[fmt.Sprintf("%s:%d", lp.DeviceId, lp.DevicePortNo)] = lp.OfpPort.PortNo
+ dg.boundaryPorts.Store(fmt.Sprintf("%s:%d", lp.DeviceId, lp.DevicePortNo), lp.OfpPort.PortNo)
}
dg.Routes = make(map[OFPortLink][]RouteHop)
@@ -103,7 +112,7 @@
}
func (dg *DeviceGraph) addDevice(device *voltha.Device, g goraph.Graph, devicesAdded *map[string]string, portsAdded *map[string]string,
- boundaryPorts map[string]uint32) goraph.Graph {
+ boundaryPorts sync.Map) goraph.Graph {
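+ // NOTE: passing sync.Map by value copies its internal locks (go vet copylocks); *sync.Map would be safer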
if device == nil {
return g
@@ -150,27 +159,34 @@
paths := make(map[OFPortLink][]RouteHop)
var err error
var hop RouteHop
- for source, sourcePort := range dg.boundaryPorts {
- for target, targetPort := range dg.boundaryPorts {
+
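+ // Range stops iterating when the callback returns false, so "return true" below plays the role of "continue"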
+ dg.boundaryPorts.Range(func(src, srcPort interface{}) bool {
+ source := src.(string)
+ sourcePort := srcPort.(uint32)
+
+ dg.boundaryPorts.Range(func(dst, dstPort interface{}) bool {
+ target := dst.(string)
+ targetPort := dstPort.(uint32)
+
if source == target {
- continue
+ return true
}
//Ignore NNI - NNI Routes
if dg.IsRootPort(sourcePort) && dg.IsRootPort(targetPort) {
- continue
+ return true
}
//Ignore UNI - UNI Routes
if !dg.IsRootPort(sourcePort) && !dg.IsRootPort(targetPort) {
- continue
+ return true
}
if pathIds, _, err = goraph.Dijkstra(dg.GGraph, goraph.StringID(source), goraph.StringID(target)); err != nil {
log.Errorw("no-path", log.Fields{"source": source, "target": target, "error": err})
- continue
+ return true
}
if len(pathIds)%3 != 0 {
- continue
+ return true
}
var deviceId string
var ingressPort uint32
@@ -191,8 +207,10 @@
copy(tmp, path)
path = nil
paths[OFPortLink{Ingress: sourcePort, Egress: targetPort}] = tmp
- }
- }
+ return true
+ })
+ return true
+ })
return paths
}