VOL-3501 Code changes to support rpc event
Change-Id: I2536c0c03faa5fb026349c906ebef46323398e9a
diff --git a/rw_core/core/device/logical_agent.go b/rw_core/core/device/logical_agent.go
index 6b0b6f7..7ab00da 100644
--- a/rw_core/core/device/logical_agent.go
+++ b/rw_core/core/device/logical_agent.go
@@ -92,7 +92,7 @@
return nil
}
- logger.Infow(ctx, "starting-logical_device-agent", log.Fields{"logical-device-id": agent.logicalDeviceID, "load-from-db": loadFromDB})
+ logger.Infow(ctx, "starting-logical-device-agent", log.Fields{"logical-device-id": agent.logicalDeviceID, "load-from-db": loadFromDB})
var startSucceeded bool
defer func() {
@@ -128,13 +128,14 @@
logger.Errorw(ctx, "failed-to-add-logical-device", log.Fields{"logical-device-id": agent.logicalDeviceID})
return err
}
- logger.Debugw(ctx, "logicaldevice-created", log.Fields{"logical-device-id": agent.logicalDeviceID, "root-id": ld.RootDeviceId})
+ logger.Debugw(ctx, "logical-device-created", log.Fields{"logical-device-id": agent.logicalDeviceID, "root-id": ld.RootDeviceId})
agent.logicalDevice = ld
// Setup the logicalports - internal processing, no need to propagate the client context
go func() {
- err := agent.setupLogicalPorts(log.WithSpanFromContext(context.Background(), ctx))
+ subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+ err := agent.setupLogicalPorts(subCtx)
if err != nil {
logger.Errorw(ctx, "unable-to-setup-logical-ports", log.Fields{"error": err})
}
@@ -169,7 +170,8 @@
// Setup the device routes. Building routes may fail if the pre-conditions are not satisfied (e.g. no PON ports present)
if loadFromDB {
go func() {
- if err := agent.buildRoutes(log.WithSpanFromContext(context.Background(), ctx)); err != nil {
+ subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+ if err := agent.buildRoutes(subCtx); err != nil {
logger.Warn(ctx, "routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
}
}()
@@ -183,7 +185,7 @@
func (agent *LogicalAgent) stop(ctx context.Context) error {
var returnErr error
agent.stopOnce.Do(func() {
- logger.Info(ctx, "stopping-logical_device-agent")
+ logger.Info(ctx, "stopping-logical-device-agent")
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
// This should never happen - an error is returned only if the agent is stopped and an agent is only stopped once.
@@ -202,14 +204,14 @@
if err := agent.ldProxy.Remove(ctx, agent.logicalDeviceID); err != nil {
returnErr = err
} else {
- logger.Debugw(ctx, "logicaldevice-removed", log.Fields{"logical-device-id": agent.logicalDeviceID})
+ logger.Debugw(ctx, "logical-device-removed", log.Fields{"logical-device-id": agent.logicalDeviceID})
}
// TODO: remove all entries from all loaders
// TODO: don't allow any more modifications to flows/groups/meters/ports or to any logical device field
agent.stopped = true
- logger.Info(ctx, "logical_device-agent-stopped")
+ logger.Info(ctx, "logical-device-agent-stopped")
})
return returnErr
}
@@ -224,7 +226,7 @@
}
func (agent *LogicalAgent) addFlowsAndGroupsToDevices(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) []coreutils.Response {
- logger.Debugw(ctx, "send-add-flows-to-device-manager", log.Fields{"logical-device-id": agent.logicalDeviceID, "deviceRules": deviceRules, "flowMetadata": flowMetadata})
+ logger.Debugw(ctx, "send-add-flows-to-device-manager", log.Fields{"logical-device-id": agent.logicalDeviceID, "device-rules": deviceRules, "flow-metadata": flowMetadata})
responses := make([]coreutils.Response, 0)
for deviceID, value := range deviceRules.GetRules() {
@@ -232,6 +234,8 @@
responses = append(responses, response)
go func(deviceId string, value *fu.FlowsAndGroups) {
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
defer cancel()
start := time.Now()
if err := agent.deviceMgr.addFlowsAndGroups(subCtx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
@@ -260,6 +264,8 @@
responses = append(responses, response)
go func(deviceId string, value *fu.FlowsAndGroups) {
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
defer cancel()
start := time.Now()
if err := agent.deviceMgr.deleteFlowsAndGroups(subCtx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
@@ -286,6 +292,8 @@
responses = append(responses, response)
go func(deviceId string, value *fu.FlowsAndGroups) {
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
defer cancel()
if err := agent.deviceMgr.updateFlowsAndGroups(subCtx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
logger.Errorw(ctx, "flow-update-failed", log.Fields{"device-id": deviceId, "error": err})
@@ -313,6 +321,8 @@
logger.Debugw(ctx, "uni-port", log.Fields{"flows": flows, "uni-port": uniPort})
go func(uniPort uint32, metadata *voltha.FlowMetadata) {
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
defer cancel()
if err := agent.deviceMgr.deleteParentFlows(subCtx, agent.rootDeviceID, uniPort, metadata); err != nil {
logger.Error(ctx, "flow-delete-failed-from-parent-device", log.Fields{
@@ -331,15 +341,15 @@
func (agent *LogicalAgent) packetOut(ctx context.Context, packet *ofp.OfpPacketOut) {
if logger.V(log.InfoLevel) {
logger.Infow(ctx, "packet-out", log.Fields{
- "packet": hex.EncodeToString(packet.Data),
- "inPort": packet.GetInPort(),
+ "packet": hex.EncodeToString(packet.Data),
+ "in-port": packet.GetInPort(),
})
}
outPort := fu.GetPacketOutPort(packet)
//frame := packet.GetData()
//TODO: Use a channel between the logical agent and the device agent
if err := agent.deviceMgr.packetOut(ctx, agent.rootDeviceID, outPort, packet); err != nil {
- logger.Error(ctx, "packetout-failed", log.Fields{"logical-device-id": agent.rootDeviceID})
+ logger.Error(ctx, "packet-out-failed", log.Fields{"logical-device-id": agent.rootDeviceID})
}
}