VOL-3501 Code changes to support rpc event
Change-Id: I2536c0c03faa5fb026349c906ebef46323398e9a
diff --git a/rw_core/core/device/logical_manager.go b/rw_core/core/device/logical_manager.go
index 2fca7c5..9ea12fa 100644
--- a/rw_core/core/device/logical_manager.go
+++ b/rw_core/core/device/logical_manager.go
@@ -67,7 +67,7 @@
// This can happen when an agent for the logical device has been created but the logical device
// itself is not ready for action as it is waiting for switch and port capabilities from the
// relevant adapter. In such a case prevent any request aimed at that logical device.
- logger.Debugf(ctx, "Logical device %s is not ready to serve requests", logicalDeviceID)
+ logger.Debugf(ctx, "logical-device-%s-is-not-ready-to-serve-requests", logicalDeviceID)
return nil
}
return lda
@@ -88,7 +88,8 @@
// GetLogicalDevice provides a cloned most up to date logical device. If device is not in memory
// it will be fetched from the dB
func (ldMgr *LogicalManager) GetLogicalDevice(ctx context.Context, id *voltha.ID) (*voltha.LogicalDevice, error) {
- logger.Debugw(ctx, "getlogicalDevice", log.Fields{"logical-device-id": id})
+ ctx = utils.WithRPCMetadataContext(ctx, "GetLogicalDevice")
+ logger.Debugw(ctx, "get-logical-device", log.Fields{"logical-device-id": id})
if agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id); agent != nil {
return agent.GetLogicalDeviceReadOnly(ctx)
}
@@ -97,7 +98,8 @@
//ListLogicalDevices returns the list of all logical devices
func (ldMgr *LogicalManager) ListLogicalDevices(ctx context.Context, _ *empty.Empty) (*voltha.LogicalDevices, error) {
- logger.Debug(ctx, "ListAllLogicalDevices")
+ ctx = utils.WithRPCMetadataContext(ctx, "ListLogicalDevices")
+ logger.Debug(ctx, "list-all-logical-devices")
var logicalDevices []*voltha.LogicalDevice
if err := ldMgr.ldProxy.List(ctx, &logicalDevices); err != nil {
@@ -139,7 +141,8 @@
go func() {
//TODO: either wait for the agent to be started before returning, or
// implement locks in the agent to ensure request are not processed before start() is complete
- err := agent.start(log.WithSpanFromContext(context.Background(), ctx), false)
+ ldCtx := utils.WithSpanAndRPCMetadataFromContext(ctx)
+ err := agent.start(ldCtx, false)
if err != nil {
logger.Errorw(ctx, "unable-to-create-the-logical-device", log.Fields{"error": err})
ldMgr.deleteLogicalDeviceAgent(id)
@@ -205,7 +208,7 @@
}
ldMgr.logicalDeviceAgents.Store(agent.logicalDeviceID, agent)
} else {
- logger.Debugw(ctx, "logicalDevice not in model", log.Fields{"lDeviceId": lDeviceID})
+ logger.Debugw(ctx, "logical-device-not-in-model", log.Fields{"logical-device-id": lDeviceID})
}
// announce completion of task to any number of waiting channels
ldMgr.logicalDevicesLoadingLock.Lock()
@@ -281,7 +284,8 @@
// ListLogicalDeviceFlows returns the flows of logical device
func (ldMgr *LogicalManager) ListLogicalDeviceFlows(ctx context.Context, id *voltha.ID) (*openflow_13.Flows, error) {
- logger.Debugw(ctx, "ListLogicalDeviceFlows", log.Fields{"logical-device-id": id.Id})
+ ctx = utils.WithRPCMetadataContext(ctx, "ListLogicalDeviceFlows")
+ logger.Debugw(ctx, "list-logical-device-flows", log.Fields{"logical-device-id": id.Id})
agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -298,7 +302,8 @@
// ListLogicalDeviceFlowGroups returns logical device flow groups
func (ldMgr *LogicalManager) ListLogicalDeviceFlowGroups(ctx context.Context, id *voltha.ID) (*openflow_13.FlowGroups, error) {
- logger.Debugw(ctx, "ListLogicalDeviceFlowGroups", log.Fields{"logical-device-id": id.Id})
+ ctx = utils.WithRPCMetadataContext(ctx, "ListLogicalDeviceFlowGroups")
+ logger.Debugw(ctx, "list-logical-device-flow-groups", log.Fields{"logical-device-id": id.Id})
agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -315,7 +320,8 @@
// ListLogicalDevicePorts returns logical device ports
func (ldMgr *LogicalManager) ListLogicalDevicePorts(ctx context.Context, id *voltha.ID) (*voltha.LogicalPorts, error) {
- logger.Debugw(ctx, "ListLogicalDevicePorts", log.Fields{"logical-device-id": id.Id})
+ ctx = utils.WithRPCMetadataContext(ctx, "ListLogicalDevicePorts")
+ logger.Debugw(ctx, "list-logical-device-ports", log.Fields{"logical-device-id": id.Id})
agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -332,6 +338,7 @@
// GetLogicalDevicePort returns logical device port details
func (ldMgr *LogicalManager) GetLogicalDevicePort(ctx context.Context, lPortID *voltha.LogicalPortId) (*voltha.LogicalPort, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "GetLogicalDevicePort")
// Get the logical device where this port is attached
agent := ldMgr.getLogicalDeviceAgent(ctx, lPortID.Id)
if agent == nil {
@@ -382,7 +389,7 @@
}
func (ldMgr *LogicalManager) setupUNILogicalPorts(ctx context.Context, childDevice *voltha.Device, childDevicePorts map[uint32]*voltha.Port) error {
- logger.Debugw(ctx, "setupUNILogicalPorts", log.Fields{"childDeviceId": childDevice.Id, "parentDeviceId": childDevice.ParentId, "current-data": childDevice})
+ logger.Debugw(ctx, "setup-uni-logical-ports", log.Fields{"child-device-id": childDevice.Id, "parent-device-id": childDevice.ParentId, "current-data": childDevice})
// Sanity check
if childDevice.Root {
return errors.New("Device-root")
@@ -392,7 +399,7 @@
parentID := childDevice.ParentId
logDeviceID := ldMgr.deviceMgr.GetParentDeviceID(ctx, parentID)
- logger.Debugw(ctx, "setupUNILogicalPorts", log.Fields{"logDeviceId": logDeviceID, "parentId": parentID})
+ logger.Debugw(ctx, "setup-uni-logical-ports", log.Fields{"logical-device-id": logDeviceID, "parentId": parentID})
if parentID == "" || logDeviceID == "" {
return errors.New("device-in-invalid-state")
@@ -407,7 +414,7 @@
}
func (ldMgr *LogicalManager) deleteAllLogicalPorts(ctx context.Context, device *voltha.Device) error {
- logger.Debugw(ctx, "deleteAllLogicalPorts", log.Fields{"device-id": device.Id})
+ logger.Debugw(ctx, "delete-all-logical-ports", log.Fields{"device-id": device.Id})
var ldID *string
var err error
@@ -425,7 +432,7 @@
}
func (ldMgr *LogicalManager) updatePortState(ctx context.Context, deviceID string, portNo uint32, state voltha.OperStatus_Types) error {
- logger.Debugw(ctx, "updatePortState", log.Fields{"device-id": deviceID, "state": state, "portNo": portNo})
+ logger.Debugw(ctx, "update-Port-state", log.Fields{"device-id": deviceID, "state": state, "port-no": portNo})
var ldID *string
var err error
@@ -444,7 +451,8 @@
// UpdateLogicalDeviceFlowTable updates logical device flow table
func (ldMgr *LogicalManager) UpdateLogicalDeviceFlowTable(ctx context.Context, flow *openflow_13.FlowTableUpdate) (*empty.Empty, error) {
- logger.Debugw(ctx, "UpdateLogicalDeviceFlowTable", log.Fields{"logical-device-id": flow.Id})
+ ctx = utils.WithRPCMetadataContext(ctx, "UpdateLogicalDeviceFlowTable")
+ logger.Debugw(ctx, "update-logical-device-flow-table", log.Fields{"logical-device-id": flow.Id})
agent := ldMgr.getLogicalDeviceAgent(ctx, flow.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", flow.Id)
@@ -454,7 +462,8 @@
// UpdateLogicalDeviceMeterTable - This function sends meter mod request to logical device manager and waits for response
func (ldMgr *LogicalManager) UpdateLogicalDeviceMeterTable(ctx context.Context, meter *openflow_13.MeterModUpdate) (*empty.Empty, error) {
- logger.Debugw(ctx, "UpdateLogicalDeviceMeterTable", log.Fields{"logical-device-id": meter.Id})
+ ctx = utils.WithRPCMetadataContext(ctx, "UpdateLogicalDeviceMeterTable")
+ logger.Debugw(ctx, "update-logical-device-meter-table", log.Fields{"logical-device-id": meter.Id})
agent := ldMgr.getLogicalDeviceAgent(ctx, meter.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", meter.Id)
@@ -464,7 +473,8 @@
// ListLogicalDeviceMeters returns logical device meters
func (ldMgr *LogicalManager) ListLogicalDeviceMeters(ctx context.Context, id *voltha.ID) (*openflow_13.Meters, error) {
- logger.Debugw(ctx, "ListLogicalDeviceMeters", log.Fields{"logical-device-id": id.Id})
+ ctx = utils.WithRPCMetadataContext(ctx, "ListLogicalDeviceMeters")
+ logger.Debugw(ctx, "list-logical-device-meters", log.Fields{"logical-device-id": id.Id})
agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -480,7 +490,8 @@
// UpdateLogicalDeviceFlowGroupTable updates logical device flow group table
func (ldMgr *LogicalManager) UpdateLogicalDeviceFlowGroupTable(ctx context.Context, flow *openflow_13.FlowGroupTableUpdate) (*empty.Empty, error) {
- logger.Debugw(ctx, "UpdateGroupTable", log.Fields{"logical-device-id": flow.Id})
+ ctx = utils.WithRPCMetadataContext(ctx, "UpdateLogicalDeviceFlowGroupTable")
+ logger.Debugw(ctx, "update-group-table", log.Fields{"logical-device-id": flow.Id})
agent := ldMgr.getLogicalDeviceAgent(ctx, flow.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", flow.Id)
@@ -490,7 +501,8 @@
// EnableLogicalDevicePort enables logical device port
func (ldMgr *LogicalManager) EnableLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*empty.Empty, error) {
- logger.Debugw(ctx, "EnableLogicalDevicePort", log.Fields{"logical-device-id": id})
+ ctx = utils.WithRPCMetadataContext(ctx, "EnableLogicalDevicePort")
+ logger.Debugw(ctx, "enable-logical-device-port", log.Fields{"logical-device-id": id})
agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -504,7 +516,8 @@
// DisableLogicalDevicePort disables logical device port
func (ldMgr *LogicalManager) DisableLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*empty.Empty, error) {
- logger.Debugw(ctx, "DisableLogicalDevicePort", log.Fields{"logical-device-id": id})
+ ctx = utils.WithRPCMetadataContext(ctx, "DisableLogicalDevicePort")
+ logger.Debugw(ctx, "disable-logical-device-port", log.Fields{"logical-device-id": id})
agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -517,7 +530,7 @@
}
func (ldMgr *LogicalManager) packetIn(ctx context.Context, logicalDeviceID string, port uint32, transactionID string, packet []byte) error {
- logger.Debugw(ctx, "packetIn", log.Fields{"logical-device-id": logicalDeviceID, "port": port})
+ logger.Debugw(ctx, "packet-in", log.Fields{"logical-device-id": logicalDeviceID, "port": port})
if agent := ldMgr.getLogicalDeviceAgent(ctx, logicalDeviceID); agent != nil {
agent.packetIn(ctx, port, transactionID, packet)
} else {
@@ -529,35 +542,45 @@
// StreamPacketsOut sends packets to adapter
func (ldMgr *LogicalManager) StreamPacketsOut(packets voltha.VolthaService_StreamPacketsOutServer) error {
ctx := context.Background()
- logger.Debugw(ctx, "StreamPacketsOut-request", log.Fields{"packets": packets})
+ logger.Debugw(ctx, "stream-packets-out-request", log.Fields{"packets": packets})
+
loop:
for {
select {
case <-packets.Context().Done():
- logger.Infow(ctx, "StreamPacketsOut-context-done", log.Fields{"packets": packets, "error": packets.Context().Err()})
+ logger.Infow(ctx, "stream-packets-out-context-done", log.Fields{"packets": packets, "error": packets.Context().Err()})
break loop
default:
}
packet, err := packets.Recv()
+ pktCtx := utils.WithRPCMetadataContext(packets.Context(), "StreamPacketsOut")
+
if err == io.EOF {
- logger.Debugw(ctx, "Received-EOF", log.Fields{"packets": packets})
+ logger.Debugw(ctx, "received-eof", log.Fields{"packets": packets})
break loop
}
-
if err != nil {
- logger.Errorw(ctx, "Failed to receive packet out", log.Fields{"error": err})
+ logger.Errorw(ctx, "failed-to-receive-packet-out", log.Fields{"error": err})
+ go ldMgr.SendRPCEvent(pktCtx, packet.Id, err.Error(), nil,
+ "RPC_ERROR_RAISE_EVENT", voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
continue
}
- if agent := ldMgr.getLogicalDeviceAgent(packets.Context(), packet.Id); agent != nil {
- agent.packetOut(packets.Context(), packet.PacketOut)
+ if agent := ldMgr.getLogicalDeviceAgent(pktCtx, packet.Id); agent != nil {
+ agent.packetOut(pktCtx, packet.PacketOut)
} else {
- logger.Errorf(ctx, "No logical device agent present", log.Fields{"logical-device-id": packet.Id})
+ logger.Errorf(ctx, "no-logical-device-agent-present", log.Fields{"logical-device-id": packet.Id})
}
}
- logger.Debugw(ctx, "StreamPacketsOut-request-done", log.Fields{"packets": packets})
+ logger.Debugw(ctx, "stream-packets-out-request-done", log.Fields{"packets": packets})
return nil
}
+
+func (ldMgr *LogicalManager) SendRPCEvent(ctx context.Context, resourceID, desc string, context map[string]string,
+ id string, category voltha.EventCategory_Types, subCategory *voltha.EventSubCategory_Types, raisedTs int64) {
+ ldMgr.Manager.RPCEventManager.GetAndSendRPCEvent(ctx, resourceID, desc, context, id,
+ category, subCategory, raisedTs)
+}