blob: 3c4ff45a5fb0dfceb760a942b0607264cd903b08 [file] [log] [blame]
Chetan Gaonkerb424ff82016-03-08 12:11:12 -08001import threading
2import sys
3import os
4import time
5import monotonic
6import random
7from scapy.all import *
8from McastTraffic import *
9from IGMP import *
10from OnosCtrl import OnosCtrl
11from nose.tools import *
12log.setLevel('DEBUG')
13
14conf.verb = 0
15
16class IgmpChannel:
17
18 IGMP_DST_MAC = "01:00:5e:00:01:01"
19 IGMP_SRC_MAC = "5a:e1:ac:ec:4d:a1"
20 IP_SRC = '1.2.3.4'
21 IP_DST = '224.0.1.1'
22 igmp_eth = Ether(dst = IGMP_DST_MAC, src = IGMP_SRC_MAC, type = ETH_P_IP)
23 igmp_ip = IP(dst = IP_DST, src = IP_SRC)
24
25 def __init__(self, iface = 'veth0', src_list = ['1.2.3.4'], delay = 2):
26 self.iface = iface
27 self.src_list = src_list
28 self.delay = delay
29 self.onos_ctrl = OnosCtrl('org.onosproject.igmp')
30 self.onos_ctrl.activate()
31
Chetan Gaonkercbe79642016-03-09 17:45:58 -080032 def igmp_load_ssm_config(self, groups):
Chetan Gaonkerb424ff82016-03-08 12:11:12 -080033 self.ssm_table_load(groups)
Chetan Gaonkercbe79642016-03-09 17:45:58 -080034
35 def igmp_join(self, groups, ssm_load = False):
36 if ssm_load:
37 self.igmp_load_ssm_config(groups)
Chetan Gaonkerb424ff82016-03-08 12:11:12 -080038 igmp = IGMPv3(type = IGMP_TYPE_V3_MEMBERSHIP_REPORT, max_resp_code=30,
39 gaddr='224.0.1.1')
40 for g in groups:
41 gr = IGMPv3gr(rtype=IGMP_V3_GR_TYPE_EXCLUDE, mcaddr=g)
42 gr.sources = self.src_list
43 igmp.grps.append(gr)
44
45 pkt = self.igmp_eth/self.igmp_ip/igmp
46 IGMPv3.fixup(pkt)
47 sendp(pkt, iface=self.iface)
48 if self.delay != 0:
49 time.sleep(self.delay)
50
51 def igmp_leave(self, groups):
52 igmp = IGMPv3(type = IGMP_TYPE_V3_MEMBERSHIP_REPORT, max_resp_code=30,
53 gaddr='224.0.1.1')
54 for g in groups:
55 gr = IGMPv3gr(rtype=IGMP_V3_GR_TYPE_INCLUDE, mcaddr=g)
56 gr.sources = self.src_list
57 igmp.grps.append(gr)
58
59 pkt = self.igmp_eth/self.igmp_ip/igmp
60 IGMPv3.fixup(pkt)
61 sendp(pkt, iface = self.iface)
62 if self.delay != 0:
63 time.sleep(self.delay)
64
65 def onos_load_config(self, config):
66 status, code = self.onos_ctrl.config(config)
67 if status is False:
68 log.info('JSON config request for app %s returned status %d' %code)
69 time.sleep(2)
70
71 def ssm_table_load(self, groups):
72 ssm_dict = {'apps' : { 'org.onosproject.igmp' : { 'ssmTranslate' : [] } } }
73 ssm_xlate_list = ssm_dict['apps']['org.onosproject.igmp']['ssmTranslate']
74 for g in groups:
75 for s in self.src_list:
76 d = {}
77 d['source'] = s
78 d['group'] = g
79 ssm_xlate_list.append(d)
80 self.onos_load_config(ssm_dict)
81
82class Channels(IgmpChannel):
83 Stopped = 0
84 Started = 1
85 Idle = 0
86 Joined = 1
87 def __init__(self, num, iface = 'veth0', iface_mcast = 'veth2', mcast_cb = None):
88 self.num = num
89 self.channels = self.generate(self.num)
Chetan Gaonkercbe79642016-03-09 17:45:58 -080090 self.group_channel_map = {}
91 for i in range(self.num):
92 self.group_channel_map[self.channels[i]] = i
Chetan Gaonkerb424ff82016-03-08 12:11:12 -080093 self.state = self.Stopped
94 self.streams = None
95 self.channel_states = {}
96 self.last_chan = None
97 self.recv_sock = L2Socket(iface = iface, type = ETH_P_IP)
98 self.iface_mcast = iface_mcast
99 self.mcast_cb = mcast_cb
100 for c in range(self.num):
101 self.channel_states[c] = [self.Idle]
102
103 IgmpChannel.__init__(self, iface=iface)
104
105 def generate(self, num):
106 start = (224 << 24) | 1
107 end = start + num + num/256
108 group_addrs = []
109 for i in range(start, end):
110 if i&255:
111 g = '%s.%s.%s.%s' %((i>>24) &0xff, (i>>16)&0xff, (i>>8)&0xff, i&0xff)
112 log.debug('Adding group %s' %g)
113 group_addrs.append(g)
114 return group_addrs
115
116 def start(self):
117 if self.state == self.Stopped:
118 if self.streams:
119 self.streams.stop()
120 self.streams = McastTraffic(self.channels, iface=self.iface_mcast, cb = self.mcast_cb)
121 self.streams.start()
122 self.state = self.Started
123
124 def join(self, chan = None):
125 if chan is None:
126 chan = random.randint(0, self.num)
127 else:
128 if chan >= self.num:
129 chan = 0
130
131 if self.get_state(chan) == self.Joined:
Chetan Gaonkercbe79642016-03-09 17:45:58 -0800132 return chan, 0
Chetan Gaonkerb424ff82016-03-08 12:11:12 -0800133
134 groups = [self.channels[chan]]
Chetan Gaonkercbe79642016-03-09 17:45:58 -0800135 #load the ssm table first
136 self.igmp_load_ssm_config(groups)
137 join_start = monotonic.monotonic()
Chetan Gaonkerb424ff82016-03-08 12:11:12 -0800138 self.igmp_join(groups)
139 self.set_state(chan, self.Joined)
140 self.last_chan = chan
Chetan Gaonkercbe79642016-03-09 17:45:58 -0800141 return chan, join_start
Chetan Gaonkerb424ff82016-03-08 12:11:12 -0800142
143 def leave(self, chan):
144 if chan is None:
145 chan = self.last_chan
146 if chan is None or chan >= self.num:
147 return False
148 if self.get_state(chan) != self.Joined:
149 return False
150 groups = [self.channels[chan]]
151 self.igmp_leave(groups)
152 self.set_state(chan, self.Idle)
153 if chan == self.last_chan:
154 self.last_chan = None
155 return True
156
157 def join_next(self, chan = None):
158 if chan is None:
159 chan = self.last_chan
160 if chan is None:
161 return None
162 leave = chan
163 join = chan+1
164 else:
165 leave = chan - 1
166 join = chan
167
168 if join >= self.num:
169 join = 0
170
171 if leave >= 0 and leave != join:
172 self.leave(leave)
173
174 return self.join(join)
175
176 def jump(self):
177 chan = self.last_chan
178 if chan is not None:
179 self.leave(chan)
180 s_next = chan
181 else:
182 s_next = 0
Chetan Gaonkercbe79642016-03-09 17:45:58 -0800183 if self.num - s_next < 2:
184 s_next = 0
Chetan Gaonkerb424ff82016-03-08 12:11:12 -0800185 chan = random.randint(s_next, self.num)
186 return self.join(chan)
187
188 def gaddr(self, chan):
Chetan Gaonkercbe79642016-03-09 17:45:58 -0800189 '''Return the group address for a channel'''
Chetan Gaonkerb424ff82016-03-08 12:11:12 -0800190 if chan >= self.num:
191 return None
192 return self.channels[chan]
193
Chetan Gaonkercbe79642016-03-09 17:45:58 -0800194 def caddr(self, group):
195 '''Return a channel given a group addr'''
196 if self.group_channel_map.has_key(group):
197 return self.group_channel_map[group]
198 return None
199
Chetan Gaonkerb424ff82016-03-08 12:11:12 -0800200 def recv_cb(self, pkt):
201 '''Default channel receive callback'''
202 log.debug('Received packet from source %s, destination %s' %(pkt[IP].src, pkt[IP].dst))
203 send_time = float(pkt[IP].payload.load)
204 recv_time = monotonic.monotonic()
205 log.debug('Packet received in %.3f usecs' %(recv_time - send_time))
206
207 def recv(self, chan, cb = None, count = 1):
208 if chan is None:
209 return None
210 if type(chan) == type([]) or type(chan) == type(()):
211 channel_list=filter(lambda c: c < self.num, chan)
212 groups = map(lambda c: self.gaddr(c), channel_list)
213 else:
214 groups = (self.gaddr(chan),)
215 if cb is None:
216 cb = self.recv_cb
217 sniff(prn = cb, count=count, lfilter = lambda p: p[IP].dst in groups, opened_socket = self.recv_sock)
218
219 def stop(self):
220 if self.streams:
221 self.streams.stop()
222 self.state = self.Stopped
223
224 def get_state(self, chan):
225 return self.channel_states[chan][0]
226
227 def set_state(self, chan, state):
228 self.channel_states[chan][0] = state
229
230if __name__ == '__main__':
231 num = 2
232 channels = Channels(num)
233 channels.start()
234 for i in range(num):
235 channels.join(i)
236 for i in range(num):
237 channels.recv(i)
238 for i in range(num):
239 channels.leave(i)
240 channels.stop()