blob: b888ba236bad88aaa73d78a9694f295a745c5738 [file] [log] [blame]
Zsolt Haraszti656ecc62016-12-28 15:08:23 -08001#
2# Copyright 2016 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18Simple PON Simulator which would not be needed if openvswitch could do
19802.1ad (QinQ), which it cannot (the reason is beyond me), or if CPQD could
20handle 0-tagged packets (no comment).
21"""
22import structlog
23from scapy.layers.inet import IP, UDP
24from scapy.layers.l2 import Ether, Dot1Q
25from scapy.packet import Packet
26
27from common.frameio.frameio import hexify
28from voltha.protos import third_party
Sergio Slobodrian98eff412017-03-15 14:46:30 -040029from voltha.protos.ponsim_pb2 import PonSimMetrics, PonSimPortMetrics, \
30PonSimPacketCounter
31#from voltha.protos.ponsim_pb2 import
Zsolt Haraszti656ecc62016-12-28 15:08:23 -080032from voltha.core.flow_decomposer import *
Sergio Slobodrian98eff412017-03-15 14:46:30 -040033from twisted.internet.task import LoopingCall
Zsolt Haraszti656ecc62016-12-28 15:08:23 -080034_ = third_party
35
36
def ipv4int2str(ipv4int):
    """Render an integer IPv4 address as a dotted-quad string.

    The most significant of the four low-order bytes becomes the first
    octet, so 0x0a000001 is rendered as '10.0.0.1'. Bits above the low
    32 are ignored because every octet is masked with 0xff.
    """
    # Walk the four bytes from most to least significant.
    octets = [(ipv4int >> shift) & 0xff for shift in (24, 16, 8, 0)]
    return '.'.join(str(octet) for octet in octets)
44
45
class FrameIOCounter(object):
    """Frame counters for one simulated device, bucketed by frame size.

    Two parallel sets of buckets are kept: `rx_counters` for received
    frames and `tx_counters` for transmitted frames. Both are dicts keyed
    by the bucket name (e.g. "rx_64", "tx_1024_1518"). Every bucket holds
    one count per port: index 0 is the PON interface (port 1) and index 1
    is the network interface (port 2).
    """

    # (name suffix, smallest size, largest size) of every bucket. The ranges
    # are inclusive and must not overlap: a frame is counted in the first
    # bucket found that contains its size, and dict iteration order is not
    # something to rely on.
    _SIZE_BUCKETS = (
        ("64", 1, 64),
        ("65_127", 65, 127),
        ("128_255", 128, 255),
        ("256_511", 256, 511),
        ("512_1023", 512, 1023),
        ("1024_1518", 1024, 1518),
        ("1519_9k", 1519, 9216),
    )

    class SingleFrameCounter(object):
        """One size bucket: an inclusive [min, max] range plus its counts."""

        def __init__(self, name, min, max):
            # Currently there are 2 values, one for the PON interface
            # (port 1) and one for the Network Interface (port 2). This can
            # be extended if the virtual devices extend the number of ports.
            self.value = [0, 0]  # {PON,NI}
            self.name = name
            self.min = min
            self.max = max

    def __init__(self, device):
        """Create zeroed rx and tx buckets.

        :param device: stored as-is and later passed as the `device` field
            of the PonSimMetrics message built by make_proto().
        """
        self.device = device
        self.tx_counters = self._make_counters("tx")
        self.rx_counters = self._make_counters("rx")

    def _make_counters(self, prefix):
        """Return a dict of fresh buckets named '<prefix>_<suffix>'."""
        counters = dict()
        for suffix, smallest, largest in self._SIZE_BUCKETS:
            name = "%s_%s" % (prefix, suffix)
            counters[name] = self.SingleFrameCounter(name, smallest, largest)
        return counters

    @staticmethod
    def _count_frame(counters, port, size):
        """Bump the bucket of `counters` whose range contains `size`.

        `port` is 1-based (1 = PON, 2 = network interface). A size that
        fits no bucket is only logged as a warning.
        """
        for counter in counters.values():
            if counter.min <= size <= counter.max:
                counter.value[port - 1] += 1
                return
        # NOTE(review): `log` is not defined in this class; presumably it is
        # the module-level logger pulled in by the file's imports - confirm.
        log.warn("unsupported-packet-size", size=size)

    def count_rx_frame(self, port, size):
        """Record one received frame of `size` on 1-based `port`."""
        log.info("counting-rx-frame", size=size, port=port)
        self._count_frame(self.rx_counters, port, size)

    def count_tx_frame(self, port, size):
        """Record one transmitted frame of `size` on 1-based `port`."""
        self._count_frame(self.tx_counters, port, size)

    def log_counts(self):
        """Log every rx and tx bucket as (name, PON count, NI count)."""
        rx_ct_list = [(v.name, v.value[0], v.value[1])
                      for v in self.rx_counters.values()]
        tx_ct_list = [(v.name, v.value[0], v.value[1])
                      for v in self.tx_counters.values()]
        log.info("rx-counts", rx_ct_list=rx_ct_list)
        log.info("tx-counts", tx_ct_list=tx_ct_list)

    def make_proto(self):
        """Build a PonSimMetrics message from the current counts.

        The message carries two PonSimPortMetrics entries, "pon" then
        "nni". Each lists the rx buckets followed by the tx buckets, both
        in sorted key order.
        """
        sim_metrics = PonSimMetrics(device=self.device)
        pon_port_metrics = PonSimPortMetrics(port_name="pon")
        ni_port_metrics = PonSimPortMetrics(port_name="nni")

        for counters in (self.rx_counters, self.tx_counters):
            for key in sorted(counters):
                ctr = counters[key]
                # Each bucket holds both ports' counts, so one pass over
                # the keys fills both port messages.
                pon_port_metrics.packets.extend([
                    PonSimPacketCounter(name=ctr.name, value=ctr.value[0])
                ])
                ni_port_metrics.packets.extend([
                    PonSimPacketCounter(name=ctr.name, value=ctr.value[1])
                ])

        sim_metrics.metrics.extend([pon_port_metrics])
        sim_metrics.metrics.extend([ni_port_metrics])

        return sim_metrics
132
133
class SimDevice(object):
    """One simulated flow-switching device.

    A device keeps a priority-ordered flow list and, per port, a list of
    callables that receive the frames leaving through that port. An
    arriving frame is matched against the flows; the first matching flow's
    actions rewrite the frame and pick the egress port.
    """

    def __init__(self, name, logical_port_no):
        """Create a device with no links, no flows and zeroed counters."""
        self.name = name
        self.logical_port_no = logical_port_no
        self.links = dict()   # port -> list of egress callables
        self.flows = list()   # kept sorted by install_flows()
        self.log = structlog.get_logger(name=name,
                                        logical_port_no=logical_port_no)
        self.counter = FrameIOCounter(name)

    def link(self, port, egress_fun):
        """Register `egress_fun(port, frame)` as a receiver on `port`."""
        receivers = self.links.setdefault(port, [])
        receivers.append(egress_fun)

    def ingress(self, port, frame):
        """Accept `frame` on `port`, run it through the flows and forward it.

        The frame is handed to every callable linked to the egress port;
        it is dropped when no flow matches.
        """
        self.log.debug('ingress', ingress_port=port, name=self.name)
        self.counter.count_rx_frame(port, len(frame["Ether"].payload))

        outcome = self.process_frame(port, frame)
        if outcome is None:
            self.log.debug('dropped')
            return

        egress_port, egress_frame = outcome
        delivered = False
        receivers = self.links.get(egress_port)
        if receivers is not None:
            # Counted once per egress, however many receivers are linked.
            self.counter.count_tx_frame(
                egress_port, len(egress_frame["Ether"].payload))
            for receiver in receivers:
                delivered = True
                self.log.debug('forwarding', egress_port=egress_port)
                receiver(egress_port, egress_frame)
        if not delivered:
            self.log.debug('no-one-to-forward-to', egress_port=egress_port)

    def install_flows(self, flows):
        """Replace the flow table with `flows`, highest priority first."""
        # Precedence order lets process_frame() stop at the first match.
        by_precedence = list(flows)
        by_precedence.sort(key=lambda fm: fm.priority, reverse=True)
        self.flows = by_precedence

    def process_frame(self, ingress_port, ingress_frame):
        """Apply the first matching flow.

        :return: (egress_port, egress_frame), or None when no flow matches.
        """
        for flow in self.flows:
            if not self.is_match(flow, ingress_port, ingress_frame):
                continue
            out_port, out_frame = self.process_actions(flow, ingress_frame)
            return out_port, out_frame
        return None

    @staticmethod
    def is_match(flow, ingress_port, frame):
        """Tell whether `frame`, arriving on `ingress_port`, matches `flow`.

        Every field returned by get_ofb_fields(flow) has to agree with the
        frame; METADATA fields are ignored.

        :raises NotImplementedError: for a field type not handled here.
        """

        def layer_value(layer_cls, attr):
            # Attribute of the given layer, or None when the layer is absent.
            if frame.haslayer(layer_cls):
                return getattr(frame.getlayer(layer_cls), attr)
            return None

        def ether_type_behind_tag():
            # With a Dot1Q layer present, the type is read from that layer
            # rather than from the frame itself.
            holder = frame.getlayer(Dot1Q) if frame.haslayer(Dot1Q) else frame
            return holder.type

        def vlan_vid_differs(expected_vlan):
            # Bit 0x1000 of the expected value states whether a tag must be
            # present (presumably OpenFlow's OFPVID_PRESENT - confirm); the
            # low 12 bits carry the VLAN id itself.
            tagged = bool(frame.haslayer(Dot1Q))
            if bool(expected_vlan & 0x1000) != tagged:
                return True
            if not tagged:
                return False
            return frame.getlayer(Dot1Q).vlan != (expected_vlan & 0x0fff)

        for field in get_ofb_fields(flow):
            kind = field.type

            if kind == IN_PORT:
                differs = field.port != ingress_port
            elif kind == ETH_TYPE:
                differs = field.eth_type != ether_type_behind_tag()
            elif kind == IP_PROTO:
                differs = field.ip_proto != layer_value(IP, 'proto')
            elif kind == VLAN_VID:
                differs = vlan_vid_differs(field.vlan_vid)
            elif kind == VLAN_PCP:
                differs = field.vlan_pcp != layer_value(Dot1Q, 'prio')
            elif kind == IPV4_DST:
                differs = (ipv4int2str(field.ipv4_dst) !=
                           layer_value(IP, 'dst'))
            elif kind == UDP_SRC:
                differs = field.udp_src != layer_value(UDP, 'sport')
            elif kind == UDP_DST:
                differs = field.udp_dst != layer_value(UDP, 'dport')
            elif kind == METADATA:
                differs = False  # safe to ignore
            else:
                raise NotImplementedError('field.type=%d' % kind)

            if differs:
                return False

        return True

    @staticmethod
    def process_actions(flow, frame):
        """Apply the actions of `flow` to `frame`, in order.

        Handles OUTPUT, POP_VLAN, PUSH_VLAN and SET_FIELD (VLAN_VID and
        VLAN_PCP only). SET_FIELD edits the frame's Dot1Q layer in place.

        :return: (egress_port, frame); egress_port stays None when the
            flow has no OUTPUT action.
        :raises NotImplementedError: for any other action or set-field type.
        """
        egress_port = None

        for action in get_actions(flow):
            kind = action.type

            if kind == OUTPUT:
                egress_port = action.output.port

            elif kind == POP_VLAN:
                # An untagged frame is passed through unchanged.
                if frame.haslayer(Dot1Q):
                    tag = frame.getlayer(Dot1Q)
                    bare_header = Ether(src=frame.src, dst=frame.dst,
                                        type=tag.type)
                    frame = bare_header / tag.payload

            elif kind == PUSH_VLAN:
                # The new tag inherits the frame's current ether type; the
                # outer header gets the type requested by the action.
                outer_header = Ether(src=frame.src, dst=frame.dst,
                                     type=action.push.ethertype)
                new_tag = Dot1Q(type=frame.type)
                frame = outer_header / new_tag / frame.payload

            elif kind == SET_FIELD:
                target = action.set_field.field
                assert target.oxm_class == ofp.OFPXMC_OPENFLOW_BASIC
                field = target.ofb_field

                if field.type == VLAN_VID:
                    tag = frame.getlayer(Dot1Q)
                    tag.vlan = field.vlan_vid & 0x0fff
                elif field.type == VLAN_PCP:
                    tag = frame.getlayer(Dot1Q)
                    tag.prio = field.vlan_pcp
                else:
                    raise NotImplementedError('set_field.field.type=%d'
                                              % field.type)

            else:
                raise NotImplementedError('action.type=%d' % kind)

        return egress_port, frame
300
301
class PonSim(object):
    """A simulated PON: one OLT wired to a requested number of ONUs.

    Devices are addressed by port number: 0 is the OLT, 128 and up are the
    ONUs. Frames leaving the simulation are handed to the `egress_fun`
    given at construction time, together with that port number.
    """

    def __init__(self, onus, egress_fun):
        """Build the OLT and `onus` ONUs and wire them together.

        :param onus: number of ONUs to create.
        :param egress_fun: called as egress_fun(port_no, frame) for every
            frame that leaves the simulated network.
        """
        self.egress_fun = egress_fun

        self.log = structlog.get_logger()
        # Create OLT and hook NNI port up for egress
        self.olt = SimDevice('olt', 0)
        self.olt.link(2, lambda _, frame: self.egress_fun(0, frame))
        self.devices = {0: self.olt}
        # TODO: This can be removed, it's for debugging purposes
        self.lc = LoopingCall(self.olt.counter.log_counts)
        self.lc.start(90)  # To correlate with Kafka

        # The factories below give each closure its own binding of the
        # port number / ONU instead of sharing the loop variable.
        def egress_for(port_no):
            return lambda _, frame: self.egress_fun(port_no, frame)

        def ingress_for(onu):
            return lambda _, frame: onu.ingress(1, frame)

        # Create ONUs of the requested number and hook them up with OLT
        # and with egress fun
        for index in range(onus):
            port_no = 128 + index
            onu = SimDevice('onu%d' % index, port_no)
            # Send to the OLT
            onu.link(1, lambda _, frame: self.olt.ingress(1, frame))
            # Send from the ONU to the world
            onu.link(2, egress_for(port_no))
            # Internal send to the ONU
            self.olt.link(1, ingress_for(onu))
            self.devices[port_no] = onu

        for port, device in self.devices.items():
            self.log.info("pon-sim-init", port=port, name=device.name,
                          links=device.links)

    def get_ports(self):
        """Return the port numbers of all devices, in ascending order."""
        return sorted(self.devices)

    def get_stats(self):
        """Return the OLT's frame counters as built by its make_proto()."""
        return self.olt.counter.make_proto()

    def olt_install_flows(self, flows):
        """Replace the OLT's flow table with `flows`."""
        self.olt.install_flows(flows)

    def onu_install_flows(self, onu_port, flows):
        """Replace the flow table of the device registered at `onu_port`."""
        target = self.devices[onu_port]
        target.install_flows(flows)

    def ingress(self, port, frame):
        """Inject `frame` into the device at `port` via its port 2.

        Anything that is not already a scapy Packet is first parsed as an
        Ethernet frame.
        """
        packet = frame if isinstance(frame, Packet) else Ether(frame)
        self.devices[port].ingress(2, packet)
352