| # |
| # Copyright 2017 the original author or authors. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| """ |
| Agent to play gateway between CORE and an individual adapter. |
| """ |
| from uuid import uuid4 |
| |
| import arrow |
| import structlog |
| from google.protobuf.json_format import MessageToJson |
| from scapy.packet import Packet |
| from twisted.internet.defer import inlineCallbacks, returnValue |
| from zope.interface import implementer |
| |
| from common.event_bus import EventBusClient |
| from common.frameio.frameio import hexify |
| from common.utils.id_generation import create_cluster_logical_device_ids |
| from voltha.adapters.interface import IAdapterAgent |
| from voltha.protos import third_party |
| from voltha.core.flow_decomposer import OUTPUT |
| from voltha.protos.device_pb2 import Device, Port, PmConfigs |
| from voltha.protos.events_pb2 import AlarmEvent, AlarmEventType, \ |
| AlarmEventSeverity, AlarmEventState, AlarmEventCategory |
| from voltha.protos.events_pb2 import KpiEvent |
| from voltha.protos.voltha_pb2 import DeviceGroup, LogicalDevice, \ |
| LogicalPort, AdminState, OperStatus, AlarmFilterRuleKey |
| from voltha.registry import registry |
| from common.utils.id_generation import create_cluster_device_id |
| import re |
| |
| |
| class MacAddressError(BaseException): |
| def __init__(self, error): |
| self.error = error |
| |
| |
| class IDError(BaseException): |
| def __init__(self, error): |
| self.error = error |
| |
| |
| @implementer(IAdapterAgent) |
| class AdapterAgent(object): |
| """ |
| Gate-keeper between CORE and device adapters. |
| |
| On one side it interacts with Core's internal model and update/dispatch |
| mechanisms. |
| |
| On the other side, it interacts with the adapters standard interface as |
| defined in |
| """ |
| |
| def __init__(self, adapter_name, adapter_cls): |
| self.adapter_name = adapter_name |
| self.adapter_cls = adapter_cls |
| self.core = registry('core') |
| self.adapter = None |
| self.adapter_node_proxy = None |
| self.root_proxy = self.core.get_proxy('/') |
| self._rx_event_subscriptions = {} |
| self._tx_event_subscriptions = {} |
| self.event_bus = EventBusClient() |
| self.packet_out_subscriptions = {} |
| self.log = structlog.get_logger(adapter_name=adapter_name) |
| self._onu_detect_event_subscriptions = {} |
| |
| @inlineCallbacks |
| def start(self): |
| self.log.debug('starting') |
| config = self._get_adapter_config() # this may be None |
| try: |
| adapter = self.adapter_cls(self, config) |
| yield adapter.start() |
| self.adapter = adapter |
| self.adapter_node_proxy = self._update_adapter_node() |
| self._update_device_types() |
| except Exception, e: |
| self.log.exception(e) |
| self.log.info('started') |
| returnValue(self) |
| |
| @inlineCallbacks |
| def stop(self): |
| self.log.debug('stopping') |
| if self.adapter is not None: |
| yield self.adapter.stop() |
| self.adapter = None |
| self.log.info('stopped') |
| |
| def _get_adapter_config(self): |
| """ |
| Opportunistically load persisted adapter configuration. |
| Return None if no configuration exists yet. |
| """ |
| proxy = self.core.get_proxy('/') |
| try: |
| config = proxy.get('/adapters/' + self.adapter_name) |
| return config |
| except KeyError: |
| return None |
| |
| def _update_adapter_node(self): |
| """ |
| Creates or updates the adapter node object based on self |
| description from the adapter. |
| """ |
| |
| adapter_desc = self.adapter.adapter_descriptor() |
| assert adapter_desc.id == self.adapter_name |
| path = self._make_up_to_date( |
| '/adapters', self.adapter_name, adapter_desc) |
| return self.core.get_proxy(path) |
| |
| def _update_device_types(self): |
| """ |
| Make sure device types are registered in Core |
| """ |
| device_types = self.adapter.device_types() |
| for device_type in device_types.items: |
| key = device_type.id |
| self._make_up_to_date('/device_types', key, device_type) |
| |
| def _make_up_to_date(self, container_path, key, data): |
| full_path = container_path + '/' + str(key) |
| root_proxy = self.core.get_proxy('/') |
| try: |
| root_proxy.get(full_path) |
| root_proxy.update(full_path, data) |
| except KeyError: |
| root_proxy.add(container_path, data) |
| return full_path |
| |
| def _remove_node(self, container_path, key): |
| """ |
| Remove a node from the data model |
| :param container_path: path to node |
| :param key: node |
| :return: None |
| """ |
| full_path = container_path + '/' + str(key) |
| root_proxy = self.core.get_proxy('/') |
| try: |
| root_proxy.get(full_path) |
| root_proxy.remove(full_path) |
| except KeyError: |
| # Node does not exist |
| pass |
| |
| # ~~~~~~~~~~~~~~~~~~~~~ Core-Facing Service ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
| |
| def adopt_device(self, device): |
| return self.adapter.adopt_device(device) |
| |
| def reconcile_device(self, device): |
| return self.adapter.reconcile_device(device) |
| |
| def abandon_device(self, device): |
| return self.adapter.abandon_device(device) |
| |
| def disable_device(self, device): |
| return self.adapter.disable_device(device) |
| |
| def reenable_device(self, device): |
| return self.adapter.reenable_device(device) |
| |
| def reboot_device(self, device): |
| return self.adapter.reboot_device(device) |
| |
| def download_image(self, device, request): |
| return self.adapter.download_image(device, request) |
| |
| def get_image_download_status(self, device, request): |
| return self.adapter.get_image_download_status(device, request) |
| |
| def cancel_image_download(self, device, request): |
| return self.adapter.cancel_image_download(device, request) |
| |
| def activate_image_update(self, device, request): |
| return self.adapter.activate_image_update(device, request) |
| |
| def revert_image_update(self, device, request): |
| return self.adapter.revert_image_update(device, request) |
| |
| def self_test(self, device): |
| return self.adapter.self_test_device(device) |
| |
| def delete_device(self, device): |
| # Remove all child devices |
| self.delete_all_child_devices(device.id) |
| # Removing device |
| return self.adapter.delete_device(device) |
| |
| def get_device_details(self, device): |
| return self.adapter.get_device_details(device) |
| |
| def update_flows_bulk(self, device, flows, groups): |
| return self.adapter.update_flows_bulk(device, flows, groups) |
| |
| def update_flows_incrementally(self, device, flow_changes, group_changes): |
| return self.adapter.update_flows_incrementally(device, flow_changes, group_changes) |
| |
| def suppress_alarm(self, filter): |
| return self.adapter.suppress_alarm(filter) |
| |
| def unsuppress_alarm(self, filter): |
| return self.adapter.unsuppress_alarm(filter) |
| |
| # def update_pm_collection(self, device, pm_collection_config): |
| # return self.adapter.update_pm_collection(device, pm_collection_config) |
| |
| def create_interface(self, device, data): |
| return self.adapter.create_interface(device, data) |
| |
| def update_interface(self, device, data): |
| return self.adapter.update_interface(device, data) |
| |
| def remove_interface(self, device, data): |
| return self.adapter.remove_interface(device, data) |
| |
| def create_tcont(self, device, tcont_data, traffic_descriptor_data): |
| return self.adapter.create_tcont(device, tcont_data, |
| traffic_descriptor_data) |
| |
| def update_tcont(self, device, tcont_data, traffic_descriptor_data): |
| return self.adapter.update_tcont(device, tcont_data, |
| traffic_descriptor_data) |
| |
| def remove_tcont(self, device, tcont_data, traffic_descriptor_data): |
| return self.adapter.remove_tcont(device, tcont_data, |
| traffic_descriptor_data) |
| |
| def create_gemport(self, device, data): |
| return self.adapter.create_gemport(device, data) |
| |
| def update_gemport(self, device, data): |
| return self.adapter.update_gemport(device, data) |
| |
| def remove_gemport(self, device, data): |
| return self.adapter.remove_gemport(device, data) |
| |
| def create_multicast_gemport(self, device, data): |
| return self.adapter.create_multicast_gemport(device, data) |
| |
| def update_multicast_gemport(self, device, data): |
| return self.adapter.update_multicast_gemport(device, data) |
| |
| def remove_multicast_gemport(self, device, data): |
| return self.adapter.remove_multicast_gemport(device, data) |
| |
| def create_multicast_distribution_set(self, device, data): |
| return self.adapter.create_multicast_distribution_set(device, data) |
| |
| def update_multicast_distribution_set(self, device, data): |
| return self.adapter.update_multicast_distribution_set(device, data) |
| |
| def remove_multicast_distribution_set(self, device, data): |
| return self.adapter.remove_multicast_distribution_set(device, data) |
| |
| # ~~~~~~~~~~~~~~~~~~~ Adapter-Facing Service ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
| |
| def get_device(self, device_id): |
| return self.root_proxy.get('/devices/{}'.format(device_id)) |
| |
| def get_child_device(self, parent_device_id, **kwargs): |
| """ |
| Retrieve a child device object belonging |
| to the specified parent device based on some match |
| criteria. The first child device that matches the |
| provided criteria is returned. |
| :param parent_device_id: parent's device id |
| :param **kwargs: arbitrary list of match criteria |
| :return: Child Device Object or None |
| """ |
| # Get all arguments to be used for comparison |
| # Note that for now we are only matching on the |
| # PON Interface Id (parent_port_no), ONU ID & SERIAL NUMBER |
| # Other matching fields can be added as required in the future |
| parent_port_no = kwargs.pop('parent_port_no', None) |
| onu_id = kwargs.pop('onu_id', None) |
| serial_number = kwargs.pop('serial_number', None) |
| if onu_id is None and serial_number is None: return None |
| |
| # Get all devices |
| devices = self.root_proxy.get('/devices') |
| |
| # Get all child devices with the same parent ID |
| children_ids = set( |
| d.id for d in devices if d.parent_id == parent_device_id) |
| |
| # Loop through all the child devices with this parent ID |
| for child_id in children_ids: |
| found = True |
| device = self.get_device(child_id) |
| |
| # Does this child device match the passed in ONU ID? |
| found_onu_id = False |
| if onu_id is not None: |
| if device.proxy_address.onu_id == onu_id: |
| # Does the ONU ID belong to the right PON port? |
| # (ONU IDs are not unique across PON ports) |
| if parent_port_no is not None: |
| if device.parent_port_no == parent_port_no: |
| found_onu_id = True |
| else: |
| found_onu_id = True |
| |
| # Does this child device match the passed in SERIAL NUMBER? |
| found_serial_number = False |
| if serial_number is not None: |
| if device.serial_number == serial_number: |
| found_serial_number = True |
| # Match ONU ID and SERIAL NUMBER |
| if onu_id is not None and serial_number is not None: |
| found = found_onu_id & found_serial_number |
| # Otherwise ONU ID or SERIAL NUMBER |
| else: |
| found = found_onu_id | found_serial_number |
| |
| # Return the matched child device |
| if found is True: |
| return device |
| |
| return None |
| |
| def add_device(self, device): |
| assert isinstance(device, Device) |
| self._make_up_to_date('/devices', device.id, device) |
| |
| # Ultimately, assign devices to device grpups. |
| # see https://jira.opencord.org/browse/CORD-838 |
| |
| dg = DeviceGroup(id='1') |
| self._make_up_to_date('/device_groups', dg.id, dg) |
| |
| # add device to device group |
| # see https://jira.opencord.org/browse/CORD-838 |
| |
| def update_device(self, device): |
| assert isinstance(device, Device) |
| |
| # we run the update through the device_agent so that the change |
| # does not loop back to the adapter unnecessarily |
| device_agent = self.core.get_device_agent(device.id) |
| device_agent.update_device(device) |
| |
| def update_device_pm_config(self, device_pm_config, init=False): |
| assert isinstance(device_pm_config, PmConfigs) |
| |
| # we run the update through the device_agent so that the change |
| # does not loop back to the adapter unnecessarily |
| device_agent = self.core.get_device_agent(device_pm_config.id) |
| device_agent.update_device_pm_config(device_pm_config, init) |
| |
| def update_adapter_pm_config(self, device_id, device_pm_config): |
| device = self.get_device(device_id) |
| self.adapter.update_pm_config(device, device_pm_config) |
| |
| def update_image_download(self, img_dnld): |
| self.log.info('update-image-download', img_dnld=img_dnld) |
| try: |
| # we run the update through the device_agent so that the change |
| # does not loop back to the adapter unnecessarily |
| device_agent = self.core.get_device_agent(img_dnld.id) |
| device_agent.update_device_image_download(img_dnld) |
| except Exception as e: |
| self.log.exception(e.message) |
| |
| def delete_image_download(self, img_dnld): |
| self.log.info('delete-image-download', img_dnld=img_dnld) |
| try: |
| root_proxy = self.core.get_proxy('/') |
| path = '/devices/{}/image_downloads/{}'. \ |
| format(img_dnld.id, img_dnld.name) |
| root_proxy.get(path) |
| root_proxy.remove(path) |
| device_agent = self.core.get_device_agent(img_dnld.id) |
| device_agent.unregister_device_image_download(img_dnld.name) |
| except Exception as e: |
| self.log.exception(e.message) |
| |
| def _add_peer_reference(self, device_id, port): |
| # for referential integrity, add/augment references |
| port.device_id = device_id |
| me_as_peer = Port.PeerPort(device_id=device_id, port_no=port.port_no) |
| for peer in port.peers: |
| peer_port_path = '/devices/{}/ports/{}'.format( |
| peer.device_id, peer.port_no) |
| peer_port = self.root_proxy.get(peer_port_path) |
| if me_as_peer not in peer_port.peers: |
| new = peer_port.peers.add() |
| new.CopyFrom(me_as_peer) |
| self.root_proxy.update(peer_port_path, peer_port) |
| |
| def _del_peer_reference(self, device_id, port): |
| me_as_peer = Port.PeerPort(device_id=device_id, port_no=port.port_no) |
| for peer in port.peers: |
| peer_port_path = '/devices/{}/ports/{}'.format( |
| peer.device_id, peer.port_no) |
| peer_port = self.root_proxy.get(peer_port_path) |
| if me_as_peer in peer_port.peers: |
| peer_port.peers.remove(me_as_peer) |
| self.root_proxy.update(peer_port_path, peer_port) |
| |
| def add_port(self, device_id, port): |
| assert isinstance(port, Port) |
| |
| # for referential integrity, add/augment references |
| self._add_peer_reference(device_id, port) |
| |
| # Add port |
| self._make_up_to_date('/devices/{}/ports'.format(device_id), |
| port.port_no, port) |
| |
| def get_ports(self, device_id, port_type=None): |
| # assert Port.PortType.DESCRIPTOR.values_by_name[port_type] |
| ports = self.root_proxy.get('/devices/{}/ports'.format(device_id)) |
| if port_type is None: |
| return ports |
| else: |
| return [p for p in ports if p.type == port_type] |
| |
| def get_port(self, device_id, port_no=None, label=None): |
| ports = self.root_proxy.get('/devices/{}/ports'.format(device_id)) |
| for p in ports: |
| if p.label == label or p.port_no == port_no: |
| return p |
| return None |
| |
| def delete_port(self, device_id, port): |
| assert isinstance(port, Port) |
| # for referential integrity, add/augment references |
| self._del_peer_reference(device_id, port) |
| # Delete port |
| self._remove_node('/devices/{}/ports'.format(device_id), port.port_no) |
| |
| def disable_all_ports(self, device_id): |
| """ |
| Disable all ports on that device, i.e. change the admin status to |
| disable and operational status to UNKNOWN. |
| :param device_id: device id |
| :return: None |
| """ |
| |
| # get all device ports |
| ports = self.root_proxy.get('/devices/{}/ports'.format(device_id)) |
| for port in ports: |
| port.admin_state = AdminState.DISABLED |
| port.oper_status = OperStatus.UNKNOWN |
| self._make_up_to_date('/devices/{}/ports'.format(device_id), |
| port.port_no, port) |
| |
| def enable_all_ports(self, device_id): |
| """ |
| Re-enable all ports on that device, i.e. change the admin status to |
| enabled and operational status to ACTIVE |
| :param device_id: device id |
| :return: None |
| """ |
| |
| # get all device ports |
| ports = self.root_proxy.get('/devices/{}/ports'.format(device_id)) |
| for port in ports: |
| port.admin_state = AdminState.ENABLED |
| port.oper_status = OperStatus.ACTIVE |
| self._make_up_to_date('/devices/{}/ports'.format(device_id), |
| port.port_no, port) |
| |
| def update_operstatus_all_ports(self, device_id, oper_status): |
| ports = self.root_proxy.get('/devices/{}/ports'.format(device_id)) |
| for port in ports: |
| port.oper_status = oper_status |
| self._make_up_to_date('/devices/{}/ports'.format(device_id), |
| port.port_no, port) |
| |
| def delete_all_peer_references(self, device_id): |
| """ |
| Remove all peer port references for that device |
| :param device_id: device_id of device |
| :return: None |
| """ |
| ports = self.root_proxy.get('/devices/{}/ports'.format(device_id)) |
| for port in ports: |
| port_path = '/devices/{}/ports/{}'.format(device_id, port.port_no) |
| for peer in port.peers: |
| port.peers.remove(peer) |
| self.root_proxy.update(port_path, port) |
| |
| def delete_port_reference_from_parent(self, device_id, port): |
| """ |
| Delete the port reference from the parent device |
| :param device_id: id of device containing the port |
| :param port: port to remove |
| :return: None |
| """ |
| assert isinstance(port, Port) |
| self.log.debug('delete-port-reference', device_id=device_id, port=port) |
| self._del_peer_reference(device_id, port) |
| |
| # update child port details |
| self._make_up_to_date('/devices/{}/ports'.format(device_id), |
| port.port_no, port) |
| |
| def add_port_reference_to_parent(self, device_id, port): |
| """ |
| Add the port reference to the parent device |
| :param device_id: id of device containing the port |
| :param port: port to add |
| :return: None |
| """ |
| assert isinstance(port, Port) |
| self.log.debug('add-port-reference', device_id=device_id, port=port) |
| self._add_peer_reference(device_id, port) |
| # update child port details |
| self._make_up_to_date('/devices/{}/ports'.format(device_id), |
| port.port_no, port) |
| |
| def get_logical_device(self, logical_device_id): |
| return self.root_proxy.get('/logical_devices/{}'.format( |
| logical_device_id)) |
| |
| def get_logical_port(self, logical_device_id, port_id): |
| return self.root_proxy.get('/logical_devices/{}/ports/{}'.format( |
| logical_device_id, port_id)) |
| |
| def _create_cluster_ids_from_dpid(self, dpid): |
| """ |
| Create a logical device id using a datapath id. |
| :param dpid: Must be present and formatted as a mac address |
| :return: a unique logical device id and a formatted datapath id. If |
| the dpid was already registered then an exception will be raised. |
| """ |
| switch_id = int(dpid.replace(':', ''), 16) |
| logical_devices = self.root_proxy.get('/logical_devices') |
| existing_ids = set(ld.id for ld in logical_devices) |
| existing_datapath_ids = set(ld.datapath_id for ld in logical_devices) |
| core_id = registry('core').core_store_id |
| |
| ld_id, dp_id = create_cluster_logical_device_ids(core_id, switch_id) |
| ids_exist = dp_id in existing_datapath_ids or \ |
| ld_id in existing_ids |
| if not ids_exist: |
| return ld_id, dp_id |
| else: |
| self.log.error('ID-already-registered', logical_id=ld_id, |
| dpid=dpid) |
| raise IDError('ID-already-registered') |
| |
| def _is_valid_mac_address(self, data): |
| return re.match("[0-9a-f]{2}([-:])[0-9a-f]{2}(\\1[0-9a-f]{2}){4}$", |
| data) |
| |
| def create_logical_device(self, logical_device, dpid=None): |
| """ |
| Allow the adapters to provide their own datapath id. This must |
| be the OLT MAC address. If the dpid is None or is not a mac |
| address then an exception will be raised. |
| :param logical_device: logical device |
| :param dpid: OLT MAC address. dpid default param is None just to be |
| backward compatible with existing adapters. |
| :return: updated logical device |
| """ |
| assert isinstance(logical_device, LogicalDevice) |
| |
| # Validate the dpid - it needs to be present and formatted as a mac |
| # address |
| if dpid: |
| dpid = dpid.lower() |
| if not self._is_valid_mac_address(dpid): |
| self.log.error('DPID-not-a-mac-address', dpid=dpid) |
| raise MacAddressError('DPID-not-a-mac-address') |
| else: |
| self.log.error('DPID-cannot-be-none') |
| raise MacAddressError("DPID-cannot-be-none") |
| |
| if not logical_device.id: |
| ld_id, dp_id = self._create_cluster_ids_from_dpid(dpid) |
| logical_device.id = ld_id |
| logical_device.datapath_id = dp_id |
| |
| if not logical_device.desc.mfr_desc: |
| logical_device.desc.mfr_desc = "VOLTHA Project" |
| |
| self._make_up_to_date('/logical_devices', |
| logical_device.id, logical_device) |
| |
| # Keep a reference to the packet out subscription as it will be |
| # referred during removal |
| self.packet_out_subscriptions[logical_device.id] = self.event_bus.subscribe( |
| topic='packet-out:{}'.format(logical_device.id), |
| callback=lambda _, p: self.receive_packet_out(logical_device.id, p) |
| ) |
| |
| return logical_device |
| |
| def reconcile_logical_device(self, logical_device_id): |
| """ |
| This is called by the adapter to reconcile the physical device with |
| the logical device. For now, we only set the packet-out subscription |
| :param logical_device_id: |
| :return: |
| """ |
| # Keep a reference to the packet out subscription as it will be |
| # referred during removal |
| self.packet_out_subscriptions[logical_device_id] = self.event_bus.subscribe( |
| topic='packet-out:{}'.format(logical_device_id), |
| callback=lambda _, p: self.receive_packet_out(logical_device_id, p) |
| ) |
| |
| def delete_logical_device(self, logical_device): |
| """ |
| This will remove the logical device as well as all logical ports |
| associated with it |
| :param logical_device: The logical device to remove |
| :return: None |
| """ |
| assert isinstance(logical_device, LogicalDevice) |
| |
| # Remove packet out subscription |
| self.event_bus.unsubscribe(self.packet_out_subscriptions[logical_device.id]) |
| del self.packet_out_subscriptions[logical_device.id] |
| |
| # Remove node from the data model - this will trigger the logical |
| # device 'remove callbacks' as well as logical ports 'remove |
| # callbacks' if present |
| self._remove_node('/logical_devices', logical_device.id) |
| |
| def receive_packet_out(self, logical_device_id, ofp_packet_out): |
| |
| def get_port_out(opo): |
| for action in opo.actions: |
| if action.type == OUTPUT: |
| return action.output.port |
| |
| out_port = get_port_out(ofp_packet_out) |
| frame = ofp_packet_out.data |
| self.adapter.receive_packet_out(logical_device_id, out_port, frame) |
| |
| def add_logical_port(self, logical_device_id, port): |
| assert isinstance(port, LogicalPort) |
| self._make_up_to_date( |
| '/logical_devices/{}/ports'.format(logical_device_id), |
| port.id, port) |
| |
| def delete_logical_port(self, logical_device_id, port): |
| assert isinstance(port, LogicalPort) |
| self._remove_node('/logical_devices/{}/ports'.format( |
| logical_device_id), port.id) |
| |
| def delete_logical_port_by_id(self, logical_device_id, port_id): |
| self._remove_node('/logical_devices/{}/ports'.format( |
| logical_device_id), port_id) |
| |
| def update_logical_port(self, logical_device_id, port): |
| assert isinstance(port, LogicalPort) |
| self.log.debug('update-logical-port', |
| logical_device_id=logical_device_id, |
| port=port) |
| self._make_up_to_date( |
| '/logical_devices/{}/ports'.format(logical_device_id), |
| port.id, port) |
| |
| def get_child_devices(self, parent_device_id): |
| try: |
| devices = self.root_proxy.get('/devices') |
| children = [d for d in devices if d.parent_id == parent_device_id] |
| return children |
| except Exception, e: |
| self.log.exception('failure', e=e) |
| |
| def subscribe_to_proxy_child_messages(self, proxy_address): |
| topic = self._gen_tx_proxy_address_topic(proxy_address) |
| self._tx_event_subscriptions[topic] = self.event_bus.subscribe( |
| topic, lambda t, m: self._send_proxied_message(proxy_address, m)) |
| |
| def reconcile_child_devices(self, parent_device_id): |
| children = self.get_child_devices(parent_device_id) |
| for child in children: |
| # First subscribe to proxy messages from a chile device |
| self.subscribe_to_proxy_child_messages(child.proxy_address) |
| |
| # Then trigger the reconciliation of the existing child device |
| device_agent = self.core.get_device_agent(child.id) |
| device_agent.reconcile_existing_device(child) |
| |
| # Obselete API - discouraged to be decommissioned after |
| # adapters are align to new APIs |
| def child_device_detected(self, |
| parent_device_id, |
| parent_port_no, |
| child_device_type, |
| proxy_address, |
| admin_state, |
| **kw): |
| # we create new ONU device objects and insert them into the config |
| device = Device( |
| id=create_cluster_device_id(self.core.core_store_id), |
| # id=uuid4().hex[:12], |
| type=child_device_type, |
| parent_id=parent_device_id, |
| parent_port_no=parent_port_no, |
| proxy_address=proxy_address, |
| admin_state=admin_state, |
| **kw |
| ) |
| self._make_up_to_date( |
| '/devices', device.id, device) |
| |
| topic = self._gen_tx_proxy_address_topic(proxy_address) |
| self._tx_event_subscriptions[topic] = self.event_bus.subscribe( |
| topic, lambda t, m: self._send_proxied_message(proxy_address, m)) |
| |
| def add_onu_device(self, |
| parent_device_id, |
| parent_port_no, |
| vendor_id, |
| proxy_address, |
| admin_state, |
| **kw): |
| |
| device_type = None |
| |
| for dt in self.root_proxy.get('/device_types'): |
| if (dt.vendor_id == vendor_id or vendor_id in dt.vendor_ids) and \ |
| dt.id.endswith("_onu"): |
| device_type = dt |
| |
| assert device_type is not None |
| |
| # we create new ONU device objects and insert them into the config |
| device = Device( |
| id=create_cluster_device_id(self.core.core_store_id), |
| # id=uuid4().hex[:12], |
| type=device_type.id, |
| vendor_id=vendor_id, |
| parent_id=parent_device_id, |
| parent_port_no=parent_port_no, |
| proxy_address=proxy_address, |
| admin_state=admin_state, |
| adapter=device_type.adapter, |
| **kw |
| ) |
| self._make_up_to_date('/devices', device.id, device) |
| |
| topic = self._gen_tx_proxy_address_topic(proxy_address) |
| self._tx_event_subscriptions[topic] = self.event_bus.subscribe( |
| topic, lambda t, m: self._send_proxied_message(proxy_address, m)) |
| |
| def get_child_device_with_proxy_address(self, proxy_address): |
| # Proxy address is defined as {parent id, channel_id} |
| devices = self.root_proxy.get('/devices') |
| children_ids = set(d.id for d in devices if d.parent_id == |
| proxy_address.device_id) |
| for child_id in children_ids: |
| device = self.get_device(child_id) |
| if device.proxy_address == proxy_address: |
| return device |
| |
| def remove_all_logical_ports(self, logical_device_id): |
| """ Remove all logical ports from a given logical device""" |
| ports = self.root_proxy.get('/logical_devices/{}/ports') |
| for port in ports: |
| self._remove_node('/logical_devices/{}/ports', port.id) |
| |
| def delete_all_child_devices(self, parent_device_id): |
| """ Remove all ONUs from a given OLT """ |
| devices = self.root_proxy.get('/devices') |
| children_ids = set() |
| for device in devices: |
| if device.parent_id == parent_device_id: |
| children_ids.add(device.id) |
| topic = self._gen_tx_proxy_address_topic(device.proxy_address) |
| self.event_bus.unsubscribe(self._tx_event_subscriptions[topic]) |
| del self._tx_event_subscriptions[topic] |
| |
| self.log.debug('devices-to-delete', |
| parent_id=parent_device_id, |
| children_ids=children_ids) |
| for child_id in children_ids: |
| self._remove_node('/devices', child_id) |
| |
| def update_child_devices_state(self, |
| parent_device_id, |
| oper_status=None, |
| connect_status=None, |
| admin_state=None): |
| """ Update status of all child devices """ |
| devices = self.root_proxy.get('/devices') |
| children_ids = set( |
| d.id for d in devices if d.parent_id == parent_device_id) |
| self.log.debug('update-devices', |
| parent_id=parent_device_id, |
| children_ids=children_ids, |
| oper_status=oper_status, |
| connect_status=connect_status, |
| admin_state=admin_state) |
| |
| for child_id in children_ids: |
| device = self.get_device(child_id) |
| if oper_status is not None: |
| device.oper_status = oper_status |
| if connect_status: |
| device.connect_status = connect_status |
| if admin_state: |
| device.admin_state = admin_state |
| self._make_up_to_date( |
| '/devices', device.id, device) |
| |
| def delete_child_device(self, parent_device_id, child_device_id): |
| onu_device = self.root_proxy.get('/devices/{}'.format(child_device_id)) |
| if onu_device is not None: |
| if onu_device.parent_id == parent_device_id: |
| self.log.debug('deleting-child-device', |
| parent_device_id=parent_device_id, |
| child_device_id=child_device_id) |
| topic = self._gen_tx_proxy_address_topic( |
| onu_device.proxy_address) |
| self.event_bus.unsubscribe(self._tx_event_subscriptions[topic]) |
| del self._tx_event_subscriptions[topic] |
| self._remove_node('/devices', child_device_id) |
| |
| def _gen_rx_proxy_address_topic(self, proxy_address): |
| """Generate unique topic name specific to this proxy address for rx""" |
| topic = 'rx:' + MessageToJson(proxy_address) |
| return topic |
| |
| def _gen_tx_proxy_address_topic(self, proxy_address): |
| """Generate unique topic name specific to this proxy address for tx""" |
| topic = 'tx:' + MessageToJson(proxy_address) |
| return topic |
| |
| def register_for_proxied_messages(self, proxy_address): |
| topic = self._gen_rx_proxy_address_topic(proxy_address) |
| self._rx_event_subscriptions[topic] = self.event_bus.subscribe( |
| topic, |
| lambda t, m: self._receive_proxied_message(proxy_address, m)) |
| |
| def unregister_for_proxied_messages(self, proxy_address): |
| topic = self._gen_rx_proxy_address_topic(proxy_address) |
| self.event_bus.unsubscribe(self._rx_event_subscriptions[topic]) |
| del self._rx_event_subscriptions[topic] |
| |
| def _receive_proxied_message(self, proxy_address, msg): |
| self.adapter.receive_proxied_message(proxy_address, msg) |
| |
| def send_proxied_message(self, proxy_address, msg): |
| topic = self._gen_tx_proxy_address_topic(proxy_address) |
| self.event_bus.publish(topic, msg) |
| |
| def _send_proxied_message(self, proxy_address, msg): |
| self.adapter.send_proxied_message(proxy_address, msg) |
| |
| def receive_proxied_message(self, proxy_address, msg): |
| topic = self._gen_rx_proxy_address_topic(proxy_address) |
| self.event_bus.publish(topic, msg) |
| |
| def register_for_inter_adapter_messages(self): |
| self.event_bus.subscribe(self.adapter_name, |
| lambda t, |
| m: self.adapter.receive_inter_adapter_message( |
| m)) |
| |
| def unregister_for_inter_adapter_messages(self): |
| self.event_bus.unsubscribe(self.adapter_name) |
| |
| def publish_inter_adapter_message(self, device_id, msg): |
| # Get the device from the device_id |
| device = self.get_device(device_id) |
| assert device is not None |
| |
| # Publish a message to the adapter that is responsible |
| # for managing this device |
| self.event_bus.publish(device.type, msg) |
| |
| # ~~~~~~~~~~~~~~~~~~ Handling packet-in and packet-out ~~~~~~~~~~~~~~~~~~~~ |
| |
| def send_packet_in(self, logical_device_id, logical_port_no, packet): |
| self.log.debug('send-packet-in', logical_device_id=logical_device_id, |
| logical_port_no=logical_port_no, packet=hexify(packet)) |
| |
| if isinstance(packet, Packet): |
| packet = str(packet) |
| |
| topic = 'packet-in:' + logical_device_id |
| self.event_bus.publish(topic, (logical_port_no, packet)) |
| |
| # ~~~~~~~~~~~~~~~~~~~ Handling KPI metric submissions ~~~~~~~~~~~~~~~~~~~~~ |
| |
| def submit_kpis(self, kpi_event_msg): |
| try: |
| assert isinstance(kpi_event_msg, KpiEvent) |
| self.event_bus.publish('kpis', kpi_event_msg) |
| except Exception as e: |
| self.log.exception('failed-kpi-submission', |
| type=type(kpi_event_msg)) |
| |
| # # ~~~~~~~~~~~~~~~~~~~~~~~~~~ Handle flow stats ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
| |
| def update_flow_stats(self, logical_device_id, flow_id, packet_count=0, |
| byte_count=0): |
| flows = self.root_proxy.get( |
| 'logical_devices/{}/flows'.format(logical_device_id)) |
| flow_to_update = None |
| for flow in flows: |
| if flow.id == flow_id: |
| flow_to_update = flow |
| flow_to_update.packet_count = packet_count |
| flow_to_update.byte_count = byte_count |
| break |
| if flow_to_update is not None: |
| self._make_up_to_date( |
| 'logical_devices/{}/flows'.format(logical_device_id), |
| flow_to_update.id, flow_to_update) |
| else: |
| self.log.warn('flow-to-update-not-found', |
| logical_device_id=logical_device_id, flow_id=flow_id) |
| |
| # ~~~~~~~~~~~~~~~~~~~ Handle alarm submissions ~~~~~~~~~~~~~~~~~~~~~ |
| |
| def create_alarm(self, id=None, resource_id=None, description=None, |
| raised_ts=0, changed_ts=0, |
| type=AlarmEventType.EQUIPMENT, |
| category=AlarmEventCategory.PON, |
| severity=AlarmEventSeverity.MINOR, |
| state=AlarmEventState.RAISED, |
| context=None, |
| logical_device_id=None): |
| |
| # Construct the ID if it is not provided |
| if id is None: |
| id = 'voltha.{}.{}'.format(self.adapter_name, resource_id) |
| |
| return AlarmEvent( |
| id=id, |
| resource_id=resource_id, |
| type=type, |
| category=category, |
| severity=severity, |
| state=state, |
| description=description, |
| reported_ts=arrow.utcnow().timestamp, |
| raised_ts=raised_ts, |
| changed_ts=changed_ts, |
| context=context, |
| logical_device_id=logical_device_id |
| ) |
| |
| def filter_alarm(self, device_id, alarm_event): |
| alarm_filters = self.root_proxy.get('/alarm_filters') |
| |
| rule_values = { |
| 'id': alarm_event.id, |
| 'type': AlarmEventType.AlarmEventType.Name(alarm_event.type), |
| 'category': AlarmEventCategory.AlarmEventCategory.Name( |
| alarm_event.category), |
| 'severity': AlarmEventSeverity.AlarmEventSeverity.Name( |
| alarm_event.severity), |
| 'resource_id': alarm_event.resource_id, |
| 'device_id': device_id |
| } |
| |
| for alarm_filter in alarm_filters: |
| if alarm_filter.rules: |
| exclude = True |
| for rule in alarm_filter.rules: |
| self.log.debug("compare-alarm-event", |
| key=AlarmFilterRuleKey.AlarmFilterRuleKey.Name( |
| rule.key), |
| actual=rule_values[ |
| AlarmFilterRuleKey.AlarmFilterRuleKey.Name( |
| rule.key)].lower(), |
| expected=rule.value.lower()) |
| exclude = exclude and \ |
| (rule_values[ |
| AlarmFilterRuleKey.AlarmFilterRuleKey.Name( |
| rule.key)].lower() == rule.value.lower()) |
| if not exclude: |
| break |
| |
| if exclude: |
| self.log.info("filtered-alarm-event", alarm=alarm_event) |
| return True |
| |
| return False |
| |
| def submit_alarm(self, device_id, alarm_event_msg): |
| try: |
| assert isinstance(alarm_event_msg, AlarmEvent) |
| if not self.filter_alarm(device_id, alarm_event_msg): |
| self.event_bus.publish('alarms', alarm_event_msg) |
| |
| except Exception as e: |
| self.log.exception('failed-alarm-submission', |
| type=type(alarm_event_msg)) |
| |
| # ~~~~~~~~~~~~~~~~~~~ Handle ONU detect ~~~~~~~~~~~~~~~~~~~~~ |
| |
| def _gen_onu_detect_proxy_address_topic(self, device_id): |
| """Generate unique topic name specific to this device id for onu detect""" |
| topic = str('onu_detect:{}'.format(device_id)) |
| return topic |
| |
| def register_for_onu_detect_state(self, device_id): |
| topic = self._gen_onu_detect_proxy_address_topic(device_id) |
| self._onu_detect_event_subscriptions[topic] = self.event_bus.subscribe( |
| topic, |
| lambda t, m: self._forward_onu_detect_state(device_id, m)) |
| |
| def unregister_for_onu_detect_state(self, device_id): |
| topic = self._gen_onu_detect_proxy_address_topic(device_id) |
| self.event_bus.unsubscribe(self._onu_detect_event_subscriptions[topic]) |
| del self._onu_detect_event_subscriptions[topic] |
| |
| def _forward_onu_detect_state(self, device_id, state): |
| self.adapter.receive_onu_detect_state(device_id, state) |
| |
| def forward_onu_detect_state(self, device_id, state): |
| topic = self._gen_onu_detect_proxy_address_topic(device_id) |
| self.event_bus.publish(topic, state) |