[VOL-1462] Sync data between two voltha cores in the same pair
This commit consists of the following updates:
1) Background data syncing between two cores after a transaction
is completed by one core.
2) Add transaction management to southbound APIs (adapter facing).
This is enabled for adapter registration only for now.
3) Fix an issue with flow decomposition
4) Add the rough-in to allow a packet to be sent to an OFAgent
with a transaction ID. Two cores can therefore send the same
packet and let the OFAgent discard the duplicate. The work in
OFAgent remains.
5) Cleanups
Change-Id: Ibe9d75edb66cfd6a0954bdfeb16a7e7c8a3c53b6
diff --git a/rw_core/core/adapter_manager.go b/rw_core/core/adapter_manager.go
index d4d3c69..0be4e12 100644
--- a/rw_core/core/adapter_manager.go
+++ b/rw_core/core/adapter_manager.go
@@ -139,7 +139,7 @@
//loadAdaptersAndDevicetypesInMemory loads the existing set of adapters and device types in memory
func (aMgr *AdapterManager) loadAdaptersAndDevicetypesInMemory() {
// Load the adapters
- if adaptersIf := aMgr.clusterDataProxy.Get("/adapters", 0, false, ""); adaptersIf != nil {
+ if adaptersIf := aMgr.clusterDataProxy.List("/adapters", 0, false, ""); adaptersIf != nil {
for _, adapterIf := range adaptersIf.([]interface{}) {
if adapter, ok := adapterIf.(*voltha.Adapter); ok {
log.Debugw("found-existing-adapter", log.Fields{"adapterId": adapter.Id})
@@ -153,7 +153,7 @@
}
// Load the device types
- if deviceTypesIf := aMgr.clusterDataProxy.Get("/device_types", 0, false, ""); deviceTypesIf != nil {
+ if deviceTypesIf := aMgr.clusterDataProxy.List("/device_types", 0, false, ""); deviceTypesIf != nil {
dTypes := &voltha.DeviceTypes{Items:[]*voltha.DeviceType{}}
for _, deviceTypeIf := range deviceTypesIf.([]interface{}) {
if dType, ok := deviceTypeIf.(*voltha.DeviceType); ok {
@@ -169,6 +169,31 @@
}
}
+
+//updateAdaptersAndDevicetypesInMemory refreshes the in-memory adapters and device types from the cluster data proxy
+func (aMgr *AdapterManager) updateAdaptersAndDevicetypesInMemory() {
+ // Update the adapters
+ if adaptersIf := aMgr.clusterDataProxy.List("/adapters", 0, false, ""); adaptersIf != nil {
+ for _, adapterIf := range adaptersIf.([]interface{}) {
+ if adapter, ok := adapterIf.(*voltha.Adapter); ok {
+ log.Debugw("found-existing-adapter", log.Fields{"adapterId": adapter.Id})
+ aMgr.updateAdapter(adapter)
+ }
+ }
+ }
+ // Update the device types
+ if deviceTypesIf := aMgr.clusterDataProxy.List("/device_types", 0, false, ""); deviceTypesIf != nil {
+ dTypes := &voltha.DeviceTypes{Items: []*voltha.DeviceType{}}
+ for _, deviceTypeIf := range deviceTypesIf.([]interface{}) {
+ if dType, ok := deviceTypeIf.(*voltha.DeviceType); ok {
+ log.Debugw("found-existing-device-types", log.Fields{"deviceTypes": dTypes})
+ aMgr.updateDeviceType(dType)
+ }
+ }
+ }
+}
+
+
func (aMgr *AdapterManager) addAdapter(adapter *voltha.Adapter, saveToDb bool) {
aMgr.lockAdaptersMap.Lock()
defer aMgr.lockAdaptersMap.Unlock()
@@ -232,7 +257,9 @@
defer aMgr.lockAdaptersMap.Unlock()
for _, adapterAgent := range aMgr.adapterAgents {
if a := adapterAgent.getAdapter(); a != nil {
- result.Items = append(result.Items, (proto.Clone(a)).(*voltha.Adapter))
+ if a.Id != SENTINEL_ADAPTER_ID { // don't report the sentinel
+ result.Items = append(result.Items, (proto.Clone(a)).(*voltha.Adapter))
+ }
}
}
return result, nil
@@ -270,7 +297,7 @@
defer aMgr.lockAdaptersMap.Unlock()
aMgr.lockdDeviceTypeToAdapterMap.Lock()
defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
- if adapterAgent, exist := aMgr.adapterAgents[deviceType.Adapter]; !exist {
+ if adapterAgent, exist := aMgr.adapterAgents[deviceType.Adapter]; exist {
adapterAgent.updateDeviceType(deviceType)
} else {
aMgr.adapterAgents[deviceType.Adapter] = newAdapterAgent(&voltha.Adapter{Id: deviceType.Adapter},
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index 98cc688..5fede68 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -22,6 +22,7 @@
"github.com/golang/protobuf/ptypes/empty"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/model"
+ "github.com/opencord/voltha-go/kafka"
ic "github.com/opencord/voltha-go/protos/inter_container"
"github.com/opencord/voltha-go/protos/voltha"
"google.golang.org/grpc/codes"
@@ -36,9 +37,14 @@
adapterMgr *AdapterManager
localDataProxy *model.Proxy
clusterDataProxy *model.Proxy
+ defaultRequestTimeout int64
+ longRunningRequestTimeout int64
+ coreInCompetingMode bool
}
-func NewAdapterRequestHandlerProxy(coreInstanceId string, dMgr *DeviceManager, ldMgr *LogicalDeviceManager, aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy) *AdapterRequestHandlerProxy {
+func NewAdapterRequestHandlerProxy(coreInstanceId string, dMgr *DeviceManager, ldMgr *LogicalDeviceManager,
+ aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy, incompetingMode bool, longRunningRequestTimeout int64,
+ defaultRequestTimeout int64) *AdapterRequestHandlerProxy {
var proxy AdapterRequestHandlerProxy
proxy.coreInstanceId = coreInstanceId
proxy.deviceMgr = dMgr
@@ -46,17 +52,43 @@
proxy.clusterDataProxy = cdProxy
proxy.localDataProxy = ldProxy
proxy.adapterMgr = aMgr
+ proxy.coreInCompetingMode = incompetingMode
+ proxy.defaultRequestTimeout = defaultRequestTimeout
+ proxy.longRunningRequestTimeout = longRunningRequestTimeout
return &proxy
}
+func (rhp *AdapterRequestHandlerProxy) acquireTransaction(transactionId string, maxTimeout ...int64) (*KVTransaction, error) {
+ timeout := rhp.defaultRequestTimeout
+ if len(maxTimeout) > 0 {
+ timeout = maxTimeout[0]
+ }
+ log.Debugw("transaction-timeout", log.Fields{"timeout": timeout})
+ txn := NewKVTransaction(transactionId)
+ if txn == nil {
+ return nil, errors.New("fail-to-create-transaction")
+ } else if txn.Acquired(timeout) {
+ return txn, nil
+ } else {
+ return nil, errors.New("failed-to-seize-request")
+ }
+}
+
+// competeForTransaction is a helper function to determine whether every request needs to compete with another
+// Core to execute the request
+func (rhp *AdapterRequestHandlerProxy) competeForTransaction() bool {
+ return rhp.coreInCompetingMode
+}
+
func (rhp *AdapterRequestHandlerProxy) Register(args []*ic.Argument) (*voltha.CoreInstance, error) {
- if len(args) != 2 {
+ if len(args) != 3 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
adapter := &voltha.Adapter{}
deviceTypes := &voltha.DeviceTypes{}
+ transactionID := &ic.StrType{}
for _, arg := range args {
switch arg.Key {
case "adapter":
@@ -69,9 +101,27 @@
log.Warnw("cannot-unmarshal-device-types", log.Fields{"error": err})
return nil, err
}
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
}
}
- log.Debugw("Register", log.Fields{"Adapter": *adapter, "DeviceTypes": deviceTypes, "coreId": rhp.coreInstanceId})
+ log.Debugw("Register", log.Fields{"Adapter": *adapter, "DeviceTypes": deviceTypes, "transactionID": transactionID.Val, "coreId": rhp.coreInstanceId})
+
+ // Try to grab the transaction as this core may be competing with another Core
+ if rhp.competeForTransaction() {
+ if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+ // Update our adapters in memory
+ go rhp.adapterMgr.updateAdaptersAndDevicetypesInMemory()
+ // returning nil, nil instructs the caller to ignore this request
+ return nil, nil
+ } else {
+ defer txn.Close()
+ }
+ }
if rhp.TestMode { // Execute only for test cases
return &voltha.CoreInstance{InstanceId: "CoreInstance"}, nil
@@ -80,17 +130,40 @@
}
func (rhp *AdapterRequestHandlerProxy) GetDevice(args []*ic.Argument) (*voltha.Device, error) {
- if len(args) != 1 {
+ if len(args) != 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
+
pID := &voltha.ID{}
- if err := ptypes.UnmarshalAny(args[0].Value, pID); err != nil {
- log.Warnw("cannot-unmarshal-ID", log.Fields{"error": err})
- return nil, err
+ transactionID := &ic.StrType{}
+ for _, arg := range args {
+ switch arg.Key {
+ case "device_id":
+ if err := ptypes.UnmarshalAny(arg.Value, pID); err != nil {
+ log.Warnw("cannot-unmarshal-ID", log.Fields{"error": err})
+ return nil, err
+ }
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
+ }
}
- log.Debugw("GetDevice", log.Fields{"deviceId": pID.Id})
+ log.Debugw("GetDevice", log.Fields{"deviceId": pID.Id, "transactionID": transactionID.Val})
+
+ // Try to grab the transaction as this core may be competing with another Core
+ if rhp.competeForTransaction() {
+ if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+ // returning nil, nil instructs the caller to ignore this request
+ return nil, nil
+ } else {
+ defer txn.Close()
+ }
+ }
if rhp.TestMode { // Execute only for test cases
return &voltha.Device{Id: pID.Id}, nil
@@ -125,17 +198,40 @@
}
func (rhp *AdapterRequestHandlerProxy) DeviceUpdate(args []*ic.Argument) (*empty.Empty, error) {
- if len(args) != 1 {
+ if len(args) != 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
+
device := &voltha.Device{}
- if err := ptypes.UnmarshalAny(args[0].Value, device); err != nil {
- log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
- return nil, err
+ transactionID := &ic.StrType{}
+ for _, arg := range args {
+ switch arg.Key {
+ case "device":
+ if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
+ log.Warnw("cannot-unmarshal-ID", log.Fields{"error": err})
+ return nil, err
+ }
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
+ }
}
- log.Debugw("DeviceUpdate", log.Fields{"deviceId": device.Id})
+ log.Debugw("DeviceUpdate", log.Fields{"deviceId": device.Id, "transactionID": transactionID.Val})
+
+ // Try to grab the transaction as this core may be competing with another Core
+ if rhp.competeForTransaction() {
+ if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+ // returning nil, nil instructs the caller to ignore this request
+ return nil, nil
+ } else {
+ defer txn.Close()
+ }
+ }
if rhp.TestMode { // Execute only for test cases
return new(empty.Empty), nil
@@ -154,32 +250,56 @@
}
func (rhp *AdapterRequestHandlerProxy) GetChildDevice(args []*ic.Argument) (*voltha.Device, error) {
- if len(args) < 1 {
+ if len(args) < 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
+
pID := &voltha.ID{}
- if err := ptypes.UnmarshalAny(args[0].Value, pID); err != nil {
- log.Warnw("cannot-unmarshal-ID", log.Fields{"error": err})
- return nil, err
+ transactionID := &ic.StrType{}
+ for _, arg := range args {
+ switch arg.Key {
+ case "device_id":
+ if err := ptypes.UnmarshalAny(arg.Value, pID); err != nil {
+ log.Warnw("cannot-unmarshal-ID", log.Fields{"error": err})
+ return nil, err
+ }
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
+ }
}
- log.Debugw("GetChildDevice", log.Fields{"deviceId": pID.Id})
+ log.Debugw("GetChildDevice", log.Fields{"deviceId": pID.Id, "transactionID": transactionID.Val})
+
+ // Try to grab the transaction as this core may be competing with another Core
+ if rhp.competeForTransaction() {
+ if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+ // returning nil, nil instructs the caller to ignore this request
+ return nil, nil
+ } else {
+ defer txn.Close()
+ }
+ }
if rhp.TestMode { // Execute only for test cases
return &voltha.Device{Id: pID.Id}, nil
}
- return nil, nil
+ return rhp.deviceMgr.GetDevice(pID.Id)
}
func (rhp *AdapterRequestHandlerProxy) GetPorts(args []*ic.Argument) (*voltha.Ports, error) {
- if len(args) != 2 {
+ if len(args) != 3 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
deviceId := &voltha.ID{}
pt := &ic.IntType{}
+ transactionID := &ic.StrType{}
for _, arg := range args {
switch arg.Key {
case "device_id":
@@ -192,9 +312,14 @@
log.Warnw("cannot-unmarshal-porttype", log.Fields{"error": err})
return nil, err
}
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
}
}
- log.Debugw("GetPorts", log.Fields{"deviceID": deviceId.Id, "portype": pt.Val})
+ log.Debugw("GetPorts", log.Fields{"deviceID": deviceId.Id, "portype": pt.Val, "transactionID": transactionID.Val})
if rhp.TestMode { // Execute only for test cases
aPort := &voltha.Port{Label: "test_port"}
allPorts := &voltha.Ports{}
@@ -204,31 +329,54 @@
return rhp.deviceMgr.getPorts(nil, deviceId.Id, voltha.Port_PortType(pt.Val))
}
-func (rhp *AdapterRequestHandlerProxy) GetChildDevices(args []*ic.Argument) (*voltha.Device, error) {
- if len(args) != 1 {
+func (rhp *AdapterRequestHandlerProxy) GetChildDevices(args []*ic.Argument) (*voltha.Devices, error) {
+ if len(args) != 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
+
pID := &voltha.ID{}
- if err := ptypes.UnmarshalAny(args[0].Value, pID); err != nil {
- log.Warnw("cannot-unmarshal-ID", log.Fields{"error": err})
- return nil, err
+ transactionID := &ic.StrType{}
+ for _, arg := range args {
+ switch arg.Key {
+ case "device_id":
+ if err := ptypes.UnmarshalAny(arg.Value, pID); err != nil {
+ log.Warnw("cannot-unmarshal-ID", log.Fields{"error": err})
+ return nil, err
+ }
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
+ }
}
- log.Debugw("GetChildDevice", log.Fields{"deviceId": pID.Id})
+ log.Debugw("GetChildDevices", log.Fields{"deviceId": pID.Id, "transactionID": transactionID.Val})
+
+ // Try to grab the transaction as this core may be competing with another Core
+ if rhp.competeForTransaction() {
+ if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+ // returning nil, nil instructs the caller to ignore this request
+ return nil, nil
+ } else {
+ defer txn.Close()
+ }
+ }
if rhp.TestMode { // Execute only for test cases
- return &voltha.Device{Id: pID.Id}, nil
+ return &voltha.Devices{Items:nil}, nil
}
- //TODO: Complete
- return nil, nil
+
+ return rhp.deviceMgr.getAllChildDevices(pID.Id)
}
// ChildDeviceDetected is invoked when a child device is detected. The following
// parameters are expected:
// {parent_device_id, parent_port_no, child_device_type, proxy_address, admin_state, **kw)
func (rhp *AdapterRequestHandlerProxy) ChildDeviceDetected(args []*ic.Argument) (*empty.Empty, error) {
- if len(args) < 4 {
+ if len(args) < 5 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
@@ -238,6 +386,7 @@
portNo := &ic.IntType{}
dt := &ic.StrType{}
chnlId := &ic.IntType{}
+ transactionID := &ic.StrType{}
for _, arg := range args {
switch arg.Key {
case "parent_device_id":
@@ -260,10 +409,26 @@
log.Warnw("cannot-unmarshal-channel-id", log.Fields{"error": err})
return nil, err
}
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
}
}
log.Debugw("ChildDeviceDetected", log.Fields{"parentDeviceId": pID.Id, "parentPortNo": portNo.Val,
- "deviceType": dt.Val, "channelId": chnlId.Val})
+ "deviceType": dt.Val, "channelId": chnlId.Val, "transactionID": transactionID.Val})
+
+ // Try to grab the transaction as this core may be competing with another Core
+ if rhp.competeForTransaction() {
+ if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+ // returning nil, nil instructs the caller to ignore this request
+ return nil, nil
+ } else {
+ defer txn.Close()
+ }
+ }
if rhp.TestMode { // Execute only for test cases
return nil, nil
@@ -275,7 +440,7 @@
}
func (rhp *AdapterRequestHandlerProxy) DeviceStateUpdate(args []*ic.Argument) (*empty.Empty, error) {
- if len(args) < 2 {
+ if len(args) < 3 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
@@ -283,6 +448,7 @@
deviceId := &voltha.ID{}
operStatus := &ic.IntType{}
connStatus := &ic.IntType{}
+ transactionID := &ic.StrType{}
for _, arg := range args {
switch arg.Key {
case "device_id":
@@ -300,9 +466,27 @@
log.Warnw("cannot-unmarshal-connStatus", log.Fields{"error": err})
return nil, err
}
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
}
}
- log.Debugw("DeviceStateUpdate", log.Fields{"deviceId": deviceId.Id, "oper-status": operStatus, "conn-status": connStatus})
+ log.Debugw("DeviceStateUpdate", log.Fields{"deviceId": deviceId.Id, "oper-status": operStatus,
+ "conn-status": connStatus, "transactionID": transactionID.Val})
+
+ // Try to grab the transaction as this core may be competing with another Core
+ if rhp.competeForTransaction() {
+ if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+ // returning nil, nil instructs the caller to ignore this request
+ return nil, nil
+ } else {
+ defer txn.Close()
+ }
+ }
+
if rhp.TestMode { // Execute only for test cases
return nil, nil
}
@@ -312,7 +496,7 @@
}
func (rhp *AdapterRequestHandlerProxy) ChildrenStateUpdate(args []*ic.Argument) (*empty.Empty, error) {
- if len(args) < 2 {
+ if len(args) < 3 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
@@ -320,6 +504,7 @@
deviceId := &voltha.ID{}
operStatus := &ic.IntType{}
connStatus := &ic.IntType{}
+ transactionID := &ic.StrType{}
for _, arg := range args {
switch arg.Key {
case "device_id":
@@ -337,9 +522,27 @@
log.Warnw("cannot-unmarshal-connStatus", log.Fields{"error": err})
return nil, err
}
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
}
}
- log.Debugw("ChildrenStateUpdate", log.Fields{"deviceId": deviceId.Id, "oper-status": operStatus, "conn-status": connStatus})
+ log.Debugw("ChildrenStateUpdate", log.Fields{"deviceId": deviceId.Id, "oper-status": operStatus,
+ "conn-status": connStatus, "transactionID": transactionID.Val})
+
+ // Try to grab the transaction as this core may be competing with another Core
+ if rhp.competeForTransaction() {
+ if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+ // returning nil, nil instructs the caller to ignore this request
+ return nil, nil
+ } else {
+ defer txn.Close()
+ }
+ }
+
if rhp.TestMode { // Execute only for test cases
return nil, nil
}
@@ -350,7 +553,7 @@
}
func (rhp *AdapterRequestHandlerProxy) PortStateUpdate(args []*ic.Argument) (*empty.Empty, error) {
- if len(args) < 2 {
+ if len(args) < 3 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
@@ -359,6 +562,7 @@
portType := &ic.IntType{}
portNo := &ic.IntType{}
operStatus := &ic.IntType{}
+ transactionID := &ic.StrType{}
for _, arg := range args {
switch arg.Key {
case "device_id":
@@ -381,10 +585,27 @@
log.Warnw("cannot-unmarshal-portno", log.Fields{"error": err})
return nil, err
}
-
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
}
}
- log.Debugw("PortStateUpdate", log.Fields{"deviceId": deviceId.Id, "operStatus": operStatus, "portType": portType, "portNo": portNo})
+ log.Debugw("PortStateUpdate", log.Fields{"deviceId": deviceId.Id, "operStatus": operStatus,
+ "portType": portType, "portNo": portNo, "transactionID": transactionID.Val})
+
+ // Try to grab the transaction as this core may be competing with another Core
+ if rhp.competeForTransaction() {
+ if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+ // returning nil, nil instructs the caller to ignore this request
+ return nil, nil
+ } else {
+ defer txn.Close()
+ }
+ }
+
if rhp.TestMode { // Execute only for test cases
return nil, nil
}
@@ -393,13 +614,14 @@
}
func (rhp *AdapterRequestHandlerProxy) PortCreated(args []*ic.Argument) (*empty.Empty, error) {
- if len(args) != 2 {
+ if len(args) != 3 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
deviceId := &voltha.ID{}
port := &voltha.Port{}
+ transactionID := &ic.StrType{}
for _, arg := range args {
switch arg.Key {
case "device_id":
@@ -412,9 +634,25 @@
log.Warnw("cannot-unmarshal-port", log.Fields{"error": err})
return nil, err
}
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
}
}
- log.Debugw("PortCreated", log.Fields{"deviceId": deviceId.Id, "port": port})
+ log.Debugw("PortCreated", log.Fields{"deviceId": deviceId.Id, "port": port, "transactionID": transactionID.Val})
+
+ // Try to grab the transaction as this core may be competing with another Core
+ if rhp.competeForTransaction() {
+ if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+ // returning nil, nil instructs the caller to ignore this request
+ return nil, nil
+ } else {
+ defer txn.Close()
+ }
+ }
if rhp.TestMode { // Execute only for test cases
return nil, nil
@@ -426,13 +664,14 @@
}
func (rhp *AdapterRequestHandlerProxy) DevicePMConfigUpdate(args []*ic.Argument) (*empty.Empty, error) {
- if len(args) != 2 {
+ if len(args) != 3 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
pmConfigs := &voltha.PmConfigs{}
init := &ic.BoolType{}
+ transactionID := &ic.StrType{}
for _, arg := range args {
switch arg.Key {
case "device_pm_config":
@@ -445,10 +684,26 @@
log.Warnw("cannot-unmarshal-boolean", log.Fields{"error": err})
return nil, err
}
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
}
}
log.Debugw("DevicePMConfigUpdate", log.Fields{"deviceId": pmConfigs.Id, "configs": pmConfigs,
- "init": init})
+ "init": init, "transactionID": transactionID.Val})
+
+ // Try to grab the transaction as this core may be competing with another Core
+ if rhp.competeForTransaction() {
+ if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+ // returning nil, nil instructs the caller to ignore this request
+ return nil, nil
+ } else {
+ defer txn.Close()
+ }
+ }
if rhp.TestMode { // Execute only for test cases
return nil, nil
@@ -461,7 +716,7 @@
}
func (rhp *AdapterRequestHandlerProxy) PacketIn(args []*ic.Argument) (*empty.Empty, error) {
- if len(args) < 3 {
+ if len(args) < 4 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
@@ -469,6 +724,7 @@
deviceId := &voltha.ID{}
portNo := &ic.IntType{}
packet := &ic.Packet{}
+ transactionID := &ic.StrType{}
for _, arg := range args {
switch arg.Key {
case "device_id":
@@ -486,14 +742,23 @@
log.Warnw("cannot-unmarshal-packet", log.Fields{"error": err})
return nil, err
}
-
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
}
}
- log.Debugw("PacketIn", log.Fields{"deviceId": deviceId.Id, "port": portNo.Val, "packet": packet})
+ log.Debugw("PacketIn", log.Fields{"deviceId": deviceId.Id, "port": portNo.Val, "packet": packet,
+ "transactionID": transactionID.Val})
+
+ // For performance reasons, we do not compete for packet-in. We process it and send the packet-in; later in the
+ // processing flow the duplicate packet will be discarded.
+
if rhp.TestMode { // Execute only for test cases
return nil, nil
}
- go rhp.deviceMgr.PacketIn(deviceId.Id, uint32(portNo.Val), packet.Payload)
+ go rhp.deviceMgr.PacketIn(deviceId.Id, uint32(portNo.Val), transactionID.Val, packet.Payload)
return new(empty.Empty), nil
}
@@ -506,6 +771,7 @@
}
deviceId := &voltha.ID{}
img := &voltha.ImageDownload{}
+ transactionID := &ic.StrType{}
for _, arg := range args {
switch arg.Key {
case "device_id":
@@ -518,9 +784,27 @@
log.Warnw("cannot-unmarshal-imgaeDownload", log.Fields{"error": err})
return nil, err
}
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
}
}
- log.Debugw("UpdateImageDownload", log.Fields{"deviceId": deviceId.Id, "image-download": img})
+ log.Debugw("UpdateImageDownload", log.Fields{"deviceId": deviceId.Id, "image-download": img,
+ "transactionID": transactionID.Val})
+
+ // Try to grab the transaction as this core may be competing with another Core
+ if rhp.competeForTransaction() {
+ if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+ // returning nil, nil instructs the caller to ignore this request
+ return nil, nil
+ } else {
+ defer txn.Close()
+ }
+ }
+
if rhp.TestMode { // Execute only for test cases
return nil, nil
}
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index c81141b..838235d 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -154,9 +154,11 @@
return nil
}
-func (core *Core) registerAdapterRequestHandler(ctx context.Context, coreInstanceId string, dMgr *DeviceManager, ldMgr *LogicalDeviceManager,
- aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy) error {
- requestProxy := NewAdapterRequestHandlerProxy(coreInstanceId, dMgr, ldMgr, aMgr, cdProxy, ldProxy)
+func (core *Core) registerAdapterRequestHandler(ctx context.Context, coreInstanceId string, dMgr *DeviceManager,
+ ldMgr *LogicalDeviceManager, aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy,
+ ) error {
+ requestProxy := NewAdapterRequestHandlerProxy(coreInstanceId, dMgr, ldMgr, aMgr, cdProxy, ldProxy,
+ core.config.InCompetingMode, core.config.LongRunningRequestTimeout, core.config.DefaultRequestTimeout)
core.kmp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: core.config.CoreTopic}, requestProxy)
log.Info("request-handlers")
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 8bf8664..116e2bb 100644
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -54,10 +54,10 @@
cloned := (proto.Clone(device)).(*voltha.Device)
if cloned.Id == "" {
cloned.Id = CreateDeviceId()
+ cloned.AdminState = voltha.AdminState_PREPROVISIONED
+ cloned.FlowGroups = &ofp.FlowGroups{Items: nil}
+ cloned.Flows = &ofp.Flows{Items: nil}
}
- cloned.AdminState = voltha.AdminState_PREPROVISIONED
- cloned.FlowGroups = &ofp.FlowGroups{Items: nil}
- cloned.Flows = &ofp.Flows{Items: nil}
if !device.GetRoot() && device.ProxyAddress != nil {
// Set the default vlan ID to the one specified by the parent adapter. It can be
// overwritten by the child adapter during a device update request
@@ -74,15 +74,29 @@
return &agent
}
-// start save the device to the data model and registers for callbacks on that device
-func (agent *DeviceAgent) start(ctx context.Context) {
+// start saves the device to the data model and registers for callbacks on that device if loadFromdB is false. Otherwise,
+// it will load the data from the dB and set up the necessary callbacks and proxies.
+func (agent *DeviceAgent) start(ctx context.Context, loadFromdB bool) error {
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
- log.Debugw("starting-device-agent", log.Fields{"device": agent.lastData})
- // Add the initial device to the local model
- if added := agent.clusterDataProxy.AddWithID("/devices", agent.deviceId, agent.lastData, ""); added == nil {
- log.Errorw("failed-to-add-device", log.Fields{"deviceId": agent.deviceId})
+ log.Debugw("starting-device-agent", log.Fields{"deviceId": agent.deviceId})
+ if loadFromdB {
+ if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 1, false, ""); device != nil {
+ if d, ok := device.(*voltha.Device); ok {
+ agent.lastData = proto.Clone(d).(*voltha.Device)
+ }
+ } else {
+ log.Errorw("failed-to-load-device", log.Fields{"deviceId": agent.deviceId})
+ return status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
+ }
+ log.Debugw("device-loaded-from-dB", log.Fields{"device": agent.lastData})
+ } else {
+ // Add the initial device to the local model
+ if added := agent.clusterDataProxy.AddWithID("/devices", agent.deviceId, agent.lastData, ""); added == nil {
+ log.Errorw("failed-to-add-device", log.Fields{"deviceId": agent.deviceId})
+ }
}
+
agent.deviceProxy = agent.clusterDataProxy.Root.CreateProxy("/devices/"+agent.deviceId, false)
agent.deviceProxy.RegisterCallback(model.POST_UPDATE, agent.processUpdate)
@@ -97,6 +111,7 @@
agent.groupProxy.RegisterCallback(model.POST_UPDATE, agent.groupTableUpdated)
log.Debug("device-agent-started")
+ return nil
}
// stop stops the device agent. Not much to do for now
@@ -112,7 +127,7 @@
func (agent *DeviceAgent) getDevice() (*voltha.Device, error) {
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
- if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 1, false, ""); device != nil {
+ if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 0, false, ""); device != nil {
if d, ok := device.(*voltha.Device); ok {
cloned := proto.Clone(d).(*voltha.Device)
return cloned, nil
@@ -124,7 +139,7 @@
// getDeviceWithoutLock is a helper function to be used ONLY by any device agent function AFTER it has acquired the device lock.
// This function is meant so that we do not have duplicate code all over the device agent functions
func (agent *DeviceAgent) getDeviceWithoutLock() (*voltha.Device, error) {
- if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 1, false, ""); device != nil {
+ if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 0, false, ""); device != nil {
if d, ok := device.(*voltha.Device); ok {
cloned := proto.Clone(d).(*voltha.Device)
return cloned, nil
@@ -808,7 +823,7 @@
}
//flowTableUpdated is the callback after flows have been updated in the model to push them
-//to the adapterAgents
+//to the adapters
func (agent *DeviceAgent) flowTableUpdated(args ...interface{}) interface{} {
log.Debugw("flowTableUpdated-callback", log.Fields{"argsLen": len(args)})
@@ -842,7 +857,7 @@
}
groups := device.FlowGroups
- // Send update to adapterAgents
+ // Send update to adapters
dType := agent.adapterMgr.getDeviceType(device.Type)
if !dType.AcceptsAddRemoveFlowUpdates {
if err := agent.adapterProxy.UpdateFlowsBulk(device, latestData, groups); err != nil {
@@ -882,7 +897,7 @@
}
//groupTableUpdated is the callback after group table has been updated in the model to push them
-//to the adapterAgents
+//to the adapters
func (agent *DeviceAgent) groupTableUpdated(args ...interface{}) interface{} {
log.Debugw("groupTableUpdated-callback", log.Fields{"argsLen": len(args)})
@@ -916,11 +931,9 @@
}
flows := device.Flows
- // Send update to adapterAgents
- // TODO: Check whether the device supports incremental flow changes
- // Assume false for test
- acceptsAddRemoveFlowUpdates := false
- if !acceptsAddRemoveFlowUpdates {
+ // Send update to adapters
+ dType := agent.adapterMgr.getDeviceType(device.Type)
+ if !dType.AcceptsAddRemoveFlowUpdates {
if err := agent.adapterProxy.UpdateFlowsBulk(device, flows, latestData); err != nil {
log.Debugw("update-flows-bulk-error", log.Fields{"id": agent.lastData.Id, "error": err})
return err
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index cd662ac..88b0c7d 100644
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -34,7 +34,7 @@
type DeviceManager struct {
deviceAgents map[string]*DeviceAgent
adapterProxy *AdapterProxy
- adapterMgr *AdapterManager
+ adapterMgr *AdapterManager
logicalDeviceMgr *LogicalDeviceManager
kafkaICProxy *kafka.InterContainerProxy
stateTransitions *TransitionMap
@@ -52,7 +52,7 @@
deviceMgr.kafkaICProxy = kafkaICProxy
deviceMgr.coreInstanceId = coreInstanceId
deviceMgr.clusterDataProxy = cdProxy
- deviceMgr.adapterMgr= adapterMgr
+ deviceMgr.adapterMgr = adapterMgr
deviceMgr.lockDeviceAgentsMap = sync.RWMutex{}
return &deviceMgr
}
@@ -96,16 +96,27 @@
delete(dMgr.deviceAgents, agent.deviceId)
}
+// getDeviceAgent returns the agent managing the device. If the device is not in memory, it will load it, if it exists
func (dMgr *DeviceManager) getDeviceAgent(deviceId string) *DeviceAgent {
- // TODO If the device is not in memory it needs to be loaded first
dMgr.lockDeviceAgentsMap.Lock()
- defer dMgr.lockDeviceAgentsMap.Unlock()
if agent, ok := dMgr.deviceAgents[deviceId]; ok {
+ dMgr.lockDeviceAgentsMap.Unlock()
return agent
+ } else {
+ // Try to load into memory - loading will also create the device agent
+ dMgr.lockDeviceAgentsMap.Unlock()
+ if err := dMgr.load(deviceId); err == nil {
+ dMgr.lockDeviceAgentsMap.Lock()
+ defer dMgr.lockDeviceAgentsMap.Unlock()
+ if agent, ok = dMgr.deviceAgents[deviceId]; ok {
+ return agent
+ }
+ }
}
return nil
}
+// listDeviceIdsFromMap returns the list of device IDs that are in memory
func (dMgr *DeviceManager) listDeviceIdsFromMap() *voltha.IDs {
dMgr.lockDeviceAgentsMap.Lock()
defer dMgr.lockDeviceAgentsMap.Unlock()
@@ -122,7 +133,7 @@
// Create and start a device agent for that device
agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy)
dMgr.addDeviceAgentToMap(agent)
- agent.start(ctx)
+ agent.start(ctx, false)
sendResponse(ctx, ch, agent.lastData)
}
@@ -133,8 +144,6 @@
if agent := dMgr.getDeviceAgent(id.Id); agent != nil {
res = agent.enableDevice(ctx)
log.Debugw("EnableDevice-result", log.Fields{"result": res})
- } else {
- res = status.Errorf(codes.NotFound, "%s", id.Id)
}
sendResponse(ctx, ch, res)
@@ -181,6 +190,7 @@
sendResponse(ctx, ch, res)
}
+// GetDevice returns a device, either from memory or from the dB, if present
func (dMgr *DeviceManager) GetDevice(id string) (*voltha.Device, error) {
log.Debugw("GetDevice", log.Fields{"deviceid": id})
if agent := dMgr.getDeviceAgent(id); agent != nil {
@@ -189,6 +199,13 @@
return nil, status.Errorf(codes.NotFound, "%s", id)
}
+func (dMgr *DeviceManager) IsDeviceInCache(id string) bool {
+ dMgr.lockDeviceAgentsMap.Lock()
+ defer dMgr.lockDeviceAgentsMap.Unlock()
+ _, exist := dMgr.deviceAgents[id]
+ return exist
+}
+
func (dMgr *DeviceManager) IsRootDevice(id string) (bool, error) {
device, err := dMgr.GetDevice(id)
if err != nil {
@@ -203,10 +220,15 @@
result := &voltha.Devices{}
if devices := dMgr.clusterDataProxy.List("/devices", 0, false, ""); devices != nil {
for _, device := range devices.([]interface{}) {
- if agent := dMgr.getDeviceAgent(device.(*voltha.Device).Id); agent == nil {
- agent = newDeviceAgent(dMgr.adapterProxy, device.(*voltha.Device), dMgr, dMgr.clusterDataProxy)
- dMgr.addDeviceAgentToMap(agent)
- agent.start(nil)
+ // If device is not in memory then set it up
+ if !dMgr.IsDeviceInCache(device.(*voltha.Device).Id) {
+ agent := newDeviceAgent(dMgr.adapterProxy, device.(*voltha.Device), dMgr, dMgr.clusterDataProxy)
+ if err := agent.start(nil, true); err != nil {
+ log.Warnw("failure-starting-agent", log.Fields{"deviceId": device.(*voltha.Device).Id})
+ agent.stop(nil)
+ } else {
+ dMgr.addDeviceAgentToMap(agent)
+ }
}
result.Items = append(result.Items, device.(*voltha.Device))
}
@@ -214,6 +236,97 @@
return result, nil
}
+// loadDevice loads the deviceId in memory, if not present
+func (dMgr *DeviceManager) loadDevice(deviceId string) (*DeviceAgent, error) {
+ log.Debugw("loading-device", log.Fields{"deviceId": deviceId})
+ // Sanity check
+ if deviceId == "" {
+ return nil, status.Error(codes.InvalidArgument, "deviceId empty")
+ }
+ if !dMgr.IsDeviceInCache(deviceId) {
+ agent := newDeviceAgent(dMgr.adapterProxy, &voltha.Device{Id: deviceId}, dMgr, dMgr.clusterDataProxy)
+ if err := agent.start(nil, true); err != nil {
+ agent.stop(nil)
+ return nil, err
+ }
+ dMgr.addDeviceAgentToMap(agent)
+ }
+ if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
+ return agent, nil
+ }
+ return nil, status.Error(codes.NotFound, deviceId) // This should not happen
+}
+
+// loadRootDeviceParentAndChildren loads the children and parents of a root device in memory
+func (dMgr *DeviceManager) loadRootDeviceParentAndChildren(device *voltha.Device) error {
+ log.Debugw("loading-parent-and-children", log.Fields{"deviceId": device.Id})
+ if device.Root {
+ // Scenario A
+ if device.ParentId != "" {
+ // Load logical device if needed.
+ if err := dMgr.logicalDeviceMgr.load(device.ParentId); err != nil {
+ log.Warnw("failure-loading-logical-device", log.Fields{"lDeviceId": device.ParentId})
+ }
+ } else {
+ log.Debugw("no-parent-to-load", log.Fields{"deviceId": device.Id})
+ }
+ // Load all child devices, if needed
+ if childDeviceIds, err := dMgr.getAllChildDeviceIds(device); err == nil {
+ for _, childDeviceId := range childDeviceIds {
+ if _, err := dMgr.loadDevice(childDeviceId); err != nil {
+ log.Warnw("failure-loading-device", log.Fields{"deviceId": childDeviceId})
+ return err
+ }
+ }
+ log.Debugw("loaded-children", log.Fields{"deviceId": device.Id, "numChildren": len(childDeviceIds)})
+ } else {
+ log.Debugw("no-child-to-load", log.Fields{"deviceId": device.Id})
+ }
+ }
+ return nil
+}
+
+// load loads the deviceId in memory, if not present, and also loads its accompanying parents and children. Loading
+// in memory is for improved performance. It is not imperative that a device needs to be in memory when a request
+// acting on the device is received by the core. In such a scenario, the Core will load the device in memory first
+// and then proceed with the request.
+func (dMgr *DeviceManager) load(deviceId string) error {
+ log.Debug("load...")
+ // First load the device - this may fail in case the device was deleted intentionally by the other core
+ var dAgent *DeviceAgent
+ var err error
+ if dAgent, err = dMgr.loadDevice(deviceId); err != nil {
+ log.Warnw("failure-loading-device", log.Fields{"deviceId": deviceId})
+ return err
+ }
+ // Get the loaded device details
+ var device *voltha.Device
+ if device, err = dAgent.getDevice(); err != nil {
+ return err
+ }
+
+ // If the device is in Pre-provisioning or deleted state stop here
+ if device.AdminState == voltha.AdminState_PREPROVISIONED || device.AdminState == voltha.AdminState_DELETED {
+ return nil
+ }
+
+ // Now we face two scenarios
+ if device.Root {
+ // Load all children as well as the parent of this device (logical_device)
+ if err := dMgr.loadRootDeviceParentAndChildren(device); err != nil {
+ log.Warnw("failure-loading-device-parent-and-children", log.Fields{"deviceId": deviceId})
+ return err
+ }
+ log.Debugw("successfully-loaded-parent-and-children", log.Fields{"deviceId": deviceId})
+ } else {
+ // Scenario B - use the parentId of that device (root device) to trigger the loading
+ if device.ParentId != "" {
+ return dMgr.load(device.ParentId)
+ }
+ }
+ return nil
+}
+
// ListDeviceIds retrieves the latest device IDs information from the data model (memory data only)
func (dMgr *DeviceManager) ListDeviceIds() (*voltha.IDs, error) {
log.Debug("ListDeviceIDs")
@@ -230,17 +343,17 @@
reconciled := 0
for _, id := range ids.Items {
// Act on the device only if its not present in the agent map
- if agent := dMgr.getDeviceAgent(id.Id); agent == nil {
+ if !dMgr.IsDeviceInCache(id.Id) {
// Device Id not in memory
log.Debugw("reconciling-device", log.Fields{"id": id.Id})
- // Load device from model
- if device := dMgr.clusterDataProxy.Get("/devices/"+id.Id, 0, false, ""); device != nil {
- agent = newDeviceAgent(dMgr.adapterProxy, device.(*voltha.Device), dMgr, dMgr.clusterDataProxy)
- dMgr.addDeviceAgentToMap(agent)
- agent.start(nil)
- reconciled += 1
+ // Load device from dB
+ agent := newDeviceAgent(dMgr.adapterProxy, &voltha.Device{Id:id.Id}, dMgr, dMgr.clusterDataProxy)
+ if err := agent.start(nil, true); err != nil {
+ log.Warnw("failure-loading-device", log.Fields{"deviceId": id.Id})
+ agent.stop(nil)
} else {
- log.Warnw("device-inexistent", log.Fields{"id": id.Id})
+ dMgr.addDeviceAgentToMap(agent)
+ reconciled += 1
}
} else {
reconciled += 1
@@ -393,7 +506,7 @@
// Create and start a device agent for that device
agent := newDeviceAgent(dMgr.adapterProxy, childDevice, dMgr, dMgr.clusterDataProxy)
dMgr.addDeviceAgentToMap(agent)
- agent.start(nil)
+ agent.start(nil, false)
// Activate the child device
if agent := dMgr.getDeviceAgent(agent.deviceId); agent != nil {
@@ -430,7 +543,7 @@
return status.Errorf(codes.NotFound, "%s", deviceId)
}
-func (dMgr *DeviceManager) PacketIn(deviceId string, port uint32, packet []byte) error {
+func (dMgr *DeviceManager) PacketIn(deviceId string, port uint32, transactionId string, packet []byte) error {
log.Debugw("PacketIn", log.Fields{"deviceId": deviceId, "port": port})
// Get the logical device Id based on the deviceId
var device *voltha.Device
@@ -444,7 +557,7 @@
return status.Errorf(codes.FailedPrecondition, "%s", deviceId)
}
- if err := dMgr.logicalDeviceMgr.packetIn(device.ParentId, port, packet); err != nil {
+ if err := dMgr.logicalDeviceMgr.packetIn(device.ParentId, port, transactionId, packet); err != nil {
return err
}
return nil
@@ -576,6 +689,23 @@
return childDeviceIds, nil
}
+//getAllChildDevices is a helper method to get all the child devices of the parent device whose ID is passed as parameter
+func (dMgr *DeviceManager) getAllChildDevices(parentDeviceId string) (*voltha.Devices, error) {
+ log.Debugw("getAllChildDevices", log.Fields{"parentDeviceId": parentDeviceId})
+ if parentDevice, err := dMgr.GetDevice(parentDeviceId); err == nil {
+ childDevices := make([]*voltha.Device, 0)
+ if childDeviceIds, er := dMgr.getAllChildDeviceIds(parentDevice); er == nil {
+ for _, deviceId := range childDeviceIds {
+ if d, e := dMgr.GetDevice(deviceId); e == nil && d != nil {
+ childDevices = append(childDevices, d)
+ }
+ }
+ }
+ return &voltha.Devices{Items: childDevices}, nil
+ }
+ return nil, status.Errorf(codes.NotFound, "%s", parentDeviceId)
+}
+
func (dMgr *DeviceManager) addUNILogicalPort(cDevice *voltha.Device) error {
log.Info("addUNILogicalPort")
if err := dMgr.logicalDeviceMgr.addUNILogicalPort(nil, cDevice); err != nil {
@@ -585,7 +715,6 @@
return nil
}
-
func (dMgr *DeviceManager) downloadImage(ctx context.Context, img *voltha.ImageDownload, ch chan interface{}) {
log.Debugw("downloadImage", log.Fields{"deviceid": img.Id, "imageName": img.Name})
var res interface{}
@@ -661,7 +790,6 @@
sendResponse(ctx, ch, res)
}
-
func (dMgr *DeviceManager) updateImageDownload(deviceId string, img *voltha.ImageDownload) error {
log.Debugw("updateImageDownload", log.Fields{"deviceid": img.Id, "imageName": img.Name})
if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
@@ -691,7 +819,6 @@
return nil, status.Errorf(codes.NotFound, "%s", deviceId)
}
-
func (dMgr *DeviceManager) activateDevice(cDevice *voltha.Device) error {
log.Info("activateDevice")
return nil
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index 877fcb4..f72d615 100644
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -35,8 +35,6 @@
//TODO: Move this Tag into the proto file
const OF_CONTROLLER_TAG= "voltha_backend_name"
-//const MAX_RESPONSE_TIME = int64(500) // milliseconds
-
const (
IMAGE_DOWNLOAD = iota
CANCEL_IMAGE_DOWNLOAD = iota
@@ -44,6 +42,15 @@
REVERT_IMAGE = iota
)
+
+type deviceID struct {
+ id string
+}
+
+type logicalDeviceID struct {
+ id string
+}
+
type APIHandler struct {
deviceMgr *DeviceManager
logicalDeviceMgr *LogicalDeviceManager
@@ -118,7 +125,7 @@
return handler.coreInCompetingMode
}
-func (handler *APIHandler) acquireTransaction(ctx context.Context, maxTimeout ...int64) (*KVTransaction, error) {
+func (handler *APIHandler) acquireTransaction(ctx context.Context, id interface{}, maxTimeout ...int64) (*KVTransaction, error) {
timeout := handler.defaultRequestTimeout
if len(maxTimeout) > 0 {
timeout = maxTimeout[0]
@@ -130,6 +137,20 @@
} else if txn.Acquired(timeout) {
return txn, nil
} else {
+ if id != nil {
+ // The id can either be a device Id or a logical device id.
+ if dId, ok := id.(*deviceID); ok {
+ // Since this core has not processed this request, let's load the device, along with its extended
+ // family (parents and children) in memory. This will keep this core in-sync with its paired core as
+ // much as possible. The watch feature in the core model will ensure that the contents of those objects in
+ // memory are in sync.
+ time.Sleep(2 * time.Second)
+ go handler.deviceMgr.load(dId.id)
+ } else if ldId, ok := id.(*logicalDeviceID); ok {
+ // This will load the logical device along with its children and grandchildren
+ go handler.logicalDeviceMgr.load(ldId.id)
+ }
+ }
return nil, errors.New("failed-to-seize-request")
}
}
@@ -173,7 +194,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.acquireTransaction(ctx); err != nil {
+ if txn, err := handler.acquireTransaction(ctx, &logicalDeviceID{id:id.Id}); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -194,7 +215,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.acquireTransaction(ctx); err != nil {
+ if txn, err := handler.acquireTransaction(ctx, &logicalDeviceID{id:id.Id}); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -216,7 +237,7 @@
if handler.competeForTransaction() {
if !isOFControllerRequest(ctx) { // No need to acquire the transaction as request is sent to one core only
- if txn, err := handler.acquireTransaction(ctx); err != nil {
+ if txn, err := handler.acquireTransaction(ctx, &logicalDeviceID{id:flow.Id}); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -239,7 +260,7 @@
if handler.competeForTransaction() {
if !isOFControllerRequest(ctx) { // No need to acquire the transaction as request is sent to one core only
- if txn, err := handler.acquireTransaction(ctx); err != nil {
+ if txn, err := handler.acquireTransaction(ctx, &logicalDeviceID{id:flow.Id}); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -324,7 +345,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.acquireTransaction(ctx); err != nil {
+ if txn, err := handler.acquireTransaction(ctx, nil); err != nil {
return &voltha.Device{}, err
} else {
defer txn.Close()
@@ -361,7 +382,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.acquireTransaction(ctx, handler.longRunningRequestTimeout); err != nil {
+ if txn, err := handler.acquireTransaction(ctx, &deviceID{id:id.Id}, handler.longRunningRequestTimeout); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -382,7 +403,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.acquireTransaction(ctx); err != nil {
+ if txn, err := handler.acquireTransaction(ctx, &deviceID{id:id.Id}); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -403,7 +424,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.acquireTransaction(ctx); err != nil {
+ if txn, err := handler.acquireTransaction(ctx, &deviceID{id:id.Id}); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -424,7 +445,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.acquireTransaction(ctx); err != nil {
+ if txn, err := handler.acquireTransaction(ctx, nil); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -446,7 +467,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.acquireTransaction(ctx); err != nil {
+ if txn, err := handler.acquireTransaction(ctx, &deviceID{id:img.Id}); err != nil {
return &common.OperationResp{}, err
} else {
defer txn.Close()
@@ -537,7 +558,7 @@
failedresponse := &voltha.ImageDownload{DownloadState: voltha.ImageDownload_DOWNLOAD_UNKNOWN}
if handler.competeForTransaction() {
- if txn, err := handler.acquireTransaction(ctx); err != nil {
+ if txn, err := handler.acquireTransaction(ctx, &deviceID{id:img.Id}); err != nil {
return failedresponse, err
} else {
defer txn.Close()
@@ -670,7 +691,8 @@
return nil
}
-func (handler *APIHandler) sendPacketIn(deviceId string, packet *openflow_13.OfpPacketIn) {
+func (handler *APIHandler) sendPacketIn(deviceId string, transationId string, packet *openflow_13.OfpPacketIn) {
+ // TODO: Augment the OF PacketIn to include the transactionId
packetIn := openflow_13.PacketIn{Id: deviceId, PacketIn: packet}
log.Debugw("sendPacketIn", log.Fields{"packetIn": packetIn})
// Enqueue the packet
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 7875da7..bda249f 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -66,63 +66,75 @@
}
// start creates the logical device and add it to the data model
-func (agent *LogicalDeviceAgent) start(ctx context.Context) error {
- log.Infow("starting-logical_device-agent", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
- //Build the logical device based on information retrieved from the device adapter
- var switchCap *ic.SwitchCapability
- var err error
- if switchCap, err = agent.deviceMgr.getSwitchCapability(ctx, agent.rootDeviceId); err != nil {
- log.Errorw("error-creating-logical-device", log.Fields{"error": err})
- return err
- }
-
- ld := &voltha.LogicalDevice{Id: agent.logicalDeviceId, RootDeviceId: agent.rootDeviceId}
-
- // Create the datapath ID (uint64) using the logical device ID (based on the MAC Address)
- var datapathID uint64
- if datapathID, err = CreateDataPathId(agent.logicalDeviceId); err != nil {
- log.Errorw("error-creating-datapath-id", log.Fields{"error": err})
- return err
- }
- ld.DatapathId = datapathID
- ld.Desc = (proto.Clone(switchCap.Desc)).(*ofp.OfpDesc)
- ld.SwitchFeatures = (proto.Clone(switchCap.SwitchFeatures)).(*ofp.OfpSwitchFeatures)
- ld.Flows = &ofp.Flows{Items: nil}
- ld.FlowGroups = &ofp.FlowGroups{Items: nil}
-
- //Add logical ports to the logical device based on the number of NNI ports discovered
- //First get the default port capability - TODO: each NNI port may have different capabilities,
- //hence. may need to extract the port by the NNI port id defined by the adapter during device
- //creation
- var nniPorts *voltha.Ports
- if nniPorts, err = agent.deviceMgr.getPorts(ctx, agent.rootDeviceId, voltha.Port_ETHERNET_NNI); err != nil {
- log.Errorw("error-creating-logical-port", log.Fields{"error": err})
- }
- var portCap *ic.PortCapability
- for _, port := range nniPorts.Items {
- log.Infow("!!!!!!!NNI PORTS", log.Fields{"NNI": port})
- if portCap, err = agent.deviceMgr.getPortCapability(ctx, agent.rootDeviceId, port.PortNo); err != nil {
+func (agent *LogicalDeviceAgent) start(ctx context.Context, loadFromdB bool) error {
+ log.Infow("starting-logical_device-agent", log.Fields{"logicaldeviceId": agent.logicalDeviceId, "loadFromdB": loadFromdB})
+ var ld *voltha.LogicalDevice
+ if !loadFromdB {
+ //Build the logical device based on information retrieved from the device adapter
+ var switchCap *ic.SwitchCapability
+ var err error
+ if switchCap, err = agent.deviceMgr.getSwitchCapability(ctx, agent.rootDeviceId); err != nil {
log.Errorw("error-creating-logical-device", log.Fields{"error": err})
return err
}
- portCap.Port.RootPort = true
- lp := (proto.Clone(portCap.Port)).(*voltha.LogicalPort)
- lp.DeviceId = agent.rootDeviceId
- lp.Id = fmt.Sprintf("nni-%d", port.PortNo)
- lp.OfpPort.PortNo = port.PortNo
- lp.OfpPort.Name = lp.Id
- lp.DevicePortNo = port.PortNo
- ld.Ports = append(ld.Ports, lp)
+
+ ld = &voltha.LogicalDevice{Id: agent.logicalDeviceId, RootDeviceId: agent.rootDeviceId}
+
+ // Create the datapath ID (uint64) using the logical device ID (based on the MAC Address)
+ var datapathID uint64
+ if datapathID, err = CreateDataPathId(agent.logicalDeviceId); err != nil {
+ log.Errorw("error-creating-datapath-id", log.Fields{"error": err})
+ return err
+ }
+ ld.DatapathId = datapathID
+ ld.Desc = (proto.Clone(switchCap.Desc)).(*ofp.OfpDesc)
+ ld.SwitchFeatures = (proto.Clone(switchCap.SwitchFeatures)).(*ofp.OfpSwitchFeatures)
+ ld.Flows = &ofp.Flows{Items: nil}
+ ld.FlowGroups = &ofp.FlowGroups{Items: nil}
+
+ //Add logical ports to the logical device based on the number of NNI ports discovered
+ //First get the default port capability - TODO: each NNI port may have different capabilities,
+ //hence. may need to extract the port by the NNI port id defined by the adapter during device
+ //creation
+ var nniPorts *voltha.Ports
+ if nniPorts, err = agent.deviceMgr.getPorts(ctx, agent.rootDeviceId, voltha.Port_ETHERNET_NNI); err != nil {
+ log.Errorw("error-creating-logical-port", log.Fields{"error": err})
+ }
+ var portCap *ic.PortCapability
+ for _, port := range nniPorts.Items {
+ log.Infow("!!!!!!!NNI PORTS", log.Fields{"NNI": port})
+ if portCap, err = agent.deviceMgr.getPortCapability(ctx, agent.rootDeviceId, port.PortNo); err != nil {
+ log.Errorw("error-creating-logical-device", log.Fields{"error": err})
+ return err
+ }
+ portCap.Port.RootPort = true
+ lp := (proto.Clone(portCap.Port)).(*voltha.LogicalPort)
+ lp.DeviceId = agent.rootDeviceId
+ lp.Id = fmt.Sprintf("nni-%d", port.PortNo)
+ lp.OfpPort.PortNo = port.PortNo
+ lp.OfpPort.Name = lp.Id
+ lp.DevicePortNo = port.PortNo
+ ld.Ports = append(ld.Ports, lp)
+ }
+ agent.lockLogicalDevice.Lock()
+ //defer agent.lockLogicalDevice.Unlock()
+ // Save the logical device
+ if added := agent.clusterDataProxy.AddWithID("/logical_devices", ld.Id, ld, ""); added == nil {
+ log.Errorw("failed-to-add-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
+ } else {
+ log.Debugw("logicaldevice-created", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
+ }
+ agent.lockLogicalDevice.Unlock()
+ } else {
+ // load from dB - the logical device may not exist at this time. On error, just return and the calling function
+ // will destroy this agent.
+ var err error
+ if ld, err = agent.GetLogicalDevice(); err != nil {
+ log.Warnw("failed-to-load-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
+ return err
+ }
}
agent.lockLogicalDevice.Lock()
- //defer agent.lockLogicalDevice.Unlock()
- // Save the logical device
- if added := agent.clusterDataProxy.AddWithID("/logical_devices", ld.Id, ld, ""); added == nil {
- log.Errorw("failed-to-add-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
- } else {
- log.Debugw("logicaldevice-created", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
- }
-
agent.flowProxy = agent.clusterDataProxy.Root.CreateProxy(
fmt.Sprintf("/logical_devices/%s/flows", agent.logicalDeviceId),
false)
@@ -1033,9 +1045,9 @@
}
}
-func (agent *LogicalDeviceAgent) packetIn(port uint32, packet []byte) {
- log.Debugw("packet-in", log.Fields{"port": port, "packet": packet})
+func (agent *LogicalDeviceAgent) packetIn(port uint32, transactionId string, packet []byte) {
+ log.Debugw("packet-in", log.Fields{"port": port, "packet": packet, "transactionId": transactionId})
packetIn := fd.MkPacketIn(port, packet)
- agent.ldeviceMgr.grpcNbiHdlr.sendPacketIn(agent.logicalDeviceId, packetIn)
+ agent.ldeviceMgr.grpcNbiHdlr.sendPacketIn(agent.logicalDeviceId, transactionId, packetIn)
log.Debugw("sending-packet-in", log.Fields{"packet-in": packetIn})
}
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index 18bc30a..b4dc7ea 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -113,7 +113,7 @@
func (ldMgr *LogicalDeviceManager) listLogicalDevices() (*voltha.LogicalDevices, error) {
log.Debug("ListAllLogicalDevices")
result := &voltha.LogicalDevices{}
- if logicalDevices := ldMgr.clusterDataProxy.Get("/logical_devices", 0, false, ""); logicalDevices != nil {
+ if logicalDevices := ldMgr.clusterDataProxy.List("/logical_devices", 0, false, ""); logicalDevices != nil {
for _, logicalDevice := range logicalDevices.([]interface{}) {
if agent := ldMgr.getLogicalDeviceAgent(logicalDevice.(*voltha.LogicalDevice).Id); agent == nil {
agent = newLogicalDeviceAgent(
@@ -124,7 +124,7 @@
ldMgr.clusterDataProxy,
)
ldMgr.addLogicalDeviceAgentToMap(agent)
- go agent.start(nil)
+ go agent.start(nil, true)
}
result.Items = append(result.Items, logicalDevice.(*voltha.LogicalDevice))
}
@@ -166,12 +166,32 @@
agent := newLogicalDeviceAgent(id, device.Id, ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy)
ldMgr.addLogicalDeviceAgentToMap(agent)
- go agent.start(ctx)
+ go agent.start(ctx, false)
log.Debug("creating-logical-device-ends")
return &id, nil
}
+// load loads a logical device in memory, creating its logical device agent if it is not already present
+func (ldMgr *LogicalDeviceManager) load(lDeviceId string) error {
+ log.Debugw("loading-logical-device", log.Fields{"lDeviceId": lDeviceId})
+ // To prevent a race condition, let's hold the logical device agent map lock. This will prevent a loading and
+ // a create logical device callback from occurring at the same time.
+ ldMgr.lockLogicalDeviceAgentsMap.Lock()
+ defer ldMgr.lockLogicalDeviceAgentsMap.Unlock()
+ if ldAgent, _ := ldMgr.logicalDeviceAgents[lDeviceId]; ldAgent == nil {
+ // Logical device not in memory - create a temp logical device Agent and let it load from the dB
+ agent := newLogicalDeviceAgent(lDeviceId, "", ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy)
+ if err := agent.start(nil, true); err != nil {
+ agent.stop(nil)
+ return err
+ }
+ ldMgr.logicalDeviceAgents[agent.logicalDeviceId] = agent
+ }
+ // TODO: load the child device
+ return nil
+}
+
func (ldMgr *LogicalDeviceManager) deleteLogicalDevice(ctx context.Context, device *voltha.Device) error {
log.Debugw("deleting-logical-device", log.Fields{"deviceId": device.Id})
// Sanity check
@@ -365,10 +385,10 @@
}
}
-func (ldMgr *LogicalDeviceManager) packetIn(logicalDeviceId string, port uint32, packet []byte) error {
+func (ldMgr *LogicalDeviceManager) packetIn(logicalDeviceId string, port uint32, transactionId string, packet []byte) error {
log.Debugw("packetIn", log.Fields{"logicalDeviceId": logicalDeviceId, "port": port})
if agent := ldMgr.getLogicalDeviceAgent(logicalDeviceId); agent != nil {
- agent.packetIn(port, packet)
+ agent.packetIn(port, transactionId, packet)
} else {
log.Error("logical-device-not-exist", log.Fields{"logicalDeviceId": logicalDeviceId})
}
diff --git a/rw_core/flow_decomposition/flow_decomposer.go b/rw_core/flow_decomposition/flow_decomposer.go
index a6b90aa..f702633 100644
--- a/rw_core/flow_decomposition/flow_decomposer.go
+++ b/rw_core/flow_decomposition/flow_decomposer.go
@@ -497,7 +497,7 @@
return 0
}
if md <= 0xffffffff {
- log.Warnw("onos-upgrade-suggested", log.Fields{"Metadata_ofp": md, "message": "Legacy MetaData detected form OltPipeline"})
+ log.Debugw("onos-upgrade-suggested", log.Fields{"Metadata_ofp": md, "message": "Legacy MetaData detected form OltPipeline"})
return md
}
return md & 0xffffffff
@@ -512,7 +512,7 @@
return 0
}
if md <= 0xffffffff {
- log.Warnw("onos-upgrade-suggested", log.Fields{"Metadata_ofp": md, "message": "Legacy MetaData detected form OltPipeline"})
+ log.Debugw("onos-upgrade-suggested", log.Fields{"Metadata_ofp": md, "message": "Legacy MetaData detected form OltPipeline"})
return md
}
return (md >> 32) & 0xffffffff
@@ -1256,20 +1256,17 @@
return deviceRules
}
- var ingressDevice *voltha.Device
- var err error
- if ingressDevice, err = fd.deviceMgr.GetDevice(route[0].DeviceID); err != nil {
- log.Errorw("ingress-device-not-found", log.Fields{"deviceId": route[0].DeviceID})
- return deviceRules
- }
-
- isDownstream := ingressDevice.Root
- isUpstream := !isDownstream
-
// Process controller bound flow
if outPortNo != 0 && (outPortNo&0x7fffffff) == uint32(ofp.OfpPortNo_OFPP_CONTROLLER) {
deviceRules = fd.processControllerBoundFlow(agent, route, inPortNo, outPortNo, flow)
} else {
+ var ingressDevice *voltha.Device
+ var err error
+ if ingressDevice, err = fd.deviceMgr.GetDevice(route[0].DeviceID); err != nil {
+ log.Errorw("ingress-device-not-found", log.Fields{"deviceId": route[0].DeviceID, "flow": flow})
+ return deviceRules
+ }
+ isUpstream := !ingressDevice.Root
if isUpstream {
deviceRules = fd.processUpstreamNonControllerBoundFlow(agent, route, inPortNo, outPortNo, flow)
} else if HasNextTable(flow) {