[VOL-3143] Optimize and test ofAgent response chunking
Change-Id: Ia48199ea055d1833a116a3ac47c72f09ca78563a
diff --git a/cmd/ofagent/profile.go b/cmd/ofagent/profile.go
index 6ac7528..f539afd 100644
--- a/cmd/ofagent/profile.go
+++ b/cmd/ofagent/profile.go
@@ -28,4 +28,4 @@
logger.Fatal(http.ListenAndServe("0.0.0.0:6060", nil))
}()
-}
\ No newline at end of file
+}
diff --git a/internal/pkg/mock/voltha_client_mock.go b/internal/pkg/mock/voltha_client_mock.go
new file mode 100644
index 0000000..9b583ae
--- /dev/null
+++ b/internal/pkg/mock/voltha_client_mock.go
@@ -0,0 +1,277 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mock
+
+import (
+ "context"
+ . "github.com/golang/protobuf/ptypes/empty"
+ "github.com/opencord/voltha-protos/v3/go/common"
+ "github.com/opencord/voltha-protos/v3/go/omci"
+ "github.com/opencord/voltha-protos/v3/go/openflow_13"
+ . "github.com/opencord/voltha-protos/v3/go/voltha"
+ "google.golang.org/grpc"
+)
+
+type MockVolthaClient struct {
+ LogicalDeviceFlows openflow_13.Flows
+ LogicalPorts LogicalPorts
+ LogicalDevice LogicalDevice
+}
+
+func (c MockVolthaClient) GetMembership(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*Membership, error) {
+ return &Membership{}, nil
+}
+
+func (c MockVolthaClient) UpdateMembership(ctx context.Context, in *Membership, opts ...grpc.CallOption) (*Empty, error) {
+ return &Empty{}, nil
+}
+
+func (c MockVolthaClient) GetVoltha(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*Voltha, error) {
+ return &Voltha{}, nil
+}
+
+func (c MockVolthaClient) ListCoreInstances(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*CoreInstances, error) {
+ return &CoreInstances{}, nil
+}
+
+func (c MockVolthaClient) GetCoreInstance(ctx context.Context, in *common.ID, opts ...grpc.CallOption) (*CoreInstance, error) {
+ return &CoreInstance{}, nil
+}
+
+func (c MockVolthaClient) ListAdapters(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*Adapters, error) {
+ return &Adapters{}, nil
+}
+
+func (c MockVolthaClient) ListLogicalDevices(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*LogicalDevices, error) {
+ return &LogicalDevices{}, nil
+}
+
+func (c MockVolthaClient) GetLogicalDevice(ctx context.Context, in *common.ID, opts ...grpc.CallOption) (*LogicalDevice, error) {
+ return &c.LogicalDevice, nil
+}
+
+func (c MockVolthaClient) ListLogicalDevicePorts(ctx context.Context, in *common.ID, opts ...grpc.CallOption) (*LogicalPorts, error) {
+ return &c.LogicalPorts, nil
+}
+
+func (c MockVolthaClient) GetLogicalDevicePort(ctx context.Context, in *LogicalPortId, opts ...grpc.CallOption) (*LogicalPort, error) {
+ return &LogicalPort{}, nil
+}
+
+func (c MockVolthaClient) EnableLogicalDevicePort(ctx context.Context, in *LogicalPortId, opts ...grpc.CallOption) (*Empty, error) {
+ return &Empty{}, nil
+}
+
+func (c MockVolthaClient) DisableLogicalDevicePort(ctx context.Context, in *LogicalPortId, opts ...grpc.CallOption) (*Empty, error) {
+ return &Empty{}, nil
+}
+
+func (c MockVolthaClient) ListLogicalDeviceFlows(ctx context.Context, in *common.ID, opts ...grpc.CallOption) (*openflow_13.Flows, error) {
+ return &c.LogicalDeviceFlows, nil
+}
+
+func (c MockVolthaClient) UpdateLogicalDeviceFlowTable(ctx context.Context, in *openflow_13.FlowTableUpdate, opts ...grpc.CallOption) (*Empty, error) {
+ return &Empty{}, nil
+}
+
+func (c MockVolthaClient) UpdateLogicalDeviceMeterTable(ctx context.Context, in *openflow_13.MeterModUpdate, opts ...grpc.CallOption) (*Empty, error) {
+ return &Empty{}, nil
+}
+
+func (c MockVolthaClient) ListLogicalDeviceMeters(ctx context.Context, in *common.ID, opts ...grpc.CallOption) (*openflow_13.Meters, error) {
+ return &openflow_13.Meters{}, nil
+}
+
+func (c MockVolthaClient) ListLogicalDeviceFlowGroups(ctx context.Context, in *common.ID, opts ...grpc.CallOption) (*openflow_13.FlowGroups, error) {
+ return &openflow_13.FlowGroups{}, nil
+}
+
+func (c MockVolthaClient) UpdateLogicalDeviceFlowGroupTable(ctx context.Context, in *openflow_13.FlowGroupTableUpdate, opts ...grpc.CallOption) (*Empty, error) {
+ return &Empty{}, nil
+}
+
+func (c MockVolthaClient) ListDevices(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*Devices, error) {
+ return &Devices{}, nil
+}
+
+func (c MockVolthaClient) ListDeviceIds(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*common.IDs, error) {
+ return &common.IDs{}, nil
+}
+
+func (c MockVolthaClient) ReconcileDevices(ctx context.Context, in *common.IDs, opts ...grpc.CallOption) (*Empty, error) {
+ return &Empty{}, nil
+}
+
+func (c MockVolthaClient) GetDevice(ctx context.Context, in *common.ID, opts ...grpc.CallOption) (*Device, error) {
+ return &Device{}, nil
+}
+
+func (c MockVolthaClient) CreateDevice(ctx context.Context, in *Device, opts ...grpc.CallOption) (*Device, error) {
+ return &Device{}, nil
+}
+
+func (c MockVolthaClient) EnableDevice(ctx context.Context, in *common.ID, opts ...grpc.CallOption) (*Empty, error) {
+ return &Empty{}, nil
+}
+
+func (c MockVolthaClient) DisableDevice(ctx context.Context, in *common.ID, opts ...grpc.CallOption) (*Empty, error) {
+ return &Empty{}, nil
+}
+
+func (c MockVolthaClient) RebootDevice(ctx context.Context, in *common.ID, opts ...grpc.CallOption) (*Empty, error) {
+ return &Empty{}, nil
+}
+
+func (c MockVolthaClient) DeleteDevice(ctx context.Context, in *common.ID, opts ...grpc.CallOption) (*Empty, error) {
+ return &Empty{}, nil
+}
+
+func (c MockVolthaClient) DownloadImage(ctx context.Context, in *ImageDownload, opts ...grpc.CallOption) (*common.OperationResp, error) {
+ return &common.OperationResp{}, nil
+}
+
+func (c MockVolthaClient) GetImageDownloadStatus(ctx context.Context, in *ImageDownload, opts ...grpc.CallOption) (*ImageDownload, error) {
+ return &ImageDownload{}, nil
+}
+
+func (c MockVolthaClient) GetImageDownload(ctx context.Context, in *ImageDownload, opts ...grpc.CallOption) (*ImageDownload, error) {
+ return &ImageDownload{}, nil
+}
+
+func (c MockVolthaClient) ListImageDownloads(ctx context.Context, in *common.ID, opts ...grpc.CallOption) (*ImageDownloads, error) {
+ return &ImageDownloads{}, nil
+}
+
+func (c MockVolthaClient) CancelImageDownload(ctx context.Context, in *ImageDownload, opts ...grpc.CallOption) (*common.OperationResp, error) {
+ return &common.OperationResp{}, nil
+}
+
+func (c MockVolthaClient) ActivateImageUpdate(ctx context.Context, in *ImageDownload, opts ...grpc.CallOption) (*common.OperationResp, error) {
+ return &common.OperationResp{}, nil
+}
+
+func (c MockVolthaClient) RevertImageUpdate(ctx context.Context, in *ImageDownload, opts ...grpc.CallOption) (*common.OperationResp, error) {
+ return &common.OperationResp{}, nil
+}
+
+func (c MockVolthaClient) ListDevicePorts(ctx context.Context, in *common.ID, opts ...grpc.CallOption) (*Ports, error) {
+ return &Ports{}, nil
+}
+
+func (c MockVolthaClient) ListDevicePmConfigs(ctx context.Context, in *common.ID, opts ...grpc.CallOption) (*PmConfigs, error) {
+ return &PmConfigs{}, nil
+}
+
+func (c MockVolthaClient) UpdateDevicePmConfigs(ctx context.Context, in *PmConfigs, opts ...grpc.CallOption) (*Empty, error) {
+ return &Empty{}, nil
+}
+
+func (c MockVolthaClient) ListDeviceFlows(ctx context.Context, in *common.ID, opts ...grpc.CallOption) (*openflow_13.Flows, error) {
+ return &openflow_13.Flows{}, nil
+}
+
+func (c MockVolthaClient) ListDeviceFlowGroups(ctx context.Context, in *common.ID, opts ...grpc.CallOption) (*openflow_13.FlowGroups, error) {
+ return &openflow_13.FlowGroups{}, nil
+}
+
+func (c MockVolthaClient) ListDeviceTypes(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*DeviceTypes, error) {
+ return &DeviceTypes{}, nil
+}
+
+func (c MockVolthaClient) GetDeviceType(ctx context.Context, in *common.ID, opts ...grpc.CallOption) (*DeviceType, error) {
+ return &DeviceType{}, nil
+}
+
+func (c MockVolthaClient) ListDeviceGroups(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*DeviceGroups, error) {
+ return &DeviceGroups{}, nil
+}
+
+func (c MockVolthaClient) StreamPacketsOut(ctx context.Context, opts ...grpc.CallOption) (VolthaService_StreamPacketsOutClient, error) {
+ return nil, nil
+}
+
+func (c MockVolthaClient) ReceivePacketsIn(ctx context.Context, in *Empty, opts ...grpc.CallOption) (VolthaService_ReceivePacketsInClient, error) {
+ return nil, nil
+}
+
+func (c MockVolthaClient) ReceiveChangeEvents(ctx context.Context, in *Empty, opts ...grpc.CallOption) (VolthaService_ReceiveChangeEventsClient, error) {
+ return nil, nil
+}
+
+func (c MockVolthaClient) GetDeviceGroup(ctx context.Context, in *common.ID, opts ...grpc.CallOption) (*DeviceGroup, error) {
+ return &DeviceGroup{}, nil
+}
+
+func (c MockVolthaClient) CreateEventFilter(ctx context.Context, in *EventFilter, opts ...grpc.CallOption) (*EventFilter, error) {
+ return &EventFilter{}, nil
+}
+
+func (c MockVolthaClient) GetEventFilter(ctx context.Context, in *common.ID, opts ...grpc.CallOption) (*EventFilters, error) {
+ return &EventFilters{}, nil
+}
+
+func (c MockVolthaClient) UpdateEventFilter(ctx context.Context, in *EventFilter, opts ...grpc.CallOption) (*EventFilter, error) {
+ return &EventFilter{}, nil
+}
+
+func (c MockVolthaClient) DeleteEventFilter(ctx context.Context, in *EventFilter, opts ...grpc.CallOption) (*Empty, error) {
+ return &Empty{}, nil
+}
+
+func (c MockVolthaClient) ListEventFilters(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*EventFilters, error) {
+ return &EventFilters{}, nil
+}
+
+func (c MockVolthaClient) GetImages(ctx context.Context, in *common.ID, opts ...grpc.CallOption) (*Images, error) {
+ return &Images{}, nil
+}
+
+func (c MockVolthaClient) SelfTest(ctx context.Context, in *common.ID, opts ...grpc.CallOption) (*SelfTestResponse, error) {
+ return &SelfTestResponse{}, nil
+}
+
+func (c MockVolthaClient) GetMibDeviceData(ctx context.Context, in *common.ID, opts ...grpc.CallOption) (*omci.MibDeviceData, error) {
+ return &omci.MibDeviceData{}, nil
+}
+
+func (c MockVolthaClient) GetAlarmDeviceData(ctx context.Context, in *common.ID, opts ...grpc.CallOption) (*omci.AlarmDeviceData, error) {
+ return &omci.AlarmDeviceData{}, nil
+}
+
+func (c MockVolthaClient) SimulateAlarm(ctx context.Context, in *SimulateAlarmRequest, opts ...grpc.CallOption) (*common.OperationResp, error) {
+ return &common.OperationResp{}, nil
+}
+
+func (c MockVolthaClient) Subscribe(ctx context.Context, in *OfAgentSubscriber, opts ...grpc.CallOption) (*OfAgentSubscriber, error) {
+ return &OfAgentSubscriber{}, nil
+}
+
+func (c MockVolthaClient) EnablePort(ctx context.Context, in *Port, opts ...grpc.CallOption) (*Empty, error) {
+ return &Empty{}, nil
+}
+
+func (c MockVolthaClient) DisablePort(ctx context.Context, in *Port, opts ...grpc.CallOption) (*Empty, error) {
+ return &Empty{}, nil
+}
+
+func (c MockVolthaClient) GetExtValue(ctx context.Context, in *common.ValueSpecifier, opts ...grpc.CallOption) (*common.ReturnValues, error) {
+ return &common.ReturnValues{}, nil
+}
+
+func (c MockVolthaClient) StartOmciTestAction(ctx context.Context, in *OmciTestRequest, opts ...grpc.CallOption) (*TestResponse, error) {
+ return &TestResponse{}, nil
+}
diff --git a/internal/pkg/openflow/client.go b/internal/pkg/openflow/client.go
index 5b44fa4..635c712 100644
--- a/internal/pkg/openflow/client.go
+++ b/internal/pkg/openflow/client.go
@@ -50,6 +50,12 @@
ofcRoleEqual
ofcRoleMaster
ofcRoleSlave
+
+ // according to testing this is the maximum content of an
+ // openflow message to remain under 64KB
+ ofcFlowsChunkSize = 450 // this amount of flows is around 57KB
+ ofcPortsChunkSize = 550 // this amount of port stats is around 61KB
+ ofcPortsDescChunkSize = 900 // this amount of port desc is around 57KB
)
func (e ofcEvent) String() string {
@@ -101,6 +107,10 @@
generationIsDefined bool
generationID uint64
roleLock sync.Mutex
+
+ flowsChunkSize int
+ portsChunkSize int
+ portsDescChunkSize int
}
type RoleManager interface {
@@ -168,6 +178,9 @@
ConnectionMaxRetries: config.ConnectionMaxRetries,
ConnectionRetryDelay: config.ConnectionRetryDelay,
connections: make(map[string]*OFConnection),
+ flowsChunkSize: ofcFlowsChunkSize,
+ portsChunkSize: ofcPortsChunkSize,
+ portsDescChunkSize: ofcPortsDescChunkSize,
}
if ofc.ConnectionRetryDelay <= 0 {
@@ -202,6 +215,9 @@
roleManager: ofc,
events: make(chan ofcEvent, 10),
sendChannel: make(chan Message, 100),
+ flowsChunkSize: ofc.flowsChunkSize,
+ portsChunkSize: ofc.portsChunkSize,
+ portsDescChunkSize: ofc.portsDescChunkSize,
}
ofc.connections[endpoint] = connection
diff --git a/internal/pkg/openflow/connection.go b/internal/pkg/openflow/connection.go
index 209990d..7ad6092 100644
--- a/internal/pkg/openflow/connection.go
+++ b/internal/pkg/openflow/connection.go
@@ -49,6 +49,10 @@
events chan ofcEvent
sendChannel chan Message
lastUnsentMessage Message
+
+ flowsChunkSize int
+ portsChunkSize int
+ portsDescChunkSize int
}
func (ofc *OFConnection) peekAtOFHeader(buf []byte) (ofp.IHeader, error) {
diff --git a/internal/pkg/openflow/stats.go b/internal/pkg/openflow/stats.go
index bb6a941..9cd217e 100644
--- a/internal/pkg/openflow/stats.go
+++ b/internal/pkg/openflow/stats.go
@@ -374,7 +374,7 @@
flows = append(flows, entry)
}
var responses []*ofp.FlowStatsReply
- chunkSize := 150
+ chunkSize := ofc.flowsChunkSize
total := len(flows) / chunkSize
n := 0
for n <= total {
@@ -589,7 +589,7 @@
}
var responses []*ofp.PortStatsReply
- chunkSize := 500
+ chunkSize := ofc.portsChunkSize
total := len(entries) / chunkSize
n := 0
for n <= total {
@@ -650,7 +650,7 @@
}
var responses []*ofp.PortDescStatsReply
- chunkSize := 500
+ chunkSize := ofc.portsDescChunkSize
total := len(entries) / chunkSize
n := 0
for n <= total {
diff --git a/internal/pkg/openflow/stats_test.go b/internal/pkg/openflow/stats_test.go
new file mode 100644
index 0000000..1c82946
--- /dev/null
+++ b/internal/pkg/openflow/stats_test.go
@@ -0,0 +1,329 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package openflow
+
+import (
+ "fmt"
+ "github.com/opencord/goloxi"
+ "github.com/opencord/goloxi/of13"
+ "github.com/opencord/ofagent-go/internal/pkg/holder"
+ "github.com/opencord/ofagent-go/internal/pkg/mock"
+ "github.com/opencord/voltha-protos/v3/go/openflow_13"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
+ "github.com/stretchr/testify/assert"
+ "math"
+ "testing"
+)
+
+var msgSizeLimit = 64000
+
+func createEapolFlow(id int) openflow_13.OfpFlowStats {
+
+ portField := openflow_13.OfpOxmField{
+ OxmClass: openflow_13.OfpOxmClass_OFPXMC_OPENFLOW_BASIC,
+ Field: &openflow_13.OfpOxmField_OfbField{
+ OfbField: &openflow_13.OfpOxmOfbField{
+ Type: openflow_13.OxmOfbFieldTypes_OFPXMT_OFB_IN_PORT,
+ Value: &openflow_13.OfpOxmOfbField_Port{Port: 16},
+ },
+ },
+ }
+
+ ethField := openflow_13.OfpOxmField{
+ OxmClass: openflow_13.OfpOxmClass_OFPXMC_OPENFLOW_BASIC,
+ Field: &openflow_13.OfpOxmField_OfbField{
+ OfbField: &openflow_13.OfpOxmOfbField{
+ Type: openflow_13.OxmOfbFieldTypes_OFPXMT_OFB_ETH_TYPE,
+ Value: &openflow_13.OfpOxmOfbField_EthType{EthType: 2048},
+ },
+ },
+ }
+
+ vlanField := openflow_13.OfpOxmField{
+ OxmClass: openflow_13.OfpOxmClass_OFPXMC_OPENFLOW_BASIC,
+ Field: &openflow_13.OfpOxmField_OfbField{
+ OfbField: &openflow_13.OfpOxmOfbField{
+ Type: openflow_13.OxmOfbFieldTypes_OFPXMT_OFB_VLAN_VID,
+ Value: &openflow_13.OfpOxmOfbField_VlanVid{VlanVid: 4096},
+ },
+ },
+ }
+
+ return openflow_13.OfpFlowStats{
+ Id: uint64(id),
+ Priority: 10000,
+ Flags: 1,
+ Match: &openflow_13.OfpMatch{
+ Type: openflow_13.OfpMatchType_OFPMT_OXM,
+ OxmFields: []*openflow_13.OfpOxmField{
+ &portField, ðField, &vlanField,
+ },
+ },
+ Instructions: []*openflow_13.OfpInstruction{
+ {
+ Type: uint32(openflow_13.OfpInstructionType_OFPIT_APPLY_ACTIONS),
+ Data: &openflow_13.OfpInstruction_Actions{
+ Actions: &openflow_13.OfpInstructionActions{
+ Actions: []*openflow_13.OfpAction{
+ {
+ Type: openflow_13.OfpActionType_OFPAT_OUTPUT,
+ Action: &openflow_13.OfpAction_Output{
+ Output: &openflow_13.OfpActionOutput{
+ Port: 4294967293,
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ {
+ Type: uint32(openflow_13.OfpInstructionType_OFPIT_WRITE_METADATA),
+ Data: &openflow_13.OfpInstruction_WriteMetadata{
+ WriteMetadata: &openflow_13.OfpInstructionWriteMetadata{
+ Metadata: 274877906944,
+ },
+ },
+ },
+ {
+ Type: uint32(openflow_13.OfpInstructionType_OFPIT_METER),
+ Data: &openflow_13.OfpInstruction_Meter{
+ Meter: &openflow_13.OfpInstructionMeter{
+ MeterId: 2,
+ },
+ },
+ },
+ },
+ }
+}
+
+func createRandomFlows(count int) []*openflow_13.OfpFlowStats {
+
+ var flows []*openflow_13.OfpFlowStats
+
+ n := 0
+ for n < count {
+
+ flow := createEapolFlow(n)
+
+ flows = append(flows, &flow)
+
+ n = n + 1
+ }
+ return flows
+}
+
+func createRandomPorts(count int) []*voltha.LogicalPort {
+ var ports []*voltha.LogicalPort
+
+ n := 0
+ for n < count {
+
+ port := voltha.LogicalPort{
+ Id: fmt.Sprintf("uni-%d", n),
+ OfpPort: &openflow_13.OfpPort{
+ PortNo: uint32(n),
+ HwAddr: []uint32{8, 0, 0, 0, 0, uint32(n)},
+ Name: fmt.Sprintf("BBSM-%d", n),
+ State: uint32(openflow_13.OfpPortState_OFPPS_LIVE),
+ Curr: 4128,
+ Advertised: 4128,
+ Peer: 4128,
+ CurrSpeed: 32,
+ MaxSpeed: 32,
+ Config: 1,
+ Supported: 1,
+ },
+ }
+
+ ports = append(ports, &port)
+
+ n = n + 1
+ }
+ return ports
+}
+
+func newTestOFConnection(flowsCount int, portsCount int) *OFConnection {
+
+ flows := openflow_13.Flows{
+ Items: createRandomFlows(flowsCount),
+ }
+
+ ports := voltha.LogicalPorts{
+ Items: createRandomPorts(portsCount),
+ }
+
+ logicalDevice := voltha.LogicalDevice{
+ Ports: createRandomPorts(portsCount),
+ }
+
+ volthaClient := mock.MockVolthaClient{
+ LogicalDeviceFlows: flows,
+ LogicalPorts: ports,
+ LogicalDevice: logicalDevice,
+ }
+
+ volthaClientHolder := &holder.VolthaServiceClientHolder{}
+ volthaClientHolder.Set(volthaClient)
+
+ return &OFConnection{
+ VolthaClient: volthaClientHolder,
+ flowsChunkSize: ofcFlowsChunkSize,
+ portsChunkSize: ofcPortsChunkSize,
+ portsDescChunkSize: ofcPortsDescChunkSize,
+ }
+}
+
+func TestHandleFlowStatsRequest(t *testing.T) {
+
+ generatedFlowsCount := 2000
+
+ ofc := newTestOFConnection(2000, 0)
+
+ request := of13.NewFlowStatsRequest()
+
+ replies, err := ofc.handleFlowStatsRequest(request)
+ assert.Equal(t, err, nil)
+
+ // check that the correct number of messages is generated
+ assert.Equal(t, int(math.Ceil(float64(generatedFlowsCount)/ofcFlowsChunkSize)), len(replies))
+
+ n := 1
+ entriesCount := 0
+
+ for _, r := range replies {
+ json, _ := r.Flags.MarshalJSON()
+
+ // check that the ReplyMore flag is correctly set in the messages
+ if n == len(replies) {
+ assert.Equal(t, string(json), "{}")
+ } else {
+ assert.Equal(t, string(json), "{\"OFPSFReplyMore\": true}")
+ }
+
+ // check the message size
+ enc := goloxi.NewEncoder()
+ if err := r.Serialize(enc); err != nil {
+ t.Fail()
+ }
+ bytes := enc.Bytes()
+ fmt.Println("FlowStats msg size: ", len(bytes))
+ if len(bytes) > msgSizeLimit {
+ t.Fatal("Message size is bigger than 64KB")
+ }
+
+ entriesCount = entriesCount + len(r.GetEntries())
+ n++
+ }
+
+ // make sure all the generate item are included in the responses
+ assert.Equal(t, generatedFlowsCount, entriesCount)
+}
+
+func TestHandlePortStatsRequest(t *testing.T) {
+
+ generatedPortsCount := 2560
+
+ ofc := newTestOFConnection(0, generatedPortsCount)
+
+ request := of13.NewPortStatsRequest()
+
+ // request stats for all ports
+ request.PortNo = 0xffffffff
+
+ replies, err := ofc.handlePortStatsRequest(request)
+ assert.Equal(t, err, nil)
+
+ assert.Equal(t, int(math.Ceil(float64(generatedPortsCount)/ofcPortsChunkSize)), len(replies))
+
+ n := 1
+ entriesCount := 0
+
+ for _, r := range replies {
+ json, _ := r.Flags.MarshalJSON()
+
+ // check that the ReplyMore flag is correctly set in the messages
+ if n == len(replies) {
+ assert.Equal(t, string(json), "{}")
+ } else {
+ assert.Equal(t, string(json), "{\"OFPSFReplyMore\": true}")
+ }
+
+ // check the message size
+ enc := goloxi.NewEncoder()
+ if err := r.Serialize(enc); err != nil {
+ t.Fail()
+ }
+ bytes := enc.Bytes()
+ fmt.Println("PortStats msg size: ", len(bytes))
+ if len(bytes) > msgSizeLimit {
+ t.Fatal("Message size is bigger than 64KB")
+ }
+
+ entriesCount = entriesCount + len(r.GetEntries())
+ n++
+ }
+
+ // make sure all the generate item are included in the responses
+ assert.Equal(t, generatedPortsCount, entriesCount)
+}
+
+func TestHandlePortDescStatsRequest(t *testing.T) {
+
+ generatedPortsCount := 2560
+
+ ofc := newTestOFConnection(0, generatedPortsCount)
+
+ request := of13.NewPortDescStatsRequest()
+
+ replies, err := ofc.handlePortDescStatsRequest(request)
+ assert.Equal(t, err, nil)
+
+ // check that the correct number of messages is generated
+ assert.Equal(t, int(math.Ceil(float64(generatedPortsCount)/ofcPortsDescChunkSize)), len(replies))
+
+ n := 1
+ entriesCount := 0
+
+ for _, r := range replies {
+ json, _ := r.Flags.MarshalJSON()
+
+ // check that the ReplyMore flag is correctly set in the messages
+ if n == len(replies) {
+ assert.Equal(t, string(json), "{}")
+ } else {
+ assert.Equal(t, string(json), "{\"OFPSFReplyMore\": true}")
+ }
+
+ // check the message size
+ enc := goloxi.NewEncoder()
+ if err := r.Serialize(enc); err != nil {
+ t.Fail()
+ }
+ bytes := enc.Bytes()
+ fmt.Println("PortDesc msg size: ", len(bytes))
+ if len(bytes) > msgSizeLimit {
+ t.Fatal("Message size is bigger than 64KB")
+ }
+
+ entriesCount = entriesCount + len(r.GetEntries())
+ n++
+ }
+
+ // make sure all the generate item are included in the responses
+ assert.Equal(t, generatedPortsCount, entriesCount)
+}