Adding new ONOS flow addition test-cases.
Also verifies traffic after adding mac and ipv4 selector flows in ONOS
diff --git a/src/test/flows/__init__.py b/src/test/flows/__init__.py
new file mode 100644
index 0000000..a881eb6
--- /dev/null
+++ b/src/test/flows/__init__.py
@@ -0,0 +1,7 @@
+import os,sys
+##add the python path to lookup the utils
+working_dir = os.path.dirname(os.path.realpath(sys.argv[-1]))
+utils_dir = os.path.join(working_dir, '../utils')
+fsm_dir = os.path.join(working_dir, '../fsm')
+__path__.append(utils_dir)
+__path__.append(fsm_dir)
diff --git a/src/test/flows/flowsTest.py b/src/test/flows/flowsTest.py
new file mode 100644
index 0000000..9b0d96d
--- /dev/null
+++ b/src/test/flows/flowsTest.py
@@ -0,0 +1,108 @@
+import unittest
+from nose.tools import *
+from nose.twistedtools import reactor, deferred
+from twisted.internet import defer
+from scapy.all import *
+import time
+import json
+import threading
+import fcntl, socket, struct
+from OnosCtrl import OnosCtrl
+from OnosFlowCtrl import OnosFlowCtrl
+from OltConfig import OltConfig
+log.setLevel('INFO')
+
+def get_mac(iface = 'ovsbr0', pad = 4):
+ s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ info = fcntl.ioctl(s.fileno(), 0x8927, struct.pack('256s', iface[:15]))
+ return '0'*pad + ''.join(['%02x' %ord(char) for char in info[18:24]])
+
+class flows_exchange(unittest.TestCase):
+
+ #Use the first available device id as our device id to program flows
+ app = 'org.onosproject.cli'
+ PORT_TX_DEFAULT = 2
+ PORT_RX_DEFAULT = 1
+ INTF_TX_DEFAULT = 'veth2'
+ INTF_RX_DEFAULT = 'veth0'
+ default_port_map = {
+ PORT_TX_DEFAULT : INTF_TX_DEFAULT,
+ PORT_RX_DEFAULT : INTF_RX_DEFAULT,
+ INTF_TX_DEFAULT : PORT_TX_DEFAULT,
+ INTF_RX_DEFAULT : PORT_RX_DEFAULT
+ }
+
+ @classmethod
+ def setUpClass(cls):
+ cls.olt = OltConfig()
+ cls.port_map = cls.olt.olt_port_map()
+ if not cls.port_map:
+ cls.port_map = cls.default_port_map
+ cls.device_id = get_mac() ##match against our device id
+
+ def test_flow_mac(self):
+ '''Add and verify flows with MAC selectors'''
+ egress = 1
+ ingress = 2
+ egress_mac = '00:00:00:00:00:01'
+ ingress_mac = '00:00:00:00:00:02'
+ flow = OnosFlowCtrl(deviceId = self.device_id,
+ egressPort = egress,
+ ingressPort = ingress,
+ ethSrc = ingress_mac,
+ ethDst = egress_mac)
+ result = flow.addFlow()
+ assert_equal(result, True)
+ ##wait for flows to be added to ONOS
+ time.sleep(3)
+ self.success = False
+ def mac_recv_task():
+ def recv_cb(pkt):
+ log.info('Pkt seen with ingress mac %s, egress mac %s' %(pkt.src, pkt.dst))
+ self.success = True
+ sniff(count=2, timeout=5, lfilter = lambda p: p.src == ingress_mac,
+ prn = recv_cb, iface = self.port_map[egress])
+
+ t = threading.Thread(target = mac_recv_task)
+ t.start()
+ pkt = Ether(src = ingress_mac, dst = egress_mac)/IP()
+ log.info('Sending a packet to verify if flows are correct')
+ sendp(pkt, count=50, iface = self.port_map[ingress])
+ t.join()
+ assert_equal(self.success, True)
+
+ def test_flow_ip(self):
+ '''Add and verify flows with IPv4 selectors'''
+ egress = 1
+ ingress = 2
+ egress_map = { 'ether': '00:00:00:00:00:03', 'ip': '192.168.30.1' }
+ ingress_map = { 'ether': '00:00:00:00:00:04', 'ip': '192.168.40.1' }
+ flow = OnosFlowCtrl(deviceId = self.device_id,
+ egressPort = egress,
+ ingressPort = ingress,
+ ethType = '0x0800',
+ ipSrc = ('IPV4_SRC', ingress_map['ip']+'/32'),
+ ipDst = ('IPV4_DST', egress_map['ip']+'/32')
+ )
+ result = flow.addFlow()
+ assert_equal(result, True)
+ ##wait for flows to be added to ONOS
+ time.sleep(3)
+ self.success = False
+ def mac_recv_task():
+ def recv_cb(pkt):
+ log.info('Pkt seen with ingress ip %s, egress ip %s' %(pkt[IP].src, pkt[IP].dst))
+ self.success = True
+ sniff(count=2, timeout=5,
+ lfilter = lambda p: p[IP].dst == egress_map['ip'] and p[IP].src == ingress_map['ip'],
+ prn = recv_cb, iface = self.port_map[egress])
+
+ t = threading.Thread(target = mac_recv_task)
+ t.start()
+ L2 = Ether(src = ingress_map['ether'], dst = egress_map['ether'])
+ L3 = IP(src = ingress_map['ip'], dst = egress_map['ip'])
+ pkt = L2/L3
+ log.info('Sending a packet to verify if flows are correct')
+ sendp(pkt, count=50, iface = self.port_map[ingress])
+ t.join()
+ assert_equal(self.success, True)
diff --git a/src/test/utils/OnosFlowCtrl.py b/src/test/utils/OnosFlowCtrl.py
new file mode 100644
index 0000000..1956392
--- /dev/null
+++ b/src/test/utils/OnosFlowCtrl.py
@@ -0,0 +1,207 @@
+import json
+import requests
+import os,sys,time
+from nose.tools import *
+from scapy.all import *
+from OnosCtrl import OnosCtrl
+
+class OnosFlowCtrl:
+
+ auth = ('karaf', 'karaf')
+ controller = os.getenv('ONOS_CONTROLLER_IP') or 'localhost'
+ cfg_url = 'http://%s:8181/onos/v1/flows/' %(controller)
+
+ def __init__( self,
+ deviceId,
+ appId=0,
+ ingressPort="",
+ egressPort="",
+ ethType="",
+ ethSrc="",
+ ethDst="",
+ vlan="",
+ ipProto="",
+ ipSrc=(),
+ ipDst=(),
+ tcpSrc="",
+ tcpDst="",
+ udpDst="",
+ udpSrc="",
+ mpls=""):
+ self.deviceId = deviceId
+ self.appId = appId
+ self.ingressPort = ingressPort
+ self.egressPort = egressPort
+ self.ethType = ethType
+ self.ethSrc = ethSrc
+ self.ethDst = ethDst
+ self.vlan = vlan
+ self.ipProto = ipProto
+ self.ipSrc = ipSrc
+ self.ipDst = ipDst
+ self.tcpSrc = tcpSrc
+ self.tcpDst = tcpDst
+ self.udpDst = udpDst
+ self.udpSrc = udpSrc
+ self.mpls = mpls
+
+ @classmethod
+ def get_flows(cls, device_id):
+ return OnosCtrl.get_flows(device_id)
+
+ def addFlow(self):
+ """
+ Description:
+ Creates a single flow in the specified device
+ Required:
+ * deviceId: id of the device
+ Optional:
+ * ingressPort: port ingress device
+ * egressPort: port of egress device
+ * ethType: specify ethType
+ * ethSrc: specify ethSrc ( i.e. src mac addr )
+ * ethDst: specify ethDst ( i.e. dst mac addr )
+ * ipProto: specify ip protocol
+ * ipSrc: specify ip source address with mask eg. ip#/24
+ as a tuple (type, ip#)
+ * ipDst: specify ip destination address eg. ip#/24
+ as a tuple (type, ip#)
+ * tcpSrc: specify tcp source port
+ * tcpDst: specify tcp destination port
+ Returns:
+ True for successful requests;
+ False for failure/error on requests
+ """
+ flowJson = { "priority":100,
+ "isPermanent":"true",
+ "timeout":0,
+ "deviceId":self.deviceId,
+ "treatment":{"instructions":[]},
+ "selector": {"criteria":[]}}
+ if self.appId:
+ flowJson[ "appId" ] = self.appId
+
+ if self.egressPort:
+ flowJson[ 'treatment' ][ 'instructions' ].append( {
+ "type":"OUTPUT",
+ "port":self.egressPort } )
+ if self.ingressPort:
+ flowJson[ 'selector' ][ 'criteria' ].append( {
+ "type":"IN_PORT",
+ "port":self.ingressPort } )
+ if self.ethType:
+ flowJson[ 'selector' ][ 'criteria' ].append( {
+ "type":"ETH_TYPE",
+ "ethType":self.ethType } )
+ if self.ethSrc:
+ flowJson[ 'selector' ][ 'criteria' ].append( {
+ "type":"ETH_SRC",
+ "mac":self.ethSrc } )
+ if self.ethDst:
+ flowJson[ 'selector' ][ 'criteria' ].append( {
+ "type":"ETH_DST",
+ "mac":self.ethDst } )
+ if self.vlan:
+ flowJson[ 'selector' ][ 'criteria' ].append( {
+ "type":"VLAN_VID",
+ "vlanId":self.vlan } )
+ if self.mpls:
+ flowJson[ 'selector' ][ 'criteria' ].append( {
+ "type":"MPLS_LABEL",
+ "label":self.mpls } )
+ if self.ipSrc:
+ flowJson[ 'selector' ][ 'criteria' ].append( {
+ "type":self.ipSrc[0],
+ "ip":self.ipSrc[1] } )
+ if self.ipDst:
+ flowJson[ 'selector' ][ 'criteria' ].append( {
+ "type":self.ipDst[0],
+ "ip":self.ipDst[1] } )
+ if self.tcpSrc:
+ flowJson[ 'selector' ][ 'criteria' ].append( {
+ "type":"TCP_SRC",
+ "tcpPort": self.tcpSrc } )
+ if self.tcpDst:
+ flowJson[ 'selector' ][ 'criteria' ].append( {
+ "type":"TCP_DST",
+ "tcpPort": self.tcpDst } )
+ if self.udpSrc:
+ flowJson[ 'selector' ][ 'criteria' ].append( {
+ "type":"UDP_SRC",
+ "udpPort": self.udpSrc } )
+ if self.udpDst:
+ flowJson[ 'selector' ][ 'criteria' ].append( {
+ "type":"UDP_DST",
+ "udpPort": self.udpDst } )
+ if self.ipProto:
+ flowJson[ 'selector' ][ 'criteria' ].append( {
+ "type":"IP_PROTO",
+ "protocol": self.ipProto } )
+
+ return self.sendFlow( deviceId=self.deviceId, flowJson=flowJson)
+
+ def removeFlow(self, deviceId, flowId):
+ """
+ Description:
+ Remove specific device flow
+ Required:
+ str deviceId - id of the device
+ str flowId - id of the flow
+ Return:
+ Returns True if successfully deletes flows, otherwise False
+ """
+ # NOTE: REST url requires the intent id to be in decimal form
+ query = self.cfg_url + str( deviceId ) + '/' + str( int( flowId ) )
+ response = requests.delete(query, auth = self.auth)
+ if response:
+ if 200 <= response.status_code <= 299:
+ return True
+ else:
+ return False
+
+ return True
+
+ def findFlow(self, deviceId, **criterias):
+ flows = self.get_flows(deviceId)
+ match_keys = criterias.keys()
+ matches = len(match_keys)
+ num_matched = 0
+ for f in flows:
+ criteria = f['selector']['criteria']
+ for c in criteria:
+ if c['type'] not in match_keys:
+ continue
+ match_key, match_val = criterias.get(c['type'])
+ val = c[match_key]
+ if val == match_val:
+ num_matched += 1
+ if num_matched == matches:
+ return f['id']
+ return None
+
+ def sendFlow(self, deviceId, flowJson):
+ """
+ Description:
+ Sends a single flow to the specified device. This function exists
+ so you can bypass the addFLow driver and send your own custom flow.
+ Required:
+ * The flow in json
+ * the device id to add the flow to
+ Returns:
+ True for successful requests
+ False for error on requests;
+ """
+ url = self.cfg_url + str(deviceId)
+ response = requests.post(url, auth = self.auth, data = json.dumps(flowJson) )
+ if response.ok:
+ if response.status_code in [200, 201]:
+ log.info('Successfully POSTED flow for device %s' %str(deviceId))
+ return True
+ else:
+ log.info('Post flow for device %s failed with status %d' %(str(deviceId),
+ response.status_code))
+ return False
+ else:
+ log.error('Flow post request returned with status %d' %response.status_code)
+
+ return False