VOL-1451 Initial checkin of openonu build
Produced docker container capable of building and running
openonu/brcm_openonci_onu. Copied over current onu code
and resolved all imports by copying into the local source tree.
Change-Id: Ib9785d37afc65b7d32ecf74aee2456352626e2b6
diff --git a/python/core/logical_device_agent.py b/python/core/logical_device_agent.py
new file mode 100644
index 0000000..10ec66c
--- /dev/null
+++ b/python/core/logical_device_agent.py
@@ -0,0 +1,973 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Model that captures the current state of a logical device
+"""
+from collections import OrderedDict
+
+import structlog
+
+from common.event_bus import EventBusClient
+from common.frameio.frameio import hexify
+from voltha.registry import registry
+from voltha.core.config.config_proxy import CallbackType
+from voltha.core.device_graph import DeviceGraph
+from voltha.core.flow_decomposer import FlowDecomposer, \
+ flow_stats_entry_from_flow_mod_message, group_entry_from_group_mod, \
+ mk_flow_stat, in_port, vlan_vid, vlan_pcp, pop_vlan, output, set_field, \
+ push_vlan, mk_simple_flow_mod
+from voltha.protos import third_party
+from voltha.protos import openflow_13_pb2 as ofp
+from voltha.protos.device_pb2 import Port
+from voltha.protos.logical_device_pb2 import LogicalPort
+from voltha.protos.openflow_13_pb2 import Flows, Meters, FlowGroups, ofp_meter_config
+
+_ = third_party
+
+def mac_str_to_tuple(mac):
+ return tuple(int(d, 16) for d in mac.split(':'))
+
+
+class LogicalDeviceAgent(FlowDecomposer, DeviceGraph):
+
+ def __init__(self, core, logical_device):
+ try:
+ self.core = core
+ self.local_handler = core.get_local_handler()
+ self.logical_device_id = logical_device.id
+
+ self.root_proxy = core.get_proxy('/')
+ self.flows_proxy = core.get_proxy(
+ '/logical_devices/{}/flows'.format(logical_device.id))
+ self.meters_proxy = core.get_proxy(
+ '/logical_devices/{}/meters'.format(logical_device.id))
+ self.groups_proxy = core.get_proxy(
+ '/logical_devices/{}/flow_groups'.format(logical_device.id))
+ self.self_proxy = core.get_proxy(
+ '/logical_devices/{}'.format(logical_device.id))
+
+ self.flows_proxy.register_callback(
+ CallbackType.PRE_UPDATE, self._pre_process_flows)
+ self.flows_proxy.register_callback(
+ CallbackType.POST_UPDATE, self._flow_table_updated)
+ self.groups_proxy.register_callback(
+ CallbackType.POST_UPDATE, self._group_table_updated)
+ self.self_proxy.register_callback(
+ CallbackType.POST_ADD, self._port_added)
+ self.self_proxy.register_callback(
+ CallbackType.POST_REMOVE, self._port_removed)
+
+ self.port_proxy = {}
+ self.port_status_has_changed = {}
+
+ self.event_bus = EventBusClient()
+ self.packet_in_subscription = self.event_bus.subscribe(
+ topic='packet-in:{}'.format(logical_device.id),
+ callback=self.handle_packet_in_event)
+
+ self.log = structlog.get_logger(logical_device_id=logical_device.id)
+
+ self._routes = None
+ self._no_flow_changes_required = False
+ self._flows_ids_to_add = []
+ self._flows_ids_to_remove = []
+ self._flows_to_remove = []
+
+ self.accepts_direct_logical_flows = False
+ self.device_id = self.self_proxy.get('/').root_device_id
+ device_adapter_type = self.root_proxy.get('/devices/{}'.format(
+ self.device_id)).adapter
+ device_type = self.root_proxy.get('/device_types/{}'.format(
+ device_adapter_type))
+
+ if device_type is not None:
+ self.accepts_direct_logical_flows = \
+ device_type.accepts_direct_logical_flows_update
+
+ if self.accepts_direct_logical_flows:
+
+ self.device_adapter_agent = registry(
+ 'adapter_loader').get_agent(device_adapter_type).adapter
+
+ self.log.debug('this device accepts direct logical flows',
+ device_adapter_type=device_adapter_type)
+
+
+
+ except Exception, e:
+ self.log.exception('init-error', e=e)
+
+ def start(self, reconcile=False):
+ self.log.debug('starting')
+ if reconcile:
+ # Register the callbacks for the ports
+ ports = self.self_proxy.get('/ports')
+ for port in ports:
+ self._reconcile_port(port)
+ self.log.debug('ports-reconciled', ports=ports)
+ self.log.debug('started')
+ return self
+
+ def stop(self):
+ self.log.debug('stopping')
+ try:
+ self.flows_proxy.unregister_callback(
+ CallbackType.POST_UPDATE, self._flow_table_updated)
+ self.groups_proxy.unregister_callback(
+ CallbackType.POST_UPDATE, self._group_table_updated)
+ self.self_proxy.unregister_callback(
+ CallbackType.POST_ADD, self._port_added)
+ self.self_proxy.unregister_callback(
+ CallbackType.POST_REMOVE, self._port_removed)
+
+ # Remove subscription to the event bus
+ self.event_bus.unsubscribe(self.packet_in_subscription)
+ except Exception, e:
+ self.log.info('stop-exception', e=e)
+
+ self.log.debug('stopped')
+
+ def announce_flows_deleted(self, flows):
+ for f in flows:
+ self.announce_flow_deleted(f)
+
+ def announce_flow_deleted(self, flow):
+ if flow.flags & ofp.OFPFF_SEND_FLOW_REM:
+ raise NotImplementedError("announce_flow_deleted")
+
+ def signal_flow_mod_error(self, code, flow_mod):
+ pass # TODO
+
+ def signal_flow_removal(self, code, flow):
+ pass # TODO
+
+ def signal_group_mod_error(self, code, group_mod):
+ pass # TODO
+
+ def update_flow_table(self, flow_mod):
+
+ command = flow_mod.command
+
+ if command == ofp.OFPFC_ADD:
+ self.flow_add(flow_mod)
+
+ elif command == ofp.OFPFC_DELETE:
+ self.flow_delete(flow_mod)
+
+ elif command == ofp.OFPFC_DELETE_STRICT:
+ self.flow_delete_strict(flow_mod)
+
+ elif command == ofp.OFPFC_MODIFY:
+ self.flow_modify(flow_mod)
+
+ elif command == ofp.OFPFC_MODIFY_STRICT:
+ self.flow_modify_strict(flow_mod)
+
+ else:
+ self.log.warn('unhandled-flow-mod', command=command, flow_mod=flow_mod)
+
+ def update_meter_table(self, meter_mod):
+ command = meter_mod.command
+
+ if command == ofp.OFPMC_ADD:
+ self.meter_add(meter_mod)
+
+ elif command == ofp.OFPMC_MODIFY:
+ self.meter_modify(meter_mod)
+
+ elif command == ofp.OFPMC_DELETE:
+ self.meter_delete(meter_mod)
+ else:
+ self.log.warn('unhandled-meter-mod', command=command, flow_mod=meter_mod)
+
+ def update_group_table(self, group_mod):
+
+ command = group_mod.command
+
+ if command == ofp.OFPGC_DELETE:
+ self.group_delete(group_mod)
+
+ elif command == ofp.OFPGC_ADD:
+ self.group_add(group_mod)
+
+ elif command == ofp.OFPGC_MODIFY:
+ self.group_modify(group_mod)
+
+ else:
+ self.log.warn('unhandled-group-mod',
+ command=command, group_mod=group_mod)
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~ LOW LEVEL METER HANDLERS ~~~~~~~~~~~~~~~~~~~~~~~
+
+ def meter_add(self, meter_mod):
+ assert isinstance(meter_mod, ofp.ofp_meter_mod)
+ # read from model
+ meters = list(self.meters_proxy.get('/').items)
+ if not self.check_meter_id_overlapping(meters, meter_mod):
+ meters.append(ofp_meter_config(flags=meter_mod.flags, \
+ meter_id=meter_mod.meter_id, \
+ bands=meter_mod.bands))
+
+ self.meters_proxy.update('/', Meters(items=meters))
+ else:
+ self.signal_meter_mod_error(ofp.OFPMMFC_METER_EXISTS, meter_mod)
+
+ def meter_modify(self, meter_mod):
+ assert isinstance(meter_mod, ofp.ofp_meter_mod)
+ meters = list(self.meters_proxy.get('/').items)
+ existing_meter = self.check_meter_id_overlapping(meters, meter_mod)
+ if existing_meter:
+ existing_meter.flags = meter_mod.flags
+ existing_meter.bands = meter_mod.bands
+ self.meters_proxy.update('/', Meters(items=meters))
+ else:
+ self.signal_meter_mod_error(ofp.OFPMMFC_UNKNOWN_METER, meter_mod)
+
+ def meter_delete(self, meter_mod):
+ assert isinstance(meter_mod, ofp.ofp_meter_mod)
+ meters = list(self.meters_proxy.get('/').items)
+ to_keep = list()
+ to_delete = 0
+
+ for meter in meters:
+ if meter.meter_id != meter_mod.meter_id:
+ to_keep.append(meter)
+ else:
+ to_delete += 1
+
+ if to_delete == 1:
+ self.meters_proxy.update('/', Meters(items=to_keep))
+ if to_delete == 0:
+ self.signal_meter_mod_error(ofp.OFPMMFC_UNKNOWN_METER, meter_mod)
+ elif to_delete > 1:
+ raise Exception('More than one meter_config sharing the same meter_id cannot exist')
+
+ @staticmethod
+ def check_meter_id_overlapping(meters, meter_mod):
+ for meter in meters:
+ if meter.meter_id == meter_mod.meter_id:
+ return meter
+ return False
+
+ def signal_meter_mod_error(self, error_code, meter_mod):
+ pass # TODO
+
+
+
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~ LOW LEVEL FLOW HANDLERS ~~~~~~~~~~~~~~~~~~~~~~~
+
+ def flow_add(self, mod):
+ assert isinstance(mod, ofp.ofp_flow_mod)
+ assert mod.cookie_mask == 0
+
+ # read from model
+ flows = list(self.flows_proxy.get('/').items)
+
+ changed = False
+ check_overlap = mod.flags & ofp.OFPFF_CHECK_OVERLAP
+ if check_overlap:
+ if self.find_overlapping_flows(flows, mod, True):
+ self.signal_flow_mod_error(
+ ofp.OFPFMFC_OVERLAP, mod)
+ else:
+ # free to add as new flow
+ flow = flow_stats_entry_from_flow_mod_message(mod)
+ flows.append(flow)
+ changed = True
+ self.log.debug('flow-added', flow=mod)
+
+ else:
+ flow = flow_stats_entry_from_flow_mod_message(mod)
+ idx = self.find_flow(flows, flow)
+ if idx >= 0:
+ old_flow = flows[idx]
+ if not (mod.flags & ofp.OFPFF_RESET_COUNTS):
+ flow.byte_count = old_flow.byte_count
+ flow.packet_count = old_flow.packet_count
+ flows[idx] = flow
+ changed = True
+ self.log.debug('flow-updated', flow=flow)
+
+ else:
+ flows.append(flow)
+ changed = True
+ self.log.debug('flow-added', flow=mod)
+
+ # write back to model
+ if changed:
+ self.flows_proxy.update('/', Flows(items=flows))
+
+ def flow_delete(self, mod):
+ assert isinstance(mod, (ofp.ofp_flow_mod, ofp.ofp_flow_stats))
+
+ # read from model
+ flows = list(self.flows_proxy.get('/').items)
+
+ # build a list of what to keep vs what to delete
+ to_keep = []
+ to_delete = []
+ for f in flows:
+ if self.flow_matches_spec(f, mod):
+ to_delete.append(f)
+ else:
+ to_keep.append(f)
+
+ # replace flow table with keepers
+ flows = to_keep
+
+ # write back
+ if to_delete:
+ self.flows_proxy.update('/', Flows(items=flows))
+
+ # from mod send announcement
+ if isinstance(mod, ofp.ofp_flow_mod):
+ # send notifications for discarded flow as required by OpenFlow
+ self.announce_flows_deleted(to_delete)
+
+ def flow_delete_strict(self, mod):
+ assert isinstance(mod, ofp.ofp_flow_mod)
+
+ # read from model
+ flows = list(self.flows_proxy.get('/').items)
+ changed = False
+
+ flow = flow_stats_entry_from_flow_mod_message(mod)
+ idx = self.find_flow(flows, flow)
+ if (idx >= 0):
+ del flows[idx]
+ changed = True
+ else:
+ # TODO need to check what to do with this case
+ self.log.warn('flow-cannot-delete', flow=flow)
+
+ if changed:
+ self.flows_proxy.update('/', Flows(items=flows))
+
+ def flow_modify(self, mod):
+ raise NotImplementedError()
+
+ def flow_modify_strict(self, mod):
+ raise NotImplementedError()
+
+ def find_overlapping_flows(self, flows, mod, return_on_first=False):
+ """
+ Return list of overlapping flow(s)
+ Two flows overlap if a packet may match both and if they have the
+ same priority.
+ :param mod: Flow request
+ :param return_on_first: if True, return with the first entry
+ :return:
+ """
+ return [] # TODO finish implementation
+
+ @classmethod
+ def find_flow(cls, flows, flow):
+ for i, f in enumerate(flows):
+ if cls.flow_match(f, flow):
+ return i
+ return -1
+
+ @staticmethod
+ def flow_match(f1, f2):
+ keys_matter = ('table_id', 'priority', 'flags', 'cookie', 'match')
+ for key in keys_matter:
+ if getattr(f1, key) != getattr(f2, key):
+ return False
+ return True
+
+ @classmethod
+ def flow_matches_spec(cls, flow, flow_mod):
+ """
+ Return True if given flow (ofp_flow_stats) is "covered" by the
+ wildcard flow_mod (ofp_flow_mod), taking into consideration of
+ both exact mactches as well as masks-based match fields if any.
+ Otherwise return False
+ :param flow: ofp_flow_stats
+ :param mod: ofp_flow_mod
+ :return: Bool
+ """
+
+ assert isinstance(flow, ofp.ofp_flow_stats)
+ assert isinstance(flow_mod, (ofp.ofp_flow_mod, ofp.ofp_flow_stats))
+
+ if isinstance(flow_mod, ofp.ofp_flow_stats):
+ return cls.flow_match(flow, flow_mod)
+
+ # Check if flow.cookie is covered by mod.cookie and mod.cookie_mask
+ if (flow.cookie & flow_mod.cookie_mask) != \
+ (flow_mod.cookie & flow_mod.cookie_mask):
+ return False
+
+ # Check if flow.table_id is covered by flow_mod.table_id
+ if flow_mod.table_id != ofp.OFPTT_ALL and \
+ flow.table_id != flow_mod.table_id:
+ return False
+
+ # Check out_port
+ if (flow_mod.out_port & 0x7fffffff) != ofp.OFPP_ANY and \
+ not cls.flow_has_out_port(flow, flow_mod.out_port):
+ return False
+
+ # Check out_group
+ if (flow_mod.out_group & 0x7fffffff) != ofp.OFPG_ANY and \
+ not cls.flow_has_out_group(flow, flow_mod.out_group):
+ return False
+ # Priority is ignored
+
+ # Check match condition
+ # If the flow_mod match field is empty, that is a special case and
+ # indicates the flow entry matches
+ match = flow_mod.match
+ assert isinstance(match, ofp.ofp_match)
+ if not match.oxm_fields:
+ # If we got this far and the match is empty in the flow spec,
+ # than the flow matches
+ return True
+ else:
+ raise NotImplementedError(
+ "flow_matches_spec(): No flow match analysis yet")
+
+ @staticmethod
+ def flow_has_out_port(flow, out_port):
+ """
+ Return True if flow has a output command with the given out_port
+ """
+ assert isinstance(flow, ofp.ofp_flow_stats)
+ for instruction in flow.instructions:
+ assert isinstance(instruction, ofp.ofp_instruction)
+ if instruction.type == ofp.OFPIT_APPLY_ACTIONS:
+ for action in instruction.actions.actions:
+ assert isinstance(action, ofp.ofp_action)
+ if action.type == ofp.OFPAT_OUTPUT and \
+ action.output.port == out_port:
+ return True
+
+ # otherwise...
+ return False
+
+ @staticmethod
+ def flow_has_out_group(flow, group_id):
+ """
+ Return True if flow has a output command with the given out_group
+ """
+ assert isinstance(flow, ofp.ofp_flow_stats)
+ for instruction in flow.instructions:
+ assert isinstance(instruction, ofp.ofp_instruction)
+ if instruction.type == ofp.OFPIT_APPLY_ACTIONS:
+ for action in instruction.actions.actions:
+ assert isinstance(action, ofp.ofp_action)
+ if action.type == ofp.OFPAT_GROUP and \
+ action.group.group_id == group_id:
+ return True
+
+ # otherwise...
+ return False
+
+ def flows_delete_by_group_id(self, flows, group_id):
+ """
+ Delete any flow(s) referring to given group_id
+ :param group_id:
+ :return: None
+ """
+ to_keep = []
+ to_delete = []
+ for f in flows:
+ if self.flow_has_out_group(f, group_id):
+ to_delete.append(f)
+ else:
+ to_keep.append(f)
+
+ # replace flow table with keepers
+ flows = to_keep
+
+ # send notification to deleted ones
+ self.announce_flows_deleted(to_delete)
+
+ return bool(to_delete), flows
+
+ # ~~~~~~~~~~~~~~~~~~~~~ LOW LEVEL GROUP HANDLERS ~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def group_add(self, group_mod):
+ assert isinstance(group_mod, ofp.ofp_group_mod)
+
+ groups = OrderedDict((g.desc.group_id, g)
+ for g in self.groups_proxy.get('/').items)
+ changed = False
+
+ if group_mod.group_id in groups:
+ self.signal_group_mod_error(ofp.OFPGMFC_GROUP_EXISTS, group_mod)
+ else:
+ group_entry = group_entry_from_group_mod(group_mod)
+ groups[group_mod.group_id] = group_entry
+ changed = True
+
+ if changed:
+ self.groups_proxy.update('/', FlowGroups(items=groups.values()))
+
+ def group_delete(self, group_mod):
+ assert isinstance(group_mod, ofp.ofp_group_mod)
+
+ groups = OrderedDict((g.desc.group_id, g)
+ for g in self.groups_proxy.get('/').items)
+ groups_changed = False
+ flows_changed = False
+
+ group_id = group_mod.group_id
+ if group_id == ofp.OFPG_ALL:
+ # TODO we must delete all flows that point to this group and
+ # signal controller as requested by flow's flag
+ groups = OrderedDict()
+ groups_changed = True
+ self.log.debug('all-groups-deleted')
+
+ else:
+ if group_id not in groups:
+ # per openflow spec, this is not an error
+ pass
+
+ else:
+ flows = list(self.flows_proxy.get('/').items)
+ flows_changed, flows = self.flows_delete_by_group_id(
+ flows, group_id)
+ del groups[group_id]
+ groups_changed = True
+ self.log.debug('group-deleted', group_id=group_id)
+
+ if groups_changed:
+ self.groups_proxy.update('/', FlowGroups(items=groups.values()))
+ if flows_changed:
+ self.flows_proxy.update('/', Flows(items=flows))
+
+ def group_modify(self, group_mod):
+ assert isinstance(group_mod, ofp.ofp_group_mod)
+
+ groups = OrderedDict((g.desc.group_id, g)
+ for g in self.groups_proxy.get('/').items)
+ changed = False
+
+ if group_mod.group_id not in groups:
+ self.signal_group_mod_error(
+ ofp.OFPGMFC_INVALID_GROUP, group_mod)
+ else:
+ # replace existing group entry with new group definition
+ group_entry = group_entry_from_group_mod(group_mod)
+ groups[group_mod.group_id] = group_entry
+ changed = True
+
+ if changed:
+ self.groups_proxy.update('/', FlowGroups(items=groups.values()))
+
+ def port_enable(self, port_id):
+ self.log.info("port-enable", port_id=port_id)
+
+ proxy = self.port_proxy[port_id]
+ port = proxy.get('/')
+ port.ofp_port.config = port.ofp_port.config & ~ofp.OFPPC_PORT_DOWN
+ proxy.update('/', port)
+
+ def port_disable(self, port_id):
+ self.log.info("port-disable", port_id=port_id)
+
+ proxy = self.port_proxy[port_id]
+ port = proxy.get('/')
+ port.ofp_port.config = port.ofp_port.config & ~ofp.OFPPC_PORT_DOWN | ofp.OFPPC_PORT_DOWN
+ proxy.update('/', port)
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ PACKET_OUT ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def packet_out(self, ofp_packet_out):
+ self.log.debug('packet-out', packet=ofp_packet_out)
+ topic = 'packet-out:{}'.format(self.logical_device_id)
+ self.event_bus.publish(topic, ofp_packet_out)
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ PACKET_IN ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def handle_packet_in_event(self, _, msg):
+ self.log.debug('handle-packet-in', msg=msg)
+ logical_port_no, packet = msg
+ packet_in = ofp.ofp_packet_in(
+ # buffer_id=0,
+ reason=ofp.OFPR_ACTION,
+ # table_id=0,
+ # cookie=0,
+ match=ofp.ofp_match(
+ type=ofp.OFPMT_OXM,
+ oxm_fields=[
+ ofp.ofp_oxm_field(
+ oxm_class=ofp.OFPXMC_OPENFLOW_BASIC,
+ ofb_field=in_port(logical_port_no)
+ )
+ ]
+ ),
+ data=packet
+ )
+ self.packet_in(packet_in)
+
+ def packet_in(self, ofp_packet_in):
+ self.log.info('packet-in', logical_device_id=self.logical_device_id,
+ pkt=ofp_packet_in, data=hexify(ofp_packet_in.data))
+ self.local_handler.send_packet_in(
+ self.logical_device_id, ofp_packet_in)
+
+ # ~~~~~~~~~~~~~~~~~~~~~ FLOW TABLE UPDATE HANDLING ~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def _pre_process_flows(self, flows):
+ """
+ This method is invoked before a device flow table data model is
+ updated. The resulting data is stored locally and the flow table is
+ updated during the post-processing phase, i.e. via the POST_UPDATE
+ callback
+ :param flows: Desired flows
+ :return: None
+ """
+ current_flows = self.flows_proxy.get('/')
+ self.log.debug('pre-processing-flows',
+ logical_device_id=self.logical_device_id,
+ desired_flows=flows,
+ existing_flows=current_flows)
+
+ current_flow_ids = set(f.id for f in current_flows.items)
+ desired_flow_ids = set(f.id for f in flows.items)
+
+ self._flows_ids_to_add = desired_flow_ids.difference(current_flow_ids)
+ self._flows_ids_to_remove = current_flow_ids.difference(desired_flow_ids)
+ self._flows_to_remove = []
+ for f in current_flows.items:
+ if f.id in self._flows_ids_to_remove:
+ self._flows_to_remove.append(f)
+
+ if len(self._flows_ids_to_add) + len(self._flows_ids_to_remove) == 0:
+ # No changes of flows, just stats are changing
+ self._no_flow_changes_required = True
+ else:
+ self._no_flow_changes_required = False
+
+ self.log.debug('flows-preprocess-output', current_flows=len(
+ current_flow_ids), new_flows=len(desired_flow_ids),
+ adding_flows=len(self._flows_ids_to_add),
+ removing_flows=len(self._flows_ids_to_remove))
+
+
+ def _flow_table_updated(self, flows):
+ self.log.debug('flow-table-updated',
+ logical_device_id=self.logical_device_id, flows=flows)
+
+ if self._no_flow_changes_required:
+ # Stats changes, no need to process further
+ self.log.debug('flow-stats-update')
+ else:
+
+ groups = self.groups_proxy.get('/').items
+ device_rules_map = self.decompose_rules(flows.items, groups)
+
+ # TODO we have to evolve this into a policy-based, event based pattern
+ # This is a raw implementation of the specific use-case with certain
+ # built-in assumptions, and not yet device vendor specific. The policy-
+ # based refinement will be introduced that later.
+
+
+ # Temporary bypass for openolt
+
+ if self.accepts_direct_logical_flows:
+ #give the logical flows directly to the adapter
+ self.log.debug('it is an direct logical flow bypass')
+ if self.device_adapter_agent is None:
+ self.log.error('No device adapter agent',
+ device_id=self.device_id,
+ logical_device_id = self.logical_device_id)
+ return
+
+ flows_to_add = []
+ for f in flows.items:
+ if f.id in self._flows_ids_to_add:
+ flows_to_add.append(f)
+
+
+ self.log.debug('flows to remove',
+ flows_to_remove=self._flows_to_remove,
+ flows_ids=self._flows_ids_to_remove)
+
+ try:
+ self.device_adapter_agent.update_logical_flows(
+ self.device_id, flows_to_add, self._flows_to_remove,
+ groups, device_rules_map)
+ except Exception as e:
+ self.log.error('logical flows bypass error', error=e,
+ flows=flows)
+ else:
+
+ for device_id, (flows, groups) in device_rules_map.iteritems():
+
+ self.root_proxy.update('/devices/{}/flows'.format(device_id),
+ Flows(items=flows.values()))
+ self.root_proxy.update('/devices/{}/flow_groups'.format(device_id),
+ FlowGroups(items=groups.values()))
+
+ # ~~~~~~~~~~~~~~~~~~~~ GROUP TABLE UPDATE HANDLING ~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def _group_table_updated(self, flow_groups):
+ self.log.debug('group-table-updated',
+ logical_device_id=self.logical_device_id,
+ flow_groups=flow_groups)
+
+ flows = self.flows_proxy.get('/').items
+ device_flows_map = self.decompose_rules(flows, flow_groups.items)
+ for device_id, (flows, groups) in device_flows_map.iteritems():
+ self.root_proxy.update('/devices/{}/flows'.format(device_id),
+ Flows(items=flows.values()))
+ self.root_proxy.update('/devices/{}/flow_groups'.format(device_id),
+ FlowGroups(items=groups.values()))
+
+ # ~~~~~~~~~~~~~~~~~~~ APIs NEEDED BY FLOW DECOMPOSER ~~~~~~~~~~~~~~~~~~~~~~
+
+ def _port_added(self, port):
+ self.log.debug('port-added', port=port)
+ assert isinstance(port, LogicalPort)
+ self._port_list_updated(port)
+
+ # Set a proxy and callback for that specific port
+ self.port_proxy[port.id] = self.core.get_proxy(
+ '/logical_devices/{}/ports/{}'.format(self.logical_device_id,
+ port.id))
+ self.port_status_has_changed[port.id] = True
+ self.port_proxy[port.id].register_callback(
+ CallbackType.PRE_UPDATE, self._pre_port_changed)
+ self.port_proxy[port.id].register_callback(
+ CallbackType.POST_UPDATE, self._port_changed)
+
+ self.local_handler.send_port_change_event(
+ device_id=self.logical_device_id,
+ port_status=ofp.ofp_port_status(
+ reason=ofp.OFPPR_ADD,
+ desc=port.ofp_port
+ )
+ )
+
+ def _reconcile_port(self, port):
+ self.log.debug('reconcile-port', port=port)
+ assert isinstance(port, LogicalPort)
+ self._port_list_updated(port)
+
+ # Set a proxy and callback for that specific port
+ self.port_proxy[port.id] = self.core.get_proxy(
+ '/logical_devices/{}/ports/{}'.format(self.logical_device_id,
+ port.id))
+ self.port_status_has_changed[port.id] = True
+ self.port_proxy[port.id].register_callback(
+ CallbackType.PRE_UPDATE, self._pre_port_changed)
+ self.port_proxy[port.id].register_callback(
+ CallbackType.POST_UPDATE, self._port_changed)
+
+ def _port_removed(self, port):
+ self.log.debug('port-removed', port=port)
+ assert isinstance(port, LogicalPort)
+ self._port_list_updated(port)
+
+ # Remove the proxy references
+ self.port_proxy[port.id].unregister_callback(
+ CallbackType.PRE_UPDATE, self._pre_port_changed)
+ self.port_proxy[port.id].unregister_callback(
+ CallbackType.POST_UPDATE, self._port_changed)
+ del self.port_proxy[port.id]
+ del self.port_status_has_changed[port.id]
+
+
+ self.local_handler.send_port_change_event(
+ device_id=self.logical_device_id,
+ port_status=ofp.ofp_port_status(
+ reason=ofp.OFPPR_DELETE,
+ desc=port.ofp_port
+ )
+ )
+
+ def _pre_port_changed(self, port):
+ old_port = self.port_proxy[port.id].get('/')
+ if old_port.ofp_port != port.ofp_port:
+ self.port_status_has_changed[port.id] = True
+ else :
+ self.port_status_has_changed[port.id] = False
+
+ def _port_changed(self, port):
+ self.log.debug('port-changed', port=port)
+ if self.port_status_has_changed[port.id]:
+ assert isinstance(port, LogicalPort)
+ self.local_handler.send_port_change_event(
+ device_id=self.logical_device_id,
+ port_status=ofp.ofp_port_status(
+ reason=ofp.OFPPR_MODIFY,
+ desc=port.ofp_port
+ )
+ )
+
+ def _port_list_updated(self, _):
+ # invalidate the graph and the route table
+ self._invalidate_cached_tables()
+
+ def _invalidate_cached_tables(self):
+ self._routes = None
+ self._default_rules = None
+ self._nni_logical_port_no = None
+
+ def _assure_cached_tables_up_to_date(self):
+ if self._routes is None:
+ logical_ports = self.self_proxy.get('/ports')
+ graph, self._routes = self.compute_routes(
+ self.root_proxy, logical_ports)
+ self._default_rules = self._generate_default_rules(graph)
+ root_ports = [p for p in logical_ports if p.root_port]
+ assert len(root_ports) == 1, 'Only one root port supported at this time'
+ self._nni_logical_port_no = root_ports[0].ofp_port.port_no
+
+
+ def _generate_default_rules(self, graph):
+
+ def root_device_default_rules(device):
+ flows = OrderedDict()
+ groups = OrderedDict()
+ return flows, groups
+
+ def leaf_device_default_rules(device):
+ ports = self.root_proxy.get('/devices/{}/ports'.format(device.id))
+ upstream_ports = [
+ port for port in ports if port.type == Port.PON_ONU \
+ or port.type == Port.VENET_ONU
+ ]
+ assert len(upstream_ports) == 1
+ downstream_ports = [
+ port for port in ports if port.type == Port.ETHERNET_UNI
+ ]
+
+ # it is possible that the downstream ports are not
+ # created, but the flow_decomposition has already
+ # kicked in. In such scenarios, cut short the processing
+ # and return.
+ if len(downstream_ports) == 0:
+ return None, None
+ # assert len(downstream_ports) == 1
+ upstream_port = upstream_ports[0]
+ flows = OrderedDict()
+ for downstream_port in downstream_ports:
+ flows.update(OrderedDict((f.id, f) for f in [
+ mk_flow_stat(
+ priority=500,
+ match_fields=[
+ in_port(downstream_port.port_no),
+ vlan_vid(ofp.OFPVID_PRESENT | 0)
+ ],
+ actions=[
+ set_field(vlan_vid(ofp.OFPVID_PRESENT | device.vlan)),
+ output(upstream_port.port_no)
+ ]
+ ),
+ mk_flow_stat(
+ priority=500,
+ match_fields=[
+ in_port(downstream_port.port_no),
+ vlan_vid(0)
+ ],
+ actions=[
+ push_vlan(0x8100),
+ set_field(vlan_vid(ofp.OFPVID_PRESENT | device.vlan)),
+ output(upstream_port.port_no)
+ ]
+ ),
+ mk_flow_stat(
+ priority=500,
+ match_fields=[
+ in_port(upstream_port.port_no),
+ vlan_vid(ofp.OFPVID_PRESENT | device.vlan)
+ ],
+ actions=[
+ set_field(vlan_vid(ofp.OFPVID_PRESENT | 0)),
+ output(downstream_port.port_no)
+ ]
+ ),
+ ]))
+ groups = OrderedDict()
+ return flows, groups
+
+ root_device_id = self.self_proxy.get('/').root_device_id
+ rules = {}
+ for node_key in graph.nodes():
+ node = graph.node[node_key]
+ device = node.get('device', None)
+ if device is None:
+ continue
+ if device.id == root_device_id:
+ rules[device.id] = root_device_default_rules(device)
+ else:
+ rules[device.id] = leaf_device_default_rules(device)
+ return rules
+
+ def get_route(self, ingress_port_no, egress_port_no):
+ self._assure_cached_tables_up_to_date()
+ self.log.info('getting-route', eg_port=egress_port_no, in_port=ingress_port_no,
+ nni_port=self._nni_logical_port_no)
+ if egress_port_no is not None and \
+ (egress_port_no & 0x7fffffff) == ofp.OFPP_CONTROLLER:
+ self.log.info('controller-flow', eg_port=egress_port_no, in_port=ingress_port_no,
+ nni_port=self._nni_logical_port_no)
+ if ingress_port_no == self._nni_logical_port_no:
+ self.log.info('returning half route')
+ # This is a trap on the NNI Port
+ # Return a 'half' route to make the flow decomp logic happy
+ for (ingress, egress), route in self._routes.iteritems():
+ if egress == self._nni_logical_port_no:
+ return [None, route[1]]
+ raise Exception('not a single upstream route')
+ # treat it as if the output port is the NNI of the OLT
+ egress_port_no = self._nni_logical_port_no
+
+ # If ingress_port is not specified (None), it may be a wildcarded
+ # route if egress_port is OFPP_CONTROLLER or _nni_logical_port,
+ # in which case we need to create a half-route where only the egress
+ # hop is filled, the first hope is None
+ if ingress_port_no is None and \
+ egress_port_no == self._nni_logical_port_no:
+ # We can use the 2nd hop of any upstream route, so just find the
+ # first upstream:
+ for (ingress, egress), route in self._routes.iteritems():
+ if egress == self._nni_logical_port_no:
+ return [None, route[1]]
+ raise Exception('not a single upstream route')
+
+ # If egress_port is not specified (None), we can also can return a
+ # "half" route
+ if egress_port_no is None:
+ for (ingress, egress), route in self._routes.iteritems():
+ if ingress == ingress_port_no:
+ return [route[0], None]
+
+ # This can occur is a leaf device is disabled
+ self.log.exception('no-downstream-route',
+ ingress_port_no=ingress_port_no,
+ egress_port_no= egress_port_no
+ )
+ return None
+
+
+ return self._routes.get((ingress_port_no, egress_port_no))
+
+ def get_all_default_rules(self):
+ self._assure_cached_tables_up_to_date()
+ return self._default_rules
+
+ def get_wildcard_input_ports(self, exclude_port=None):
+ logical_ports = self.self_proxy.get('/ports')
+ return [port.ofp_port.port_no for port in logical_ports
+ if port.ofp_port.port_no != exclude_port]