blob: 1f19848c9376e811e17f3ab38b917526b047bf7f [file] [log] [blame]
A R Karthickb7e80902016-05-17 09:38:31 -07001import unittest
2from nose.tools import *
3from nose.twistedtools import reactor, deferred
4from twisted.internet import defer
5from scapy.all import *
6import time, monotonic
7import os, sys
8import tempfile
9import random
10import threading
11import json
12from Stats import Stats
13from OnosCtrl import OnosCtrl
14from DHCP import DHCPTest
15from EapTLS import TLSAuthTest
16from Channels import Channels, IgmpChannel
17from subscriberDb import SubscriberDB
18from threadPool import ThreadPool
19from portmaps import g_subscriber_port_map
20from OltConfig import *
21from OnosFlowCtrl import get_mac
22from CordTestServer import cord_test_onos_restart
23
24log.setLevel('INFO')
25
26class Subscriber(Channels):
27 PORT_TX_DEFAULT = 2
28 PORT_RX_DEFAULT = 1
29 INTF_TX_DEFAULT = 'veth2'
30 INTF_RX_DEFAULT = 'veth0'
31 STATS_RX = 0
32 STATS_TX = 1
33 STATS_JOIN = 2
34 STATS_LEAVE = 3
35 SUBSCRIBER_SERVICES = 'DHCP IGMP TLS'
36 def __init__(self, name = 'sub', service = SUBSCRIBER_SERVICES, port_map = None,
37 num = 1, channel_start = 0,
38 tx_port = PORT_TX_DEFAULT, rx_port = PORT_RX_DEFAULT,
39 iface = INTF_RX_DEFAULT, iface_mcast = INTF_TX_DEFAULT,
40 mcast_cb = None, loginType = 'wireless'):
41 self.tx_port = tx_port
42 self.rx_port = rx_port
43 self.port_map = port_map or g_subscriber_port_map
44 try:
45 self.tx_intf = self.port_map[tx_port]
46 self.rx_intf = self.port_map[rx_port]
47 except:
48 self.tx_intf = self.port_map[self.PORT_TX_DEFAULT]
49 self.rx_intf = self.port_map[self.PORT_RX_DEFAULT]
50
51 Channels.__init__(self, num, channel_start = channel_start,
52 iface = self.rx_intf, iface_mcast = self.tx_intf, mcast_cb = mcast_cb)
53 self.name = name
54 self.service = service
55 self.service_map = {}
56 services = self.service.strip().split(' ')
57 for s in services:
58 self.service_map[s] = True
59 self.loginType = loginType
60 ##start streaming channels
61 self.join_map = {}
62 ##accumulated join recv stats
63 self.join_rx_stats = Stats()
64
65 def has_service(self, service):
66 if self.service_map.has_key(service):
67 return self.service_map[service]
68 if self.service_map.has_key(service.upper()):
69 return self.service_map[service.upper()]
70 return False
71
72 def channel_join_update(self, chan, join_time):
73 self.join_map[chan] = ( Stats(), Stats(), Stats(), Stats() )
74 self.channel_update(chan, self.STATS_JOIN, 1, t = join_time)
75
76 def channel_join(self, chan = 0, delay = 2):
77 '''Join a channel and create a send/recv stats map'''
78 if self.join_map.has_key(chan):
79 del self.join_map[chan]
80 self.delay = delay
81 chan, join_time = self.join(chan)
82 self.channel_join_update(chan, join_time)
83 return chan
84
85 def channel_join_next(self, delay = 2):
86 '''Joins the next channel leaving the last channel'''
87 if self.last_chan:
88 if self.join_map.has_key(self.last_chan):
89 del self.join_map[self.last_chan]
90 self.delay = delay
91 chan, join_time = self.join_next()
92 self.channel_join_update(chan, join_time)
93 return chan
94
95 def channel_jump(self, delay = 2):
96 '''Jumps randomly to the next channel leaving the last channel'''
97 if self.last_chan is not None:
98 if self.join_map.has_key(self.last_chan):
99 del self.join_map[self.last_chan]
100 self.delay = delay
101 chan, join_time = self.jump()
102 self.channel_join_update(chan, join_time)
103 return chan
104
105 def channel_leave(self, chan = 0):
106 if self.join_map.has_key(chan):
107 del self.join_map[chan]
108 self.leave(chan)
109
110 def channel_update(self, chan, stats_type, packets, t=0):
111 if type(chan) == type(0):
112 chan_list = (chan,)
113 else:
114 chan_list = chan
115 for c in chan_list:
116 if self.join_map.has_key(c):
117 self.join_map[c][stats_type].update(packets = packets, t = t)
118
119 def channel_receive(self, chan, cb = None, count = 1):
120 log.info('Subscriber %s receiving from group %s, channel %d' %(self.name, self.gaddr(chan), chan))
121 self.recv(chan, cb = cb, count = count)
122
123 def recv_channel_cb(self, pkt):
124 ##First verify that we have received the packet for the joined instance
125 log.debug('Packet received for group %s, subscriber %s' %(pkt[IP].dst, self.name))
126 chan = self.caddr(pkt[IP].dst)
127 assert_equal(chan in self.join_map.keys(), True)
128 recv_time = monotonic.monotonic() * 1000000
129 join_time = self.join_map[chan][self.STATS_JOIN].start
130 delta = recv_time - join_time
131 self.join_rx_stats.update(packets=1, t = delta, usecs = True)
132 self.channel_update(chan, self.STATS_RX, 1, t = delta)
133 log.debug('Packet received in %.3f usecs for group %s after join' %(delta, pkt[IP].dst))
134
135class subscriber_pool:
136
137 def __init__(self, subscriber, test_cbs):
138 self.subscriber = subscriber
139 self.test_cbs = test_cbs
140
141 def pool_cb(self):
142 for cb in self.test_cbs:
143 if cb:
144 cb(self.subscriber)
145
146class subscriber_exchange(unittest.TestCase):
147
148 apps = ('org.onosproject.aaa', 'org.onosproject.dhcp')
149 olt_apps = () #'org.onosproject.cordmcast')
150 table_app = 'org.ciena.cordigmp'
151 dhcp_server_config = {
152 "ip": "10.1.11.50",
153 "mac": "ca:fe:ca:fe:ca:fe",
154 "subnet": "255.255.252.0",
155 "broadcast": "10.1.11.255",
156 "router": "10.1.8.1",
157 "domain": "8.8.8.8",
158 "ttl": "63",
159 "delay": "2",
160 "startip": "10.1.11.51",
161 "endip": "10.1.11.100"
162 }
163
164 aaa_loaded = False
165 test_path = os.path.dirname(os.path.realpath(__file__))
166 table_app_file = os.path.join(test_path, '..', 'apps/ciena-cordigmp-multitable-1.0-SNAPSHOT.oar')
167 app_file = os.path.join(test_path, '..', 'apps/ciena-cordigmp-1.0-SNAPSHOT.oar')
168 onos_config_path = os.path.join(test_path, '..', 'setup/onos-config')
169 olt_conf_file = os.path.join(test_path, '..', 'setup/olt_config_multitable.json')
170 cpqd_path = os.path.join(test_path, '..', 'setup')
171 ovs_path = cpqd_path
172 device_id = 'of:' + get_mac('ovsbr0')
173 cpqd_device_dict = { "devices" : {
174 "{}".format(device_id) : {
175 "basic" : {
176 "driver" : "spring-open-cpqd"
177 }
178 }
179 },
180 }
181
182 @classmethod
183 def setUpClass(cls):
184 '''Load the OLT config and activate relevant apps'''
185 ## First restart ONOS with cpqd driver config for OVS
186 #cls.start_onos(network_cfg = cls.cpqd_device_dict)
187 cls.install_app_table()
188 cls.start_cpqd(mac = RandMAC()._fix())
189 cls.olt = OltConfig(olt_conf_file = cls.olt_conf_file)
190 OnosCtrl.cord_olt_config(cls.olt.olt_device_data())
191 cls.port_map, cls.port_list = cls.olt.olt_port_map_multi()
192 cls.activate_apps(cls.apps + cls.olt_apps)
193
194 @classmethod
195 def tearDownClass(cls):
196 '''Deactivate the olt apps and restart OVS back'''
197 apps = cls.olt_apps + ( cls.table_app,)
198 for app in apps:
199 onos_ctrl = OnosCtrl(app)
200 onos_ctrl.deactivate()
201 cls.uninstall_app_table()
202 cls.start_ovs()
203
204 @classmethod
205 def activate_apps(cls, apps):
206 for app in apps:
207 onos_ctrl = OnosCtrl(app)
208 status, _ = onos_ctrl.activate()
209 assert_equal(status, True)
210 time.sleep(2)
211
212 @classmethod
213 def install_app_table(cls):
214 ##Uninstall the existing app if any
215 OnosCtrl.uninstall_app(cls.table_app)
216 time.sleep(2)
217 log.info('Installing the multi table app %s for subscriber test' %(cls.table_app_file))
218 OnosCtrl.install_app(cls.table_app_file)
219 time.sleep(3)
220
221 @classmethod
222 def uninstall_app_table(cls):
223 ##Uninstall the table app on class exit
224 OnosCtrl.uninstall_app(cls.table_app)
225 time.sleep(2)
226 log.info('Installing back the cord igmp app %s for subscriber test on exit' %(cls.app_file))
227 OnosCtrl.install_app(cls.app_file)
228
229 @classmethod
230 def start_onos(cls, network_cfg = None):
231 if network_cfg is None:
232 network_cfg = cls.cpqd_device_dict
233
234 if type(network_cfg) is tuple:
235 res = []
236 for v in network_cfg:
237 res += v.items()
238 config = dict(res)
239 else:
240 config = network_cfg
241 log.info('Restarting ONOS with new network configuration')
242 cfg = json.dumps(config)
243 with open('{}/network-cfg.json'.format(cls.onos_config_path), 'w') as f:
244 f.write(cfg)
245
246 return cord_test_onos_restart()
247
248 @classmethod
249 def start_cpqd(cls, mac = '00:11:22:33:44:55'):
250 dpid = mac.replace(':', '')
251 cpqd_file = os.sep.join( (cls.cpqd_path, 'cpqd.sh') )
252 cpqd_cmd = '{} {}'.format(cpqd_file, dpid)
253 ret = os.system(cpqd_cmd)
254 assert_equal(ret, 0)
255 time.sleep(10)
256
257 @classmethod
258 def start_ovs(cls):
259 ovs_file = os.sep.join( (cls.ovs_path, 'of-bridge-local.sh') )
260 ret = os.system(ovs_file)
261 assert_equal(ret, 0)
262 time.sleep(2)
263
264 def onos_aaa_load(self):
265 if self.aaa_loaded:
266 return
267 aaa_dict = {'apps' : { 'org.onosproject.aaa' : { 'AAA' : { 'radiusSecret': 'radius_password',
268 'radiusIp': '172.17.0.2' } } } }
269 radius_ip = os.getenv('ONOS_AAA_IP') or '172.17.0.2'
270 aaa_dict['apps']['org.onosproject.aaa']['AAA']['radiusIp'] = radius_ip
271 self.onos_load_config('org.onosproject.aaa', aaa_dict)
272 self.aaa_loaded = True
273
274 def onos_dhcp_table_load(self, config = None):
275 dhcp_dict = {'apps' : { 'org.onosproject.dhcp' : { 'dhcp' : copy.copy(self.dhcp_server_config) } } }
276 dhcp_config = dhcp_dict['apps']['org.onosproject.dhcp']['dhcp']
277 if config:
278 for k in config.keys():
279 if dhcp_config.has_key(k):
280 dhcp_config[k] = config[k]
281 self.onos_load_config('org.onosproject.dhcp', dhcp_dict)
282
283 def onos_load_config(self, app, config):
284 status, code = OnosCtrl.config(config)
285 if status is False:
286 log.info('JSON config request for app %s returned status %d' %(app, code))
287 assert_equal(status, True)
288 time.sleep(2)
289
290 def dhcp_sndrcv(self, dhcp, update_seed = False):
291 cip, sip = dhcp.discover(update_seed = update_seed)
292 assert_not_equal(cip, None)
293 assert_not_equal(sip, None)
294 log.info('Got dhcp client IP %s from server %s for mac %s' %
295 (cip, sip, dhcp.get_mac(cip)[0]))
296 return cip,sip
297
298 def dhcp_request(self, subscriber, seed_ip = '10.10.10.1', update_seed = False):
299 config = {'startip':'10.10.10.20', 'endip':'10.10.10.200',
300 'ip':'10.10.10.2', 'mac': "ca:fe:ca:fe:ca:fe",
301 'subnet': '255.255.255.0', 'broadcast':'10.10.10.255', 'router':'10.10.10.1'}
302 self.onos_dhcp_table_load(config)
303 dhcp = DHCPTest(seed_ip = seed_ip, iface = subscriber.iface)
304 cip, sip = self.dhcp_sndrcv(dhcp, update_seed = update_seed)
305 return cip, sip
306
307 def recv_channel_cb(self, pkt):
308 ##First verify that we have received the packet for the joined instance
309 chan = self.subscriber.caddr(pkt[IP].dst)
310 assert_equal(chan in self.subscriber.join_map.keys(), True)
311 recv_time = monotonic.monotonic() * 1000000
312 join_time = self.subscriber.join_map[chan][self.subscriber.STATS_JOIN].start
313 delta = recv_time - join_time
314 self.subscriber.join_rx_stats.update(packets=1, t = delta, usecs = True)
315 self.subscriber.channel_update(chan, self.subscriber.STATS_RX, 1, t = delta)
316 log.debug('Packet received in %.3f usecs for group %s after join' %(delta, pkt[IP].dst))
317 self.test_status = True
318
319 def tls_verify(self, subscriber):
320 if subscriber.has_service('TLS'):
321 time.sleep(2)
322 tls = TLSAuthTest()
323 log.info('Running subscriber %s tls auth test' %subscriber.name)
324 tls.runTest()
325 self.test_status = True
326
327 def dhcp_verify(self, subscriber):
328 cip, sip = self.dhcp_request(subscriber, update_seed = True)
329 log.info('Subscriber %s got client ip %s from server %s' %(subscriber.name, cip, sip))
330 subscriber.src_list = [cip]
331 self.test_status = True
332
333 def dhcp_jump_verify(self, subscriber):
334 cip, sip = self.dhcp_request(subscriber, seed_ip = '10.10.200.1')
335 log.info('Subscriber %s got client ip %s from server %s' %(subscriber.name, cip, sip))
336 subscriber.src_list = [cip]
337 self.test_status = True
338
339 def dhcp_next_verify(self, subscriber):
340 cip, sip = self.dhcp_request(subscriber, seed_ip = '10.10.150.1')
341 log.info('Subscriber %s got client ip %s from server %s' %(subscriber.name, cip, sip))
342 subscriber.src_list = [cip]
343 self.test_status = True
344
345 def igmp_verify(self, subscriber):
346 chan = 0
347 if subscriber.has_service('IGMP'):
348 for i in range(5):
349 log.info('Joining channel %d for subscriber %s' %(chan, subscriber.name))
350 subscriber.channel_join(chan, delay = 0)
351 subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count = 1)
352 log.info('Leaving channel %d for subscriber %s' %(chan, subscriber.name))
353 subscriber.channel_leave(chan)
354 time.sleep(3)
355 log.info('Interface %s Join RX stats for subscriber %s, %s' %(subscriber.iface, subscriber.name,subscriber.join_rx_stats))
356 self.test_status = True
357
358 def igmp_jump_verify(self, subscriber):
359 if subscriber.has_service('IGMP'):
360 for i in xrange(subscriber.num):
361 log.info('Subscriber %s jumping channel' %subscriber.name)
362 chan = subscriber.channel_jump(delay=0)
363 subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count = 1)
364 log.info('Verified receive for channel %d, subscriber %s' %(chan, subscriber.name))
365 time.sleep(3)
366 log.info('Interface %s Jump RX stats for subscriber %s, %s' %(subscriber.iface, subscriber.name, subscriber.join_rx_stats))
367 self.test_status = True
368
369 def igmp_next_verify(self, subscriber):
370 if subscriber.has_service('IGMP'):
371 for i in xrange(subscriber.num):
372 if i:
373 chan = subscriber.channel_join_next(delay=0)
374 else:
375 chan = subscriber.channel_join(i, delay=0)
376 log.info('Joined next channel %d for subscriber %s' %(chan, subscriber.name))
377 subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count=1)
378 log.info('Verified receive for channel %d, subscriber %s' %(chan, subscriber.name))
379 time.sleep(3)
380 log.info('Interface %s Join Next RX stats for subscriber %s, %s' %(subscriber.iface, subscriber.name, subscriber.join_rx_stats))
381 self.test_status = True
382
383 def generate_port_list(self, subscribers, channels):
384 return self.port_list[:subscribers]
385
386 def subscriber_load(self, create = True, num = 10, num_channels = 1, channel_start = 0, port_list = []):
387 '''Load the subscriber from the database'''
388 self.subscriber_db = SubscriberDB(create = create)
389 if create is True:
390 self.subscriber_db.generate(num)
391 self.subscriber_info = self.subscriber_db.read(num)
392 self.subscriber_list = []
393 if not port_list:
394 port_list = self.generate_port_list(num, num_channels)
395
396 index = 0
397 for info in self.subscriber_info:
398 self.subscriber_list.append(Subscriber(name=info['Name'],
399 service=info['Service'],
400 port_map = self.port_map,
401 num=num_channels,
402 channel_start = channel_start,
403 tx_port = port_list[index][0],
404 rx_port = port_list[index][1]))
405 if num_channels > 1:
406 channel_start += num_channels
407 index += 1
408
409 #load the ssm list for all subscriber channels
410 igmpChannel = IgmpChannel()
411 ssm_groups = map(lambda sub: sub.channels, self.subscriber_list)
412 ssm_list = reduce(lambda ssm1, ssm2: ssm1+ssm2, ssm_groups)
413 igmpChannel.igmp_load_ssm_config(ssm_list)
414
415 def subscriber_join_verify( self, num_subscribers = 10, num_channels = 1,
416 channel_start = 0, cbs = None, port_list = []):
417 self.test_status = False
418 self.num_subscribers = num_subscribers
419 self.subscriber_load(create = True, num = num_subscribers,
420 num_channels = num_channels, channel_start = channel_start, port_list = port_list)
421 self.onos_aaa_load()
422 self.thread_pool = ThreadPool(min(100, self.num_subscribers), queue_size=1, wait_timeout=1)
423 if cbs is None:
424 cbs = (self.tls_verify, self.dhcp_verify, self.igmp_verify)
425 for subscriber in self.subscriber_list:
426 subscriber.start()
427 pool_object = subscriber_pool(subscriber, cbs)
428 self.thread_pool.addTask(pool_object.pool_cb)
429 self.thread_pool.cleanUpThreads()
430 for subscriber in self.subscriber_list:
431 subscriber.stop()
432 return self.test_status
433
434 def test_subscriber_join_recv(self):
435 """Test subscriber join and receive"""
436 num_subscribers = 5
437 num_channels = 1
438 test_status = self.subscriber_join_verify(num_subscribers = num_subscribers,
439 num_channels = num_channels,
440 port_list = self.generate_port_list(num_subscribers, num_channels))
441 assert_equal(test_status, True)
442
443 def test_subscriber_join_jump(self):
444 """Test subscriber join and receive for channel surfing"""
445 num_subscribers = 5
446 num_channels = 50
447 test_status = self.subscriber_join_verify(num_subscribers = num_subscribers,
448 num_channels = num_channels,
449 cbs = (self.tls_verify, self.dhcp_jump_verify, self.igmp_jump_verify),
450 port_list = self.generate_port_list(num_subscribers, num_channels))
451 assert_equal(test_status, True)
452
453 def test_subscriber_join_next(self):
454 """Test subscriber join next for channels"""
455 num_subscribers = 5
456 num_channels = 50
457 test_status = self.subscriber_join_verify(num_subscribers = num_subscribers,
458 num_channels = num_channels,
459 cbs = (self.tls_verify, self.dhcp_next_verify, self.igmp_next_verify),
460 port_list = self.generate_port_list(num_subscribers, num_channels))
461 assert_equal(test_status, True)