[VOL-3108] Sending multipart messages when size is bigger than 64KB

This patch is a quick and dirty workaround to verify these messages are
causing the issue. Will need to be appropriateli reworked once
confirmed.

Change-Id: Ie70f107281023e5c0272fd52091d71fcb73154b5
diff --git a/internal/pkg/openflow/stats.go b/internal/pkg/openflow/stats.go
index 8330d10..bb6a941 100644
--- a/internal/pkg/openflow/stats.go
+++ b/internal/pkg/openflow/stats.go
@@ -56,21 +56,27 @@
 		return ofc.SendMessage(response)
 	case ofp.OFPSTFlow:
 		statsReq := request.(*ofp.FlowStatsRequest)
-		response, err := ofc.handleFlowStatsRequest(statsReq)
+		responses, err := ofc.handleFlowStatsRequest(statsReq)
 		if err != nil {
 			return err
 		}
 		if logger.V(log.DebugLevel) {
 			reqJs, _ := json.Marshal(statsReq)
-			resJs, _ := json.Marshal(response)
+			resJs, _ := json.Marshal(responses)
 			logger.Debugw("handle-stats-request-flow",
 				log.Fields{
-					"device-id":       ofc.DeviceID,
-					"request":         reqJs,
-					"response-object": response,
-					"response":        resJs})
+					"device-id":        ofc.DeviceID,
+					"request":          reqJs,
+					"responses-object": responses,
+					"response":         resJs})
 		}
-		return ofc.SendMessage(response)
+		for _, response := range responses {
+			err := ofc.SendMessage(response)
+			if err != nil {
+				return err
+			}
+		}
+		return nil
 
 	case ofp.OFPSTAggregate:
 		statsReq := request.(*ofp.AggregateStatsRequest)
@@ -106,20 +112,26 @@
 		return ofc.SendMessage(response)
 	case ofp.OFPSTPort:
 		statsReq := request.(*ofp.PortStatsRequest)
-		response, err := ofc.handlePortStatsRequest(statsReq)
+		responses, err := ofc.handlePortStatsRequest(statsReq)
 		if err != nil {
 			return err
 		}
 		if logger.V(log.DebugLevel) {
 			reqJs, _ := json.Marshal(statsReq)
-			resJs, _ := json.Marshal(response)
+			resJs, _ := json.Marshal(responses)
 			logger.Debugw("handle-stats-request-port",
 				log.Fields{
 					"device-id": ofc.DeviceID,
 					"request":   reqJs,
 					"response":  resJs})
 		}
-		return ofc.SendMessage(response)
+		for _, response := range responses {
+			err := ofc.SendMessage(response)
+			if err != nil {
+				return err
+			}
+		}
+		return nil
 	case ofp.OFPSTQueue:
 		statsReq := request.(*ofp.QueueStatsRequest)
 		response, err := ofc.handleQueueStatsRequest(statsReq)
@@ -251,20 +263,26 @@
 		return ofc.SendMessage(response)
 	case ofp.OFPSTPortDesc:
 		statsReq := request.(*ofp.PortDescStatsRequest)
-		response, err := ofc.handlePortDescStatsRequest(statsReq)
+		responses, err := ofc.handlePortDescStatsRequest(statsReq)
 		if err != nil {
 			return err
 		}
 		if logger.V(log.DebugLevel) {
 			reqJs, _ := json.Marshal(statsReq)
-			resJs, _ := json.Marshal(response)
+			resJs, _ := json.Marshal(responses)
 			logger.Debugw("handle-stats-request-port-desc",
 				log.Fields{
 					"device-id": ofc.DeviceID,
 					"request":   reqJs,
 					"response":  resJs})
 		}
-		return ofc.SendMessage(response)
+		for _, response := range responses {
+			err := ofc.SendMessage(response)
+			if err != nil {
+				return err
+			}
+		}
+		return nil
 
 	case ofp.OFPSTExperimenter:
 		statsReq := request.(*ofp.ExperimenterStatsRequest)
@@ -311,21 +329,17 @@
 	return response, nil
 }
 
