blob: 21be9bfbfa6389cc3c41dc197f4f3927ab31e735 [file] [log] [blame]
Zsolt Haraszti66862032016-11-28 14:28:39 -08001#
Zsolt Haraszti3eb27a52017-01-03 21:56:48 -08002# Copyright 2017 the original author or authors.
Zsolt Haraszti66862032016-11-28 14:28:39 -08003#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18Model that captures the current state of a logical device
19"""
Zsolt Haraszti66862032016-11-28 14:28:39 -080020from collections import OrderedDict
21
22import structlog
23
Zsolt Haraszti8925d1f2016-12-21 00:45:19 -080024from common.event_bus import EventBusClient
Zsolt Haraszti656ecc62016-12-28 15:08:23 -080025from common.frameio.frameio import hexify
Zsolt Haraszti66862032016-11-28 14:28:39 -080026from voltha.core.config.config_proxy import CallbackType
27from voltha.core.device_graph import DeviceGraph
28from voltha.core.flow_decomposer import FlowDecomposer, \
29 flow_stats_entry_from_flow_mod_message, group_entry_from_group_mod, \
Zsolt Haraszti656ecc62016-12-28 15:08:23 -080030 mk_flow_stat, in_port, vlan_vid, vlan_pcp, pop_vlan, output, set_field, \
Khen Nursimulud068d812017-03-06 11:44:18 -050031 push_vlan, mk_simple_flow_mod
Zsolt Haraszti66862032016-11-28 14:28:39 -080032from voltha.protos import third_party
33from voltha.protos import openflow_13_pb2 as ofp
34from voltha.protos.device_pb2 import Port
Zsolt Haraszti217a12e2016-12-19 16:37:55 -080035from voltha.protos.logical_device_pb2 import LogicalPort
Zsolt Haraszti66862032016-11-28 14:28:39 -080036from voltha.protos.openflow_13_pb2 import Flows, FlowGroups
Zsolt Haraszti66862032016-11-28 14:28:39 -080037
Zsolt Haraszti66862032016-11-28 14:28:39 -080038_ = third_party
39
def mac_str_to_tuple(mac):
    """
    Convert a colon-separated hexadecimal MAC string into a tuple of ints.
    :param mac: string such as '00:0c:e2:31:40:00'
    :return: tuple with one integer per colon-separated field
    """
    octets = []
    for part in mac.split(':'):
        # each field is parsed as a base-16 number
        octets.append(int(part, 16))
    return tuple(octets)
42
43
class LogicalDeviceAgent(FlowDecomposer, DeviceGraph):
    """
    Agent attached to a single logical device.

    It keeps the logical device's flow and group tables (accessed through
    config proxies) up to date in response to OpenFlow flow-mod/group-mod
    requests, pushes the decomposed per-device rules to
    '/devices/<id>/flows' and '/devices/<id>/flow_groups', relays
    packet-in/packet-out messages over the event bus, and reports logical
    port changes through the core's local handler.
    """

    def __init__(self, core, logical_device):
        """
        :param core: object providing get_local_handler() and get_proxy()
        :param logical_device: logical device whose `id` selects the
        config subtree and event bus topic handled by this agent
        """
        # The logger is created first so that the error handler below can
        # always use it, no matter which initialization step fails.
        self.log = structlog.get_logger(logical_device_id=logical_device.id)

        # Cached tables; (re)computed lazily by
        # _assure_cached_tables_up_to_date()
        self._routes = None
        self._default_rules = None
        self._nni_logical_port_no = None

        try:
            self.core = core
            self.local_handler = core.get_local_handler()
            self.logical_device_id = logical_device.id

            self.root_proxy = core.get_proxy('/')
            self.flows_proxy = core.get_proxy(
                '/logical_devices/{}/flows'.format(logical_device.id))
            self.groups_proxy = core.get_proxy(
                '/logical_devices/{}/flow_groups'.format(logical_device.id))
            self.self_proxy = core.get_proxy(
                '/logical_devices/{}'.format(logical_device.id))

            self.flows_proxy.register_callback(
                CallbackType.POST_UPDATE, self._flow_table_updated)
            self.groups_proxy.register_callback(
                CallbackType.POST_UPDATE, self._group_table_updated)
            self.self_proxy.register_callback(
                CallbackType.POST_ADD, self._port_added)
            self.self_proxy.register_callback(
                CallbackType.POST_REMOVE, self._port_removed)

            # per-port proxies, keyed by logical port id
            self.port_proxy = {}

            self.event_bus = EventBusClient()
            self.packet_in_subscription = self.event_bus.subscribe(
                topic='packet-in:{}'.format(logical_device.id),
                callback=self.handle_packet_in_event)

        except Exception as e:
            # Best effort: the failure is logged and not propagated
            self.log.exception('init-error', e=e)

    def start(self):
        """Log the start of the agent and return self."""
        self.log.debug('starting')
        self.log.info('started')
        return self

    def stop(self):
        """
        Unregister the callbacks installed by __init__ and drop the event
        bus subscription. Any failure is logged and otherwise ignored.
        """
        self.log.debug('stopping')
        try:
            self.flows_proxy.unregister_callback(
                CallbackType.POST_UPDATE, self._flow_table_updated)
            self.groups_proxy.unregister_callback(
                CallbackType.POST_UPDATE, self._group_table_updated)
            self.self_proxy.unregister_callback(
                CallbackType.POST_ADD, self._port_added)
            self.self_proxy.unregister_callback(
                CallbackType.POST_REMOVE, self._port_removed)

            # Remove subscription to the event bus
            self.event_bus.unsubscribe(self.packet_in_subscription)
        except Exception as e:
            self.log.info('stop-exception', e=e)

        self.log.info('stopped')

    def announce_flows_deleted(self, flows):
        """Call announce_flow_deleted() for each flow in `flows`."""
        for f in flows:
            self.announce_flow_deleted(f)

    def announce_flow_deleted(self, flow):
        """
        Notify about a deleted flow.
        :raises NotImplementedError: if the flow has the
        OFPFF_SEND_FLOW_REM flag set (notification is not implemented yet)
        """
        if flow.flags & ofp.OFPFF_SEND_FLOW_REM:
            raise NotImplementedError("announce_flow_deleted")

    def signal_flow_mod_error(self, code, flow_mod):
        """Placeholder; currently does nothing."""
        pass  # TODO

    def signal_flow_removal(self, code, flow):
        """Placeholder; currently does nothing."""
        pass  # TODO

    def signal_group_mod_error(self, code, group_mod):
        """Placeholder; currently does nothing."""
        pass  # TODO

    def update_flow_table(self, flow_mod):
        """
        Dispatch a flow-mod request to the handler of its command.
        Unknown commands are logged and otherwise ignored.
        :param flow_mod: ofp_flow_mod
        """
        command = flow_mod.command

        if command == ofp.OFPFC_ADD:
            self.flow_add(flow_mod)

        elif command == ofp.OFPFC_DELETE:
            self.flow_delete(flow_mod)

        elif command == ofp.OFPFC_DELETE_STRICT:
            self.flow_delete_strict(flow_mod)

        elif command == ofp.OFPFC_MODIFY:
            self.flow_modify(flow_mod)

        elif command == ofp.OFPFC_MODIFY_STRICT:
            self.flow_modify_strict(flow_mod)

        else:
            self.log.warn('unhandled-flow-mod',
                          command=command, flow_mod=flow_mod)

    def update_group_table(self, group_mod):
        """
        Dispatch a group-mod request to the handler of its command.
        Unknown commands are logged and otherwise ignored.
        :param group_mod: ofp_group_mod
        """
        command = group_mod.command

        if command == ofp.OFPGC_DELETE:
            self.group_delete(group_mod)

        elif command == ofp.OFPGC_ADD:
            self.group_add(group_mod)

        elif command == ofp.OFPGC_MODIFY:
            self.group_modify(group_mod)

        else:
            self.log.warn('unhandled-group-mod',
                          command=command, group_mod=group_mod)

    # ~~~~~~~~~~~~~~~~~~~~~~~~~ LOW LEVEL FLOW HANDLERS ~~~~~~~~~~~~~~~~~~~~~~~

    def flow_add(self, mod):
        """
        Add the flow described by `mod` to the flow table, or replace the
        existing entry that flow_match() considers the same flow.
        :param mod: ofp_flow_mod with cookie_mask == 0
        """
        assert isinstance(mod, ofp.ofp_flow_mod)
        assert mod.cookie_mask == 0

        # read from model
        flows = list(self.flows_proxy.get('/').items)

        changed = False
        check_overlap = mod.flags & ofp.OFPFF_CHECK_OVERLAP
        if check_overlap:
            if self.find_overlapping_flows(flows, mod, True):
                self.signal_flow_mod_error(
                    ofp.OFPFMFC_OVERLAP, mod)
            else:
                # free to add as new flow
                flow = flow_stats_entry_from_flow_mod_message(mod)
                flows.append(flow)
                changed = True
                self.log.debug('flow-added', flow=mod)

        else:
            flow = flow_stats_entry_from_flow_mod_message(mod)
            idx = self.find_flow(flows, flow)
            if idx >= 0:
                old_flow = flows[idx]
                # counters survive the replacement unless a reset is
                # explicitly requested
                if not (mod.flags & ofp.OFPFF_RESET_COUNTS):
                    flow.byte_count = old_flow.byte_count
                    flow.packet_count = old_flow.packet_count
                flows[idx] = flow
                changed = True
                self.log.debug('flow-updated', flow=flow)

            else:
                flows.append(flow)
                changed = True
                self.log.debug('flow-added', flow=mod)

        # write back to model
        if changed:
            self.flows_proxy.update('/', Flows(items=flows))

    def flow_delete(self, mod):
        """
        Delete every flow covered by the (wildcard) spec `mod` and announce
        the deleted flows.
        :param mod: ofp_flow_mod
        """
        assert isinstance(mod, ofp.ofp_flow_mod)

        # read from model
        flows = list(self.flows_proxy.get('/').items)

        # build a list of what to keep vs what to delete
        to_keep = []
        to_delete = []
        for f in flows:
            if self.flow_matches_spec(f, mod):
                to_delete.append(f)
            else:
                to_keep.append(f)

        # replace flow table with keepers
        flows = to_keep

        # write back
        if to_delete:
            self.flows_proxy.update('/', Flows(items=flows))

        # send notifications for discarded flow as required by OpenFlow
        self.announce_flows_deleted(to_delete)

    def flow_delete_strict(self, mod):
        """
        Delete the single flow that flow_match() considers identical to the
        flow described by `mod`; log a warning if there is no such flow.
        :param mod: ofp_flow_mod
        """
        assert isinstance(mod, ofp.ofp_flow_mod)

        # read from model
        flows = list(self.flows_proxy.get('/').items)
        changed = False

        flow = flow_stats_entry_from_flow_mod_message(mod)
        idx = self.find_flow(flows, flow)
        if idx >= 0:
            del flows[idx]
            changed = True
        else:
            # TODO need to check what to do with this case
            self.log.warn('flow-cannot-delete', flow=flow)

        if changed:
            self.flows_proxy.update('/', Flows(items=flows))

    def flow_modify(self, mod):
        """Not implemented yet; always raises NotImplementedError."""
        raise NotImplementedError()

    def flow_modify_strict(self, mod):
        """Not implemented yet; always raises NotImplementedError."""
        raise NotImplementedError()

    def find_overlapping_flows(self, flows, mod, return_on_first=False):
        """
        Return list of overlapping flow(s)
        Two flows overlap if a packet may match both and if they have the
        same priority.
        NOTE: not implemented yet; an empty list is always returned.
        :param flows: list of flows to search
        :param mod: Flow request
        :param return_on_first: if True, return with the first entry
        :return: list of overlapping flows (currently always empty)
        """
        return []  # TODO finish implementation

    @classmethod
    def find_flow(cls, flows, flow):
        """
        Return the index of the first entry of `flows` for which
        flow_match(entry, flow) is True, or -1 if there is none.
        """
        for i, f in enumerate(flows):
            if cls.flow_match(f, flow):
                return i
        return -1

    @staticmethod
    def flow_match(f1, f2):
        """
        Return True if f1 and f2 have equal table_id, priority, flags,
        cookie and match attributes; all other attributes are ignored.
        """
        keys_matter = ('table_id', 'priority', 'flags', 'cookie', 'match')
        for key in keys_matter:
            if getattr(f1, key) != getattr(f2, key):
                return False
        return True

    @classmethod
    def flow_matches_spec(cls, flow, flow_mod):
        """
        Return True if given flow (ofp_flow_stats) is "covered" by the
        wildcard flow_mod (ofp_flow_mod), taking into consideration of
        both exact matches as well as masks-based match fields if any.
        Otherwise return False
        :param flow: ofp_flow_stats
        :param flow_mod: ofp_flow_mod
        :return: Bool
        :raises NotImplementedError: if all other checks pass and flow_mod
        has a non-empty match (match analysis is not implemented yet)
        """

        assert isinstance(flow, ofp.ofp_flow_stats)
        assert isinstance(flow_mod, ofp.ofp_flow_mod)

        # Check if flow.cookie is covered by mod.cookie and mod.cookie_mask
        if (flow.cookie & flow_mod.cookie_mask) != \
                (flow_mod.cookie & flow_mod.cookie_mask):
            return False

        # Check if flow.table_id is covered by flow_mod.table_id
        if flow_mod.table_id != ofp.OFPTT_ALL and \
                flow.table_id != flow_mod.table_id:
            return False

        # Check out_port
        if (flow_mod.out_port & 0x7fffffff) != ofp.OFPP_ANY and \
                not cls.flow_has_out_port(flow, flow_mod.out_port):
            return False

        # Check out_group
        if (flow_mod.out_group & 0x7fffffff) != ofp.OFPG_ANY and \
                not cls.flow_has_out_group(flow, flow_mod.out_group):
            return False

        # Priority is ignored

        # Check match condition
        # If the flow_mod match field is empty, that is a special case and
        # indicates the flow entry matches
        match = flow_mod.match
        assert isinstance(match, ofp.ofp_match)
        if not match.oxm_fields:
            # If we got this far and the match is empty in the flow spec,
            # then the flow matches
            return True
        else:
            raise NotImplementedError(
                "flow_matches_spec(): No flow match analysis yet")

    @staticmethod
    def flow_has_out_port(flow, out_port):
        """
        Return True if flow has an output action with the given out_port
        in one of its apply-actions instructions
        """
        assert isinstance(flow, ofp.ofp_flow_stats)
        for instruction in flow.instructions:
            assert isinstance(instruction, ofp.ofp_instruction)
            if instruction.type == ofp.OFPIT_APPLY_ACTIONS:
                for action in instruction.actions.actions:
                    assert isinstance(action, ofp.ofp_action)
                    if action.type == ofp.OFPAT_OUTPUT and \
                            action.output.port == out_port:
                        return True

        # otherwise...
        return False

    @staticmethod
    def flow_has_out_group(flow, group_id):
        """
        Return True if flow has a group action with the given group_id
        in one of its apply-actions instructions
        """
        assert isinstance(flow, ofp.ofp_flow_stats)
        for instruction in flow.instructions:
            assert isinstance(instruction, ofp.ofp_instruction)
            if instruction.type == ofp.OFPIT_APPLY_ACTIONS:
                for action in instruction.actions.actions:
                    assert isinstance(action, ofp.ofp_action)
                    if action.type == ofp.OFPAT_GROUP and \
                            action.group.group_id == group_id:
                        return True

        # otherwise...
        return False

    def flows_delete_by_group_id(self, flows, group_id):
        """
        Filter out any flow(s) referring to given group_id and announce
        the removed flows. The model is not written here; the caller is
        expected to store the returned list.
        :param flows: list of flows to filter
        :param group_id: id of the group being removed
        :return: (changed, flows) where changed is True if at least one
        flow was removed and flows is the list of remaining flows
        """
        to_keep = []
        to_delete = []
        for f in flows:
            if self.flow_has_out_group(f, group_id):
                to_delete.append(f)
            else:
                to_keep.append(f)

        # replace flow table with keepers
        flows = to_keep

        # send notification to deleted ones
        self.announce_flows_deleted(to_delete)

        return bool(to_delete), flows

    # ~~~~~~~~~~~~~~~~~~~~~ LOW LEVEL GROUP HANDLERS ~~~~~~~~~~~~~~~~~~~~~~~~~~

    def group_add(self, group_mod):
        """
        Add a new group entry; signal OFPGMFC_GROUP_EXISTS if a group with
        the same id is already present.
        :param group_mod: ofp_group_mod
        """
        assert isinstance(group_mod, ofp.ofp_group_mod)

        groups = OrderedDict((g.desc.group_id, g)
                             for g in self.groups_proxy.get('/').items)
        changed = False

        if group_mod.group_id in groups:
            self.signal_group_mod_error(ofp.OFPGMFC_GROUP_EXISTS, group_mod)
        else:
            group_entry = group_entry_from_group_mod(group_mod)
            groups[group_mod.group_id] = group_entry
            changed = True

        if changed:
            self.groups_proxy.update('/', FlowGroups(items=groups.values()))

    def group_delete(self, group_mod):
        """
        Delete one group (together with the flows referring to it) or, for
        OFPG_ALL, all groups. Deleting an unknown group is a no-op.
        :param group_mod: ofp_group_mod
        """
        assert isinstance(group_mod, ofp.ofp_group_mod)

        groups = OrderedDict((g.desc.group_id, g)
                             for g in self.groups_proxy.get('/').items)
        groups_changed = False
        flows_changed = False

        group_id = group_mod.group_id
        if group_id == ofp.OFPG_ALL:
            # TODO we must delete all flows that point to this group and
            # signal controller as requested by flow's flag
            groups = OrderedDict()
            groups_changed = True
            self.log.debug('all-groups-deleted')

        else:
            if group_id not in groups:
                # per openflow spec, this is not an error
                pass

            else:
                flows = list(self.flows_proxy.get('/').items)
                flows_changed, flows = self.flows_delete_by_group_id(
                    flows, group_id)
                del groups[group_id]
                groups_changed = True
                self.log.debug('group-deleted', group_id=group_id)

        if groups_changed:
            self.groups_proxy.update('/', FlowGroups(items=groups.values()))
        if flows_changed:
            # `flows` is always bound here: flows_changed can only become
            # True in the branch that also assigns `flows`
            self.flows_proxy.update('/', Flows(items=flows))

    def group_modify(self, group_mod):
        """
        Replace an existing group entry; signal OFPGMFC_INVALID_GROUP if
        the group does not exist.
        :param group_mod: ofp_group_mod
        """
        assert isinstance(group_mod, ofp.ofp_group_mod)

        groups = OrderedDict((g.desc.group_id, g)
                             for g in self.groups_proxy.get('/').items)
        changed = False

        if group_mod.group_id not in groups:
            self.signal_group_mod_error(
                ofp.OFPGMFC_INVALID_GROUP, group_mod)
        else:
            # replace existing group entry with new group definition
            group_entry = group_entry_from_group_mod(group_mod)
            groups[group_mod.group_id] = group_entry
            changed = True

        if changed:
            self.groups_proxy.update('/', FlowGroups(items=groups.values()))

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ PACKET_OUT ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    def packet_out(self, ofp_packet_out):
        """Publish the packet-out message on this device's event bus topic."""
        self.log.info('packet-out', packet=ofp_packet_out)
        topic = 'packet-out:{}'.format(self.logical_device_id)
        self.event_bus.publish(topic, ofp_packet_out)

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ PACKET_IN ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    def handle_packet_in_event(self, _, msg):
        """
        Event bus callback for the 'packet-in:<logical_device_id>' topic.
        Wraps the received packet into an ofp_packet_in message and passes
        it to packet_in().
        :param _: unused (first argument supplied by the event bus)
        :param msg: (logical_port_no, packet) pair
        """
        self.log.debug('handle-packet-in', msg=msg)
        logical_port_no, packet = msg
        packet_in = ofp.ofp_packet_in(
            # buffer_id=0,
            reason=ofp.OFPR_ACTION,
            # table_id=0,
            # cookie=0,
            match=ofp.ofp_match(
                type=ofp.OFPMT_OXM,
                oxm_fields=[
                    ofp.ofp_oxm_field(
                        oxm_class=ofp.OFPXMC_OPENFLOW_BASIC,
                        ofb_field=in_port(logical_port_no)
                    )
                ]
            ),
            data=packet
        )
        self.packet_in(packet_in)

    def packet_in(self, ofp_packet_in):
        """Log the packet-in message and hand it to the local handler."""
        self.log.info('packet-in', logical_device_id=self.logical_device_id,
                      pkt=ofp_packet_in, data=hexify(ofp_packet_in.data))
        self.local_handler.send_packet_in(
            self.logical_device_id, ofp_packet_in)

    # ~~~~~~~~~~~~~~~~~~~~~ FLOW TABLE UPDATE HANDLING ~~~~~~~~~~~~~~~~~~~~~~~~

    def _flow_table_updated(self, flows):
        """
        POST_UPDATE callback of the flows proxy: decompose the new flow
        table (together with the current groups) into per-device rules and
        write them to each device's flows and flow_groups.
        :param flows: updated flow table (its `items` are decomposed)
        """
        self.log.debug('flow-table-updated',
                       logical_device_id=self.logical_device_id, flows=flows)

        # TODO we have to evolve this into a policy-based, event based pattern
        # This is a raw implementation of the specific use-case with certain
        # built-in assumptions, and not yet device vendor specific. The policy-
        # based refinement will be introduced later.

        groups = self.groups_proxy.get('/').items
        device_rules_map = self.decompose_rules(flows.items, groups)
        for device_id, (dev_flows, dev_groups) in device_rules_map.items():
            self.root_proxy.update('/devices/{}/flows'.format(device_id),
                                   Flows(items=dev_flows.values()))
            self.root_proxy.update('/devices/{}/flow_groups'.format(device_id),
                                   FlowGroups(items=dev_groups.values()))

    # ~~~~~~~~~~~~~~~~~~~~ GROUP TABLE UPDATE HANDLING ~~~~~~~~~~~~~~~~~~~~~~~~

    def _group_table_updated(self, flow_groups):
        """
        POST_UPDATE callback of the groups proxy: decompose the current
        flows together with the new group table into per-device rules and
        write them to each device's flows and flow_groups.
        :param flow_groups: updated group table (its `items` are decomposed)
        """
        self.log.debug('group-table-updated',
                       logical_device_id=self.logical_device_id,
                       flow_groups=flow_groups)

        flows = self.flows_proxy.get('/').items
        device_flows_map = self.decompose_rules(flows, flow_groups.items)
        for device_id, (dev_flows, dev_groups) in device_flows_map.items():
            self.root_proxy.update('/devices/{}/flows'.format(device_id),
                                   Flows(items=dev_flows.values()))
            self.root_proxy.update('/devices/{}/flow_groups'.format(device_id),
                                   FlowGroups(items=dev_groups.values()))

    # ~~~~~~~~~~~~~~~~~~~ APIs NEEDED BY FLOW DECOMPOSER ~~~~~~~~~~~~~~~~~~~~~~

    def _port_added(self, port):
        """
        POST_ADD callback of the logical device proxy: invalidate the
        cached tables, start watching the new port for updates and send an
        OFPPR_ADD port change event.
        :param port: LogicalPort
        """
        self.log.debug('port-added', port=port)
        assert isinstance(port, LogicalPort)
        self._port_list_updated(port)

        # Set a proxy and callback for that specific port
        self.port_proxy[port.id] = self.core.get_proxy(
            '/logical_devices/{}/ports/{}'.format(self.logical_device_id,
                                                  port.id))
        self.port_proxy[port.id].register_callback(
            CallbackType.POST_UPDATE, self._port_changed)

        self.local_handler.send_port_change_event(
            device_id=self.logical_device_id,
            port_status=ofp.ofp_port_status(
                reason=ofp.OFPPR_ADD,
                desc=port.ofp_port
            )
        )

    def _port_removed(self, port):
        """
        POST_REMOVE callback of the logical device proxy: invalidate the
        cached tables, stop watching the port and send an OFPPR_DELETE
        port change event.
        :param port: LogicalPort
        """
        self.log.debug('port-removed', port=port)
        assert isinstance(port, LogicalPort)
        self._port_list_updated(port)

        # Remove the proxy references. A port that this agent never saw
        # being added has no proxy; that must not prevent the port change
        # event below from being sent.
        proxy = self.port_proxy.pop(port.id, None)
        if proxy is not None:
            proxy.unregister_callback(
                CallbackType.POST_UPDATE, self._port_changed)
        else:
            self.log.warn('port-proxy-not-found', port_id=port.id)

        self.local_handler.send_port_change_event(
            device_id=self.logical_device_id,
            port_status=ofp.ofp_port_status(
                reason=ofp.OFPPR_DELETE,
                desc=port.ofp_port
            )
        )

    def _port_changed(self, port):
        """
        POST_UPDATE callback of a per-port proxy: send an OFPPR_MODIFY
        port change event.
        :param port: LogicalPort
        """
        self.log.debug('port-changed', port=port)
        assert isinstance(port, LogicalPort)
        self.local_handler.send_port_change_event(
            device_id=self.logical_device_id,
            port_status=ofp.ofp_port_status(
                reason=ofp.OFPPR_MODIFY,
                desc=port.ofp_port
            )
        )

    def _port_list_updated(self, _):
        """Invalidate the cached tables; the argument is ignored."""
        # invalidate the graph and the route table
        self._invalidate_cached_tables()

    def _invalidate_cached_tables(self):
        """Drop the cached routes, default rules and NNI port number."""
        self._routes = None
        self._default_rules = None
        self._nni_logical_port_no = None

    def _assure_cached_tables_up_to_date(self):
        """
        Recompute the routes, the default rules and the NNI logical port
        number if they were invalidated (i.e. self._routes is None).
        Asserts that exactly one logical port is marked as root port.
        """
        if self._routes is None:
            logical_ports = self.self_proxy.get('/ports')
            graph, self._routes = self.compute_routes(
                self.root_proxy, logical_ports)
            self._default_rules = self._generate_default_rules(graph)
            root_ports = [p for p in logical_ports if p.root_port]
            assert len(root_ports) == 1
            self._nni_logical_port_no = root_ports[0].ofp_port.port_no

    def _generate_default_rules(self, graph):
        """
        Build the default (flows, groups) rule set of every device found
        in the graph.
        :param graph: device graph whose nodes may carry a 'device' entry
        :return: dict mapping device id to a (flows, groups) pair of
        OrderedDicts
        """

        def root_device_default_rules(device):
            """
            Default rules of the root device: traffic entering on the NNI
            port with VLAN id 4000 and pcp 0 gets its VLAN tag popped and
            is sent out on the (single) PON port.
            """
            ports = self.root_proxy.get('/devices/{}/ports'.format(device.id))
            upstream_ports = [
                port for port in ports if port.type == Port.ETHERNET_NNI
            ]
            assert len(upstream_ports) == 1
            downstream_ports = [
                port for port in ports if port.type == Port.PON_OLT
            ]
            assert len(downstream_ports) == 1, \
                'Initially, we only handle one PON port'
            flows = OrderedDict((f.id, f) for f in [
                mk_flow_stat(
                    priority=2000,
                    match_fields=[
                        in_port(upstream_ports[0].port_no),
                        vlan_vid(ofp.OFPVID_PRESENT | 4000),
                        vlan_pcp(0)
                    ],
                    actions=[
                        pop_vlan(),
                        output(downstream_ports[0].port_no)
                    ]
                )
            ])
            groups = OrderedDict()
            return flows, groups

        def leaf_device_default_rules(device):
            """
            Default rules of a leaf device, translating between the UNI
            port and the PON port using the device's own VLAN id.
            """
            ports = self.root_proxy.get('/devices/{}/ports'.format(device.id))
            upstream_ports = [
                port for port in ports if port.type == Port.PON_ONU
            ]
            assert len(upstream_ports) == 1
            downstream_ports = [
                port for port in ports if port.type == Port.ETHERNET_UNI
            ]
            assert len(downstream_ports) == 1
            flows = OrderedDict((f.id, f) for f in [
                # UNI -> PON, frames tagged with VLAN id 0: rewrite the
                # VLAN id to the device VLAN
                mk_flow_stat(
                    priority=500,
                    match_fields=[
                        in_port(downstream_ports[0].port_no),
                        vlan_vid(ofp.OFPVID_PRESENT | 0)
                    ],
                    actions=[
                        set_field(vlan_vid(ofp.OFPVID_PRESENT | device.vlan)),
                        output(upstream_ports[0].port_no)
                    ]
                ),
                # UNI -> PON, frames matching vlan_vid(0) (no present
                # flag): push a 0x8100 tag carrying the device VLAN
                mk_flow_stat(
                    priority=500,
                    match_fields=[
                        in_port(downstream_ports[0].port_no),
                        vlan_vid(0)
                    ],
                    actions=[
                        push_vlan(0x8100),
                        set_field(vlan_vid(ofp.OFPVID_PRESENT | device.vlan)),
                        output(upstream_ports[0].port_no)
                    ]
                ),
                # PON -> UNI, frames tagged with the device VLAN: rewrite
                # the VLAN id to 0
                mk_flow_stat(
                    priority=500,
                    match_fields=[
                        in_port(upstream_ports[0].port_no),
                        vlan_vid(ofp.OFPVID_PRESENT | device.vlan)
                    ],
                    actions=[
                        set_field(vlan_vid(ofp.OFPVID_PRESENT | 0)),
                        output(downstream_ports[0].port_no)
                    ]
                ),
            ])
            groups = OrderedDict()
            return flows, groups

        root_device_id = self.self_proxy.get('/').root_device_id
        rules = {}
        for node_key in graph.nodes():
            node = graph.node[node_key]
            device = node.get('device', None)
            if device is None:
                # not a device node
                continue
            if device.id == root_device_id:
                rules[device.id] = root_device_default_rules(device)
            else:
                rules[device.id] = leaf_device_default_rules(device)
        return rules

    def get_route(self, ingress_port_no, egress_port_no):
        """
        Look up the route between two logical ports.
        :param ingress_port_no: ingress logical port number, or None
        :param egress_port_no: egress logical port number, or None; a
        controller port is treated as the NNI logical port
        :return: the route stored for (ingress_port_no, egress_port_no),
        a two-element "half" route with None in place of the unknown hop
        when one of the ports is None, or None if no route is found
        :raises Exception: if a wildcarded upstream route is requested but
        no route ends at the NNI logical port
        """
        self._assure_cached_tables_up_to_date()
        if egress_port_no is not None and \
                (egress_port_no & 0x7fffffff) == ofp.OFPP_CONTROLLER:
            # treat it as if the output port is the NNI of the OLT
            egress_port_no = self._nni_logical_port_no

        # If ingress_port is not specified (None), it may be a wildcarded
        # route if egress_port is OFPP_CONTROLLER or _nni_logical_port,
        # in which case we need to create a half-route where only the egress
        # hop is filled, the first hop is None
        if ingress_port_no is None and \
                egress_port_no == self._nni_logical_port_no:
            # We can use the 2nd hop of any upstream route, so just find the
            # first upstream:
            for (ingress, egress), route in self._routes.items():
                if egress == self._nni_logical_port_no:
                    return [None, route[1]]
            raise Exception('not a single upstream route')

        # If egress_port is not specified (None), we can also return a
        # "half" route
        if egress_port_no is None:
            for (ingress, egress), route in self._routes.items():
                if ingress == ingress_port_no:
                    return [route[0], None]

            # This can occur if a leaf device is disabled. No exception is
            # being handled here, so a plain error record is logged rather
            # than log.exception().
            self.log.error('no-downstream-route',
                           ingress_port_no=ingress_port_no,
                           egress_port_no=egress_port_no)
            return None

        return self._routes.get((ingress_port_no, egress_port_no))

    def get_all_default_rules(self):
        """Return the (lazily computed) default rules of all devices."""
        self._assure_cached_tables_up_to_date()
        return self._default_rules