blob: 6ccac3b4272de63117f7acd57525316d7b64d539 [file] [log] [blame]
Chetan Gaonker26ae67e2017-07-18 19:58:30 +00001#
2# Copyright 2016-present Ciena Corporation
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16from twisted.internet import defer
17from nose.tools import *
18from nose.twistedtools import reactor, deferred
19from scapy.all import *
20from select import select as socket_select
21import time, monotonic
22import os
23import random
24import threading
25from IGMP import *
26from McastTraffic import *
27from Stats import Stats
28from OnosCtrl import OnosCtrl
29from OltConfig import OltConfig
30from Channels import IgmpChannel
31from CordLogger import CordLogger
32from CordTestConfig import setup_module, teardown_module
33from CordTestUtils import log_test
34log_test.setLevel('INFO')
35
36class IGMPProxyTestState:
37
38 def __init__(self, groups = [], df = None, state = 0):
39 self.df = df
40 self.state = state
41 self.counter = 0
42 self.groups = groups
43 self.group_map = {} ##create a send/recv count map
44 for g in groups:
45 self.group_map[g] = (Stats(), Stats())
46
47 def update(self, group, tx = 0, rx = 0, t = 0):
48 self.counter += 1
49 index = 0 if rx == 0 else 1
50 v = tx if rx == 0 else rx
51 if self.group_map.has_key(group):
52 self.group_map[group][index].update(packets = v, t = t)
53
54 def update_state(self):
55 self.state = self.state ^ 1
56
57class igmpproxy_exchange(CordLogger):
58
59 V_INF1 = 'veth0'
60 MGROUP1 = '239.1.2.3'
61 MGROUP2 = '239.2.2.3'
62 MINVALIDGROUP1 = '255.255.255.255'
63 MINVALIDGROUP2 = '239.255.255.255'
64 MMACGROUP1 = "01:00:5e:01:02:03"
65 MMACGROUP2 = "01:00:5e:02:02:03"
66 IGMP_DST_MAC = "01:00:5e:00:00:16"
67 IGMP_SRC_MAC = "5a:e1:ac:ec:4d:a1"
68 IP_SRC = '1.2.3.4'
69 IP_DST = '224.0.0.22'
70 NEGATIVE_TRAFFIC_STATUS = 1
71 igmp_eth = Ether(dst = IGMP_DST_MAC, type = ETH_P_IP)
72 igmp_ip = IP(dst = IP_DST)
73 IGMP_TEST_TIMEOUT = 5
74 IGMP_QUERY_TIMEOUT = 60
75 MCAST_TRAFFIC_TIMEOUT = 20
76 PORT_TX_DEFAULT = 2
77 PORT_RX_DEFAULT = 1
78 max_packets = 100
79 app = 'org.opencord.igmpproxy'
80 olt_conf_file = os.getenv('OLT_CONFIG_FILE', os.path.join(os.path.dirname(os.path.realpath(__file__)), '../setup/olt_config.json'))
81 ROVER_TEST_TIMEOUT = 300 #3600*86
82 ROVER_TIMEOUT = (ROVER_TEST_TIMEOUT - 100)
83 ROVER_JOIN_TIMEOUT = 60
84 VOLTHA_ENABLED = bool(int(os.getenv('VOLTHA_ENABLED', 0)))
85
86 @classmethod
87 def setUpClass(cls):
88 cls.olt = OltConfig(olt_conf_file = cls.olt_conf_file)
89 cls.port_map, _ = cls.olt.olt_port_map()
90 if cls.VOLTHA_ENABLED is False:
91 OnosCtrl.config_device_driver()
92 OnosCtrl.cord_olt_config(cls.olt)
93 time.sleep(2)
94
95 @classmethod
96 def tearDownClass(cls):
97 if cls.VOLTHA_ENABLED is False:
98 OnosCtrl.config_device_driver(driver = 'ovs')
99
100 def setUp(self):
101 ''' Activate the igmp proxy app'''
102 super(igmp_exchange, self).setUp()
103 self.onos_ctrl = OnosCtrl(self.app)
104 self.onos_ctrl.activate()
105 self.igmp_channel = IgmpChannel()
106
107 def tearDown(self):
108 super(igmp_exchange, self).tearDown()
109
110 def onos_load_config(self, config):
111 log_test.info('onos load config is %s'%config)
112 status, code = OnosCtrl.config(config)
113 if status is False:
114 log_test.info('JSON request returned status %d' %code)
115 assert_equal(status, True)
116 time.sleep(2)
117
118 def onos_ssm_table_load(self, groups, src_list = ['1.2.3.4'],flag = False):
119 return
120 ssm_dict = {'apps' : { 'org.opencord.igmpproxy' : { 'ssmTranslate' : [] } } }
121 ssm_xlate_list = ssm_dict['apps']['org.opencord.igmpproxy']['ssmTranslate']
122 if flag: #to maintain seperate group-source pair.
123 for i in range(len(groups)):
124 d = {}
125 d['source'] = src_list[i] or '0.0.0.0'
126 d['group'] = groups[i]
127 ssm_xlate_list.append(d)
128 else:
129 for g in groups:
130 for s in src_list:
131 d = {}
132 d['source'] = s or '0.0.0.0'
133 d['group'] = g
134 ssm_xlate_list.append(d)
135 self.onos_load_config(ssm_dict)
136 cord_port_map = {}
137 for g in groups:
138 cord_port_map[g] = (self.PORT_TX_DEFAULT, self.PORT_RX_DEFAULT)
139 self.igmp_channel.cord_port_table_load(cord_port_map)
140 time.sleep(2)
141
142 def get_igmp_intf(self):
143 inst = os.getenv('TEST_INSTANCE', None)
144 if not inst:
145 return 'veth0'
146 inst = int(inst) + 1
147 if inst >= self.port_map['uplink']:
148 inst += 1
149 if self.port_map.has_key(inst):
150 return self.port_map[inst]
151 return 'veth0'
152
153 def igmp_verify_join(self, igmpStateList):
154 sendState, recvState = igmpStateList
155 ## check if the send is received for the groups
156 for g in sendState.groups:
157 tx_stats = sendState.group_map[g][0]
158 tx = tx_stats.count
159 assert_greater(tx, 0)
160 rx_stats = recvState.group_map[g][1]
161 rx = rx_stats.count
162 assert_greater(rx, 0)
163 log_test.info('Receive stats %s for group %s' %(rx_stats, g))
164
165 log_test.info('IGMP test verification success')
166
167 def igmp_verify_leave(self, igmpStateList, leave_groups):
168 sendState, recvState = igmpStateList[0], igmpStateList[1]
169 ## check if the send is received for the groups
170 for g in sendState.groups:
171 tx_stats = sendState.group_map[g][0]
172 rx_stats = recvState.group_map[g][1]
173 tx = tx_stats.count
174 rx = rx_stats.count
175 assert_greater(tx, 0)
176 if g not in leave_groups:
177 log_test.info('Received %d packets for group %s' %(rx, g))
178 for g in leave_groups:
179 rx = recvState.group_map[g][1].count
180 assert_equal(rx, 0)
181
182 log_test.info('IGMP test verification success')
183
184 def mcast_traffic_timer(self):
185 log_test.info('MCAST traffic timer expiry')
186 self.mcastTraffic.stopReceives()
187
188 def send_mcast_cb(self, send_state):
189 for g in send_state.groups:
190 send_state.update(g, tx = 1)
191 return 0
192
193 ##Runs in the context of twisted reactor thread
194 def igmp_recv(self, igmpState):
195 s = socket_select([self.recv_socket], [], [], 1.0)
196 if self.recv_socket in s[0]:
197 p = self.recv_socket.recv()
198 try:
199 send_time = float(p.payload.load)
200 recv_time = monotonic.monotonic()
201 except:
202 log_test.info('Unexpected Payload received: %s' %p.payload.load)
203 return 0
204 #log_test.info( 'Recv in %.6f secs' %(recv_time - send_time))
205 igmpState.update(p.dst, rx = 1, t = recv_time - send_time)
206 return 0
207
208 def send_igmp_join(self, groups, src_list = ['1.2.3.4'], record_type=IGMP_V3_GR_TYPE_INCLUDE,
209 ip_pkt = None, iface = 'veth0', ssm_load = False, delay = 1):
210 if ssm_load is True:
211 self.onos_ssm_table_load(groups, src_list)
212 igmp = IGMPv3(type = IGMP_TYPE_V3_MEMBERSHIP_REPORT, max_resp_code=30,
213 gaddr=self.IP_DST)
214 for g in groups:
215 gr = IGMPv3gr(rtype= record_type, mcaddr=g)
216 gr.sources = src_list
217 igmp.grps.append(gr)
218 if ip_pkt is None:
219 ip_pkt = self.igmp_eth/self.igmp_ip
220 pkt = ip_pkt/igmp
221 IGMPv3.fixup(pkt)
222 sendp(pkt, iface=iface)
223 if delay != 0:
224 time.sleep(delay)
225
226 def send_igmp_join_recvQuery(self, groups, rec_queryCount = None, src_list = ['1.2.3.4'], ip_pkt = None, iface = 'veth0', delay = 2):
227 self.onos_ssm_table_load(groups, src_list)
228 igmp = IGMPv3(type = IGMP_TYPE_V3_MEMBERSHIP_REPORT, max_resp_code=30,
229 gaddr=self.IP_DST)
230 for g in groups:
231 gr = IGMPv3gr(rtype=IGMP_V3_GR_TYPE_INCLUDE, mcaddr=g)
232 gr.sources = src_list
233 gr.sources = src_list
234 igmp.grps.append(gr)
235 if ip_pkt is None:
236 ip_pkt = self.igmp_eth/self.igmp_ip
237 pkt = ip_pkt/igmp
238 IGMPv3.fixup(pkt)
239 if rec_queryCount == None:
240 log_test.info('Sending IGMP join for group %s and waiting for one query packet and printing the packet' %groups)
241 resp = srp1(pkt, iface=iface)
242 else:
243 log_test.info('Sending IGMP join for group %s and waiting for periodic query packets and printing one packet' %groups)
244 resp = srp1(pkt, iface=iface)
245# resp = srp1(pkt, iface=iface) if rec_queryCount else srp3(pkt, iface=iface)
246 resp[0].summary()
247 log_test.info('Sent IGMP join for group %s and received a query packet and printing packet' %groups)
248 if delay != 0:
249 time.sleep(delay)
250
251 def send_igmp_leave(self, groups, src_list = ['1.2.3.4'], ip_pkt = None, iface = 'veth0', delay = 2):
252 log_test.info('entering into igmp leave function')
253 igmp = IGMPv3(type = IGMP_TYPE_V3_MEMBERSHIP_REPORT, max_resp_code=30,
254 gaddr=self.IP_DST)
255 for g in groups:
256 gr = IGMPv3gr(rtype=IGMP_V3_GR_TYPE_EXCLUDE, mcaddr=g)
257 gr.sources = src_list
258 igmp.grps.append(gr)
259 if ip_pkt is None:
260 ip_pkt = self.igmp_eth/self.igmp_ip
261 pkt = ip_pkt/igmp
262 IGMPv3.fixup(pkt)
263 sendp(pkt, iface = iface)
264 if delay != 0:
265 time.sleep(delay)
266
267 @deferred(timeout=MCAST_TRAFFIC_TIMEOUT+10)
268 def test_igmpproxy_with_join_and_verify_traffic(self):
269 groups = [self.MGROUP1, self.MGROUP1]
270 self.onos_ssm_table_load(groups)
271 df = defer.Deferred()
272 igmpState = IGMPProxyTestState(groups = groups, df = df)
273 igmpStateRecv = IGMPProxyTestState(groups = groups, df = df)
274 igmpStateList = (igmpState, igmpStateRecv)
275 tx_intf = self.port_map[self.PORT_TX_DEFAULT]
276 rx_intf = self.port_map[self.PORT_RX_DEFAULT]
277 mcastTraffic = McastTraffic(groups, iface= tx_intf, cb = self.send_mcast_cb, arg = igmpState)
278 self.df = df
279 self.mcastTraffic = mcastTraffic
280 self.recv_socket = L3PacketSocket(iface = rx_intf, type = ETH_P_IP)
281
282 def igmp_srp_task(stateList):
283 igmpSendState, igmpRecvState = stateList
284 if not mcastTraffic.isRecvStopped():
285 self.igmp_recv(igmpRecvState)
286 reactor.callLater(0, igmp_srp_task, stateList)
287 else:
288 self.mcastTraffic.stop()
289 #log_test.info('Sending IGMP leave for groups: %s' %groups)
290 self.send_igmp_leave(groups, iface = rx_intf, delay = 2)
291 self.recv_socket.close()
292 self.igmp_verify_join(stateList)
293 self.df.callback(0)
294
295 self.send_igmp_join(groups, iface = rx_intf)
296 mcastTraffic.start()
297 self.test_timer = reactor.callLater(self.MCAST_TRAFFIC_TIMEOUT, self.mcast_traffic_timer)
298 reactor.callLater(0, igmp_srp_task, igmpStateList)
299 return df
300
301 @deferred(timeout=MCAST_TRAFFIC_TIMEOUT+40)
302 def test_igmpproxy_with_leave_and_verify_traffic(self):
303 groups = [self.MGROUP1]
304 leave_groups = [self.MGROUP1]
305 self.onos_ssm_table_load(groups)
306 df = defer.Deferred()
307 igmpState = IGMPProxyTestState(groups = groups, df = df)
308 IGMPProxyTestState(groups = groups, df = df)
309 tx_intf = self.port_map[self.PORT_TX_DEFAULT]
310 rx_intf = self.port_map[self.PORT_RX_DEFAULT]
311 mcastTraffic = McastTraffic(groups, iface= tx_intf, cb = self.send_mcast_cb,
312 arg = igmpState)
313 self.df = df
314 self.mcastTraffic = mcastTraffic
315 self.recv_socket = L3PacketSocket(iface = rx_intf, type = ETH_P_IP)
316
317 mcastTraffic.start()
318 self.send_igmp_join(groups, iface = rx_intf)
319 time.sleep(5)
320 self.send_igmp_leave(leave_groups, delay = 3, iface = rx_intf)
321 time.sleep(10)
322 join_state = IGMPProxyTestState(groups = leave_groups)
323 status = self.igmp_not_recv_task(rx_intf, leave_groups, join_state)
324 log_test.info('verified status for igmp recv task %s'%status)
325 assert status == 1 , 'EXPECTED RESULT'
326 self.df.callback(0)
327 return df
328
329 @deferred(timeout=100)
330 def test_igmpproxy_with_leave_and_join_loop(self):
331 self.groups = ['226.0.1.1', '227.0.0.1', '228.0.0.1', '229.0.0.1', '230.0.0.1' ]
332 self.src_list = ['3.4.5.6', '7.8.9.10']
333 self.onos_ssm_table_load(self.groups,src_list=self.src_list)
334 df = defer.Deferred()
335 self.df = df
336 self.iterations = 0
337 self.num_groups = len(self.groups)
338 self.MAX_TEST_ITERATIONS = 10
339 rx_intf = self.port_map[self.PORT_RX_DEFAULT]
340
341 def igmp_srp_task(v):
342 if self.iterations < self.MAX_TEST_ITERATIONS:
343 if v == 1:
344 ##join test
345 self.num_groups = random.randint(0, len(self.groups))
346 self.send_igmp_join(self.groups[:self.num_groups],
347 src_list = self.src_list,
348 iface = rx_intf, delay = 0)
349 else:
350 self.send_igmp_leave(self.groups[:self.num_groups],
351 src_list = self.src_list,
352 iface = rx_intf, delay = 0)
353 self.iterations += 1
354 v ^= 1
355 reactor.callLater(1.0 + 0.5*self.num_groups,
356 igmp_srp_task, v)
357 else:
358 self.df.callback(0)
359
360 reactor.callLater(0, igmp_srp_task, 1)
361 return df
362
363 def igmp_join_task(self, intf, groups, state, src_list = ['1.2.3.4']):
364 #self.onos_ssm_table_load(groups, src_list)
365 igmp = IGMPv3(type = IGMP_TYPE_V3_MEMBERSHIP_REPORT, max_resp_code=30,
366 gaddr=self.IP_DST)
367 for g in groups:
368 gr = IGMPv3gr(rtype = IGMP_V3_GR_TYPE_INCLUDE, mcaddr = g)
369 gr.sources = src_list
370 igmp.grps.append(gr)
371
372 for g in groups:
373 state.group_map[g][0].update(1, t = monotonic.monotonic())
374
375 pkt = self.igmp_eth/self.igmp_ip/igmp
376 IGMPv3.fixup(pkt)
377 sendp(pkt, iface=intf)
378 log_test.debug('Returning from join task')
379
380 def igmp_recv_task(self, intf, groups, join_state):
381 recv_socket = L3PacketSocket(iface = intf, type = ETH_P_IP)
382 group_map = {}
383 for g in groups:
384 group_map[g] = [0,0]
385
386 log_test.info('Verifying join interface should receive multicast data')
387 while True:
388 p = recv_socket.recv()
389 if p.dst in groups and group_map[p.dst][0] == 0:
390 group_map[p.dst][0] += 1
391 group_map[p.dst][1] = monotonic.monotonic()
392 c = 0
393 for g in groups:
394 c += group_map[g][0]
395 if c == len(groups):
396 break
397 for g in groups:
398 join_start = join_state.group_map[g][0].start
399 recv_time = group_map[g][1] * 1000000
400 delta = (recv_time - join_start)
401 log_test.info('Join for group %s received in %.3f usecs' %
402 (g, delta))
403
404 recv_socket.close()
405 log_test.debug('Returning from recv task')
406
407 def igmp_not_recv_task(self, intf, groups, join_state):
408 log_test.info('Entering igmp not recv task loop')
409 recv_socket = L2Socket(iface = intf, type = ETH_P_IP)
410 group_map = {}
411 for g in groups:
412 group_map[g] = [0,0]
413
414 log_test.info('Verifying join interface, should not receive any multicast data')
415 self.NEGATIVE_TRAFFIC_STATUS = 1
416 def igmp_recv_cb(pkt):
417 log_test.info('Multicast packet %s received for left groups %s' %(pkt[IP].dst, groups))
418 self.NEGATIVE_TRAFFIC_STATUS = 2
419 sniff(prn = igmp_recv_cb, count = 1, lfilter = lambda p: IP in p and p[IP].dst in groups,
420 timeout = 3, opened_socket = recv_socket)
421 recv_socket.close()
422 return self.NEGATIVE_TRAFFIC_STATUS
423
424 def group_latency_check(self, groups):
425 tasks = []
426 self.send_igmp_leave(groups = groups)
427 join_state = IGMPProxyTestState(groups = groups)
428 tasks.append(threading.Thread(target=self.igmp_join_task, args = ('veth0', groups, join_state,)))
429 traffic_state = IGMPProxyTestState(groups = groups)
430 mcast_traffic = McastTraffic(groups, iface= 'veth2', cb = self.send_mcast_cb,
431 arg = traffic_state)
432 mcast_traffic.start()
433 tasks.append(threading.Thread(target=self.igmp_recv_task, args = ('veth0', groups, join_state)))
434 for t in tasks:
435 t.start()
436 for t in tasks:
437 t.join()
438
439 mcast_traffic.stop()
440 self.send_igmp_leave(groups = groups)
441 return
442
443 @deferred(timeout=IGMP_QUERY_TIMEOUT + 10)
444 def test_igmpproxy_with_1group_join_latency(self):
445 groups = ['239.0.1.1']
446 df = defer.Deferred()
447 def igmp_1group_join_latency():
448 self.group_latency_check(groups)
449 df.callback(0)
450 reactor.callLater(0, igmp_1group_join_latency)
451 return df
452
453 @deferred(timeout=IGMP_QUERY_TIMEOUT + 10)
454 def test_igmpproxy_with_2group_join_latency(self):
455 groups = [self.MGROUP1, self.MGROUP1]
456 df = defer.Deferred()
457 def igmp_2group_join_latency():
458 self.group_latency_check(groups)
459 df.callback(0)
460 reactor.callLater(0, igmp_2group_join_latency)
461 return df
462
463 @deferred(timeout=IGMP_QUERY_TIMEOUT + 10)
464 def test_igmpproxy_with_Ngroup_join_latency(self):
465 groups = ['239.0.1.1', '240.0.1.1', '241.0.1.1', '242.0.1.1']
466 df = defer.Deferred()
467 def igmp_Ngroup_join_latency():
468 self.group_latency_check(groups)
469 df.callback(0)
470 reactor.callLater(0, igmp_Ngroup_join_latency)
471 return df
472
473 def test_igmpproxy_with_join_rover_all(self):
474 s = (224 << 24) | 1
475 #e = (225 << 24) | (255 << 16) | (255 << 16) | 255
476 e = (224 << 24) | 10
477 for i in xrange(s, e+1):
478 if i&0xff:
479 ip = '%d.%d.%d.%d'%((i>>24)&0xff, (i>>16)&0xff, (i>>8)&0xff, i&0xff)
480 self.send_igmp_join([ip], delay = 0)
481
482 @deferred(timeout=ROVER_TEST_TIMEOUT)
483 def test_igmpproxy_with_join_rover(self):
484 df = defer.Deferred()
485 iface = self.get_igmp_intf()
486 self.df = df
487 self.count = 0
488 self.timeout = 0
489 self.complete = False
490 def igmp_join_timer():
491 self.timeout += self.ROVER_JOIN_TIMEOUT
492 log_test.info('IGMP joins sent: %d' %self.count)
493 if self.timeout >= self.ROVER_TIMEOUT:
494 self.complete = True
495 reactor.callLater(self.ROVER_JOIN_TIMEOUT, igmp_join_timer)
496
497 reactor.callLater(self.ROVER_JOIN_TIMEOUT, igmp_join_timer)
498 self.start_channel = (224 << 24) | 1
499 self.end_channel = (224 << 24) | 200 #(225 << 24) | (255 << 16) | (255 << 16) | 255
500 self.current_channel = self.start_channel
501 def igmp_join_rover(self):
502 #e = (224 << 24) | 10
503 chan = self.current_channel
504 self.current_channel += 1
505 if self.current_channel >= self.end_channel:
506 chan = self.current_channel = self.start_channel
507 if chan&0xff:
508 ip = '%d.%d.%d.%d'%((chan>>24)&0xff, (chan>>16)&0xff, (chan>>8)&0xff, chan&0xff)
509 self.send_igmp_join([ip], delay = 0, ssm_load = False, iface = iface)
510 self.count += 1
511 if self.complete == True:
512 log_test.info('%d IGMP joins sent in %d seconds over %s' %(self.count, self.timeout, iface))
513 self.df.callback(0)
514 else:
515 reactor.callLater(0, igmp_join_rover, self)
516 reactor.callLater(0, igmp_join_rover, self)
517 return df
518
519 @deferred(timeout=IGMP_QUERY_TIMEOUT + 10)
520 def test_igmpproxy_with_query(self):
521 groups = ['224.0.0.1'] ##igmp query group
522 self.onos_ssm_table_load(groups)
523 df = defer.Deferred()
524 self.df = df
525 self.recv_socket = L2Socket(iface = 'veth0', type = ETH_P_IP)
526
527 def igmp_query_timeout():
528 def igmp_query_cb(pkt):
529 log_test.info('received igmp query packet is %s'%pkt.show())
530 log_test.info('Got IGMP query packet from %s for %s' %(pkt[IP].src, pkt[IP].dst))
531 assert_equal(pkt[IP].dst, '224.0.0.1')
532 sniff(prn = igmp_query_cb, count=1, lfilter = lambda p: IP in p and p[IP].dst in groups,
533 opened_socket = self.recv_socket)
534 self.recv_socket.close()
535 self.df.callback(0)
536
537 #self.send_igmp_join(groups)
538 self.test_timer = reactor.callLater(self.IGMP_QUERY_TIMEOUT, igmp_query_timeout)
539 return df