go routines removed but writing to kafka is still done via go routines
<Please press edit button to view in proper formatting>
The go routines related to port , device , transition state and kafka writing was removed from the voltha core and the PONC scale test performance was compared with the voltha core keeping the go routines intact.
The tests were carried out for an average of 5 runs with :
NUM_OLTS=6
NUM_ONUS_PER_PON=32
STACK_COUNT=1
NUM_PON_PORTS=16
The table below shows the performance in different cases:
| Test suite | with go routine | without go routine(only port and device op goroutine removed) | without go routine(port and device op go routine removed,tansition state processing,kafka writing is in go routine) | without go routine(port and device operation ,transition state processing and kafka writing go routine removed) | without go routine(port and device op and tansition state processing go routine removed ,only kafka writing is in go routine) |
|---------------------------------------|----------------------------------------------------------|---------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------|
| | | | | | |
| vOLTHA Scale Test | 04 minutes : 36 seconds : 957.5999999999767 milliseconds | 04 minutes : 30 seconds : 233.79999999998836 milliseconds | 04 minutes : 33 seconds : 859.5999999999767 milliseconds | 05 minutes : 47 seconds : 175.20000000001164 milliseconds | 04 minutes : 34 seconds : 200 milliseconds |
| vOLTHA Scale Test.Activate OLTs | 02 minutes : 15 seconds : 440.3999999999942 milliseconds | 02 minutes : 02 seconds : 861.8000000000029 milliseconds | 02 minutes : 12 seconds : 925.6000000000058 milliseconds | 02 minutes : 16 seconds : 199.39999999999418 milliseconds | 02 minutes : 11 seconds : 800 milliseconds<br/> |
| vOLTHA Scale Test.Onos Checks | 11 seconds : 191.39999999999964 milliseconds | 10 seconds : 976.3999999999996 milliseconds | 11 seconds : 552.6000000000004 milliseconds | 10 seconds : 826.3999999999996 milliseconds | 12 seconds : 00 milliseconds |
| vOLTHA Scale Test.Activate Onus | 02 minutes : 10 seconds : 314.1999999999971 milliseconds | 02 minutes : 07 seconds : 102.80000000000291 milliseconds | 02 minutes : 09 seconds : 381.3999999999942 milliseconds | 03 minutes : 20 seconds : 149.39999999999418 milliseconds | 02 minutes : 11 seconds : 00 milliseconds |
| vOLTHA Scale Test.Activate Onus.Scale | 02 minutes : 09 seconds : 207 milliseconds | 02 minutes : 06 seconds : 858.8000000000029 milliseconds | 02 minutes : 07 seconds : 153 milliseconds | 03 minutes : 17 seconds : 63 milliseconds | 02 minutes : 08 seconds : 400 milliseconds<br/> |
Change-Id: I7a7ed75be37065094c2f6b3524101e76a6a795ea
diff --git a/VERSION b/VERSION
index d202b7e..7148b0a 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-3.1.9-dev
+3.1.9
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index 6599e13..d6430af 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -1060,17 +1060,16 @@
// release lock before processing transition
agent.requestQueue.RequestComplete()
- go func() {
- subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
- if err := agent.deviceMgr.stateTransitions.ProcessTransition(subCtx,
- device, prevDevice, transientState, prevTransientState); err != nil {
- logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previous-admin-state": prevDevice.AdminState, "current-admin-state": device.AdminState})
- // Sending RPC EVENT here
- rpce := agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil)
- agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce, voltha.EventCategory_COMMUNICATION,
- nil, time.Now().Unix())
- }
- }()
+
+ if err := agent.deviceMgr.stateTransitions.ProcessTransition(ctx,
+ device, prevDevice, transientState, prevTransientState); err != nil {
+ logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previous-admin-state": prevDevice.AdminState, "current-admin-state": device.AdminState})
+ // Sending RPC EVENT here
+ rpce := agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil)
+ go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce, voltha.EventCategory_COMMUNICATION,
+ nil, time.Now().Unix())
+ }
+
return nil
}
func (agent *Agent) updateDeviceReason(ctx context.Context, reason string) error {
@@ -1135,15 +1134,15 @@
}
subCtx, cancel := context.WithTimeout(coreutils.WithAllMetadataFromContext(ctx), agent.rpcTimeout)
requestStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
- go func() {
- defer cancel()
- _, err := client.ChildDeviceLost(subCtx, device)
- if err == nil {
- agent.onSuccess(subCtx, nil, nil, true)
- } else {
- agent.onFailure(subCtx, err, nil, nil, true)
- }
- }()
+
+ defer cancel()
+ _, err = client.ChildDeviceLost(subCtx, device)
+ if err == nil {
+ agent.onSuccess(subCtx, nil, nil, true)
+ } else {
+ agent.onFailure(subCtx, err, nil, nil, true)
+ }
+
return nil
}
diff --git a/rw_core/core/device/agent_port.go b/rw_core/core/device/agent_port.go
index 937c46c..baa74ac 100644
--- a/rw_core/core/device/agent_port.go
+++ b/rw_core/core/device/agent_port.go
@@ -82,14 +82,11 @@
// Notify the logical device manager to change the port state
// Do this for NNI and UNIs only. PON ports are not known by logical device
if newPort.Type == voltha.Port_ETHERNET_NNI || newPort.Type == voltha.Port_ETHERNET_UNI {
- subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+ if err := agent.deviceMgr.logicalDeviceMgr.updatePortState(ctx, agent.deviceID, portID, operStatus); err != nil {
+ // TODO: VOL-2707
+ logger.Warnw(ctx, "unable-to-update-logical-port-state", log.Fields{"error": err})
+ }
- go func(portID uint32, ctx context.Context) {
- if err := agent.deviceMgr.logicalDeviceMgr.updatePortState(ctx, agent.deviceID, portID, operStatus); err != nil {
- // TODO: VOL-2707
- logger.Warnw(ctx, "unable-to-update-logical-port-state", log.Fields{"error": err})
- }
- }(portID, subCtx)
}
}
portHandle.Unlock()
diff --git a/rw_core/core/device/logical_agent.go b/rw_core/core/device/logical_agent.go
index 7a69722..966ab24 100644
--- a/rw_core/core/device/logical_agent.go
+++ b/rw_core/core/device/logical_agent.go
@@ -133,13 +133,12 @@
agent.logicalDevice = ld
// Setup the logicalports - internal processing, no need to propagate the client context
- go func() {
- subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
- err := agent.setupLogicalPorts(subCtx)
- if err != nil {
- logger.Errorw(ctx, "unable-to-setup-logical-ports", log.Fields{"error": err})
- }
- }()
+
+ err = agent.setupLogicalPorts(ctx)
+ if err != nil {
+ logger.Errorw(ctx, "unable-to-setup-logical-ports", log.Fields{"error": err})
+ }
+
} else {
// Check to see if we need to load from dB
ld = logicalDevice
@@ -172,12 +171,11 @@
// Setup the device routes. Building routes may fail if the pre-conditions are not satisfied (e.g. no PON ports present)
if logicalDeviceExist {
- go func() {
- subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
- if err := agent.buildRoutes(subCtx); err != nil {
- logger.Warn(ctx, "routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
- }
- }()
+
+ if err := agent.buildRoutes(ctx); err != nil {
+ logger.Warn(ctx, "routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
+ }
+
}
startSucceeded = true
diff --git a/rw_core/core/device/logical_agent_flow.go b/rw_core/core/device/logical_agent_flow.go
index c2dba5a..4fa0090 100644
--- a/rw_core/core/device/logical_agent_flow.go
+++ b/rw_core/core/device/logical_agent_flow.go
@@ -187,6 +187,7 @@
// Create the go routines to wait
go func() {
+
// Wait for completion
if res := coreutils.WaitForNilOrErrorResponses(agent.internalTimeout, respChannels...); res != nil {
logger.Errorw(ctx, "failed-to-add-flow-will-attempt-deletion", log.Fields{
@@ -257,16 +258,15 @@
respChnls := agent.deleteFlowsAndGroupsFromDevices(ctx, deviceRules, mod)
// Wait for the responses
- go func() {
- // Since this action is taken following an add failure, we may also receive a failure for the revert
- if res := coreutils.WaitForNilOrErrorResponses(agent.internalTimeout, respChnls...); res != nil {
- logger.Warnw(ctx, "failure-reverting-added-flows", log.Fields{
- "logical-device-id": agent.logicalDeviceID,
- "flow-cookie": mod.Cookie,
- "errors": res,
- })
- }
- }()
+
+ // Since this action is taken following an add failure, we may also receive a failure for the revert
+ if res := coreutils.WaitForNilOrErrorResponses(agent.internalTimeout, respChnls...); res != nil {
+ logger.Warnw(ctx, "failure-reverting-added-flows", log.Fields{
+ "logical-device-id": agent.logicalDeviceID,
+ "flow-cookie": mod.Cookie,
+ "errors": res,
+ })
+ }
return nil
}
diff --git a/rw_core/core/device/logical_agent_port.go b/rw_core/core/device/logical_agent_port.go
index 719b6b8..54fa620 100644
--- a/rw_core/core/device/logical_agent_port.go
+++ b/rw_core/core/device/logical_agent_port.go
@@ -57,24 +57,21 @@
}
case voltha.Port_PON_OLT:
// Rebuilt the routes on Parent PON port addition
- go func() {
- subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
- if err := agent.buildRoutes(subCtx); err != nil {
- // Not an error - temporary state
- logger.Infow(ctx, "failed-to-update-routes-after-adding-parent-pon-port", log.Fields{"device-id": device.Id, "port": port, "ports-count": len(devicePorts), "error": err})
- }
- }()
+ if err := agent.buildRoutes(ctx); err != nil {
+ // Not an error - temporary state
+ logger.Infow(ctx, "failed-to-update-routes-after-adding-parent-pon-port", log.Fields{"device-id": device.Id, "port": port, "ports-count": len(devicePorts), "error": err})
+ }
+
//fallthrough
case voltha.Port_PON_ONU:
// Add the routes corresponding to that child device
- go func() {
- subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
- if err := agent.updateAllRoutes(subCtx, device.Id, devicePorts); err != nil {
- // Not an error - temporary state
- logger.Infow(ctx, "failed-to-update-routes-after-adding-child-pon-port", log.Fields{"device-id": device.Id, "port": port, "ports-count": len(devicePorts), "error": err})
- }
- }()
+
+ if err := agent.updateAllRoutes(ctx, device.Id, devicePorts); err != nil {
+ // Not an error - temporary state
+ logger.Infow(ctx, "failed-to-update-routes-after-adding-child-pon-port", log.Fields{"device-id": device.Id, "port": port, "ports-count": len(devicePorts), "error": err})
+ }
+
default:
return fmt.Errorf("invalid port type %v", port)
}
@@ -218,12 +215,10 @@
}
// Reset the logical device routes
- go func() {
- subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
- if err := agent.removeRoutes(subCtx); err != nil {
- logger.Warnw(ctx, "error-removing-routes", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
- }
- }()
+ if err := agent.removeRoutes(ctx); err != nil {
+ logger.Warnw(ctx, "error-removing-routes", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
+ }
+
return nil
}
@@ -249,12 +244,11 @@
}
// Reset the logical device routes
- go func() {
- subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
- if err := agent.buildRoutes(subCtx); err != nil {
- logger.Warnw(ctx, "routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
- }
- }()
+
+ if err := agent.buildRoutes(ctx); err != nil {
+ logger.Warnw(ctx, "routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
+ }
+
return nil
}
@@ -346,7 +340,6 @@
// created yet.
logger.Infow(ctx, "routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "logical-port": nniPort.OfpPort.PortNo, "error": err})
}
-
// send event, and allow any queued events to be sent as well
queuePosition.send(ctx, agent, agent.logicalDeviceID, ofp.OfpPortReason_OFPPR_ADD, nniPort.OfpPort)
}()
@@ -398,9 +391,8 @@
// created yet.
logger.Infow(ctx, "routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "logical-port": uniPort.OfpPort.PortNo, "error": err})
}
-
// send event, and allow any queued events to be sent as well
- queuePosition.send(subCtx, agent, agent.logicalDeviceID, ofp.OfpPortReason_OFPPR_ADD, uniPort.OfpPort)
+ queuePosition.send(ctx, agent, agent.logicalDeviceID, ofp.OfpPortReason_OFPPR_ADD, uniPort.OfpPort)
}()
return nil
}
diff --git a/rw_core/core/device/logical_manager.go b/rw_core/core/device/logical_manager.go
index 9f03746..8fe04a9 100644
--- a/rw_core/core/device/logical_manager.go
+++ b/rw_core/core/device/logical_manager.go
@@ -169,16 +169,11 @@
return nil, err
}
- go func() {
- //TODO: either wait for the agent to be started before returning, or
- // implement locks in the agent to ensure request are not processed before start() is complete
- ldCtx := utils.WithSpanAndRPCMetadataFromContext(ctx)
- err := agent.start(ldCtx, false, nil)
- if err != nil {
- logger.Errorw(ctx, "unable-to-create-the-logical-device", log.Fields{"error": err})
- ldMgr.deleteLogicalDeviceAgent(id)
- }
- }()
+ err := agent.start(ctx, false, nil)
+ if err != nil {
+ logger.Errorw(ctx, "unable-to-create-the-logical-device", log.Fields{"error": err})
+ ldMgr.deleteLogicalDeviceAgent(id)
+ }
logger.Debug(ctx, "creating-logical-device-ends")
return &id, nil
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index 879eaef..e8ffacf 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -400,7 +400,7 @@
"device-type": deviceAgent.deviceType,
"adapter-type": adapter.Type,
})
- go deviceAgent.ReconcileDevice(utils.WithNewSpanAndRPCMetadataContext(ctx, "ReconcileDevice"))
+ deviceAgent.ReconcileDevice(utils.WithNewSpanAndRPCMetadataContext(ctx, "ReconcileDevice"))
numberOfDevicesToReconcile++
} else {
logger.Errorw(ctx, "failed-aborting-exisiting-processing", log.Fields{"error": err})
@@ -455,12 +455,9 @@
return err
}
// Setup peer ports in its own routine
- go func() {
- subCtx := utils.WithSpanAndRPCMetadataFromContext(ctx)
- if err := dMgr.addPeerPort(subCtx, deviceID, port); err != nil {
- logger.Errorw(ctx, "unable-to-add-peer-port", log.Fields{"error": err, "device-id": deviceID})
- }
- }()
+ if err := dMgr.addPeerPort(ctx, deviceID, port); err != nil {
+ logger.Errorw(ctx, "unable-to-add-peer-port", log.Fields{"error": err, "device-id": deviceID})
+ }
return nil
}
return status.Errorf(codes.NotFound, "%s", deviceID)
@@ -596,19 +593,16 @@
// Notify the logical device manager to change the port state
// Do this for NNI and UNIs only. PON ports are not known by logical device
if portType == voltha.Port_ETHERNET_NNI || portType == voltha.Port_ETHERNET_UNI {
- go func() {
- subCtx := utils.WithSpanAndRPCMetadataFromContext(ctx)
- err := dMgr.logicalDeviceMgr.updatePortState(subCtx, deviceID, portNo, operStatus)
- if err != nil {
- // While we want to handle (catch) and log when
- // an update to a port was not able to be
- // propagated to the logical port, we can report
- // it as a warning and not an error because it
- // doesn't stop or modify processing.
- // TODO: VOL-2707
- logger.Warnw(ctx, "unable-to-update-logical-port-state", log.Fields{"error": err})
- }
- }()
+ err := dMgr.logicalDeviceMgr.updatePortState(ctx, deviceID, portNo, operStatus)
+ if err != nil {
+ // While we want to handle (catch) and log when
+ // an update to a port was not able to be
+ // propagated to the logical port, we can report
+ // it as a warning and not an error because it
+ // doesn't stop or modify processing.
+ // TODO: VOL-2707
+ logger.Warnw(ctx, "unable-to-update-logical-port-state", log.Fields{"error": err})
+ }
}
return nil
}
diff --git a/rw_core/core/device/manager_sbi.go b/rw_core/core/device/manager_sbi.go
index 5f1c01c..0a5eff8 100644
--- a/rw_core/core/device/manager_sbi.go
+++ b/rw_core/core/device/manager_sbi.go
@@ -43,11 +43,11 @@
return nil, err
}
// Setup peer ports in its own routine
- go func() {
- if err := dMgr.addPeerPort(log.WithSpanFromContext(context.Background(), ctx), port.DeviceId, port); err != nil {
- logger.Errorw(ctx, "unable-to-add-peer-port", log.Fields{"error": err, "device-id": port.DeviceId})
- }
- }()
+
+ if err := dMgr.addPeerPort(ctx, port.DeviceId, port); err != nil {
+ logger.Errorw(ctx, "unable-to-add-peer-port", log.Fields{"error": err, "device-id": port.DeviceId})
+ }
+
return &empty.Empty{}, nil
}
return nil, status.Errorf(codes.NotFound, "%s", port.DeviceId)