In order to process events in the order they are received for a
given device and therefore prevent multiple updates on the same
device from occuring in parallel (specially across cores - within a
core these are controlled via semaphores), we are letting an event
(mostly an update to a device) to run through completion. This
therefore prevent the adapter to send changes to a device in
quick succession while the update is being processed.
Change-Id: I0d8a9ff5f35172e9247b483b7c0cffd4f435d321
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index 5fede68..88b622e 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -30,16 +30,16 @@
)
type AdapterRequestHandlerProxy struct {
- TestMode bool
- coreInstanceId string
- deviceMgr *DeviceManager
- lDeviceMgr *LogicalDeviceManager
- adapterMgr *AdapterManager
- localDataProxy *model.Proxy
- clusterDataProxy *model.Proxy
- defaultRequestTimeout int64
+ TestMode bool
+ coreInstanceId string
+ deviceMgr *DeviceManager
+ lDeviceMgr *LogicalDeviceManager
+ adapterMgr *AdapterManager
+ localDataProxy *model.Proxy
+ clusterDataProxy *model.Proxy
+ defaultRequestTimeout int64
longRunningRequestTimeout int64
- coreInCompetingMode bool
+ coreInCompetingMode bool
}
func NewAdapterRequestHandlerProxy(coreInstanceId string, dMgr *DeviceManager, ldMgr *LogicalDeviceManager,
@@ -66,7 +66,7 @@
log.Debugw("transaction-timeout", log.Fields{"timeout": timeout})
txn := NewKVTransaction(transactionId)
if txn == nil {
- return nil, errors.New("fail-to-create-transaction")
+ return nil, errors.New("fail-to-create-transaction")
} else if txn.Acquired(timeout) {
return txn, nil
} else {
@@ -241,9 +241,9 @@
if updatedDevice, err := rhp.mergeDeviceInfoFromAdapter(device); err != nil {
return nil, status.Errorf(codes.Internal, "%s", err.Error())
} else {
- // An adapter request needs an Ack without having to wait for the update to be
- // completed. We therefore run the update in its own routine.
- go rhp.deviceMgr.updateDevice(updatedDevice)
+ if err := rhp.deviceMgr.updateDevice(updatedDevice); err != nil {
+ return nil, err
+ }
}
return new(empty.Empty), nil
@@ -366,7 +366,7 @@
}
if rhp.TestMode { // Execute only for test cases
- return &voltha.Devices{Items:nil}, nil
+ return &voltha.Devices{Items: nil}, nil
}
return rhp.deviceMgr.getAllChildDevices(pID.Id)
@@ -491,7 +491,10 @@
return nil, nil
}
// When the enum is not set (i.e. -1), Go still convert to the Enum type with the value being -1
- go rhp.deviceMgr.updateDeviceStatus(deviceId.Id, voltha.OperStatus_OperStatus(operStatus.Val), voltha.ConnectStatus_ConnectStatus(connStatus.Val))
+ if err := rhp.deviceMgr.updateDeviceStatus(deviceId.Id, voltha.OperStatus_OperStatus(operStatus.Val),
+ voltha.ConnectStatus_ConnectStatus(connStatus.Val)); err != nil {
+ return nil, err
+ }
return new(empty.Empty), nil
}
@@ -548,7 +551,10 @@
}
// When the enum is not set (i.e. -1), Go still convert to the Enum type with the value being -1
- go rhp.deviceMgr.updateChildrenStatus(deviceId.Id, voltha.OperStatus_OperStatus(operStatus.Val), voltha.ConnectStatus_ConnectStatus(connStatus.Val))
+ if err := rhp.deviceMgr.updateChildrenStatus(deviceId.Id, voltha.OperStatus_OperStatus(operStatus.Val),
+ voltha.ConnectStatus_ConnectStatus(connStatus.Val)); err != nil {
+ return nil, err
+ }
return new(empty.Empty), nil
}
@@ -609,7 +615,10 @@
if rhp.TestMode { // Execute only for test cases
return nil, nil
}
- go rhp.deviceMgr.updatePortState(deviceId.Id, voltha.Port_PortType(portType.Val), uint32(portNo.Val), voltha.OperStatus_OperStatus(operStatus.Val))
+ if err := rhp.deviceMgr.updatePortState(deviceId.Id, voltha.Port_PortType(portType.Val), uint32(portNo.Val),
+ voltha.OperStatus_OperStatus(operStatus.Val)); err != nil {
+ return nil, err
+ }
return new(empty.Empty), nil
}
@@ -657,8 +666,9 @@
if rhp.TestMode { // Execute only for test cases
return nil, nil
}
- // Run port creation in its own go routine
- go rhp.deviceMgr.addPort(deviceId.Id, port)
+ if err := rhp.deviceMgr.addPort(deviceId.Id, port); err != nil {
+ return nil, err
+ }
return new(empty.Empty), nil
}
@@ -692,7 +702,7 @@
}
}
log.Debugw("DevicePMConfigUpdate", log.Fields{"deviceId": pmConfigs.Id, "configs": pmConfigs,
- "init": init, "transactionID": transactionID.Val})
+ "init": init, "transactionID": transactionID.Val})
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
@@ -709,8 +719,9 @@
return nil, nil
}
- // Run PM config update in its own go routine
- go rhp.deviceMgr.updatePmConfigs(pmConfigs.Id, pmConfigs)
+ if err := rhp.deviceMgr.updatePmConfigs(pmConfigs.Id, pmConfigs); err != nil {
+ return nil, err
+ }
return new(empty.Empty), nil
}
@@ -758,11 +769,12 @@
if rhp.TestMode { // Execute only for test cases
return nil, nil
}
- go rhp.deviceMgr.PacketIn(deviceId.Id, uint32(portNo.Val), transactionID.Val, packet.Payload)
+ if err := rhp.deviceMgr.PacketIn(deviceId.Id, uint32(portNo.Val), transactionID.Val, packet.Payload); err != nil {
+ return nil, err
+ }
return new(empty.Empty), nil
}
-
func (rhp *AdapterRequestHandlerProxy) UpdateImageDownload(args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
@@ -808,6 +820,8 @@
if rhp.TestMode { // Execute only for test cases
return nil, nil
}
- go rhp.deviceMgr.updateImageDownload(deviceId.Id, img)
+ if err := rhp.deviceMgr.updateImageDownload(deviceId.Id, img); err != nil {
+ return nil, err
+ }
return new(empty.Empty), nil
}