-func (ofc *OFConnection) handleFlowStatsRequest(request *ofp.FlowStatsRequest) (*ofp.FlowStatsReply, error) {
+func (ofc *OFConnection) handleFlowStatsRequest(request *ofp.FlowStatsRequest) ([]*ofp.FlowStatsReply, error) {
 	volthaClient := ofc.VolthaClient.Get()
 	if volthaClient == nil {
 		return nil, NoVolthaConnectionError
 	}
-	response := ofp.NewFlowStatsReply()
-	response.SetXid(request.GetXid())
-	response.SetVersion(4)
-	response.SetFlags(ofp.StatsReplyFlags(request.GetFlags()))
 	resp, err := volthaClient.ListLogicalDeviceFlows(context.Background(),
 		&common.ID{Id: ofc.DeviceID})
 	if err != nil {
 		return nil, err
 	}
-	var flow []*ofp.FlowStatsEntry
+	var flows []*ofp.FlowStatsEntry
 	for _, item := range resp.GetItems() {
 		entry := ofp.NewFlowStatsEntry()
 		entry.SetTableId(uint8(item.GetTableId()))
@@ -357,10 +371,32 @@
 			instructions = append(instructions, instruction)
 		}
 		entry.Instructions = instructions
-		flow = append(flow, entry)
+		flows = append(flows, entry)
 	}
-	response.SetEntries(flow)
-	return response, nil
+	var responses []*ofp.FlowStatsReply
+	chunkSize := 150
+	total := len(flows) / chunkSize
+	n := 0
+	for n <= total {
+
+		chunk := flows[n*chunkSize : min((n*chunkSize)+chunkSize, len(flows))]
+
+		if len(chunk) == 0 {
+			break
+		}
+
+		response := ofp.NewFlowStatsReply()
+		response.SetXid(request.GetXid())
+		response.SetVersion(4)
+		response.SetFlags(ofp.StatsReplyFlags(request.GetFlags()))
+		if total != n {
+			response.SetFlags(ofp.StatsReplyFlags(ofp.OFPSFReplyMore))
+		}
+		response.SetEntries(chunk)
+		responses = append(responses, response)
+		n++
+	}
+	return responses, nil
 }
 
 func (ofc *OFConnection) handleAggregateStatsRequest(request *ofp.AggregateStatsRequest) (*ofp.AggregateStatsReply, error) {
@@ -528,15 +564,12 @@
 	return response, nil
 }
 
-func (ofc *OFConnection) handlePortStatsRequest(request *ofp.PortStatsRequest) (*ofp.PortStatsReply, error) {
+func (ofc *OFConnection) handlePortStatsRequest(request *ofp.PortStatsRequest) ([]*ofp.PortStatsReply, error) {
 	volthaClient := ofc.VolthaClient.Get()
 	if volthaClient == nil {
 		return nil, NoVolthaConnectionError
 	}
-	response := ofp.NewPortStatsReply()
-	response.SetXid(request.GetXid())
-	response.SetVersion(request.GetVersion())
-	response.SetFlags(ofp.StatsReplyFlags(request.GetFlags()))
+
 	reply, err := volthaClient.ListLogicalDevicePorts(context.Background(),
 		&common.ID{Id: ofc.DeviceID})
 	if err != nil {
@@ -554,19 +587,38 @@
 			}
 		}
 	}
-	response.SetEntries(entries)
-	return response, nil
+
+	var responses []*ofp.PortStatsReply
+	chunkSize := 500
+	total := len(entries) / chunkSize
+	n := 0
+	for n <= total {
+
+		chunk := entries[n*chunkSize : min((n*chunkSize)+chunkSize, len(entries))]
+
+		if len(chunk) == 0 {
+			break
+		}
+
+		response := ofp.NewPortStatsReply()
+		response.SetXid(request.GetXid())
+		response.SetVersion(request.GetVersion())
+		if total != n {
+			response.SetFlags(ofp.StatsReplyFlags(ofp.OFPSFReplyMore))
+		}
+		response.SetEntries(entries[n*chunkSize : min((n*chunkSize)+chunkSize, len(entries))])
+		responses = append(responses, response)
+		n++
+	}
+	return responses, nil
 }
 
-func (ofc *OFConnection) handlePortDescStatsRequest(request *ofp.PortDescStatsRequest) (*ofp.PortDescStatsReply, error) {
+func (ofc *OFConnection) handlePortDescStatsRequest(request *ofp.PortDescStatsRequest) ([]*ofp.PortDescStatsReply, error) {
 	volthaClient := ofc.VolthaClient.Get()
 	if volthaClient == nil {
 		return nil, NoVolthaConnectionError
 	}
-	response := ofp.NewPortDescStatsReply()
-	response.SetVersion(request.GetVersion())
-	response.SetXid(request.GetXid())
-	response.SetFlags(ofp.StatsReplyFlags(request.GetFlags()))
+
 	logicalDevice, err := volthaClient.GetLogicalDevice(context.Background(),
 		&common.ID{Id: ofc.DeviceID})
 	if err != nil {
@@ -597,10 +649,39 @@
 		entries = append(entries, &entry)
 	}
 
-	response.SetEntries(entries)
-	//TODO call voltha and get port descriptions etc
-	return response, nil
+	var responses []*ofp.PortDescStatsReply
+	chunkSize := 500
+	total := len(entries) / chunkSize
+	n := 0
+	for n <= total {
 
+		chunk := entries[n*chunkSize : min((n*chunkSize)+chunkSize, len(entries))]
+
+		if len(chunk) == 0 {
+			break
+		}
+
+		response := ofp.NewPortDescStatsReply()
+		response.SetVersion(request.GetVersion())
+		response.SetXid(request.GetXid())
+		response.SetFlags(ofp.StatsReplyFlags(request.GetFlags()))
+		if total != n {
+			response.SetFlags(ofp.StatsReplyFlags(ofp.OFPSFReplyMore))
+		}
+		response.SetEntries(chunk)
+		responses = append(responses, response)
+		n++
+	}
+	return responses, nil
+
+}
+
+// Interestingly enough there is no min function fot two integers
+func min(a, b int) int {
+	if a < b {
+		return a
+	}
+	return b
 }
 
 func (ofc *OFConnection) handleMeterFeatureStatsRequest(request *ofp.MeterFeaturesStatsRequest) (*ofp.MeterFeaturesStatsReply, error) {