vsg-test:
Added funtions for handling routes.
Introduced TIMEOUT for firewall test cases, also
added Deferred object to manage the callbacks in
VCPE firewall test scenarios.
Change-Id: I998032844e937c25c9a9927bf47cca68142b8080
diff --git a/src/test/vsg/vsgTest.py b/src/test/vsg/vsgTest.py
index 777b664..affa0e2 100644
--- a/src/test/vsg/vsgTest.py
+++ b/src/test/vsg/vsgTest.py
@@ -17,6 +17,8 @@
import sys
import json
from nose.tools import *
+from twisted.internet import defer
+from nose.twistedtools import reactor, deferred
from CordTestUtils import *
from OltConfig import OltConfig
from onosclidriver import OnosCliDriver
@@ -47,6 +49,7 @@
subscribers_per_s_tag = 8
subscriber_map = {}
restore_methods = []
+ TIMEOUT=120
@classmethod
def getSubscriberCredentials(cls, subId):
@@ -212,6 +215,37 @@
vsg_vcpe[vcpe_container]=str(vsg.get_ip())
return vsg_vcpe
+ def add_static_route_via_vcpe_interface(self, routes, vcpe=None):
+ if not vcpe:
+ vcpe = self.vcpe_dhcp
+ os.system('dhclient '+self.vcpe_dhcp)
+ cmds = []
+ for route in routes:
+ log.info('route is %s'%route)
+ cmd = 'ip route add ' + route + ' via 192.168.0.1 '+ 'dev ' + vcpe
+ cmds.append(cmd)
+ for cmd in cmds:
+ os.system(cmd)
+ return True
+
+ def del_static_route_via_vcpe_interface(self,routes,vcpe=None):
+ if not vcpe:
+ vcpe = self.vcpe_dhcp
+ cmds = []
+ for route in routes:
+ cmd = 'ip route del ' + route + ' via 192.168.0.1 ' + 'dev ' + vcpe
+ cmds.append(cmd)
+ for cmd in cmds:
+ os.system(cmd)
+ os.system('dhclient '+self.vcpe_dhcp+' -r')
+ return True
+
+ def restart_vcpe_container(self,vcpe=None):
+ vsg = VSGAccess.get_vcpe_vsg(self.vcpe_container)
+ log.info('restarting vcpe container')
+ vsg.run_cmd('sudo docker restart {}'.format(self.vcpe_container))
+ return True
+
def test_vsg_health(self):
"""
Algo:
@@ -223,7 +257,7 @@
status = VSGAccess.health_check()
assert_equal(status, True)
- def test_vsg_health_check(self,vsg_name='mysite_vsg-1',verify_status=True):
+ def test_vsg_health_check(self, vsg_name='mysite_vsg-1', verify_status=True):
"""
Algo:
1. If vsg name not specified, Get vsg corresponding to vcpe
@@ -245,6 +279,7 @@
log.info('vsg health check status is %s'%status)
assert_equal(status,verify_status)
+ @deferred(TIMEOUT)
def test_vsg_for_vcpe(self):
"""
Algo:
@@ -253,10 +288,16 @@
3. Get all vSGs
4. Verifying atleast one compute node and one vSG created
"""
- vsgs = VSGAccess.get_vsgs()
- compute_nodes = VSGAccess.get_compute_nodes()
- assert_not_equal(len(vsgs), 0)
- assert_not_equal(len(compute_nodes), 0)
+ df = defer.Deferred()
+ def vsg_for_vcpe_df(df):
+ vsgs = VSGAccess.get_vsgs()
+ compute_nodes = VSGAccess.get_compute_nodes()
+ time.sleep(14)
+ assert_not_equal(len(vsgs), 0)
+ assert_not_equal(len(compute_nodes), 0)
+ df.callback(0)
+ reactor.callLater(0,vsg_for_vcpe_df,df)
+ return df
def test_vsg_for_login(self):
"""
@@ -306,7 +347,7 @@
host = '8.8.8.8'
self.success = False
assert_not_equal(vcpe, None)
- vcpe_ip = self.get_dhcp(vcpe, mgmt = mgmt)
+ vcpe_ip = VSGAccess.vcpe_get_dhcp(vcpe, mgmt = mgmt)
assert_not_equal(vcpe_ip, None)
log.info('Got DHCP IP %s for %s' %(vcpe_ip, vcpe))
log.info('Sending icmp echo requests to external network 8.8.8.8')
@@ -326,7 +367,7 @@
vcpe = self.vcpe_dhcp
mgmt = 'eth0'
assert_not_equal(vcpe, None)
- vcpe_ip = self.get_dhcp(vcpe, mgmt = mgmt)
+ vcpe_ip = VSGAccess.vcpe_get_dhcp(vcpe, mgmt = mgmt)
assert_not_equal(vcpe_ip, None)
log.info('Got DHCP IP %s for %s' %(vcpe_ip, vcpe))
log.info('Sending icmp ping requests to %s' %host)
@@ -346,7 +387,7 @@
vcpe = self.vcpe_dhcp
mgmt = 'eth0'
assert_not_equal(vcpe, None)
- vcpe_ip = self.get_dhcp(vcpe, mgmt = mgmt)
+ vcpe_ip = VSGAccess.vcpe_get_dhcp(vcpe, mgmt = mgmt)
assert_not_equal(vcpe_ip, None)
log.info('Got DHCP IP %s for %s' %(vcpe_ip, vcpe))
log.info('Sending icmp ping requests to non existent host %s' %host)
@@ -367,7 +408,7 @@
vcpe = self.vcpe_dhcp
mgmt = 'eth0'
assert_not_equal(vcpe, None)
- vcpe_ip = self.get_dhcp(vcpe, mgmt = mgmt)
+ vcpe_ip = VSGAccess.vcpe_get_dhcp(vcpe, mgmt = mgmt)
assert_not_equal(vcpe_ip, None)
log.info('Got DHCP IP %s for %s' %(vcpe_ip, vcpe))
log.info('Sending icmp ping requests to host %s with ttl 1' %host)
@@ -393,24 +434,30 @@
assert_not_equal(vcpe, None)
assert_not_equal(self.vcpe_dhcp, None)
#first get dhcp on the vcpe interface
- try:
- vcpe_ip = self.get_dhcp(self.vcpe_dhcp, mgmt = mgmt)
- assert_not_equal(vcpe_ip, None)
- log.info('Got DHCP IP %s for %s' %(vcpe_ip, self.vcpe_dhcp))
- log.info('Sending ICMP pings to host %s' %(host))
- st, _ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, 0)
- #bring down the wan interface and check again
- st = VSGAccess.vcpe_wan_down(vcpe)
- assert_equal(st, True)
- st_ping, _ = getstatusoutput('ping -c 1 {}'.format(host))
- st = VSGAccess.vcpe_wan_up(vcpe)
- assert_not_equal(st_ping, 0)
- assert_equal(st, True)
- st, _ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, 0)
- finally:
- self.config_restore()
+ vcpe_ip = VSGAccess.vcpe_get_dhcp(self.vcpe_dhcp, mgmt = mgmt)
+ assert_not_equal(vcpe_ip, None)
+ log.info('Got DHCP IP %s for %s' %(vcpe_ip, self.vcpe_dhcp))
+ log.info('Sending ICMP pings to host %s' %(host))
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ if st != 0:
+ VSGAccess.restore_interface_config(mgmt, vcpe = self.vcpe_dhcp)
+ assert_equal(st, 0)
+ #bring down the wan interface and check again
+ st = VSGAccess.vcpe_wan_down(vcpe)
+ if st is False:
+ VSGAccess.restore_interface_config(mgmt, vcpe = self.vcpe_dhcp)
+ assert_equal(st, True)
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ if st == 0:
+ VSGAccess.restore_interface_config(mgmt, vcpe = self.vcpe_dhcp)
+ assert_not_equal(st, 0)
+ st = VSGAccess.vcpe_wan_up(vcpe)
+ if st is False:
+ VSGAccess.restore_interface_config(mgmt, vcpe = self.vcpe_dhcp)
+ assert_equal(st, True)
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ VSGAccess.restore_interface_config(mgmt, vcpe = self.vcpe_dhcp)
+ assert_equal(st, 0)
def test_vsg_for_external_connectivity_with_lan_interface_toggle_in_vcpe(self):
"""
@@ -430,26 +477,66 @@
assert_not_equal(vcpe, None)
assert_not_equal(self.vcpe_dhcp, None)
#first get dhcp on the vcpe interface
- try:
- vcpe_ip = self.get_dhcp(self.vcpe_dhcp, mgmt = mgmt)
- assert_not_equal(vcpe_ip, None)
- log.info('Got DHCP IP %s for %s' %(vcpe_ip, self.vcpe_dhcp))
- log.info('Sending ICMP pings to host %s' %(host))
- st, _ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, 0)
- #bring down the lan interface and check again
- st = VSGAccess.vcpe_lan_down(vcpe)
- assert_equal(st, True)
- st_ping, _ = getstatusoutput('ping -c 1 {}'.format(host))
- st = VSGAccess.vcpe_lan_up(vcpe)
- assert_not_equal(st_ping, 0)
- assert_equal(st, True)
- st, _ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, 0)
- finally:
- self.config_restore()
+ vcpe_ip = VSGAccess.vcpe_get_dhcp(self.vcpe_dhcp, mgmt = mgmt)
+ assert_not_equal(vcpe_ip, None)
+ log.info('Got DHCP IP %s for %s' %(vcpe_ip, self.vcpe_dhcp))
+ log.info('Sending ICMP pings to host %s' %(host))
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ if st != 0:
+ VSGAccess.restore_interface_config(mgmt, vcpe = self.vcpe_dhcp)
+ assert_equal(st, 0)
+ #bring down the lan interface and check again
+ st = VSGAccess.vcpe_lan_down(vcpe)
+ if st is False:
+ VSGAccess.restore_interface_config(mgmt, vcpe = self.vcpe_dhcp)
+ assert_equal(st, True)
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ if st == 0:
+ VSGAccess.restore_interface_config(mgmt, vcpe = self.vcpe_dhcp)
+ assert_not_equal(st, 0)
+ st = VSGAccess.vcpe_lan_up(vcpe)
+ if st is False:
+ VSGAccess.restore_interface_config(mgmt, vcpe = self.vcpe_dhcp)
+ assert_equal(st, True)
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ VSGAccess.restore_interface_config(mgmt, vcpe = self.vcpe_dhcp)
+ assert_equal(st, 0)
- def test_vsg_firewall_with_deny_destination_ip(self, vcpe=None):
+ @deferred(TIMEOUT)
+ def test_vsg_for_external_connectivity_with_vcpe_container_paused(self,vcpe_name=None,vcpe_intf=None):
+ """
+ Algo:
+ 1. Get vSG corresponding to vcpe
+ 2. Get dhcp ip to vcpe interface
+ 3. Add static route to destination route in test container
+ 4. From test container ping to destination route and verify ping success
+ 5. Login to compute node and execute command to pause vcpe container
+ 6. From test container ping to destination route and verify ping success
+ """
+ if not vcpe_name:
+ vcpe_name = self.vcpe_container
+ if not vcpe_intf:
+ vcpe_intf = self.vcpe_dhcp
+ df = defer.Deferred()
+ def vcpe_firewall(df):
+ host = '8.8.8.8'
+ vsg = VSGAccess.get_vcpe_vsg(vcpe_name)
+ try:
+ self.add_static_route_via_vcpe_interface([host],vcpe=vcpe_intf)
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, False)
+ st, _ = vsg.run_cmd('sudo docker pause {}'.format(vcpe_name))
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, False)
+ finally:
+ vsg.run_cmd('sudo docker unpause'.format(vcpe_name))
+ self.del_static_route_via_vcpe_interface([host],vcpe=vcpe_intf)
+ df.callback(0)
+ reactor.callLater(0, vcpe_firewall, df)
+ return df
+
+ @deferred(TIMEOUT)
+ def test_vsg_firewall_with_deny_destination_ip(self, vcpe_name=None, vcpe_intf=None):
"""
Algo:
1. Get vSG corresponding to vcpe
@@ -458,27 +545,31 @@
4. From cord-tester ping to the denied IP address
5. Verifying that ping should not be successful
"""
- mgmt = 'eth0'
- host = '8.8.8.8'
- assert_not_equal(self.vcpe_dhcp, None)
- if not vcpe:
- vcpe = self.vcpe_container
- vsg = VSGAccess.get_vcpe_vsg(vcpe)
- assert_not_equal(vsg, None)
- try:
- vcpe_ip = self.get_dhcp(self.vcpe_dhcp, mgmt = mgmt)
- assert_not_equal(vcpe_ip, None)
- st, _ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, 0)
- st, _ = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -d {} -j DROP'.format(vcpe,host))
- st, _ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_not_equal(st, 0)
- finally:
- vsg.run_cmd('sudo docker exec {} iptables -F'.format(vcpe))
- vsg.run_cmd('sudo docker exec {} iptables -X'.format(vcpe))
- self.config_restore()
+ if not vcpe_name:
+ vcpe_name = self.vcpe_container
+ if not vcpe_intf:
+ vcpe_intf = self.vcpe_dhcp
+ df = defer.Deferred()
+ def vcpe_firewall(df):
+ host = '8.8.8.8'
+ vsg = VSGAccess.get_vcpe_vsg(vcpe_name)
+ try:
+ self.add_static_route_via_vcpe_interface([host],vcpe=vcpe_intf)
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, False)
+ st, _ = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -d {} -j DROP'.format(vcpe_name,host))
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, True)
+ finally:
+ vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -d {} -j DROP'.format(vcpe_name,host))
+ self.del_static_route_via_vcpe_interface([host],vcpe=vcpe_intf)
+ vsg.run_cmd('sudo docker restart {}'.format(vcpe_name))
+ df.callback(0)
+ reactor.callLater(0, vcpe_firewall, df)
+ return df
- def test_vsg_firewall_with_rule_add_and_delete_dest_ip(self, vcpe=None):
+ @deferred(TIMEOUT)
+ def test_vsg_firewall_with_rule_add_and_delete_dest_ip(self,vcpe_name=None,vcpe_intf=None):
"""
Algo:
1. Get vSG corresponding to vcpe
@@ -490,24 +581,34 @@
7. From cord-tester ping to the denied IP address
8. Verifying the ping should success
"""
- host = '8.8.8.8'
- if not vcpe:
- vcpe = self.vcpe_container
- vsg = VSGAccess.get_vcpe_vsg(vcpe)
- st, _ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, False)
- try:
- st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -d {} -j DROP'.format(vcpe,host))
- st, _ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, True)
- st,_ = vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -d {} -j DROP'.format(vcpe,host))
- st, _ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st,False)
- finally:
- vsg.run_cmd('sudo docker exec {} iptables -F'.format(vcpe))
- vsg.run_cmd('sudo docker exec {} iptables -X'.format(vcpe))
+ if not vcpe_name:
+ vcpe_name = self.vcpe_container
+ if not vcpe_intf:
+ vcpe_intf = self.vcpe_dhcp
+ df = defer.Deferred()
+ def vcpe_firewall(df):
+ host = '8.8.8.8'
+ vsg = VSGAccess.get_vcpe_vsg(vcpe_name)
+ try:
+ self.add_static_route_via_vcpe_interface([host],vcpe=vcpe_intf)
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, False)
+ st, _ = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -d {} -j DROP'.format(vcpe_name,host))
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, True)
+ st,_ = vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -d {} -j DROP'.format(vcpe_name,host))
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, False)
+ finally:
+ vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -d {} -j DROP'.format(vcpe_name,host))
+ self.del_static_route_via_vcpe_interface([host],vcpe=vcpe_intf)
+ vsg.run_cmd('sudo docker restart {}'.format(vcpe_name))
+ df.callback(0)
+ reactor.callLater(0, vcpe_firewall, df)
+ return df
- def test_vsg_firewall_verifying_reachability_for_non_blocked_dest_ip(self, vcpe=None):
+ @deferred(TIMEOUT)
+ def test_vsg_firewall_verifying_reachability_for_non_blocked_dest_ip(self,vcpe_name=None,vcpe_intf=None):
"""
Algo:
1. Get vSG corresponding to vcpe
@@ -518,24 +619,34 @@
6. From cord-tester ping to the denied IP address other than the denied one
7. Verifying the ping should success
"""
- host1 = '8.8.8.8'
- host2 = '204.79.197.203'
- if not vcpe:
- vcpe = self.vcpe_container
- vsg = VSGAccess.get_vcpe_vsg(vcpe)
- st, _ = getstatusoutput('ping -c 1 {}'.format(host1))
- assert_equal(st, False)
- try:
- st, _ = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -d {} -j DROP'.format(vcpe,host1))
- st, _ = getstatusoutput('ping -c 1 {}'.format(host1))
- assert_equal(st, True)
- st, _ = getstatusoutput('ping -c 1 {}'.format(host2))
- assert_equal(st,False)
- finally:
- vsg.run_cmd('sudo docker exec {} iptables -F'.format(vcpe))
- vsg.run_cmd('sudo docker exec {} iptables -X'.format(vcpe))
+ if not vcpe_name:
+ vcpe_name = self.vcpe_container
+ if not vcpe_intf:
+ vcpe_intf = self.vcpe_dhcp
+ df = defer.Deferred()
+ def vcpe_firewall(df):
+ host1 = '8.8.8.8'
+ host2 = '204.79.197.203'
+ vsg = VSGAccess.get_vcpe_vsg(vcpe_name)
+ try:
+ self.add_static_route_via_vcpe_interface([host1,host2],vcpe=vcpe_intf)
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host1))
+ assert_equal(st, False)
+ st, _ = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -d {} -j DROP'.format(vcpe_name,host1))
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host1))
+ assert_equal(st, True)
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host2))
+ assert_equal(st,False)
+ finally:
+ vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -d {} -j DROP'.format(vcpe_name,host1))
+ self.del_static_route_via_vcpe_interface([host1,host2],vcpe=vcpe_intf)
+ vsg.run_cmd('sudo docker restart {}'.format(vcpe_name))
+ df.callback(0)
+ reactor.callLater(0, vcpe_firewall, df)
+ return df
- def test_vsg_firewall_appending_rules_with_deny_dest_ip(self, vcpe=None):
+ @deferred(TIMEOUT)
+ def test_vsg_firewall_appending_rules_with_deny_dest_ip(self,vcpe_name=None,vcpe_intf=None):
"""
Algo:
1. Get vSG corresponding to vcpe
@@ -547,27 +658,41 @@
6. From cord-tester ping to the denied IP address IP2
7. Verifying that ping should not be successful
"""
- host1 = '8.8.8.8'
- host2 = '204.79.197.203'
- if not vcpe:
- vcpe = self.vcpe_container
- vsg = VSGAccess.get_vcpe_vsg(vcpe)
- st, _ = getstatusoutput('ping -c 1 {}'.format(host1))
- assert_equal(st, False)
- try:
- st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -d {} -j DROP'.format(vcpe,host1))
- st, _ = getstatusoutput('ping -c 1 {}'.format(host1))
- assert_equal(st, True)
- st, _ = getstatusoutput('ping -c 1 {}'.format(host2))
- assert_equal(st, False)
- st, _ = vsg.run_cmd('sudo docker exec {} iptables -A FORWARD -d {} -j DROP'.format(vcpe,host2))
- st, _ = getstatusoutput('ping -c 1 {}'.format(host2))
- assert_equal(st,True)
- finally:
- vsg.run_cmd('sudo docker exec {} iptables -F'.format(vcpe))
- vsg.run_cmd('sudo docker exec {} iptables -X'.format(vcpe))
+ if not vcpe_name:
+ vcpe_name = self.vcpe_container
+ if not vcpe_intf:
+ vcpe_intf = self.vcpe_dhcp
+ df = defer.Deferred()
+ def vcpe_firewall(df):
+ host1 = '8.8.8.8'
+ host2 = '204.79.197.203'
+ vsg = VSGAccess.get_vcpe_vsg(vcpe_name)
+ try:
+ self.add_static_route_via_vcpe_interface([host1,host2],vcpe=vcpe_intf)
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host1))
+ assert_equal(st, False)
+ st,_ = vsg.run_cmd('sudo docker exec {} iptables -F FORWARD'.format(vcpe_name))
+ time.sleep(2)
+ st,_ = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -d {} -j DROP'.format(vcpe_name,host1))
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host1))
+ assert_equal(st, True)
+ st, out = getstatusoutput('ping -c 1 {}'.format(host2))
+ log.info('host2 ping output is %s'%out)
+ assert_equal(st, False)
+ st, _ = vsg.run_cmd('sudo docker exec {} iptables -A FORWARD -d {} -j DROP'.format(vcpe_name,host2))
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host2))
+ assert_equal(st,True)
+ finally:
+ vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -d {} -j DROP'.format(vcpe_name,host1))
+ vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -d {} -j DROP'.format(vcpe_name,host2))
+ self.del_static_route_via_vcpe_interface([host1,host2],vcpe=vcpe_intf)
+ vsg.run_cmd('sudo docker restart {}'.format(vcpe_name))
+ df.callback(0)
+ reactor.callLater(0, vcpe_firewall, df)
+ return df
- def test_vsg_firewall_removing_one_rule_denying_dest_ip(self, vcpe=None):
+ @deferred(TIMEOUT)
+ def test_vsg_firewall_removing_one_rule_denying_dest_ip(self,vcpe_name=None,vcpe_intf=None):
"""
Algo:
1. Get vSG corresponding to vcpe
@@ -582,28 +707,40 @@
10. From cord-tester ping to the denied IP address IP2
11. Verifying the ping should success
"""
- host1 = '8.8.8.8'
- host2 = '204.79.197.203'
- if not vcpe:
- vcpe = self.vcpe_container
- vsg = VSGAccess.get_vcpe_vsg(vcpe)
- st, _ = getstatusoutput('ping -c 1 {}'.format(host1))
- assert_equal(st, False)
- try:
- st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -d {} -j DROP'.format(vcpe,host1))
- st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -d {} -j DROP'.format(vcpe,host2))
- st, _ = getstatusoutput('ping -c 1 {}'.format(host1))
- assert_equal(st, True)
- st, _ = getstatusoutput('ping -c 1 {}'.format(host2))
- assert_equal(st,True)
- st,output = vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -d {} -j DROP'.format(vcpe,host2))
- st, _ = getstatusoutput('ping -c 1 {}'.format(host2))
- assert_equal(st,False)
- finally:
- vsg.run_cmd('sudo docker exec {} iptables -F'.format(vcpe))
- vsg.run_cmd('sudo docker exec {} iptables -X'.format(vcpe))
+ if not vcpe_name:
+ vcpe_name = self.vcpe_container
+ if not vcpe_intf:
+ vcpe_intf = self.vcpe_dhcp
+ df = defer.Deferred()
+ def vcpe_firewall(df):
+ host1 = '8.8.8.8'
+ host2 = '204.79.197.203'
+ vsg = VSGAccess.get_vcpe_vsg(vcpe_name)
+ try:
+ self.add_static_route_via_vcpe_interface([host1,host2],vcpe=self.vcpe_dhcp)
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host1))
+ assert_equal(st, False)
+ st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -d {} -j DROP'.format(vcpe_name,host1))
+ st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -d {} -j DROP'.format(vcpe_name,host2))
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host1))
+ assert_equal(st, True)
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host2))
+ assert_equal(st,True)
+ st,output = vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -d {} -j DROP'.format(vcpe_name,host2))
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host2))
+ assert_equal(st,False)
+ finally:
+ vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -d {} -j DROP'.format(vcpe_name,host1))
+ vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -d {} -j DROP'.format(vcpe_name,host2))
+ self.del_static_route_via_vcpe_interface([host1,host2],vcpe=vcpe_intf)
+ log.info('restarting vcpe container')
+ vsg.run_cmd('sudo docker restart {}'.format(vcpe_name))
+ df.callback(0)
+ reactor.callLater(0, vcpe_firewall, df)
+ return df
- def test_vsg_firewall_changing_rule_id_deny_dest_ip(self, vcpe=None):
+ @deferred(TIMEOUT)
+ def test_vsg_firewall_changing_rule_id_deny_dest_ip(self,vcpe_name=None,vcpe_intf=None):
"""
Algo:
1. Get vSG corresponding to vcpe
@@ -615,24 +752,34 @@
10. From cord-tester ping to the denied IP address IP
11. Verifying that ping should not be successful
"""
- host = '8.8.8.8'
- if not vcpe:
- vcpe = self.vcpe_container
- vsg = VSGAccess.get_vcpe_vsg(vcpe)
- st, _ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, False)
- try:
- st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -d {} -j DROP'.format(vcpe,host))
- st, _ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, True)
- st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD 2 -d {} -j DROP '.format(vcpe,host))
- st, _ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st,False)
- finally:
- vsg.run_cmd('sudo docker exec {} iptables -F'.format(vcpe))
- vsg.run_cmd('sudo docker exec {} iptables -X'.format(vcpe))
+ if not vcpe_name:
+ vcpe_name = self.vcpe_container
+ if not vcpe_intf:
+ vcpe_intf = self.vcpe_dhcp
+ df = defer.Deferred()
+ def vcpe_firewall(df):
+ host = '8.8.8.8'
+ vsg = VSGAccess.get_vcpe_vsg(vcpe_name)
+ try:
+ self.add_static_route_via_vcpe_interface([host],vcpe=self.vcpe_dhcp)
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, False)
+ st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -d {} -j DROP'.format(vcpe_name,host))
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, True)
+ st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD 2 -d {} -j DROP '.format(vcpe_name,host))
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st,True)
+ finally:
+ vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -d {} -j DROP'.format(vcpe_name,host))
+ self.del_static_route_via_vcpe_interface([host],vcpe=vcpe_intf)
+ vsg.run_cmd('sudo docker restart {}'.format(vcpe_name))
+ df.callback(0)
+ reactor.callLater(0, vcpe_firewall, df)
+ return df
- def test_vsg_firewall_changing_deny_rule_to_accept_dest_ip(self, vcpe=None):
+ @deferred(TIMEOUT)
+ def test_vsg_firewall_changing_deny_rule_to_accept_dest_ip(self,vcpe_name=None,vcpe_intf=None):
"""
Algo:
1. Get vSG corresponding to vcpe
@@ -644,25 +791,35 @@
10. From cord-tester ping to the accepted IP
11. Verifying the ping should success
"""
- host1 = '8.8.8.8'
- host2 = '204.79.197.203'
- if not vcpe:
- vcpe = self.vcpe_container
- vsg = VSGAccess.get_vcpe_vsg(vcpe)
- st, _ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, False)
- try:
- st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -d {} -j DROP'.format(vcpe,host))
- st, _ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, True)
- st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD 1 -d {} -j ACCEPT'.format(vcpe,host))
- st, _ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st,False)
- finally:
- vsg.run_cmd('sudo docker exec {} iptables -F'.format(vcpe))
- vsg.run_cmd('sudo docker exec {} iptables -X'.format(vcpe))
+ if not vcpe_name:
+ vcpe_name = self.vcpe_container
+ if not vcpe_intf:
+ vcpe_intf = self.vcpe_dhcp
+ df = defer.Deferred()
+ def vcpe_firewall(df):
+ host = '8.8.8.8'
+ vsg = VSGAccess.get_vcpe_vsg(vcpe_name)
+ try:
+ self.add_static_route_via_vcpe_interface([host],vcpe=self.vcpe_dhcp)
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, False)
+ st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -d {} -j DROP'.format(vcpe_name,host))
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, True)
+ st, _ = vsg.run_cmd('sudo docker exec {} iptables -R FORWARD 1 -d {} -j ACCEPT'.format(vcpe_name,host))
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st,False)
+ finally:
+ vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -d {} -j DROP'.format(vcpe_name,host))
+ vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -d {} -j ACCEPT'.format(vcpe_name,host))
+ self.del_static_route_via_vcpe_interface([host],vcpe=vcpe_intf)
+ vsg.run_cmd('sudo docker restart {}'.format(vcpe_name))
+ df.callback(0)
+ reactor.callLater(0, vcpe_firewall, df)
+ return df
- def test_vsg_firewall_denying_destination_network(self, vcpe=None):
+ @deferred(TIMEOUT) #Fail
+ def test_vsg_firewall_denying_destination_network(self,vcpe_name=None,vcpe_intf=None):
"""
Algo:
1. Get vSG corresponding to vcpe
@@ -673,24 +830,35 @@
6. From cord-tester ping to the denied IP address IP2 in the denied subnet
7. Verifying that ping should not be successful
"""
- network = '206.190.36.44/28'
- host1 = '204.79.197.46'
- host2 = '204.79.197.51'
- if not vcpe:
- vcpe = self.vcpe_container
- vsg = VSGAccess.get_vcpe_vsg(vcpe)
- st, _ = getstatusoutput('ping -c 1 {}'.format(host1))
- assert_equal(st, False)
- try:
- st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -d {} -j DROP'.format(vcpe,network))
- st, _ = getstatusoutput('ping -c 1 {}'.format(host1))
- assert_equal(st, True)
- st, _ = getstatusoutput('ping -c 1 {}'.format(host2))
- assert_equal(st,False)
- finally:
- vsg.run_cmd('sudo docker exec {} iptables -F'.format(vcpe))
+ if not vcpe_name:
+ vcpe_name = self.vcpe_container
+ if not vcpe_intf:
+ vcpe_intf = self.vcpe_dhcp
+ df = defer.Deferred()
+ def vcpe_firewall(df):
+ network = '204.79.197.192/28'
+ host1 = '204.79.197.203'
+ host2 = '204.79.197.210'
+ vsg = VSGAccess.get_vcpe_vsg(vcpe_name)
+ try:
+ self.add_static_route_via_vcpe_interface([host1,host2],vcpe=self.vcpe_dhcp)
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host1))
+ assert_equal(st, False)
+ st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -d {} -j DROP'.format(vcpe_name,network))
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host1))
+ assert_equal(st, True)
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host2))
+ assert_equal(st,False)
+ finally:
+ vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -d {} -j DROP'.format(vcpe_name,network))
+ self.del_static_route_via_vcpe_interface([host1,host2],vcpe=vcpe_intf)
+ vsg.run_cmd('sudo docker restart {}'.format(vcpe_name))
+ df.callback(0)
+ reactor.callLater(0, vcpe_firewall, df)
+ return df
- def test_vsg_firewall_denying_destination_network_subnet_modification(self, vcpe=None):
+ @deferred(TIMEOUT)
+ def test_vsg_firewall_denying_destination_network_subnet_modification(self,vcpe_name=None,vcpe_intf=None):
"""
Algo:
1. Get vSG corresponding to vcpe
@@ -701,33 +869,45 @@
6. From cord-tester ping to the denied IP address IP2 in the denied subnet
7. Verifying that ping should not be successful
"""
- network1 = '206.190.36.44/28'
- network2 = '206.190.36.44/26'
- host1 = '204.79.197.46'
- host2 = '204.79.197.51'
- host2 = '204.79.197.63'
- if not vcpe:
- vcpe = self.vcpe_container
- vsg = VSGAccess.get_vcpe_vsg(vcpe)
- st, _ = getstatusoutput('ping -c 1 {}'.format(host1))
- assert_equal(st, False)
- try:
- st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -d {} -j DROP'.format(vcpe,network1))
- st, _ = getstatusoutput('ping -c 1 {}'.format(host1))
- assert_equal(st, True)
- st, _ = getstatusoutput('ping -c 1 {}'.format(host2))
- assert_equal(st,False)
- st,output = vsg.run_cmd('sudo docker exec {} iptables -A FORWARD -d {} -j DROP'.format(vcpe,network2))
- st, _ = getstatusoutput('ping -c 1 {}'.format(host1))
- assert_equal(st, True)
- st, _ = getstatusoutput('ping -c 1 {}'.format(host2))
- assert_equal(st, True)
- st, _ = getstatusoutput('ping -c 1 {}'.format(host3))
- assert_equal(st, False)
- finally:
- vsg.run_cmd('sudo docker exec {} iptables -F'.format(vcpe))
+ if not vcpe_name:
+ vcpe_name = self.vcpe_container
+ if not vcpe_intf:
+ vcpe_intf = self.vcpe_dhcp
+ df = defer.Deferred()
+ def vcpe_firewall(df):
+ network1 = '204.79.197.192/28'
+ network2 = '204.79.197.192/27'
+ host1 = '204.79.197.203'
+ host2 = '204.79.197.210'
+ host3 = '204.79.197.224'
+ vsg = VSGAccess.get_vcpe_vsg(vcpe_name)
+ try:
+ self.add_static_route_via_vcpe_interface([host1,host2,host3],vcpe=self.vcpe_dhcp)
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host1))
+ assert_equal(st, False)
+ st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -d {} -j DROP'.format(vcpe_name,network1))
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host1))
+ assert_equal(st, True)
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host2))
+ assert_equal(st,False)
+ st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD 2 -d {} -j DROP'.format(vcpe_name,network2))
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host1))
+ assert_equal(st, True)
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host2))
+ assert_equal(st, True)
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host3))
+ assert_equal(st, False)
+ finally:
+ vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -d {} -j DROP'.format(vcpe_name,network1))
+ vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -d {} -j DROP'.format(vcpe_name,network2))
+ self.del_static_route_via_vcpe_interface([host1,host2,host3],vcpe=vcpe_intf)
+ vsg.run_cmd('sudo docker restart {}'.format(vcpe_name))
+ df.callback(0)
+ reactor.callLater(0, vcpe_firewall, df)
+ return df
- def test_vsg_firewall_with_deny_source_ip(self, vcpe=None):
+ @deferred(TIMEOUT)
+ def test_vsg_firewall_with_deny_source_ip(self,vcpe_name=None,vcpe_intf=None):
"""
Algo:
1. Get vSG corresponding to vcpe
@@ -736,21 +916,33 @@
4. From cord-tester ping to 8.8.8.8 from the denied IP
5. Verifying that ping should not be successful
"""
- host = '8.8.8.8'
- source_ip = self.vcpe_dhcp
- if not vcpe:
- vcpe = self.vcpe_container
- vsg = VSGAccess.get_vcpe_vsg(vcpe)
- st, _ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, False)
- try:
- st, _ = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -s {} -j DROP'.format(vcpe,source_ip))
- st, _ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, True)
- finally:
- vsg.run_cmd('sudo docker exec {} iptables -F'.format(vcpe))
+ if not vcpe_name:
+ vcpe_name = self.vcpe_container
+ if not vcpe_intf:
+ vcpe_intf = self.vcpe_dhcp
+ df = defer.Deferred()
+ def vcpe_firewall(df):
+ host = '8.8.8.8'
+ #source_ip = get_ip(self.vcpe_dhcp)
+ vsg = VSGAccess.get_vcpe_vsg(vcpe_name)
+ try:
+ self.add_static_route_via_vcpe_interface([host],vcpe=self.vcpe_dhcp)
+ source_ip = get_ip(self.vcpe_dhcp)
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, False)
+ st, _ = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -s {} -j DROP'.format(vcpe_name,source_ip))
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, True)
+ finally:
+ vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -s {} -j DROP'.format(vcpe_name,source_ip))
+ self.del_static_route_via_vcpe_interface([host],vcpe=vcpe_intf)
+ vsg.run_cmd('sudo docker restart {}'.format(vcpe_name))
+ df.callback(0)
+ reactor.callLater(0, vcpe_firewall, df)
+ return df
- def test_vsg_firewall_rule_with_add_and_delete_deny_source_ip(self, vcpe=None):
+ @deferred(TIMEOUT)
+ def test_vsg_firewall_rule_with_add_and_delete_deny_source_ip(self,vcpe_name=None,vcpe_intf=None):
"""
Algo:
1. Get vSG corresponding to vcpe
@@ -762,24 +954,36 @@
7. From cord-tester ping to 8.8.8.8 from the denied IP
8. Verifying the ping should success
"""
- host = '8.8.8.8'
- source_ip = self.vcpe_dhcp
- if not vcpe:
- vcpe = self.vcpe_container
- vsg = VSGAccess.get_vcpe_vsg(vcpe)
- st, _ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, False)
- try:
- st, _ = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -s {} -j DROP'.format(vcpe,source_ip))
- st, _ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, True)
- st, _ = vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -s {} -j DROP'.format(vcpe,source_ip))
- st, _ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, False)
- finally:
- vsg.run_cmd('sudo docker exec {} iptables -F'.format(vcpe))
+ if not vcpe_name:
+ vcpe_name = self.vcpe_container
+ if not vcpe_intf:
+ vcpe_intf = self.vcpe_dhcp
+ df = defer.Deferred()
+ def vcpe_firewall(df):
+ host = '8.8.8.8'
+ source_ip = get_ip(self.vcpe_dhcp)
+ vsg = VSGAccess.get_vcpe_vsg(vcpe_name)
+ try:
+ self.add_static_route_via_vcpe_interface([host],vcpe=self.vcpe_dhcp)
+ source_ip = get_ip(self.vcpe_dhcp)
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, False)
+ st, _ = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -s {} -j DROP'.format(vcpe_name,source_ip))
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, True)
+ st, _ = vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -s {} -j DROP'.format(vcpe_name,source_ip))
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, False)
+ finally:
+ vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -s {} -j DROP'.format(vcpe_name,source_ip))
+ self.del_static_route_via_vcpe_interface([host],vcpe=vcpe_intf)
+ vsg.run_cmd('sudo docker restart {}'.format(vcpe_name))
+ df.callback(0)
+ reactor.callLater(0, vcpe_firewall, df)
+ return df
- def test_vsg_firewall_rule_with_deny_icmp_protocol_echo_requests_type(self, vcpe=None):
+ @deferred(TIMEOUT)
+ def test_vsg_firewall_rule_with_deny_icmp_protocol_echo_requests_type(self,vcpe_name=None,vcpe_intf=None):
"""
Algo:
1. Get vSG corresponding to vcpe
@@ -791,24 +995,34 @@
7. From cord-tester ping to 8.8.8.8
8. Verifying the ping should success
"""
- host = '8.8.8.8'
- if not vcpe:
- vcpe = self.vcpe_container
- vsg = VSGAccess.get_vcpe_vsg(vcpe)
- st, _ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, False)
- try:
- st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -p icmp --icmp-type echo-request -j DROP'.format(vcpe))
- st, _ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, True)
- st,output = vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -p icmp --icmp-type echo-request -j DROP'.format(vcpe))
- st, _ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, False)
- finally:
- vsg.run_cmd('sudo docker exec {} iptables -F'.format(vcpe))
- vsg.run_cmd('sudo docker exec {} iptables -X'.format(vcpe))
+ if not vcpe_name:
+ vcpe_name = self.vcpe_container
+ if not vcpe_intf:
+ vcpe_intf = self.vcpe_dhcp
+ df = defer.Deferred()
+ def vcpe_firewall(df):
+ host = '8.8.8.8'
+ vsg = VSGAccess.get_vcpe_vsg(vcpe_name)
+ try:
+ self.add_static_route_via_vcpe_interface([host],vcpe=self.vcpe_dhcp)
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, False)
+ st, _ = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -p icmp --icmp-type echo-request -j DROP'.format(vcpe_name))
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, True)
+ st, _ = vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -p icmp --icmp-type echo-request -j DROP'.format(vcpe_name))
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, False)
+ finally:
+ vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -p icmp --icmp-type echo-request -j DROP'.format(vcpe_name))
+ self.del_static_route_via_vcpe_interface([host],vcpe=vcpe_intf)
+ vsg.run_cmd('sudo docker restart {}'.format(vcpe_name))
+ df.callback(0)
+ reactor.callLater(0, vcpe_firewall, df)
+ return df
- def test_vsg_firewall_rule_with_deny_icmp_protocol_echo_reply_type(self, vcpe=None):
+ @deferred(TIMEOUT)
+ def test_vsg_firewall_rule_with_deny_icmp_protocol_echo_reply_type(self,vcpe_name=None,vcpe_intf=None):
"""
Algo:
1. Get vSG corresponding to vcpe
@@ -820,24 +1034,34 @@
7. From cord-tester ping to 8.8.8.8
8. Verifying the ping should success
"""
- host = '8.8.8.8'
- if not vcpe:
- vcpe = self.vcpe_container
- vsg = VSGAccess.get_vcpe_vsg(vcpe)
- st, _ = getstatusoutput('ping -c 1 {}'.format('8.8.8.8'))
- assert_equal(st, False)
- try:
- st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -p icmp --icmp-type echo-reply -j DROP'.format(vcpe))
- st, _ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, True)
- st,output = vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -p icmp --icmp-type echo-reply -j DROP'.format(vcpe))
- st, _ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st,False)
- finally:
- st, _ = vsg.run_cmd('sudo docker exec {} iptables -X'.format(vcpe))
- st, _ = vsg.run_cmd('sudo docker exec {} iptables -F'.format(vcpe))
+ if not vcpe_name:
+ vcpe_name = self.vcpe_container
+ if not vcpe_intf:
+ vcpe_intf = self.vcpe_dhcp
+ df = defer.Deferred()
+ def vcpe_firewall(df):
+ host = '8.8.8.8'
+ vsg = VSGAccess.get_vcpe_vsg(vcpe_name)
+ try:
+ self.add_static_route_via_vcpe_interface([host],vcpe=self.vcpe_dhcp)
+ st, _ = getstatusoutput('ping -c 1 {}'.format('8.8.8.8'))
+ assert_equal(st, False)
+ st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -p icmp --icmp-type echo-reply -j DROP'.format(vcpe_name))
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, True)
+ st,output = vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -p icmp --icmp-type echo-reply -j DROP'.format(vcpe_name))
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st,False)
+ finally:
+ vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -p icmp --icmp-type echo-reply -j DROP'.format(vcpe_name))
+ self.del_static_route_via_vcpe_interface([host],vcpe=vcpe_intf)
+ vsg.run_cmd('sudo docker restart {}'.format(vcpe_name))
+ df.callback(0)
+ reactor.callLater(0, vcpe_firewall, df)
+ return df
- def test_vsg_firewall_changing_deny_rule_to_accept_rule_with_icmp_protocol_echo_requests_type(self, vcpe=None):
+ @deferred(TIMEOUT)
+ def test_vsg_firewall_changing_deny_rule_to_accept_rule_with_icmp_protocol_echo_requests_type(self,vcpe_name=None,vcpe_intf=None):
"""
Algo:
1. Get vSG corresponding to vcpe
@@ -849,42 +1073,75 @@
7. From cord-tester ping to 8.8.8.8
8. Verifying the ping should success
"""
- host = '8.8.8.8'
- if not vcpe:
- vcpe = self.vcpe_container
- vsg = VSGAccess.get_vcpe_vsg(vcpe)
- st, _ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, False)
- try:
- st, _ = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD 1 -p icmp --icmp-type echo-request -j DROP'.format(vcpe))
- st, _ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, True)
- st, _ = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD 1 -p icmp --icmp-type echo-request -j ACCEPT'.format(vcpe))
- st, _ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st,False)
- finally:
- st, _ = vsg.run_cmd('sudo docker exec {} iptables -X'.format(vcpe))
- st, _ = vsg.run_cmd('sudo docker exec {} iptables -F'.format(vcpe))
+ if not vcpe_name:
+ vcpe_name = self.vcpe_container
+ if not vcpe_intf:
+ vcpe_intf = self.vcpe_dhcp
+ df = defer.Deferred()
+ def vcpe_firewall(df):
+ host = '8.8.8.8'
+ vsg = VSGAccess.get_vcpe_vsg(vcpe_name)
+ try:
+ self.add_static_route_via_vcpe_interface([host],vcpe=self.vcpe_dhcp)
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, False)
+ st, _ = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -p icmp --icmp-type echo-request -j DROP'.format(vcpe_name))
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, True)
+ st, _ = vsg.run_cmd('sudo docker exec {} iptables -R FORWARD 1 -p icmp --icmp-type echo-request -j ACCEPT'.format(vcpe_name))
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st,False)
+ finally:
+ vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -p icmp --icmp-type echo-request -j DROP'.format(vcpe_name))
+ vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -p icmp --icmp-type echo-request -j ACCEPT'.format(vcpe_name))
+ self.del_static_route_via_vcpe_interface([host],vcpe=vcpe_intf)
+ vsg.run_cmd('sudo docker restart {}'.format(vcpe_name))
+ df.callback(0)
+ reactor.callLater(0, vcpe_firewall, df)
+ return df
- def test_vsg_firewall_changing_deny_rule_to_accept_rule_with_icmp_protocol_echo_reply_type(self, vcpe=None):
- host = '8.8.8.8'
- if not vcpe:
- vcpe = self.vcpe_container
- vsg = VSGAccess.get_vcpe_vsg(vcpe)
- st, out1 = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, False)
- try:
- st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD 1 -p icmp --icmp-type echo-reply -j DROP'.format(vcpe))
- st,_ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, True)
- st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD 1 -p icmp --icmp-type echo-reply -j ACCEPT'.format(vcpe))
- st, _ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st,False)
- finally:
- st, _ = vsg.run_cmd('sudo docker exec {} iptables -X'.format(vcpe))
- st, _ = vsg.run_cmd('sudo docker exec {} iptables -F'.format(vcpe))
+ @deferred(TIMEOUT)
+ def test_vsg_firewall_changing_deny_rule_to_accept_rule_with_icmp_protocol_echo_reply_type(self,vcpe_name=None,vcpe_intf=None):
+ """
+ Algo:
+ 1. Get vSG corresponding to vcpe
+ 2. Login to compute node
+ 3. Execute iptable command on vcpe from compute node to deny icmp echo-reply type protocol packets
+ 4. From cord-tester ping to 8.8.8.8
+ 5. Verifying the ping should not success
+ 6. Insert another rule to accept the icmp-echo requests protocol packets
+ 7. From cord-tester ping to 8.8.8.8
+ 8. Verifying the ping should success
+ """
+ if not vcpe_name:
+ vcpe_name = self.vcpe_container
+ if not vcpe_intf:
+ vcpe_intf = self.vcpe_dhcp
+ df = defer.Deferred()
+ def vcpe_firewall(df):
+ host = '8.8.8.8'
+ vsg = VSGAccess.get_vcpe_vsg(vcpe_name)
+ try:
+ self.add_static_route_via_vcpe_interface([host],vcpe=self.vcpe_dhcp)
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, False)
+ st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -p icmp --icmp-type echo-reply -j DROP'.format(vcpe_name))
+ st,_ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, True)
+ st,output = vsg.run_cmd('sudo docker exec {} iptables -R FORWARD 1 -p icmp --icmp-type echo-reply -j ACCEPT'.format(vcpe_name))
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st,False)
+ finally:
+ vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -p icmp --icmp-type echo-reply -j DROP'.format(vcpe_name))
+ vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -p icmp --icmp-type echo-reply -j ACCEPT'.format(vcpe_name))
+ self.del_static_route_via_vcpe_interface([host],vcpe=vcpe_intf)
+ vsg.run_cmd('sudo docker restart {}'.format(vcpe_name))
+ df.callback(0)
+ reactor.callLater(0, vcpe_firewall, df)
+ return df
- def test_vsg_firewall_for_deny_icmp_protocol(self, vcpe=None):
+ @deferred(TIMEOUT)
+ def test_vsg_firewall_for_deny_icmp_protocol(self,vcpe_name=None,vcpe_intf=None):
"""
Algo:
1. Get vSG corresponding to vcpe
@@ -896,24 +1153,34 @@
7. From cord-tester ping to 8.8.8.8
8. Verifying the ping should success
"""
- host = '8.8.8.8'
- if not vcpe:
- vcpe = self.vcpe_container
- vsg = VSGAccess.get_vcpe_vsg(vcpe)
- st, _ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, False)
- try:
- st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -p icmp -j DROP'.format(vcpe))
- st,_ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, True)
- st,output = vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -p icmp -j DROP'.format(vcpe))
- st, _ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st,False)
- finally:
- st, _ = vsg.run_cmd('sudo docker exec {} iptables -X'.format(vcpe))
- st, _ = vsg.run_cmd('sudo docker exec {} iptables -F'.format(vcpe))
+ if not vcpe_name:
+ vcpe_name = self.vcpe_container
+ if not vcpe_intf:
+ vcpe_intf = self.vcpe_dhcp
+ df = defer.Deferred()
+ def vcpe_firewall(df):
+ host = '8.8.8.8'
+ vsg = VSGAccess.get_vcpe_vsg(vcpe_name)
+ try:
+ self.add_static_route_via_vcpe_interface([host],vcpe=self.vcpe_dhcp)
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, False)
+ st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -p icmp -j DROP'.format(vcpe_name))
+ st,_ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, True)
+ st, _ = vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -p icmp -j DROP'.format(vcpe_name))
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st,False)
+ finally:
+ vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -p icmp -j DROP'.format(vcpe_name))
+ self.del_static_route_via_vcpe_interface([host],vcpe=vcpe_intf)
+ vsg.run_cmd('sudo docker restart {}'.format(vcpe_name))
+ df.callback(0)
+ reactor.callLater(0, vcpe_firewall, df)
+ return df
- def test_vsg_firewall_rule_deny_icmp_protocol_and_destination_ip(self, vcpe=None):
+ @deferred(TIMEOUT)
+ def test_vsg_firewall_rule_deny_icmp_protocol_and_destination_ip(self,vcpe_name=None,vcpe_intf=None):
"""
Algo:
1. Get vSG corresponding to vcpe
@@ -931,28 +1198,41 @@
13. From cord-tester ping to 8.8.8.8
14. Verifying the ping should success
"""
- host = '8.8.8.8'
- if not vcpe:
- vcpe = self.vcpe_container
- vsg = VSGAccess.get_vcpe_vsg(vcpe)
- st, _ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, False)
- try:
- st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -d {} -j DROP'.format(vcpe,host))
- st,_ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, True)
- st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -p icmp -j DROP'.format(vcpe))
- assert_equal(st, True)
- st,output = vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -d {} -j DROP'.format(vcpe,host))
- assert_equal(st, True)
- st,output = vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -p icmp -j DROP'.format(vcpe))
- st, _ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st,False)
- finally:
- st, _ = vsg.run_cmd('sudo docker exec {} iptables -X'.format(vcpe))
- st, _ = vsg.run_cmd('sudo docker exec {} iptables -F'.format(vcpe))
+ if not vcpe_name:
+ vcpe_name = self.vcpe_container
+ if not vcpe_intf:
+ vcpe_intf = self.vcpe_dhcp
+ df = defer.Deferred()
+ def vcpe_firewall(df):
+ host = '8.8.8.8'
+ vsg = VSGAccess.get_vcpe_vsg(vcpe_name)
+ try:
+ self.add_static_route_via_vcpe_interface([host],vcpe=self.vcpe_dhcp)
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, False)
+ st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -d {} -j DROP'.format(vcpe_name,host))
+ st,_ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, True)
+ st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -p icmp -j DROP'.format(vcpe_name))
+ st,_ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, True)
+ st,output = vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -d {} -j DROP'.format(vcpe_name,host))
+ st,_ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, True)
+ st,output = vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -p icmp -j DROP'.format(vcpe_name))
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st,False)
+ finally:
+ vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -d {} -j DROP'.format(vcpe_name,host))
+ vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -p icmp -j DROP'.format(vcpe_name))
+ self.del_static_route_via_vcpe_interface([host],vcpe=vcpe_intf)
+ vsg.run_cmd('sudo docker restart {}'.format(vcpe_name))
+ df.callback(0)
+ reactor.callLater(0, vcpe_firewall, df)
+ return df
- def test_vsg_firewall_flushing_all_configured_rules(self, vcpe=None):
+ @deferred(TIMEOUT) #Fail
+ def test_vsg_firewall_flushing_all_configured_rules(self,vcpe_name=None,vcpe_intf=None):
"""
Algo:
1. Get vSG corresponding to vcpe
@@ -968,27 +1248,38 @@
11. From cord-tester ping to 8.8.8.8
12. Verifying the ping should success
"""
- host = '8.8.8.8'
- if not vcpe:
- vcpe = self.vcpe_container
- vsg = VSGAccess.get_vcpe_vsg(vcpe)
- st, _ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, False)
- try:
- st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -d {} -j DROP'.format(vcpe,host))
- st,_ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, True)
- st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -p icmp -j DROP'.format(vcpe))
- st,_ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, True)
- st,output = vsg.run_cmd('sudo docker exec {} iptables -F'.format(vcpe))
- st,_ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, False)
- finally:
- st, _ = vsg.run_cmd('sudo docker exec {} iptables -X'.format(vcpe))
- st, _ = vsg.run_cmd('sudo docker exec {} iptables -F'.format(vcpe))
+ if not vcpe_name:
+ vcpe_name = self.vcpe_container
+ if not vcpe_intf:
+ vcpe_intf = self.vcpe_dhcp
+ df = defer.Deferred()
+ def vcpe_firewall(df):
+ host = '8.8.8.8'
+ vsg = VSGAccess.get_vcpe_vsg(vcpe_name)
+ try:
+ self.add_static_route_via_vcpe_interface([host],vcpe=self.vcpe_dhcp)
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, False)
+ st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -d {} -j DROP'.format(vcpe_name,host))
+ st,_ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, True)
+ st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -p icmp -j DROP'.format(vcpe_name))
+ st,_ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, True)
+ st,output = vsg.run_cmd('sudo docker exec {} iptables -F FORWARD'.format(vcpe_name))
+ time.sleep(1)
+ st,_ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, False)
+ finally:
+ vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -d {} -j DROP'.format(vcpe_name,host))
+ self.del_static_route_via_vcpe_interface([host],vcpe=vcpe_intf)
+ vsg.run_cmd('sudo docker restart {}'.format(vcpe_name))
+ df.callback(0)
+ reactor.callLater(0, vcpe_firewall, df)
+ return df
- def test_vsg_firewall_deny_all_ipv4_traffic(self, vcpe=None):
+ @deferred(TIMEOUT)
+ def test_vsg_firewall_deny_all_ipv4_traffic(self,vcpe_name=None,vcpe_intf=None):
"""
Algo:
1. Get vSG corresponding to vcpe
@@ -1000,23 +1291,34 @@
7. From cord-tester ping to 8.8.8.8
8. Verifying the ping should success
"""
- host = '8.8.8.8'
- if not vcpe:
- vcpe = self.vcpe_container
- vsg = VSGAccess.get_vcpe_vsg(vcpe)
- st, _ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, False)
- try:
- st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -4 -j DROP'.format(vcpe))
- st,_ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, True)
- st,output = vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -4 -j DROP'.format(vcpe))
- st,_ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, False)
- finally:
- st, _ = vsg.run_cmd('sudo docker exec {} iptables -X'.format(vcpe))
+ if not vcpe_name:
+ vcpe_name = self.vcpe_container
+ if not vcpe_intf:
+ vcpe_intf = self.vcpe_dhcp
+ df = defer.Deferred()
+ def vcpe_firewall(df):
+ host = '8.8.8.8'
+ vsg = VSGAccess.get_vcpe_vsg(vcpe_name)
+ try:
+ self.add_static_route_via_vcpe_interface([host],vcpe=self.vcpe_dhcp)
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, False)
+ st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -4 -j DROP'.format(vcpe_name))
+ st,_ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, True)
+ st,output = vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -4 -j DROP'.format(vcpe_name))
+ st,_ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, False)
+ finally:
+ vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -4 -j DROP'.format(vcpe_name))
+ self.del_static_route_via_vcpe_interface([host],vcpe=vcpe_intf)
+ vsg.run_cmd('sudo docker restart {}'.format(vcpe_name))
+ df.callback(0)
+ reactor.callLater(0, vcpe_firewall, df)
+ return df
- def test_vsg_firewall_replacing_deny_rule_to_accept_rule(self, vcpe=None):
+ @deferred(TIMEOUT)
+ def test_vsg_firewall_replacing_deny_rule_to_accept_rule_ipv4_traffic(self,vcpe_name=None,vcpe_intf=None):
"""
Algo:
1. Get vSG corresponding to vcpe
@@ -1028,23 +1330,112 @@
7. From cord-tester ping to 8.8.8.8
8. Verifying the ping should success
"""
- host = '8.8.8.8'
- if not vcpe:
- vcpe = self.vcpe_container
- vsg = VSGAccess.get_vcpe_vsg(vcpe)
- st, _ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, False)
- try:
- st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD 1 -4 -j DROP'.format(vcpe))
- st,_ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, True)
- st,output = vsg.run_cmd('sudo docker exec {} iptables -R FORWARD 1 -4 -j ACCEPT'.format(vcpe))
- st,_ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, False)
- finally:
- st, _ = vsg.run_cmd('sudo docker exec {} iptables -X'.format(vcpe))
+ if not vcpe_name:
+ vcpe_name = self.vcpe_container
+ if not vcpe_intf:
+ vcpe_intf = self.vcpe_dhcp
+ df = defer.Deferred()
+ def vcpe_firewall(df):
+ host = '8.8.8.8'
+ vsg = VSGAccess.get_vcpe_vsg(vcpe_name)
+ try:
+ self.add_static_route_via_vcpe_interface([host],vcpe=self.vcpe_dhcp)
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, False)
+ st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -4 -j DROP'.format(vcpe_name))
+ st,_ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, True)
+ st,output = vsg.run_cmd('sudo docker exec {} iptables -R FORWARD 1 -4 -j ACCEPT'.format(vcpe_name))
+ st,_ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, False)
+ finally:
+ vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -4 -j DROP'.format(vcpe_name))
+ self.del_static_route_via_vcpe_interface([host],vcpe=vcpe_intf)
+ vsg.run_cmd('sudo docker restart {}'.format(vcpe_name))
+ df.callback(0)
+ reactor.callLater(0, vcpe_firewall, df)
+ return df
- def test_vsg_firewall_deny_all_traffic_from_lan_to_wan_in_vcpe(self, vcpe=None):
+ @deferred(TIMEOUT)
+ def test_vsg_firewall_deny_all_traffic_coming_on_lan_interface_in_vcpe(self,vcpe_name=None,vcpe_intf=None):
+ """
+ Algo:
+ 1. Get vSG corresponding to vcpe
+ 2. Login to compute node
+ 3. Execute iptable command on vcpe from compute node to deny all the traffic coming on lan interface inside vcpe container
+ 4. From cord-tester ping to 8.8.8.8
+ 5. Verifying the ping should not success
+ 6. Delete the iptable rule added
+ 7. From cord-tester ping to 8.8.8.8
+ 8. Verifying the ping should success
+ """
+ if not vcpe_name:
+ vcpe_name = self.vcpe_container
+ if not vcpe_intf:
+ vcpe_intf = self.vcpe_dhcp
+ df = defer.Deferred()
+ def vcpe_firewall(df):
+ host = '8.8.8.8'
+ vsg = VSGAccess.get_vcpe_vsg(vcpe_name)
+ try:
+ self.add_static_route_via_vcpe_interface([host],vcpe=self.vcpe_dhcp)
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, False)
+ st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -i eth1 -j DROP'.format(vcpe_name))
+ st,_ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, True)
+ st,output = vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -i eth1 -j DROP'.format(vcpe_name))
+ st,_ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, False)
+ finally:
+ vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -i eth1 -j DROP'.format(vcpe_name))
+ self.del_static_route_via_vcpe_interface([host],vcpe=vcpe_intf)
+ vsg.run_cmd('sudo docker restart {}'.format(vcpe_name))
+ df.callback(0)
+ reactor.callLater(0, vcpe_firewall, df)
+ return df
+
+ @deferred(TIMEOUT)
+ def test_vsg_firewall_deny_all_traffic_going_out_of_wan_interface_in_vcpe(self,vcpe_name=None,vcpe_intf=None):
+ """
+ Algo:
+ 1. Get vSG corresponding to vcpe
+ 2. Login to compute node
+ 3. Execute iptable command on vcpe from compute node to deny all the traffic going out of wan interface inside vcpe container
+ 4. From cord-tester ping to 8.8.8.8
+ 5. Verifying the ping should not success
+ 6. Delete the iptable rule added
+ 7. From cord-tester ping to 8.8.8.8
+ 8. Verifying the ping should success
+ """
+ if not vcpe_name:
+ vcpe_name = self.vcpe_container
+ if not vcpe_intf:
+ vcpe_intf = self.vcpe_dhcp
+ df = defer.Deferred()
+ def vcpe_firewall(df):
+ host = '8.8.8.8'
+ vsg = VSGAccess.get_vcpe_vsg(vcpe_name)
+ try:
+ self.add_static_route_via_vcpe_interface([host],vcpe=self.vcpe_dhcp)
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, False)
+ st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -o eth0 -j DROP'.format(vcpe_name))
+ st,_ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, True)
+ st,output = vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -o eth0 -j DROP'.format(vcpe_name))
+ st,_ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, False)
+ finally:
+ vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -o eth0 -j DROP'.format(vcpe_name))
+ self.del_static_route_via_vcpe_interface([host],vcpe=vcpe_intf)
+ vsg.run_cmd('sudo docker restart {}'.format(vcpe_name))
+ df.callback(0)
+ reactor.callLater(0, vcpe_firewall, df)
+ return df
+
+ @deferred(TIMEOUT)
+ def test_vsg_firewall_deny_all_traffic_from_lan_to_wan_in_vcpe(self,vcpe_name=None,vcpe_intf=None):
"""
Algo:
1. Get vSG corresponding to vcpe
@@ -1056,23 +1447,76 @@
7. From cord-tester ping to 8.8.8.8
8. Verifying the ping should success
"""
- host = '8.8.8.8'
- if not vcpe:
- vcpe = self.vcpe_container
- vsg = VSGAccess.get_vcpe_vsg(vcpe)
- st, _ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, False)
- try:
- st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -i eth1 -o eth0 -j DROP'.format(vcpe))
- st,_ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, True)
- st,output = vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -i eth1 -o eth0 -j ACCEPT'.format(vcpe))
- st,_ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, False)
- finally:
- st, _ = vsg.run_cmd('sudo docker exec {} iptables -X'.format(vcpe))
+ if not vcpe_name:
+ vcpe_name = self.vcpe_container
+ if not vcpe_intf:
+ vcpe_intf = self.vcpe_dhcp
+ df = defer.Deferred()
+ def vcpe_firewall(df):
+ host = '8.8.8.8'
+ vsg = VSGAccess.get_vcpe_vsg(vcpe_name)
+ try:
+ self.add_static_route_via_vcpe_interface([host],vcpe=self.vcpe_dhcp)
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, False)
+ st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -i eth1 -o eth0 -j DROP'.format(vcpe_name))
+ st,_ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, True)
+ st,output = vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -i eth1 -o eth0 -j DROP'.format(vcpe_name))
+ st,_ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, False)
+ finally:
+ vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -i eth1 -o eth0 -j DROP'.format(vcpe_name))
+ self.del_static_route_via_vcpe_interface([host],vcpe=vcpe_intf)
+ vsg.run_cmd('sudo docker restart {}'.format(vcpe_name))
+ df.callback(0)
+ reactor.callLater(0, vcpe_firewall, df)
+ return df
- def test_vsg_firewall_deny_all_dns_traffic(self, vcpe=None):
+
+ #this test case needs modification.default route should be vcpe interface to run this test case
+ @deferred(TIMEOUT)
+ def test_vsg_firewall_deny_all_dns_traffic(self,vcpe_name=None,vcpe_intf=None):
+ """
+ Algo:
+ 1. Get vSG corresponding to vcpe
+ 2. Login to compute node
+ 3. Execute iptable command on vcpe from compute node to deny all dns Traffic
+ 4. From cord-tester ping to www.google.com
+ 5. Verifying the ping should not success
+ 6. Delete the iptable rule added
+ 7. From cord-tester ping to www.google.com
+ 8. Verifying the ping should success
+ """
+ if not vcpe_name:
+ vcpe_name = self.vcpe_container
+ if not vcpe_intf:
+ vcpe_intf = self.vcpe_dhcp
+ df = defer.Deferred()
+ def vcpe_firewall(df):
+ host = 'www.msn.com'
+ host_ip = '131.253.33.203'
+ vsg = VSGAccess.get_vcpe_vsg(vcpe_name)
+ try:
+ self.add_static_route_via_vcpe_interface([host_ip],vcpe=self.vcpe_dhcp)
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, False)
+ st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -p udp --dport 53 -j DROP'.format(vcpe_name))
+ st,_ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, True)
+ st,output = vsg.run_cmd('sudo docker exec {} iptables -R FORWARD 1 -p udp --dport 53 -j ACCEPT'.format(vcpe_name))
+ st,_ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, False)
+ finally:
+ vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -p udp --dport 53 -j DROP'.format(vcpe_name))
+ self.del_static_route_via_vcpe_interface([host_ip],vcpe=vcpe_intf)
+ vsg.run_cmd('sudo docker restart {}'.format(vcpe_name))
+ df.callback(0)
+ reactor.callLater(0, vcpe_firewall, df)
+ return df
+
+ @deferred(TIMEOUT)
+ def test_vsg_firewall_deny_all_ipv4_traffic_vcpe_container_restart(self,vcpe_name=None,vcpe_intf=None):
"""
Algo:
1. Get vSG corresponding to vcpe
@@ -1084,21 +1528,32 @@
7. From cord-tester ping to www.google.com
8. Verifying the ping should success
"""
- host = 'www.google.com'
- if not vcpe:
- vcpe = self.vcpe_container
- vsg = VSGAccess.get_vcpe_vsg(vcpe)
- st, _ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, False)
- try:
- st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD 1 -p udp --dport 53 -j DROP'.format(vcpe))
- st,_ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, True)
- st,output = vsg.run_cmd('sudo docker exec {} iptables -R FORWARD 1 -p udp --dport 53 -j ACCEPT'.format(vcpe))
- st,_ = getstatusoutput('ping -c 1 {}'.format(host))
- assert_equal(st, False)
- finally:
- st, _ = vsg.run_cmd('sudo docker exec {} iptables -X'.format(vcpe))
+ if not vcpe_name:
+ vcpe_name = self.vcpe_container
+ if not vcpe_intf:
+ vcpe_intf = self.vcpe_dhcp
+ df = defer.Deferred()
+ def vcpe_firewall(df):
+ host = '8.8.8.8'
+ vsg = VSGAccess.get_vcpe_vsg(vcpe_name)
+ try:
+ self.add_static_route_via_vcpe_interface([host],vcpe=self.vcpe_dhcp)
+ st, _ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, False)
+ st,output = vsg.run_cmd('sudo docker exec {} iptables -I FORWARD -4 -j DROP'.format(vcpe_name))
+ st,_ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, True)
+ st,output = vsg.run_cmd('sudo docker restart {}'.format(vcpe_name))
+ time.sleep(3)
+ st,_ = getstatusoutput('ping -c 1 {}'.format(host))
+ assert_equal(st, False)
+ finally:
+ vsg.run_cmd('sudo docker exec {} iptables -D FORWARD -4 -j DROP'.format(vcpe_name))
+ self.del_static_route_via_vcpe_interface([host],vcpe=vcpe_intf)
+ vsg.run_cmd('sudo docker restart {}'.format(vcpe_name))
+ df.callback(0)
+ reactor.callLater(0, vcpe_firewall, df)
+ return df
def test_vsg_xos_subscriber(self):
subscriber_info = self.subscriber_info[0]
@@ -1116,29 +1571,6 @@
result = self.restApiXos.ApiPost('TENANT_VOLT', volt_tenant)
assert_equal(result, True)
- def test_vsg_for_ping_from_vsg_to_external_network(self):
- """
- Algo:
- 1.Create a vSG VM in compute node
- 2.Ensure VM created properly
- 3.Verify login to VM success
- 4.Do ping to external network from vSG VM
- 5.Verify that ping gets success
- 6.Verify ping success flows added in OvS
- """
-
- def test_vsg_for_ping_from_vcpe_to_external_network(self):
- """
- Algo:
- 1.Create a vSG VM in compute node
- 2.Create a vCPE container inside VM
- 3.Verify both VM and Container created properly
- 4.Verify login to vCPE container success
- 5.Do ping to external network from vCPE container
- 6.Verify that ping gets success
- 7.Verify ping success flows added in OvS
- """
-
def test_vsg_for_dns_service(self):
"""
Algo: