[VOL-3141] Created Span for various Openflow and Ofagent operations
Change-Id: I49e371479edc087dcf89bd60b43c000ab0bb5547
diff --git a/internal/pkg/ofagent/changeEvent.go b/internal/pkg/ofagent/changeEvent.go
index 7ec9aae..3f5bb7f 100644
--- a/internal/pkg/ofagent/changeEvent.go
+++ b/internal/pkg/ofagent/changeEvent.go
@@ -29,6 +29,9 @@
)
func (ofa *OFAgent) receiveChangeEvents(ctx context.Context) {
+ span, ctx := log.CreateChildSpan(ctx, "receive-change-events")
+ defer span.Finish()
+
logger.Debug(ctx, "receive-change-events-started")
// If we exit, assume disconnected
defer func() {
diff --git a/internal/pkg/ofagent/connection.go b/internal/pkg/ofagent/connection.go
index f369ab3..4fe0de3 100644
--- a/internal/pkg/ofagent/connection.go
+++ b/internal/pkg/ofagent/connection.go
@@ -55,7 +55,7 @@
if err == nil {
svc := voltha.NewVolthaServiceClient(conn)
if svc != nil {
- if _, err = svc.GetVoltha(context.Background(), &empty.Empty{}); err == nil {
+ if _, err = svc.GetVoltha(log.WithSpanFromContext(context.Background(), ctx), &empty.Empty{}); err == nil {
logger.Debugw(ctx, "Established connection to Voltha",
log.Fields{
"VolthaApiEndPoint": ofa.VolthaApiEndPoint,
diff --git a/internal/pkg/ofagent/packetIn.go b/internal/pkg/ofagent/packetIn.go
index 2a49e94..4d28410 100644
--- a/internal/pkg/ofagent/packetIn.go
+++ b/internal/pkg/ofagent/packetIn.go
@@ -31,6 +31,9 @@
)
func (ofa *OFAgent) receivePacketsIn(ctx context.Context) {
+ span, ctx := log.CreateChildSpan(ctx, "receive-packets-in")
+ defer span.Finish()
+
logger.Debug(ctx, "receive-packets-in-started")
// If we exit, assume disconnected
defer func() {
@@ -42,7 +45,7 @@
return
}
opt := grpc.EmptyCallOption{}
- streamCtx, streamDone := context.WithCancel(context.Background())
+ streamCtx, streamDone := context.WithCancel(log.WithSpanFromContext(context.Background(), ctx))
defer streamDone()
stream, err := ofa.volthaClient.Get().ReceivePacketsIn(streamCtx, &empty.Empty{}, opt)
if err != nil {
@@ -70,6 +73,9 @@
}
func (ofa *OFAgent) handlePacketsIn(ctx context.Context) {
+ span, ctx := log.CreateChildSpan(ctx, "handle-packets-in")
+ defer span.Finish()
+
logger.Debug(ctx, "handle-packets-in-started")
top:
for {
diff --git a/internal/pkg/ofagent/packetOut.go b/internal/pkg/ofagent/packetOut.go
index d4b6a73..c8bd299 100644
--- a/internal/pkg/ofagent/packetOut.go
+++ b/internal/pkg/ofagent/packetOut.go
@@ -25,6 +25,9 @@
)
func (ofa *OFAgent) streamPacketOut(ctx context.Context) {
+ span, ctx := log.CreateChildSpan(ctx, "stream-packet-out")
+ defer span.Finish()
+
logger.Debug(ctx, "packet-out-started")
// If we exit, assume disconnected
defer func() {
@@ -36,7 +39,7 @@
return
}
opt := grpc.EmptyCallOption{}
- streamCtx, streamDone := context.WithCancel(context.Background())
+ streamCtx, streamDone := context.WithCancel(log.WithSpanFromContext(context.Background(), ctx))
outClient, err := ofa.volthaClient.Get().StreamPacketsOut(streamCtx, opt)
defer streamDone()
if err != nil {
diff --git a/internal/pkg/ofagent/refresh.go b/internal/pkg/ofagent/refresh.go
index 43cbb86..f2dd1b0 100644
--- a/internal/pkg/ofagent/refresh.go
+++ b/internal/pkg/ofagent/refresh.go
@@ -43,13 +43,16 @@
}
func (ofa *OFAgent) refreshDeviceList(ctx context.Context) {
+ span, ctx := log.CreateChildSpan(ctx, "refresh-device-list")
+ defer span.Finish()
+
// If we exit, assume disconnected
if ofa.volthaClient == nil {
logger.Error(ctx, "no-voltha-connection")
ofa.events <- ofaEventVolthaDisconnected
return
}
- deviceList, err := ofa.volthaClient.Get().ListLogicalDevices(context.Background(), &empty.Empty{})
+ deviceList, err := ofa.volthaClient.Get().ListLogicalDevices(log.WithSpanFromContext(context.Background(), ctx), &empty.Empty{})
if err != nil {
logger.Errorw(ctx, "ofagent failed to query device list from voltha",
log.Fields{"error": err})
diff --git a/internal/pkg/openflow/client.go b/internal/pkg/openflow/client.go
index 86bb6b9..ca8121c 100644
--- a/internal/pkg/openflow/client.go
+++ b/internal/pkg/openflow/client.go
@@ -202,7 +202,6 @@
}
func (ofc *OFClient) Run(ctx context.Context) {
-
for _, endpoint := range ofc.OFControllerEndPoints {
connection := &OFConnection{
OFControllerEndPoint: endpoint,
diff --git a/internal/pkg/openflow/connection.go b/internal/pkg/openflow/connection.go
index aafe741..a3a3555 100644
--- a/internal/pkg/openflow/connection.go
+++ b/internal/pkg/openflow/connection.go
@@ -147,7 +147,7 @@
log.Fields{"device-id": ofc.DeviceID})
if state == ofcStateStarted || state == ofcStateDisconnected {
state = ofcStateConnected
- ofCtx, ofDone = context.WithCancel(context.Background())
+ ofCtx, ofDone = context.WithCancel(log.WithSpanFromContext(context.Background(), ctx))
go ofc.messageSender(ofCtx)
go ofc.processOFStream(ofCtx)
} else {
diff --git a/internal/pkg/openflow/echo.go b/internal/pkg/openflow/echo.go
index e02e00f..9d0026f 100644
--- a/internal/pkg/openflow/echo.go
+++ b/internal/pkg/openflow/echo.go
@@ -24,6 +24,9 @@
)
func (ofc *OFConnection) handleEchoRequest(ctx context.Context, request *ofp.EchoRequest) {
+ span, ctx := log.CreateChildSpan(ctx, "openflow-echo")
+ defer span.Finish()
+
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(request)
logger.Debugw(ctx, "handleEchoRequest called",
diff --git a/internal/pkg/openflow/feature.go b/internal/pkg/openflow/feature.go
index 7f3b557..d9caec6 100644
--- a/internal/pkg/openflow/feature.go
+++ b/internal/pkg/openflow/feature.go
@@ -26,6 +26,9 @@
)
func (ofc *OFConnection) handleFeatureRequest(ctx context.Context, request *ofp.FeaturesRequest) error {
+ span, ctx := log.CreateChildSpan(ctx, "openflow-feature")
+ defer span.Finish()
+
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(request)
logger.Debugw(ctx, "handleFeatureRequest called",
@@ -38,7 +41,7 @@
return NoVolthaConnectionError
}
var id = common.ID{Id: ofc.DeviceID}
- logicalDevice, err := volthaClient.GetLogicalDevice(context.Background(), &id)
+ logicalDevice, err := volthaClient.GetLogicalDevice(log.WithSpanFromContext(context.Background(), ctx), &id)
if err != nil {
return err
}
diff --git a/internal/pkg/openflow/flowMod.go b/internal/pkg/openflow/flowMod.go
index 1a47d17..86088de 100644
--- a/internal/pkg/openflow/flowMod.go
+++ b/internal/pkg/openflow/flowMod.go
@@ -72,6 +72,9 @@
}
func (ofc *OFConnection) handleFlowAdd(ctx context.Context, flowAdd *ofp.FlowAdd) {
+ span, ctx := log.CreateChildSpan(ctx, "openflow-flow-add")
+ defer span.Finish()
+
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(flowAdd)
logger.Debugw(ctx, "handleFlowAdd called",
@@ -240,7 +243,7 @@
"flow-mod-object": flowUpdate,
"flow-mod-request": flowUpdateJs})
}
- if _, err := volthaClient.UpdateLogicalDeviceFlowTable(context.Background(), &flowUpdate); err != nil {
+ if _, err := volthaClient.UpdateLogicalDeviceFlowTable(log.WithSpanFromContext(context.Background(), ctx), &flowUpdate); err != nil {
logger.Errorw(ctx, "Error calling FlowAdd ",
log.Fields{
"device-id": ofc.DeviceID,
@@ -278,6 +281,9 @@
}
func (ofc *OFConnection) handleFlowMod(ctx context.Context, flowMod *ofp.FlowMod) {
+ span, ctx := log.CreateChildSpan(ctx, "openflow-flow-modification")
+ defer span.Finish()
+
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(flowMod)
logger.Debugw(ctx, "handleFlowMod called",
@@ -290,6 +296,9 @@
}
func (ofc *OFConnection) handleFlowModStrict(ctx context.Context, flowModStrict *ofp.FlowModifyStrict) {
+ span, ctx := log.CreateChildSpan(ctx, "openflow-flow-modification-strict")
+ defer span.Finish()
+
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(flowModStrict)
logger.Debugw(ctx, "handleFlowModStrict called",
@@ -302,6 +311,9 @@
}
func (ofc *OFConnection) handleFlowDelete(ctx context.Context, flowDelete *ofp.FlowDelete) {
+ span, ctx := log.CreateChildSpan(ctx, "openflow-flow-delete")
+ defer span.Finish()
+
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(flowDelete)
logger.Debugw(ctx, "handleFlowDelete called",
@@ -315,6 +327,9 @@
}
func (ofc *OFConnection) handleFlowDeleteStrict(ctx context.Context, flowDeleteStrict *ofp.FlowDeleteStrict) {
+ span, ctx := log.CreateChildSpan(ctx, "openflow-flow-delete-strict")
+ defer span.Finish()
+
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(flowDeleteStrict)
logger.Debugw(ctx, "handleFlowDeleteStrict called",
@@ -443,7 +458,7 @@
"device-id": ofc.DeviceID,
"flow-update": flowUpdateJs})
}
- if _, err := volthaClient.UpdateLogicalDeviceFlowTable(context.Background(), &flowUpdate); err != nil {
+ if _, err := volthaClient.UpdateLogicalDeviceFlowTable(log.WithSpanFromContext(context.Background(), ctx), &flowUpdate); err != nil {
logger.Errorw(ctx, "Error calling FlowDelete ",
log.Fields{
"device-id": ofc.DeviceID,
diff --git a/internal/pkg/openflow/getConfig.go b/internal/pkg/openflow/getConfig.go
index eef49eb..3991d0c 100644
--- a/internal/pkg/openflow/getConfig.go
+++ b/internal/pkg/openflow/getConfig.go
@@ -24,6 +24,9 @@
)
func (ofc *OFConnection) handleGetConfigRequest(ctx context.Context, request *ofp.GetConfigRequest) {
+ span, ctx := log.CreateChildSpan(ctx, "openflow-get-config")
+ defer span.Finish()
+
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(request)
logger.Debugw(ctx, "handleGetConfigRequest called",
diff --git a/internal/pkg/openflow/group.go b/internal/pkg/openflow/group.go
index 425e99b..99d12b7 100644
--- a/internal/pkg/openflow/group.go
+++ b/internal/pkg/openflow/group.go
@@ -26,6 +26,8 @@
)
func (ofc *OFConnection) handleGroupMod(ctx context.Context, groupMod ofp.IGroupMod) {
+ span, ctx := log.CreateChildSpan(ctx, "openflow-group-modification")
+ defer span.Finish()
volthaClient := ofc.VolthaClient.Get()
if volthaClient == nil {
@@ -44,7 +46,7 @@
},
}
- _, err := volthaClient.UpdateLogicalDeviceFlowGroupTable(context.Background(), groupUpdate)
+ _, err := volthaClient.UpdateLogicalDeviceFlowGroupTable(log.WithSpanFromContext(context.Background(), ctx), groupUpdate)
if err != nil {
logger.Errorw(ctx, "Error updating group table",
log.Fields{"device-id": ofc.DeviceID, "error": err})
diff --git a/internal/pkg/openflow/meter.go b/internal/pkg/openflow/meter.go
index 1a110ac..2334fb2 100644
--- a/internal/pkg/openflow/meter.go
+++ b/internal/pkg/openflow/meter.go
@@ -27,6 +27,9 @@
)
func (ofc *OFConnection) handleMeterModRequest(ctx context.Context, request *ofp.MeterMod) {
+ span, ctx := log.CreateChildSpan(ctx, "openflow-meter-modification")
+ defer span.Finish()
+
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(request)
logger.Debugw(ctx, "handleMeterModRequest called",
@@ -95,7 +98,7 @@
"device-id": ofc.DeviceID,
"meter-mod-request": meterModJS})
}
- if _, err := volthaClient.UpdateLogicalDeviceMeterTable(context.Background(), &meterModUpdate); err != nil {
+ if _, err := volthaClient.UpdateLogicalDeviceMeterTable(log.WithSpanFromContext(context.Background(), ctx), &meterModUpdate); err != nil {
logger.Errorw(ctx, "Error calling UpdateLogicalDeviceMeterTable",
log.Fields{
"device-id": ofc.DeviceID,
diff --git a/internal/pkg/openflow/packet.go b/internal/pkg/openflow/packet.go
index dcef5fc..dfbb0d9 100644
--- a/internal/pkg/openflow/packet.go
+++ b/internal/pkg/openflow/packet.go
@@ -25,6 +25,9 @@
)
func (ofc *OFConnection) handlePacketOut(ctx context.Context, packetOut *ofp.PacketOut) {
+ span, ctx := log.CreateChildSpan(ctx, "openflow-packet-out")
+ defer span.Finish()
+
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(packetOut)
logger.Debugw(ctx, "handlePacketOut called",
diff --git a/internal/pkg/openflow/role.go b/internal/pkg/openflow/role.go
index 740616f..d522bb2 100644
--- a/internal/pkg/openflow/role.go
+++ b/internal/pkg/openflow/role.go
@@ -25,6 +25,9 @@
)
func (ofc *OFConnection) handleRoleRequest(ctx context.Context, request *ofp.RoleRequest) {
+ span, ctx := log.CreateChildSpan(ctx, "openflow-role")
+ defer span.Finish()
+
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(request)
logger.Debugw(ctx, "handleRoleRequest called",
diff --git a/internal/pkg/openflow/setConfig.go b/internal/pkg/openflow/setConfig.go
index 3ffba6b..554db38 100644
--- a/internal/pkg/openflow/setConfig.go
+++ b/internal/pkg/openflow/setConfig.go
@@ -24,6 +24,9 @@
)
func (ofc *OFConnection) handleSetConfig(ctx context.Context, request *ofp.SetConfig) {
+ span, ctx := log.CreateChildSpan(ctx, "openflow-set-config")
+ defer span.Finish()
+
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(request)
logger.Debugw(ctx, "handleSetConfig called",
diff --git a/internal/pkg/openflow/stats.go b/internal/pkg/openflow/stats.go
index 78a9b93..d50fc14 100644
--- a/internal/pkg/openflow/stats.go
+++ b/internal/pkg/openflow/stats.go
@@ -29,6 +29,9 @@
)
func (ofc *OFConnection) handleStatsRequest(ctx context.Context, request ofp.IHeader, statType uint16) error {
+ span, ctx := log.CreateChildSpan(ctx, "openflow-stats")
+ defer span.Finish()
+
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(request)
logger.Debugw(ctx, "handleStatsRequest called",
@@ -41,7 +44,7 @@
switch statType {
case ofp.OFPSTDesc:
statsReq := request.(*ofp.DescStatsRequest)
- response, err := ofc.handleDescStatsRequest(statsReq)
+ response, err := ofc.handleDescStatsRequest(ctx, statsReq)
if err != nil {
return err
}
@@ -113,7 +116,7 @@
return ofc.SendMessage(ctx, response)
case ofp.OFPSTPort:
statsReq := request.(*ofp.PortStatsRequest)
- responses, err := ofc.handlePortStatsRequest(statsReq)
+ responses, err := ofc.handlePortStatsRequest(ctx, statsReq)
if err != nil {
return err
}
@@ -151,7 +154,7 @@
return ofc.SendMessage(ctx, response)
case ofp.OFPSTGroup:
statsReq := request.(*ofp.GroupStatsRequest)
- response, err := ofc.handleGroupStatsRequest(statsReq)
+ response, err := ofc.handleGroupStatsRequest(ctx, statsReq)
if err != nil {
return err
}
@@ -200,7 +203,7 @@
return ofc.SendMessage(ctx, response)
case ofp.OFPSTMeter:
statsReq := request.(*ofp.MeterStatsRequest)
- response, err := ofc.handleMeterStatsRequest(statsReq)
+ response, err := ofc.handleMeterStatsRequest(ctx, statsReq)
if err != nil {
return err
}
@@ -264,7 +267,7 @@
return ofc.SendMessage(ctx, response)
case ofp.OFPSTPortDesc:
statsReq := request.(*ofp.PortDescStatsRequest)
- responses, err := ofc.handlePortDescStatsRequest(statsReq)
+ responses, err := ofc.handlePortDescStatsRequest(ctx, statsReq)
if err != nil {
return err
}
@@ -305,7 +308,7 @@
return nil
}
-func (ofc *OFConnection) handleDescStatsRequest(request *ofp.DescStatsRequest) (*ofp.DescStatsReply, error) {
+func (ofc *OFConnection) handleDescStatsRequest(ctx context.Context, request *ofp.DescStatsRequest) (*ofp.DescStatsReply, error) {
volthaClient := ofc.VolthaClient.Get()
if volthaClient == nil {
return nil, NoVolthaConnectionError
@@ -315,7 +318,7 @@
response.SetVersion(request.GetVersion())
response.SetFlags(ofp.StatsReplyFlags(request.GetFlags()))
- resp, err := volthaClient.GetLogicalDevice(context.Background(),
+ resp, err := volthaClient.GetLogicalDevice(log.WithSpanFromContext(context.Background(), ctx),
&common.ID{Id: ofc.DeviceID})
if err != nil {
return nil, err
@@ -335,7 +338,7 @@
if volthaClient == nil {
return nil, NoVolthaConnectionError
}
- resp, err := volthaClient.ListLogicalDeviceFlows(context.Background(),
+ resp, err := volthaClient.ListLogicalDeviceFlows(log.WithSpanFromContext(context.Background(), ctx),
&common.ID{Id: ofc.DeviceID})
if err != nil {
return nil, err
@@ -410,7 +413,7 @@
return response, nil
}
-func (ofc *OFConnection) handleGroupStatsRequest(request *ofp.GroupStatsRequest) (*ofp.GroupStatsReply, error) {
+func (ofc *OFConnection) handleGroupStatsRequest(ctx context.Context, request *ofp.GroupStatsRequest) (*ofp.GroupStatsReply, error) {
volthaClient := ofc.VolthaClient.Get()
if volthaClient == nil {
return nil, NoVolthaConnectionError
@@ -419,7 +422,7 @@
response.SetVersion(request.GetVersion())
response.SetXid(request.GetXid())
response.SetFlags(ofp.StatsReplyFlags(request.GetFlags()))
- reply, err := volthaClient.ListLogicalDeviceFlowGroups(context.Background(),
+ reply, err := volthaClient.ListLogicalDeviceFlowGroups(log.WithSpanFromContext(context.Background(), ctx),
&common.ID{Id: ofc.DeviceID})
if err != nil {
return nil, err
@@ -461,7 +464,7 @@
response.SetVersion(request.GetVersion())
response.SetXid(request.GetXid())
response.SetFlags(ofp.StatsReplyFlags(request.GetFlags()))
- reply, err := volthaClient.ListLogicalDeviceFlowGroups(context.Background(),
+ reply, err := volthaClient.ListLogicalDeviceFlowGroups(log.WithSpanFromContext(context.Background(), ctx),
&common.ID{Id: ofc.DeviceID})
if err != nil {
return nil, err
@@ -492,7 +495,7 @@
return response, nil
}
-func (ofc *OFConnection) handleMeterStatsRequest(request *ofp.MeterStatsRequest) (*ofp.MeterStatsReply, error) {
+func (ofc *OFConnection) handleMeterStatsRequest(ctx context.Context, request *ofp.MeterStatsRequest) (*ofp.MeterStatsReply, error) {
volthaClient := ofc.VolthaClient.Get()
if volthaClient == nil {
return nil, NoVolthaConnectionError
@@ -501,7 +504,7 @@
response.SetVersion(request.GetVersion())
response.SetXid(request.GetXid())
response.SetFlags(ofp.StatsReplyFlags(request.GetFlags()))
- resp, err := volthaClient.ListLogicalDeviceMeters(context.Background(),
+ resp, err := volthaClient.ListLogicalDeviceMeters(log.WithSpanFromContext(context.Background(), ctx),
&common.ID{Id: ofc.DeviceID})
if err != nil {
return nil, err
@@ -565,13 +568,13 @@
return response, nil
}
-func (ofc *OFConnection) handlePortStatsRequest(request *ofp.PortStatsRequest) ([]*ofp.PortStatsReply, error) {
+func (ofc *OFConnection) handlePortStatsRequest(ctx context.Context, request *ofp.PortStatsRequest) ([]*ofp.PortStatsReply, error) {
volthaClient := ofc.VolthaClient.Get()
if volthaClient == nil {
return nil, NoVolthaConnectionError
}
- reply, err := volthaClient.ListLogicalDevicePorts(context.Background(),
+ reply, err := volthaClient.ListLogicalDevicePorts(log.WithSpanFromContext(context.Background(), ctx),
&common.ID{Id: ofc.DeviceID})
if err != nil {
return nil, err
@@ -614,13 +617,13 @@
return responses, nil
}
-func (ofc *OFConnection) handlePortDescStatsRequest(request *ofp.PortDescStatsRequest) ([]*ofp.PortDescStatsReply, error) {
+func (ofc *OFConnection) handlePortDescStatsRequest(ctx context.Context, request *ofp.PortDescStatsRequest) ([]*ofp.PortDescStatsReply, error) {
volthaClient := ofc.VolthaClient.Get()
if volthaClient == nil {
return nil, NoVolthaConnectionError
}
- ports, err := volthaClient.ListLogicalDevicePorts(context.Background(), &common.ID{Id: ofc.DeviceID})
+ ports, err := volthaClient.ListLogicalDevicePorts(log.WithSpanFromContext(context.Background(), ctx), &common.ID{Id: ofc.DeviceID})
if err != nil {
return nil, err
}
diff --git a/internal/pkg/openflow/stats_test.go b/internal/pkg/openflow/stats_test.go
index 35cd503..e80d239 100644
--- a/internal/pkg/openflow/stats_test.go
+++ b/internal/pkg/openflow/stats_test.go
@@ -242,7 +242,7 @@
// request stats for all ports
request.PortNo = 0xffffffff
- replies, err := ofc.handlePortStatsRequest(request)
+ replies, err := ofc.handlePortStatsRequest(context.Background(), request)
assert.Equal(t, err, nil)
assert.Equal(t, int(math.Ceil(float64(generatedPortsCount)/ofcPortsChunkSize)), len(replies))
@@ -287,7 +287,7 @@
request := of13.NewPortDescStatsRequest()
- replies, err := ofc.handlePortDescStatsRequest(request)
+ replies, err := ofc.handlePortDescStatsRequest(context.Background(), request)
assert.Equal(t, err, nil)
// check that the correct number of messages is generated