blob: 10ec66c83412189bf21cad815930a1bd5ca46b6b [file] [log] [blame]
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -05001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18Model that captures the current state of a logical device
19"""
20from collections import OrderedDict
21
22import structlog
23
24from common.event_bus import EventBusClient
25from common.frameio.frameio import hexify
26from voltha.registry import registry
27from voltha.core.config.config_proxy import CallbackType
28from voltha.core.device_graph import DeviceGraph
29from voltha.core.flow_decomposer import FlowDecomposer, \
30 flow_stats_entry_from_flow_mod_message, group_entry_from_group_mod, \
31 mk_flow_stat, in_port, vlan_vid, vlan_pcp, pop_vlan, output, set_field, \
32 push_vlan, mk_simple_flow_mod
33from voltha.protos import third_party
34from voltha.protos import openflow_13_pb2 as ofp
35from voltha.protos.device_pb2 import Port
36from voltha.protos.logical_device_pb2 import LogicalPort
37from voltha.protos.openflow_13_pb2 import Flows, Meters, FlowGroups, ofp_meter_config
38
39_ = third_party
40
def mac_str_to_tuple(mac):
    """Convert a colon-separated MAC string ('xx:xx:...') to a tuple of ints."""
    octets = mac.split(':')
    return tuple([int(octet, 16) for octet in octets])
43
44
45class LogicalDeviceAgent(FlowDecomposer, DeviceGraph):
46
47 def __init__(self, core, logical_device):
48 try:
49 self.core = core
50 self.local_handler = core.get_local_handler()
51 self.logical_device_id = logical_device.id
52
53 self.root_proxy = core.get_proxy('/')
54 self.flows_proxy = core.get_proxy(
55 '/logical_devices/{}/flows'.format(logical_device.id))
56 self.meters_proxy = core.get_proxy(
57 '/logical_devices/{}/meters'.format(logical_device.id))
58 self.groups_proxy = core.get_proxy(
59 '/logical_devices/{}/flow_groups'.format(logical_device.id))
60 self.self_proxy = core.get_proxy(
61 '/logical_devices/{}'.format(logical_device.id))
62
63 self.flows_proxy.register_callback(
64 CallbackType.PRE_UPDATE, self._pre_process_flows)
65 self.flows_proxy.register_callback(
66 CallbackType.POST_UPDATE, self._flow_table_updated)
67 self.groups_proxy.register_callback(
68 CallbackType.POST_UPDATE, self._group_table_updated)
69 self.self_proxy.register_callback(
70 CallbackType.POST_ADD, self._port_added)
71 self.self_proxy.register_callback(
72 CallbackType.POST_REMOVE, self._port_removed)
73
74 self.port_proxy = {}
75 self.port_status_has_changed = {}
76
77 self.event_bus = EventBusClient()
78 self.packet_in_subscription = self.event_bus.subscribe(
79 topic='packet-in:{}'.format(logical_device.id),
80 callback=self.handle_packet_in_event)
81
82 self.log = structlog.get_logger(logical_device_id=logical_device.id)
83
84 self._routes = None
85 self._no_flow_changes_required = False
86 self._flows_ids_to_add = []
87 self._flows_ids_to_remove = []
88 self._flows_to_remove = []
89
90 self.accepts_direct_logical_flows = False
91 self.device_id = self.self_proxy.get('/').root_device_id
92 device_adapter_type = self.root_proxy.get('/devices/{}'.format(
93 self.device_id)).adapter
94 device_type = self.root_proxy.get('/device_types/{}'.format(
95 device_adapter_type))
96
97 if device_type is not None:
98 self.accepts_direct_logical_flows = \
99 device_type.accepts_direct_logical_flows_update
100
101 if self.accepts_direct_logical_flows:
102
103 self.device_adapter_agent = registry(
104 'adapter_loader').get_agent(device_adapter_type).adapter
105
106 self.log.debug('this device accepts direct logical flows',
107 device_adapter_type=device_adapter_type)
108
109
110
111 except Exception, e:
112 self.log.exception('init-error', e=e)
113
114 def start(self, reconcile=False):
115 self.log.debug('starting')
116 if reconcile:
117 # Register the callbacks for the ports
118 ports = self.self_proxy.get('/ports')
119 for port in ports:
120 self._reconcile_port(port)
121 self.log.debug('ports-reconciled', ports=ports)
122 self.log.debug('started')
123 return self
124
125 def stop(self):
126 self.log.debug('stopping')
127 try:
128 self.flows_proxy.unregister_callback(
129 CallbackType.POST_UPDATE, self._flow_table_updated)
130 self.groups_proxy.unregister_callback(
131 CallbackType.POST_UPDATE, self._group_table_updated)
132 self.self_proxy.unregister_callback(
133 CallbackType.POST_ADD, self._port_added)
134 self.self_proxy.unregister_callback(
135 CallbackType.POST_REMOVE, self._port_removed)
136
137 # Remove subscription to the event bus
138 self.event_bus.unsubscribe(self.packet_in_subscription)
139 except Exception, e:
140 self.log.info('stop-exception', e=e)
141
142 self.log.debug('stopped')
143
144 def announce_flows_deleted(self, flows):
145 for f in flows:
146 self.announce_flow_deleted(f)
147
148 def announce_flow_deleted(self, flow):
149 if flow.flags & ofp.OFPFF_SEND_FLOW_REM:
150 raise NotImplementedError("announce_flow_deleted")
151
    def signal_flow_mod_error(self, code, flow_mod):
        # TODO: send an OpenFlow flow-mod error message back to the controller
        pass
154
    def signal_flow_removal(self, code, flow):
        # TODO: send an OpenFlow flow-removed message back to the controller
        pass
157
    def signal_group_mod_error(self, code, group_mod):
        # TODO: send an OpenFlow group-mod error message back to the controller
        pass
160
161 def update_flow_table(self, flow_mod):
162
163 command = flow_mod.command
164
165 if command == ofp.OFPFC_ADD:
166 self.flow_add(flow_mod)
167
168 elif command == ofp.OFPFC_DELETE:
169 self.flow_delete(flow_mod)
170
171 elif command == ofp.OFPFC_DELETE_STRICT:
172 self.flow_delete_strict(flow_mod)
173
174 elif command == ofp.OFPFC_MODIFY:
175 self.flow_modify(flow_mod)
176
177 elif command == ofp.OFPFC_MODIFY_STRICT:
178 self.flow_modify_strict(flow_mod)
179
180 else:
181 self.log.warn('unhandled-flow-mod', command=command, flow_mod=flow_mod)
182
183 def update_meter_table(self, meter_mod):
184 command = meter_mod.command
185
186 if command == ofp.OFPMC_ADD:
187 self.meter_add(meter_mod)
188
189 elif command == ofp.OFPMC_MODIFY:
190 self.meter_modify(meter_mod)
191
192 elif command == ofp.OFPMC_DELETE:
193 self.meter_delete(meter_mod)
194 else:
195 self.log.warn('unhandled-meter-mod', command=command, flow_mod=meter_mod)
196
197 def update_group_table(self, group_mod):
198
199 command = group_mod.command
200
201 if command == ofp.OFPGC_DELETE:
202 self.group_delete(group_mod)
203
204 elif command == ofp.OFPGC_ADD:
205 self.group_add(group_mod)
206
207 elif command == ofp.OFPGC_MODIFY:
208 self.group_modify(group_mod)
209
210 else:
211 self.log.warn('unhandled-group-mod',
212 command=command, group_mod=group_mod)
213
214 # ~~~~~~~~~~~~~~~~~~~~~~~~~ LOW LEVEL METER HANDLERS ~~~~~~~~~~~~~~~~~~~~~~~
215
216 def meter_add(self, meter_mod):
217 assert isinstance(meter_mod, ofp.ofp_meter_mod)
218 # read from model
219 meters = list(self.meters_proxy.get('/').items)
220 if not self.check_meter_id_overlapping(meters, meter_mod):
221 meters.append(ofp_meter_config(flags=meter_mod.flags, \
222 meter_id=meter_mod.meter_id, \
223 bands=meter_mod.bands))
224
225 self.meters_proxy.update('/', Meters(items=meters))
226 else:
227 self.signal_meter_mod_error(ofp.OFPMMFC_METER_EXISTS, meter_mod)
228
229 def meter_modify(self, meter_mod):
230 assert isinstance(meter_mod, ofp.ofp_meter_mod)
231 meters = list(self.meters_proxy.get('/').items)
232 existing_meter = self.check_meter_id_overlapping(meters, meter_mod)
233 if existing_meter:
234 existing_meter.flags = meter_mod.flags
235 existing_meter.bands = meter_mod.bands
236 self.meters_proxy.update('/', Meters(items=meters))
237 else:
238 self.signal_meter_mod_error(ofp.OFPMMFC_UNKNOWN_METER, meter_mod)
239
240 def meter_delete(self, meter_mod):
241 assert isinstance(meter_mod, ofp.ofp_meter_mod)
242 meters = list(self.meters_proxy.get('/').items)
243 to_keep = list()
244 to_delete = 0
245
246 for meter in meters:
247 if meter.meter_id != meter_mod.meter_id:
248 to_keep.append(meter)
249 else:
250 to_delete += 1
251
252 if to_delete == 1:
253 self.meters_proxy.update('/', Meters(items=to_keep))
254 if to_delete == 0:
255 self.signal_meter_mod_error(ofp.OFPMMFC_UNKNOWN_METER, meter_mod)
256 elif to_delete > 1:
257 raise Exception('More than one meter_config sharing the same meter_id cannot exist')
258
259 @staticmethod
260 def check_meter_id_overlapping(meters, meter_mod):
261 for meter in meters:
262 if meter.meter_id == meter_mod.meter_id:
263 return meter
264 return False
265
    def signal_meter_mod_error(self, error_code, meter_mod):
        # TODO: send an OpenFlow meter-mod error message back to the controller
        pass
268
269
270
271
272 # ~~~~~~~~~~~~~~~~~~~~~~~~~ LOW LEVEL FLOW HANDLERS ~~~~~~~~~~~~~~~~~~~~~~~
273
274 def flow_add(self, mod):
275 assert isinstance(mod, ofp.ofp_flow_mod)
276 assert mod.cookie_mask == 0
277
278 # read from model
279 flows = list(self.flows_proxy.get('/').items)
280
281 changed = False
282 check_overlap = mod.flags & ofp.OFPFF_CHECK_OVERLAP
283 if check_overlap:
284 if self.find_overlapping_flows(flows, mod, True):
285 self.signal_flow_mod_error(
286 ofp.OFPFMFC_OVERLAP, mod)
287 else:
288 # free to add as new flow
289 flow = flow_stats_entry_from_flow_mod_message(mod)
290 flows.append(flow)
291 changed = True
292 self.log.debug('flow-added', flow=mod)
293
294 else:
295 flow = flow_stats_entry_from_flow_mod_message(mod)
296 idx = self.find_flow(flows, flow)
297 if idx >= 0:
298 old_flow = flows[idx]
299 if not (mod.flags & ofp.OFPFF_RESET_COUNTS):
300 flow.byte_count = old_flow.byte_count
301 flow.packet_count = old_flow.packet_count
302 flows[idx] = flow
303 changed = True
304 self.log.debug('flow-updated', flow=flow)
305
306 else:
307 flows.append(flow)
308 changed = True
309 self.log.debug('flow-added', flow=mod)
310
311 # write back to model
312 if changed:
313 self.flows_proxy.update('/', Flows(items=flows))
314
315 def flow_delete(self, mod):
316 assert isinstance(mod, (ofp.ofp_flow_mod, ofp.ofp_flow_stats))
317
318 # read from model
319 flows = list(self.flows_proxy.get('/').items)
320
321 # build a list of what to keep vs what to delete
322 to_keep = []
323 to_delete = []
324 for f in flows:
325 if self.flow_matches_spec(f, mod):
326 to_delete.append(f)
327 else:
328 to_keep.append(f)
329
330 # replace flow table with keepers
331 flows = to_keep
332
333 # write back
334 if to_delete:
335 self.flows_proxy.update('/', Flows(items=flows))
336
337 # from mod send announcement
338 if isinstance(mod, ofp.ofp_flow_mod):
339 # send notifications for discarded flow as required by OpenFlow
340 self.announce_flows_deleted(to_delete)
341
342 def flow_delete_strict(self, mod):
343 assert isinstance(mod, ofp.ofp_flow_mod)
344
345 # read from model
346 flows = list(self.flows_proxy.get('/').items)
347 changed = False
348
349 flow = flow_stats_entry_from_flow_mod_message(mod)
350 idx = self.find_flow(flows, flow)
351 if (idx >= 0):
352 del flows[idx]
353 changed = True
354 else:
355 # TODO need to check what to do with this case
356 self.log.warn('flow-cannot-delete', flow=flow)
357
358 if changed:
359 self.flows_proxy.update('/', Flows(items=flows))
360
    def flow_modify(self, mod):
        # OFPFC_MODIFY is not supported yet
        raise NotImplementedError()
363
    def flow_modify_strict(self, mod):
        # OFPFC_MODIFY_STRICT is not supported yet
        raise NotImplementedError()
366
    def find_overlapping_flows(self, flows, mod, return_on_first=False):
        """
        Return list of overlapping flow(s)
        Two flows overlap if a packet may match both and if they have the
        same priority.
        :param flows: existing flow table entries to check against
        :param mod: Flow request
        :param return_on_first: if True, return with the first entry
        :return: list of overlapping flows (currently always empty — stub)
        """
        return []  # TODO finish implementation
377
378 @classmethod
379 def find_flow(cls, flows, flow):
380 for i, f in enumerate(flows):
381 if cls.flow_match(f, flow):
382 return i
383 return -1
384
385 @staticmethod
386 def flow_match(f1, f2):
387 keys_matter = ('table_id', 'priority', 'flags', 'cookie', 'match')
388 for key in keys_matter:
389 if getattr(f1, key) != getattr(f2, key):
390 return False
391 return True
392
    @classmethod
    def flow_matches_spec(cls, flow, flow_mod):
        """
        Return True if given flow (ofp_flow_stats) is "covered" by the
        wildcard flow_mod (ofp_flow_mod), taking into consideration
        both exact matches as well as masks-based match fields if any.
        Otherwise return False
        :param flow: ofp_flow_stats
        :param flow_mod: ofp_flow_mod (or ofp_flow_stats for exact matching)
        :return: Bool
        """

        assert isinstance(flow, ofp.ofp_flow_stats)
        assert isinstance(flow_mod, (ofp.ofp_flow_mod, ofp.ofp_flow_stats))

        # An ofp_flow_stats spec implies exact (strict) matching
        if isinstance(flow_mod, ofp.ofp_flow_stats):
            return cls.flow_match(flow, flow_mod)

        # Check if flow.cookie is covered by mod.cookie and mod.cookie_mask
        if (flow.cookie & flow_mod.cookie_mask) != \
                (flow_mod.cookie & flow_mod.cookie_mask):
            return False

        # Check if flow.table_id is covered by flow_mod.table_id
        if flow_mod.table_id != ofp.OFPTT_ALL and \
                flow.table_id != flow_mod.table_id:
            return False

        # Check out_port (only the low 31 bits are significant)
        if (flow_mod.out_port & 0x7fffffff) != ofp.OFPP_ANY and \
                not cls.flow_has_out_port(flow, flow_mod.out_port):
            return False

        # Check out_group (only the low 31 bits are significant)
        if (flow_mod.out_group & 0x7fffffff) != ofp.OFPG_ANY and \
                not cls.flow_has_out_group(flow, flow_mod.out_group):
            return False
        # Priority is ignored

        # Check match condition
        # If the flow_mod match field is empty, that is a special case and
        # indicates the flow entry matches
        match = flow_mod.match
        assert isinstance(match, ofp.ofp_match)
        if not match.oxm_fields:
            # If we got this far and the match is empty in the flow spec,
            # then the flow matches
            return True
        else:
            raise NotImplementedError(
                "flow_matches_spec(): No flow match analysis yet")
444
445 @staticmethod
446 def flow_has_out_port(flow, out_port):
447 """
448 Return True if flow has a output command with the given out_port
449 """
450 assert isinstance(flow, ofp.ofp_flow_stats)
451 for instruction in flow.instructions:
452 assert isinstance(instruction, ofp.ofp_instruction)
453 if instruction.type == ofp.OFPIT_APPLY_ACTIONS:
454 for action in instruction.actions.actions:
455 assert isinstance(action, ofp.ofp_action)
456 if action.type == ofp.OFPAT_OUTPUT and \
457 action.output.port == out_port:
458 return True
459
460 # otherwise...
461 return False
462
463 @staticmethod
464 def flow_has_out_group(flow, group_id):
465 """
466 Return True if flow has a output command with the given out_group
467 """
468 assert isinstance(flow, ofp.ofp_flow_stats)
469 for instruction in flow.instructions:
470 assert isinstance(instruction, ofp.ofp_instruction)
471 if instruction.type == ofp.OFPIT_APPLY_ACTIONS:
472 for action in instruction.actions.actions:
473 assert isinstance(action, ofp.ofp_action)
474 if action.type == ofp.OFPAT_GROUP and \
475 action.group.group_id == group_id:
476 return True
477
478 # otherwise...
479 return False
480
481 def flows_delete_by_group_id(self, flows, group_id):
482 """
483 Delete any flow(s) referring to given group_id
484 :param group_id:
485 :return: None
486 """
487 to_keep = []
488 to_delete = []
489 for f in flows:
490 if self.flow_has_out_group(f, group_id):
491 to_delete.append(f)
492 else:
493 to_keep.append(f)
494
495 # replace flow table with keepers
496 flows = to_keep
497
498 # send notification to deleted ones
499 self.announce_flows_deleted(to_delete)
500
501 return bool(to_delete), flows
502
503 # ~~~~~~~~~~~~~~~~~~~~~ LOW LEVEL GROUP HANDLERS ~~~~~~~~~~~~~~~~~~~~~~~~~~
504
505 def group_add(self, group_mod):
506 assert isinstance(group_mod, ofp.ofp_group_mod)
507
508 groups = OrderedDict((g.desc.group_id, g)
509 for g in self.groups_proxy.get('/').items)
510 changed = False
511
512 if group_mod.group_id in groups:
513 self.signal_group_mod_error(ofp.OFPGMFC_GROUP_EXISTS, group_mod)
514 else:
515 group_entry = group_entry_from_group_mod(group_mod)
516 groups[group_mod.group_id] = group_entry
517 changed = True
518
519 if changed:
520 self.groups_proxy.update('/', FlowGroups(items=groups.values()))
521
522 def group_delete(self, group_mod):
523 assert isinstance(group_mod, ofp.ofp_group_mod)
524
525 groups = OrderedDict((g.desc.group_id, g)
526 for g in self.groups_proxy.get('/').items)
527 groups_changed = False
528 flows_changed = False
529
530 group_id = group_mod.group_id
531 if group_id == ofp.OFPG_ALL:
532 # TODO we must delete all flows that point to this group and
533 # signal controller as requested by flow's flag
534 groups = OrderedDict()
535 groups_changed = True
536 self.log.debug('all-groups-deleted')
537
538 else:
539 if group_id not in groups:
540 # per openflow spec, this is not an error
541 pass
542
543 else:
544 flows = list(self.flows_proxy.get('/').items)
545 flows_changed, flows = self.flows_delete_by_group_id(
546 flows, group_id)
547 del groups[group_id]
548 groups_changed = True
549 self.log.debug('group-deleted', group_id=group_id)
550
551 if groups_changed:
552 self.groups_proxy.update('/', FlowGroups(items=groups.values()))
553 if flows_changed:
554 self.flows_proxy.update('/', Flows(items=flows))
555
556 def group_modify(self, group_mod):
557 assert isinstance(group_mod, ofp.ofp_group_mod)
558
559 groups = OrderedDict((g.desc.group_id, g)
560 for g in self.groups_proxy.get('/').items)
561 changed = False
562
563 if group_mod.group_id not in groups:
564 self.signal_group_mod_error(
565 ofp.OFPGMFC_INVALID_GROUP, group_mod)
566 else:
567 # replace existing group entry with new group definition
568 group_entry = group_entry_from_group_mod(group_mod)
569 groups[group_mod.group_id] = group_entry
570 changed = True
571
572 if changed:
573 self.groups_proxy.update('/', FlowGroups(items=groups.values()))
574
575 def port_enable(self, port_id):
576 self.log.info("port-enable", port_id=port_id)
577
578 proxy = self.port_proxy[port_id]
579 port = proxy.get('/')
580 port.ofp_port.config = port.ofp_port.config & ~ofp.OFPPC_PORT_DOWN
581 proxy.update('/', port)
582
583 def port_disable(self, port_id):
584 self.log.info("port-disable", port_id=port_id)
585
586 proxy = self.port_proxy[port_id]
587 port = proxy.get('/')
588 port.ofp_port.config = port.ofp_port.config & ~ofp.OFPPC_PORT_DOWN | ofp.OFPPC_PORT_DOWN
589 proxy.update('/', port)
590
591 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ PACKET_OUT ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
592
593 def packet_out(self, ofp_packet_out):
594 self.log.debug('packet-out', packet=ofp_packet_out)
595 topic = 'packet-out:{}'.format(self.logical_device_id)
596 self.event_bus.publish(topic, ofp_packet_out)
597
598 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ PACKET_IN ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
599
600 def handle_packet_in_event(self, _, msg):
601 self.log.debug('handle-packet-in', msg=msg)
602 logical_port_no, packet = msg
603 packet_in = ofp.ofp_packet_in(
604 # buffer_id=0,
605 reason=ofp.OFPR_ACTION,
606 # table_id=0,
607 # cookie=0,
608 match=ofp.ofp_match(
609 type=ofp.OFPMT_OXM,
610 oxm_fields=[
611 ofp.ofp_oxm_field(
612 oxm_class=ofp.OFPXMC_OPENFLOW_BASIC,
613 ofb_field=in_port(logical_port_no)
614 )
615 ]
616 ),
617 data=packet
618 )
619 self.packet_in(packet_in)
620
621 def packet_in(self, ofp_packet_in):
622 self.log.info('packet-in', logical_device_id=self.logical_device_id,
623 pkt=ofp_packet_in, data=hexify(ofp_packet_in.data))
624 self.local_handler.send_packet_in(
625 self.logical_device_id, ofp_packet_in)
626
627 # ~~~~~~~~~~~~~~~~~~~~~ FLOW TABLE UPDATE HANDLING ~~~~~~~~~~~~~~~~~~~~~~~~
628
629 def _pre_process_flows(self, flows):
630 """
631 This method is invoked before a device flow table data model is
632 updated. The resulting data is stored locally and the flow table is
633 updated during the post-processing phase, i.e. via the POST_UPDATE
634 callback
635 :param flows: Desired flows
636 :return: None
637 """
638 current_flows = self.flows_proxy.get('/')
639 self.log.debug('pre-processing-flows',
640 logical_device_id=self.logical_device_id,
641 desired_flows=flows,
642 existing_flows=current_flows)
643
644 current_flow_ids = set(f.id for f in current_flows.items)
645 desired_flow_ids = set(f.id for f in flows.items)
646
647 self._flows_ids_to_add = desired_flow_ids.difference(current_flow_ids)
648 self._flows_ids_to_remove = current_flow_ids.difference(desired_flow_ids)
649 self._flows_to_remove = []
650 for f in current_flows.items:
651 if f.id in self._flows_ids_to_remove:
652 self._flows_to_remove.append(f)
653
654 if len(self._flows_ids_to_add) + len(self._flows_ids_to_remove) == 0:
655 # No changes of flows, just stats are changing
656 self._no_flow_changes_required = True
657 else:
658 self._no_flow_changes_required = False
659
660 self.log.debug('flows-preprocess-output', current_flows=len(
661 current_flow_ids), new_flows=len(desired_flow_ids),
662 adding_flows=len(self._flows_ids_to_add),
663 removing_flows=len(self._flows_ids_to_remove))
664
665
    def _flow_table_updated(self, flows):
        """
        POST_UPDATE callback on the flow table. Either hands the logical
        flows directly to the adapter (openolt-style bypass) or decomposes
        them into per-device flow/group tables and writes those back.

        :param flows: Flows protobuf (the new logical flow table)
        :return: None
        """
        self.log.debug('flow-table-updated',
                       logical_device_id=self.logical_device_id, flows=flows)

        if self._no_flow_changes_required:
            # Stats changes, no need to process further
            self.log.debug('flow-stats-update')
        else:

            groups = self.groups_proxy.get('/').items
            device_rules_map = self.decompose_rules(flows.items, groups)

            # TODO we have to evolve this into a policy-based, event based pattern
            # This is a raw implementation of the specific use-case with certain
            # built-in assumptions, and not yet device vendor specific. The policy-
            # based refinement will be introduced that later.

            # Temporary bypass for openolt

            if self.accepts_direct_logical_flows:
                # give the logical flows directly to the adapter
                self.log.debug('it is an direct logical flow bypass')
                if self.device_adapter_agent is None:
                    self.log.error('No device adapter agent',
                                   device_id=self.device_id,
                                   logical_device_id=self.logical_device_id)
                    return

                # only the newly added flows (diff computed in the
                # PRE_UPDATE callback) are pushed down
                flows_to_add = []
                for f in flows.items:
                    if f.id in self._flows_ids_to_add:
                        flows_to_add.append(f)

                self.log.debug('flows to remove',
                               flows_to_remove=self._flows_to_remove,
                               flows_ids=self._flows_ids_to_remove)

                try:
                    self.device_adapter_agent.update_logical_flows(
                        self.device_id, flows_to_add, self._flows_to_remove,
                        groups, device_rules_map)
                except Exception as e:
                    self.log.error('logical flows bypass error', error=e,
                                   flows=flows)
            else:
                # normal path: program each device with its decomposed rules
                for device_id, (flows, groups) in device_rules_map.iteritems():

                    self.root_proxy.update('/devices/{}/flows'.format(device_id),
                                           Flows(items=flows.values()))
                    self.root_proxy.update('/devices/{}/flow_groups'.format(device_id),
                                           FlowGroups(items=groups.values()))
720
721 # ~~~~~~~~~~~~~~~~~~~~ GROUP TABLE UPDATE HANDLING ~~~~~~~~~~~~~~~~~~~~~~~~
722
723 def _group_table_updated(self, flow_groups):
724 self.log.debug('group-table-updated',
725 logical_device_id=self.logical_device_id,
726 flow_groups=flow_groups)
727
728 flows = self.flows_proxy.get('/').items
729 device_flows_map = self.decompose_rules(flows, flow_groups.items)
730 for device_id, (flows, groups) in device_flows_map.iteritems():
731 self.root_proxy.update('/devices/{}/flows'.format(device_id),
732 Flows(items=flows.values()))
733 self.root_proxy.update('/devices/{}/flow_groups'.format(device_id),
734 FlowGroups(items=groups.values()))
735
736 # ~~~~~~~~~~~~~~~~~~~ APIs NEEDED BY FLOW DECOMPOSER ~~~~~~~~~~~~~~~~~~~~~~
737
738 def _port_added(self, port):
739 self.log.debug('port-added', port=port)
740 assert isinstance(port, LogicalPort)
741 self._port_list_updated(port)
742
743 # Set a proxy and callback for that specific port
744 self.port_proxy[port.id] = self.core.get_proxy(
745 '/logical_devices/{}/ports/{}'.format(self.logical_device_id,
746 port.id))
747 self.port_status_has_changed[port.id] = True
748 self.port_proxy[port.id].register_callback(
749 CallbackType.PRE_UPDATE, self._pre_port_changed)
750 self.port_proxy[port.id].register_callback(
751 CallbackType.POST_UPDATE, self._port_changed)
752
753 self.local_handler.send_port_change_event(
754 device_id=self.logical_device_id,
755 port_status=ofp.ofp_port_status(
756 reason=ofp.OFPPR_ADD,
757 desc=port.ofp_port
758 )
759 )
760
761 def _reconcile_port(self, port):
762 self.log.debug('reconcile-port', port=port)
763 assert isinstance(port, LogicalPort)
764 self._port_list_updated(port)
765
766 # Set a proxy and callback for that specific port
767 self.port_proxy[port.id] = self.core.get_proxy(
768 '/logical_devices/{}/ports/{}'.format(self.logical_device_id,
769 port.id))
770 self.port_status_has_changed[port.id] = True
771 self.port_proxy[port.id].register_callback(
772 CallbackType.PRE_UPDATE, self._pre_port_changed)
773 self.port_proxy[port.id].register_callback(
774 CallbackType.POST_UPDATE, self._port_changed)
775
776 def _port_removed(self, port):
777 self.log.debug('port-removed', port=port)
778 assert isinstance(port, LogicalPort)
779 self._port_list_updated(port)
780
781 # Remove the proxy references
782 self.port_proxy[port.id].unregister_callback(
783 CallbackType.PRE_UPDATE, self._pre_port_changed)
784 self.port_proxy[port.id].unregister_callback(
785 CallbackType.POST_UPDATE, self._port_changed)
786 del self.port_proxy[port.id]
787 del self.port_status_has_changed[port.id]
788
789
790 self.local_handler.send_port_change_event(
791 device_id=self.logical_device_id,
792 port_status=ofp.ofp_port_status(
793 reason=ofp.OFPPR_DELETE,
794 desc=port.ofp_port
795 )
796 )
797
798 def _pre_port_changed(self, port):
799 old_port = self.port_proxy[port.id].get('/')
800 if old_port.ofp_port != port.ofp_port:
801 self.port_status_has_changed[port.id] = True
802 else :
803 self.port_status_has_changed[port.id] = False
804
805 def _port_changed(self, port):
806 self.log.debug('port-changed', port=port)
807 if self.port_status_has_changed[port.id]:
808 assert isinstance(port, LogicalPort)
809 self.local_handler.send_port_change_event(
810 device_id=self.logical_device_id,
811 port_status=ofp.ofp_port_status(
812 reason=ofp.OFPPR_MODIFY,
813 desc=port.ofp_port
814 )
815 )
816
    def _port_list_updated(self, _):
        # Any port add/remove/reconcile invalidates the device graph and the
        # route table; they are rebuilt lazily on next use.
        self._invalidate_cached_tables()
820
    def _invalidate_cached_tables(self):
        # Cached routing / default-rule / NNI-port state; rebuilt lazily by
        # _assure_cached_tables_up_to_date()
        self._routes = None
        self._default_rules = None
        self._nni_logical_port_no = None
825
826 def _assure_cached_tables_up_to_date(self):
827 if self._routes is None:
828 logical_ports = self.self_proxy.get('/ports')
829 graph, self._routes = self.compute_routes(
830 self.root_proxy, logical_ports)
831 self._default_rules = self._generate_default_rules(graph)
832 root_ports = [p for p in logical_ports if p.root_port]
833 assert len(root_ports) == 1, 'Only one root port supported at this time'
834 self._nni_logical_port_no = root_ports[0].ofp_port.port_no
835
836
    def _generate_default_rules(self, graph):
        """
        Build the default flow/group tables for every device that appears
        in the routing graph.

        :param graph: device graph, as produced by compute_routes()
        :return: dict of device-id -> (flows OrderedDict, groups OrderedDict);
            leaf devices with no UNI ports yet map to (None, None)
        """

        def root_device_default_rules(device):
            # The root (OLT) device gets no default rules
            flows = OrderedDict()
            groups = OrderedDict()
            return flows, groups

        def leaf_device_default_rules(device):
            # Each leaf (ONU) device gets three default flows per UNI port:
            # upstream re-tag, upstream push-vlan, and downstream un-tag.
            ports = self.root_proxy.get('/devices/{}/ports'.format(device.id))
            upstream_ports = [
                port for port in ports if port.type == Port.PON_ONU \
                or port.type == Port.VENET_ONU
            ]
            assert len(upstream_ports) == 1
            downstream_ports = [
                port for port in ports if port.type == Port.ETHERNET_UNI
            ]

            # it is possible that the downstream ports are not
            # created, but the flow_decomposition has already
            # kicked in. In such scenarios, cut short the processing
            # and return.
            if len(downstream_ports) == 0:
                return None, None
            # assert len(downstream_ports) == 1
            upstream_port = upstream_ports[0]
            flows = OrderedDict()
            for downstream_port in downstream_ports:
                flows.update(OrderedDict((f.id, f) for f in [
                    mk_flow_stat(
                        priority=500,
                        match_fields=[
                            in_port(downstream_port.port_no),
                            vlan_vid(ofp.OFPVID_PRESENT | 0)
                        ],
                        actions=[
                            set_field(vlan_vid(ofp.OFPVID_PRESENT | device.vlan)),
                            output(upstream_port.port_no)
                        ]
                    ),
                    mk_flow_stat(
                        priority=500,
                        match_fields=[
                            in_port(downstream_port.port_no),
                            vlan_vid(0)
                        ],
                        actions=[
                            push_vlan(0x8100),
                            set_field(vlan_vid(ofp.OFPVID_PRESENT | device.vlan)),
                            output(upstream_port.port_no)
                        ]
                    ),
                    mk_flow_stat(
                        priority=500,
                        match_fields=[
                            in_port(upstream_port.port_no),
                            vlan_vid(ofp.OFPVID_PRESENT | device.vlan)
                        ],
                        actions=[
                            set_field(vlan_vid(ofp.OFPVID_PRESENT | 0)),
                            output(downstream_port.port_no)
                        ]
                    ),
                ]))
            groups = OrderedDict()
            return flows, groups

        root_device_id = self.self_proxy.get('/').root_device_id
        rules = {}
        for node_key in graph.nodes():
            node = graph.node[node_key]
            device = node.get('device', None)
            if device is None:
                # nodes without a device attached get no rules
                continue
            if device.id == root_device_id:
                rules[device.id] = root_device_default_rules(device)
            else:
                rules[device.id] = leaf_device_default_rules(device)
        return rules
916
    def get_route(self, ingress_port_no, egress_port_no):
        """
        Return the two-hop route between the given logical ports, a 'half'
        route (one hop None) for controller/wildcard cases, or None when no
        route exists (e.g. when a leaf device is disabled).

        :param ingress_port_no: logical ingress port number, or None
        :param egress_port_no: logical egress port number, None, or a
            controller port (OFPP_CONTROLLER)
        """
        self._assure_cached_tables_up_to_date()
        self.log.info('getting-route', eg_port=egress_port_no, in_port=ingress_port_no,
                      nni_port=self._nni_logical_port_no)
        if egress_port_no is not None and \
                (egress_port_no & 0x7fffffff) == ofp.OFPP_CONTROLLER:
            self.log.info('controller-flow', eg_port=egress_port_no, in_port=ingress_port_no,
                          nni_port=self._nni_logical_port_no)
            if ingress_port_no == self._nni_logical_port_no:
                self.log.info('returning half route')
                # This is a trap on the NNI Port
                # Return a 'half' route to make the flow decomp logic happy
                for (ingress, egress), route in self._routes.iteritems():
                    if egress == self._nni_logical_port_no:
                        return [None, route[1]]
                raise Exception('not a single upstream route')
            # treat it as if the output port is the NNI of the OLT
            egress_port_no = self._nni_logical_port_no

        # If ingress_port is not specified (None), it may be a wildcarded
        # route if egress_port is OFPP_CONTROLLER or _nni_logical_port,
        # in which case we need to create a half-route where only the egress
        # hop is filled, the first hop is None
        if ingress_port_no is None and \
                egress_port_no == self._nni_logical_port_no:
            # We can use the 2nd hop of any upstream route, so just find the
            # first upstream:
            for (ingress, egress), route in self._routes.iteritems():
                if egress == self._nni_logical_port_no:
                    return [None, route[1]]
            raise Exception('not a single upstream route')

        # If egress_port is not specified (None), we can also return a
        # "half" route
        if egress_port_no is None:
            for (ingress, egress), route in self._routes.iteritems():
                if ingress == ingress_port_no:
                    return [route[0], None]

            # This can occur if a leaf device is disabled
            self.log.exception('no-downstream-route',
                               ingress_port_no=ingress_port_no,
                               egress_port_no=egress_port_no
                               )
            return None

        # normal fully-specified case: direct lookup in the route table
        return self._routes.get((ingress_port_no, egress_port_no))
965
    def get_all_default_rules(self):
        # Returns the per-device default rules map, rebuilding the cached
        # tables first if they were invalidated.
        self._assure_cached_tables_up_to_date()
        return self._default_rules
969
970 def get_wildcard_input_ports(self, exclude_port=None):
971 logical_ports = self.self_proxy.get('/ports')
972 return [port.ofp_port.port_no for port in logical_ports
973 if port.ofp_port.port_no != exclude_port]