# blob: 88a7d0b5849256e4eafd964a1abb27f04e935919 [file] [log] [blame]
import unittest
from nose.tools import *
from nose.twistedtools import reactor, deferred
from twisted.internet import defer
from scapy.all import *
import time
import json
import threading
from OnosCtrl import OnosCtrl
from OnosFlowCtrl import OnosFlowCtrl, get_mac
from OltConfig import OltConfig
# Set the shared `log` object's level to INFO so the log.info() progress
# messages emitted by the tests are not filtered out by level.
# NOTE(review): `log` is presumably a logging.Logger pulled in by one of the
# star imports above -- confirm which module provides it.
log.setLevel('INFO')
class flows_exchange(unittest.TestCase):
    '''Program flows into ONOS and verify them by exchanging packets.

    Each test adds a flow for the device under test, injects matching
    traffic on the interface mapped to the ingress port and checks that the
    traffic is seen on the interface mapped to the egress port.
    '''

    #Use the first available device id as our device id to program flows
    app = 'org.onosproject.cli'
    PORT_TX_DEFAULT = 2
    PORT_RX_DEFAULT = 1
    INTF_TX_DEFAULT = 'veth2'
    INTF_RX_DEFAULT = 'veth0'
    #Fallback map, usable in both directions: port number -> interface name
    #and interface name -> port number
    default_port_map = {
        PORT_TX_DEFAULT : INTF_TX_DEFAULT,
        PORT_RX_DEFAULT : INTF_RX_DEFAULT,
        INTF_TX_DEFAULT : PORT_TX_DEFAULT,
        INTF_RX_DEFAULT : PORT_RX_DEFAULT
    }

    @classmethod
    def setUpClass(cls):
        '''Load the OLT port map (or the defaults) and derive the device id.'''
        cls.olt = OltConfig()
        cls.port_map = cls.olt.olt_port_map()
        if not cls.port_map:
            #no port map from the OLT configuration, use the built-in one
            cls.port_map = cls.default_port_map
        cls.device_id = 'of:' + get_mac() ##match against our device id

    def _program_flow(self, egress, ingress, **selectors):
        '''Add a flow between the given ports and wait for it to settle.

        egress    -- egress port number of the flow
        ingress   -- ingress port number of the flow
        selectors -- extra keyword arguments handed to OnosFlowCtrl as the
                     match criteria (ethSrc, ethDst, ethType, ipSrc, ...)

        Fails the test if addFlow() does not return True.
        '''
        flow = OnosFlowCtrl(deviceId = self.device_id,
                            egressPort = egress,
                            ingressPort = ingress,
                            **selectors)
        result = flow.addFlow()
        assert_equal(result, True)
        ##wait for flows to be added to ONOS
        time.sleep(3)

    def _send_and_verify(self, pkt, match, describe, egress, ingress):
        '''Send pkt on the ingress interface and assert it is seen on egress.

        pkt      -- scapy packet to transmit
        match    -- predicate given to sniff() as lfilter to pick the
                    packets that count as a successful reception
        describe -- callable returning the log message for a matched packet
        egress   -- port number whose mapped interface is sniffed
        ingress  -- port number whose mapped interface the packet is sent on

        The outcome is also recorded in self.success.
        '''
        self.success = False

        def recv_task():
            def recv_cb(rx_pkt):
                log.info(describe(rx_pkt))
                self.success = True
            sniff(count=2, timeout=5, lfilter = match,
                  prn = recv_cb, iface = self.port_map[egress])

        #sniff in the background while the packets are being sent
        receiver = threading.Thread(target = recv_task)
        receiver.start()
        try:
            log.info('Sending a packet to verify if flows are correct')
            sendp(pkt, count=50, iface = self.port_map[ingress])
        finally:
            #always reap the sniffer thread, even when sending fails
            receiver.join()
        assert_equal(self.success, True)

    def test_flow_mac(self):
        '''Add and verify flows with MAC selectors'''
        egress = 1
        ingress = 2
        egress_mac = '00:00:00:00:00:01'
        ingress_mac = '00:00:00:00:00:02'
        self._program_flow(egress, ingress,
                           ethSrc = ingress_mac,
                           ethDst = egress_mac)
        pkt = Ether(src = ingress_mac, dst = egress_mac)/IP()
        self._send_and_verify(
            pkt,
            lambda p: p.src == ingress_mac,
            lambda p: 'Pkt seen with ingress mac %s, egress mac %s' %(p.src, p.dst),
            egress, ingress)

    def test_flow_ip(self):
        '''Add and verify flows with IPv4 selectors'''
        egress = 1
        ingress = 2
        egress_map = { 'ether': '00:00:00:00:00:03', 'ip': '192.168.30.1' }
        ingress_map = { 'ether': '00:00:00:00:00:04', 'ip': '192.168.40.1' }
        self._program_flow(egress, ingress,
                           ethType = '0x0800',
                           ipSrc = ('IPV4_SRC', ingress_map['ip']+'/32'),
                           ipDst = ('IPV4_DST', egress_map['ip']+'/32'))
        L2 = Ether(src = ingress_map['ether'], dst = egress_map['ether'])
        L3 = IP(src = ingress_map['ip'], dst = egress_map['ip'])
        pkt = L2/L3
        self._send_and_verify(
            pkt,
            lambda p: IP in p and p[IP].dst == egress_map['ip'] and p[IP].src == ingress_map['ip'],
            lambda p: 'Pkt seen with ingress ip %s, egress ip %s' %(p[IP].src, p[IP].dst),
            egress, ingress)