VOL-1689 : ONU stays in DISCOVERED state
VOL-1586 : Possible race condition in openolt python adapter during onu discovery
1) Gets the Device in the response of ChildDeviceDetected.
This avoids the race and also removes the need for GetChildDevice.
2) Puts the Device Id into a cache for use in future requests,
in particular to avoid the failure when calling GetChildDevice
in onuIndication because of the race.
Change-Id: I60944a6ee0e2ffad80a31ef93f72b55b0b136284
diff --git a/vendor/github.com/opencord/voltha-go/adapters/common/core_proxy.go b/vendor/github.com/opencord/voltha-go/adapters/common/core_proxy.go
index c4b5bf5..8371e09 100644
--- a/vendor/github.com/opencord/voltha-go/adapters/common/core_proxy.go
+++ b/vendor/github.com/opencord/voltha-go/adapters/common/core_proxy.go
@@ -231,7 +231,7 @@
}
func (ap *CoreProxy) ChildDeviceDetected(ctx context.Context, parentDeviceId string, parentPortNo int,
- childDeviceType string, channelId int, vendorId string, serialNumber string, onuId int64) error {
+ childDeviceType string, channelId int, vendorId string, serialNumber string, onuId int64) (*voltha.Device, error) {
log.Debugw("ChildDeviceDetected", log.Fields{"pDeviceId": parentDeviceId, "channelId": channelId})
rpc := "ChildDeviceDetected"
// Use a device specific topic to send the request. The adapter handling the device creates a device
@@ -278,7 +278,24 @@
success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
log.Debugw("ChildDeviceDetected-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
- return unPackResponse(rpc, parentDeviceId, success, result)
+
+ if success {
+ volthaDevice := &voltha.Device{}
+ if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
+ log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
+ }
+ return volthaDevice, nil
+ } else {
+ unpackResult := &ic.Error{}
+ var err error
+ if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
+ log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ }
+ log.Debugw("ChildDeviceDetected-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
+ // TODO: Need to get the real error code
+ return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
+ }
}
@@ -479,6 +496,6 @@
Value: pkt,
}
success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
- log.Debugw("ChildDeviceDetected-response", log.Fields{"pDeviceId": deviceId, "success": success})
+ log.Debugw("SendPacketIn-response", log.Fields{"pDeviceId": deviceId, "success": success})
return unPackResponse(rpc, deviceId, success, result)
}
diff --git a/vendor/github.com/opencord/voltha-go/adapters/common/request_handler.go b/vendor/github.com/opencord/voltha-go/adapters/common/request_handler.go
index 3ac5c4f..7ce4414 100644
--- a/vendor/github.com/opencord/voltha-go/adapters/common/request_handler.go
+++ b/vendor/github.com/opencord/voltha-go/adapters/common/request_handler.go
@@ -183,7 +183,42 @@
}
func (rhp *RequestHandlerProxy) Reboot_device(args []*ic.Argument) (*empty.Empty, error) {
+ if len(args) < 3 {
+ log.Warn("invalid-number-of-args", log.Fields{"args": args})
+ err := errors.New("invalid-number-of-args")
+ return nil, err
+ }
+
+ device := &voltha.Device{}
+ transactionID := &ic.StrType{}
+ fromTopic := &ic.StrType{}
+ for _, arg := range args {
+ switch arg.Key {
+ case "device":
+ if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
+ log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+ return nil, err
+ }
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
+ case kafka.FromTopic:
+ if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
+ log.Warnw("cannot-unmarshal-from-topic", log.Fields{"error": err})
+ return nil, err
+ }
+ }
+ }
+ //Update the core reference for that device
+ rhp.coreProxy.UpdateCoreReference(device.Id, fromTopic.Val)
+ //Invoke the Reboot_device API on the adapter
+ if err := rhp.adapter.Reboot_device(device); err != nil {
+ return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+ }
return new(empty.Empty), nil
+
}
func (rhp *RequestHandlerProxy) Self_test_device(args []*ic.Argument) (*empty.Empty, error) {
@@ -233,6 +268,45 @@
}
func (rhp *RequestHandlerProxy) Update_flows_bulk(args []*ic.Argument) (*empty.Empty, error) {
+ log.Debug("Update_flows_bulk")
+ if len(args) < 4 {
+ log.Warn("Update_flows_bulk-invalid-number-of-args", log.Fields{"args": args})
+ err := errors.New("invalid-number-of-args")
+ return nil, err
+ }
+ device := &voltha.Device{}
+ transactionID := &ic.StrType{}
+ flows := &voltha.Flows{}
+ groups := &voltha.FlowGroups{}
+ for _, arg := range args {
+ switch arg.Key {
+ case "device":
+ if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
+ log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+ return nil, err
+ }
+ case "flows":
+ if err := ptypes.UnmarshalAny(arg.Value, flows); err != nil {
+ log.Warnw("cannot-unmarshal-flows", log.Fields{"error": err})
+ return nil, err
+ }
+ case "groups":
+ if err := ptypes.UnmarshalAny(arg.Value, groups); err != nil {
+ log.Warnw("cannot-unmarshal-groups", log.Fields{"error": err})
+ return nil, err
+ }
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
+ }
+ }
+ log.Debugw("Update_flows_bulk", log.Fields{"flows": flows, "groups": groups})
+ //Invoke the adopt device on the adapter
+ if err := rhp.adapter.Update_flows_bulk(device, flows, groups); err != nil {
+ return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+ }
return new(empty.Empty), nil
}
diff --git a/vendor/github.com/opencord/voltha-go/db/model/node.go b/vendor/github.com/opencord/voltha-go/db/model/node.go
index 15ea04e..207df09 100644
--- a/vendor/github.com/opencord/voltha-go/db/model/node.go
+++ b/vendor/github.com/opencord/voltha-go/db/model/node.go
@@ -61,9 +61,10 @@
}
type node struct {
- mutex sync.RWMutex
- Root *root
- Type interface{}
+ mutex sync.RWMutex
+ Root *root
+ Type interface{}
+
Branches map[string]*Branch
Tags map[string]Revision
Proxy *Proxy
@@ -325,7 +326,11 @@
if entry, exists := GetRevCache().Cache.Load(path); exists && entry.(Revision) != nil {
entryAge := time.Now().Sub(entry.(Revision).GetLastUpdate()).Nanoseconds() / int64(time.Millisecond)
if entryAge < DATA_REFRESH_PERIOD {
- log.Debugw("using-cache-entry", log.Fields{"path": path, "hash": hash, "age": entryAge})
+ log.Debugw("using-cache-entry", log.Fields{
+ "path": path,
+ "hash": hash,
+ "age": entryAge,
+ })
return proto.Clone(entry.(Revision).GetData().(proto.Message))
} else {
log.Debugw("cache-entry-expired", log.Fields{"path": path, "hash": hash, "age": entryAge})
@@ -950,6 +955,13 @@
}
func (n *node) createProxy(path string, fullPath string, parentNode *node, exclusive bool) *Proxy {
+ log.Debugw("node-create-proxy", log.Fields{
+ "node-type": reflect.ValueOf(n.Type).Type(),
+ "parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
+ "path": path,
+ "fullPath": fullPath,
+ })
+
for strings.HasPrefix(path, "/") {
path = path[1:]
}
@@ -960,48 +972,130 @@
rev := n.GetBranch(NONE).GetLatest()
partition := strings.SplitN(path, "/", 2)
name := partition[0]
+ var nodeType interface{}
+ // Node type is chosen depending on if we have reached the end of path or not
if len(partition) < 2 {
path = ""
+ nodeType = n.Type
} else {
path = partition[1]
+ nodeType = parentNode.Type
}
- field := ChildrenFields(n.Type)[name]
- if field.IsContainer {
- if path == "" {
- //log.Error("cannot proxy a container field")
- newNode := n.MakeNode(reflect.New(field.ClassType.Elem()).Interface(), "")
- return newNode.makeProxy(path, fullPath, parentNode, exclusive)
- } else if field.Key != "" {
- partition := strings.SplitN(path, "/", 2)
- key := partition[0]
- if len(partition) < 2 {
- path = ""
+ field := ChildrenFields(nodeType)[name]
+
+ if field != nil {
+ if field.IsContainer {
+ log.Debugw("container-field", log.Fields{
+ "node-type": reflect.ValueOf(n.Type).Type(),
+ "parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
+ "path": path,
+ "name": name,
+ })
+ if path == "" {
+ log.Debugw("folder-proxy", log.Fields{
+ "node-type": reflect.ValueOf(n.Type).Type(),
+ "parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
+ "fullPath": fullPath,
+ "name": name,
+ })
+ newNode := n.MakeNode(reflect.New(field.ClassType.Elem()).Interface(), "")
+ return newNode.makeProxy(path, fullPath, parentNode, exclusive)
+ } else if field.Key != "" {
+ log.Debugw("key-proxy", log.Fields{
+ "node-type": reflect.ValueOf(n.Type).Type(),
+ "parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
+ "fullPath": fullPath,
+ "name": name,
+ })
+ partition := strings.SplitN(path, "/", 2)
+ key := partition[0]
+ if len(partition) < 2 {
+ path = ""
+ } else {
+ path = partition[1]
+ }
+ keyValue := field.KeyFromStr(key)
+ var children []Revision
+ children = make([]Revision, len(rev.GetChildren(name)))
+ copy(children, rev.GetChildren(name))
+
+ // Try to find a matching revision in memory
+ // If not found try the db
+ var childRev Revision
+ if _, childRev = n.findRevByKey(children, field.Key, keyValue); childRev != nil {
+ log.Debugw("found-revision-matching-key-in-memory", log.Fields{
+ "node-type": reflect.ValueOf(n.Type).Type(),
+ "parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
+ "fullPath": fullPath,
+ "name": name,
+ })
+ } else if revs := n.GetBranch(NONE).GetLatest().LoadFromPersistence(fullPath, "", nil); revs != nil && len(revs) > 0 {
+ log.Debugw("found-revision-matching-key-in-db", log.Fields{
+ "node-type": reflect.ValueOf(n.Type).Type(),
+ "parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
+ "fullPath": fullPath,
+ "name": name,
+ })
+ childRev = revs[0]
+ } else {
+ log.Debugw("no-revision-matching-key", log.Fields{
+ "node-type": reflect.ValueOf(n.Type).Type(),
+ "parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
+ "fullPath": fullPath,
+ "name": name,
+ })
+ }
+ if childRev != nil {
+ childNode := childRev.GetNode()
+ return childNode.createProxy(path, fullPath, n, exclusive)
+ }
} else {
- path = partition[1]
- }
- keyValue := field.KeyFromStr(key)
- var children []Revision
- children = make([]Revision, len(rev.GetChildren(name)))
- copy(children, rev.GetChildren(name))
- if _, childRev := n.findRevByKey(children, field.Key, keyValue); childRev != nil {
- childNode := childRev.GetNode()
- return childNode.createProxy(path, fullPath, n, exclusive)
+ log.Errorw("cannot-access-index-of-empty-container", log.Fields{
+ "node-type": reflect.ValueOf(n.Type).Type(),
+ "parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
+ "path": path,
+ "name": name,
+ })
}
} else {
- log.Error("cannot index into container with no keys")
+ log.Debugw("non-container-field", log.Fields{
+ "node-type": reflect.ValueOf(n.Type).Type(),
+ "parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
+ "path": path,
+ "name": name,
+ })
+ childRev := rev.GetChildren(name)[0]
+ childNode := childRev.GetNode()
+ return childNode.createProxy(path, fullPath, n, exclusive)
}
} else {
- childRev := rev.GetChildren(name)[0]
- childNode := childRev.GetNode()
- return childNode.createProxy(path, fullPath, n, exclusive)
+ log.Debugw("field-object-is-nil", log.Fields{
+ "node-type": reflect.ValueOf(n.Type).Type(),
+ "parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
+ "fullPath": fullPath,
+ "name": name,
+ })
}
- log.Warnf("Cannot create proxy - latest rev:%s, all revs:%+v", rev.GetHash(), n.GetBranch(NONE).Revisions)
+ log.Warnw("cannot-create-proxy", log.Fields{
+ "node-type": reflect.ValueOf(n.Type).Type(),
+ "parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
+ "path": path,
+ "fullPath": fullPath,
+ "latest-rev": rev.GetHash(),
+ })
return nil
}
func (n *node) makeProxy(path string, fullPath string, parentNode *node, exclusive bool) *Proxy {
+ log.Debugw("node-make-proxy", log.Fields{
+ "node-type": reflect.ValueOf(n.Type).Type(),
+ "parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
+ "path": path,
+ "fullPath": fullPath,
+ })
+
r := &root{
node: n,
Callbacks: n.Root.GetCallbacks(),
@@ -1013,8 +1107,20 @@
}
if n.Proxy == nil {
+ log.Debugw("constructing-new-proxy", log.Fields{
+ "node-type": reflect.ValueOf(n.Type).Type(),
+ "parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
+ "path": path,
+ "fullPath": fullPath,
+ })
n.Proxy = NewProxy(r, n, parentNode, path, fullPath, exclusive)
} else {
+ log.Debugw("node-has-existing-proxy", log.Fields{
+ "node-type": reflect.ValueOf(n.Proxy.Node.Type).Type(),
+ "parent-node-type": reflect.ValueOf(n.Proxy.ParentNode.Type).Type(),
+ "path": n.Proxy.Path,
+ "fullPath": n.Proxy.FullPath,
+ })
if n.Proxy.Exclusive {
log.Error("node is already owned exclusively")
}
diff --git a/vendor/github.com/opencord/voltha-go/db/model/non_persisted_revision.go b/vendor/github.com/opencord/voltha-go/db/model/non_persisted_revision.go
index 6075b3f..297a740 100644
--- a/vendor/github.com/opencord/voltha-go/db/model/non_persisted_revision.go
+++ b/vendor/github.com/opencord/voltha-go/db/model/non_persisted_revision.go
@@ -55,7 +55,6 @@
Branch *Branch
WeakRef string
Name string
- discarded bool
lastUpdate time.Time
}
@@ -66,14 +65,9 @@
r.Config = NewDataRevision(root, data)
r.Children = children
r.Hash = r.hashContent()
- r.discarded = false
return r
}
-func (npr *NonPersistedRevision) IsDiscarded() bool {
- return npr.discarded
-}
-
func (npr *NonPersistedRevision) SetConfig(config *DataRevision) {
npr.mutex.Lock()
defer npr.mutex.Unlock()
@@ -356,14 +350,33 @@
if nameExists {
// Check if the data has changed or not
if existingChildren[nameIndex].GetData().(proto.Message).String() != newChild.GetData().(proto.Message).String() {
+ log.Debugw("replacing-existing-child", log.Fields{
+ "old-hash": existingChildren[nameIndex].GetHash(),
+ "old-data": existingChildren[nameIndex].GetData(),
+ "new-hash": newChild.GetHash(),
+ "new-data": newChild.GetData(),
+ })
+
// replace entry
newChild.GetNode().Root = existingChildren[nameIndex].GetNode().Root
updatedChildren = append(updatedChildren, newChild)
} else {
+ log.Debugw("keeping-existing-child", log.Fields{
+ "old-hash": existingChildren[nameIndex].GetHash(),
+ "old-data": existingChildren[nameIndex].GetData(),
+ "new-hash": newChild.GetHash(),
+ "new-data": newChild.GetData(),
+ })
+
// keep existing entry
updatedChildren = append(updatedChildren, existingChildren[nameIndex])
}
} else {
+ log.Debugw("adding-unknown-child", log.Fields{
+ "hash": newChild.GetHash(),
+ "data": newChild.GetData(),
+ })
+
// new entry ... just add it
updatedChildren = append(updatedChildren, newChild)
}
@@ -413,7 +426,6 @@
// Drop is used to indicate when a revision is no longer required
func (npr *NonPersistedRevision) Drop(txid string, includeConfig bool) {
log.Debugw("dropping-revision", log.Fields{"hash": npr.GetHash(), "name": npr.GetName()})
- npr.discarded = true
}
// ChildDrop will remove a child entry matching the provided parameters from the current revision
@@ -458,6 +470,6 @@
// stub ... required by interface
}
-func (pr *NonPersistedRevision) StorageDrop(txid string, includeConfig bool) {
+func (npr *NonPersistedRevision) StorageDrop(txid string, includeConfig bool) {
// stub ... required by interface
}
diff --git a/vendor/github.com/opencord/voltha-go/db/model/persisted_revision.go b/vendor/github.com/opencord/voltha-go/db/model/persisted_revision.go
index 226fc3c..2ab91b7 100644
--- a/vendor/github.com/opencord/voltha-go/db/model/persisted_revision.go
+++ b/vendor/github.com/opencord/voltha-go/db/model/persisted_revision.go
@@ -127,34 +127,31 @@
StopWatchLoop:
for {
- if pr.IsDiscarded() {
- break StopWatchLoop
- }
+ latestRev := pr.GetBranch().GetLatest()
select {
case event, ok := <-pr.events:
if !ok {
- log.Errorw("event-channel-failure: stopping watch loop", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
+ log.Errorw("event-channel-failure: stopping watch loop", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
break StopWatchLoop
}
-
- log.Debugw("received-event", log.Fields{"type": event.EventType, "watch": pr.GetName()})
+ log.Debugw("received-event", log.Fields{"type": event.EventType, "watch": latestRev.GetName()})
switch event.EventType {
case kvstore.DELETE:
- log.Debugw("delete-from-memory", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
+ log.Debugw("delete-from-memory", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
pr.Revision.Drop("", true)
break StopWatchLoop
case kvstore.PUT:
- log.Debugw("update-in-memory", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
+ log.Debugw("update-in-memory", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
- data := reflect.New(reflect.TypeOf(pr.GetData()).Elem())
+ data := reflect.New(reflect.TypeOf(latestRev.GetData()).Elem())
if err := proto.Unmarshal(event.Value.([]byte), data.Interface().(proto.Message)); err != nil {
- log.Errorw("failed-to-unmarshal-watch-data", log.Fields{"key": pr.GetHash(), "watch": pr.GetName(), "error": err})
+ log.Errorw("failed-to-unmarshal-watch-data", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName(), "error": err})
} else {
- log.Debugw("un-marshaled-watch-data", log.Fields{"key": pr.GetHash(), "watch": pr.GetName(), "data": data.Interface()})
+ log.Debugw("un-marshaled-watch-data", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName(), "data": data.Interface()})
var pathLock string
var pac *proxyAccessControl
@@ -171,24 +168,26 @@
Lease: 0,
}
- if pr.GetNode().GetProxy() != nil {
+ if latestRev.GetNode().GetProxy() != nil {
//
// If a proxy exists for this revision, use it to lock access to the path
// and prevent simultaneous updates to the object in memory
//
- pathLock, _ = pr.GetNode().GetProxy().parseForControlledPath(pr.GetNode().GetProxy().getFullPath())
+ pathLock, _ = latestRev.GetNode().GetProxy().parseForControlledPath(latestRev.GetNode().GetProxy().getFullPath())
//If the proxy already has a request in progress, then there is no need to process the watch
- log.Debugw("checking-if-path-is-locked", log.Fields{"key": pr.GetHash(), "pathLock": pathLock})
+ log.Debugw("checking-if-path-is-locked", log.Fields{"key": latestRev.GetHash(), "pathLock": pathLock})
if PAC().IsReserved(pathLock) {
log.Debugw("operation-in-progress", log.Fields{
- "key": pr.GetHash(),
- "path": pr.GetNode().GetProxy().getFullPath(),
- "operation": pr.GetNode().GetProxy().Operation.String(),
+ "key": latestRev.GetHash(),
+ "path": latestRev.GetNode().GetProxy().getFullPath(),
+ "operation": latestRev.GetNode().GetProxy().Operation.String(),
})
+ //continue
+
// Identify the operation type and determine if the watch event should be applied or not.
- switch pr.GetNode().GetProxy().Operation {
+ switch latestRev.GetNode().GetProxy().Operation {
case PROXY_REMOVE:
fallthrough
@@ -200,9 +199,9 @@
// Therefore, the data of the current event is most likely out-dated
// and should be ignored
log.Debugw("ignore-watch-event", log.Fields{
- "key": pr.GetHash(),
- "path": pr.GetNode().GetProxy().getFullPath(),
- "operation": pr.GetNode().GetProxy().Operation.String(),
+ "key": latestRev.GetHash(),
+ "path": latestRev.GetNode().GetProxy().getFullPath(),
+ "operation": latestRev.GetNode().GetProxy().Operation.String(),
})
continue
@@ -216,41 +215,45 @@
case PROXY_GET:
fallthrough
+ case PROXY_WATCH:
+ fallthrough
+
default:
log.Debugw("process-watch-event", log.Fields{
- "key": pr.GetHash(),
- "path": pr.GetNode().GetProxy().getFullPath(),
- "operation": pr.GetNode().GetProxy().Operation.String(),
+ "key": latestRev.GetHash(),
+ "path": latestRev.GetNode().GetProxy().getFullPath(),
+ "operation": latestRev.GetNode().GetProxy().Operation.String(),
})
}
}
// Reserve the path to prevent others to modify while we reload from persistence
- log.Debugw("reserve-and-lock-path", log.Fields{"key": pr.GetHash(), "path": pathLock})
- pac = PAC().ReservePath(pr.GetNode().GetProxy().getFullPath(), pr.GetNode().GetProxy(), pathLock)
+ log.Debugw("reserve-and-lock-path", log.Fields{"key": latestRev.GetHash(), "path": pathLock})
+ pac = PAC().ReservePath(latestRev.GetNode().GetProxy().getFullPath(),
+ latestRev.GetNode().GetProxy(), pathLock)
pac.lock()
- pr.GetNode().GetProxy().Operation = PROXY_UPDATE
- pac.SetProxy(pr.GetNode().GetProxy())
+ latestRev.GetNode().GetProxy().Operation = PROXY_WATCH
+ pac.SetProxy(latestRev.GetNode().GetProxy())
// Load changes and apply to memory
- pr.LoadFromPersistence(pr.GetName(), "", blobs)
+ latestRev.LoadFromPersistence(latestRev.GetName(), "", blobs)
- log.Debugw("release-and-unlock-path", log.Fields{"key": pr.GetHash(), "path": pathLock})
+ log.Debugw("release-and-unlock-path", log.Fields{"key": latestRev.GetHash(), "path": pathLock})
pac.getProxy().Operation = PROXY_GET
pac.unlock()
PAC().ReleasePath(pathLock)
} else {
// This block should be reached only if coming from a non-proxied request
- log.Debugw("revision-with-no-proxy", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
+ log.Debugw("revision-with-no-proxy", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
// Load changes and apply to memory
- pr.LoadFromPersistence(pr.GetName(), "", blobs)
+ latestRev.LoadFromPersistence(latestRev.GetName(), "", blobs)
}
}
default:
- log.Debugw("unhandled-event", log.Fields{"key": pr.GetHash(), "watch": pr.GetName(), "type": event.EventType})
+ log.Debugw("unhandled-event", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName(), "type": event.EventType})
}
}
}
@@ -267,19 +270,18 @@
newNPR := pr.Revision.UpdateData(data, branch)
newPR := &PersistedRevision{
- Revision: newNPR,
- Compress: pr.Compress,
- kvStore: pr.kvStore,
- events: pr.events,
+ Revision: newNPR,
+ Compress: pr.Compress,
+ kvStore: pr.kvStore,
+ events: pr.events,
+ isWatched: pr.isWatched,
}
if newPR.GetHash() != pr.GetHash() {
- newPR.isWatched = false
newPR.isStored = false
pr.Drop(branch.Txid, false)
pr.Drop(branch.Txid, false)
} else {
- newPR.isWatched = true
newPR.isStored = true
}
@@ -294,18 +296,17 @@
newNPR := pr.Revision.UpdateChildren(name, children, branch)
newPR := &PersistedRevision{
- Revision: newNPR,
- Compress: pr.Compress,
- kvStore: pr.kvStore,
- events: pr.events,
+ Revision: newNPR,
+ Compress: pr.Compress,
+ kvStore: pr.kvStore,
+ events: pr.events,
+ isWatched: pr.isWatched,
}
if newPR.GetHash() != pr.GetHash() {
- newPR.isWatched = false
newPR.isStored = false
pr.Drop(branch.Txid, false)
} else {
- newPR.isWatched = true
newPR.isStored = true
}
@@ -319,18 +320,17 @@
newNPR := pr.Revision.UpdateAllChildren(children, branch)
newPR := &PersistedRevision{
- Revision: newNPR,
- Compress: pr.Compress,
- kvStore: pr.kvStore,
- events: pr.events,
+ Revision: newNPR,
+ Compress: pr.Compress,
+ kvStore: pr.kvStore,
+ events: pr.events,
+ isWatched: pr.isWatched,
}
if newPR.GetHash() != pr.GetHash() {
- newPR.isWatched = false
newPR.isStored = false
pr.Drop(branch.Txid, false)
} else {
- newPR.isWatched = true
newPR.isStored = true
}
@@ -388,6 +388,7 @@
if childIdx, childRev := pr.GetNode().findRevByKey(children, keyName, keyValue); childRev != nil {
// A child matching the provided key exists in memory
// Verify if the data differs from what was retrieved from persistence
+ // Also check if we are treating a newer revision of the data or not
if childRev.GetData().(proto.Message).String() != data.(proto.Message).String() {
log.Debugw("revision-data-is-different", log.Fields{
"key": childRev.GetHash(),
@@ -407,11 +408,11 @@
updatedChildRev.GetNode().SetProxy(childRev.GetNode().GetProxy())
updatedChildRev.SetupWatch(updatedChildRev.GetName())
- childRev.Drop(txid, false)
updatedChildRev.SetLastUpdate()
// Update cache
GetRevCache().Cache.Store(updatedChildRev.GetName(), updatedChildRev)
+ childRev.Drop(txid, false)
childRev.GetBranch().LatestLock.Unlock()
// END lock child
@@ -459,6 +460,7 @@
response = childRev
}
}
+
} else {
// There is no available child with that key value.
// Create a new child and update the parent revision.
@@ -478,7 +480,6 @@
childRev.SetName(typeName + "/" + keyValue)
childRev.SetupWatch(childRev.GetName())
- pr.GetBranch().Node.makeLatest(pr.GetBranch(), childRev, nil)
pr.GetBranch().LatestLock.Unlock()
// END child lock
@@ -491,7 +492,6 @@
children = append(children, childRev)
updatedRev := parent.GetBranch(NONE).Latest.UpdateChildren(typeName, children, parent.GetBranch(NONE))
updatedRev.GetNode().SetProxy(parent.GetBranch(NONE).Node.GetProxy())
-
parent.GetBranch(NONE).Node.makeLatest(parent.GetBranch(NONE), updatedRev, nil)
parent.GetBranch(NONE).LatestLock.Unlock()
// END parent lock
@@ -512,8 +512,7 @@
// LoadFromPersistence retrieves data from kv store at the specified location and refreshes the memory
// by adding missing entries, updating changed entries and ignoring unchanged ones
-func (pr *PersistedRevision) LoadFromPersistence(
- path string, txid string, blobs map[string]*kvstore.KVPair) []Revision {
+func (pr *PersistedRevision) LoadFromPersistence(path string, txid string, blobs map[string]*kvstore.KVPair) []Revision {
pr.mutex.Lock()
defer pr.mutex.Unlock()
@@ -575,8 +574,7 @@
// based on the field's key attribute
_, key := GetAttributeValue(data.Interface(), field.Key, 0)
- if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, key.String(),
- txid); entry != nil {
+ if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, key.String(), txid); entry != nil {
response = append(response, entry)
}
} else {
@@ -603,8 +601,7 @@
}
keyValue := field.KeyFromStr(key)
- if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, keyValue.(string),
- txid); entry != nil {
+ if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, keyValue.(string), txid); entry != nil {
response = append(response, entry)
}
}
diff --git a/vendor/github.com/opencord/voltha-go/db/model/proxy.go b/vendor/github.com/opencord/voltha-go/db/model/proxy.go
index d4a86f4..182dcdd 100644
--- a/vendor/github.com/opencord/voltha-go/db/model/proxy.go
+++ b/vendor/github.com/opencord/voltha-go/db/model/proxy.go
@@ -100,8 +100,12 @@
// getCallbacks returns the full list of callbacks associated to the proxy
func (p *Proxy) getCallbacks(callbackType CallbackType) map[string]*CallbackTuple {
- if cb, exists := p.Callbacks[callbackType]; exists {
- return cb
+ if p != nil {
+ if cb, exists := p.Callbacks[callbackType]; exists {
+ return cb
+ }
+ } else {
+ log.Debugw("proxy-is-nil", log.Fields{"callback-type": callbackType.String()})
}
return nil
}
@@ -148,6 +152,7 @@
PROXY_UPDATE
PROXY_REMOVE
PROXY_CREATE
+ PROXY_WATCH
)
var proxyOperationTypes = []string{
@@ -157,6 +162,7 @@
"PROXY_UPDATE",
"PROXY_REMOVE",
"PROXY_CREATE",
+ "PROXY_WATCH",
}
func (t ProxyOperation) String() string {
diff --git a/vendor/github.com/opencord/voltha-go/db/model/proxy_access_control.go b/vendor/github.com/opencord/voltha-go/db/model/proxy_access_control.go
index 2a5d034..a1ea6be 100644
--- a/vendor/github.com/opencord/voltha-go/db/model/proxy_access_control.go
+++ b/vendor/github.com/opencord/voltha-go/db/model/proxy_access_control.go
@@ -255,7 +255,7 @@
defer log.Debugw("unlocked-access--create-proxy", log.Fields{"path": path, "fullPath": pac.Proxy.getFullPath()})
}
- result := pac.getProxy().GetRoot().CreateProxy(path, exclusive)
+ result := pac.getProxy().ParentNode.CreateProxy(path, exclusive)
if result != nil {
return result
diff --git a/vendor/github.com/opencord/voltha-go/db/model/revision.go b/vendor/github.com/opencord/voltha-go/db/model/revision.go
index 4e606f1..cd4c5df 100644
--- a/vendor/github.com/opencord/voltha-go/db/model/revision.go
+++ b/vendor/github.com/opencord/voltha-go/db/model/revision.go
@@ -22,7 +22,6 @@
type Revision interface {
Finalize(bool)
- IsDiscarded() bool
SetConfig(revision *DataRevision)
GetConfig() *DataRevision
Drop(txid string, includeConfig bool)
diff --git a/vendor/github.com/opencord/voltha-go/db/model/root.go b/vendor/github.com/opencord/voltha-go/db/model/root.go
index 338ef67..5036ce1 100644
--- a/vendor/github.com/opencord/voltha-go/db/model/root.go
+++ b/vendor/github.com/opencord/voltha-go/db/model/root.go
@@ -91,7 +91,7 @@
dirtyNode.DeleteBranch(txid)
}
delete(r.DirtyNodes, txid)
- delete(r.node.Branches, txid)
+ r.node.DeleteBranch(txid)
}
// FoldTxBranch will merge the contents of a transaction branch with the root object
@@ -111,9 +111,8 @@
// ExecuteCallbacks will invoke all the callbacks linked to root object
func (r *root) ExecuteCallbacks() {
r.mutex.Lock()
- log.Debugf("ExecuteCallbacks has the ROOT lock : %+v", r)
defer r.mutex.Unlock()
- defer log.Debugf("ExecuteCallbacks released the ROOT lock : %+v", r)
+
for len(r.Callbacks) > 0 {
callback := r.Callbacks[0]
r.Callbacks = r.Callbacks[1:]
@@ -133,36 +132,32 @@
// getCallbacks returns the available callbacks
func (r *root) GetCallbacks() []CallbackTuple {
r.mutex.Lock()
- log.Debugf("getCallbacks has the ROOT lock : %+v", r)
defer r.mutex.Unlock()
- defer log.Debugf("getCallbacks released the ROOT lock : %+v", r)
+
return r.Callbacks
}
// getCallbacks returns the available notification callbacks
func (r *root) GetNotificationCallbacks() []CallbackTuple {
r.mutex.Lock()
- log.Debugf("GetNotificationCallbacks has the ROOT lock : %+v", r)
defer r.mutex.Unlock()
- defer log.Debugf("GetNotificationCallbacks released the ROOT lock : %+v", r)
+
return r.NotificationCallbacks
}
// AddCallback inserts a new callback with its arguments
func (r *root) AddCallback(callback CallbackFunction, args ...interface{}) {
r.mutex.Lock()
- log.Debugf("AddCallback has the ROOT lock : %+v", r)
defer r.mutex.Unlock()
- defer log.Debugf("AddCallback released the ROOT lock : %+v", r)
+
r.Callbacks = append(r.Callbacks, CallbackTuple{callback, args})
}
// AddNotificationCallback inserts a new notification callback with its arguments
func (r *root) AddNotificationCallback(callback CallbackFunction, args ...interface{}) {
r.mutex.Lock()
- log.Debugf("AddNotificationCallback has the ROOT lock : %+v", r)
defer r.mutex.Unlock()
- defer log.Debugf("AddNotificationCallback released the ROOT lock : %+v", r)
+
r.NotificationCallbacks = append(r.NotificationCallbacks, CallbackTuple{callback, args})
}
diff --git a/vendor/github.com/opencord/voltha-go/kafka/client.go b/vendor/github.com/opencord/voltha-go/kafka/client.go
old mode 100644
new mode 100755
index 3d37f6e..36c1ede
--- a/vendor/github.com/opencord/voltha-go/kafka/client.go
+++ b/vendor/github.com/opencord/voltha-go/kafka/client.go
@@ -16,8 +16,9 @@
package kafka
import (
- ca "github.com/opencord/voltha-protos/go/inter_container"
"time"
+
+ ca "github.com/opencord/voltha-protos/go/inter_container"
)
const (
@@ -53,6 +54,7 @@
DefaultNumberPartitions = 3
DefaultNumberReplicas = 1
DefaultAutoCreateTopic = false
+ DefaultMetadataMaxRetry = 3
)
// MsgClient represents the set of APIs a Kafka MsgClient must implement
diff --git a/vendor/github.com/opencord/voltha-go/kafka/sarama_client.go b/vendor/github.com/opencord/voltha-go/kafka/sarama_client.go
old mode 100644
new mode 100755
index e920a83..0576da9
--- a/vendor/github.com/opencord/voltha-go/kafka/sarama_client.go
+++ b/vendor/github.com/opencord/voltha-go/kafka/sarama_client.go
@@ -18,15 +18,16 @@
import (
"errors"
"fmt"
+ "strings"
+ "sync"
+ "time"
+
scc "github.com/bsm/sarama-cluster"
"github.com/golang/protobuf/proto"
"github.com/google/uuid"
"github.com/opencord/voltha-go/common/log"
ic "github.com/opencord/voltha-protos/go/inter_container"
"gopkg.in/Shopify/sarama.v1"
- "strings"
- "sync"
- "time"
)
func init() {
@@ -73,6 +74,7 @@
lockTopicToConsumerChannelMap sync.RWMutex
topicLockMap map[string]*sync.RWMutex
lockOfTopicLockMap sync.RWMutex
+ metadataMaxRetry int
}
type SaramaClientOption func(*SaramaClient)
@@ -179,6 +181,12 @@
}
}
+func MetadatMaxRetries(retry int) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.metadataMaxRetry = retry
+ }
+}
+
func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
client := &SaramaClient{
KafkaHost: DefaultKafkaHost,
@@ -197,6 +205,7 @@
client.numPartitions = DefaultNumberPartitions
client.numReplicas = DefaultNumberReplicas
client.autoCreateTopic = DefaultAutoCreateTopic
+ client.metadataMaxRetry = DefaultMetadataMaxRetry
for _, option := range opts {
option(client)
@@ -679,6 +688,7 @@
config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
config.Consumer.Offsets.Initial = sarama.OffsetNewest
+ config.Metadata.Retry.Max = sc.metadataMaxRetry
kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
brokers := []string{kafkaFullAddr}
diff --git a/vendor/github.com/opencord/voltha-go/rw_core/utils/core_utils.go b/vendor/github.com/opencord/voltha-go/rw_core/utils/core_utils.go
index 813c978..c9cd56d 100644
--- a/vendor/github.com/opencord/voltha-go/rw_core/utils/core_utils.go
+++ b/vendor/github.com/opencord/voltha-go/rw_core/utils/core_utils.go
@@ -18,6 +18,7 @@
import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
+ "os"
"reflect"
"time"
)
@@ -30,6 +31,10 @@
Id string
}
+func GetHostName() string {
+ return os.Getenv("HOSTNAME")
+}
+
//WaitForNilOrErrorResponses waits on a variadic number of channels for either a nil response or an error
//response. If an error is received from a given channel then the returned error array will contain that error.
//The error will be at the index corresponding to the order in which the channel appear in the parameter list.
diff --git a/vendor/github.com/opencord/voltha-go/rw_core/utils/flow_utils.go b/vendor/github.com/opencord/voltha-go/rw_core/utils/flow_utils.go
index c1ca18d..3828b39 100644
--- a/vendor/github.com/opencord/voltha-go/rw_core/utils/flow_utils.go
+++ b/vendor/github.com/opencord/voltha-go/rw_core/utils/flow_utils.go
@@ -1082,7 +1082,7 @@
//Check match condition
//If the flow_mod match field is empty, that is a special case and indicates the flow entry matches
- if (mod.Match == nil) || (mod.Match.OxmFields == nil) {
+ if (mod.Match == nil) || (mod.Match.OxmFields == nil) || (len(mod.Match.OxmFields) == 0) {
//If we got this far and the match is empty in the flow spec, than the flow matches
return true
} // TODO : implement the flow match analysis
@@ -1156,3 +1156,11 @@
}
return len(toKeep) < len(flows), toKeep
}
+
+func ToOfpOxmField(from []*ofp.OfpOxmOfbField) []*ofp.OfpOxmField {
+ matchFields := make([]*ofp.OfpOxmField, 0)
+ for _, val := range from {
+ matchFields = append(matchFields, &ofp.OfpOxmField{Field: &ofp.OfpOxmField_OfbField{OfbField: val}})
+ }
+ return matchFields
+}