[VOL-3187]Pass Context down the execution call hierarchy across voltha-go codebase
Change-Id: I6bc2a0f7226c1beed4ae01a15d7b5c4dc04358d8
diff --git a/rw_core/core/device/logical_agent.go b/rw_core/core/device/logical_agent.go
index 7abd8f2..6a14e5a 100644
--- a/rw_core/core/device/logical_agent.go
+++ b/rw_core/core/device/logical_agent.go
@@ -64,7 +64,7 @@
portLoader *port.Loader
}
-func newLogicalAgent(id string, sn string, deviceID string, ldeviceMgr *LogicalManager,
+func newLogicalAgent(ctx context.Context, id string, sn string, deviceID string, ldeviceMgr *LogicalManager,
deviceMgr *Manager, dbProxy *model.Path, ldProxy *model.Proxy, defaultTimeout time.Duration) *LogicalAgent {
agent := &LogicalAgent{
logicalDeviceID: id,
@@ -82,7 +82,7 @@
meterLoader: meter.NewLoader(dbProxy.SubPath("logical_meters").Proxy(id)),
portLoader: port.NewLoader(dbProxy.SubPath("logical_ports").Proxy(id)),
}
- agent.deviceRoutes = route.NewDeviceRoutes(agent.logicalDeviceID, agent.deviceMgr.getDevice)
+ agent.deviceRoutes = route.NewDeviceRoutes(ctx, agent.logicalDeviceID, agent.deviceMgr.getDevice)
return agent
}
@@ -93,13 +93,13 @@
return nil
}
- logger.Infow("starting-logical_device-agent", log.Fields{"logical-device-id": agent.logicalDeviceID, "load-from-db": loadFromDB})
+ logger.Infow(ctx, "starting-logical_device-agent", log.Fields{"logical-device-id": agent.logicalDeviceID, "load-from-db": loadFromDB})
var startSucceeded bool
defer func() {
if !startSucceeded {
if err := agent.stop(ctx); err != nil {
- logger.Errorw("failed-to-cleanup-after-unsuccessful-start", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
+ logger.Errorw(ctx, "failed-to-cleanup-after-unsuccessful-start", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
}
}
}()
@@ -121,7 +121,7 @@
}
ld.DatapathId = datapathID
ld.Desc = (proto.Clone(switchCap.Desc)).(*ofp.OfpDesc)
- logger.Debugw("Switch-capability", log.Fields{"Desc": ld.Desc, "fromAd": switchCap.Desc})
+ logger.Debugw(ctx, "Switch-capability", log.Fields{"Desc": ld.Desc, "fromAd": switchCap.Desc})
ld.SwitchFeatures = (proto.Clone(switchCap.SwitchFeatures)).(*ofp.OfpSwitchFeatures)
ld.Flows = &ofp.Flows{Items: nil}
ld.FlowGroups = &ofp.FlowGroups{Items: nil}
@@ -129,10 +129,10 @@
// Save the logical device
if err := agent.ldProxy.Set(ctx, ld.Id, ld); err != nil {
- logger.Errorw("failed-to-add-logical-device", log.Fields{"logical-device-id": agent.logicalDeviceID})
+ logger.Errorw(ctx, "failed-to-add-logical-device", log.Fields{"logical-device-id": agent.logicalDeviceID})
return err
}
- logger.Debugw("logicaldevice-created", log.Fields{"logical-device-id": agent.logicalDeviceID, "root-id": ld.RootDeviceId})
+ logger.Debugw(ctx, "logicaldevice-created", log.Fields{"logical-device-id": agent.logicalDeviceID, "root-id": ld.RootDeviceId})
agent.logicalDevice = ld
@@ -140,7 +140,7 @@
go func() {
err := agent.setupLogicalPorts(context.Background())
if err != nil {
- logger.Errorw("unable-to-setup-logical-ports", log.Fields{"error": err})
+ logger.Errorw(ctx, "unable-to-setup-logical-ports", log.Fields{"error": err})
}
}()
} else {
@@ -171,7 +171,7 @@
if loadFromDB {
go func() {
if err := agent.buildRoutes(context.Background()); err != nil {
- logger.Warn("routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
+ logger.Warn(ctx, "routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
}
}()
}
@@ -184,7 +184,7 @@
func (agent *LogicalAgent) stop(ctx context.Context) error {
var returnErr error
agent.stopOnce.Do(func() {
- logger.Info("stopping-logical_device-agent")
+ logger.Info(ctx, "stopping-logical_device-agent")
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
// This should never happen - an error is returned only if the agent is stopped and an agent is only stopped once.
@@ -197,14 +197,14 @@
if err := agent.ldProxy.Remove(ctx, agent.logicalDeviceID); err != nil {
returnErr = err
} else {
- logger.Debugw("logicaldevice-removed", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
+ logger.Debugw(ctx, "logicaldevice-removed", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
}
// TODO: remove all entries from all loaders
// TODO: don't allow any more modifications to flows/groups/meters/ports or to any logical device field
agent.stopped = true
- logger.Info("logical_device-agent-stopped")
+ logger.Info(ctx, "logical_device-agent-stopped")
})
return returnErr
}
@@ -218,8 +218,8 @@
return proto.Clone(agent.logicalDevice).(*voltha.LogicalDevice), nil
}
-func (agent *LogicalAgent) addFlowsAndGroupsToDevices(deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) []coreutils.Response {
- logger.Debugw("send-add-flows-to-device-manager", log.Fields{"logicalDeviceID": agent.logicalDeviceID, "deviceRules": deviceRules, "flowMetadata": flowMetadata})
+func (agent *LogicalAgent) addFlowsAndGroupsToDevices(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) []coreutils.Response {
+ logger.Debugw(ctx, "send-add-flows-to-device-manager", log.Fields{"logicalDeviceID": agent.logicalDeviceID, "deviceRules": deviceRules, "flowMetadata": flowMetadata})
responses := make([]coreutils.Response, 0)
for deviceID, value := range deviceRules.GetRules() {
@@ -230,7 +230,7 @@
defer cancel()
start := time.Now()
if err := agent.deviceMgr.addFlowsAndGroups(subCtx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
- logger.Errorw("flow-add-failed", log.Fields{"deviceID": deviceId, "error": err, "wait-time": time.Since(start)})
+ logger.Errorw(ctx, "flow-add-failed", log.Fields{"deviceID": deviceId, "error": err, "wait-time": time.Since(start)})
response.Error(status.Errorf(codes.Internal, "flow-add-failed: %s", deviceId))
}
response.Done()
@@ -240,8 +240,8 @@
return responses
}
-func (agent *LogicalAgent) deleteFlowsAndGroupsFromDevices(deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata, mod *ofp.OfpFlowMod) []coreutils.Response {
- logger.Debugw("send-delete-flows-to-device-manager", log.Fields{"logicalDeviceID": agent.logicalDeviceID})
+func (agent *LogicalAgent) deleteFlowsAndGroupsFromDevices(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata, mod *ofp.OfpFlowMod) []coreutils.Response {
+ logger.Debugw(ctx, "send-delete-flows-to-device-manager", log.Fields{"logicalDeviceID": agent.logicalDeviceID})
responses := make([]coreutils.Response, 0)
for deviceID, value := range deviceRules.GetRules() {
@@ -252,7 +252,7 @@
defer cancel()
start := time.Now()
if err := agent.deviceMgr.deleteFlowsAndGroups(subCtx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
- logger.Errorw("flows-and-groups-delete-failed", log.Fields{
+ logger.Errorw(ctx, "flows-and-groups-delete-failed", log.Fields{
"device-id": deviceId,
"error": err,
"flow-cookie": mod.Cookie,
@@ -266,8 +266,8 @@
return responses
}
-func (agent *LogicalAgent) updateFlowsAndGroupsOfDevice(deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) []coreutils.Response {
- logger.Debugw("send-update-flows-to-device-manager", log.Fields{"logicalDeviceID": agent.logicalDeviceID})
+func (agent *LogicalAgent) updateFlowsAndGroupsOfDevice(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) []coreutils.Response {
+ logger.Debugw(ctx, "send-update-flows-to-device-manager", log.Fields{"logicalDeviceID": agent.logicalDeviceID})
responses := make([]coreutils.Response, 0)
for deviceID, value := range deviceRules.GetRules() {
@@ -277,7 +277,7 @@
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
defer cancel()
if err := agent.deviceMgr.updateFlowsAndGroups(subCtx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
- logger.Errorw("flow-update-failed", log.Fields{"deviceID": deviceId, "error": err})
+ logger.Errorw(ctx, "flow-update-failed", log.Fields{"deviceID": deviceId, "error": err})
response.Error(status.Errorf(codes.Internal, "flow-update-failed: %s", deviceId))
}
response.Done()
@@ -286,25 +286,25 @@
return responses
}
-func (agent *LogicalAgent) deleteFlowsFromParentDevice(flows map[uint64]*ofp.OfpFlowStats, metadata *voltha.FlowMetadata, mod *ofp.OfpFlowMod) []coreutils.Response {
- logger.Debugw("deleting-flows-from-parent-device", log.Fields{"logical-device-id": agent.logicalDeviceID, "flows": flows})
+func (agent *LogicalAgent) deleteFlowsFromParentDevice(ctx context.Context, flows map[uint64]*ofp.OfpFlowStats, metadata *voltha.FlowMetadata, mod *ofp.OfpFlowMod) []coreutils.Response {
+ logger.Debugw(ctx, "deleting-flows-from-parent-device", log.Fields{"logical-device-id": agent.logicalDeviceID, "flows": flows})
responses := make([]coreutils.Response, 0)
for _, flow := range flows {
response := coreutils.NewResponse()
responses = append(responses, response)
uniPort, err := agent.getUNILogicalPortNo(flow)
if err != nil {
- logger.Error("no-uni-port-in-flow", log.Fields{"deviceID": agent.rootDeviceID, "flow": flow, "error": err})
+ logger.Error(ctx, "no-uni-port-in-flow", log.Fields{"deviceID": agent.rootDeviceID, "flow": flow, "error": err})
response.Error(err)
response.Done()
continue
}
- logger.Debugw("uni-port", log.Fields{"flows": flows, "uni-port": uniPort})
+ logger.Debugw(ctx, "uni-port", log.Fields{"flows": flows, "uni-port": uniPort})
go func(uniPort uint32, metadata *voltha.FlowMetadata) {
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
defer cancel()
if err := agent.deviceMgr.deleteParentFlows(subCtx, agent.rootDeviceID, uniPort, metadata); err != nil {
- logger.Error("flow-delete-failed-from-parent-device", log.Fields{
+ logger.Error(ctx, "flow-delete-failed-from-parent-device", log.Fields{
"device-id": agent.rootDeviceID,
"error": err,
"flow-cookie": mod.Cookie,
@@ -318,7 +318,7 @@
}
func (agent *LogicalAgent) packetOut(ctx context.Context, packet *ofp.OfpPacketOut) {
- logger.Debugw("packet-out", log.Fields{
+ logger.Debugw(ctx, "packet-out", log.Fields{
"packet": hex.EncodeToString(packet.Data),
"inPort": packet.GetInPort(),
})
@@ -326,17 +326,17 @@
//frame := packet.GetData()
//TODO: Use a channel between the logical agent and the device agent
if err := agent.deviceMgr.packetOut(ctx, agent.rootDeviceID, outPort, packet); err != nil {
- logger.Error("packetout-failed", log.Fields{"logicalDeviceID": agent.rootDeviceID})
+ logger.Error(ctx, "packetout-failed", log.Fields{"logicalDeviceID": agent.rootDeviceID})
}
}
-func (agent *LogicalAgent) packetIn(port uint32, transactionID string, packet []byte) {
- logger.Debugw("packet-in", log.Fields{
+func (agent *LogicalAgent) packetIn(ctx context.Context, port uint32, transactionID string, packet []byte) {
+ logger.Debugw(ctx, "packet-in", log.Fields{
"port": port,
"packet": hex.EncodeToString(packet),
"transactionId": transactionID,
})
packetIn := fu.MkPacketIn(port, packet)
- agent.ldeviceMgr.SendPacketIn(agent.logicalDeviceID, transactionID, packetIn)
- logger.Debugw("sending-packet-in", log.Fields{"packet": hex.EncodeToString(packetIn.Data)})
+ agent.ldeviceMgr.SendPacketIn(ctx, agent.logicalDeviceID, transactionID, packetIn)
+ logger.Debugw(ctx, "sending-packet-in", log.Fields{"packet": hex.EncodeToString(packetIn.Data)})
}