VOL-2180 context changes in voltha-go
Context is passed up the call chain as far as possible.
Where context reaches the gRPC API, the incoming request context is passed through directly.
Where context reaches the Kafka API, context.TODO() is used (this interface does not carry a context or support request cancellation).
Wherever a new goroutine is started and the creating goroutine does not wait for it, context.Background() is used.
Wherever a new goroutine is started and the creating goroutine waits for it to complete, the creator's ctx is passed through.
Cancellation of a gRPC NBI request should now propagate all the way through to the KV store.
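
For illustration only (not part of the diff below), a minimal Go sketch of the conventions above; handleGRPC, handleKafka and store are hypothetical stand-ins for a gRPC NBI handler, a Kafka request handler and a KV-backed write, and only show where ctx, context.TODO() and context.Background() are used.

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // store stands in for a KV-backed write; it honours ctx cancellation.
    func store(ctx context.Context, key string) error {
        select {
        case <-time.After(10 * time.Millisecond): // pretend write latency
            fmt.Println("stored", key)
            return nil
        case <-ctx.Done(): // cancelling the request cancels the KV write
            return ctx.Err()
        }
    }

    // handleGRPC mimics a gRPC NBI handler: the incoming ctx is passed straight through,
    // and since the creating goroutine waits for completion, the spawned work also gets ctx.
    func handleGRPC(ctx context.Context, id string) error {
        errCh := make(chan error, 1)
        go func() { errCh <- store(ctx, id) }()
        return <-errCh
    }

    // handleKafka mimics a Kafka request handler: no caller context is carried by the
    // transport, so context.TODO() marks the spot.
    func handleKafka(id string) error {
        return store(context.TODO(), id)
    }

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        defer cancel()

        fmt.Println("grpc path:", handleGRPC(ctx, "dev-1"))
        fmt.Println("kafka path:", handleKafka("dev-2"))

        // fire-and-forget goroutine: the creator does not wait, so use context.Background()
        go func() { _ = store(context.Background(), "dev-3") }()
        time.Sleep(50 * time.Millisecond) // let the detached goroutine finish
    }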
Change-Id: I7a65b49ae4e8c1d5263c27d2627e0ffe4d1eb71b
diff --git a/rw_core/core/adapter_manager.go b/rw_core/core/adapter_manager.go
index 96fceb3..02d0b7a 100644
--- a/rw_core/core/adapter_manager.go
+++ b/rw_core/core/adapter_manager.go
@@ -122,12 +122,12 @@
}
//// Create the proxies
- aMgr.adapterProxy, err = aMgr.clusterDataProxy.CreateProxy(context.Background(), "/adapters", false)
+ aMgr.adapterProxy, err = aMgr.clusterDataProxy.CreateProxy(ctx, "/adapters", false)
if err != nil {
log.Errorw("Failed-to-create-adapter-proxy", log.Fields{"error": err})
return err
}
- aMgr.deviceTypeProxy, err = aMgr.clusterDataProxy.CreateProxy(context.Background(), "/device_types", false)
+ aMgr.deviceTypeProxy, err = aMgr.clusterDataProxy.CreateProxy(ctx, "/device_types", false)
if err != nil {
log.Errorw("Failed-to-create-device-proxy", log.Fields{"error": err})
return err
@@ -190,7 +190,7 @@
}
//updateAdaptersAndDevicetypesInMemory loads the existing set of adapters and device types in memory
-func (aMgr *AdapterManager) updateAdaptersAndDevicetypesInMemory(adapter *voltha.Adapter) {
+func (aMgr *AdapterManager) updateAdaptersAndDevicetypesInMemory(ctx context.Context, adapter *voltha.Adapter) {
aMgr.lockAdaptersMap.Lock()
defer aMgr.lockAdaptersMap.Unlock()
@@ -198,7 +198,7 @@
if adapterAgent.getAdapter() != nil {
// Already registered - Adapter may have restarted. Trigger the reconcile process for that adapter
go func() {
- err := aMgr.deviceMgr.adapterRestarted(adapter)
+ err := aMgr.deviceMgr.adapterRestarted(ctx, adapter)
if err != nil {
log.Errorw("unable-to-restart-adapter", log.Fields{"error": err})
}
@@ -208,7 +208,7 @@
}
// Update the adapters
- adaptersIf, err := aMgr.clusterDataProxy.List(context.Background(), "/adapters", 0, false, "")
+ adaptersIf, err := aMgr.clusterDataProxy.List(ctx, "/adapters", 0, false, "")
if err != nil {
log.Errorw("failed-to-list-adapters-from-cluster-proxy", log.Fields{"error": err})
return
@@ -381,7 +381,7 @@
if aMgr.getAdapter(adapter.Id) != nil {
// Already registered - Adapter may have restarted. Trigger the reconcile process for that adapter
go func() {
- err := aMgr.deviceMgr.adapterRestarted(adapter)
+ err := aMgr.deviceMgr.adapterRestarted(context.Background(), adapter)
if err != nil {
log.Errorw("unable-to-restart-adapter", log.Fields{"error": err})
}
@@ -444,7 +444,7 @@
}
//adapterUpdated is a callback invoked when an adapter change has been noticed
-func (aMgr *AdapterManager) adapterUpdated(args ...interface{}) interface{} {
+func (aMgr *AdapterManager) adapterUpdated(ctx context.Context, args ...interface{}) interface{} {
log.Debugw("updateAdapter-callback", log.Fields{"argsLen": len(args)})
var previousData *voltha.Adapters
@@ -477,7 +477,7 @@
}
//deviceTypesUpdated is a callback invoked when a device type change has been noticed
-func (aMgr *AdapterManager) deviceTypesUpdated(args ...interface{}) interface{} {
+func (aMgr *AdapterManager) deviceTypesUpdated(ctx context.Context, args ...interface{}) interface{} {
log.Debugw("deviceTypesUpdated-callback", log.Fields{"argsLen": len(args)})
var previousData *voltha.DeviceTypes
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index 86a6431..ef767bd 100755
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -35,11 +35,11 @@
TestMode bool
deviceTopicRegistered bool
corePairTopic string
- kafkaICProxy *kafka.InterContainerProxy
+ kafkaICProxy kafka.InterContainerProxy
}
// NewAdapterProxy will return adapter proxy instance
-func NewAdapterProxy(kafkaProxy *kafka.InterContainerProxy, corePairTopic string) *AdapterProxy {
+func NewAdapterProxy(kafkaProxy kafka.InterContainerProxy, corePairTopic string) *AdapterProxy {
return &AdapterProxy{
kafkaICProxy: kafkaProxy,
corePairTopic: corePairTopic,
@@ -409,7 +409,7 @@
return nil, nil
}
-func (ap *AdapterProxy) packetOut(deviceType string, deviceID string, outPort uint32, packet *openflow_13.OfpPacketOut) error {
+func (ap *AdapterProxy) packetOut(ctx context.Context, deviceType string, deviceID string, outPort uint32, packet *openflow_13.OfpPacketOut) error {
log.Debugw("packetOut", log.Fields{"deviceId": deviceID})
toTopic := ap.getAdapterTopic(deviceType)
rpc := "receive_packet_out"
@@ -432,13 +432,13 @@
// TODO: Do we need to wait for an ACK on a packet Out?
// Use a device specific topic as we are the only core handling requests for this device
replyToTopic := ap.getCoreTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(context.TODO(), rpc, &toTopic, &replyToTopic, true, deviceID, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, deviceID, args...)
log.Debugw("packetOut", log.Fields{"deviceid": deviceID, "success": success})
return unPackResponse(rpc, deviceID, success, result)
}
// UpdateFlowsBulk invokes update flows bulk rpc
-func (ap *AdapterProxy) UpdateFlowsBulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata) error {
+func (ap *AdapterProxy) UpdateFlowsBulk(ctx context.Context, device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata) error {
log.Debugw("UpdateFlowsBulk", log.Fields{"deviceId": device.Id, "flowsInUpdate": len(flows.Items), "groupsToUpdate": len(groups.Items)})
toTopic := ap.getAdapterTopic(device.Adapter)
rpc := "update_flows_bulk"
@@ -462,13 +462,13 @@
// Use a device specific topic as we are the only core handling requests for this device
replyToTopic := ap.getCoreTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(context.TODO(), rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
log.Debugw("UpdateFlowsBulk-response", log.Fields{"deviceid": device.Id, "success": success})
return unPackResponse(rpc, device.Id, success, result)
}
// UpdateFlowsIncremental invokes update flows incremental rpc
-func (ap *AdapterProxy) UpdateFlowsIncremental(device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error {
+func (ap *AdapterProxy) UpdateFlowsIncremental(ctx context.Context, device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error {
log.Debugw("UpdateFlowsIncremental",
log.Fields{
"deviceId": device.Id,
@@ -500,7 +500,7 @@
}
// Use a device specific topic as we are the only core handling requests for this device
replyToTopic := ap.getCoreTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(context.TODO(), rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
log.Debugw("UpdateFlowsIncremental-response", log.Fields{"deviceid": device.Id, "success": success})
return unPackResponse(rpc, device.Id, success, result)
}
@@ -521,7 +521,7 @@
}
replyToTopic := ap.getCoreTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(context.TODO(), rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
log.Debugw("UpdatePmConfigs-response", log.Fields{"deviceid": device.Id, "success": success})
return unPackResponse(rpc, device.Id, success, result)
}
diff --git a/rw_core/core/adapter_proxy_test.go b/rw_core/core/adapter_proxy_test.go
index 1827b5a..3989142 100755
--- a/rw_core/core/adapter_proxy_test.go
+++ b/rw_core/core/adapter_proxy_test.go
@@ -40,8 +40,8 @@
)
var (
- coreKafkaICProxy *kafka.InterContainerProxy
- adapterKafkaICProxy *kafka.InterContainerProxy
+ coreKafkaICProxy kafka.InterContainerProxy
+ adapterKafkaICProxy kafka.InterContainerProxy
kc kafka.Client
adapterReqHandler *com.RequestHandlerProxy
adapter *cm.Adapter
@@ -60,12 +60,9 @@
kc = lm.NewKafkaClient()
// Setup core inter-container proxy and core request handler
- if coreKafkaICProxy, err = kafka.NewInterContainerProxy(
+ coreKafkaICProxy = kafka.NewInterContainerProxy(
kafka.MsgClient(kc),
- kafka.DefaultTopic(&kafka.Topic{Name: coreName})); err != nil || coreKafkaICProxy == nil {
- log.Fatalw("Failure-creating-core-intercontainerProxy", log.Fields{"error": err})
-
- }
+ kafka.DefaultTopic(&kafka.Topic{Name: coreName}))
if err = coreKafkaICProxy.Start(); err != nil {
log.Fatalw("Failure-starting-core-kafka-intercontainerProxy", log.Fields{"error": err})
}
@@ -77,12 +74,10 @@
adapterCoreProxy := com.NewCoreProxy(nil, adapterName, coreName)
adapter = cm.NewAdapter(adapterCoreProxy)
adapterReqHandler = com.NewRequestHandlerProxy(coreInstanceID, adapter, adapterCoreProxy)
- if adapterKafkaICProxy, err = kafka.NewInterContainerProxy(
+ adapterKafkaICProxy = kafka.NewInterContainerProxy(
kafka.MsgClient(kc),
kafka.DefaultTopic(&kafka.Topic{Name: adapterName}),
- kafka.RequestHandlerInterface(adapterReqHandler)); err != nil || adapterKafkaICProxy == nil {
- log.Fatalw("Failure-creating-adapter-intercontainerProxy", log.Fields{"error": err})
- }
+ kafka.RequestHandlerInterface(adapterReqHandler))
if err = adapterKafkaICProxy.Start(); err != nil {
log.Fatalw("Failure-starting-adapter-kafka-intercontainerProxy", log.Fields{"error": err})
}
@@ -172,18 +167,18 @@
outPort := uint32(1)
packet, err := getRandomBytes(50)
assert.Nil(t, err)
- err = ap.packetOut(adapterName, d.Id, outPort, &of.OfpPacketOut{Data: packet})
+ err = ap.packetOut(context.Background(), adapterName, d.Id, outPort, &of.OfpPacketOut{Data: packet})
assert.Nil(t, err)
}
func testFlowUpdates(t *testing.T) {
ap := NewAdapterProxy(coreKafkaICProxy, coreName)
d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
- err := ap.UpdateFlowsBulk(d, &voltha.Flows{}, &voltha.FlowGroups{}, &voltha.FlowMetadata{})
+ err := ap.UpdateFlowsBulk(context.Background(), d, &voltha.Flows{}, &voltha.FlowGroups{}, &voltha.FlowMetadata{})
assert.Nil(t, err)
flowChanges := &voltha.FlowChanges{ToAdd: &voltha.Flows{Items: nil}, ToRemove: &voltha.Flows{Items: nil}}
groupChanges := &voltha.FlowGroupChanges{ToAdd: &voltha.FlowGroups{Items: nil}, ToRemove: &voltha.FlowGroups{Items: nil}, ToUpdate: &voltha.FlowGroups{Items: nil}}
- err = ap.UpdateFlowsIncremental(d, flowChanges, groupChanges, &voltha.FlowMetadata{})
+ err = ap.UpdateFlowsIncremental(context.Background(), d, flowChanges, groupChanges, &voltha.FlowMetadata{})
assert.Nil(t, err)
}
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index 07db0be..dbdcc31 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -66,7 +66,7 @@
}
// This is a helper function that attempts to acquire the request by using the device ownership model
-func (rhp *AdapterRequestHandlerProxy) takeRequestOwnership(transactionID string, devID string, maxTimeout ...int64) (*KVTransaction, error) {
+func (rhp *AdapterRequestHandlerProxy) takeRequestOwnership(ctx context.Context, transactionID string, devID string, maxTimeout ...int64) (*KVTransaction, error) {
timeout := rhp.defaultRequestTimeout
if len(maxTimeout) > 0 {
timeout = maxTimeout[0]
@@ -80,13 +80,13 @@
var err error
if devID != "" {
var ownedByMe bool
- if ownedByMe, err = rhp.core.deviceOwnership.OwnedByMe(&utils.DeviceID{ID: devID}); err != nil {
+ if ownedByMe, err = rhp.core.deviceOwnership.OwnedByMe(ctx, &utils.DeviceID{ID: devID}); err != nil {
log.Warnw("getting-ownership-failed", log.Fields{"deviceId": devID, "error": err})
return nil, kafka.ErrorTransactionInvalidId
}
- acquired, err = txn.Acquired(timeout, ownedByMe)
+ acquired, err = txn.Acquired(ctx, timeout, ownedByMe)
} else {
- acquired, err = txn.Acquired(timeout)
+ acquired, err = txn.Acquired(ctx, timeout)
}
if err == nil && acquired {
log.Debugw("transaction-acquired", log.Fields{"transactionId": txn.txnID})
@@ -135,16 +135,16 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- txn, err := rhp.takeRequestOwnership(transactionID.Val, "")
+ txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, "")
if err != nil {
if err.Error() == kafka.ErrorTransactionNotAcquired.Error() {
log.Debugw("Another core handled the request", log.Fields{"transactionId": transactionID})
// Update our adapters in memory
- go rhp.adapterMgr.updateAdaptersAndDevicetypesInMemory(adapter)
+ go rhp.adapterMgr.updateAdaptersAndDevicetypesInMemory(context.TODO(), adapter)
}
return nil, err
}
- defer txn.Close()
+ defer txn.Close(context.TODO())
}
if rhp.TestMode { // Execute only for test cases
@@ -181,12 +181,12 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- txn, err := rhp.takeRequestOwnership(transactionID.Val, pID.Id)
+ txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, pID.Id)
if err != nil {
log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
return nil, err
}
- defer txn.Close()
+ defer txn.Close(context.TODO())
}
if rhp.TestMode { // Execute only for test cases
@@ -194,7 +194,7 @@
}
// Get the device via the device manager
- device, err := rhp.deviceMgr.GetDevice(pID.Id)
+ device, err := rhp.deviceMgr.GetDevice(context.TODO(), pID.Id)
if err != nil {
return nil, status.Errorf(codes.NotFound, "%s", err.Error())
}
@@ -230,12 +230,12 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- txn, err := rhp.takeRequestOwnership(transactionID.Val, device.Id)
+ txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, device.Id)
if err != nil {
log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
return nil, err
}
- defer txn.Close()
+ defer txn.Close(context.TODO())
}
log.Debugw("DeviceUpdate got txn", log.Fields{"deviceID": device.Id, "transactionID": transactionID.Val})
@@ -243,7 +243,7 @@
return new(empty.Empty), nil
}
go func() {
- err := rhp.deviceMgr.updateDeviceUsingAdapterData(device)
+ err := rhp.deviceMgr.updateDeviceUsingAdapterData(context.TODO(), device)
if err != nil {
log.Errorw("unable-to-update-device-using-adapter-data", log.Fields{"error": err})
}
@@ -297,18 +297,18 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- txn, err := rhp.takeRequestOwnership(transactionID.Val, pID.Id)
+ txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, pID.Id)
if err != nil {
log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
return nil, err
}
- defer txn.Close()
+ defer txn.Close(context.TODO())
}
if rhp.TestMode { // Execute only for test cases
return &voltha.Device{Id: pID.Id}, nil
}
- return rhp.deviceMgr.GetChildDevice(pID.Id, serialNumber.Val, onuID.Val, parentPortNo.Val)
+ return rhp.deviceMgr.GetChildDevice(context.TODO(), pID.Id, serialNumber.Val, onuID.Val, parentPortNo.Val)
}
// GetChildDeviceWithProxyAddress returns details of child device with proxy address
@@ -339,18 +339,18 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- txn, err := rhp.takeRequestOwnership(transactionID.Val, proxyAddress.DeviceId)
+ txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, proxyAddress.DeviceId)
if err != nil {
log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
return nil, err
}
- defer txn.Close()
+ defer txn.Close(context.TODO())
}
if rhp.TestMode { // Execute only for test cases
return &voltha.Device{Id: proxyAddress.DeviceId}, nil
}
- return rhp.deviceMgr.GetChildDeviceWithProxyAddress(proxyAddress)
+ return rhp.deviceMgr.GetChildDeviceWithProxyAddress(context.TODO(), proxyAddress)
}
// GetPorts returns the ports information of the device based on the port type.
@@ -391,12 +391,12 @@
}
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceID.Id)
+ txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, deviceID.Id)
if err != nil {
log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
return nil, err
}
- defer txn.Close()
+ defer txn.Close(context.TODO())
}
return rhp.deviceMgr.getPorts(context.TODO(), deviceID.Id, voltha.Port_PortType(pt.Val))
@@ -430,19 +430,19 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- txn, err := rhp.takeRequestOwnership(transactionID.Val, pID.Id)
+ txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, pID.Id)
if err != nil {
log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
return nil, err
}
- defer txn.Close()
+ defer txn.Close(context.TODO())
}
if rhp.TestMode { // Execute only for test cases
return &voltha.Devices{Items: nil}, nil
}
- return rhp.deviceMgr.getAllChildDevices(pID.Id)
+ return rhp.deviceMgr.getAllChildDevices(context.TODO(), pID.Id)
}
// ChildDeviceDetected is invoked when a child device is detected. The following
@@ -513,18 +513,18 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- txn, err := rhp.takeRequestOwnership(transactionID.Val, pID.Id)
+ txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, pID.Id)
if err != nil {
log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
return nil, err
}
- defer txn.Close()
+ defer txn.Close(context.TODO())
}
if rhp.TestMode { // Execute only for test cases
return nil, nil
}
- device, err := rhp.deviceMgr.childDeviceDetected(pID.Id, portNo.Val, dt.Val, chnlID.Val, vendorID.Val, serialNumber.Val, onuID.Val)
+ device, err := rhp.deviceMgr.childDeviceDetected(context.TODO(), pID.Id, portNo.Val, dt.Val, chnlID.Val, vendorID.Val, serialNumber.Val, onuID.Val)
if err != nil {
log.Errorw("child-detection-failed", log.Fields{"parentID": pID.Id, "onuID": onuID.Val, "error": err})
return nil, err
@@ -573,12 +573,12 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceID.Id)
+ txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, deviceID.Id)
if err != nil {
log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
return nil, err
}
- defer txn.Close()
+ defer txn.Close(context.TODO())
}
if rhp.TestMode { // Execute only for test cases
@@ -586,7 +586,7 @@
}
// When the enum is not set (i.e. -1), Go still convert to the Enum type with the value being -1
go func() {
- err := rhp.deviceMgr.updateDeviceStatus(deviceID.Id, voltha.OperStatus_Types(operStatus.Val),
+ err := rhp.deviceMgr.updateDeviceStatus(context.TODO(), deviceID.Id, voltha.OperStatus_Types(operStatus.Val),
voltha.ConnectStatus_Types(connStatus.Val))
if err != nil {
log.Errorw("unable-to-update-device-status", log.Fields{"error": err})
@@ -636,12 +636,12 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceID.Id)
+ txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, deviceID.Id)
if err != nil {
log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
return nil, err
}
- defer txn.Close()
+ defer txn.Close(context.TODO())
}
if rhp.TestMode { // Execute only for test cases
@@ -650,7 +650,7 @@
// When the enum is not set (i.e. -1), Go still convert to the Enum type with the value being -1
go func() {
- err := rhp.deviceMgr.updateChildrenStatus(deviceID.Id, voltha.OperStatus_Types(operStatus.Val),
+ err := rhp.deviceMgr.updateChildrenStatus(context.TODO(), deviceID.Id, voltha.OperStatus_Types(operStatus.Val),
voltha.ConnectStatus_Types(connStatus.Val))
if err != nil {
log.Errorw("unable-to-update-children-status", log.Fields{"error": err})
@@ -697,12 +697,12 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceID.Id)
+ txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, deviceID.Id)
if err != nil {
log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
return nil, err
}
- defer txn.Close()
+ defer txn.Close(context.TODO())
}
if rhp.TestMode { // Execute only for test cases
@@ -710,7 +710,7 @@
}
go func() {
- err := rhp.deviceMgr.updatePortsState(deviceID.Id, voltha.OperStatus_Types(operStatus.Val))
+ err := rhp.deviceMgr.updatePortsState(context.TODO(), deviceID.Id, voltha.OperStatus_Types(operStatus.Val))
if err != nil {
log.Errorw("unable-to-update-ports-state", log.Fields{"error": err})
}
@@ -765,12 +765,12 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceID.Id)
+ txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, deviceID.Id)
if err != nil {
log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
return nil, err
}
- defer txn.Close()
+ defer txn.Close(context.TODO())
}
if rhp.TestMode { // Execute only for test cases
@@ -778,7 +778,7 @@
}
go func() {
- err := rhp.deviceMgr.updatePortState(deviceID.Id, voltha.Port_PortType(portType.Val), uint32(portNo.Val),
+ err := rhp.deviceMgr.updatePortState(context.TODO(), deviceID.Id, voltha.Port_PortType(portType.Val), uint32(portNo.Val),
voltha.OperStatus_Types(operStatus.Val))
if err != nil {
log.Errorw("unable-to-update-port-state", log.Fields{"error": err})
@@ -819,12 +819,12 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceID.Id)
+ txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, deviceID.Id)
if err != nil {
log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
return nil, err
}
- defer txn.Close()
+ defer txn.Close(context.TODO())
}
if rhp.TestMode { // Execute only for test cases
@@ -832,7 +832,7 @@
}
go func() {
- err := rhp.deviceMgr.deleteAllPorts(deviceID.Id)
+ err := rhp.deviceMgr.deleteAllPorts(context.TODO(), deviceID.Id)
if err != nil {
log.Errorw("unable-to-delete-ports", log.Fields{"error": err})
}
@@ -869,12 +869,12 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- txn, err := rhp.takeRequestOwnership(transactionID.Val, parentDeviceID.Id)
+ txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, parentDeviceID.Id)
if err != nil {
log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
return nil, err
}
- defer txn.Close()
+ defer txn.Close(context.TODO())
}
if rhp.TestMode { // Execute only for test cases
@@ -882,7 +882,7 @@
}
go func() {
- err := rhp.deviceMgr.childDevicesLost(parentDeviceID.Id)
+ err := rhp.deviceMgr.childDevicesLost(context.TODO(), parentDeviceID.Id)
if err != nil {
log.Errorw("unable-to-disable-child-devices", log.Fields{"error": err})
}
@@ -919,19 +919,19 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- txn, err := rhp.takeRequestOwnership(transactionID.Val, parentDeviceID.Id)
+ txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, parentDeviceID.Id)
if err != nil {
log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
return nil, err
}
- defer txn.Close()
+ defer txn.Close(context.TODO())
}
if rhp.TestMode { // Execute only for test cases
return nil, nil
}
- if err := rhp.deviceMgr.childDevicesDetected(parentDeviceID.Id); err != nil {
+ if err := rhp.deviceMgr.childDevicesDetected(context.TODO(), parentDeviceID.Id); err != nil {
log.Errorw("child-devices-dection-failed", log.Fields{"parentID": parentDeviceID.Id, "error": err})
return nil, err
}
@@ -972,19 +972,19 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceID.Id)
+ txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, deviceID.Id)
if err != nil {
log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
return nil, err
}
- defer txn.Close()
+ defer txn.Close(context.TODO())
}
if rhp.TestMode { // Execute only for test cases
return nil, nil
}
go func() {
- err := rhp.deviceMgr.addPort(deviceID.Id, port)
+ err := rhp.deviceMgr.addPort(context.TODO(), deviceID.Id, port)
if err != nil {
log.Errorw("unable-to-add-port", log.Fields{"error": err})
}
@@ -1021,12 +1021,12 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- txn, err := rhp.takeRequestOwnership(transactionID.Val, pmConfigs.Id)
+ txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, pmConfigs.Id)
if err != nil {
log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
return nil, err
}
- defer txn.Close()
+ defer txn.Close(context.TODO())
}
if rhp.TestMode { // Execute only for test cases
@@ -1034,7 +1034,7 @@
}
go func() {
- err := rhp.deviceMgr.initPmConfigs(pmConfigs.Id, pmConfigs)
+ err := rhp.deviceMgr.initPmConfigs(context.TODO(), pmConfigs.Id, pmConfigs)
if err != nil {
log.Errorw("unable-to-initialize-pm-configs", log.Fields{"error": err})
}
@@ -1085,18 +1085,18 @@
// TODO: If this adds too much latencies then needs to remove transaction and let OFAgent filter out
// duplicates.
if rhp.competeForTransaction() {
- txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceID.Id)
+ txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, deviceID.Id)
if err != nil {
log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
return nil, err
}
- defer txn.Close()
+ defer txn.Close(context.TODO())
}
if rhp.TestMode { // Execute only for test cases
return nil, nil
}
go func() {
- err := rhp.deviceMgr.PacketIn(deviceID.Id, uint32(portNo.Val), transactionID.Val, packet.Payload)
+ err := rhp.deviceMgr.PacketIn(context.TODO(), deviceID.Id, uint32(portNo.Val), transactionID.Val, packet.Payload)
if err != nil {
log.Errorw("unable-to-receive-packet-from-adapter", log.Fields{"error": err})
}
@@ -1139,19 +1139,19 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceID.Id)
+ txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, deviceID.Id)
if err != nil {
log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
return nil, err
}
- defer txn.Close()
+ defer txn.Close(context.TODO())
}
if rhp.TestMode { // Execute only for test cases
return nil, nil
}
go func() {
- err := rhp.deviceMgr.updateImageDownload(deviceID.Id, img)
+ err := rhp.deviceMgr.updateImageDownload(context.TODO(), deviceID.Id, img)
if err != nil {
log.Errorw("unable-to-update-image-download", log.Fields{"error": err})
}
@@ -1189,12 +1189,12 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- txn, err := rhp.takeRequestOwnership(transactionID.Val, parentDeviceID.Id)
+ txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, parentDeviceID.Id)
if err != nil {
log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
return nil, err
}
- defer txn.Close()
+ defer txn.Close(context.TODO())
}
if rhp.TestMode { // Execute only for test cases
@@ -1203,7 +1203,7 @@
// Run it in its own routine
go func() {
- err := rhp.deviceMgr.reconcileChildDevices(parentDeviceID.Id)
+ err := rhp.deviceMgr.reconcileChildDevices(context.TODO(), parentDeviceID.Id)
if err != nil {
log.Errorw("unable-to-reconcile-child-devices", log.Fields{"error": err})
}
@@ -1246,21 +1246,21 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceID.Id)
+ txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, deviceID.Id)
if err != nil {
log.Debugw("DeviceReasonUpdate: Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
return nil, err
}
- defer txn.Close()
+ defer txn.Close(context.TODO())
}
if rhp.TestMode { // Execute only for test cases
return nil, nil
}
- // Run it in its own routine
+ // Run it in its own routine (w/ context.TODO())
go func() {
- err := rhp.deviceMgr.updateDeviceReason(deviceID.Id, reason.Val)
+ err := rhp.deviceMgr.updateDeviceReason(context.TODO(), deviceID.Id, reason.Val)
if err != nil {
log.Errorw("unable-to-update-device-reason", log.Fields{"error": err})
}
diff --git a/rw_core/core/common_test.go b/rw_core/core/common_test.go
index 33e92d5..1128cde 100644
--- a/rw_core/core/common_test.go
+++ b/rw_core/core/common_test.go
@@ -124,13 +124,9 @@
func createMockAdapter(adapterType int, kafkaClient kafka.Client, coreInstanceID string, coreName string, adapterName string) (adapters.IAdapter, error) {
var err error
var adapter adapters.IAdapter
- adapterKafkaICProxy, err := kafka.NewInterContainerProxy(
+ adapterKafkaICProxy := kafka.NewInterContainerProxy(
kafka.MsgClient(kafkaClient),
kafka.DefaultTopic(&kafka.Topic{Name: adapterName}))
- if err != nil || adapterKafkaICProxy == nil {
- log.Errorw("Failure-creating-adapter-intercontainerProxy", log.Fields{"error": err, "adapter": adapterName})
- return nil, err
- }
adapterCoreProxy := com.NewCoreProxy(adapterKafkaICProxy, adapterName, coreName)
var adapterReqHandler *com.RequestHandlerProxy
switch adapterType {
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 228148c..23f4bb2 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -44,7 +44,7 @@
grpcNBIAPIHandler *APIHandler
adapterMgr *AdapterManager
config *config.RWCoreFlags
- kmp *kafka.InterContainerProxy
+ kmp kafka.InterContainerProxy
clusterDataRoot model.Root
localDataRoot model.Root
clusterDataProxy *model.Proxy
@@ -120,12 +120,12 @@
}
var err error
- core.clusterDataProxy, err = core.clusterDataRoot.CreateProxy(context.Background(), "/", false)
+ core.clusterDataProxy, err = core.clusterDataRoot.CreateProxy(ctx, "/", false)
if err != nil {
probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
return fmt.Errorf("Failed to create cluster data proxy")
}
- core.localDataProxy, err = core.localDataRoot.CreateProxy(context.Background(), "/", false)
+ core.localDataProxy, err = core.localDataRoot.CreateProxy(ctx, "/", false)
if err != nil {
probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
return fmt.Errorf("Failed to create local data proxy")
@@ -133,9 +133,7 @@
// core.kmp must be created before deviceMgr and adapterMgr, as they will make
// private copies of the poiner to core.kmp.
- if err := core.initKafkaManager(ctx); err != nil {
- log.Fatal("Failed-to-init-kafka-manager")
- }
+ core.initKafkaManager(ctx)
log.Debugw("values", log.Fields{"kmp": core.kmp})
core.deviceMgr = newDeviceManager(core)
@@ -221,32 +219,26 @@
*/
probe.UpdateStatusFromContext(ctx, "grpc-service", probe.ServiceStatusRunning)
log.Info("grpc-server-started")
- core.grpcServer.Start(context.Background())
+ core.grpcServer.Start(ctx)
probe.UpdateStatusFromContext(ctx, "grpc-service", probe.ServiceStatusStopped)
}
// Initialize the kafka manager, but we will start it later
-func (core *Core) initKafkaManager(ctx context.Context) error {
+func (core *Core) initKafkaManager(ctx context.Context) {
log.Infow("initialize-kafka-manager", log.Fields{"host": core.config.KafkaAdapterHost,
"port": core.config.KafkaAdapterPort, "topic": core.config.CoreTopic})
probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPreparing)
// create the proxy
- var err error
- if core.kmp, err = kafka.NewInterContainerProxy(
+ core.kmp = kafka.NewInterContainerProxy(
kafka.InterContainerHost(core.config.KafkaAdapterHost),
kafka.InterContainerPort(core.config.KafkaAdapterPort),
kafka.MsgClient(core.kafkaClient),
kafka.DefaultTopic(&kafka.Topic{Name: core.config.CoreTopic}),
- kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: core.config.AffinityRouterTopic})); err != nil {
- log.Errorw("fail-to-create-kafka-proxy", log.Fields{"error": err})
- return err
- }
+ kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: core.config.AffinityRouterTopic}))
probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPrepared)
-
- return nil
}
/*
@@ -364,14 +356,9 @@
func (core *Core) waitUntilKVStoreReachableOrMaxTries(ctx context.Context, maxRetries int, retryInterval time.Duration) error {
log.Infow("verifying-KV-store-connectivity", log.Fields{"host": core.config.KVStoreHost,
"port": core.config.KVStorePort, "retries": maxRetries, "retryInterval": retryInterval})
- // Get timeout in seconds with 1 second set as minimum
- timeout := int(core.config.DefaultCoreTimeout / 1000)
- if timeout < 1 {
- timeout = 1
- }
count := 0
for {
- if !core.kvClient.IsConnectionUp(timeout) {
+ if !core.kvClient.IsConnectionUp(ctx) {
log.Info("KV-store-unreachable")
if maxRetries != -1 {
if count >= maxRetries {
@@ -495,7 +482,7 @@
// The Liveness check will push Live state to same channel which this routine is
// reading and processing. This, do it asynchronously to avoid blocking for
// backend response and avoid any possibility of deadlock
- go core.backend.PerformLivenessCheck(core.config.KVStoreTimeout)
+ go core.backend.PerformLivenessCheck(ctx)
}
}
}
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 4053b46..913f9e4 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -170,12 +170,12 @@
}
// Load the most recent state from the KVStore for the device.
-func (agent *DeviceAgent) reconcileWithKVStore() {
+func (agent *DeviceAgent) reconcileWithKVStore(ctx context.Context) {
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
log.Debug("reconciling-device-agent-devicetype")
// TODO: context timeout
- device, err := agent.clusterDataProxy.Get(context.Background(), "/devices/"+agent.deviceID, 1, true, "")
+ device, err := agent.clusterDataProxy.Get(ctx, "/devices/"+agent.deviceID, 1, true, "")
if err != nil {
log.Errorw("Failed to get device info from cluster data proxy", log.Fields{"error": err})
return
@@ -238,7 +238,7 @@
cloned.AdminState = voltha.AdminState_ENABLED
cloned.OperStatus = voltha.OperStatus_ACTIVATING
- if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+ if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
return err
}
@@ -258,16 +258,16 @@
return nil
}
-func (agent *DeviceAgent) sendBulkFlowsToAdapters(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata, response coreutils.Response) {
- if err := agent.adapterProxy.UpdateFlowsBulk(device, flows, groups, flowMetadata); err != nil {
+func (agent *DeviceAgent) sendBulkFlowsToAdapters(ctx context.Context, device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata, response coreutils.Response) {
+ if err := agent.adapterProxy.UpdateFlowsBulk(ctx, device, flows, groups, flowMetadata); err != nil {
log.Debugw("update-flow-bulk-error", log.Fields{"id": agent.deviceID, "error": err})
response.Error(err)
}
response.Done()
}
-func (agent *DeviceAgent) sendIncrementalFlowsToAdapters(device *voltha.Device, flows *ofp.FlowChanges, groups *ofp.FlowGroupChanges, flowMetadata *voltha.FlowMetadata, response coreutils.Response) {
- if err := agent.adapterProxy.UpdateFlowsIncremental(device, flows, groups, flowMetadata); err != nil {
+func (agent *DeviceAgent) sendIncrementalFlowsToAdapters(ctx context.Context, device *voltha.Device, flows *ofp.FlowChanges, groups *ofp.FlowGroupChanges, flowMetadata *voltha.FlowMetadata, response coreutils.Response) {
+ if err := agent.adapterProxy.UpdateFlowsIncremental(ctx, device, flows, groups, flowMetadata); err != nil {
log.Debugw("update-flow-incremental-error", log.Fields{"id": agent.deviceID, "error": err})
response.Error(err)
}
@@ -333,7 +333,7 @@
return newGroups, groupsToDelete, updatedAllGroups
}
-func (agent *DeviceAgent) addFlowsAndGroupsToAdapter(newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+func (agent *DeviceAgent) addFlowsAndGroupsToAdapter(ctx context.Context, newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
log.Debugw("addFlowsAndGroups", log.Fields{"deviceId": agent.deviceID, "flows": newFlows, "groups": newGroups, "flowMetadata": flowMetadata})
if (len(newFlows) | len(newGroups)) == 0 {
@@ -377,7 +377,7 @@
log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flows": newFlows, "groups": newGroups})
return coreutils.DoneResponse(), nil
}
- go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: updatedAllGroups}, flowMetadata, response)
+ go agent.sendBulkFlowsToAdapters(ctx, device, &voltha.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: updatedAllGroups}, flowMetadata, response)
} else {
flowChanges := &ofp.FlowChanges{
@@ -389,13 +389,13 @@
ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
}
- go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, flowMetadata, response)
+ go agent.sendIncrementalFlowsToAdapters(ctx, device, flowChanges, groupChanges, flowMetadata, response)
}
// store the changed data
device.Flows = &voltha.Flows{Items: updatedAllFlows}
device.FlowGroups = &voltha.FlowGroups{Items: updatedAllGroups}
- if err := agent.updateDeviceWithoutLock(device); err != nil {
+ if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceID)
}
@@ -404,8 +404,8 @@
//addFlowsAndGroups adds the "newFlows" and "newGroups" from the existing flows/groups and sends the update to the
//adapters
-func (agent *DeviceAgent) addFlowsAndGroups(newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
- response, err := agent.addFlowsAndGroupsToAdapter(newFlows, newGroups, flowMetadata)
+func (agent *DeviceAgent) addFlowsAndGroups(ctx context.Context, newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
+ response, err := agent.addFlowsAndGroupsToAdapter(ctx, newFlows, newGroups, flowMetadata)
if err != nil {
return err
}
@@ -416,7 +416,7 @@
return nil
}
-func (agent *DeviceAgent) deleteFlowsAndGroupsFromAdapter(flowsToDel []*ofp.OfpFlowStats, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+func (agent *DeviceAgent) deleteFlowsAndGroupsFromAdapter(ctx context.Context, flowsToDel []*ofp.OfpFlowStats, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
log.Debugw("deleteFlowsAndGroups", log.Fields{"deviceId": agent.deviceID, "flows": flowsToDel, "groups": groupsToDel})
if (len(flowsToDel) | len(groupsToDel)) == 0 {
@@ -476,7 +476,7 @@
log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flowsToDel": flowsToDel, "groupsToDel": groupsToDel})
return coreutils.DoneResponse(), nil
}
- go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: flowsToKeep}, &voltha.FlowGroups{Items: groupsToKeep}, flowMetadata, response)
+ go agent.sendBulkFlowsToAdapters(ctx, device, &voltha.Flows{Items: flowsToKeep}, &voltha.FlowGroups{Items: groupsToKeep}, flowMetadata, response)
} else {
flowChanges := &ofp.FlowChanges{
ToAdd: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
@@ -487,13 +487,13 @@
ToRemove: &voltha.FlowGroups{Items: groupsToDel},
ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
}
- go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, flowMetadata, response)
+ go agent.sendIncrementalFlowsToAdapters(ctx, device, flowChanges, groupChanges, flowMetadata, response)
}
// store the changed data
device.Flows = &voltha.Flows{Items: flowsToKeep}
device.FlowGroups = &voltha.FlowGroups{Items: groupsToKeep}
- if err := agent.updateDeviceWithoutLock(device); err != nil {
+ if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceID)
}
@@ -503,8 +503,8 @@
//deleteFlowsAndGroups removes the "flowsToDel" and "groupsToDel" from the existing flows/groups and sends the update to the
//adapters
-func (agent *DeviceAgent) deleteFlowsAndGroups(flowsToDel []*ofp.OfpFlowStats, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
- response, err := agent.deleteFlowsAndGroupsFromAdapter(flowsToDel, groupsToDel, flowMetadata)
+func (agent *DeviceAgent) deleteFlowsAndGroups(ctx context.Context, flowsToDel []*ofp.OfpFlowStats, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
+ response, err := agent.deleteFlowsAndGroupsFromAdapter(ctx, flowsToDel, groupsToDel, flowMetadata)
if err != nil {
return err
}
@@ -514,7 +514,7 @@
return nil
}
-func (agent *DeviceAgent) updateFlowsAndGroupsToAdapter(updatedFlows []*ofp.OfpFlowStats, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+func (agent *DeviceAgent) updateFlowsAndGroupsToAdapter(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
log.Debugw("updateFlowsAndGroups", log.Fields{"deviceId": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
if (len(updatedFlows) | len(updatedGroups)) == 0 {
@@ -551,7 +551,7 @@
// Process bulk flow update differently than incremental update
if !dType.AcceptsAddRemoveFlowUpdates {
- go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, nil, response)
+ go agent.sendBulkFlowsToAdapters(ctx, device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, nil, response)
} else {
var flowsToAdd []*ofp.OfpFlowStats
var flowsToDelete []*ofp.OfpFlowStats
@@ -606,13 +606,13 @@
ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
ToUpdate: &voltha.FlowGroups{Items: updatedGroups},
}
- go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, flowMetadata, response)
+ go agent.sendIncrementalFlowsToAdapters(ctx, device, flowChanges, groupChanges, flowMetadata, response)
}
// store the updated data
device.Flows = &voltha.Flows{Items: updatedFlows}
device.FlowGroups = &voltha.FlowGroups{Items: updatedGroups}
- if err := agent.updateDeviceWithoutLock(device); err != nil {
+ if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceID)
}
@@ -621,8 +621,8 @@
//updateFlowsAndGroups replaces the existing flows and groups with "updatedFlows" and "updatedGroups" respectively. It
//also sends the updates to the adapters
-func (agent *DeviceAgent) updateFlowsAndGroups(updatedFlows []*ofp.OfpFlowStats, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
- response, err := agent.updateFlowsAndGroupsToAdapter(updatedFlows, updatedGroups, flowMetadata)
+func (agent *DeviceAgent) updateFlowsAndGroups(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
+ response, err := agent.updateFlowsAndGroupsToAdapter(ctx, updatedFlows, updatedGroups, flowMetadata)
if err != nil {
return err
}
@@ -653,7 +653,7 @@
// Update the Admin State and operational state before sending the request out
cloned.AdminState = voltha.AdminState_DISABLED
cloned.OperStatus = voltha.OperStatus_UNKNOWN
- if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+ if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
return err
}
if err := agent.adapterProxy.DisableDevice(ctx, proto.Clone(cloned).(*voltha.Device)); err != nil {
@@ -663,7 +663,7 @@
return nil
}
-func (agent *DeviceAgent) updateAdminState(adminState voltha.AdminState_Types) error {
+func (agent *DeviceAgent) updateAdminState(ctx context.Context, adminState voltha.AdminState_Types) error {
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
log.Debugw("updateAdminState", log.Fields{"id": agent.deviceID})
@@ -676,7 +676,7 @@
}
// Received an Ack (no error found above). Now update the device in the model to the expected state
cloned.AdminState = adminState
- if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+ if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
return err
}
return nil
@@ -721,13 +721,14 @@
// Set the state to deleted after we receive an Ack - this will trigger some background process to clean up
// the device as well as its association with the logical device
cloned.AdminState = voltha.AdminState_DELETED
- if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+ if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
return err
}
// If this is a child device then remove the associated peer ports on the parent device
if !cloned.Root {
go func() {
- err := agent.deviceMgr.deletePeerPorts(cloned.ParentId, cloned.Id)
+ // since the caller does not wait for this to complete, use background context
+ err := agent.deviceMgr.deletePeerPorts(context.Background(), cloned.ParentId, cloned.Id)
if err != nil {
log.Errorw("unable-to-delete-peer-ports", log.Fields{"error": err})
}
@@ -736,7 +737,7 @@
return nil
}
-func (agent *DeviceAgent) setParentID(device *voltha.Device, parentID string) error {
+func (agent *DeviceAgent) setParentID(ctx context.Context, device *voltha.Device, parentID string) error {
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
log.Debugw("setParentId", log.Fields{"deviceId": device.Id, "parentId": parentID})
@@ -744,7 +745,7 @@
cloned := agent.getDeviceWithoutLock()
cloned.ParentId = parentID
// Store the device
- if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+ if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
return err
}
return nil
@@ -758,7 +759,7 @@
cloned := agent.getDeviceWithoutLock()
cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
// Store the device
- if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+ if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
return err
}
// Send the request to the adapter
@@ -769,7 +770,7 @@
return nil
}
-func (agent *DeviceAgent) initPmConfigs(pmConfigs *voltha.PmConfigs) error {
+func (agent *DeviceAgent) initPmConfigs(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
log.Debugw("initPmConfigs", log.Fields{"id": pmConfigs.Id})
@@ -777,7 +778,7 @@
cloned := agent.getDeviceWithoutLock()
cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
// Store the device
- updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+ updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
afterUpdate, err := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceID, cloned, false, "")
if err != nil {
return status.Errorf(codes.Internal, "%s", agent.deviceID)
@@ -828,7 +829,7 @@
cloned.ImageDownloads = append(cloned.ImageDownloads, clonedImg)
}
cloned.AdminState = voltha.AdminState_DOWNLOADING_IMAGE
- if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+ if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
return nil, err
}
// Send the request to the adapter
@@ -873,7 +874,7 @@
if device.AdminState == voltha.AdminState_DOWNLOADING_IMAGE {
// Set the device to Enabled
cloned.AdminState = voltha.AdminState_ENABLED
- if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+ if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
return nil, err
}
// Send the request to the adapter
@@ -907,7 +908,7 @@
}
// Set the device to downloading_image
cloned.AdminState = voltha.AdminState_DOWNLOADING_IMAGE
- if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+ if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
return nil, err
}
@@ -942,7 +943,7 @@
}
}
- if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+ if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
return nil, err
}
@@ -967,7 +968,7 @@
return resp, nil
}
-func (agent *DeviceAgent) updateImageDownload(img *voltha.ImageDownload) error {
+func (agent *DeviceAgent) updateImageDownload(ctx context.Context, img *voltha.ImageDownload) error {
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
log.Debugw("updateImageDownload", log.Fields{"id": agent.deviceID})
@@ -991,7 +992,7 @@
cloned.AdminState = voltha.AdminState_ENABLED
}
- if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+ if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
return err
}
return nil
@@ -1023,7 +1024,7 @@
func (agent *DeviceAgent) getPorts(ctx context.Context, portType voltha.Port_PortType) *voltha.Ports {
log.Debugw("getPorts", log.Fields{"id": agent.deviceID, "portType": portType})
ports := &voltha.Ports{}
- if device, _ := agent.deviceMgr.GetDevice(agent.deviceID); device != nil {
+ if device, _ := agent.deviceMgr.GetDevice(ctx, agent.deviceID); device != nil {
for _, port := range device.Ports {
if port.Type == portType {
ports.Items = append(ports.Items, port)
@@ -1037,7 +1038,7 @@
// parent device
func (agent *DeviceAgent) getSwitchCapability(ctx context.Context) (*ic.SwitchCapability, error) {
log.Debugw("getSwitchCapability", log.Fields{"deviceId": agent.deviceID})
- device, err := agent.deviceMgr.GetDevice(agent.deviceID)
+ device, err := agent.deviceMgr.GetDevice(ctx, agent.deviceID)
if device == nil {
return nil, err
}
@@ -1053,7 +1054,7 @@
// device
func (agent *DeviceAgent) getPortCapability(ctx context.Context, portNo uint32) (*ic.PortCapability, error) {
log.Debugw("getPortCapability", log.Fields{"deviceId": agent.deviceID})
- device, err := agent.deviceMgr.GetDevice(agent.deviceID)
+ device, err := agent.deviceMgr.GetDevice(ctx, agent.deviceID)
if device == nil {
return nil, err
}
@@ -1065,14 +1066,14 @@
return portCap, nil
}
-func (agent *DeviceAgent) packetOut(outPort uint32, packet *ofp.OfpPacketOut) error {
+func (agent *DeviceAgent) packetOut(ctx context.Context, outPort uint32, packet *ofp.OfpPacketOut) error {
// If deviceType=="" then we must have taken ownership of this device.
// Fixes VOL-2226 where a core would take ownership and have stale data
if agent.deviceType == "" {
- agent.reconcileWithKVStore()
+ agent.reconcileWithKVStore(ctx)
}
// Send packet to adapter
- if err := agent.adapterProxy.packetOut(agent.deviceType, agent.deviceID, outPort, packet); err != nil {
+ if err := agent.adapterProxy.packetOut(ctx, agent.deviceType, agent.deviceID, outPort, packet); err != nil {
log.Debugw("packet-out-error", log.Fields{
"id": agent.deviceID,
"error": err,
@@ -1084,7 +1085,7 @@
}
// processUpdate is a callback invoked whenever there is a change on the device manages by this device agent
-func (agent *DeviceAgent) processUpdate(args ...interface{}) interface{} {
+func (agent *DeviceAgent) processUpdate(ctx context.Context, args ...interface{}) interface{} {
//// Run this callback in its own go routine
go func(args ...interface{}) interface{} {
var previous *voltha.Device
@@ -1103,8 +1104,8 @@
log.Errorw("too-many-args-in-callback", log.Fields{"len": len(args)})
return nil
}
- // Perform the state transition in it's own go routine
- if err := agent.deviceMgr.processTransition(previous, current); err != nil {
+ // Perform the state transition in its own goroutine (since the caller doesn't wait for it, use a background context)
+ if err := agent.deviceMgr.processTransition(context.Background(), previous, current); err != nil {
log.Errorw("failed-process-transition", log.Fields{"deviceId": previous.Id,
"previousAdminState": previous.AdminState, "currentAdminState": current.AdminState})
}
@@ -1127,7 +1128,7 @@
cloned.Reason = device.Reason
return cloned, nil
}
-func (agent *DeviceAgent) updateDeviceUsingAdapterData(device *voltha.Device) error {
+func (agent *DeviceAgent) updateDeviceUsingAdapterData(ctx context.Context, device *voltha.Device) error {
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
log.Debugw("updateDeviceUsingAdapterData", log.Fields{"deviceId": device.Id})
@@ -1137,16 +1138,16 @@
return status.Errorf(codes.Internal, "%s", err.Error())
}
cloned := proto.Clone(updatedDevice).(*voltha.Device)
- return agent.updateDeviceInStoreWithoutLock(cloned, false, "")
+ return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
}
-func (agent *DeviceAgent) updateDeviceWithoutLock(device *voltha.Device) error {
+func (agent *DeviceAgent) updateDeviceWithoutLock(ctx context.Context, device *voltha.Device) error {
log.Debugw("updateDevice", log.Fields{"deviceId": device.Id})
cloned := proto.Clone(device).(*voltha.Device)
- return agent.updateDeviceInStoreWithoutLock(cloned, false, "")
+ return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
}
-func (agent *DeviceAgent) updateDeviceStatus(operStatus voltha.OperStatus_Types, connStatus voltha.ConnectStatus_Types) error {
+func (agent *DeviceAgent) updateDeviceStatus(ctx context.Context, operStatus voltha.OperStatus_Types, connStatus voltha.ConnectStatus_Types) error {
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
@@ -1163,10 +1164,10 @@
}
log.Debugw("updateDeviceStatus", log.Fields{"deviceId": cloned.Id, "operStatus": cloned.OperStatus, "connectStatus": cloned.ConnectStatus})
// Store the device
- return agent.updateDeviceInStoreWithoutLock(cloned, false, "")
+ return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
}
-func (agent *DeviceAgent) enablePorts() error {
+func (agent *DeviceAgent) enablePorts(ctx context.Context) error {
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
@@ -1177,10 +1178,10 @@
port.OperStatus = voltha.OperStatus_ACTIVE
}
// Store the device
- return agent.updateDeviceInStoreWithoutLock(cloned, false, "")
+ return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
}
-func (agent *DeviceAgent) disablePorts() error {
+func (agent *DeviceAgent) disablePorts(ctx context.Context) error {
log.Debugw("disablePorts", log.Fields{"deviceid": agent.deviceID})
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
@@ -1190,10 +1191,10 @@
port.OperStatus = voltha.OperStatus_UNKNOWN
}
// Store the device
- return agent.updateDeviceInStoreWithoutLock(cloned, false, "")
+ return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
}
-func (agent *DeviceAgent) updatePortState(portType voltha.Port_PortType, portNo uint32, operStatus voltha.OperStatus_Types) error {
+func (agent *DeviceAgent) updatePortState(ctx context.Context, portType voltha.Port_PortType, portNo uint32, operStatus voltha.OperStatus_Types) error {
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
// Work only on latest data
@@ -1217,10 +1218,10 @@
}
log.Debugw("portStatusUpdate", log.Fields{"deviceId": cloned.Id})
// Store the device
- return agent.updateDeviceInStoreWithoutLock(cloned, false, "")
+ return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
}
-func (agent *DeviceAgent) deleteAllPorts() error {
+func (agent *DeviceAgent) deleteAllPorts(ctx context.Context) error {
log.Debugw("deleteAllPorts", log.Fields{"deviceId": agent.deviceID})
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
@@ -1240,10 +1241,10 @@
cloned.Ports = []*voltha.Port{}
log.Debugw("portStatusUpdate", log.Fields{"deviceId": cloned.Id})
// Store the device
- return agent.updateDeviceInStoreWithoutLock(cloned, false, "")
+ return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
}
-func (agent *DeviceAgent) addPort(port *voltha.Port) error {
+func (agent *DeviceAgent) addPort(ctx context.Context, port *voltha.Port) error {
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
log.Debugw("addPort", log.Fields{"deviceId": agent.deviceID})
@@ -1269,10 +1270,10 @@
}
cloned.Ports = append(cloned.Ports, cp)
// Store the device
- return agent.updateDeviceInStoreWithoutLock(cloned, false, "")
+ return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
}
-func (agent *DeviceAgent) addPeerPort(port *voltha.Port_PeerPort) error {
+func (agent *DeviceAgent) addPeerPort(ctx context.Context, port *voltha.Port_PeerPort) error {
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
log.Debug("addPeerPort")
@@ -1289,10 +1290,10 @@
}
}
// Store the device
- return agent.updateDeviceInStoreWithoutLock(cloned, false, "")
+ return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
}
-func (agent *DeviceAgent) deletePeerPorts(deviceID string) error {
+func (agent *DeviceAgent) deletePeerPorts(ctx context.Context, deviceID string) error {
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
log.Debug("deletePeerPorts")
@@ -1311,11 +1312,11 @@
}
// Store the device with updated peer ports
- return agent.updateDeviceInStoreWithoutLock(cloned, false, "")
+ return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
}
// TODO: A generic device update by attribute
-func (agent *DeviceAgent) updateDeviceAttribute(name string, value interface{}) {
+func (agent *DeviceAgent) updateDeviceAttribute(ctx context.Context, name string, value interface{}) {
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
if value == nil {
@@ -1345,7 +1346,7 @@
log.Debugw("update-field-status", log.Fields{"deviceId": cloned.Id, "name": name, "updated": updated})
// Save the data
- if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+ if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
log.Warnw("attribute-update-failed", log.Fields{"attribute": name, "value": value})
}
}
@@ -1367,8 +1368,8 @@
//This is an update operation to the model without a lock. This function must never be invoked by another function unless the latter holds a lock on the device.
// It is an internal helper function.
-func (agent *DeviceAgent) updateDeviceInStoreWithoutLock(device *voltha.Device, strict bool, txid string) error {
- updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+func (agent *DeviceAgent) updateDeviceInStoreWithoutLock(ctx context.Context, device *voltha.Device, strict bool, txid string) error {
+ updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
afterUpdate, err := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceID, device, strict, txid)
if err != nil {
return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceID)
@@ -1383,7 +1384,7 @@
return nil
}
-func (agent *DeviceAgent) updateDeviceReason(reason string) error {
+func (agent *DeviceAgent) updateDeviceReason(ctx context.Context, reason string) error {
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
@@ -1391,5 +1392,5 @@
cloned.Reason = reason
log.Debugw("updateDeviceReason", log.Fields{"deviceId": cloned.Id, "reason": cloned.Reason})
// Store the device
- return agent.updateDeviceInStoreWithoutLock(cloned, false, "")
+ return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
}
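To illustrate the resulting call pattern in device_agent.go: every locked mutator now hands its ctx down to updateDeviceInStoreWithoutLock, which only decorates it with a RequestTimestamp before calling clusterDataProxy.Update. Below is a minimal sketch, assuming the surrounding rw_core/core package (DeviceAgent, voltha, proto, context); the method name is hypothetical and not part of this change set.

// exampleLockedUpdate is an illustrative sketch only, not part of the diff.
func (agent *DeviceAgent) exampleLockedUpdate(ctx context.Context, device *voltha.Device, reason string) error {
	agent.lockDevice.Lock()
	defer agent.lockDevice.Unlock()

	cloned := proto.Clone(device).(*voltha.Device)
	cloned.Reason = reason

	// ctx is the caller's context (typically the gRPC request context); the store
	// helper only adds model.RequestTimestamp to it before clusterDataProxy.Update,
	// so the caller's deadline/cancellation also bounds the KV write.
	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
}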
diff --git a/rw_core/core/device_agent_test.go b/rw_core/core/device_agent_test.go
index 0da55bd..be5fdb1 100755
--- a/rw_core/core/device_agent_test.go
+++ b/rw_core/core/device_agent_test.go
@@ -169,7 +169,7 @@
deviceToUpdate.MacAddress = macAddress
deviceToUpdate.Vlan = vlan
deviceToUpdate.Reason = reason
- err := da.updateDeviceUsingAdapterData(deviceToUpdate)
+ err := da.updateDeviceUsingAdapterData(context.Background(), deviceToUpdate)
assert.Nil(t, err)
localWG.Done()
}()
@@ -177,7 +177,7 @@
// Update the device status routine
localWG.Add(1)
go func() {
- err := da.updateDeviceStatus(voltha.OperStatus_ACTIVE, voltha.ConnectStatus_REACHABLE)
+ err := da.updateDeviceStatus(context.Background(), voltha.OperStatus_ACTIVE, voltha.ConnectStatus_REACHABLE)
assert.Nil(t, err)
localWG.Done()
}()
@@ -185,7 +185,7 @@
// Add a port routine
localWG.Add(1)
go func() {
- err := da.addPort(portToAdd)
+ err := da.addPort(context.Background(), portToAdd)
assert.Nil(t, err)
localWG.Done()
}()
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index 4dff5a6..6d87143 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -44,7 +44,7 @@
adapterProxy *AdapterProxy
adapterMgr *AdapterManager
logicalDeviceMgr *LogicalDeviceManager
- kafkaICProxy *kafka.InterContainerProxy
+ kafkaICProxy kafka.InterContainerProxy
stateTransitions *TransitionMap
clusterDataProxy *model.Proxy
coreInstanceID string
@@ -116,13 +116,13 @@
}
// getDeviceAgent returns the agent managing the device. If the device is not in memory, it will load it, if it exists
-func (dMgr *DeviceManager) getDeviceAgent(deviceID string) *DeviceAgent {
+func (dMgr *DeviceManager) getDeviceAgent(ctx context.Context, deviceID string) *DeviceAgent {
agent, ok := dMgr.deviceAgents.Load(deviceID)
if ok {
return agent.(*DeviceAgent)
}
// Try to load into memory - loading will also create the device agent and set the device ownership
- err := dMgr.load(deviceID)
+ err := dMgr.load(ctx, deviceID)
if err == nil {
agent, ok = dMgr.deviceAgents.Load(deviceID)
if !ok {
@@ -130,7 +130,7 @@
}
// Register this device for ownership tracking
go func() {
- _, err = dMgr.core.deviceOwnership.OwnedByMe(&utils.DeviceID{ID: deviceID})
+ _, err = dMgr.core.deviceOwnership.OwnedByMe(ctx, &utils.DeviceID{ID: deviceID})
if err != nil {
log.Errorw("unable-to-find-core-instance-active-owns-this-device", log.Fields{"error": err})
}
@@ -155,7 +155,7 @@
}
func (dMgr *DeviceManager) createDevice(ctx context.Context, device *voltha.Device, ch chan interface{}) {
- deviceExist, err := dMgr.isParentDeviceExist(device)
+ deviceExist, err := dMgr.isParentDeviceExist(ctx, device)
if err != nil {
log.Errorf("Failed to fetch parent device info")
sendResponse(ctx, ch, err)
@@ -186,7 +186,7 @@
func (dMgr *DeviceManager) enableDevice(ctx context.Context, id *voltha.ID, ch chan interface{}) {
log.Debugw("enableDevice", log.Fields{"deviceid": id})
var res interface{}
- if agent := dMgr.getDeviceAgent(id.Id); agent != nil {
+ if agent := dMgr.getDeviceAgent(ctx, id.Id); agent != nil {
res = agent.enableDevice(ctx)
log.Debugw("EnableDevice-result", log.Fields{"result": res})
} else {
@@ -199,7 +199,7 @@
func (dMgr *DeviceManager) disableDevice(ctx context.Context, id *voltha.ID, ch chan interface{}) {
log.Debugw("disableDevice", log.Fields{"deviceid": id})
var res interface{}
- if agent := dMgr.getDeviceAgent(id.Id); agent != nil {
+ if agent := dMgr.getDeviceAgent(ctx, id.Id); agent != nil {
res = agent.disableDevice(ctx)
log.Debugw("disableDevice-result", log.Fields{"result": res})
} else {
@@ -212,7 +212,7 @@
func (dMgr *DeviceManager) rebootDevice(ctx context.Context, id *voltha.ID, ch chan interface{}) {
log.Debugw("rebootDevice", log.Fields{"deviceid": id})
var res interface{}
- if agent := dMgr.getDeviceAgent(id.Id); agent != nil {
+ if agent := dMgr.getDeviceAgent(ctx, id.Id); agent != nil {
res = agent.rebootDevice(ctx)
log.Debugw("rebootDevice-result", log.Fields{"result": res})
} else {
@@ -224,7 +224,7 @@
func (dMgr *DeviceManager) deleteDevice(ctx context.Context, id *voltha.ID, ch chan interface{}) {
log.Debugw("deleteDevice", log.Fields{"deviceid": id})
var res interface{}
- if agent := dMgr.getDeviceAgent(id.Id); agent != nil {
+ if agent := dMgr.getDeviceAgent(ctx, id.Id); agent != nil {
res = agent.deleteDevice(ctx)
log.Debugw("deleteDevice-result", log.Fields{"result": res})
} else {
@@ -236,12 +236,12 @@
// stopManagingDevice stops the management of the device as well as any of its reference device and logical device.
// This function is called only in the Core that does not own this device. In the Core that owns this device, a
// deletion also includes removal of any reference to this device.
-func (dMgr *DeviceManager) stopManagingDevice(id string) {
+func (dMgr *DeviceManager) stopManagingDevice(ctx context.Context, id string) {
log.Infow("stopManagingDevice", log.Fields{"deviceId": id})
if dMgr.IsDeviceInCache(id) { // Proceed only if an agent is present for this device
if root, _ := dMgr.IsRootDevice(id); root {
// stop managing the logical device
- ldeviceID := dMgr.logicalDeviceMgr.stopManagingLogicalDeviceWithDeviceID(id)
+ ldeviceID := dMgr.logicalDeviceMgr.stopManagingLogicalDeviceWithDeviceID(ctx, id)
if ldeviceID != "" { // Can happen if logical device agent was already stopped
err := dMgr.core.deviceOwnership.AbandonDevice(ldeviceID)
if err != nil {
@@ -250,8 +250,8 @@
}
// We do not need to stop the child devices as this is taken care of by the state machine.
}
- if agent := dMgr.getDeviceAgent(id); agent != nil {
- agent.stop(context.TODO())
+ if agent := dMgr.getDeviceAgent(ctx, id); agent != nil {
+ agent.stop(ctx)
dMgr.deleteDeviceAgentFromMap(agent)
// Abandon the device ownership
err := dMgr.core.deviceOwnership.AbandonDevice(id)
@@ -263,29 +263,29 @@
}
// RunPostDeviceDelete removes any reference of this device
-func (dMgr *DeviceManager) RunPostDeviceDelete(cDevice *voltha.Device) error {
+func (dMgr *DeviceManager) RunPostDeviceDelete(ctx context.Context, cDevice *voltha.Device) error {
log.Infow("RunPostDeviceDelete", log.Fields{"deviceId": cDevice.Id})
- dMgr.stopManagingDevice(cDevice.Id)
+ dMgr.stopManagingDevice(ctx, cDevice.Id)
return nil
}
// GetDevice will return a device, either from memory or from the dB, if present
-func (dMgr *DeviceManager) GetDevice(id string) (*voltha.Device, error) {
+func (dMgr *DeviceManager) GetDevice(ctx context.Context, id string) (*voltha.Device, error) {
log.Debugw("GetDevice", log.Fields{"deviceid": id})
- if agent := dMgr.getDeviceAgent(id); agent != nil {
+ if agent := dMgr.getDeviceAgent(ctx, id); agent != nil {
return agent.getDevice(), nil
}
return nil, status.Errorf(codes.NotFound, "%s", id)
}
// GetChildDevice will return a device, either from memory or from the dB, if present
-func (dMgr *DeviceManager) GetChildDevice(parentDeviceID string, serialNumber string, onuID int64, parentPortNo int64) (*voltha.Device, error) {
+func (dMgr *DeviceManager) GetChildDevice(ctx context.Context, parentDeviceID string, serialNumber string, onuID int64, parentPortNo int64) (*voltha.Device, error) {
log.Debugw("GetChildDevice", log.Fields{"parentDeviceid": parentDeviceID, "serialNumber": serialNumber,
"parentPortNo": parentPortNo, "onuId": onuID})
var parentDevice *voltha.Device
var err error
- if parentDevice, err = dMgr.GetDevice(parentDeviceID); err != nil {
+ if parentDevice, err = dMgr.GetDevice(ctx, parentDeviceID); err != nil {
return nil, status.Errorf(codes.Aborted, "%s", err.Error())
}
var childDeviceIds []string
@@ -300,7 +300,7 @@
var foundChildDevice *voltha.Device
for _, childDeviceID := range childDeviceIds {
var found bool
- if searchDevice, err := dMgr.GetDevice(childDeviceID); err == nil {
+ if searchDevice, err := dMgr.GetDevice(ctx, childDeviceID); err == nil {
foundOnuID := false
if searchDevice.ProxyAddress.OnuId == uint32(onuID) {
@@ -342,12 +342,12 @@
}
// GetChildDeviceWithProxyAddress will return a device based on proxy address
-func (dMgr *DeviceManager) GetChildDeviceWithProxyAddress(proxyAddress *voltha.Device_ProxyAddress) (*voltha.Device, error) {
+func (dMgr *DeviceManager) GetChildDeviceWithProxyAddress(ctx context.Context, proxyAddress *voltha.Device_ProxyAddress) (*voltha.Device, error) {
log.Debugw("GetChildDeviceWithProxyAddress", log.Fields{"proxyAddress": proxyAddress})
var parentDevice *voltha.Device
var err error
- if parentDevice, err = dMgr.GetDevice(proxyAddress.DeviceId); err != nil {
+ if parentDevice, err = dMgr.GetDevice(ctx, proxyAddress.DeviceId); err != nil {
return nil, status.Errorf(codes.Aborted, "%s", err.Error())
}
var childDeviceIds []string
@@ -361,7 +361,7 @@
var foundChildDevice *voltha.Device
for _, childDeviceID := range childDeviceIds {
- if searchDevice, err := dMgr.GetDevice(childDeviceID); err == nil {
+ if searchDevice, err := dMgr.GetDevice(ctx, childDeviceID); err == nil {
if searchDevice.ProxyAddress == proxyAddress {
foundChildDevice = searchDevice
break
@@ -395,10 +395,10 @@
}
// ListDevices retrieves the latest devices from the data model
-func (dMgr *DeviceManager) ListDevices() (*voltha.Devices, error) {
+func (dMgr *DeviceManager) ListDevices(ctx context.Context) (*voltha.Devices, error) {
log.Debug("ListDevices")
result := &voltha.Devices{}
- devices, err := dMgr.clusterDataProxy.List(context.Background(), "/devices", 0, false, "")
+ devices, err := dMgr.clusterDataProxy.List(ctx, "/devices", 0, false, "")
if err != nil {
log.Errorw("failed-to-list-devices-from-cluster-proxy", log.Fields{"error": err})
return nil, err
@@ -409,9 +409,9 @@
if !dMgr.IsDeviceInCache(device.(*voltha.Device).Id) {
log.Debugw("loading-device-from-Model", log.Fields{"id": device.(*voltha.Device).Id})
agent := newDeviceAgent(dMgr.adapterProxy, device.(*voltha.Device), dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
- if _, err := agent.start(context.TODO(), nil); err != nil {
+ if _, err := agent.start(ctx, nil); err != nil {
log.Warnw("failure-starting-agent", log.Fields{"deviceId": device.(*voltha.Device).Id})
- agent.stop(context.TODO())
+ agent.stop(ctx)
} else {
dMgr.addDeviceAgentToMap(agent)
}
@@ -424,9 +424,9 @@
}
//isParentDeviceExist checks whether the device is already preprovisioned.
-func (dMgr *DeviceManager) isParentDeviceExist(newDevice *voltha.Device) (bool, error) {
+func (dMgr *DeviceManager) isParentDeviceExist(ctx context.Context, newDevice *voltha.Device) (bool, error) {
hostPort := newDevice.GetHostAndPort()
- devices, err := dMgr.clusterDataProxy.List(context.Background(), "/devices", 0, false, "")
+ devices, err := dMgr.clusterDataProxy.List(ctx, "/devices", 0, false, "")
if err != nil {
log.Errorw("Failed to list devices from cluster data proxy", log.Fields{"error": err})
return false, err
@@ -448,8 +448,8 @@
}
//getDeviceFromModel retrieves the device data from the model.
-func (dMgr *DeviceManager) getDeviceFromModel(deviceID string) (*voltha.Device, error) {
- device, err := dMgr.clusterDataProxy.Get(context.Background(), "/devices/"+deviceID, 0, false, "")
+func (dMgr *DeviceManager) getDeviceFromModel(ctx context.Context, deviceID string) (*voltha.Device, error) {
+ device, err := dMgr.clusterDataProxy.Get(ctx, "/devices/"+deviceID, 0, false, "")
if err != nil {
log.Errorw("failed-to-get-device-info-from-cluster-proxy", log.Fields{"error": err})
return nil, err
@@ -463,7 +463,7 @@
}
// loadDevice loads the deviceID in memory, if not present
-func (dMgr *DeviceManager) loadDevice(deviceID string) (*DeviceAgent, error) {
+func (dMgr *DeviceManager) loadDevice(ctx context.Context, deviceID string) (*DeviceAgent, error) {
if deviceID == "" {
return nil, status.Error(codes.InvalidArgument, "deviceId empty")
}
@@ -475,12 +475,12 @@
dMgr.deviceLoadingInProgress[deviceID] = []chan int{make(chan int, 1)}
dMgr.devicesLoadingLock.Unlock()
// Proceed with the loading only if the device exists in the Model (could have been deleted)
- if device, err = dMgr.getDeviceFromModel(deviceID); err == nil {
+ if device, err = dMgr.getDeviceFromModel(ctx, deviceID); err == nil {
log.Debugw("loading-device", log.Fields{"deviceId": deviceID})
agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
- if _, err = agent.start(context.TODO(), nil); err != nil {
+ if _, err = agent.start(ctx, nil); err != nil {
log.Warnw("Failure loading device", log.Fields{"deviceId": deviceID, "error": err})
- agent.stop(context.TODO())
+ agent.stop(ctx)
} else {
dMgr.addDeviceAgentToMap(agent)
}
@@ -513,13 +513,13 @@
}
// loadRootDeviceParentAndChildren loads the children and parents of a root device in memory
-func (dMgr *DeviceManager) loadRootDeviceParentAndChildren(device *voltha.Device) error {
+func (dMgr *DeviceManager) loadRootDeviceParentAndChildren(ctx context.Context, device *voltha.Device) error {
log.Debugw("loading-parent-and-children", log.Fields{"deviceId": device.Id})
if device.Root {
// Scenario A
if device.ParentId != "" {
// Load logical device if needed.
- if err := dMgr.logicalDeviceMgr.load(device.ParentId); err != nil {
+ if err := dMgr.logicalDeviceMgr.load(ctx, device.ParentId); err != nil {
log.Warnw("failure-loading-logical-device", log.Fields{"lDeviceId": device.ParentId})
}
} else {
@@ -528,7 +528,7 @@
// Load all child devices, if needed
if childDeviceIds, err := dMgr.getAllChildDeviceIds(device); err == nil {
for _, childDeviceID := range childDeviceIds {
- if _, err := dMgr.loadDevice(childDeviceID); err != nil {
+ if _, err := dMgr.loadDevice(ctx, childDeviceID); err != nil {
log.Warnw("failure-loading-device", log.Fields{"deviceId": childDeviceID, "error": err})
return err
}
@@ -545,12 +545,12 @@
// in memory is for improved performance. It is not imperative that a device needs to be in memory when a request
// acting on the device is received by the core. In such a scenario, the Core will load the device in memory first
// and then proceed with the request.
-func (dMgr *DeviceManager) load(deviceID string) error {
+func (dMgr *DeviceManager) load(ctx context.Context, deviceID string) error {
log.Debug("load...")
// First load the device - this may fail in case the device was deleted intentionally by the other core
var dAgent *DeviceAgent
var err error
- if dAgent, err = dMgr.loadDevice(deviceID); err != nil {
+ if dAgent, err = dMgr.loadDevice(ctx, deviceID); err != nil {
return err
}
// Get the loaded device details
@@ -564,7 +564,7 @@
// Now we face two scenarios
if device.Root {
// Load all children as well as the parent of this device (logical_device)
- if err := dMgr.loadRootDeviceParentAndChildren(device); err != nil {
+ if err := dMgr.loadRootDeviceParentAndChildren(ctx, device); err != nil {
log.Warnw("failure-loading-device-parent-and-children", log.Fields{"deviceId": deviceID})
return err
}
@@ -572,7 +572,7 @@
} else {
// Scenario B - use the parentId of that device (root device) to trigger the loading
if device.ParentId != "" {
- return dMgr.load(device.ParentId)
+ return dMgr.load(ctx, device.ParentId)
}
}
return nil
@@ -595,7 +595,7 @@
reconciled := 0
var err error
for _, id := range ids.Items {
- if err = dMgr.load(id.Id); err != nil {
+ if err = dMgr.load(ctx, id.Id); err != nil {
log.Warnw("failure-reconciling-device", log.Fields{"deviceId": id.Id, "error": err})
} else {
reconciled++
@@ -619,7 +619,7 @@
}
// adapterRestarted is invoked whenever an adapter is restarted
-func (dMgr *DeviceManager) adapterRestarted(adapter *voltha.Adapter) error {
+func (dMgr *DeviceManager) adapterRestarted(ctx context.Context, adapter *voltha.Adapter) error {
log.Debugw("adapter-restarted", log.Fields{"adapter": adapter.Id})
// Let's reconcile the device managed by this Core only
@@ -631,11 +631,11 @@
responses := make([]utils.Response, 0)
for _, rootDeviceID := range rootDeviceIds {
- if rootDevice, _ := dMgr.getDeviceFromModel(rootDeviceID); rootDevice != nil {
+ if rootDevice, _ := dMgr.getDeviceFromModel(ctx, rootDeviceID); rootDevice != nil {
if rootDevice.Adapter == adapter.Id {
if isOkToReconcile(rootDevice) {
log.Debugw("reconciling-root-device", log.Fields{"rootId": rootDevice.Id})
- responses = append(responses, dMgr.sendReconcileDeviceRequest(rootDevice))
+ responses = append(responses, dMgr.sendReconcileDeviceRequest(ctx, rootDevice))
} else {
log.Debugw("not-reconciling-root-device", log.Fields{"rootId": rootDevice.Id, "state": rootDevice.AdminState})
}
@@ -643,11 +643,11 @@
childManagedByAdapter:
for _, port := range rootDevice.Ports {
for _, peer := range port.Peers {
- if childDevice, _ := dMgr.getDeviceFromModel(peer.DeviceId); childDevice != nil {
+ if childDevice, _ := dMgr.getDeviceFromModel(ctx, peer.DeviceId); childDevice != nil {
if childDevice.Adapter == adapter.Id {
if isOkToReconcile(childDevice) {
log.Debugw("reconciling-child-device", log.Fields{"childId": childDevice.Id})
- responses = append(responses, dMgr.sendReconcileDeviceRequest(childDevice))
+ responses = append(responses, dMgr.sendReconcileDeviceRequest(ctx, childDevice))
} else {
log.Debugw("not-reconciling-child-device", log.Fields{"childId": childDevice.Id, "state": childDevice.AdminState})
}
@@ -673,14 +673,14 @@
return nil
}
-func (dMgr *DeviceManager) sendReconcileDeviceRequest(device *voltha.Device) utils.Response {
+func (dMgr *DeviceManager) sendReconcileDeviceRequest(ctx context.Context, device *voltha.Device) utils.Response {
// Send a reconcile request to the adapter. Since this Core may not be managing this device, there is no
// point in creating a device agent (if the device is not being managed by this Core) before sending the request
// to the adapter. We therefore bypass the device agent and send the request directly to the adapter via
// the adapter_proxy.
response := utils.NewResponse()
go func(device *voltha.Device) {
- if err := dMgr.adapterProxy.ReconcileDevice(context.Background(), device); err != nil {
+ if err := dMgr.adapterProxy.ReconcileDevice(ctx, device); err != nil {
log.Errorw("reconcile-request-failed", log.Fields{"deviceId": device.Id, "error": err})
response.Error(status.Errorf(codes.Internal, "device: %s", device.Id))
}
@@ -690,13 +690,13 @@
return response
}
-func (dMgr *DeviceManager) reconcileChildDevices(parentDeviceID string) error {
- if parentDevice, _ := dMgr.getDeviceFromModel(parentDeviceID); parentDevice != nil {
+func (dMgr *DeviceManager) reconcileChildDevices(ctx context.Context, parentDeviceID string) error {
+ if parentDevice, _ := dMgr.getDeviceFromModel(ctx, parentDeviceID); parentDevice != nil {
responses := make([]utils.Response, 0)
for _, port := range parentDevice.Ports {
for _, peer := range port.Peers {
- if childDevice, _ := dMgr.getDeviceFromModel(peer.DeviceId); childDevice != nil {
- responses = append(responses, dMgr.sendReconcileDeviceRequest(childDevice))
+ if childDevice, _ := dMgr.getDeviceFromModel(ctx, peer.DeviceId); childDevice != nil {
+ responses = append(responses, dMgr.sendReconcileDeviceRequest(ctx, childDevice))
}
}
}
@@ -708,25 +708,25 @@
return nil
}
-func (dMgr *DeviceManager) updateDeviceUsingAdapterData(device *voltha.Device) error {
+func (dMgr *DeviceManager) updateDeviceUsingAdapterData(ctx context.Context, device *voltha.Device) error {
log.Debugw("updateDeviceUsingAdapterData", log.Fields{"deviceid": device.Id, "device": device})
- if agent := dMgr.getDeviceAgent(device.Id); agent != nil {
- return agent.updateDeviceUsingAdapterData(device)
+ if agent := dMgr.getDeviceAgent(ctx, device.Id); agent != nil {
+ return agent.updateDeviceUsingAdapterData(ctx, device)
}
return status.Errorf(codes.NotFound, "%s", device.Id)
}
-func (dMgr *DeviceManager) addPort(deviceID string, port *voltha.Port) error {
- agent := dMgr.getDeviceAgent(deviceID)
+func (dMgr *DeviceManager) addPort(ctx context.Context, deviceID string, port *voltha.Port) error {
+ agent := dMgr.getDeviceAgent(ctx, deviceID)
if agent != nil {
- if err := agent.addPort(port); err != nil {
+ if err := agent.addPort(ctx, port); err != nil {
return err
}
// Setup peer ports
meAsPeer := &voltha.Port_PeerPort{DeviceId: deviceID, PortNo: port.PortNo}
for _, peerPort := range port.Peers {
- if agent := dMgr.getDeviceAgent(peerPort.DeviceId); agent != nil {
- if err := agent.addPeerPort(meAsPeer); err != nil {
+ if agent := dMgr.getDeviceAgent(ctx, peerPort.DeviceId); agent != nil {
+ if err := agent.addPeerPort(ctx, meAsPeer); err != nil {
log.Errorw("failed-to-add-peer", log.Fields{"peer-device-id": peerPort.DeviceId})
return err
}
@@ -735,9 +735,9 @@
// Notify the logical device manager to set up a logical port, if needed. If the added port is an NNI or UNI
// then a logical port will be added to the logical device and the device graph generated. If the port is a
// PON port then only the device graph will be generated.
- if device, err := dMgr.GetDevice(deviceID); err == nil {
+ if device, err := dMgr.GetDevice(ctx, deviceID); err == nil {
go func() {
- err = dMgr.logicalDeviceMgr.updateLogicalPort(device, port)
+ err = dMgr.logicalDeviceMgr.updateLogicalPort(context.Background(), device, port)
if err != nil {
log.Errorw("unable-to-update-logical-port", log.Fields{"error": err})
}
@@ -751,34 +751,34 @@
return status.Errorf(codes.NotFound, "%s", deviceID)
}
-func (dMgr *DeviceManager) deletePeerPorts(fromDeviceID string, deviceID string) error {
+func (dMgr *DeviceManager) deletePeerPorts(ctx context.Context, fromDeviceID string, deviceID string) error {
log.Debugw("deletePeerPorts", log.Fields{"fromDeviceId": fromDeviceID, "deviceid": deviceID})
- if agent := dMgr.getDeviceAgent(fromDeviceID); agent != nil {
- return agent.deletePeerPorts(deviceID)
+ if agent := dMgr.getDeviceAgent(ctx, fromDeviceID); agent != nil {
+ return agent.deletePeerPorts(ctx, deviceID)
}
return status.Errorf(codes.NotFound, "%s", deviceID)
}
-func (dMgr *DeviceManager) addFlowsAndGroups(deviceID string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
+func (dMgr *DeviceManager) addFlowsAndGroups(ctx context.Context, deviceID string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
log.Debugw("addFlowsAndGroups", log.Fields{"deviceid": deviceID, "groups:": groups, "flowMetadata": flowMetadata})
- if agent := dMgr.getDeviceAgent(deviceID); agent != nil {
- return agent.addFlowsAndGroups(flows, groups, flowMetadata)
+ if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
+ return agent.addFlowsAndGroups(ctx, flows, groups, flowMetadata)
}
return status.Errorf(codes.NotFound, "%s", deviceID)
}
-func (dMgr *DeviceManager) deleteFlowsAndGroups(deviceID string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
+func (dMgr *DeviceManager) deleteFlowsAndGroups(ctx context.Context, deviceID string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
log.Debugw("deleteFlowsAndGroups", log.Fields{"deviceid": deviceID})
- if agent := dMgr.getDeviceAgent(deviceID); agent != nil {
- return agent.deleteFlowsAndGroups(flows, groups, flowMetadata)
+ if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
+ return agent.deleteFlowsAndGroups(ctx, flows, groups, flowMetadata)
}
return status.Errorf(codes.NotFound, "%s", deviceID)
}
-func (dMgr *DeviceManager) updateFlowsAndGroups(deviceID string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
+func (dMgr *DeviceManager) updateFlowsAndGroups(ctx context.Context, deviceID string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
log.Debugw("updateFlowsAndGroups", log.Fields{"deviceid": deviceID})
- if agent := dMgr.getDeviceAgent(deviceID); agent != nil {
- return agent.updateFlowsAndGroups(flows, groups, flowMetadata)
+ if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
+ return agent.updateFlowsAndGroups(ctx, flows, groups, flowMetadata)
}
return status.Errorf(codes.NotFound, "%s", deviceID)
}
@@ -789,7 +789,7 @@
var res interface{}
if pmConfigs.Id == "" {
res = status.Errorf(codes.FailedPrecondition, "invalid-device-Id")
- } else if agent := dMgr.getDeviceAgent(pmConfigs.Id); agent != nil {
+ } else if agent := dMgr.getDeviceAgent(ctx, pmConfigs.Id); agent != nil {
res = agent.updatePmConfigs(ctx, pmConfigs)
} else {
res = status.Errorf(codes.NotFound, "%s", pmConfigs.Id)
@@ -798,18 +798,18 @@
}
// initPmConfigs initializes the pm configs as defined by the adapter.
-func (dMgr *DeviceManager) initPmConfigs(deviceID string, pmConfigs *voltha.PmConfigs) error {
+func (dMgr *DeviceManager) initPmConfigs(ctx context.Context, deviceID string, pmConfigs *voltha.PmConfigs) error {
if pmConfigs.Id == "" {
return status.Errorf(codes.FailedPrecondition, "invalid-device-Id")
}
- if agent := dMgr.getDeviceAgent(deviceID); agent != nil {
- return agent.initPmConfigs(pmConfigs)
+ if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
+ return agent.initPmConfigs(ctx, pmConfigs)
}
return status.Errorf(codes.NotFound, "%s", deviceID)
}
func (dMgr *DeviceManager) listPmConfigs(ctx context.Context, deviceID string) (*voltha.PmConfigs, error) {
- if agent := dMgr.getDeviceAgent(deviceID); agent != nil {
+ if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
return agent.listPmConfigs(ctx)
}
return nil, status.Errorf(codes.NotFound, "%s", deviceID)
@@ -817,7 +817,7 @@
func (dMgr *DeviceManager) getSwitchCapability(ctx context.Context, deviceID string) (*ic.SwitchCapability, error) {
log.Debugw("getSwitchCapability", log.Fields{"deviceid": deviceID})
- if agent := dMgr.getDeviceAgent(deviceID); agent != nil {
+ if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
return agent.getSwitchCapability(ctx)
}
return nil, status.Errorf(codes.NotFound, "%s", deviceID)
@@ -825,7 +825,7 @@
func (dMgr *DeviceManager) getPorts(ctx context.Context, deviceID string, portType voltha.Port_PortType) (*voltha.Ports, error) {
log.Debugw("getPorts", log.Fields{"deviceid": deviceID, "portType": portType})
- if agent := dMgr.getDeviceAgent(deviceID); agent != nil {
+ if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
return agent.getPorts(ctx, portType), nil
}
return nil, status.Errorf(codes.NotFound, "%s", deviceID)
@@ -833,25 +833,25 @@
func (dMgr *DeviceManager) getPortCapability(ctx context.Context, deviceID string, portNo uint32) (*ic.PortCapability, error) {
log.Debugw("getPortCapability", log.Fields{"deviceid": deviceID})
- if agent := dMgr.getDeviceAgent(deviceID); agent != nil {
+ if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
return agent.getPortCapability(ctx, portNo)
}
return nil, status.Errorf(codes.NotFound, "%s", deviceID)
}
-func (dMgr *DeviceManager) updateDeviceStatus(deviceID string, operStatus voltha.OperStatus_Types, connStatus voltha.ConnectStatus_Types) error {
+func (dMgr *DeviceManager) updateDeviceStatus(ctx context.Context, deviceID string, operStatus voltha.OperStatus_Types, connStatus voltha.ConnectStatus_Types) error {
log.Debugw("updateDeviceStatus", log.Fields{"deviceid": deviceID, "operStatus": operStatus, "connStatus": connStatus})
- if agent := dMgr.getDeviceAgent(deviceID); agent != nil {
- return agent.updateDeviceStatus(operStatus, connStatus)
+ if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
+ return agent.updateDeviceStatus(ctx, operStatus, connStatus)
}
return status.Errorf(codes.NotFound, "%s", deviceID)
}
-func (dMgr *DeviceManager) updateChildrenStatus(deviceID string, operStatus voltha.OperStatus_Types, connStatus voltha.ConnectStatus_Types) error {
+func (dMgr *DeviceManager) updateChildrenStatus(ctx context.Context, deviceID string, operStatus voltha.OperStatus_Types, connStatus voltha.ConnectStatus_Types) error {
log.Debugw("updateChildrenStatus", log.Fields{"parentDeviceid": deviceID, "operStatus": operStatus, "connStatus": connStatus})
var parentDevice *voltha.Device
var err error
- if parentDevice, err = dMgr.GetDevice(deviceID); err != nil {
+ if parentDevice, err = dMgr.GetDevice(ctx, deviceID); err != nil {
return status.Errorf(codes.Aborted, "%s", err.Error())
}
var childDeviceIds []string
@@ -862,8 +862,8 @@
log.Debugw("no-child-device", log.Fields{"parentDeviceId": parentDevice.Id})
}
for _, childDeviceID := range childDeviceIds {
- if agent := dMgr.getDeviceAgent(childDeviceID); agent != nil {
- if err = agent.updateDeviceStatus(operStatus, connStatus); err != nil {
+ if agent := dMgr.getDeviceAgent(ctx, childDeviceID); agent != nil {
+ if err = agent.updateDeviceStatus(ctx, operStatus, connStatus); err != nil {
return status.Errorf(codes.Aborted, "childDevice:%s, error:%s", childDeviceID, err.Error())
}
}
@@ -871,16 +871,16 @@
return nil
}
-func (dMgr *DeviceManager) updatePortState(deviceID string, portType voltha.Port_PortType, portNo uint32, operStatus voltha.OperStatus_Types) error {
+func (dMgr *DeviceManager) updatePortState(ctx context.Context, deviceID string, portType voltha.Port_PortType, portNo uint32, operStatus voltha.OperStatus_Types) error {
log.Debugw("updatePortState", log.Fields{"deviceid": deviceID, "portType": portType, "portNo": portNo, "operStatus": operStatus})
- if agent := dMgr.getDeviceAgent(deviceID); agent != nil {
- if err := agent.updatePortState(portType, portNo, operStatus); err != nil {
+ if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
+ if err := agent.updatePortState(ctx, portType, portNo, operStatus); err != nil {
log.Errorw("updating-port-state-failed", log.Fields{"deviceid": deviceID, "portNo": portNo, "error": err})
return err
}
// Notify the logical device manager to change the port state
go func() {
- err := dMgr.logicalDeviceMgr.updatePortState(deviceID, portNo, operStatus)
+ err := dMgr.logicalDeviceMgr.updatePortState(context.Background(), deviceID, portNo, operStatus)
if err != nil {
log.Errorw("unable-to-update-port-state", log.Fields{"error": err})
}
@@ -889,18 +889,18 @@
return status.Errorf(codes.NotFound, "%s", deviceID)
}
-func (dMgr *DeviceManager) deleteAllPorts(deviceID string) error {
+func (dMgr *DeviceManager) deleteAllPorts(ctx context.Context, deviceID string) error {
log.Debugw("DeleteAllPorts", log.Fields{"deviceid": deviceID})
- if agent := dMgr.getDeviceAgent(deviceID); agent != nil {
- if err := agent.deleteAllPorts(); err != nil {
+ if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
+ if err := agent.deleteAllPorts(ctx); err != nil {
return err
}
// Notify the logical device manager to remove all logical ports, if needed.
// At this stage the device itself may have been deleted already, as deleteAllPorts
// typically is part of a device deletion phase.
- if device, err := dMgr.GetDevice(deviceID); err == nil {
+ if device, err := dMgr.GetDevice(ctx, deviceID); err == nil {
go func() {
- err = dMgr.logicalDeviceMgr.deleteAllLogicalPorts(device)
+ err = dMgr.logicalDeviceMgr.deleteAllLogicalPorts(ctx, device)
if err != nil {
log.Errorw("unable-to-delete-logical-ports", log.Fields{"error": err})
}
@@ -915,21 +915,21 @@
}
//updatePortsState updates all ports on the device
-func (dMgr *DeviceManager) updatePortsState(deviceID string, state voltha.OperStatus_Types) error {
+func (dMgr *DeviceManager) updatePortsState(ctx context.Context, deviceID string, state voltha.OperStatus_Types) error {
log.Debugw("updatePortsState", log.Fields{"deviceid": deviceID})
var adminState voltha.AdminState_Types
- if agent := dMgr.getDeviceAgent(deviceID); agent != nil {
+ if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
switch state {
case voltha.OperStatus_ACTIVE:
adminState = voltha.AdminState_ENABLED
- if err := agent.enablePorts(); err != nil {
+ if err := agent.enablePorts(ctx); err != nil {
log.Warnw("enable-all-ports-failed", log.Fields{"deviceId": deviceID, "error": err})
return err
}
case voltha.OperStatus_UNKNOWN:
adminState = voltha.AdminState_DISABLED
- if err := agent.disablePorts(); err != nil {
+ if err := agent.disablePorts(ctx); err != nil {
log.Warnw("disable-all-ports-failed", log.Fields{"deviceId": deviceID, "error": err})
return err
}
@@ -937,12 +937,12 @@
return status.Error(codes.Unimplemented, "state-change-not-implemented")
}
// Notify the logical device about the state change
- device, err := dMgr.GetDevice(deviceID)
+ device, err := dMgr.GetDevice(ctx, deviceID)
if err != nil {
log.Warnw("non-existent-device", log.Fields{"deviceId": deviceID, "error": err})
return err
}
- if err := dMgr.logicalDeviceMgr.updatePortsState(device, adminState); err != nil {
+ if err := dMgr.logicalDeviceMgr.updatePortsState(ctx, device, adminState); err != nil {
log.Warnw("failed-updating-ports-state", log.Fields{"deviceId": deviceID, "error": err})
return err
}
@@ -951,13 +951,13 @@
return status.Errorf(codes.NotFound, "%s", deviceID)
}
-func (dMgr *DeviceManager) childDeviceDetected(parentDeviceID string, parentPortNo int64, deviceType string,
+func (dMgr *DeviceManager) childDeviceDetected(ctx context.Context, parentDeviceID string, parentPortNo int64, deviceType string,
channelID int64, vendorID string, serialNumber string, onuID int64) (*voltha.Device, error) {
log.Debugw("childDeviceDetected", log.Fields{"parentDeviceId": parentDeviceID, "parentPortNo": parentPortNo, "deviceType": deviceType, "channelId": channelID, "vendorId": vendorID, "serialNumber": serialNumber, "onuId": onuID})
if deviceType == "" && vendorID != "" {
log.Debug("device-type-is-nil-fetching-device-type")
- deviceTypesIf, err := dMgr.adapterMgr.clusterDataProxy.List(context.Background(), "/device_types", 0, false, "")
+ deviceTypesIf, err := dMgr.adapterMgr.clusterDataProxy.List(ctx, "/device_types", 0, false, "")
if err != nil {
log.Errorw("failed-to-get-device-type-info", log.Fields{"error": err})
return nil, err
@@ -992,13 +992,13 @@
childDevice.Root = false
//Get parent device type
- parent, err := dMgr.GetDevice(parentDeviceID)
+ parent, err := dMgr.GetDevice(ctx, parentDeviceID)
if err != nil {
log.Error("no-parent-found", log.Fields{"parentId": parentDeviceID})
return nil, status.Errorf(codes.NotFound, "%s", parentDeviceID)
}
- if device, err := dMgr.GetChildDevice(parentDeviceID, serialNumber, onuID, parentPortNo); err == nil {
+ if device, err := dMgr.GetChildDevice(ctx, parentDeviceID, serialNumber, onuID, parentPortNo); err == nil {
log.Warnw("child-device-exists", log.Fields{"parentId": parentDeviceID, "serialNumber": serialNumber})
return device, status.Errorf(codes.AlreadyExists, "%s", serialNumber)
}
@@ -1008,7 +1008,7 @@
// Create and start a device agent for that device
agent := newDeviceAgent(dMgr.adapterProxy, childDevice, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
dMgr.addDeviceAgentToMap(agent)
- childDevice, err = agent.start(context.TODO(), childDevice)
+ childDevice, err = agent.start(ctx, childDevice)
if err != nil {
log.Error("error-starting-child")
return nil, err
@@ -1016,15 +1016,15 @@
// Since this Core has handled this request, it therefore owns this child device. Set the
// ownership of this device to this Core
- _, err = dMgr.core.deviceOwnership.OwnedByMe(&utils.DeviceID{ID: agent.deviceID})
+ _, err = dMgr.core.deviceOwnership.OwnedByMe(ctx, &utils.DeviceID{ID: agent.deviceID})
if err != nil {
log.Errorw("unable-to-find-core-instance-active-owns-this-device", log.Fields{"error": err})
}
// Activate the child device
- if agent = dMgr.getDeviceAgent(agent.deviceID); agent != nil {
+ if agent = dMgr.getDeviceAgent(ctx, agent.deviceID); agent != nil {
go func() {
- err := agent.enableDevice(context.TODO())
+ err := agent.enableDevice(context.Background())
if err != nil {
log.Errorw("unable-to-enable-device", log.Fields{"error": err})
}
@@ -1042,7 +1042,7 @@
return childDevice, nil
}
-func (dMgr *DeviceManager) processTransition(previous *voltha.Device, current *voltha.Device) error {
+func (dMgr *DeviceManager) processTransition(ctx context.Context, previous *voltha.Device, current *voltha.Device) error {
// This will be triggered on every update to the device.
handlers := dMgr.stateTransitions.GetTransitionHandler(previous, current)
if handlers == nil {
@@ -1052,7 +1052,7 @@
log.Debugw("handler-found", log.Fields{"num-handlers": len(handlers), "isParent": current.Root, "current-data": current})
for _, handler := range handlers {
log.Debugw("running-handler", log.Fields{"handler": funcName(handler)})
- if err := handler(current); err != nil {
+ if err := handler(ctx, current); err != nil {
log.Warnw("handler-failed", log.Fields{"handler": funcName(handler), "error": err})
return err
}
@@ -1060,21 +1060,21 @@
return nil
}
-func (dMgr *DeviceManager) packetOut(deviceID string, outPort uint32, packet *ofp.OfpPacketOut) error {
+func (dMgr *DeviceManager) packetOut(ctx context.Context, deviceID string, outPort uint32, packet *ofp.OfpPacketOut) error {
log.Debugw("packetOut", log.Fields{"deviceId": deviceID, "outPort": outPort})
- if agent := dMgr.getDeviceAgent(deviceID); agent != nil {
- return agent.packetOut(outPort, packet)
+ if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
+ return agent.packetOut(ctx, outPort, packet)
}
return status.Errorf(codes.NotFound, "%s", deviceID)
}
// PacketIn receives packet from adapter
-func (dMgr *DeviceManager) PacketIn(deviceID string, port uint32, transactionID string, packet []byte) error {
+func (dMgr *DeviceManager) PacketIn(ctx context.Context, deviceID string, port uint32, transactionID string, packet []byte) error {
log.Debugw("PacketIn", log.Fields{"deviceId": deviceID, "port": port})
// Get the logical device Id based on the deviceId
var device *voltha.Device
var err error
- if device, err = dMgr.GetDevice(deviceID); err != nil {
+ if device, err = dMgr.GetDevice(ctx, deviceID); err != nil {
log.Errorw("device-not-found", log.Fields{"deviceId": deviceID})
return err
}
@@ -1083,22 +1083,22 @@
return status.Errorf(codes.FailedPrecondition, "%s", deviceID)
}
- if err := dMgr.logicalDeviceMgr.packetIn(device.ParentId, port, transactionID, packet); err != nil {
+ if err := dMgr.logicalDeviceMgr.packetIn(ctx, device.ParentId, port, transactionID, packet); err != nil {
return err
}
return nil
}
-func (dMgr *DeviceManager) setParentID(device *voltha.Device, parentID string) error {
+func (dMgr *DeviceManager) setParentID(ctx context.Context, device *voltha.Device, parentID string) error {
log.Debugw("setParentId", log.Fields{"deviceId": device.Id, "parentId": parentID})
- if agent := dMgr.getDeviceAgent(device.Id); agent != nil {
- return agent.setParentID(device, parentID)
+ if agent := dMgr.getDeviceAgent(ctx, device.Id); agent != nil {
+ return agent.setParentID(ctx, device, parentID)
}
return status.Errorf(codes.NotFound, "%s", device.Id)
}
// CreateLogicalDevice creates logical device in core
-func (dMgr *DeviceManager) CreateLogicalDevice(cDevice *voltha.Device) error {
+func (dMgr *DeviceManager) CreateLogicalDevice(ctx context.Context, cDevice *voltha.Device) error {
log.Info("CreateLogicalDevice")
// Verify whether the logical device has already been created
if cDevice.ParentId != "" {
@@ -1106,7 +1106,7 @@
return nil
}
var err error
- if _, err = dMgr.logicalDeviceMgr.createLogicalDevice(context.TODO(), cDevice); err != nil {
+ if _, err = dMgr.logicalDeviceMgr.createLogicalDevice(ctx, cDevice); err != nil {
log.Warnw("createlogical-device-error", log.Fields{"device": cDevice})
return err
}
@@ -1114,30 +1114,30 @@
}
// DeleteLogicalDevice deletes logical device from core
-func (dMgr *DeviceManager) DeleteLogicalDevice(cDevice *voltha.Device) error {
+func (dMgr *DeviceManager) DeleteLogicalDevice(ctx context.Context, cDevice *voltha.Device) error {
log.Info("DeleteLogicalDevice")
var err error
- if err = dMgr.logicalDeviceMgr.deleteLogicalDevice(context.TODO(), cDevice); err != nil {
+ if err = dMgr.logicalDeviceMgr.deleteLogicalDevice(ctx, cDevice); err != nil {
log.Warnw("deleteLogical-device-error", log.Fields{"deviceId": cDevice.Id})
return err
}
// Remove the logical device Id from the parent device
logicalID := ""
- dMgr.UpdateDeviceAttribute(cDevice.Id, "ParentId", logicalID)
+ dMgr.UpdateDeviceAttribute(ctx, cDevice.Id, "ParentId", logicalID)
return nil
}
// DeleteLogicalPort removes the logical port associated with a device
-func (dMgr *DeviceManager) DeleteLogicalPort(device *voltha.Device) error {
+func (dMgr *DeviceManager) DeleteLogicalPort(ctx context.Context, device *voltha.Device) error {
log.Info("deleteLogicalPort")
var err error
// Get the logical port associated with this device
var lPortID *voltha.LogicalPortId
- if lPortID, err = dMgr.logicalDeviceMgr.getLogicalPortID(device); err != nil {
+ if lPortID, err = dMgr.logicalDeviceMgr.getLogicalPortID(ctx, device); err != nil {
log.Warnw("getLogical-port-error", log.Fields{"deviceId": device.Id, "error": err})
return err
}
- if err = dMgr.logicalDeviceMgr.deleteLogicalPort(context.TODO(), lPortID); err != nil {
+ if err = dMgr.logicalDeviceMgr.deleteLogicalPort(ctx, lPortID); err != nil {
log.Warnw("deleteLogical-port-error", log.Fields{"deviceId": device.Id})
return err
}
@@ -1145,47 +1145,47 @@
}
// DeleteLogicalPorts removes the logical ports associated with that deviceId
-func (dMgr *DeviceManager) DeleteLogicalPorts(device *voltha.Device) error {
+func (dMgr *DeviceManager) DeleteLogicalPorts(ctx context.Context, device *voltha.Device) error {
log.Info("deleteLogicalPorts")
- if err := dMgr.logicalDeviceMgr.deleteLogicalPorts(device.Id); err != nil {
+ if err := dMgr.logicalDeviceMgr.deleteLogicalPorts(ctx, device.Id); err != nil {
log.Warnw("deleteLogical-ports-error", log.Fields{"deviceId": device.Id})
return err
}
return nil
}
-func (dMgr *DeviceManager) getParentDevice(childDevice *voltha.Device) *voltha.Device {
+func (dMgr *DeviceManager) getParentDevice(ctx context.Context, childDevice *voltha.Device) *voltha.Device {
// Sanity check
if childDevice.Root {
// childDevice is the parent device
return childDevice
}
- parentDevice, _ := dMgr.GetDevice(childDevice.ParentId)
+ parentDevice, _ := dMgr.GetDevice(ctx, childDevice.ParentId)
return parentDevice
}
//childDevicesLost is invoked by an adapter to indicate that a parent device is in a state (Disabled) where it
//cannot manage the child devices. This will trigger the Core to disable all the child devices.
-func (dMgr *DeviceManager) childDevicesLost(parentDeviceID string) error {
+func (dMgr *DeviceManager) childDevicesLost(ctx context.Context, parentDeviceID string) error {
log.Debug("childDevicesLost")
var err error
var parentDevice *voltha.Device
- if parentDevice, err = dMgr.GetDevice(parentDeviceID); err != nil {
+ if parentDevice, err = dMgr.GetDevice(ctx, parentDeviceID); err != nil {
log.Warnw("failed-getting-device", log.Fields{"deviceId": parentDeviceID, "error": err})
return err
}
- return dMgr.DisableAllChildDevices(parentDevice)
+ return dMgr.DisableAllChildDevices(ctx, parentDevice)
}
//childDevicesDetected is invoked by an adapter when child devices are found, typically after a
// disable/enable sequence. This will trigger the Core to Enable all the child devices of that parent.
-func (dMgr *DeviceManager) childDevicesDetected(parentDeviceID string) error {
+func (dMgr *DeviceManager) childDevicesDetected(ctx context.Context, parentDeviceID string) error {
log.Debug("childDevicesDetected")
var err error
var parentDevice *voltha.Device
var childDeviceIds []string
- if parentDevice, err = dMgr.GetDevice(parentDeviceID); err != nil {
+ if parentDevice, err = dMgr.GetDevice(ctx, parentDeviceID); err != nil {
log.Warnw("failed-getting-device", log.Fields{"deviceId": parentDeviceID, "error": err})
return err
}
@@ -1198,10 +1198,10 @@
}
allChildEnableRequestSent := true
for _, childDeviceID := range childDeviceIds {
- if agent := dMgr.getDeviceAgent(childDeviceID); agent != nil {
+ if agent := dMgr.getDeviceAgent(ctx, childDeviceID); agent != nil {
// Run the children re-registration in its own routine
go func() {
- err = agent.enableDevice(context.TODO())
+ err = agent.enableDevice(ctx)
if err != nil {
log.Errorw("unable-to-enable-device", log.Fields{"error": err})
}
@@ -1224,7 +1224,7 @@
*/
//DisableAllChildDevices is invoked as a callback when the parent device is disabled
-func (dMgr *DeviceManager) DisableAllChildDevices(parentDevice *voltha.Device) error {
+func (dMgr *DeviceManager) DisableAllChildDevices(ctx context.Context, parentDevice *voltha.Device) error {
log.Debug("DisableAllChildDevices")
var childDeviceIds []string
var err error
@@ -1236,8 +1236,8 @@
}
allChildDisable := true
for _, childDeviceID := range childDeviceIds {
- if agent := dMgr.getDeviceAgent(childDeviceID); agent != nil {
- if err = agent.disableDevice(context.TODO()); err != nil {
+ if agent := dMgr.getDeviceAgent(ctx, childDeviceID); agent != nil {
+ if err = agent.disableDevice(ctx); err != nil {
log.Errorw("failure-disable-device", log.Fields{"deviceId": childDeviceID, "error": err.Error()})
allChildDisable = false
}
@@ -1250,7 +1250,7 @@
}
//DeleteAllChildDevices is invoked as a callback when the parent device is deleted
-func (dMgr *DeviceManager) DeleteAllChildDevices(parentDevice *voltha.Device) error {
+func (dMgr *DeviceManager) DeleteAllChildDevices(ctx context.Context, parentDevice *voltha.Device) error {
log.Debug("DeleteAllChildDevices")
var childDeviceIds []string
var err error
@@ -1262,8 +1262,8 @@
}
allChildDeleted := true
for _, childDeviceID := range childDeviceIds {
- if agent := dMgr.getDeviceAgent(childDeviceID); agent != nil {
- if err = agent.deleteDevice(context.TODO()); err != nil {
+ if agent := dMgr.getDeviceAgent(ctx, childDeviceID); agent != nil {
+ if err = agent.deleteDevice(ctx); err != nil {
log.Errorw("failure-delete-device", log.Fields{"deviceId": childDeviceID, "error": err.Error()})
allChildDeleted = false
}
@@ -1293,13 +1293,13 @@
}
//getAllChildDevices is a helper method to get all the child devices of the device passed as parameter
-func (dMgr *DeviceManager) getAllChildDevices(parentDeviceID string) (*voltha.Devices, error) {
+func (dMgr *DeviceManager) getAllChildDevices(ctx context.Context, parentDeviceID string) (*voltha.Devices, error) {
log.Debugw("getAllChildDevices", log.Fields{"parentDeviceId": parentDeviceID})
- if parentDevice, err := dMgr.GetDevice(parentDeviceID); err == nil {
+ if parentDevice, err := dMgr.GetDevice(ctx, parentDeviceID); err == nil {
childDevices := make([]*voltha.Device, 0)
if childDeviceIds, er := dMgr.getAllChildDeviceIds(parentDevice); er == nil {
for _, deviceID := range childDeviceIds {
- if d, e := dMgr.GetDevice(deviceID); e == nil && d != nil {
+ if d, e := dMgr.GetDevice(ctx, deviceID); e == nil && d != nil {
childDevices = append(childDevices, d)
}
}
@@ -1310,9 +1310,9 @@
}
// SetupUNILogicalPorts creates UNI ports on the logical device that represents a child UNI interface
-func (dMgr *DeviceManager) SetupUNILogicalPorts(cDevice *voltha.Device) error {
+func (dMgr *DeviceManager) SetupUNILogicalPorts(ctx context.Context, cDevice *voltha.Device) error {
log.Info("addUNILogicalPort")
- if err := dMgr.logicalDeviceMgr.setupUNILogicalPorts(context.TODO(), cDevice); err != nil {
+ if err := dMgr.logicalDeviceMgr.setupUNILogicalPorts(ctx, cDevice); err != nil {
log.Warnw("addUNILogicalPort-error", log.Fields{"device": cDevice, "err": err})
return err
}
@@ -1323,7 +1323,7 @@
log.Debugw("downloadImage", log.Fields{"deviceid": img.Id, "imageName": img.Name})
var res interface{}
var err error
- if agent := dMgr.getDeviceAgent(img.Id); agent != nil {
+ if agent := dMgr.getDeviceAgent(ctx, img.Id); agent != nil {
if res, err = agent.downloadImage(ctx, img); err != nil {
log.Debugw("downloadImage-failed", log.Fields{"err": err, "imageName": img.Name})
res = err
@@ -1338,7 +1338,7 @@
log.Debugw("cancelImageDownload", log.Fields{"deviceid": img.Id, "imageName": img.Name})
var res interface{}
var err error
- if agent := dMgr.getDeviceAgent(img.Id); agent != nil {
+ if agent := dMgr.getDeviceAgent(ctx, img.Id); agent != nil {
if res, err = agent.cancelImageDownload(ctx, img); err != nil {
log.Debugw("cancelImageDownload-failed", log.Fields{"err": err, "imageName": img.Name})
res = err
@@ -1353,7 +1353,7 @@
log.Debugw("activateImage", log.Fields{"deviceid": img.Id, "imageName": img.Name})
var res interface{}
var err error
- if agent := dMgr.getDeviceAgent(img.Id); agent != nil {
+ if agent := dMgr.getDeviceAgent(ctx, img.Id); agent != nil {
if res, err = agent.activateImage(ctx, img); err != nil {
log.Debugw("activateImage-failed", log.Fields{"err": err, "imageName": img.Name})
res = err
@@ -1368,7 +1368,7 @@
log.Debugw("revertImage", log.Fields{"deviceid": img.Id, "imageName": img.Name})
var res interface{}
var err error
- if agent := dMgr.getDeviceAgent(img.Id); agent != nil {
+ if agent := dMgr.getDeviceAgent(ctx, img.Id); agent != nil {
if res, err = agent.revertImage(ctx, img); err != nil {
log.Debugw("revertImage-failed", log.Fields{"err": err, "imageName": img.Name})
res = err
@@ -1383,7 +1383,7 @@
log.Debugw("getImageDownloadStatus", log.Fields{"deviceid": img.Id, "imageName": img.Name})
var res interface{}
var err error
- if agent := dMgr.getDeviceAgent(img.Id); agent != nil {
+ if agent := dMgr.getDeviceAgent(ctx, img.Id); agent != nil {
if res, err = agent.getImageDownloadStatus(ctx, img); err != nil {
log.Debugw("getImageDownloadStatus-failed", log.Fields{"err": err, "imageName": img.Name})
res = err
@@ -1394,10 +1394,10 @@
sendResponse(ctx, ch, res)
}
-func (dMgr *DeviceManager) updateImageDownload(deviceID string, img *voltha.ImageDownload) error {
+func (dMgr *DeviceManager) updateImageDownload(ctx context.Context, deviceID string, img *voltha.ImageDownload) error {
log.Debugw("updateImageDownload", log.Fields{"deviceid": img.Id, "imageName": img.Name})
- if agent := dMgr.getDeviceAgent(deviceID); agent != nil {
- if err := agent.updateImageDownload(img); err != nil {
+ if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
+ if err := agent.updateImageDownload(ctx, img); err != nil {
log.Debugw("updateImageDownload-failed", log.Fields{"err": err, "imageName": img.Name})
return err
}
@@ -1409,7 +1409,7 @@
func (dMgr *DeviceManager) getImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
log.Debugw("getImageDownload", log.Fields{"deviceid": img.Id, "imageName": img.Name})
- if agent := dMgr.getDeviceAgent(img.Id); agent != nil {
+ if agent := dMgr.getDeviceAgent(ctx, img.Id); agent != nil {
return agent.getImageDownload(ctx, img)
}
return nil, status.Errorf(codes.NotFound, "%s", img.Id)
@@ -1417,23 +1417,23 @@
func (dMgr *DeviceManager) listImageDownloads(ctx context.Context, deviceID string) (*voltha.ImageDownloads, error) {
log.Debugw("listImageDownloads", log.Fields{"deviceID": deviceID})
- if agent := dMgr.getDeviceAgent(deviceID); agent != nil {
+ if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
return agent.listImageDownloads(ctx, deviceID)
}
return nil, status.Errorf(codes.NotFound, "%s", deviceID)
}
// SetAdminStateToEnable sets admin state of device to enabled
-func (dMgr *DeviceManager) SetAdminStateToEnable(cDevice *voltha.Device) error {
+func (dMgr *DeviceManager) SetAdminStateToEnable(ctx context.Context, cDevice *voltha.Device) error {
log.Info("SetAdminStateToEnable")
- if agent := dMgr.getDeviceAgent(cDevice.Id); agent != nil {
- return agent.updateAdminState(voltha.AdminState_ENABLED)
+ if agent := dMgr.getDeviceAgent(ctx, cDevice.Id); agent != nil {
+ return agent.updateAdminState(ctx, voltha.AdminState_ENABLED)
}
return status.Errorf(codes.NotFound, "%s", cDevice.Id)
}
// NotifyInvalidTransition notifies about invalid transition
-func (dMgr *DeviceManager) NotifyInvalidTransition(pcDevice *voltha.Device) error {
+func (dMgr *DeviceManager) NotifyInvalidTransition(ctx context.Context, pcDevice *voltha.Device) error {
log.Errorw("NotifyInvalidTransition", log.Fields{
"device": pcDevice.Id,
"adminState": pcDevice.AdminState,
@@ -1451,15 +1451,15 @@
}
// UpdateDeviceAttribute updates value of particular device attribute
-func (dMgr *DeviceManager) UpdateDeviceAttribute(deviceID string, attribute string, value interface{}) {
+func (dMgr *DeviceManager) UpdateDeviceAttribute(ctx context.Context, deviceID string, attribute string, value interface{}) {
if agent, ok := dMgr.deviceAgents.Load(deviceID); ok {
- agent.(*DeviceAgent).updateDeviceAttribute(attribute, value)
+ agent.(*DeviceAgent).updateDeviceAttribute(ctx, attribute, value)
}
}
// GetParentDeviceID returns parent device id, either from memory or from the dB, if present
-func (dMgr *DeviceManager) GetParentDeviceID(deviceID string) string {
- if device, _ := dMgr.GetDevice(deviceID); device != nil {
+func (dMgr *DeviceManager) GetParentDeviceID(ctx context.Context, deviceID string) string {
+ if device, _ := dMgr.GetDevice(ctx, deviceID); device != nil {
log.Infow("GetParentDeviceId", log.Fields{"deviceId": device.Id, "parentId": device.ParentId})
return device.ParentId
}
@@ -1471,7 +1471,7 @@
"PortTypeName": simulatereq.PortTypeName, "OnuDeviceId": simulatereq.OnuDeviceId, "InverseBitErrorRate": simulatereq.InverseBitErrorRate,
"Drift": simulatereq.Drift, "NewEqd": simulatereq.NewEqd, "OnuSerialNumber": simulatereq.OnuSerialNumber, "Operation": simulatereq.Operation})
var res interface{}
- if agent := dMgr.getDeviceAgent(simulatereq.Id); agent != nil {
+ if agent := dMgr.getDeviceAgent(ctx, simulatereq.Id); agent != nil {
res = agent.simulateAlarm(ctx, simulatereq)
log.Debugw("SimulateAlarm-result", log.Fields{"result": res})
}
@@ -1479,10 +1479,10 @@
sendResponse(ctx, ch, res)
}
-func (dMgr *DeviceManager) updateDeviceReason(deviceID string, reason string) error {
+func (dMgr *DeviceManager) updateDeviceReason(ctx context.Context, deviceID string, reason string) error {
log.Debugw("updateDeviceReason", log.Fields{"deviceid": deviceID, "reason": reason})
- if agent := dMgr.getDeviceAgent(deviceID); agent != nil {
- return agent.updateDeviceReason(reason)
+ if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
+ return agent.updateDeviceReason(ctx, reason)
}
return status.Errorf(codes.NotFound, "%s", deviceID)
}
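
All of the DeviceManager hunks above apply one mechanical pattern: each helper gains a leading context.Context parameter and threads it into getDeviceAgent and the agent call, so a cancelled northbound request stops the work. The following is a minimal, self-contained sketch of that pattern using hypothetical Manager/Agent types (not the actual voltha-go ones); only the shape of the change is taken from the diff.

package main

import (
	"context"
	"fmt"
	"time"
)

// Agent stands in for a per-device agent whose methods now accept a context.
type Agent struct{ id string }

func (a *Agent) updateReason(ctx context.Context, reason string) error {
	select {
	case <-ctx.Done(): // a cancelled request aborts the work here
		return ctx.Err()
	case <-time.After(10 * time.Millisecond): // pretend to hit the KV store
		fmt.Printf("device %s reason set to %q\n", a.id, reason)
		return nil
	}
}

// Manager stands in for DeviceManager: it resolves the agent and passes ctx through.
type Manager struct{ agents map[string]*Agent }

// getAgent takes ctx for parity with getDeviceAgent, which may load from the store.
func (m *Manager) getAgent(ctx context.Context, id string) *Agent { return m.agents[id] }

func (m *Manager) updateDeviceReason(ctx context.Context, id, reason string) error {
	if agent := m.getAgent(ctx, id); agent != nil {
		return agent.updateReason(ctx, reason)
	}
	return fmt.Errorf("device not found: %s", id)
}

func main() {
	m := &Manager{agents: map[string]*Agent{"olt-1": {id: "olt-1"}}}
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	if err := m.updateDeviceReason(ctx, "olt-1", "reason-updated"); err != nil {
		fmt.Println("error:", err)
	}
}
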
diff --git a/rw_core/core/device_ownership.go b/rw_core/core/device_ownership.go
index 687efcb..b616a90 100644
--- a/rw_core/core/device_ownership.go
+++ b/rw_core/core/device_ownership.go
@@ -92,11 +92,11 @@
log.Info("deviceOwnership-stopped")
}
-func (da *DeviceOwnership) tryToReserveKey(id string) bool {
+func (da *DeviceOwnership) tryToReserveKey(ctx context.Context, id string) bool {
var currOwner string
//Try to reserve the key
kvKey := fmt.Sprintf("%s_%s", da.ownershipPrefix, id)
- value, err := da.kvClient.Reserve(kvKey, da.instanceID, da.reservationTimeout)
+ value, err := da.kvClient.Reserve(ctx, kvKey, da.instanceID, da.reservationTimeout)
if err != nil {
log.Errorw("error", log.Fields{"error": err, "id": id, "instanceId": da.instanceID})
}
@@ -109,17 +109,17 @@
return false
}
-func (da *DeviceOwnership) renewReservation(id string) bool {
+func (da *DeviceOwnership) renewReservation(ctx context.Context, id string) bool {
// Try to reserve the key
kvKey := fmt.Sprintf("%s_%s", da.ownershipPrefix, id)
- if err := da.kvClient.RenewReservation(kvKey); err != nil {
+ if err := da.kvClient.RenewReservation(ctx, kvKey); err != nil {
log.Errorw("reservation-renewal-error", log.Fields{"error": err, "instance": da.instanceID})
return false
}
return true
}
-func (da *DeviceOwnership) monitorOwnership(id string, chnl chan int) {
+func (da *DeviceOwnership) monitorOwnership(ctx context.Context, id string, chnl chan int) {
log.Debugw("start-device-monitoring", log.Fields{"id": id})
op := "starting"
exit := false
@@ -144,7 +144,7 @@
if deviceOwned && ownedByMe {
// Device owned; renew reservation
op = "renew"
- if da.renewReservation(id) {
+ if da.renewReservation(ctx, id) {
log.Debugw("reservation-renewed", log.Fields{"id": id, "instanceId": da.instanceID})
} else {
log.Debugw("reservation-not-renewed", log.Fields{"id": id, "instanceId": da.instanceID})
@@ -152,7 +152,7 @@
} else {
// Device not owned or not owned by me; try to seize ownership
op = "retry"
- if err := da.setOwnership(id, da.tryToReserveKey(id)); err != nil {
+ if err := da.setOwnership(id, da.tryToReserveKey(ctx, id)); err != nil {
log.Errorw("unexpected-error", log.Fields{"error": err})
}
}
@@ -197,13 +197,13 @@
// OwnedByMe returns whether this Core instance actively owns this device. This function will automatically
// trigger the process to monitor the device and update the device ownership regularly.
-func (da *DeviceOwnership) OwnedByMe(id interface{}) (bool, error) {
+func (da *DeviceOwnership) OwnedByMe(ctx context.Context, id interface{}) (bool, error) {
// Retrieve the ownership key based on the id
var ownershipKey string
var err error
var idStr string
var cache bool
- if ownershipKey, idStr, cache, err = da.getOwnershipKey(id); err != nil {
+ if ownershipKey, idStr, cache, err = da.getOwnershipKey(ctx, id); err != nil {
log.Warnw("no-ownershipkey", log.Fields{"error": err})
return false, err
}
@@ -227,7 +227,7 @@
return ownedByMe, nil
}
// Not owned by me, or possibly owned by nobody. Try to reserve it
- reservedByMe := da.tryToReserveKey(ownershipKey)
+ reservedByMe := da.tryToReserveKey(ctx, ownershipKey)
myChnl := make(chan int)
da.deviceMapLock.Lock()
@@ -238,7 +238,7 @@
da.deviceMapLock.Unlock()
log.Debugw("set-new-ownership", log.Fields{"Id": ownershipKey, "owned": reservedByMe})
- go da.monitorOwnership(ownershipKey, myChnl)
+ go da.monitorOwnership(context.Background(), ownershipKey, myChnl)
return reservedByMe, nil
}
@@ -298,7 +298,7 @@
// getOwnershipKey returns the ownership key associated with the id param. The ownership key is the parent
// device Id of a child device or the root device of a logical device. This function also returns the
// id param in string format via the ref output, as well as whether the data was retrieved from the cache
-func (da *DeviceOwnership) getOwnershipKey(id interface{}) (ownershipKey string, ref string, cached bool, err error) {
+func (da *DeviceOwnership) getOwnershipKey(ctx context.Context, id interface{}) (ownershipKey string, ref string, cached bool, err error) {
if id == nil {
return "", "", false, status.Error(codes.InvalidArgument, "nil-id")
@@ -313,7 +313,7 @@
if val, exist := da.deviceToKeyMap[dID.ID]; exist {
return val, dID.ID, true, nil
}
- if device, _ = da.deviceMgr.GetDevice(dID.ID); device == nil {
+ if device, _ = da.deviceMgr.GetDevice(ctx, dID.ID); device == nil {
return "", dID.ID, false, status.Errorf(codes.NotFound, "id-absent-%s", dID)
}
if device.Root {
@@ -325,7 +325,7 @@
if val, exist := da.deviceToKeyMap[ldID.ID]; exist {
return val, ldID.ID, true, nil
}
- if lDevice, _ = da.logicalDeviceMgr.getLogicalDevice(ldID.ID); lDevice == nil {
+ if lDevice, _ = da.logicalDeviceMgr.getLogicalDevice(ctx, ldID.ID); lDevice == nil {
return "", ldID.ID, false, status.Errorf(codes.NotFound, "id-absent-%s", dID)
}
return lDevice.RootDeviceId, ldID.ID, false, nil
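
In device_ownership.go the KV Reserve/RenewReservation calls now take the caller's context, while the long-lived monitor goroutine launched from OwnedByMe is started with context.Background() because nothing waits for it. Below is a rough, self-contained sketch of that split; the in-memory kvClient and the function names are hypothetical stand-ins, not the real KV client API.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// kvClient is a stand-in KV store whose operations now take a context.
type kvClient struct {
	mu    sync.Mutex
	owner map[string]string
}

func (kv *kvClient) Reserve(ctx context.Context, key, instance string) (bool, error) {
	if err := ctx.Err(); err != nil { // honour request cancellation
		return false, err
	}
	kv.mu.Lock()
	defer kv.mu.Unlock()
	if cur, ok := kv.owner[key]; ok && cur != instance {
		return false, nil
	}
	kv.owner[key] = instance
	return true, nil
}

func (kv *kvClient) RenewReservation(ctx context.Context, key string) error {
	return ctx.Err() // a real client would extend the lease here
}

// ownedByMe reserves the key with the request context, then starts monitoring
// with context.Background() since the caller does not wait for the monitor.
func ownedByMe(ctx context.Context, kv *kvClient, key, instance string) (bool, error) {
	owned, err := kv.Reserve(ctx, key, instance)
	if err != nil || !owned {
		return false, err
	}
	go monitorOwnership(context.Background(), kv, key)
	return true, nil
}

func monitorOwnership(ctx context.Context, kv *kvClient, key string) {
	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := kv.RenewReservation(ctx, key); err != nil {
				fmt.Println("renewal failed:", err)
				return
			}
		}
	}
}

func main() {
	kv := &kvClient{owner: map[string]string{}}
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()
	owned, err := ownedByMe(ctx, kv, "ownership_device-1", "core-1")
	fmt.Println("owned:", owned, "err:", err)
	time.Sleep(100 * time.Millisecond)
}

The design choice matches the diff: the reservation itself can be cancelled by the request, but the background monitor must outlive the request and therefore gets a detached context.
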
diff --git a/rw_core/core/device_state_transitions.go b/rw_core/core/device_state_transitions.go
index d280270..7756bf4 100644
--- a/rw_core/core/device_state_transitions.go
+++ b/rw_core/core/device_state_transitions.go
@@ -17,6 +17,7 @@
package core
import (
+ "context"
"github.com/opencord/voltha-go/rw_core/coreif"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
"github.com/opencord/voltha-protos/v3/go/voltha"
@@ -39,7 +40,7 @@
}
// TransitionHandler function type which takes a context and a device as input parameters
-type TransitionHandler func(*voltha.Device) error
+type TransitionHandler func(context.Context, *voltha.Device) error
// Transition represents transition-related attributes
type Transition struct {
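
The TransitionHandler type change means every state-transition callback (SetAdminStateToEnable, NotifyInvalidTransition, and the other handlers changed above) must now accept a context as its first argument. A toy illustration of the new signature, using a placeholder Device type rather than the voltha proto:

package main

import (
	"context"
	"fmt"
)

// Device is a placeholder for *voltha.Device.
type Device struct{ ID string }

// TransitionHandler mirrors the new callback shape: context first, then the device.
type TransitionHandler func(context.Context, *Device) error

func setAdminStateToEnable(ctx context.Context, d *Device) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	fmt.Println("enabling", d.ID)
	return nil
}

// runTransition invokes each handler in order, stopping on the first error.
func runTransition(ctx context.Context, handlers []TransitionHandler, d *Device) error {
	for _, h := range handlers {
		if err := h(ctx, d); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	_ = runTransition(context.Background(), []TransitionHandler{setAdminStateToEnable}, &Device{ID: "dev-1"})
}
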
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index 0378260..fd54a50 100755
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -150,13 +150,13 @@
var acquired bool
if id != nil {
var ownedByMe bool
- if ownedByMe, err = handler.core.deviceOwnership.OwnedByMe(id); err != nil {
+ if ownedByMe, err = handler.core.deviceOwnership.OwnedByMe(ctx, id); err != nil {
log.Warnw("getting-ownership-failed", log.Fields{"deviceId": id, "error": err})
return nil, errorIDNotFound
}
- acquired, err = txn.Acquired(timeout, ownedByMe)
+ acquired, err = txn.Acquired(ctx, timeout, ownedByMe)
} else {
- acquired, err = txn.Acquired(timeout)
+ acquired, err = txn.Acquired(ctx, timeout)
}
if err == nil && acquired {
log.Debugw("transaction-acquired", log.Fields{"transactionId": txn.txnID})
@@ -252,9 +252,9 @@
if err != nil {
return &voltha.LogicalPort{}, err
}
- defer txn.Close()
+ defer txn.Close(ctx)
}
- return handler.logicalDeviceMgr.getLogicalPort(id)
+ return handler.logicalDeviceMgr.getLogicalPort(ctx, id)
}
// EnableLogicalDevicePort enables logical device port
@@ -269,7 +269,7 @@
if err != nil {
return &empty.Empty{}, err
}
- defer txn.Close()
+ defer txn.Close(ctx)
}
ch := make(chan interface{})
@@ -290,7 +290,7 @@
if err != nil {
return &empty.Empty{}, err
}
- defer txn.Close()
+ defer txn.Close(ctx)
}
ch := make(chan interface{})
@@ -311,7 +311,7 @@
if err != nil {
return &empty.Empty{}, err
}
- defer txn.Close()
+ defer txn.Close(ctx)
}
ch := make(chan interface{})
@@ -332,7 +332,7 @@
if err != nil {
return &empty.Empty{}, err
}
- defer txn.Close()
+ defer txn.Close(ctx)
}
ch := make(chan interface{})
@@ -344,7 +344,7 @@
// GetDevice must be implemented in the read-only containers - should it also be implemented here?
func (handler *APIHandler) GetDevice(ctx context.Context, id *voltha.ID) (*voltha.Device, error) {
log.Debugw("GetDevice-request", log.Fields{"id": id})
- return handler.deviceMgr.GetDevice(id.Id)
+ return handler.deviceMgr.GetDevice(ctx, id.Id)
}
// GetDevice must be implemented in the read-only containers - should it also be implemented here?
@@ -352,7 +352,7 @@
// ListDevices retrieves the latest devices from the data model
func (handler *APIHandler) ListDevices(ctx context.Context, empty *empty.Empty) (*voltha.Devices, error) {
log.Debug("ListDevices")
- devices, err := handler.deviceMgr.ListDevices()
+ devices, err := handler.deviceMgr.ListDevices(ctx)
if err != nil {
log.Errorw("Failed to list devices", log.Fields{"error": err})
return nil, err
@@ -392,9 +392,9 @@
if err != nil {
return &voltha.LogicalDevice{}, err
}
- defer txn.Close()
+ defer txn.Close(ctx)
}
- return handler.logicalDeviceMgr.getLogicalDevice(id.Id)
+ return handler.logicalDeviceMgr.getLogicalDevice(ctx, id.Id)
}
// ListLogicalDevices returns the list of all logical devices
@@ -405,14 +405,14 @@
if err != nil {
return &voltha.LogicalDevices{}, err
}
- defer txn.Close()
+ defer txn.Close(ctx)
if handler.isOFControllerRequest(ctx) {
// Since an OF controller is only interested in the set of logical devices managed by this Core, return
// only the logical devices managed/monitored by this Core.
return handler.logicalDeviceMgr.listManagedLogicalDevices()
}
}
- return handler.logicalDeviceMgr.listLogicalDevices()
+ return handler.logicalDeviceMgr.listLogicalDevices(ctx)
}
// ListAdapters returns the contents of all adapters known to the system
@@ -429,7 +429,7 @@
if err != nil {
return &openflow_13.Flows{}, err
}
- defer txn.Close()
+ defer txn.Close(ctx)
}
return handler.logicalDeviceMgr.ListLogicalDeviceFlows(ctx, id.Id)
}
@@ -442,7 +442,7 @@
if err != nil {
return &openflow_13.FlowGroups{}, err
}
- defer txn.Close()
+ defer txn.Close(ctx)
}
return handler.logicalDeviceMgr.ListLogicalDeviceFlowGroups(ctx, id.Id)
}
@@ -455,7 +455,7 @@
if err != nil {
return &voltha.LogicalPorts{}, err
}
- defer txn.Close()
+ defer txn.Close(ctx)
}
return handler.logicalDeviceMgr.ListLogicalDevicePorts(ctx, id.Id)
}
@@ -477,7 +477,7 @@
if err != nil {
return &voltha.Device{}, err
}
- defer txn.Close()
+ defer txn.Close(ctx)
}
ch := make(chan interface{})
@@ -491,7 +491,7 @@
return nil, err
}
if d, ok := res.(*voltha.Device); ok {
- _, err := handler.core.deviceOwnership.OwnedByMe(&utils.DeviceID{ID: d.Id})
+ _, err := handler.core.deviceOwnership.OwnedByMe(ctx, &utils.DeviceID{ID: d.Id})
if err != nil {
log.Errorw("unable-to-find-core-instance-active-owns-this-device", log.Fields{"error": err})
}
@@ -519,7 +519,7 @@
if err != nil {
return &empty.Empty{}, err
}
- defer txn.Close()
+ defer txn.Close(ctx)
}
ch := make(chan interface{})
@@ -540,7 +540,7 @@
if err != nil {
return &empty.Empty{}, err
}
- defer txn.Close()
+ defer txn.Close(ctx)
}
ch := make(chan interface{})
@@ -561,7 +561,7 @@
if err != nil {
return &empty.Empty{}, err
}
- defer txn.Close()
+ defer txn.Close(ctx)
}
ch := make(chan interface{})
@@ -581,14 +581,14 @@
txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{ID: id.Id})
if err != nil {
if err == errorTransactionNotAcquired {
- if ownedByMe, err := handler.core.deviceOwnership.OwnedByMe(&utils.DeviceID{ID: id.Id}); !ownedByMe && err == nil {
+ if ownedByMe, err := handler.core.deviceOwnership.OwnedByMe(ctx, &utils.DeviceID{ID: id.Id}); !ownedByMe && err == nil {
// Remove the device in memory
- handler.deviceMgr.stopManagingDevice(id.Id)
+ handler.deviceMgr.stopManagingDevice(ctx, id.Id)
}
}
return &empty.Empty{}, err
}
- defer txn.Close()
+ defer txn.Close(ctx)
}
ch := make(chan interface{})
@@ -605,10 +605,10 @@
if err != nil {
return &voltha.Ports{}, err
}
- defer txn.Close()
+ defer txn.Close(ctx)
}
- device, err := handler.deviceMgr.GetDevice(id.Id)
+ device, err := handler.deviceMgr.GetDevice(ctx, id.Id)
if err != nil {
return &voltha.Ports{}, err
}
@@ -625,10 +625,10 @@
if err != nil {
return &openflow_13.Flows{}, err
}
- defer txn.Close()
+ defer txn.Close(ctx)
}
- device, err := handler.deviceMgr.GetDevice(id.Id)
+ device, err := handler.deviceMgr.GetDevice(ctx, id.Id)
if err != nil {
return &openflow_13.Flows{}, err
}
@@ -641,7 +641,7 @@
func (handler *APIHandler) ListDeviceFlowGroups(ctx context.Context, id *voltha.ID) (*voltha.FlowGroups, error) {
log.Debugw("ListDeviceFlowGroups", log.Fields{"deviceid": id})
- if device, _ := handler.deviceMgr.GetDevice(id.Id); device != nil {
+ if device, _ := handler.deviceMgr.GetDevice(ctx, id.Id); device != nil {
return device.GetFlowGroups(), nil
}
return &voltha.FlowGroups{}, status.Errorf(codes.NotFound, "device-%s", id.Id)
@@ -712,7 +712,7 @@
if err != nil {
return &common.OperationResp{}, err
}
- defer txn.Close()
+ defer txn.Close(ctx)
}
failedresponse := &common.OperationResp{Code: voltha.OperationResp_OPERATION_FAILURE}
@@ -808,7 +808,7 @@
if err != nil {
return failedresponse, err
}
- defer txn.Close()
+ defer txn.Close(ctx)
}
ch := make(chan interface{})
@@ -871,7 +871,7 @@
// GetImages returns all images for a specific device entry
func (handler *APIHandler) GetImages(ctx context.Context, id *voltha.ID) (*voltha.Images, error) {
log.Debugw("GetImages", log.Fields{"deviceid": id.Id})
- device, err := handler.deviceMgr.GetDevice(id.Id)
+ device, err := handler.deviceMgr.GetDevice(ctx, id.Id)
if err != nil {
return &voltha.Images{}, err
}
@@ -889,7 +889,7 @@
if err != nil {
return &empty.Empty{}, err
}
- defer txn.Close()
+ defer txn.Close(ctx)
}
ch := make(chan interface{})
@@ -906,7 +906,7 @@
if err != nil {
return &voltha.PmConfigs{}, err
}
- defer txn.Close()
+ defer txn.Close(ctx)
}
return handler.deviceMgr.listPmConfigs(ctx, id.Id)
}
@@ -947,14 +947,14 @@
return &voltha.SelfTestResponse{}, errors.New("UnImplemented")
}
-func (handler *APIHandler) forwardPacketOut(packet *openflow_13.PacketOut) {
+func (handler *APIHandler) forwardPacketOut(ctx context.Context, packet *openflow_13.PacketOut) {
log.Debugw("forwardPacketOut-request", log.Fields{"packet": packet})
//TODO: Update this logic once the OF Controller (OFAgent in this case) can include a transaction Id in its
// request. For performance reasons we can let both Cores in a Core-Pair forward the packet to the adapters and
// let one of the shim layers (kafka proxy or adapter request handler) filter out the duplicate packet.
- if ownedByMe, err := handler.core.deviceOwnership.OwnedByMe(&utils.LogicalDeviceID{ID: packet.Id}); ownedByMe && err == nil {
- if agent := handler.logicalDeviceMgr.getLogicalDeviceAgent(packet.Id); agent != nil {
- agent.packetOut(packet.PacketOut)
+ if ownedByMe, err := handler.core.deviceOwnership.OwnedByMe(ctx, &utils.LogicalDeviceID{ID: packet.Id}); ownedByMe && err == nil {
+ if agent := handler.logicalDeviceMgr.getLogicalDeviceAgent(ctx, packet.Id); agent != nil {
+ agent.packetOut(ctx, packet.PacketOut)
} else {
log.Errorf("No logical device agent present", log.Fields{"logicaldeviceID": packet.Id})
}
@@ -985,7 +985,7 @@
continue
}
- handler.forwardPacketOut(packet)
+ handler.forwardPacketOut(packets.Context(), packet)
}
log.Debugw("StreamPacketsOut-request-done", log.Fields{"packets": packets})
@@ -1142,7 +1142,7 @@
if err != nil {
return &openflow_13.Meters{}, err // TODO: Return empty meter entry
}
- defer txn.Close()
+ defer txn.Close(ctx)
}
return handler.logicalDeviceMgr.ListLogicalDeviceMeters(ctx, id.Id)
}
@@ -1176,7 +1176,7 @@
failedresponse := &common.OperationResp{Code: voltha.OperationResp_OPERATION_FAILURE}
return failedresponse, err
}
- defer txn.Close()
+ defer txn.Close(ctx)
}
ch := make(chan interface{})
@@ -1198,7 +1198,7 @@
if err != nil {
return &empty.Empty{}, err
}
- defer txn.Close()
+ defer txn.Close(ctx)
}
ch := make(chan interface{})
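
On the gRPC NBI side, the incoming request context is now used both for the ownership/transaction bookkeeping (OwnedByMe(ctx, ...), defer txn.Close(ctx)) and for the call into the managers, so cancelling the gRPC request can propagate downward. The following is a simplified, self-contained sketch of that handler shape; the txn and deviceMgr types here are hypothetical stand-ins for the real transaction and manager code.

package main

import (
	"context"
	"errors"
	"fmt"
)

type txn struct{ id string }

func acquire(ctx context.Context, id string) (*txn, error) {
	if err := ctx.Err(); err != nil { // a cancelled request never acquires the transaction
		return nil, err
	}
	return &txn{id: id}, nil
}

// Close releases the transaction; it now takes the request context as well.
func (t *txn) Close(ctx context.Context) {
	fmt.Println("closing txn", t.id, "cancelled:", ctx.Err() != nil)
}

type deviceMgr struct{}

func (dm *deviceMgr) GetDevice(ctx context.Context, id string) (string, error) {
	if err := ctx.Err(); err != nil {
		return "", err
	}
	if id == "" {
		return "", errors.New("not found")
	}
	return "device-" + id, nil
}

// getDevice mimics the handler shape: acquire ownership, defer Close(ctx), pass ctx down.
func getDevice(ctx context.Context, dm *deviceMgr, id string) (string, error) {
	t, err := acquire(ctx, id)
	if err != nil {
		return "", err
	}
	defer t.Close(ctx)
	return dm.GetDevice(ctx, id)
}

func main() {
	dev, err := getDevice(context.Background(), &deviceMgr{}, "olt-1")
	fmt.Println(dev, err)
}
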
diff --git a/rw_core/core/grpc_nbi_api_handler_test.go b/rw_core/core/grpc_nbi_api_handler_test.go
index 9df697d..b0404c4 100755
--- a/rw_core/core/grpc_nbi_api_handler_test.go
+++ b/rw_core/core/grpc_nbi_api_handler_test.go
@@ -421,7 +421,7 @@
assert.Nil(t, err)
// Verify that all onu devices are disabled as well
- onuDevices, err := nb.core.deviceMgr.getAllChildDevices(oltDevice.Id)
+ onuDevices, err := nb.core.deviceMgr.getAllChildDevices(getContext(), oltDevice.Id)
assert.Nil(t, err)
for _, onu := range onuDevices.Items {
err = waitUntilDeviceReadiness(onu.Id, nb.maxTimeout, vdFunction, nbi)
@@ -453,7 +453,7 @@
assert.Nil(t, err)
// Verify that all onu devices are enabled as well
- onuDevices, err = nb.core.deviceMgr.getAllChildDevices(oltDevice.Id)
+ onuDevices, err = nb.core.deviceMgr.getAllChildDevices(getContext(), oltDevice.Id)
assert.Nil(t, err)
for _, onu := range onuDevices.Items {
err = waitUntilDeviceReadiness(onu.Id, nb.maxTimeout, vdFunction, nbi)
@@ -492,7 +492,7 @@
assert.Nil(t, err)
// Verify that all onu devices are disabled as well
- onuDevices, err := nb.core.deviceMgr.getAllChildDevices(oltDevice.Id)
+ onuDevices, err := nb.core.deviceMgr.getAllChildDevices(getContext(), oltDevice.Id)
assert.Nil(t, err)
for _, onu := range onuDevices.Items {
err = waitUntilDeviceReadiness(onu.Id, nb.maxTimeout, vdFunction, nbi)
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 3fd8740..80dcfd7 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -142,7 +142,7 @@
// load from dB - the logical device may not exist at this time. On error, just return and the calling function
// will destroy this agent.
agent.lockLogicalDevice.Lock()
- logicalDevice, err := agent.clusterDataProxy.Get(context.Background(), "/logical_devices/"+agent.logicalDeviceID, 0, true, "")
+ logicalDevice, err := agent.clusterDataProxy.Get(ctx, "/logical_devices/"+agent.logicalDeviceID, 0, true, "")
if err != nil {
return status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceID)
}
@@ -207,7 +207,7 @@
// Setup the device graph - run it in its own routine
if loadFromdB {
- go agent.generateDeviceGraph()
+ go agent.generateDeviceGraph(context.Background())
}
return nil
}
@@ -283,13 +283,13 @@
}
//updateLogicalDeviceFlowsWithoutLock updates the logical device with the latest flows in the model.
-func (agent *LogicalDeviceAgent) updateLogicalDeviceFlowsWithoutLock(flows *ofp.Flows) error {
+func (agent *LogicalDeviceAgent) updateLogicalDeviceFlowsWithoutLock(ctx context.Context, flows *ofp.Flows) error {
ld := agent.getLogicalDeviceWithoutLock()
log.Debugw("logical-device-before", log.Fields{"lports": len(ld.Ports)})
ld.Flows = flows
- if err := agent.updateLogicalDeviceWithoutLock(ld); err != nil {
+ if err := agent.updateLogicalDeviceWithoutLock(ctx, ld); err != nil {
log.Errorw("error-updating-logical-device-with-flows", log.Fields{"error": err})
return err
}
@@ -297,13 +297,13 @@
}
//updateLogicalDeviceMetersWithoutLock updates the logical device with the meters info
-func (agent *LogicalDeviceAgent) updateLogicalDeviceMetersWithoutLock(meters *ofp.Meters) error {
+func (agent *LogicalDeviceAgent) updateLogicalDeviceMetersWithoutLock(ctx context.Context, meters *ofp.Meters) error {
ld := agent.getLogicalDeviceWithoutLock()
log.Debugw("logical-device-before", log.Fields{"lports": len(ld.Ports)})
ld.Meters = meters
- if err := agent.updateLogicalDeviceWithoutLock(ld); err != nil {
+ if err := agent.updateLogicalDeviceWithoutLock(ctx, ld); err != nil {
log.Errorw("error-updating-logical-device-with-meters", log.Fields{"error": err})
return err
}
@@ -311,13 +311,13 @@
}
//updateLogicalDeviceFlowGroupsWithoutLock updates the logical device with the flow groups
-func (agent *LogicalDeviceAgent) updateLogicalDeviceFlowGroupsWithoutLock(flowGroups *ofp.FlowGroups) error {
+func (agent *LogicalDeviceAgent) updateLogicalDeviceFlowGroupsWithoutLock(ctx context.Context, flowGroups *ofp.FlowGroups) error {
ld := agent.getLogicalDeviceWithoutLock()
log.Debugw("logical-device-before", log.Fields{"lports": len(ld.Ports)})
ld.FlowGroups = flowGroups
- if err := agent.updateLogicalDeviceWithoutLock(ld); err != nil {
+ if err := agent.updateLogicalDeviceWithoutLock(ctx, ld); err != nil {
log.Errorw("error-updating-logical-device-with-flowgroups", log.Fields{"error": err})
return err
}
@@ -330,22 +330,22 @@
return proto.Clone(agent.logicalDevice).(*voltha.LogicalDevice)
}
-func (agent *LogicalDeviceAgent) updateLogicalPort(device *voltha.Device, port *voltha.Port) error {
+func (agent *LogicalDeviceAgent) updateLogicalPort(ctx context.Context, device *voltha.Device, port *voltha.Port) error {
log.Debugw("updateLogicalPort", log.Fields{"deviceId": device.Id, "port": port})
var err error
if port.Type == voltha.Port_ETHERNET_NNI {
- if _, err = agent.addNNILogicalPort(device, port); err != nil {
+ if _, err = agent.addNNILogicalPort(ctx, device, port); err != nil {
return err
}
agent.addLogicalPortToMap(port.PortNo, true)
} else if port.Type == voltha.Port_ETHERNET_UNI {
- if _, err = agent.addUNILogicalPort(device, port); err != nil {
+ if _, err = agent.addUNILogicalPort(ctx, device, port); err != nil {
return err
}
agent.addLogicalPortToMap(port.PortNo, false)
} else {
// Update the device graph to ensure all routes on the logical device have been calculated
- if err = agent.updateRoutes(device, port); err != nil {
+ if err = agent.updateRoutes(ctx, device, port); err != nil {
log.Errorw("failed-to-update-routes", log.Fields{"deviceId": device.Id, "port": port, "error": err})
return err
}
@@ -359,13 +359,13 @@
func (agent *LogicalDeviceAgent) setupLogicalPorts(ctx context.Context) error {
log.Infow("setupLogicalPorts", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
// First add any NNI ports which could have been missing
- if err := agent.setupNNILogicalPorts(context.TODO(), agent.rootDeviceID); err != nil {
+ if err := agent.setupNNILogicalPorts(ctx, agent.rootDeviceID); err != nil {
log.Errorw("error-setting-up-NNI-ports", log.Fields{"error": err, "deviceId": agent.rootDeviceID})
return err
}
// Now, set up the UNI ports if needed.
- children, err := agent.deviceMgr.getAllChildDevices(agent.rootDeviceID)
+ children, err := agent.deviceMgr.getAllChildDevices(ctx, agent.rootDeviceID)
if err != nil {
log.Errorw("error-getting-child-devices", log.Fields{"error": err, "deviceId": agent.rootDeviceID})
return err
@@ -375,7 +375,7 @@
response := coreutils.NewResponse()
responses = append(responses, response)
go func(child *voltha.Device) {
- if err = agent.setupUNILogicalPorts(context.TODO(), child); err != nil {
+ if err = agent.setupUNILogicalPorts(ctx, child); err != nil {
log.Error("setting-up-UNI-ports-failed", log.Fields{"deviceID": child.Id})
response.Error(status.Errorf(codes.Internal, "UNI-ports-setup-failed: %s", child.Id))
}
@@ -396,7 +396,7 @@
var err error
var device *voltha.Device
- if device, err = agent.deviceMgr.GetDevice(deviceID); err != nil {
+ if device, err = agent.deviceMgr.GetDevice(ctx, deviceID); err != nil {
log.Errorw("error-retrieving-device", log.Fields{"error": err, "deviceId": deviceID})
return err
}
@@ -404,7 +404,7 @@
//Get NNI port number
for _, port := range device.Ports {
if port.Type == voltha.Port_ETHERNET_NNI {
- if _, err = agent.addNNILogicalPort(device, port); err != nil {
+ if _, err = agent.addNNILogicalPort(ctx, device, port); err != nil {
log.Errorw("error-adding-UNI-port", log.Fields{"error": err})
}
agent.addLogicalPortToMap(port.PortNo, true)
@@ -414,7 +414,7 @@
}
// updatePortState updates the port state of the device
-func (agent *LogicalDeviceAgent) updatePortState(deviceID string, portNo uint32, operStatus voltha.OperStatus_Types) error {
+func (agent *LogicalDeviceAgent) updatePortState(ctx context.Context, deviceID string, portNo uint32, operStatus voltha.OperStatus_Types) error {
log.Infow("updatePortState-start", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "portNo": portNo, "state": operStatus})
agent.lockLogicalDevice.Lock()
defer agent.lockLogicalDevice.Unlock()
@@ -430,7 +430,7 @@
cloned.Ports[idx].OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
}
// Update the logical device
- if err := agent.updateLogicalDeviceWithoutLock(cloned); err != nil {
+ if err := agent.updateLogicalDeviceWithoutLock(ctx, cloned); err != nil {
log.Errorw("error-updating-logical-device", log.Fields{"error": err})
return err
}
@@ -441,7 +441,7 @@
}
// updatePortsState updates the ports state related to the device
-func (agent *LogicalDeviceAgent) updatePortsState(device *voltha.Device, state voltha.AdminState_Types) error {
+func (agent *LogicalDeviceAgent) updatePortsState(ctx context.Context, device *voltha.Device, state voltha.AdminState_Types) error {
log.Infow("updatePortsState-start", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
agent.lockLogicalDevice.Lock()
defer agent.lockLogicalDevice.Unlock()
@@ -462,7 +462,7 @@
}
}
// Updating the logical device will trigger the port change events to be propagated to the controller
- if err := agent.updateLogicalDeviceWithoutLock(cloned); err != nil {
+ if err := agent.updateLogicalDeviceWithoutLock(ctx, cloned); err != nil {
log.Warnw("logical-device-update-failed", log.Fields{"ldeviceId": agent.logicalDeviceID, "error": err})
return err
}
@@ -478,7 +478,7 @@
//Get UNI port number
for _, port := range childDevice.Ports {
if port.Type == voltha.Port_ETHERNET_UNI {
- if added, err = agent.addUNILogicalPort(childDevice, port); err != nil {
+ if added, err = agent.addUNILogicalPort(ctx, childDevice, port); err != nil {
log.Errorw("error-adding-UNI-port", log.Fields{"error": err})
}
if added {
@@ -490,7 +490,7 @@
}
// deleteAllLogicalPorts deletes all logical ports associated with this device
-func (agent *LogicalDeviceAgent) deleteAllLogicalPorts(device *voltha.Device) error {
+func (agent *LogicalDeviceAgent) deleteAllLogicalPorts(ctx context.Context, device *voltha.Device) error {
log.Infow("updatePortsState-start", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
agent.lockLogicalDevice.Lock()
defer agent.lockLogicalDevice.Unlock()
@@ -507,7 +507,7 @@
if len(updateLogicalPorts) < len(cloned.Ports) {
cloned.Ports = updateLogicalPorts
// Updating the logical device will trigger the port change events to be propagated to the controller
- if err := agent.updateLogicalDeviceWithoutLock(cloned); err != nil {
+ if err := agent.updateLogicalDeviceWithoutLock(ctx, cloned); err != nil {
log.Warnw("logical-device-update-failed", log.Fields{"ldeviceId": agent.logicalDeviceID, "error": err})
return err
}
@@ -518,8 +518,8 @@
}
//updateLogicalDeviceWithoutLock updates the model with the logical device. It clones the logical device before saving it
-func (agent *LogicalDeviceAgent) updateLogicalDeviceWithoutLock(logicalDevice *voltha.LogicalDevice) error {
- updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+func (agent *LogicalDeviceAgent) updateLogicalDeviceWithoutLock(ctx context.Context, logicalDevice *voltha.LogicalDevice) error {
+ updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
afterUpdate, err := agent.clusterDataProxy.Update(updateCtx, "/logical_devices/"+agent.logicalDeviceID, logicalDevice, false, "")
if err != nil {
log.Errorw("failed-to-update-logical-devices-to-cluster-proxy", log.Fields{"error": err})
@@ -534,7 +534,7 @@
//generateDeviceGraphIfNeeded generates the device graph if the logical device has been updated since the last time
//that device graph was generated.
-func (agent *LogicalDeviceAgent) generateDeviceGraphIfNeeded() error {
+func (agent *LogicalDeviceAgent) generateDeviceGraphIfNeeded(ctx context.Context) error {
ld := agent.GetLogicalDevice()
agent.lockDeviceGraph.Lock()
defer agent.lockDeviceGraph.Unlock()
@@ -542,7 +542,7 @@
return nil
}
log.Debug("Generation of device graph required")
- agent.generateDeviceGraph()
+ agent.generateDeviceGraph(ctx)
return nil
}
@@ -552,16 +552,16 @@
if flow == nil {
return nil
}
- if err := agent.generateDeviceGraphIfNeeded(); err != nil {
+ if err := agent.generateDeviceGraphIfNeeded(ctx); err != nil {
return err
}
switch flow.GetCommand() {
case ofp.OfpFlowModCommand_OFPFC_ADD:
- return agent.flowAdd(flow)
+ return agent.flowAdd(ctx, flow)
case ofp.OfpFlowModCommand_OFPFC_DELETE:
- return agent.flowDelete(flow)
+ return agent.flowDelete(ctx, flow)
case ofp.OfpFlowModCommand_OFPFC_DELETE_STRICT:
- return agent.flowDeleteStrict(flow)
+ return agent.flowDeleteStrict(ctx, flow)
case ofp.OfpFlowModCommand_OFPFC_MODIFY:
return agent.flowModify(flow)
case ofp.OfpFlowModCommand_OFPFC_MODIFY_STRICT:
@@ -577,16 +577,16 @@
if groupMod == nil {
return nil
}
- if err := agent.generateDeviceGraphIfNeeded(); err != nil {
+ if err := agent.generateDeviceGraphIfNeeded(ctx); err != nil {
return err
}
switch groupMod.GetCommand() {
case ofp.OfpGroupModCommand_OFPGC_ADD:
- return agent.groupAdd(groupMod)
+ return agent.groupAdd(ctx, groupMod)
case ofp.OfpGroupModCommand_OFPGC_DELETE:
- return agent.groupDelete(groupMod)
+ return agent.groupDelete(ctx, groupMod)
case ofp.OfpGroupModCommand_OFPGC_MODIFY:
- return agent.groupModify(groupMod)
+ return agent.groupModify(ctx, groupMod)
}
return status.Errorf(codes.Internal,
"unhandled-command: lDeviceId:%s, command:%s", agent.logicalDeviceID, groupMod.GetCommand())
@@ -598,23 +598,23 @@
if meterMod == nil {
return nil
}
- if err := agent.generateDeviceGraphIfNeeded(); err != nil {
+ if err := agent.generateDeviceGraphIfNeeded(ctx); err != nil {
return err
}
switch meterMod.GetCommand() {
case ofp.OfpMeterModCommand_OFPMC_ADD:
- return agent.meterAdd(meterMod)
+ return agent.meterAdd(ctx, meterMod)
case ofp.OfpMeterModCommand_OFPMC_DELETE:
- return agent.meterDelete(meterMod)
+ return agent.meterDelete(ctx, meterMod)
case ofp.OfpMeterModCommand_OFPMC_MODIFY:
- return agent.meterModify(meterMod)
+ return agent.meterModify(ctx, meterMod)
}
return status.Errorf(codes.Internal,
"unhandled-command: lDeviceId:%s, command:%s", agent.logicalDeviceID, meterMod.GetCommand())
}
-func (agent *LogicalDeviceAgent) meterAdd(meterMod *ofp.OfpMeterMod) error {
+func (agent *LogicalDeviceAgent) meterAdd(ctx context.Context, meterMod *ofp.OfpMeterMod) error {
log.Debugw("meterAdd", log.Fields{"metermod": *meterMod})
if meterMod == nil {
return nil
@@ -641,7 +641,7 @@
meterEntry := fu.MeterEntryFromMeterMod(meterMod)
meters = append(meters, meterEntry)
//Update model
- if err := agent.updateLogicalDeviceMetersWithoutLock(&ofp.Meters{Items: meters}); err != nil {
+ if err := agent.updateLogicalDeviceMetersWithoutLock(ctx, &ofp.Meters{Items: meters}); err != nil {
log.Errorw("db-meter-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
return err
}
@@ -649,7 +649,7 @@
return nil
}
-func (agent *LogicalDeviceAgent) meterDelete(meterMod *ofp.OfpMeterMod) error {
+func (agent *LogicalDeviceAgent) meterDelete(ctx context.Context, meterMod *ofp.OfpMeterMod) error {
log.Debug("meterDelete", log.Fields{"meterMod": *meterMod})
if meterMod == nil {
return nil
@@ -685,7 +685,7 @@
if lDevice.Meters != nil {
metersToUpdate = &ofp.Meters{Items: meters}
}
- if err := agent.updateLogicalDeviceMetersWithoutLock(metersToUpdate); err != nil {
+ if err := agent.updateLogicalDeviceMetersWithoutLock(ctx, metersToUpdate); err != nil {
log.Errorw("db-meter-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
return err
}
@@ -694,7 +694,7 @@
}
if changedFow {
//Update model
- if err := agent.updateLogicalDeviceFlowsWithoutLock(&ofp.Flows{Items: updatedFlows}); err != nil {
+ if err := agent.updateLogicalDeviceFlowsWithoutLock(ctx, &ofp.Flows{Items: updatedFlows}); err != nil {
log.Errorw("db-flow-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
return err
}
@@ -705,7 +705,7 @@
return nil
}
-func (agent *LogicalDeviceAgent) meterModify(meterMod *ofp.OfpMeterMod) error {
+func (agent *LogicalDeviceAgent) meterModify(ctx context.Context, meterMod *ofp.OfpMeterMod) error {
log.Debug("meterModify")
if meterMod == nil {
return nil
@@ -736,7 +736,7 @@
if lDevice.Meters != nil {
metersToUpdate = &ofp.Meters{Items: meters}
}
- if err := agent.updateLogicalDeviceMetersWithoutLock(metersToUpdate); err != nil {
+ if err := agent.updateLogicalDeviceMetersWithoutLock(ctx, metersToUpdate); err != nil {
log.Errorw("db-meter-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
return err
}
@@ -794,7 +794,7 @@
}
//flowAdd adds a flow to the flow table of that logical device
-func (agent *LogicalDeviceAgent) flowAdd(mod *ofp.OfpFlowMod) error {
+func (agent *LogicalDeviceAgent) flowAdd(ctx context.Context, mod *ofp.OfpFlowMod) error {
log.Debugw("flowAdd", log.Fields{"flow": mod})
if mod == nil {
return nil
@@ -859,16 +859,16 @@
log.Error("Meter-referred-in-flows-not-present")
return err
}
- deviceRules := agent.flowDecomposer.DecomposeRules(agent, ofp.Flows{Items: updatedFlows}, *lDevice.FlowGroups)
+ deviceRules := agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: updatedFlows}, *lDevice.FlowGroups)
log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
- if err := agent.addDeviceFlowsAndGroups(deviceRules, &flowMetadata); err != nil {
+ if err := agent.addDeviceFlowsAndGroups(ctx, deviceRules, &flowMetadata); err != nil {
log.Errorw("failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
return err
}
// Update model
- if err := agent.updateLogicalDeviceFlowsWithoutLock(&ofp.Flows{Items: flows}); err != nil {
+ if err := agent.updateLogicalDeviceFlowsWithoutLock(ctx, &ofp.Flows{Items: flows}); err != nil {
log.Errorw("db-flow-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
return err
}
@@ -880,7 +880,7 @@
}
if changedMeterStats {
//Update model
- if err := agent.updateLogicalDeviceMetersWithoutLock(metersToUpdate); err != nil {
+ if err := agent.updateLogicalDeviceMetersWithoutLock(ctx, metersToUpdate); err != nil {
log.Errorw("db-meter-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
return err
}
@@ -923,7 +923,7 @@
}
//flowDelete deletes a flow from the flow table of that logical device
-func (agent *LogicalDeviceAgent) flowDelete(mod *ofp.OfpFlowMod) error {
+func (agent *LogicalDeviceAgent) flowDelete(ctx context.Context, mod *ofp.OfpFlowMod) error {
log.Debug("flowDelete")
if mod == nil {
return nil
@@ -975,15 +975,15 @@
log.Error("Meter-referred-in-flows-not-present")
return errors.New("Meter-referred-in-flows-not-present")
}
- deviceRules := agent.flowDecomposer.DecomposeRules(agent, ofp.Flows{Items: toDelete}, ofp.FlowGroups{Items: flowGroups})
+ deviceRules := agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: toDelete}, ofp.FlowGroups{Items: flowGroups})
log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
- if err := agent.deleteDeviceFlowsAndGroups(deviceRules, &flowMetadata); err != nil {
+ if err := agent.deleteDeviceFlowsAndGroups(ctx, deviceRules, &flowMetadata); err != nil {
log.Errorw("failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
return err
}
- if err := agent.updateLogicalDeviceFlowsWithoutLock(&ofp.Flows{Items: toKeep}); err != nil {
+ if err := agent.updateLogicalDeviceFlowsWithoutLock(ctx, &ofp.Flows{Items: toKeep}); err != nil {
log.Errorw("cannot-update-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
return err
}
@@ -993,7 +993,7 @@
return nil
}
-func (agent *LogicalDeviceAgent) addDeviceFlowsAndGroups(deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) error {
+func (agent *LogicalDeviceAgent) addDeviceFlowsAndGroups(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) error {
log.Debugw("addDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceID, "deviceRules": deviceRules, "flowMetadata": flowMetadata})
responses := make([]coreutils.Response, 0)
@@ -1001,7 +1001,7 @@
response := coreutils.NewResponse()
responses = append(responses, response)
go func(deviceId string, value *fu.FlowsAndGroups) {
- if err := agent.deviceMgr.addFlowsAndGroups(deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
+ if err := agent.deviceMgr.addFlowsAndGroups(ctx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
log.Errorw("flow-add-failed", log.Fields{"deviceID": deviceId, "error": err})
response.Error(status.Errorf(codes.Internal, "flow-add-failed: %s", deviceId))
}
@@ -1015,7 +1015,7 @@
return nil
}
-func (agent *LogicalDeviceAgent) deleteDeviceFlowsAndGroups(deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) error {
+func (agent *LogicalDeviceAgent) deleteDeviceFlowsAndGroups(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) error {
log.Debugw("deleteDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceID})
responses := make([]coreutils.Response, 0)
@@ -1023,7 +1023,7 @@
response := coreutils.NewResponse()
responses = append(responses, response)
go func(deviceId string, value *fu.FlowsAndGroups) {
- if err := agent.deviceMgr.deleteFlowsAndGroups(deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
+ if err := agent.deviceMgr.deleteFlowsAndGroups(ctx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
log.Error("flow-delete-failed", log.Fields{"deviceID": deviceId, "error": err})
response.Error(status.Errorf(codes.Internal, "flow-delete-failed: %s", deviceId))
}
@@ -1037,7 +1037,7 @@
return nil
}
-func (agent *LogicalDeviceAgent) updateDeviceFlowsAndGroups(deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) error {
+func (agent *LogicalDeviceAgent) updateDeviceFlowsAndGroups(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) error {
log.Debugw("updateDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceID})
responses := make([]coreutils.Response, 0)
@@ -1045,7 +1045,7 @@
response := coreutils.NewResponse()
responses = append(responses, response)
go func(deviceId string, value *fu.FlowsAndGroups) {
- if err := agent.deviceMgr.updateFlowsAndGroups(deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
+ if err := agent.deviceMgr.updateFlowsAndGroups(ctx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
log.Error("flow-update-failed", log.Fields{"deviceID": deviceId, "error": err})
response.Error(status.Errorf(codes.Internal, "flow-update-failed: %s", deviceId))
}
@@ -1060,7 +1060,7 @@
}
//flowDeleteStrict deletes a flow from the flow table of that logical device
-func (agent *LogicalDeviceAgent) flowDeleteStrict(mod *ofp.OfpFlowMod) error {
+func (agent *LogicalDeviceAgent) flowDeleteStrict(ctx context.Context, mod *ofp.OfpFlowMod) error {
log.Debug("flowDeleteStrict")
if mod == nil {
return nil
@@ -1102,7 +1102,7 @@
if lDevice.Meters != nil {
metersToUpdate = &ofp.Meters{Items: meters}
}
- if err := agent.updateLogicalDeviceMetersWithoutLock(metersToUpdate); err != nil {
+ if err := agent.updateLogicalDeviceMetersWithoutLock(ctx, metersToUpdate); err != nil {
log.Errorw("db-meter-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
return err
}
@@ -1114,15 +1114,15 @@
log.Error("meter-referred-in-flows-not-present")
return err
}
- deviceRules := agent.flowDecomposer.DecomposeRules(agent, ofp.Flows{Items: flowsToDelete}, ofp.FlowGroups{Items: flowGroups})
+ deviceRules := agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: flowsToDelete}, ofp.FlowGroups{Items: flowGroups})
log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
- if err := agent.deleteDeviceFlowsAndGroups(deviceRules, &flowMetadata); err != nil {
+ if err := agent.deleteDeviceFlowsAndGroups(ctx, deviceRules, &flowMetadata); err != nil {
log.Errorw("failure-deleting-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
return err
}
- if err := agent.updateLogicalDeviceFlowsWithoutLock(&ofp.Flows{Items: flows}); err != nil {
+ if err := agent.updateLogicalDeviceFlowsWithoutLock(ctx, &ofp.Flows{Items: flows}); err != nil {
log.Errorw("cannot-update-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
return err
}
@@ -1140,7 +1140,7 @@
return errors.New("flowModifyStrict not implemented")
}
-func (agent *LogicalDeviceAgent) groupAdd(groupMod *ofp.OfpGroupMod) error {
+func (agent *LogicalDeviceAgent) groupAdd(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
log.Debug("groupAdd")
if groupMod == nil {
return nil
@@ -1161,12 +1161,12 @@
deviceRules.AddFlowsAndGroup(agent.rootDeviceID, fg)
log.Debugw("rules", log.Fields{"rules for group-add": deviceRules.String()})
- if err := agent.addDeviceFlowsAndGroups(deviceRules, &voltha.FlowMetadata{}); err != nil {
+ if err := agent.addDeviceFlowsAndGroups(ctx, deviceRules, &voltha.FlowMetadata{}); err != nil {
log.Errorw("failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
return err
}
- if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(&ofp.FlowGroups{Items: groups}); err != nil {
+ if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(ctx, &ofp.FlowGroups{Items: groups}); err != nil {
log.Errorw("cannot-update-group", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
return err
}
@@ -1176,7 +1176,7 @@
return nil
}
-func (agent *LogicalDeviceAgent) groupDelete(groupMod *ofp.OfpGroupMod) error {
+func (agent *LogicalDeviceAgent) groupDelete(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
log.Debug("groupDelete")
if groupMod == nil {
return nil
@@ -1205,23 +1205,23 @@
groupsChanged = true
}
if flowsChanged || groupsChanged {
- deviceRules := agent.flowDecomposer.DecomposeRules(agent, ofp.Flows{Items: flows}, ofp.FlowGroups{Items: groups})
+ deviceRules := agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: flows}, ofp.FlowGroups{Items: groups})
log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
- if err := agent.updateDeviceFlowsAndGroups(deviceRules, nil); err != nil {
+ if err := agent.updateDeviceFlowsAndGroups(ctx, deviceRules, nil); err != nil {
log.Errorw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
return err
}
}
if groupsChanged {
- if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(&ofp.FlowGroups{Items: groups}); err != nil {
+ if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(ctx, &ofp.FlowGroups{Items: groups}); err != nil {
log.Errorw("cannot-update-group", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
return err
}
}
if flowsChanged {
- if err := agent.updateLogicalDeviceFlowsWithoutLock(&ofp.Flows{Items: flows}); err != nil {
+ if err := agent.updateLogicalDeviceFlowsWithoutLock(ctx, &ofp.Flows{Items: flows}); err != nil {
log.Errorw("cannot-update-flow", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
return err
}
@@ -1229,7 +1229,7 @@
return nil
}
-func (agent *LogicalDeviceAgent) groupModify(groupMod *ofp.OfpGroupMod) error {
+func (agent *LogicalDeviceAgent) groupModify(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
log.Debug("groupModify")
if groupMod == nil {
return nil
@@ -1257,13 +1257,13 @@
deviceRules.AddFlowsAndGroup(agent.rootDeviceID, fg)
log.Debugw("rules", log.Fields{"rules for group-modify": deviceRules.String()})
- if err := agent.updateDeviceFlowsAndGroups(deviceRules, &voltha.FlowMetadata{}); err != nil {
+ if err := agent.updateDeviceFlowsAndGroups(ctx, deviceRules, &voltha.FlowMetadata{}); err != nil {
log.Errorw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
return err
}
//lDevice.FlowGroups.Items = groups
- if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(&ofp.FlowGroups{Items: groups}); err != nil {
+ if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(ctx, &ofp.FlowGroups{Items: groups}); err != nil {
log.Errorw("Cannot-update-logical-group", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
return err
}
@@ -1272,7 +1272,7 @@
}
// deleteLogicalPort removes the logical port
-func (agent *LogicalDeviceAgent) deleteLogicalPort(lPort *voltha.LogicalPort) error {
+func (agent *LogicalDeviceAgent) deleteLogicalPort(ctx context.Context, lPort *voltha.LogicalPort) error {
agent.lockLogicalDevice.Lock()
defer agent.lockLogicalDevice.Unlock()
@@ -1290,18 +1290,18 @@
logicalDevice.Ports[len(logicalDevice.Ports)-1] = nil
logicalDevice.Ports = logicalDevice.Ports[:len(logicalDevice.Ports)-1]
log.Debugw("logical-port-deleted", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
- if err := agent.updateLogicalDeviceWithoutLock(logicalDevice); err != nil {
+ if err := agent.updateLogicalDeviceWithoutLock(ctx, logicalDevice); err != nil {
log.Errorw("logical-device-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
return err
}
// Reset the logical device graph
- go agent.generateDeviceGraph()
+ go agent.generateDeviceGraph(context.Background())
}
return nil
}
// deleteLogicalPorts removes the logical ports associated with that deviceId
-func (agent *LogicalDeviceAgent) deleteLogicalPorts(deviceID string) error {
+func (agent *LogicalDeviceAgent) deleteLogicalPorts(ctx context.Context, deviceID string) error {
agent.lockLogicalDevice.Lock()
defer agent.lockLogicalDevice.Unlock()
@@ -1314,18 +1314,18 @@
}
logicalDevice.Ports = updatedLPorts
log.Debugw("updated-logical-ports", log.Fields{"ports": updatedLPorts})
- if err := agent.updateLogicalDeviceWithoutLock(logicalDevice); err != nil {
+ if err := agent.updateLogicalDeviceWithoutLock(ctx, logicalDevice); err != nil {
log.Errorw("logical-device-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
return err
}
// Reset the logical device graph
- go agent.generateDeviceGraph()
+ go agent.generateDeviceGraph(context.Background())
return nil
}
// enableLogicalPort enables the logical port
-func (agent *LogicalDeviceAgent) enableLogicalPort(lPortID string) error {
+func (agent *LogicalDeviceAgent) enableLogicalPort(ctx context.Context, lPortID string) error {
agent.lockLogicalDevice.Lock()
defer agent.lockLogicalDevice.Unlock()
@@ -1340,13 +1340,13 @@
}
if index >= 0 {
logicalDevice.Ports[index].OfpPort.Config = logicalDevice.Ports[index].OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
- return agent.updateLogicalDeviceWithoutLock(logicalDevice)
+ return agent.updateLogicalDeviceWithoutLock(ctx, logicalDevice)
}
return status.Errorf(codes.NotFound, "Port %s on Logical Device %s", lPortID, agent.logicalDeviceID)
}
// disableLogicalPort disables the logical port
-func (agent *LogicalDeviceAgent) disableLogicalPort(lPortID string) error {
+func (agent *LogicalDeviceAgent) disableLogicalPort(ctx context.Context, lPortID string) error {
agent.lockLogicalDevice.Lock()
defer agent.lockLogicalDevice.Unlock()
@@ -1361,7 +1361,7 @@
}
if index >= 0 {
logicalDevice.Ports[index].OfpPort.Config = (logicalDevice.Ports[index].OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)) | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
- return agent.updateLogicalDeviceWithoutLock(logicalDevice)
+ return agent.updateLogicalDeviceWithoutLock(ctx, logicalDevice)
}
return status.Errorf(codes.NotFound, "Port %s on Logical Device %s", lPortID, agent.logicalDeviceID)
}
@@ -1472,7 +1472,7 @@
}
//updateRoutes rebuilds the device graph if not done already
-func (agent *LogicalDeviceAgent) updateRoutes(device *voltha.Device, port *voltha.Port) error {
+func (agent *LogicalDeviceAgent) updateRoutes(ctx context.Context, device *voltha.Device, port *voltha.Port) error {
log.Debugf("updateRoutes", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "device": device.Id, "port": port})
agent.lockLogicalDevice.Lock()
defer agent.lockLogicalDevice.Unlock()
@@ -1483,25 +1483,25 @@
lDevice := agent.getLogicalDeviceWithoutLock()
//TODO: Find a better way to refresh only missing routes
- agent.deviceGraph.ComputeRoutes(lDevice.Ports)
+ agent.deviceGraph.ComputeRoutes(ctx, lDevice.Ports)
agent.deviceGraph.Print()
return nil
}
//updateDeviceGraph updates the device graph if not done already and sets up the default rules as well
-func (agent *LogicalDeviceAgent) updateDeviceGraph(lp *voltha.LogicalPort) {
+func (agent *LogicalDeviceAgent) updateDeviceGraph(ctx context.Context, lp *voltha.LogicalPort) {
log.Debugf("updateDeviceGraph", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
agent.lockLogicalDevice.Lock()
defer agent.lockLogicalDevice.Unlock()
if agent.deviceGraph == nil {
agent.deviceGraph = graph.NewDeviceGraph(agent.logicalDeviceID, agent.deviceMgr.GetDevice)
}
- agent.deviceGraph.AddPort(lp)
+ agent.deviceGraph.AddPort(ctx, lp)
agent.deviceGraph.Print()
}
//generateDeviceGraph regenerates the device graph
-func (agent *LogicalDeviceAgent) generateDeviceGraph() {
+func (agent *LogicalDeviceAgent) generateDeviceGraph(ctx context.Context) {
log.Debugw("generateDeviceGraph", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
agent.lockLogicalDevice.Lock()
defer agent.lockLogicalDevice.Unlock()
@@ -1511,7 +1511,7 @@
if agent.deviceGraph == nil {
agent.deviceGraph = graph.NewDeviceGraph(agent.logicalDeviceID, agent.deviceMgr.GetDevice)
}
- agent.deviceGraph.ComputeRoutes(ld.Ports)
+ agent.deviceGraph.ComputeRoutes(ctx, ld.Ports)
agent.deviceGraph.Print()
}
@@ -1555,7 +1555,7 @@
// portUpdated is invoked when a port is updated on the logical device. Until
// the POST_ADD notification is fixed, we will use the logical device to
// update that data.
-func (agent *LogicalDeviceAgent) portUpdated(args ...interface{}) interface{} {
+func (agent *LogicalDeviceAgent) portUpdated(ctx context.Context, args ...interface{}) interface{} {
log.Debugw("portUpdated-callback", log.Fields{"argsLen": len(args)})
var oldLD *voltha.LogicalDevice
@@ -1600,7 +1600,7 @@
// added and an error in case a valid error is encountered. If the port was successfully added it will return
// (true, nil). If the device is not in the correct state it will return (false, nil) as this is a valid
// scenario. This also applies to the case where the port was already added.
-func (agent *LogicalDeviceAgent) addNNILogicalPort(device *voltha.Device, port *voltha.Port) (bool, error) {
+func (agent *LogicalDeviceAgent) addNNILogicalPort(ctx context.Context, device *voltha.Device, port *voltha.Port) (bool, error) {
log.Debugw("addNNILogicalPort", log.Fields{"NNI": port})
if device.AdminState != voltha.AdminState_ENABLED || device.OperStatus != voltha.OperStatus_ACTIVE {
log.Infow("device-not-ready", log.Fields{"deviceId": device.Id, "admin": device.AdminState, "oper": device.OperStatus})
@@ -1617,7 +1617,7 @@
var portCap *ic.PortCapability
var err error
// First get the port capability
- if portCap, err = agent.deviceMgr.getPortCapability(context.TODO(), device.Id, port.PortNo); err != nil {
+ if portCap, err = agent.deviceMgr.getPortCapability(ctx, device.Id, port.PortNo); err != nil {
log.Errorw("error-retrieving-port-capabilities", log.Fields{"error": err})
return false, err
}
@@ -1646,14 +1646,14 @@
}
cloned.Ports = append(cloned.Ports, lp)
- if err = agent.updateLogicalDeviceWithoutLock(cloned); err != nil {
+ if err = agent.updateLogicalDeviceWithoutLock(ctx, cloned); err != nil {
log.Errorw("error-updating-logical-device", log.Fields{"error": err})
return false, err
}
// Update the device graph with this new logical port
clonedLP := (proto.Clone(lp)).(*voltha.LogicalPort)
- go agent.updateDeviceGraph(clonedLP)
+ go agent.updateDeviceGraph(context.Background(), clonedLP)
return true, nil
}
@@ -1672,7 +1672,7 @@
// added and an error in case a valid error is encountered. If the port was successfully added it will return
// (true, nil). If the device is not in the correct state it will return (false, nil) as this is a valid
// scenario. This also applies to the case where the port was already added.
-func (agent *LogicalDeviceAgent) addUNILogicalPort(childDevice *voltha.Device, port *voltha.Port) (bool, error) {
+func (agent *LogicalDeviceAgent) addUNILogicalPort(ctx context.Context, childDevice *voltha.Device, port *voltha.Port) (bool, error) {
log.Debugw("addUNILogicalPort", log.Fields{"port": port})
if childDevice.AdminState != voltha.AdminState_ENABLED || childDevice.OperStatus != voltha.OperStatus_ACTIVE {
log.Infow("device-not-ready", log.Fields{"deviceId": childDevice.Id, "admin": childDevice.AdminState, "oper": childDevice.OperStatus})
@@ -1688,7 +1688,7 @@
var portCap *ic.PortCapability
var err error
// First get the port capability
- if portCap, err = agent.deviceMgr.getPortCapability(context.TODO(), childDevice.Id, port.PortNo); err != nil {
+ if portCap, err = agent.deviceMgr.getPortCapability(ctx, childDevice.Id, port.PortNo); err != nil {
log.Errorw("error-retrieving-port-capabilities", log.Fields{"error": err})
return false, err
}
@@ -1713,16 +1713,16 @@
cloned.Ports = make([]*voltha.LogicalPort, 0)
}
cloned.Ports = append(cloned.Ports, portCap.Port)
- if err := agent.updateLogicalDeviceWithoutLock(cloned); err != nil {
+ if err := agent.updateLogicalDeviceWithoutLock(ctx, cloned); err != nil {
return false, err
}
// Update the device graph with this new logical port
clonedLP := (proto.Clone(portCap.Port)).(*voltha.LogicalPort)
- go agent.updateDeviceGraph(clonedLP)
+ go agent.updateDeviceGraph(context.Background(), clonedLP)
return true, nil
}
-func (agent *LogicalDeviceAgent) packetOut(packet *ofp.OfpPacketOut) {
+func (agent *LogicalDeviceAgent) packetOut(ctx context.Context, packet *ofp.OfpPacketOut) {
log.Debugw("packet-out", log.Fields{
"packet": hex.EncodeToString(packet.Data),
"inPort": packet.GetInPort(),
@@ -1730,7 +1730,7 @@
outPort := fu.GetPacketOutPort(packet)
//frame := packet.GetData()
//TODO: Use a channel between the logical agent and the device agent
- if err := agent.deviceMgr.packetOut(agent.rootDeviceID, outPort, packet); err != nil {
+ if err := agent.deviceMgr.packetOut(ctx, agent.rootDeviceID, outPort, packet); err != nil {
log.Error("packetout-failed", log.Fields{"logicalDeviceID": agent.rootDeviceID})
}
}
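
logical_device_agent.go shows both goroutine conventions side by side: fire-and-forget work such as generateDeviceGraph and updateDeviceGraph is launched with context.Background(), while setupLogicalPorts passes its own ctx into the per-child goroutines because it waits on their responses. Here is a compact, self-contained sketch of the two cases; the worker functions and the error-collection scheme are hypothetical, only the convention is taken from the diff.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func rebuildGraph(ctx context.Context) {
	select {
	case <-ctx.Done():
		fmt.Println("graph rebuild abandoned:", ctx.Err())
	case <-time.After(20 * time.Millisecond):
		fmt.Println("graph rebuilt")
	}
}

func setupPort(ctx context.Context, port int) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	fmt.Println("port set up:", port)
	return nil
}

func updateDevice(ctx context.Context, ports []int) error {
	// The caller does not wait for the graph rebuild, so it gets a detached context.
	go rebuildGraph(context.Background())

	// The caller waits for these goroutines, so the request ctx is passed through.
	var wg sync.WaitGroup
	errs := make(chan error, len(ports))
	for _, p := range ports {
		wg.Add(1)
		go func(port int) {
			defer wg.Done()
			if err := setupPort(ctx, port); err != nil {
				errs <- err
			}
		}(p)
	}
	wg.Wait()
	close(errs)
	return <-errs // nil if no goroutine reported an error
}

func main() {
	if err := updateDevice(context.Background(), []int{1, 2, 3}); err != nil {
		fmt.Println("error:", err)
	}
	time.Sleep(50 * time.Millisecond) // give the detached rebuild a chance to finish
}
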
diff --git a/rw_core/core/logical_device_agent_test.go b/rw_core/core/logical_device_agent_test.go
index 65c5cb6..ece7c7b 100644
--- a/rw_core/core/logical_device_agent_test.go
+++ b/rw_core/core/logical_device_agent_test.go
@@ -502,7 +502,7 @@
// Change the state of the first port to FAILED
localWG.Add(1)
go func() {
- err := ldAgent.updatePortState(lda.logicalDevice.Ports[0].DeviceId, lda.logicalDevice.Ports[0].DevicePortNo, voltha.OperStatus_FAILED)
+ err := ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[0].DeviceId, lda.logicalDevice.Ports[0].DevicePortNo, voltha.OperStatus_FAILED)
assert.Nil(t, err)
localWG.Done()
}()
@@ -510,7 +510,7 @@
// Change the state of the second port to TESTING
localWG.Add(1)
go func() {
- err := ldAgent.updatePortState(lda.logicalDevice.Ports[1].DeviceId, lda.logicalDevice.Ports[1].DevicePortNo, voltha.OperStatus_TESTING)
+ err := ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[1].DeviceId, lda.logicalDevice.Ports[1].DevicePortNo, voltha.OperStatus_TESTING)
assert.Nil(t, err)
localWG.Done()
}()
@@ -518,9 +518,9 @@
// Change the state of the third port to UNKNOWN and then back to ACTIVE
localWG.Add(1)
go func() {
- err := ldAgent.updatePortState(lda.logicalDevice.Ports[2].DeviceId, lda.logicalDevice.Ports[2].DevicePortNo, voltha.OperStatus_UNKNOWN)
+ err := ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[2].DeviceId, lda.logicalDevice.Ports[2].DevicePortNo, voltha.OperStatus_UNKNOWN)
assert.Nil(t, err)
- err = ldAgent.updatePortState(lda.logicalDevice.Ports[2].DeviceId, lda.logicalDevice.Ports[2].DevicePortNo, voltha.OperStatus_ACTIVE)
+ err = ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[2].DeviceId, lda.logicalDevice.Ports[2].DevicePortNo, voltha.OperStatus_ACTIVE)
assert.Nil(t, err)
localWG.Done()
}()
@@ -540,7 +540,7 @@
}
localWG.Add(1)
go func() {
- err := ldAgent.meterAdd(meterMod)
+ err := ldAgent.meterAdd(context.Background(), meterMod)
assert.Nil(t, err)
localWG.Done()
}()
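
The updated tests above call the new ctx-taking signatures with context.Background(), since a unit test has no inbound gRPC request to propagate. A hedged sketch of the same pattern, using a bounded context so a hung call fails the test instead of blocking the suite (updatePortState here is a stand-in, not the real agent method):

package example

import (
	"context"
	"sync"
	"testing"
	"time"
)

// updatePortState stands in for any agent method that now takes a context.
func updatePortState(ctx context.Context, deviceID string, portNo uint32) error {
	select {
	case <-time.After(time.Millisecond):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func TestConcurrentPortUpdates(t *testing.T) {
	// Bound the whole test so a stuck update cannot hang it indefinitely.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	var wg sync.WaitGroup
	for portNo := uint32(1); portNo <= 3; portNo++ {
		wg.Add(1)
		go func(p uint32) {
			defer wg.Done()
			if err := updatePortState(ctx, "device-1", p); err != nil {
				t.Errorf("updatePortState(%d) failed: %v", p, err)
			}
		}(portNo)
	}
	wg.Wait()
}
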
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index 7195034..4026ba1 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -38,7 +38,7 @@
core *Core
deviceMgr *DeviceManager
grpcNbiHdlr *APIHandler
- kafkaICProxy *kafka.InterContainerProxy
+ kafkaICProxy kafka.InterContainerProxy
clusterDataProxy *model.Proxy
exitChannel chan int
defaultTimeout int64
@@ -46,7 +46,7 @@
logicalDeviceLoadingInProgress map[string][]chan int
}
-func newLogicalDeviceManager(core *Core, deviceMgr *DeviceManager, kafkaICProxy *kafka.InterContainerProxy, cdProxy *model.Proxy, timeout int64) *LogicalDeviceManager {
+func newLogicalDeviceManager(core *Core, deviceMgr *DeviceManager, kafkaICProxy kafka.InterContainerProxy, cdProxy *model.Proxy, timeout int64) *LogicalDeviceManager {
var logicalDeviceMgr LogicalDeviceManager
logicalDeviceMgr.core = core
logicalDeviceMgr.exitChannel = make(chan int, 1)
@@ -96,7 +96,7 @@
// getLogicalDeviceAgent returns the logical device agent. If the device is not in memory then the device will
// be loaded from dB and a logical device agent created to manage it.
-func (ldMgr *LogicalDeviceManager) getLogicalDeviceAgent(logicalDeviceID string) *LogicalDeviceAgent {
+func (ldMgr *LogicalDeviceManager) getLogicalDeviceAgent(ctx context.Context, logicalDeviceID string) *LogicalDeviceAgent {
agent, ok := ldMgr.logicalDeviceAgents.Load(logicalDeviceID)
if ok {
lda := agent.(*LogicalDeviceAgent)
@@ -110,7 +110,7 @@
return lda
}
// Try to load into memory - loading will also create the logical device agent
- if err := ldMgr.load(logicalDeviceID); err == nil {
+ if err := ldMgr.load(ctx, logicalDeviceID); err == nil {
if agent, ok = ldMgr.logicalDeviceAgents.Load(logicalDeviceID); ok {
return agent.(*LogicalDeviceAgent)
}
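
getLogicalDeviceAgent now takes the request context and hands it to load, so the on-demand KV read behind getLogicalDeviceFromModel is cancelled when the originating gRPC request is cancelled. A minimal, hypothetical sketch of that cache-then-load shape (the manager and KV types below are stand-ins, not the voltha-go ones):

package example

import (
	"context"
	"fmt"
	"sync"
)

// kvLoader stands in for the cluster data proxy: it must honour ctx cancellation.
type kvLoader func(ctx context.Context, id string) (string, error)

type manager struct {
	agents sync.Map // id -> cached agent data
	load   kvLoader
}

// getAgent returns the cached entry or loads it from the KV store, passing the
// request context down so cancelling the request cancels the KV read.
func (m *manager) getAgent(ctx context.Context, id string) (string, error) {
	if v, ok := m.agents.Load(id); ok {
		return v.(string), nil
	}
	data, err := m.load(ctx, id)
	if err != nil {
		return "", fmt.Errorf("load %s: %w", id, err)
	}
	actual, _ := m.agents.LoadOrStore(id, data)
	return actual.(string), nil
}
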
@@ -124,9 +124,9 @@
// GetLogicalDevice provides a cloned, most up-to-date logical device. If the device is not in memory
// it will be fetched from the dB
-func (ldMgr *LogicalDeviceManager) getLogicalDevice(id string) (*voltha.LogicalDevice, error) {
+func (ldMgr *LogicalDeviceManager) getLogicalDevice(ctx context.Context, id string) (*voltha.LogicalDevice, error) {
log.Debugw("getlogicalDevice", log.Fields{"logicaldeviceid": id})
- if agent := ldMgr.getLogicalDeviceAgent(id); agent != nil {
+ if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
return agent.GetLogicalDevice(), nil
}
return nil, status.Errorf(codes.NotFound, "%s", id)
@@ -147,10 +147,10 @@
}
//listLogicalDevices returns the list of all logical devices
-func (ldMgr *LogicalDeviceManager) listLogicalDevices() (*voltha.LogicalDevices, error) {
+func (ldMgr *LogicalDeviceManager) listLogicalDevices(ctx context.Context) (*voltha.LogicalDevices, error) {
log.Debug("ListAllLogicalDevices")
result := &voltha.LogicalDevices{}
- logicalDevices, err := ldMgr.clusterDataProxy.List(context.Background(), "/logical_devices", 0, true, "")
+ logicalDevices, err := ldMgr.clusterDataProxy.List(ctx, "/logical_devices", 0, true, "")
if err != nil {
log.Errorw("failed-to-list-logical-devices-from-cluster-proxy", log.Fields{"error": err})
return nil, err
@@ -186,7 +186,7 @@
ldMgr.addLogicalDeviceAgentToMap(agent)
// Update the root device with the logical device Id reference
- if err := ldMgr.deviceMgr.setParentID(device, id); err != nil {
+ if err := ldMgr.deviceMgr.setParentID(ctx, device, id); err != nil {
log.Errorw("failed-setting-parent-id", log.Fields{"logicalDeviceId": id, "deviceId": device.Id})
return nil, err
}
@@ -205,7 +205,7 @@
// stopManagingLogicalDeviceWithDeviceId stops the management of the logical device. This implies removal of any
// reference of this logical device in cache. The device Id is passed as a param because the logical device may already
// have been removed from the model. This function returns the logical device Id if found
-func (ldMgr *LogicalDeviceManager) stopManagingLogicalDeviceWithDeviceID(id string) string {
+func (ldMgr *LogicalDeviceManager) stopManagingLogicalDeviceWithDeviceID(ctx context.Context, id string) string {
log.Infow("stop-managing-logical-device", log.Fields{"deviceId": id})
// Go over the list of logical device agents to find the one which has rootDeviceId as id
var ldID = ""
@@ -213,7 +213,7 @@
ldAgent := value.(*LogicalDeviceAgent)
if ldAgent.rootDeviceID == id {
log.Infow("stopping-logical-device-agent", log.Fields{"lDeviceId": key})
- if err := ldAgent.stop(context.TODO()); err != nil {
+ if err := ldAgent.stop(ctx); err != nil {
log.Errorw("failed-to-stop-LDAgent", log.Fields{"error": err})
return false
}
@@ -226,8 +226,8 @@
}
//getLogicalDeviceFromModel retrieves the logical device data from the model.
-func (ldMgr *LogicalDeviceManager) getLogicalDeviceFromModel(lDeviceID string) (*voltha.LogicalDevice, error) {
- logicalDevice, err := ldMgr.clusterDataProxy.Get(context.Background(), "/logical_devices/"+lDeviceID, 0, false, "")
+func (ldMgr *LogicalDeviceManager) getLogicalDeviceFromModel(ctx context.Context, lDeviceID string) (*voltha.LogicalDevice, error) {
+ logicalDevice, err := ldMgr.clusterDataProxy.Get(ctx, "/logical_devices/"+lDeviceID, 0, false, "")
if err != nil {
log.Errorw("failed-to-get-logical-devices-from-cluster-proxy", log.Fields{"error": err})
return nil, err
@@ -241,7 +241,7 @@
}
// load loads a logical device manager in memory
-func (ldMgr *LogicalDeviceManager) load(lDeviceID string) error {
+func (ldMgr *LogicalDeviceManager) load(ctx context.Context, lDeviceID string) error {
if lDeviceID == "" {
return nil
}
@@ -251,11 +251,11 @@
if ldAgent, _ := ldMgr.logicalDeviceAgents.Load(lDeviceID); ldAgent == nil {
ldMgr.logicalDeviceLoadingInProgress[lDeviceID] = []chan int{make(chan int, 1)}
ldMgr.logicalDevicesLoadingLock.Unlock()
- if _, err := ldMgr.getLogicalDeviceFromModel(lDeviceID); err == nil {
+ if _, err := ldMgr.getLogicalDeviceFromModel(ctx, lDeviceID); err == nil {
log.Debugw("loading-logical-device", log.Fields{"lDeviceId": lDeviceID})
agent := newLogicalDeviceAgent(lDeviceID, "", ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy, ldMgr.defaultTimeout)
- if err := agent.start(context.TODO(), true); err != nil {
- if err := agent.stop(context.TODO()); err != nil {
+ if err := agent.start(ctx, true); err != nil {
+ if err := agent.stop(ctx); err != nil {
log.Errorw("failed-to-stop-agent", log.Fields{"error": err})
return err
}
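
load records the device in logicalDeviceLoadingInProgress under logicalDevicesLoadingLock before reading the model and starting the agent, which suggests a load-once guard for concurrent callers, and the request ctx now reaches both getLogicalDeviceFromModel and agent.start. A simplified, illustrative sketch of such a guard, with invented types rather than the real manager:

package example

import (
	"context"
	"sync"
)

type loader struct {
	mu       sync.Mutex
	inFlight map[string][]chan int // id -> waiters
	loadFn   func(ctx context.Context, id string) error
}

// loadOnce lets only one caller load a given id; later callers wait for the
// first load to finish or for their request context to be cancelled.
func (l *loader) loadOnce(ctx context.Context, id string) error {
	l.mu.Lock()
	if waiters, busy := l.inFlight[id]; busy {
		ch := make(chan int, 1)
		l.inFlight[id] = append(waiters, ch)
		l.mu.Unlock()
		select {
		case <-ch: // first loader finished
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	l.inFlight[id] = []chan int{}
	l.mu.Unlock()

	err := l.loadFn(ctx, id) // the actual model read and agent start

	l.mu.Lock()
	for _, ch := range l.inFlight[id] {
		ch <- 0
	}
	delete(l.inFlight, id)
	l.mu.Unlock()
	return err
}
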
@@ -297,7 +297,7 @@
return errors.New("device-not-root")
}
logDeviceID := device.ParentId
- if agent := ldMgr.getLogicalDeviceAgent(logDeviceID); agent != nil {
+ if agent := ldMgr.getLogicalDeviceAgent(ctx, logDeviceID); agent != nil {
// Stop the logical device agent
if err := agent.stop(ctx); err != nil {
log.Errorw("failed-to-stop-agent", log.Fields{"error": err})
@@ -315,7 +315,7 @@
return nil
}
-func (ldMgr *LogicalDeviceManager) getLogicalDeviceID(device *voltha.Device) (*string, error) {
+func (ldMgr *LogicalDeviceManager) getLogicalDeviceID(ctx context.Context, device *voltha.Device) (*string, error) {
// Device can either be a parent or a child device
if device.Root {
// Parent device. The ID of a parent device is the logical device ID
@@ -323,31 +323,31 @@
}
// Device is child device
// retrieve parent device using child device ID
- if parentDevice := ldMgr.deviceMgr.getParentDevice(device); parentDevice != nil {
+ if parentDevice := ldMgr.deviceMgr.getParentDevice(ctx, device); parentDevice != nil {
return &parentDevice.ParentId, nil
}
return nil, status.Errorf(codes.NotFound, "%s", device.Id)
}
-func (ldMgr *LogicalDeviceManager) getLogicalDeviceIDFromDeviceID(deviceID string) (*string, error) {
+func (ldMgr *LogicalDeviceManager) getLogicalDeviceIDFromDeviceID(ctx context.Context, deviceID string) (*string, error) {
// Get the device
var device *voltha.Device
var err error
- if device, err = ldMgr.deviceMgr.GetDevice(deviceID); err != nil {
+ if device, err = ldMgr.deviceMgr.GetDevice(ctx, deviceID); err != nil {
return nil, err
}
- return ldMgr.getLogicalDeviceID(device)
+ return ldMgr.getLogicalDeviceID(ctx, device)
}
-func (ldMgr *LogicalDeviceManager) getLogicalPortID(device *voltha.Device) (*voltha.LogicalPortId, error) {
+func (ldMgr *LogicalDeviceManager) getLogicalPortID(ctx context.Context, device *voltha.Device) (*voltha.LogicalPortId, error) {
// Get the logical device where this device is attached
var lDeviceID *string
var err error
- if lDeviceID, err = ldMgr.getLogicalDeviceID(device); err != nil {
+ if lDeviceID, err = ldMgr.getLogicalDeviceID(ctx, device); err != nil {
return nil, err
}
var lDevice *voltha.LogicalDevice
- if lDevice, err = ldMgr.getLogicalDevice(*lDeviceID); err != nil {
+ if lDevice, err = ldMgr.getLogicalDevice(ctx, *lDeviceID); err != nil {
return nil, err
}
// Go over list of ports
@@ -362,7 +362,7 @@
// ListLogicalDeviceFlows returns the flows of logical device
func (ldMgr *LogicalDeviceManager) ListLogicalDeviceFlows(ctx context.Context, id string) (*openflow_13.Flows, error) {
log.Debugw("ListLogicalDeviceFlows", log.Fields{"logicaldeviceid": id})
- if agent := ldMgr.getLogicalDeviceAgent(id); agent != nil {
+ if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
return agent.ListLogicalDeviceFlows(), nil
}
return nil, status.Errorf(codes.NotFound, "%s", id)
@@ -371,7 +371,7 @@
// ListLogicalDeviceFlowGroups returns logical device flow groups
func (ldMgr *LogicalDeviceManager) ListLogicalDeviceFlowGroups(ctx context.Context, id string) (*openflow_13.FlowGroups, error) {
log.Debugw("ListLogicalDeviceFlowGroups", log.Fields{"logicaldeviceid": id})
- if agent := ldMgr.getLogicalDeviceAgent(id); agent != nil {
+ if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
return agent.ListLogicalDeviceFlowGroups(), nil
}
return nil, status.Errorf(codes.NotFound, "%s", id)
@@ -380,17 +380,17 @@
// ListLogicalDevicePorts returns logical device ports
func (ldMgr *LogicalDeviceManager) ListLogicalDevicePorts(ctx context.Context, id string) (*voltha.LogicalPorts, error) {
log.Debugw("ListLogicalDevicePorts", log.Fields{"logicaldeviceid": id})
- if agent := ldMgr.getLogicalDeviceAgent(id); agent != nil {
+ if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
return agent.ListLogicalDevicePorts(), nil
}
return nil, status.Errorf(codes.NotFound, "%s", id)
}
-func (ldMgr *LogicalDeviceManager) getLogicalPort(lPortID *voltha.LogicalPortId) (*voltha.LogicalPort, error) {
+func (ldMgr *LogicalDeviceManager) getLogicalPort(ctx context.Context, lPortID *voltha.LogicalPortId) (*voltha.LogicalPort, error) {
// Get the logical device where this device is attached
var err error
var lDevice *voltha.LogicalDevice
- if lDevice, err = ldMgr.getLogicalDevice(lPortID.Id); err != nil {
+ if lDevice, err = ldMgr.getLogicalDevice(ctx, lPortID.Id); err != nil {
return nil, err
}
// Go over list of ports
@@ -404,15 +404,15 @@
// updateLogicalPort sets up a logical port on the logical device based on the device port
// information, if needed
-func (ldMgr *LogicalDeviceManager) updateLogicalPort(device *voltha.Device, port *voltha.Port) error {
- ldID, err := ldMgr.getLogicalDeviceID(device)
+func (ldMgr *LogicalDeviceManager) updateLogicalPort(ctx context.Context, device *voltha.Device, port *voltha.Port) error {
+ ldID, err := ldMgr.getLogicalDeviceID(ctx, device)
if err != nil || *ldID == "" {
// This is not an error as the logical device may not have been created at this time. In such a case,
// the ports will be created when the logical device is ready.
return nil
}
- if agent := ldMgr.getLogicalDeviceAgent(*ldID); agent != nil {
- if err := agent.updateLogicalPort(device, port); err != nil {
+ if agent := ldMgr.getLogicalDeviceAgent(ctx, *ldID); agent != nil {
+ if err := agent.updateLogicalPort(ctx, device, port); err != nil {
return err
}
}
@@ -425,7 +425,7 @@
// Get logical port
var logicalPort *voltha.LogicalPort
var err error
- if logicalPort, err = ldMgr.getLogicalPort(lPortID); err != nil {
+ if logicalPort, err = ldMgr.getLogicalPort(ctx, lPortID); err != nil {
log.Debugw("no-logical-device-port-present", log.Fields{"logicalPortId": lPortID.PortId})
return err
}
@@ -433,8 +433,8 @@
if logicalPort.RootPort {
return errors.New("device-root")
}
- if agent := ldMgr.getLogicalDeviceAgent(lPortID.Id); agent != nil {
- if err := agent.deleteLogicalPort(logicalPort); err != nil {
+ if agent := ldMgr.getLogicalDeviceAgent(ctx, lPortID.Id); agent != nil {
+ if err := agent.deleteLogicalPort(ctx, logicalPort); err != nil {
log.Warnw("deleting-logicalport-failed", log.Fields{"LDeviceId": lPortID.Id, "error": err})
}
}
@@ -444,15 +444,15 @@
}
// deleteLogicalPorts removes the logical ports associated with a child device
-func (ldMgr *LogicalDeviceManager) deleteLogicalPorts(deviceID string) error {
+func (ldMgr *LogicalDeviceManager) deleteLogicalPorts(ctx context.Context, deviceID string) error {
log.Debugw("deleting-logical-ports", log.Fields{"deviceId": deviceID})
// Get logical port
- ldID, err := ldMgr.getLogicalDeviceIDFromDeviceID(deviceID)
+ ldID, err := ldMgr.getLogicalDeviceIDFromDeviceID(ctx, deviceID)
if err != nil {
return err
}
- if agent := ldMgr.getLogicalDeviceAgent(*ldID); agent != nil {
- if err = agent.deleteLogicalPorts(deviceID); err != nil {
+ if agent := ldMgr.getLogicalDeviceAgent(ctx, *ldID); agent != nil {
+ if err = agent.deleteLogicalPorts(ctx, deviceID); err != nil {
log.Warnw("deleteLogicalPorts-failed", log.Fields{"ldeviceId": *ldID})
return err
}
@@ -470,7 +470,7 @@
// Get the logical device id parent device
parentID := childDevice.ParentId
- logDeviceID := ldMgr.deviceMgr.GetParentDeviceID(parentID)
+ logDeviceID := ldMgr.deviceMgr.GetParentDeviceID(ctx, parentID)
log.Debugw("setupUNILogicalPorts", log.Fields{"logDeviceId": logDeviceID, "parentId": parentID})
@@ -478,7 +478,7 @@
return errors.New("device-in-invalid-state")
}
- if agent := ldMgr.getLogicalDeviceAgent(logDeviceID); agent != nil {
+ if agent := ldMgr.getLogicalDeviceAgent(ctx, logDeviceID); agent != nil {
if err := agent.setupUNILogicalPorts(ctx, childDevice); err != nil {
return err
}
@@ -486,53 +486,53 @@
return nil
}
-func (ldMgr *LogicalDeviceManager) deleteAllLogicalPorts(device *voltha.Device) error {
+func (ldMgr *LogicalDeviceManager) deleteAllLogicalPorts(ctx context.Context, device *voltha.Device) error {
log.Debugw("deleteAllLogicalPorts", log.Fields{"deviceId": device.Id})
var ldID *string
var err error
//Get the logical device Id for this device
- if ldID, err = ldMgr.getLogicalDeviceID(device); err != nil {
+ if ldID, err = ldMgr.getLogicalDeviceID(ctx, device); err != nil {
log.Warnw("no-logical-device-found", log.Fields{"deviceId": device.Id, "error": err})
return err
}
- if agent := ldMgr.getLogicalDeviceAgent(*ldID); agent != nil {
- if err := agent.deleteAllLogicalPorts(device); err != nil {
+ if agent := ldMgr.getLogicalDeviceAgent(ctx, *ldID); agent != nil {
+ if err := agent.deleteAllLogicalPorts(ctx, device); err != nil {
return err
}
}
return nil
}
-func (ldMgr *LogicalDeviceManager) updatePortState(deviceID string, portNo uint32, state voltha.OperStatus_Types) error {
+func (ldMgr *LogicalDeviceManager) updatePortState(ctx context.Context, deviceID string, portNo uint32, state voltha.OperStatus_Types) error {
log.Debugw("updatePortState", log.Fields{"deviceId": deviceID, "state": state, "portNo": portNo})
var ldID *string
var err error
//Get the logical device Id for this device
- if ldID, err = ldMgr.getLogicalDeviceIDFromDeviceID(deviceID); err != nil {
+ if ldID, err = ldMgr.getLogicalDeviceIDFromDeviceID(ctx, deviceID); err != nil {
log.Warnw("no-logical-device-found", log.Fields{"deviceId": deviceID, "error": err})
return err
}
- if agent := ldMgr.getLogicalDeviceAgent(*ldID); agent != nil {
- if err := agent.updatePortState(deviceID, portNo, state); err != nil {
+ if agent := ldMgr.getLogicalDeviceAgent(ctx, *ldID); agent != nil {
+ if err := agent.updatePortState(ctx, deviceID, portNo, state); err != nil {
return err
}
}
return nil
}
-func (ldMgr *LogicalDeviceManager) updatePortsState(device *voltha.Device, state voltha.AdminState_Types) error {
+func (ldMgr *LogicalDeviceManager) updatePortsState(ctx context.Context, device *voltha.Device, state voltha.AdminState_Types) error {
log.Debugw("updatePortsState", log.Fields{"deviceId": device.Id, "state": state, "current-data": device})
var ldID *string
var err error
//Get the logical device Id for this device
- if ldID, err = ldMgr.getLogicalDeviceID(device); err != nil {
+ if ldID, err = ldMgr.getLogicalDeviceID(ctx, device); err != nil {
log.Warnw("no-logical-device-found", log.Fields{"deviceId": device.Id, "error": err})
return err
}
- if agent := ldMgr.getLogicalDeviceAgent(*ldID); agent != nil {
- if err := agent.updatePortsState(device, state); err != nil {
+ if agent := ldMgr.getLogicalDeviceAgent(ctx, *ldID); agent != nil {
+ if err := agent.updatePortsState(ctx, device, state); err != nil {
return err
}
}
@@ -542,7 +542,7 @@
func (ldMgr *LogicalDeviceManager) updateFlowTable(ctx context.Context, id string, flow *openflow_13.OfpFlowMod, ch chan interface{}) {
log.Debugw("updateFlowTable", log.Fields{"logicalDeviceId": id})
var res interface{}
- if agent := ldMgr.getLogicalDeviceAgent(id); agent != nil {
+ if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
res = agent.updateFlowTable(ctx, flow)
log.Debugw("updateFlowTable-result", log.Fields{"result": res})
} else {
@@ -554,7 +554,7 @@
func (ldMgr *LogicalDeviceManager) updateMeterTable(ctx context.Context, id string, meter *openflow_13.OfpMeterMod, ch chan interface{}) {
log.Debugw("updateMeterTable", log.Fields{"logicalDeviceId": id})
var res interface{}
- if agent := ldMgr.getLogicalDeviceAgent(id); agent != nil {
+ if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
res = agent.updateMeterTable(ctx, meter)
log.Debugw("updateMeterTable-result", log.Fields{"result": res})
} else {
@@ -566,7 +566,7 @@
// ListLogicalDeviceMeters returns logical device meters
func (ldMgr *LogicalDeviceManager) ListLogicalDeviceMeters(ctx context.Context, id string) (*openflow_13.Meters, error) {
log.Debugw("ListLogicalDeviceMeters", log.Fields{"logicalDeviceId": id})
- if agent := ldMgr.getLogicalDeviceAgent(id); agent != nil {
+ if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
return agent.ListLogicalDeviceMeters(), nil
}
return nil, status.Errorf(codes.NotFound, "%s", id)
@@ -574,7 +574,7 @@
func (ldMgr *LogicalDeviceManager) updateGroupTable(ctx context.Context, id string, groupMod *openflow_13.OfpGroupMod, ch chan interface{}) {
log.Debugw("updateGroupTable", log.Fields{"logicalDeviceId": id})
var res interface{}
- if agent := ldMgr.getLogicalDeviceAgent(id); agent != nil {
+ if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
res = agent.updateGroupTable(ctx, groupMod)
log.Debugw("updateGroupTable-result", log.Fields{"result": res})
} else {
@@ -586,8 +586,8 @@
func (ldMgr *LogicalDeviceManager) enableLogicalPort(ctx context.Context, id *voltha.LogicalPortId, ch chan interface{}) {
log.Debugw("enableLogicalPort", log.Fields{"logicalDeviceId": id})
var res interface{}
- if agent := ldMgr.getLogicalDeviceAgent(id.Id); agent != nil {
- res = agent.enableLogicalPort(id.PortId)
+ if agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id); agent != nil {
+ res = agent.enableLogicalPort(ctx, id.PortId)
log.Debugw("enableLogicalPort-result", log.Fields{"result": res})
} else {
res = status.Errorf(codes.NotFound, "%s", id.Id)
@@ -598,8 +598,8 @@
func (ldMgr *LogicalDeviceManager) disableLogicalPort(ctx context.Context, id *voltha.LogicalPortId, ch chan interface{}) {
log.Debugw("disableLogicalPort", log.Fields{"logicalDeviceId": id})
var res interface{}
- if agent := ldMgr.getLogicalDeviceAgent(id.Id); agent != nil {
- res = agent.disableLogicalPort(id.PortId)
+ if agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id); agent != nil {
+ res = agent.disableLogicalPort(ctx, id.PortId)
log.Debugw("disableLogicalPort-result", log.Fields{"result": res})
} else {
res = status.Errorf(codes.NotFound, "%s", id.Id)
@@ -607,9 +607,9 @@
sendAPIResponse(ctx, ch, res)
}
-func (ldMgr *LogicalDeviceManager) packetIn(logicalDeviceID string, port uint32, transactionID string, packet []byte) error {
+func (ldMgr *LogicalDeviceManager) packetIn(ctx context.Context, logicalDeviceID string, port uint32, transactionID string, packet []byte) error {
log.Debugw("packetIn", log.Fields{"logicalDeviceId": logicalDeviceID, "port": port})
- if agent := ldMgr.getLogicalDeviceAgent(logicalDeviceID); agent != nil {
+ if agent := ldMgr.getLogicalDeviceAgent(ctx, logicalDeviceID); agent != nil {
agent.packetIn(port, transactionID, packet)
} else {
log.Error("logical-device-not-exist", log.Fields{"logicalDeviceId": logicalDeviceID})
diff --git a/rw_core/core/transaction.go b/rw_core/core/transaction.go
index 93dd28f..fb51d2e 100644
--- a/rw_core/core/transaction.go
+++ b/rw_core/core/transaction.go
@@ -30,6 +30,7 @@
package core
import (
+ "context"
"time"
"github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
@@ -173,7 +174,7 @@
*/
// Acquired acquires the transaction status
-func (c *KVTransaction) Acquired(minDuration int64, ownedByMe ...bool) (bool, error) {
+func (c *KVTransaction) Acquired(ctx context.Context, minDuration int64, ownedByMe ...bool) (bool, error) {
var acquired bool
var currOwner string
var res int
@@ -197,15 +198,15 @@
// Keep the reservation longer than the minDuration (which is really the request timeout) to ensure the
// transaction key stays in the KV store until after the Core finalizes a request timeout condition (which is
// a success from a request completion perspective).
- if err := c.tryToReserveTxn(durationInSecs * 2); err == nil {
+ if err := c.tryToReserveTxn(ctx, durationInSecs*2); err == nil {
res = SeizedBySelf
} else {
log.Debugw("watch-other-server",
log.Fields{"transactionId": c.txnID, "owner": currOwner, "timeout": durationInSecs})
- res = c.Watch(durationInSecs)
+ res = c.Watch(ctx, durationInSecs)
}
} else {
- res = c.Watch(durationInSecs)
+ res = c.Watch(ctx, durationInSecs)
}
switch res {
case SeizedBySelf, AbandonedByOther:
@@ -217,11 +218,11 @@
return acquired, nil
}
-func (c *KVTransaction) tryToReserveTxn(durationInSecs int64) error {
+func (c *KVTransaction) tryToReserveTxn(ctxt context.Context, durationInSecs int64) error {
var currOwner string
var res int
var err error
- value, _ := ctx.kvClient.Reserve(c.txnKey, ctx.owner, durationInSecs)
+ value, _ := ctx.kvClient.Reserve(ctxt, c.txnKey, ctx.owner, durationInSecs)
if value != nil {
if currOwner, err = kvstore.ToString(value); err != nil { // This should never happen
log.Errorw("unexpected-owner-type", log.Fields{"transactionId": c.txnID, "error": err})
@@ -231,7 +232,7 @@
log.Debugw("acquired-transaction", log.Fields{"transactionId": c.txnID, "result": txnState[res]})
// Setup the monitoring channel
c.monitorCh = make(chan int)
- go c.holdOnToTxnUntilProcessingCompleted(c.txnKey, ctx.owner, durationInSecs)
+ go c.holdOnToTxnUntilProcessingCompleted(ctxt, c.txnKey, ctx.owner, durationInSecs)
return nil
}
}
@@ -239,22 +240,21 @@
}
// Watch watches transaction
-func (c *KVTransaction) Watch(durationInSecs int64) int {
+func (c *KVTransaction) Watch(ctxt context.Context, durationInSecs int64) int {
var res int
-
- events := ctx.kvClient.Watch(c.txnKey)
+ events := ctx.kvClient.Watch(ctxt, c.txnKey)
defer ctx.kvClient.CloseWatch(c.txnKey, events)
transactionWasAcquiredByOther := false
//Check whether the transaction was already completed by the other Core before we got here.
- if kvp, _ := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout); kvp != nil {
+ if kvp, _ := ctx.kvClient.Get(ctxt, c.txnKey); kvp != nil {
transactionWasAcquiredByOther = true
if val, err := kvstore.ToString(kvp.Value); err == nil {
if val == TransactionComplete {
res = CompletedByOther
// Do an immediate delete of the transaction in the KV Store to free up KV Storage faster
- err = c.Delete()
+ err = c.Delete(ctxt)
if err != nil {
log.Errorw("unable-to-delete-the-transaction", log.Fields{"error": err})
}
@@ -283,7 +283,7 @@
if val == TransactionComplete {
res = CompletedByOther
// Successful request completion has been detected. Remove the transaction key
- err := c.Delete()
+ err := c.Delete(ctxt)
if err != nil {
log.Errorw("unable-to-delete-the-transaction", log.Fields{"error": err})
}
@@ -315,12 +315,12 @@
}
// Close closes transaction
-func (c *KVTransaction) Close() error {
+func (c *KVTransaction) Close(ctxt context.Context) error {
log.Debugw("close", log.Fields{"txn": c.txnID})
// Stop monitoring the key (applies only when there has been no transaction switch over)
if c.monitorCh != nil {
close(c.monitorCh)
- err := ctx.kvClient.Put(c.txnKey, TransactionComplete, ctx.kvOperationTimeout)
+ err := ctx.kvClient.Put(ctxt, c.txnKey, TransactionComplete)
if err != nil {
log.Errorw("unable-to-write-a-key-value-pair-to-the-KV-store", log.Fields{"error": err})
@@ -330,15 +330,15 @@
}
// Delete deletes transaction
-func (c *KVTransaction) Delete() error {
+func (c *KVTransaction) Delete(ctxt context.Context) error {
log.Debugw("delete", log.Fields{"txn": c.txnID})
- return ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout)
+ return ctx.kvClient.Delete(ctxt, c.txnKey)
}
// holdOnToTxnUntilProcessingCompleted renews the transaction lease until the transaction is complete. durationInSecs
// is used to calculate the frequency at which the Core processing the transaction renews the lease. This function
// exits only when the transaction is Closed, i.e. completed.
-func (c *KVTransaction) holdOnToTxnUntilProcessingCompleted(key string, owner string, durationInSecs int64) {
+func (c *KVTransaction) holdOnToTxnUntilProcessingCompleted(ctxt context.Context, key string, owner string, durationInSecs int64) {
log.Debugw("holdOnToTxnUntilProcessingCompleted", log.Fields{"txn": c.txnID})
renewInterval := durationInSecs / NumTxnRenewalPerRequest
if renewInterval < MinTxnRenewalIntervalInSec {
@@ -351,7 +351,7 @@
log.Debugw("transaction-renewal-exits", log.Fields{"txn": c.txnID})
break forLoop
case <-time.After(time.Duration(renewInterval) * time.Second):
- if err := ctx.kvClient.RenewReservation(c.txnKey); err != nil {
+ if err := ctx.kvClient.RenewReservation(ctxt, c.txnKey); err != nil {
// Log and continue.
log.Warnw("transaction-renewal-failed", log.Fields{"txnId": c.txnKey, "error": err})
}
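
In transaction.go the existing package-level ctx (it carries kvClient, owner and kvOperationTimeout) is the transaction context object, not a context.Context, which is why the request context arrives under the name ctxt and is what now flows into the kvClient calls shown above (Reserve, Watch, Get, Put, Delete, RenewReservation). A hedged sketch of a lease-renewal loop in that style, written against an invented KV interface rather than the real voltha-lib-go client:

package example

import (
	"context"
	"log"
	"time"
)

// renewer is an assumed, simplified slice of a KV client API.
type renewer interface {
	RenewReservation(ctx context.Context, key string) error
}

// holdReservation renews a key until done is closed or the request context is
// cancelled, mirroring the shape of the hold-on-to-txn loop above.
func holdReservation(ctx context.Context, kv renewer, key string, interval time.Duration, done <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-done:
			return
		case <-ctx.Done():
			log.Printf("renewal stopped: %v", ctx.Err())
			return
		case <-ticker.C:
			if err := kv.RenewReservation(ctx, key); err != nil {
				log.Printf("renewal of %s failed: %v", key, err) // log and keep trying
			}
		}
	}
}
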