VOL-3029 - Changed flow hashing to only take into account tableId, priority, and match.
Also optimized hash computation.
Also modified FlowMatch() and FindFlows() to use a simple comparison based on the hash (flow.Id).
Change-Id: Ic82f340405fd3eb2d0a683e3d552759145097f8f
diff --git a/VERSION b/VERSION
index 7148b0a..c7a2498 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-3.1.9
+3.1.10
diff --git a/pkg/flows/flow_utils.go b/pkg/flows/flow_utils.go
index b2086cd..3139144 100644
--- a/pkg/flows/flow_utils.go
+++ b/pkg/flows/flow_utils.go
@@ -18,14 +18,15 @@
import (
"bytes"
"crypto/md5"
- "errors"
+ "encoding/binary"
"fmt"
+ "hash"
+ "sort"
+
"github.com/cevaris/ordered_map"
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
- "math/big"
- "strings"
)
var (
@@ -677,25 +678,229 @@
return 0
}
-// Return unique 64-bit integer hash for flow covering the following attributes:
-// 'table_id', 'priority', 'flags', 'cookie', 'match', '_instruction_string'
+// HashFlowStats returns a unique 64-bit integer hash of 'table_id', 'priority', and 'match'
+// The OF spec states that:
+// A flow table entry is identified by its match fields and priority: the match fields
+// and priority taken together identify a unique flow entry in the flow table.
func HashFlowStats(flow *ofp.OfpFlowStats) (uint64, error) {
- if flow == nil { // Should never happen
- return 0, errors.New("hash-flow-stats-nil-flow")
+ // first we need to make sure the oxm fields are in a predictable order (the specific order doesn't matter)
+ sort.Slice(flow.Match.OxmFields, func(a, b int) bool {
+ fieldsA, fieldsB := flow.Match.OxmFields[a], flow.Match.OxmFields[b]
+ if fieldsA.OxmClass < fieldsB.OxmClass {
+ return true
+ }
+ switch fieldA := fieldsA.Field.(type) {
+ case *ofp.OfpOxmField_OfbField:
+ switch fieldB := fieldsB.Field.(type) {
+ case *ofp.OfpOxmField_ExperimenterField:
+ return true // ofp < experimenter
+ case *ofp.OfpOxmField_OfbField:
+ return fieldA.OfbField.Type < fieldB.OfbField.Type
+ }
+ case *ofp.OfpOxmField_ExperimenterField:
+ switch fieldB := fieldsB.Field.(type) {
+ case *ofp.OfpOxmField_OfbField:
+ return false // ofp < experimenter
+ case *ofp.OfpOxmField_ExperimenterField:
+ eFieldA, eFieldB := fieldA.ExperimenterField, fieldB.ExperimenterField
+ if eFieldA.Experimenter != eFieldB.Experimenter {
+ return eFieldA.Experimenter < eFieldB.Experimenter
+ }
+ return eFieldA.OxmHeader < eFieldB.OxmHeader
+ }
+ }
+ return false
+ })
+
+ md5Hash := md5.New() // note that write errors will never occur with md5 hashing
+ var tmp [12]byte
+
+ binary.BigEndian.PutUint32(tmp[0:4], flow.TableId) // tableId
+ binary.BigEndian.PutUint32(tmp[4:8], flow.Priority) // priority
+ binary.BigEndian.PutUint32(tmp[8:12], uint32(flow.Match.Type)) // match type
+ _, _ = md5Hash.Write(tmp[:12])
+
+ for _, field := range flow.Match.OxmFields { // for all match fields
+ binary.BigEndian.PutUint32(tmp[:4], uint32(field.OxmClass)) // match class
+ _, _ = md5Hash.Write(tmp[:4])
+
+ switch oxmField := field.Field.(type) {
+ case *ofp.OfpOxmField_ExperimenterField:
+ binary.BigEndian.PutUint32(tmp[0:4], oxmField.ExperimenterField.Experimenter)
+ binary.BigEndian.PutUint32(tmp[4:8], oxmField.ExperimenterField.OxmHeader)
+ _, _ = md5Hash.Write(tmp[:8])
+
+ case *ofp.OfpOxmField_OfbField:
+ if err := hashWriteOfbField(md5Hash, oxmField.OfbField); err != nil {
+ return 0, err
+ }
+
+ default:
+ return 0, fmt.Errorf("unknown OfpOxmField type: %T", field.Field)
+ }
}
- // Create string with the instructions field first
- var instructionString bytes.Buffer
- for _, instruction := range flow.Instructions {
- instructionString.WriteString(instruction.String())
+
+ ret := md5Hash.Sum(nil)
+ return binary.BigEndian.Uint64(ret[0:8]), nil
+}
+
+func hashWriteOfbField(md5Hash hash.Hash, field *ofp.OfpOxmOfbField) error {
+ var tmp [8]byte
+ binary.BigEndian.PutUint32(tmp[:4], uint32(field.Type)) // type
+ _, _ = md5Hash.Write(tmp[:4])
+
+ // value
+ valType, val32, val64, valSlice := uint8(0), uint32(0), uint64(0), []byte(nil)
+ switch val := field.Value.(type) {
+ case *ofp.OfpOxmOfbField_Port:
+ valType, val32 = 4, val.Port
+ case *ofp.OfpOxmOfbField_PhysicalPort:
+ valType, val32 = 4, val.PhysicalPort
+ case *ofp.OfpOxmOfbField_TableMetadata:
+ valType, val64 = 8, val.TableMetadata
+ case *ofp.OfpOxmOfbField_EthDst:
+ valType, valSlice = 1, val.EthDst
+ case *ofp.OfpOxmOfbField_EthSrc:
+ valType, valSlice = 1, val.EthSrc
+ case *ofp.OfpOxmOfbField_EthType:
+ valType, val32 = 4, val.EthType
+ case *ofp.OfpOxmOfbField_VlanVid:
+ valType, val32 = 4, val.VlanVid
+ case *ofp.OfpOxmOfbField_VlanPcp:
+ valType, val32 = 4, val.VlanPcp
+ case *ofp.OfpOxmOfbField_IpDscp:
+ valType, val32 = 4, val.IpDscp
+ case *ofp.OfpOxmOfbField_IpEcn:
+ valType, val32 = 4, val.IpEcn
+ case *ofp.OfpOxmOfbField_IpProto:
+ valType, val32 = 4, val.IpProto
+ case *ofp.OfpOxmOfbField_Ipv4Src:
+ valType, val32 = 4, val.Ipv4Src
+ case *ofp.OfpOxmOfbField_Ipv4Dst:
+ valType, val32 = 4, val.Ipv4Dst
+ case *ofp.OfpOxmOfbField_TcpSrc:
+ valType, val32 = 4, val.TcpSrc
+ case *ofp.OfpOxmOfbField_TcpDst:
+ valType, val32 = 4, val.TcpDst
+ case *ofp.OfpOxmOfbField_UdpSrc:
+ valType, val32 = 4, val.UdpSrc
+ case *ofp.OfpOxmOfbField_UdpDst:
+ valType, val32 = 4, val.UdpDst
+ case *ofp.OfpOxmOfbField_SctpSrc:
+ valType, val32 = 4, val.SctpSrc
+ case *ofp.OfpOxmOfbField_SctpDst:
+ valType, val32 = 4, val.SctpDst
+ case *ofp.OfpOxmOfbField_Icmpv4Type:
+ valType, val32 = 4, val.Icmpv4Type
+ case *ofp.OfpOxmOfbField_Icmpv4Code:
+ valType, val32 = 4, val.Icmpv4Code
+ case *ofp.OfpOxmOfbField_ArpOp:
+ valType, val32 = 4, val.ArpOp
+ case *ofp.OfpOxmOfbField_ArpSpa:
+ valType, val32 = 4, val.ArpSpa
+ case *ofp.OfpOxmOfbField_ArpTpa:
+ valType, val32 = 4, val.ArpTpa
+ case *ofp.OfpOxmOfbField_ArpSha:
+ valType, valSlice = 1, val.ArpSha
+ case *ofp.OfpOxmOfbField_ArpTha:
+ valType, valSlice = 1, val.ArpTha
+ case *ofp.OfpOxmOfbField_Ipv6Src:
+ valType, valSlice = 1, val.Ipv6Src
+ case *ofp.OfpOxmOfbField_Ipv6Dst:
+ valType, valSlice = 1, val.Ipv6Dst
+ case *ofp.OfpOxmOfbField_Ipv6Flabel:
+ valType, val32 = 4, val.Ipv6Flabel
+ case *ofp.OfpOxmOfbField_Icmpv6Type:
+ valType, val32 = 4, val.Icmpv6Type
+ case *ofp.OfpOxmOfbField_Icmpv6Code:
+ valType, val32 = 4, val.Icmpv6Code
+ case *ofp.OfpOxmOfbField_Ipv6NdTarget:
+ valType, valSlice = 1, val.Ipv6NdTarget
+ case *ofp.OfpOxmOfbField_Ipv6NdSsl:
+ valType, valSlice = 1, val.Ipv6NdSsl
+ case *ofp.OfpOxmOfbField_Ipv6NdTll:
+ valType, valSlice = 1, val.Ipv6NdTll
+ case *ofp.OfpOxmOfbField_MplsLabel:
+ valType, val32 = 4, val.MplsLabel
+ case *ofp.OfpOxmOfbField_MplsTc:
+ valType, val32 = 4, val.MplsTc
+ case *ofp.OfpOxmOfbField_MplsBos:
+ valType, val32 = 4, val.MplsBos
+ case *ofp.OfpOxmOfbField_PbbIsid:
+ valType, val32 = 4, val.PbbIsid
+ case *ofp.OfpOxmOfbField_TunnelId:
+ valType, val64 = 8, val.TunnelId
+ case *ofp.OfpOxmOfbField_Ipv6Exthdr:
+ valType, val32 = 4, val.Ipv6Exthdr
+ default:
+ return fmt.Errorf("unknown OfpOxmField value type: %T", val)
}
- var flowString = fmt.Sprintf("%d%d%d%d%s%s", flow.TableId, flow.Priority, flow.Flags, flow.Cookie, flow.Match.String(), instructionString.String())
- h := md5.New()
- if _, err := h.Write([]byte(flowString)); err != nil {
- return 0, fmt.Errorf("hash-flow-stats-failed-hash: %v", err)
+ switch valType {
+ case 1: // slice
+ _, _ = md5Hash.Write(valSlice)
+ case 4: // uint32
+ binary.BigEndian.PutUint32(tmp[:4], val32)
+ _, _ = md5Hash.Write(tmp[:4])
+ case 8: // uint64
+ binary.BigEndian.PutUint64(tmp[:8], val64)
+ _, _ = md5Hash.Write(tmp[:8])
}
- hash := big.NewInt(0)
- hash.SetBytes(h.Sum(nil))
- return hash.Uint64(), nil
+
+ // mask
+ if !field.HasMask {
+ tmp[0] = 0x00
+ _, _ = md5Hash.Write(tmp[:1]) // match hasMask = false
+ } else {
+ tmp[0] = 0x01
+ _, _ = md5Hash.Write(tmp[:1]) // match hasMask = true
+
+ maskType, mask32, mask64, maskSlice := uint8(0), uint32(0), uint64(0), []byte(nil)
+ switch mask := field.Mask.(type) {
+ case *ofp.OfpOxmOfbField_TableMetadataMask:
+ maskType, mask64 = 8, mask.TableMetadataMask
+ case *ofp.OfpOxmOfbField_EthDstMask:
+ maskType, maskSlice = 1, mask.EthDstMask
+ case *ofp.OfpOxmOfbField_EthSrcMask:
+ maskType, maskSlice = 1, mask.EthSrcMask
+ case *ofp.OfpOxmOfbField_VlanVidMask:
+ maskType, mask32 = 4, mask.VlanVidMask
+ case *ofp.OfpOxmOfbField_Ipv4SrcMask:
+ maskType, mask32 = 4, mask.Ipv4SrcMask
+ case *ofp.OfpOxmOfbField_Ipv4DstMask:
+ maskType, mask32 = 4, mask.Ipv4DstMask
+ case *ofp.OfpOxmOfbField_ArpSpaMask:
+ maskType, mask32 = 4, mask.ArpSpaMask
+ case *ofp.OfpOxmOfbField_ArpTpaMask:
+ maskType, mask32 = 4, mask.ArpTpaMask
+ case *ofp.OfpOxmOfbField_Ipv6SrcMask:
+ maskType, maskSlice = 1, mask.Ipv6SrcMask
+ case *ofp.OfpOxmOfbField_Ipv6DstMask:
+ maskType, maskSlice = 1, mask.Ipv6DstMask
+ case *ofp.OfpOxmOfbField_Ipv6FlabelMask:
+ maskType, mask32 = 4, mask.Ipv6FlabelMask
+ case *ofp.OfpOxmOfbField_PbbIsidMask:
+ maskType, mask32 = 4, mask.PbbIsidMask
+ case *ofp.OfpOxmOfbField_TunnelIdMask:
+ maskType, mask64 = 8, mask.TunnelIdMask
+ case *ofp.OfpOxmOfbField_Ipv6ExthdrMask:
+ maskType, mask32 = 4, mask.Ipv6ExthdrMask
+ case nil:
+ return fmt.Errorf("hasMask set to true, but no mask present")
+ default:
+ return fmt.Errorf("unknown OfpOxmField mask type: %T", mask)
+ }
+ switch maskType {
+ case 1: // slice
+ _, _ = md5Hash.Write(maskSlice)
+ case 4: // uint32
+ binary.BigEndian.PutUint32(tmp[:4], mask32)
+ _, _ = md5Hash.Write(tmp[:4])
+ case 8: // uint64
+ binary.BigEndian.PutUint64(tmp[:8], mask64)
+ _, _ = md5Hash.Write(tmp[:8])
+ }
+ }
+ return nil
}
// flowStatsEntryFromFlowModMessage maps an ofp_flow_mod message to an ofp_flow_stats message
@@ -1184,7 +1389,7 @@
// FindFlows returns the index in flows where flow if present. Otherwise, it returns -1
func FindFlows(flows []*ofp.OfpFlowStats, flow *ofp.OfpFlowStats) int {
for idx, f := range flows {
- if FlowMatch(f, flow) {
+ if f.Id == flow.Id {
return idx
}
}
@@ -1194,35 +1399,7 @@
//FlowMatch returns true if two flows matches on the following flow attributes:
//TableId, Priority, Flags, Cookie, Match
func FlowMatch(f1 *ofp.OfpFlowStats, f2 *ofp.OfpFlowStats) bool {
- if f1 == nil || f2 == nil {
- return false
- }
- keysMatter := []string{"TableId", "Priority", "Flags", "Cookie", "Match"}
- for _, key := range keysMatter {
- switch key {
- case "TableId":
- if f1.TableId != f2.TableId {
- return false
- }
- case "Priority":
- if f1.Priority != f2.Priority {
- return false
- }
- case "Flags":
- if f1.Flags != f2.Flags {
- return false
- }
- case "Cookie":
- if f1.Cookie != f2.Cookie {
- return false
- }
- case "Match":
- if strings.Compare(f1.Match.String(), f2.Match.String()) != 0 {
- return false
- }
- }
- }
- return true
+ return f1 != nil && f2 != nil && f1.Id == f2.Id
}
//FlowMatchesMod returns True if given flow is "covered" by the wildcard flow_mod, taking into consideration of
diff --git a/pkg/flows/flow_utils_test.go b/pkg/flows/flow_utils_test.go
index c4e481d..ec00cb1 100644
--- a/pkg/flows/flow_utils_test.go
+++ b/pkg/flows/flow_utils_test.go
@@ -37,11 +37,6 @@
timeoutError = status.Errorf(codes.Aborted, "timeout")
}
-func TestHashFlowStatsNil(t *testing.T) {
- _, err := HashFlowStats(nil)
- assert.EqualError(t, err, "hash-flow-stats-nil-flow")
-}
-
func TestFlowsAndGroups_AddFlow(t *testing.T) {
fg := NewFlowsAndGroups()
allFlows := fg.ListFlows()
@@ -261,7 +256,7 @@
fg.AddGroup(group)
str = fg.String()
- assert.True(t, strings.Contains(str, "id: 1143307409938767207"))
+ assert.True(t, strings.Contains(str, "id: 11819684229970388353"))
assert.True(t, strings.Contains(str, "group_id: 10"))
assert.True(t, strings.Contains(str, "oxm_class: OFPXMC_OPENFLOW_BASICOFPXMC_OPENFLOW_BASIC"))
assert.True(t, strings.Contains(str, "type: OFPXMT_OFB_VLAN_VIDOFPXMT_OFB_VLAN_VID"))
@@ -503,6 +498,7 @@
assert.Nil(t, err)
assert.False(t, FlowMatch(flow1, nil))
+ // different table_id, cookie, flags
fa = &FlowArgs{
KV: OfpFlowModArgs{"priority": 500},
MatchFields: []*ofp.OfpOxmOfbField{
@@ -521,6 +517,7 @@
assert.False(t, FlowMatch(flow1, flow2))
assert.False(t, FlowMatch(nil, flow2))
+ // no difference
fa = &FlowArgs{
KV: OfpFlowModArgs{"priority": 500, "table_id": 1, "cookie": 38268468, "flags": 12},
MatchFields: []*ofp.OfpOxmOfbField{
@@ -535,6 +532,7 @@
assert.Nil(t, err)
assert.True(t, FlowMatch(flow1, flow2))
+ // different priority
fa = &FlowArgs{
KV: OfpFlowModArgs{"priority": 501, "table_id": 1, "cookie": 38268468, "flags": 12},
MatchFields: []*ofp.OfpOxmOfbField{
@@ -552,6 +550,7 @@
assert.Nil(t, err)
assert.False(t, FlowMatch(flow1, flow2))
+ // different table id
fa = &FlowArgs{
KV: OfpFlowModArgs{"priority": 500, "table_id": 2, "cookie": 38268468, "flags": 12},
MatchFields: []*ofp.OfpOxmOfbField{
@@ -566,6 +565,7 @@
assert.Nil(t, err)
assert.False(t, FlowMatch(flow1, flow2))
+ // different cookie
fa = &FlowArgs{
KV: OfpFlowModArgs{"priority": 500, "table_id": 1, "cookie": 38268467, "flags": 12},
MatchFields: []*ofp.OfpOxmOfbField{
@@ -578,8 +578,9 @@
}
flow2, err = MkFlowStat(fa)
assert.Nil(t, err)
- assert.False(t, FlowMatch(flow1, flow2))
+ assert.True(t, FlowMatch(flow1, flow2))
+ // different flags
fa = &FlowArgs{
KV: OfpFlowModArgs{"priority": 500, "table_id": 1, "cookie": 38268468, "flags": 14},
MatchFields: []*ofp.OfpOxmOfbField{
@@ -592,8 +593,9 @@
}
flow2, err = MkFlowStat(fa)
assert.Nil(t, err)
- assert.False(t, FlowMatch(flow1, flow2))
+ assert.True(t, FlowMatch(flow1, flow2))
+ // different match InPort
fa = &FlowArgs{
KV: OfpFlowModArgs{"priority": 500, "table_id": 1, "cookie": 38268468, "flags": 12},
MatchFields: []*ofp.OfpOxmOfbField{
@@ -608,6 +610,7 @@
assert.Nil(t, err)
assert.False(t, FlowMatch(flow1, flow2))
+ // different match Ipv4Dst
fa = &FlowArgs{
KV: OfpFlowModArgs{"priority": 500, "table_id": 1, "cookie": 38268468, "flags": 12},
MatchFields: []*ofp.OfpOxmOfbField{
@@ -621,6 +624,7 @@
assert.Nil(t, err)
assert.False(t, FlowMatch(flow1, flow2))
+ // different actions
fa = &FlowArgs{
KV: OfpFlowModArgs{"priority": 500, "table_id": 1, "cookie": 38268468, "flags": 12},
MatchFields: []*ofp.OfpOxmOfbField{