[VOL-3108] Sending multipart messages when size is bigger than 64KB
This patch is a quick and dirty workaround to verify these messages are
causing the issue. Will need to be appropriateli reworked once
confirmed.
Change-Id: Ie70f107281023e5c0272fd52091d71fcb73154b5
diff --git a/internal/pkg/openflow/stats.go b/internal/pkg/openflow/stats.go
index 8330d10..bb6a941 100644
--- a/internal/pkg/openflow/stats.go
+++ b/internal/pkg/openflow/stats.go
@@ -56,21 +56,27 @@
return ofc.SendMessage(response)
case ofp.OFPSTFlow:
statsReq := request.(*ofp.FlowStatsRequest)
- response, err := ofc.handleFlowStatsRequest(statsReq)
+ responses, err := ofc.handleFlowStatsRequest(statsReq)
if err != nil {
return err
}
if logger.V(log.DebugLevel) {
reqJs, _ := json.Marshal(statsReq)
- resJs, _ := json.Marshal(response)
+ resJs, _ := json.Marshal(responses)
logger.Debugw("handle-stats-request-flow",
log.Fields{
- "device-id": ofc.DeviceID,
- "request": reqJs,
- "response-object": response,
- "response": resJs})
+ "device-id": ofc.DeviceID,
+ "request": reqJs,
+ "responses-object": responses,
+ "response": resJs})
}
- return ofc.SendMessage(response)
+ for _, response := range responses {
+ err := ofc.SendMessage(response)
+ if err != nil {
+ return err
+ }
+ }
+ return nil
case ofp.OFPSTAggregate:
statsReq := request.(*ofp.AggregateStatsRequest)
@@ -106,20 +112,26 @@
return ofc.SendMessage(response)
case ofp.OFPSTPort:
statsReq := request.(*ofp.PortStatsRequest)
- response, err := ofc.handlePortStatsRequest(statsReq)
+ responses, err := ofc.handlePortStatsRequest(statsReq)
if err != nil {
return err
}
if logger.V(log.DebugLevel) {
reqJs, _ := json.Marshal(statsReq)
- resJs, _ := json.Marshal(response)
+ resJs, _ := json.Marshal(responses)
logger.Debugw("handle-stats-request-port",
log.Fields{
"device-id": ofc.DeviceID,
"request": reqJs,
"response": resJs})
}
- return ofc.SendMessage(response)
+ for _, response := range responses {
+ err := ofc.SendMessage(response)
+ if err != nil {
+ return err
+ }
+ }
+ return nil
case ofp.OFPSTQueue:
statsReq := request.(*ofp.QueueStatsRequest)
response, err := ofc.handleQueueStatsRequest(statsReq)
@@ -251,20 +263,26 @@
return ofc.SendMessage(response)
case ofp.OFPSTPortDesc:
statsReq := request.(*ofp.PortDescStatsRequest)
- response, err := ofc.handlePortDescStatsRequest(statsReq)
+ responses, err := ofc.handlePortDescStatsRequest(statsReq)
if err != nil {
return err
}
if logger.V(log.DebugLevel) {
reqJs, _ := json.Marshal(statsReq)
- resJs, _ := json.Marshal(response)
+ resJs, _ := json.Marshal(responses)
logger.Debugw("handle-stats-request-port-desc",
log.Fields{
"device-id": ofc.DeviceID,
"request": reqJs,
"response": resJs})
}
- return ofc.SendMessage(response)
+ for _, response := range responses {
+ err := ofc.SendMessage(response)
+ if err != nil {
+ return err
+ }
+ }
+ return nil
case ofp.OFPSTExperimenter:
statsReq := request.(*ofp.ExperimenterStatsRequest)
@@ -311,21 +329,17 @@
return response, nil
}
-func (ofc *OFConnection) handleFlowStatsRequest(request *ofp.FlowStatsRequest) (*ofp.FlowStatsReply, error) {
+func (ofc *OFConnection) handleFlowStatsRequest(request *ofp.FlowStatsRequest) ([]*ofp.FlowStatsReply, error) {
volthaClient := ofc.VolthaClient.Get()
if volthaClient == nil {
return nil, NoVolthaConnectionError
}
- response := ofp.NewFlowStatsReply()
- response.SetXid(request.GetXid())
- response.SetVersion(4)
- response.SetFlags(ofp.StatsReplyFlags(request.GetFlags()))
resp, err := volthaClient.ListLogicalDeviceFlows(context.Background(),
&common.ID{Id: ofc.DeviceID})
if err != nil {
return nil, err
}
- var flow []*ofp.FlowStatsEntry
+ var flows []*ofp.FlowStatsEntry
for _, item := range resp.GetItems() {
entry := ofp.NewFlowStatsEntry()
entry.SetTableId(uint8(item.GetTableId()))
@@ -357,10 +371,32 @@
instructions = append(instructions, instruction)
}
entry.Instructions = instructions
- flow = append(flow, entry)
+ flows = append(flows, entry)
}
- response.SetEntries(flow)
- return response, nil
+ var responses []*ofp.FlowStatsReply
+ chunkSize := 150
+ total := len(flows) / chunkSize
+ n := 0
+ for n <= total {
+
+ chunk := flows[n*chunkSize : min((n*chunkSize)+chunkSize, len(flows))]
+
+ if len(chunk) == 0 {
+ break
+ }
+
+ response := ofp.NewFlowStatsReply()
+ response.SetXid(request.GetXid())
+ response.SetVersion(4)
+ response.SetFlags(ofp.StatsReplyFlags(request.GetFlags()))
+ if total != n {
+ response.SetFlags(ofp.StatsReplyFlags(ofp.OFPSFReplyMore))
+ }
+ response.SetEntries(chunk)
+ responses = append(responses, response)
+ n++
+ }
+ return responses, nil
}
func (ofc *OFConnection) handleAggregateStatsRequest(request *ofp.AggregateStatsRequest) (*ofp.AggregateStatsReply, error) {
@@ -528,15 +564,12 @@
return response, nil
}
-func (ofc *OFConnection) handlePortStatsRequest(request *ofp.PortStatsRequest) (*ofp.PortStatsReply, error) {
+func (ofc *OFConnection) handlePortStatsRequest(request *ofp.PortStatsRequest) ([]*ofp.PortStatsReply, error) {
volthaClient := ofc.VolthaClient.Get()
if volthaClient == nil {
return nil, NoVolthaConnectionError
}
- response := ofp.NewPortStatsReply()
- response.SetXid(request.GetXid())
- response.SetVersion(request.GetVersion())
- response.SetFlags(ofp.StatsReplyFlags(request.GetFlags()))
+
reply, err := volthaClient.ListLogicalDevicePorts(context.Background(),
&common.ID{Id: ofc.DeviceID})
if err != nil {
@@ -554,19 +587,38 @@
}
}
}
- response.SetEntries(entries)
- return response, nil
+
+ var responses []*ofp.PortStatsReply
+ chunkSize := 500
+ total := len(entries) / chunkSize
+ n := 0
+ for n <= total {
+
+ chunk := entries[n*chunkSize : min((n*chunkSize)+chunkSize, len(entries))]
+
+ if len(chunk) == 0 {
+ break
+ }
+
+ response := ofp.NewPortStatsReply()
+ response.SetXid(request.GetXid())
+ response.SetVersion(request.GetVersion())
+ if total != n {
+ response.SetFlags(ofp.StatsReplyFlags(ofp.OFPSFReplyMore))
+ }
+ response.SetEntries(entries[n*chunkSize : min((n*chunkSize)+chunkSize, len(entries))])
+ responses = append(responses, response)
+ n++
+ }
+ return responses, nil
}
-func (ofc *OFConnection) handlePortDescStatsRequest(request *ofp.PortDescStatsRequest) (*ofp.PortDescStatsReply, error) {
+func (ofc *OFConnection) handlePortDescStatsRequest(request *ofp.PortDescStatsRequest) ([]*ofp.PortDescStatsReply, error) {
volthaClient := ofc.VolthaClient.Get()
if volthaClient == nil {
return nil, NoVolthaConnectionError
}
- response := ofp.NewPortDescStatsReply()
- response.SetVersion(request.GetVersion())
- response.SetXid(request.GetXid())
- response.SetFlags(ofp.StatsReplyFlags(request.GetFlags()))
+
logicalDevice, err := volthaClient.GetLogicalDevice(context.Background(),
&common.ID{Id: ofc.DeviceID})
if err != nil {
@@ -597,10 +649,39 @@
entries = append(entries, &entry)
}
- response.SetEntries(entries)
- //TODO call voltha and get port descriptions etc
- return response, nil
+ var responses []*ofp.PortDescStatsReply
+ chunkSize := 500
+ total := len(entries) / chunkSize
+ n := 0
+ for n <= total {
+ chunk := entries[n*chunkSize : min((n*chunkSize)+chunkSize, len(entries))]
+
+ if len(chunk) == 0 {
+ break
+ }
+
+ response := ofp.NewPortDescStatsReply()
+ response.SetVersion(request.GetVersion())
+ response.SetXid(request.GetXid())
+ response.SetFlags(ofp.StatsReplyFlags(request.GetFlags()))
+ if total != n {
+ response.SetFlags(ofp.StatsReplyFlags(ofp.OFPSFReplyMore))
+ }
+ response.SetEntries(chunk)
+ responses = append(responses, response)
+ n++
+ }
+ return responses, nil
+
+}
+
+// Interestingly enough there is no min function fot two integers
+func min(a, b int) int {
+ if a < b {
+ return a
+ }
+ return b
}
func (ofc *OFConnection) handleMeterFeatureStatsRequest(request *ofp.MeterFeaturesStatsRequest) (*ofp.MeterFeaturesStatsReply, error) {