blob: bdf7748214defec55b0c7c5cb87d7663eed845f5 [file] [log] [blame]
Matteo Scandolo48d3d2d2017-08-08 13:05:27 -07001
2# Copyright 2017-present Open Networking Foundation
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16
Chetan Gaonker26ae67e2017-07-18 19:58:30 +000017#
18# Copyright 2016-present Ciena Corporation
19#
20# Licensed under the Apache License, Version 2.0 (the "License");
21# you may not use this file except in compliance with the License.
22# You may obtain a copy of the License at
23#
24# http://www.apache.org/licenses/LICENSE-2.0
25#
26# Unless required by applicable law or agreed to in writing, software
27# distributed under the License is distributed on an "AS IS" BASIS,
28# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
29# See the License for the specific language governing permissions and
30# limitations under the License.
31#
32from twisted.internet import defer
33from nose.tools import *
34from nose.twistedtools import reactor, deferred
35from scapy.all import *
36from select import select as socket_select
37import time, monotonic
38import os
39import random
40import threading
41from IGMP import *
42from McastTraffic import *
43from Stats import Stats
44from OnosCtrl import OnosCtrl
45from OltConfig import OltConfig
46from Channels import IgmpChannel
47from CordLogger import CordLogger
48from CordTestConfig import setup_module, teardown_module
49from CordTestUtils import log_test
50log_test.setLevel('INFO')
51
52class IGMPProxyTestState:
53
54 def __init__(self, groups = [], df = None, state = 0):
55 self.df = df
56 self.state = state
57 self.counter = 0
58 self.groups = groups
59 self.group_map = {} ##create a send/recv count map
60 for g in groups:
61 self.group_map[g] = (Stats(), Stats())
62
63 def update(self, group, tx = 0, rx = 0, t = 0):
64 self.counter += 1
65 index = 0 if rx == 0 else 1
66 v = tx if rx == 0 else rx
67 if self.group_map.has_key(group):
68 self.group_map[group][index].update(packets = v, t = t)
69
70 def update_state(self):
71 self.state = self.state ^ 1
72
73class igmpproxy_exchange(CordLogger):
74
75 V_INF1 = 'veth0'
76 MGROUP1 = '239.1.2.3'
77 MGROUP2 = '239.2.2.3'
78 MINVALIDGROUP1 = '255.255.255.255'
79 MINVALIDGROUP2 = '239.255.255.255'
80 MMACGROUP1 = "01:00:5e:01:02:03"
81 MMACGROUP2 = "01:00:5e:02:02:03"
82 IGMP_DST_MAC = "01:00:5e:00:00:16"
83 IGMP_SRC_MAC = "5a:e1:ac:ec:4d:a1"
84 IP_SRC = '1.2.3.4'
85 IP_DST = '224.0.0.22'
86 NEGATIVE_TRAFFIC_STATUS = 1
87 igmp_eth = Ether(dst = IGMP_DST_MAC, type = ETH_P_IP)
88 igmp_ip = IP(dst = IP_DST)
89 IGMP_TEST_TIMEOUT = 5
90 IGMP_QUERY_TIMEOUT = 60
91 MCAST_TRAFFIC_TIMEOUT = 20
92 PORT_TX_DEFAULT = 2
93 PORT_RX_DEFAULT = 1
94 max_packets = 100
95 app = 'org.opencord.igmpproxy'
96 olt_conf_file = os.getenv('OLT_CONFIG_FILE', os.path.join(os.path.dirname(os.path.realpath(__file__)), '../setup/olt_config.json'))
97 ROVER_TEST_TIMEOUT = 300 #3600*86
98 ROVER_TIMEOUT = (ROVER_TEST_TIMEOUT - 100)
99 ROVER_JOIN_TIMEOUT = 60
100 VOLTHA_ENABLED = bool(int(os.getenv('VOLTHA_ENABLED', 0)))
101
102 @classmethod
103 def setUpClass(cls):
104 cls.olt = OltConfig(olt_conf_file = cls.olt_conf_file)
105 cls.port_map, _ = cls.olt.olt_port_map()
106 if cls.VOLTHA_ENABLED is False:
107 OnosCtrl.config_device_driver()
108 OnosCtrl.cord_olt_config(cls.olt)
109 time.sleep(2)
110
111 @classmethod
112 def tearDownClass(cls):
113 if cls.VOLTHA_ENABLED is False:
114 OnosCtrl.config_device_driver(driver = 'ovs')
115
116 def setUp(self):
117 ''' Activate the igmp proxy app'''
118 super(igmp_exchange, self).setUp()
119 self.onos_ctrl = OnosCtrl(self.app)
120 self.onos_ctrl.activate()
121 self.igmp_channel = IgmpChannel()
122
123 def tearDown(self):
124 super(igmp_exchange, self).tearDown()
125
126 def onos_load_config(self, config):
127 log_test.info('onos load config is %s'%config)
128 status, code = OnosCtrl.config(config)
129 if status is False:
130 log_test.info('JSON request returned status %d' %code)
131 assert_equal(status, True)
132 time.sleep(2)
133
134 def onos_ssm_table_load(self, groups, src_list = ['1.2.3.4'],flag = False):
135 return
136 ssm_dict = {'apps' : { 'org.opencord.igmpproxy' : { 'ssmTranslate' : [] } } }
137 ssm_xlate_list = ssm_dict['apps']['org.opencord.igmpproxy']['ssmTranslate']
138 if flag: #to maintain seperate group-source pair.
139 for i in range(len(groups)):
140 d = {}
141 d['source'] = src_list[i] or '0.0.0.0'
142 d['group'] = groups[i]
143 ssm_xlate_list.append(d)
144 else:
145 for g in groups:
146 for s in src_list:
147 d = {}
148 d['source'] = s or '0.0.0.0'
149 d['group'] = g
150 ssm_xlate_list.append(d)
151 self.onos_load_config(ssm_dict)
152 cord_port_map = {}
153 for g in groups:
154 cord_port_map[g] = (self.PORT_TX_DEFAULT, self.PORT_RX_DEFAULT)
155 self.igmp_channel.cord_port_table_load(cord_port_map)
156 time.sleep(2)
157
158 def get_igmp_intf(self):
159 inst = os.getenv('TEST_INSTANCE', None)
160 if not inst:
161 return 'veth0'
162 inst = int(inst) + 1
163 if inst >= self.port_map['uplink']:
164 inst += 1
165 if self.port_map.has_key(inst):
166 return self.port_map[inst]
167 return 'veth0'
168
169 def igmp_verify_join(self, igmpStateList):
170 sendState, recvState = igmpStateList
171 ## check if the send is received for the groups
172 for g in sendState.groups:
173 tx_stats = sendState.group_map[g][0]
174 tx = tx_stats.count
175 assert_greater(tx, 0)
176 rx_stats = recvState.group_map[g][1]
177 rx = rx_stats.count
178 assert_greater(rx, 0)
179 log_test.info('Receive stats %s for group %s' %(rx_stats, g))
180
181 log_test.info('IGMP test verification success')
182
183 def igmp_verify_leave(self, igmpStateList, leave_groups):
184 sendState, recvState = igmpStateList[0], igmpStateList[1]
185 ## check if the send is received for the groups
186 for g in sendState.groups:
187 tx_stats = sendState.group_map[g][0]
188 rx_stats = recvState.group_map[g][1]
189 tx = tx_stats.count
190 rx = rx_stats.count
191 assert_greater(tx, 0)
192 if g not in leave_groups:
193 log_test.info('Received %d packets for group %s' %(rx, g))
194 for g in leave_groups:
195 rx = recvState.group_map[g][1].count
196 assert_equal(rx, 0)
197
198 log_test.info('IGMP test verification success')
199
200 def mcast_traffic_timer(self):
201 log_test.info('MCAST traffic timer expiry')
202 self.mcastTraffic.stopReceives()
203
204 def send_mcast_cb(self, send_state):
205 for g in send_state.groups:
206 send_state.update(g, tx = 1)
207 return 0
208
209 ##Runs in the context of twisted reactor thread
210 def igmp_recv(self, igmpState):
211 s = socket_select([self.recv_socket], [], [], 1.0)
212 if self.recv_socket in s[0]:
213 p = self.recv_socket.recv()
214 try:
215 send_time = float(p.payload.load)
216 recv_time = monotonic.monotonic()
217 except:
218 log_test.info('Unexpected Payload received: %s' %p.payload.load)
219 return 0
220 #log_test.info( 'Recv in %.6f secs' %(recv_time - send_time))
221 igmpState.update(p.dst, rx = 1, t = recv_time - send_time)
222 return 0
223
224 def send_igmp_join(self, groups, src_list = ['1.2.3.4'], record_type=IGMP_V3_GR_TYPE_INCLUDE,
225 ip_pkt = None, iface = 'veth0', ssm_load = False, delay = 1):
226 if ssm_load is True:
227 self.onos_ssm_table_load(groups, src_list)
228 igmp = IGMPv3(type = IGMP_TYPE_V3_MEMBERSHIP_REPORT, max_resp_code=30,
229 gaddr=self.IP_DST)
230 for g in groups:
231 gr = IGMPv3gr(rtype= record_type, mcaddr=g)
232 gr.sources = src_list
233 igmp.grps.append(gr)
234 if ip_pkt is None:
235 ip_pkt = self.igmp_eth/self.igmp_ip
236 pkt = ip_pkt/igmp
237 IGMPv3.fixup(pkt)
238 sendp(pkt, iface=iface)
239 if delay != 0:
240 time.sleep(delay)
241
242 def send_igmp_join_recvQuery(self, groups, rec_queryCount = None, src_list = ['1.2.3.4'], ip_pkt = None, iface = 'veth0', delay = 2):
243 self.onos_ssm_table_load(groups, src_list)
244 igmp = IGMPv3(type = IGMP_TYPE_V3_MEMBERSHIP_REPORT, max_resp_code=30,
245 gaddr=self.IP_DST)
246 for g in groups:
247 gr = IGMPv3gr(rtype=IGMP_V3_GR_TYPE_INCLUDE, mcaddr=g)
248 gr.sources = src_list
249 gr.sources = src_list
250 igmp.grps.append(gr)
251 if ip_pkt is None:
252 ip_pkt = self.igmp_eth/self.igmp_ip
253 pkt = ip_pkt/igmp
254 IGMPv3.fixup(pkt)
255 if rec_queryCount == None:
256 log_test.info('Sending IGMP join for group %s and waiting for one query packet and printing the packet' %groups)
257 resp = srp1(pkt, iface=iface)
258 else:
259 log_test.info('Sending IGMP join for group %s and waiting for periodic query packets and printing one packet' %groups)
260 resp = srp1(pkt, iface=iface)
261# resp = srp1(pkt, iface=iface) if rec_queryCount else srp3(pkt, iface=iface)
262 resp[0].summary()
263 log_test.info('Sent IGMP join for group %s and received a query packet and printing packet' %groups)
264 if delay != 0:
265 time.sleep(delay)
266
267 def send_igmp_leave(self, groups, src_list = ['1.2.3.4'], ip_pkt = None, iface = 'veth0', delay = 2):
268 log_test.info('entering into igmp leave function')
269 igmp = IGMPv3(type = IGMP_TYPE_V3_MEMBERSHIP_REPORT, max_resp_code=30,
270 gaddr=self.IP_DST)
271 for g in groups:
272 gr = IGMPv3gr(rtype=IGMP_V3_GR_TYPE_EXCLUDE, mcaddr=g)
273 gr.sources = src_list
274 igmp.grps.append(gr)
275 if ip_pkt is None:
276 ip_pkt = self.igmp_eth/self.igmp_ip
277 pkt = ip_pkt/igmp
278 IGMPv3.fixup(pkt)
279 sendp(pkt, iface = iface)
280 if delay != 0:
281 time.sleep(delay)
282
283 @deferred(timeout=MCAST_TRAFFIC_TIMEOUT+10)
284 def test_igmpproxy_with_join_and_verify_traffic(self):
285 groups = [self.MGROUP1, self.MGROUP1]
286 self.onos_ssm_table_load(groups)
287 df = defer.Deferred()
288 igmpState = IGMPProxyTestState(groups = groups, df = df)
289 igmpStateRecv = IGMPProxyTestState(groups = groups, df = df)
290 igmpStateList = (igmpState, igmpStateRecv)
291 tx_intf = self.port_map[self.PORT_TX_DEFAULT]
292 rx_intf = self.port_map[self.PORT_RX_DEFAULT]
293 mcastTraffic = McastTraffic(groups, iface= tx_intf, cb = self.send_mcast_cb, arg = igmpState)
294 self.df = df
295 self.mcastTraffic = mcastTraffic
296 self.recv_socket = L3PacketSocket(iface = rx_intf, type = ETH_P_IP)
297
298 def igmp_srp_task(stateList):
299 igmpSendState, igmpRecvState = stateList
300 if not mcastTraffic.isRecvStopped():
301 self.igmp_recv(igmpRecvState)
302 reactor.callLater(0, igmp_srp_task, stateList)
303 else:
304 self.mcastTraffic.stop()
305 #log_test.info('Sending IGMP leave for groups: %s' %groups)
306 self.send_igmp_leave(groups, iface = rx_intf, delay = 2)
307 self.recv_socket.close()
308 self.igmp_verify_join(stateList)
309 self.df.callback(0)
310
311 self.send_igmp_join(groups, iface = rx_intf)
312 mcastTraffic.start()
313 self.test_timer = reactor.callLater(self.MCAST_TRAFFIC_TIMEOUT, self.mcast_traffic_timer)
314 reactor.callLater(0, igmp_srp_task, igmpStateList)
315 return df
316
317 @deferred(timeout=MCAST_TRAFFIC_TIMEOUT+40)
318 def test_igmpproxy_with_leave_and_verify_traffic(self):
319 groups = [self.MGROUP1]
320 leave_groups = [self.MGROUP1]
321 self.onos_ssm_table_load(groups)
322 df = defer.Deferred()
323 igmpState = IGMPProxyTestState(groups = groups, df = df)
324 IGMPProxyTestState(groups = groups, df = df)
325 tx_intf = self.port_map[self.PORT_TX_DEFAULT]
326 rx_intf = self.port_map[self.PORT_RX_DEFAULT]
327 mcastTraffic = McastTraffic(groups, iface= tx_intf, cb = self.send_mcast_cb,
328 arg = igmpState)
329 self.df = df
330 self.mcastTraffic = mcastTraffic
331 self.recv_socket = L3PacketSocket(iface = rx_intf, type = ETH_P_IP)
332
333 mcastTraffic.start()
334 self.send_igmp_join(groups, iface = rx_intf)
335 time.sleep(5)
336 self.send_igmp_leave(leave_groups, delay = 3, iface = rx_intf)
337 time.sleep(10)
338 join_state = IGMPProxyTestState(groups = leave_groups)
339 status = self.igmp_not_recv_task(rx_intf, leave_groups, join_state)
340 log_test.info('verified status for igmp recv task %s'%status)
341 assert status == 1 , 'EXPECTED RESULT'
342 self.df.callback(0)
343 return df
344
345 @deferred(timeout=100)
346 def test_igmpproxy_with_leave_and_join_loop(self):
347 self.groups = ['226.0.1.1', '227.0.0.1', '228.0.0.1', '229.0.0.1', '230.0.0.1' ]
348 self.src_list = ['3.4.5.6', '7.8.9.10']
349 self.onos_ssm_table_load(self.groups,src_list=self.src_list)
350 df = defer.Deferred()
351 self.df = df
352 self.iterations = 0
353 self.num_groups = len(self.groups)
354 self.MAX_TEST_ITERATIONS = 10
355 rx_intf = self.port_map[self.PORT_RX_DEFAULT]
356
357 def igmp_srp_task(v):
358 if self.iterations < self.MAX_TEST_ITERATIONS:
359 if v == 1:
360 ##join test
361 self.num_groups = random.randint(0, len(self.groups))
362 self.send_igmp_join(self.groups[:self.num_groups],
363 src_list = self.src_list,
364 iface = rx_intf, delay = 0)
365 else:
366 self.send_igmp_leave(self.groups[:self.num_groups],
367 src_list = self.src_list,
368 iface = rx_intf, delay = 0)
369 self.iterations += 1
370 v ^= 1
371 reactor.callLater(1.0 + 0.5*self.num_groups,
372 igmp_srp_task, v)
373 else:
374 self.df.callback(0)
375
376 reactor.callLater(0, igmp_srp_task, 1)
377 return df
378
379 def igmp_join_task(self, intf, groups, state, src_list = ['1.2.3.4']):
380 #self.onos_ssm_table_load(groups, src_list)
381 igmp = IGMPv3(type = IGMP_TYPE_V3_MEMBERSHIP_REPORT, max_resp_code=30,
382 gaddr=self.IP_DST)
383 for g in groups:
384 gr = IGMPv3gr(rtype = IGMP_V3_GR_TYPE_INCLUDE, mcaddr = g)
385 gr.sources = src_list
386 igmp.grps.append(gr)
387
388 for g in groups:
389 state.group_map[g][0].update(1, t = monotonic.monotonic())
390
391 pkt = self.igmp_eth/self.igmp_ip/igmp
392 IGMPv3.fixup(pkt)
393 sendp(pkt, iface=intf)
394 log_test.debug('Returning from join task')
395
396 def igmp_recv_task(self, intf, groups, join_state):
397 recv_socket = L3PacketSocket(iface = intf, type = ETH_P_IP)
398 group_map = {}
399 for g in groups:
400 group_map[g] = [0,0]
401
402 log_test.info('Verifying join interface should receive multicast data')
403 while True:
404 p = recv_socket.recv()
405 if p.dst in groups and group_map[p.dst][0] == 0:
406 group_map[p.dst][0] += 1
407 group_map[p.dst][1] = monotonic.monotonic()
408 c = 0
409 for g in groups:
410 c += group_map[g][0]
411 if c == len(groups):
412 break
413 for g in groups:
414 join_start = join_state.group_map[g][0].start
415 recv_time = group_map[g][1] * 1000000
416 delta = (recv_time - join_start)
417 log_test.info('Join for group %s received in %.3f usecs' %
418 (g, delta))
419
420 recv_socket.close()
421 log_test.debug('Returning from recv task')
422
423 def igmp_not_recv_task(self, intf, groups, join_state):
424 log_test.info('Entering igmp not recv task loop')
425 recv_socket = L2Socket(iface = intf, type = ETH_P_IP)
426 group_map = {}
427 for g in groups:
428 group_map[g] = [0,0]
429
430 log_test.info('Verifying join interface, should not receive any multicast data')
431 self.NEGATIVE_TRAFFIC_STATUS = 1
432 def igmp_recv_cb(pkt):
433 log_test.info('Multicast packet %s received for left groups %s' %(pkt[IP].dst, groups))
434 self.NEGATIVE_TRAFFIC_STATUS = 2
435 sniff(prn = igmp_recv_cb, count = 1, lfilter = lambda p: IP in p and p[IP].dst in groups,
436 timeout = 3, opened_socket = recv_socket)
437 recv_socket.close()
438 return self.NEGATIVE_TRAFFIC_STATUS
439
440 def group_latency_check(self, groups):
441 tasks = []
442 self.send_igmp_leave(groups = groups)
443 join_state = IGMPProxyTestState(groups = groups)
444 tasks.append(threading.Thread(target=self.igmp_join_task, args = ('veth0', groups, join_state,)))
445 traffic_state = IGMPProxyTestState(groups = groups)
446 mcast_traffic = McastTraffic(groups, iface= 'veth2', cb = self.send_mcast_cb,
447 arg = traffic_state)
448 mcast_traffic.start()
449 tasks.append(threading.Thread(target=self.igmp_recv_task, args = ('veth0', groups, join_state)))
450 for t in tasks:
451 t.start()
452 for t in tasks:
453 t.join()
454
455 mcast_traffic.stop()
456 self.send_igmp_leave(groups = groups)
457 return
458
459 @deferred(timeout=IGMP_QUERY_TIMEOUT + 10)
460 def test_igmpproxy_with_1group_join_latency(self):
461 groups = ['239.0.1.1']
462 df = defer.Deferred()
463 def igmp_1group_join_latency():
464 self.group_latency_check(groups)
465 df.callback(0)
466 reactor.callLater(0, igmp_1group_join_latency)
467 return df
468
469 @deferred(timeout=IGMP_QUERY_TIMEOUT + 10)
470 def test_igmpproxy_with_2group_join_latency(self):
471 groups = [self.MGROUP1, self.MGROUP1]
472 df = defer.Deferred()
473 def igmp_2group_join_latency():
474 self.group_latency_check(groups)
475 df.callback(0)
476 reactor.callLater(0, igmp_2group_join_latency)
477 return df
478
479 @deferred(timeout=IGMP_QUERY_TIMEOUT + 10)
480 def test_igmpproxy_with_Ngroup_join_latency(self):
481 groups = ['239.0.1.1', '240.0.1.1', '241.0.1.1', '242.0.1.1']
482 df = defer.Deferred()
483 def igmp_Ngroup_join_latency():
484 self.group_latency_check(groups)
485 df.callback(0)
486 reactor.callLater(0, igmp_Ngroup_join_latency)
487 return df
488
489 def test_igmpproxy_with_join_rover_all(self):
490 s = (224 << 24) | 1
491 #e = (225 << 24) | (255 << 16) | (255 << 16) | 255
492 e = (224 << 24) | 10
493 for i in xrange(s, e+1):
494 if i&0xff:
495 ip = '%d.%d.%d.%d'%((i>>24)&0xff, (i>>16)&0xff, (i>>8)&0xff, i&0xff)
496 self.send_igmp_join([ip], delay = 0)
497
498 @deferred(timeout=ROVER_TEST_TIMEOUT)
499 def test_igmpproxy_with_join_rover(self):
500 df = defer.Deferred()
501 iface = self.get_igmp_intf()
502 self.df = df
503 self.count = 0
504 self.timeout = 0
505 self.complete = False
506 def igmp_join_timer():
507 self.timeout += self.ROVER_JOIN_TIMEOUT
508 log_test.info('IGMP joins sent: %d' %self.count)
509 if self.timeout >= self.ROVER_TIMEOUT:
510 self.complete = True
511 reactor.callLater(self.ROVER_JOIN_TIMEOUT, igmp_join_timer)
512
513 reactor.callLater(self.ROVER_JOIN_TIMEOUT, igmp_join_timer)
514 self.start_channel = (224 << 24) | 1
515 self.end_channel = (224 << 24) | 200 #(225 << 24) | (255 << 16) | (255 << 16) | 255
516 self.current_channel = self.start_channel
517 def igmp_join_rover(self):
518 #e = (224 << 24) | 10
519 chan = self.current_channel
520 self.current_channel += 1
521 if self.current_channel >= self.end_channel:
522 chan = self.current_channel = self.start_channel
523 if chan&0xff:
524 ip = '%d.%d.%d.%d'%((chan>>24)&0xff, (chan>>16)&0xff, (chan>>8)&0xff, chan&0xff)
525 self.send_igmp_join([ip], delay = 0, ssm_load = False, iface = iface)
526 self.count += 1
527 if self.complete == True:
528 log_test.info('%d IGMP joins sent in %d seconds over %s' %(self.count, self.timeout, iface))
529 self.df.callback(0)
530 else:
531 reactor.callLater(0, igmp_join_rover, self)
532 reactor.callLater(0, igmp_join_rover, self)
533 return df
534
535 @deferred(timeout=IGMP_QUERY_TIMEOUT + 10)
536 def test_igmpproxy_with_query(self):
537 groups = ['224.0.0.1'] ##igmp query group
538 self.onos_ssm_table_load(groups)
539 df = defer.Deferred()
540 self.df = df
541 self.recv_socket = L2Socket(iface = 'veth0', type = ETH_P_IP)
542
543 def igmp_query_timeout():
544 def igmp_query_cb(pkt):
545 log_test.info('received igmp query packet is %s'%pkt.show())
546 log_test.info('Got IGMP query packet from %s for %s' %(pkt[IP].src, pkt[IP].dst))
547 assert_equal(pkt[IP].dst, '224.0.0.1')
548 sniff(prn = igmp_query_cb, count=1, lfilter = lambda p: IP in p and p[IP].dst in groups,
549 opened_socket = self.recv_socket)
550 self.recv_socket.close()
551 self.df.callback(0)
552
553 #self.send_igmp_join(groups)
554 self.test_timer = reactor.callLater(self.IGMP_QUERY_TIMEOUT, igmp_query_timeout)
555 return df