blob: e3290f87034556ca484e999eb90ce918378f470c [file] [log] [blame]
#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import arrow
import structlog
from voltha.adapters.adtran_olt.xpon.adtran_xpon import AdtranXPON
from pon_port import PonPort
from uni_port import UniPort
from heartbeat import HeartBeat
from voltha.adapters.adtran_olt.alarms.adapter_alarms import AdapterAlarms
from onu_pm_metrics import OnuPmMetrics
from uuid import uuid4
from twisted.internet import reactor
from twisted.internet.defer import DeferredQueue, inlineCallbacks
from twisted.internet.defer import returnValue
from voltha.protos import third_party
from voltha.protos.common_pb2 import OperStatus, ConnectStatus
from voltha.protos.device_pb2 import Image
from common.utils.indexpool import IndexPool
from voltha.extensions.omci.openomci_agent import OpenOMCIAgent
_ = third_party
_MAXIMUM_PORT = 128 # PON and UNI ports
_ONU_REBOOT_MIN = 60
_ONU_REBOOT_RETRY = 10
class AdtranOnuHandler(AdtranXPON):
def __init__(self, adapter, device_id):
kwargs = dict()
super(AdtranOnuHandler, self).__init__(**kwargs)
self.adapter = adapter
self.adapter_agent = adapter.adapter_agent
self.device_id = device_id
self.log = structlog.get_logger(device_id=device_id)
self.logical_device_id = None
self.proxy_address = None
self._event_messages = None
self._enabled = False
self.pm_metrics = None
self.alarms = None
self._mgmt_gemport_aes = False
self._upstream_channel_speed = 0
self._unis = dict() # Port # -> UniPort
self._pon = None
self._heartbeat = HeartBeat.create(self, device_id)
self._deferred = None
self._event_deferred = None
# TODO: Remove next two lines if/when OpenOMCI is in the core or a container
# in order to support multiple ONUs per instance
self._omci_agent = OpenOMCIAgent(self.adapter_agent.core)
self._omci_agent.start()
self._port_number_pool = IndexPool(_MAXIMUM_PORT, 1)
self._olt_created = False # True if deprecated method of OLT creating DA is used
self._is_mock = False
def __str__(self):
return "AdtranOnuHandler: {}".format(self.device_id)
def _cancel_deferred(self):
d1, self._deferred = self._deferred, None
d2, self._event_deferred = self._event_deferred, None
for d in [d1, d2]:
try:
if d is not None and not d.called:
d.cancel()
except:
pass
@property
def enabled(self):
return self._enabled
@enabled.setter
def enabled(self, value):
assert isinstance(value, bool), 'enabled is a boolean'
if self._enabled != value:
self._enabled = value
if self._enabled:
self.start()
else:
self.stop()
@property
def mgmt_gemport_aes(self):
return self._mgmt_gemport_aes
@mgmt_gemport_aes.setter
def mgmt_gemport_aes(self, value):
if self._mgmt_gemport_aes != value:
self._mgmt_gemport_aes = value
# TODO: Anything else
@property
def upstream_channel_speed(self):
return self._upstream_channel_speed
@upstream_channel_speed.setter
def upstream_channel_speed(self, value):
if self._upstream_channel_speed != value:
self._upstream_channel_speed = value
# TODO: Anything else
@property
def is_mock(self):
return self._is_mock # Not pointing to real hardware
@property
def olt_created(self):
return self._olt_created # ONU was created with deprecated 'child_device_detected' call
@property
def omci_agent(self):
return self._omci_agent
@property
def omci(self):
# TODO: Decrement access to Communications channel at this point? What about current PM stuff?
_onu_omci_device = self._pon.onu_omci_device
return _onu_omci_device.omci_cc if _onu_omci_device is not None else None
@property
def heartbeat(self):
return self._heartbeat
@property
def uni_ports(self):
return self._unis.values()
def uni_port(self, port_no_or_name):
if isinstance(port_no_or_name, (str, unicode)):
return next((uni for uni in self.uni_ports
if uni.name == port_no_or_name), None)
assert isinstance(port_no_or_name, int), 'Invalid parameter type'
return self._unis.get(port_no_or_name)
@property
def pon_port(self):
return self._pon
@property
def _next_port_number(self):
return self._port_number_pool.get_next()
def _release_port_number(self, number):
self._port_number_pool.release(number)
def start(self):
assert self._enabled, 'Start should only be called if enabled'
self._cancel_deferred()
# Handle received ONU event messages TODO: Deprecate this....
self._event_messages = DeferredQueue()
self._event_deferred = reactor.callLater(0, self._handle_onu_events)
# Register for adapter messages
self.adapter_agent.register_for_inter_adapter_messages()
# Port startup
if self._pon is not None:
self._pon.enabled = True
for port in self.uni_ports:
port.enabled = True
# Heartbeat
self._heartbeat.enabled = True
def stop(self):
assert not self._enabled, 'Stop should only be called if disabled'
#
# TODO: Perform common shutdown tasks here
#
self._cancel_deferred()
# Drop registration for adapter messages
self.adapter_agent.unregister_for_inter_adapter_messages()
# Heartbeat
self._heartbeat.stop()
# OMCI Communications
# if self._onu_omci_device is not None:
# self._onu_omci_device.stop()
# Port shutdown
for port in self.uni_ports:
port.enabled = False
if self._pon is not None:
self._pon.enabled = False
queue, self._event_deferred = self._event_deferred, None
if queue is not None:
while queue.pending:
_ = yield queue.get()
def receive_message(self, msg):
if self.omci is not None and self.enabled:
self.omci.receive_message(msg)
def activate(self, device):
self.log.info('activating')
# first we verify that we got parent reference and proxy info
assert device.parent_id, 'Invalid Parent ID'
assert device.proxy_address.device_id, 'Invalid Device ID'
if device.vlan:
# vlan non-zero if created via legacy method (not xPON). Also
# Set a random serial number since not xPON based
self._olt_created = True
# register for proxied messages right away
self.proxy_address = device.proxy_address
self.adapter_agent.register_for_proxied_messages(device.proxy_address)
# initialize device info
device.root = True
device.vendor = 'Adtran Inc.'
device.model = 'n/a'
device.hardware_version = 'n/a'
device.firmware_version = 'n/a'
device.reason = ''
device.connect_status = ConnectStatus.UNKNOWN
############################################################################
# Setup PM configuration for this device
self.pm_metrics = OnuPmMetrics(self, device, grouped=True, freq_override=False)
pm_config = self.pm_metrics.make_proto()
self.log.info("initial-pm-config", pm_config=pm_config)
self.adapter_agent.update_device_pm_config(pm_config, init=True)
############################################################################
# Setup Alarm handler
self.alarms = AdapterAlarms(self.adapter, device.id)
# reference of uni_port is required when re-enabling the device if
# it was disabled previously
# Need to query ONU for number of supported uni ports
# For now, temporarily set number of ports to 1 - port #2
parent_device = self.adapter_agent.get_device(device.parent_id)
self.logical_device_id = parent_device.parent_id
assert self.logical_device_id, 'Invalid logical device ID'
# Register physical ports. Should have at least one of each
self._pon = PonPort.create(self, self._next_port_number)
self.adapter_agent.add_port(device.id, self._pon.get_port())
if self._olt_created:
# vlan non-zero if created via legacy method (not xPON). Also
# Set a random serial number since not xPON based
uni_port = UniPort.create(self, self._next_port_number, device.vlan,
'deprecated', device.vlan, None)
self._unis[uni_port.port_number] = uni_port
self.adapter_agent.add_port(device.id, uni_port.get_port())
device.serial_number = uuid4().hex
uni_port.add_logical_port(device.vlan, subscriber_vlan=device.vlan)
# Start things up for this ONU Handler.
self.enabled = True
# Start collecting stats from the device after a brief pause
reactor.callLater(30, self.start_kpi_collection, device.id)
self.adapter_agent.update_device(device)
def reconcile(self, device):
self.log.info('reconciling-ONU-device-starts')
# first we verify that we got parent reference and proxy info
assert device.parent_id
assert device.proxy_address.device_id
# assert device.proxy_address.channel_id
self._cancel_deferred()
# register for proxied messages right away
self.proxy_address = device.proxy_address
self.adapter_agent.register_for_proxied_messages(device.proxy_address)
# Register for adapter messages
self.adapter_agent.register_for_inter_adapter_messages()
# Set the connection status to REACHABLE
device.connect_status = ConnectStatus.REACHABLE
self.adapter_agent.update_device(device)
self.enabled = True
# TODO: Verify that the uni, pon and logical ports exists
# Mark the device as REACHABLE and ACTIVE
device = self.adapter_agent.get_device(device.id)
device.connect_status = ConnectStatus.REACHABLE
device.oper_status = OperStatus.ACTIVE
device.reason = ''
self.adapter_agent.update_device(device)
self.log.info('reconciling-ONU-device-ends')
def update_pm_config(self, device, pm_config):
# TODO: This has not been tested
self.log.info('update_pm_config', pm_config=pm_config)
self.pm_metrics.update(pm_config)
def start_kpi_collection(self, device_id):
# TODO: This has not been tested
def _collect(device_id, prefix):
from voltha.protos.events_pb2 import KpiEvent, KpiEventType, MetricValuePairs
if self.enabled:
try:
# Step 1: gather metrics from device
port_metrics = self.pm_metrics.collect_port_metrics()
# Step 2: prepare the KpiEvent for submission
# we can time-stamp them here or could use time derived from OLT
ts = arrow.utcnow().timestamp
kpi_event = KpiEvent(
type=KpiEventType.slice,
ts=ts,
prefixes={
prefix + '.{}'.format(k): MetricValuePairs(metrics=port_metrics[k])
for k in port_metrics.keys()}
)
# Step 3: submit
self.adapter_agent.submit_kpis(kpi_event)
except Exception as e:
self.log.exception('failed-to-submit-kpis', e=e)
self.pm_metrics.start_collector(_collect)
@inlineCallbacks
def update_flow_table(self, device, flows):
#
# We need to proxy through the OLT to get to the ONU
# Configuration from here should be using OMCI
#
# self.log.info('bulk-flow-update', device_id=device.id, flows=flows)
import voltha.core.flow_decomposer as fd
from voltha.protos.openflow_13_pb2 import OFPXMC_OPENFLOW_BASIC
def is_downstream(port):
return port == 100 # Need a better way
def is_upstream(port):
return not is_downstream(port)
omci = self.omci
for flow in flows:
_type = None
_port = None
_vlan_vid = None
_udp_dst = None
_udp_src = None
_ipv4_dst = None
_ipv4_src = None
_metadata = None
_output = None
_push_tpid = None
_field = None
_set_vlan_vid = None
self.log.info('bulk-flow-update', device_id=device.id, flow=flow)
try:
_in_port = fd.get_in_port(flow)
assert _in_port is not None
if is_downstream(_in_port):
self.log.info('downstream-flow')
elif is_upstream(_in_port):
self.log.info('upstream-flow')
else:
raise Exception('port should be 1 or 2 by our convention')
_out_port = fd.get_out_port(flow) # may be None
self.log.info('out-port', out_port=_out_port)
for field in fd.get_ofb_fields(flow):
if field.type == fd.ETH_TYPE:
_type = field.eth_type
self.log.info('field-type-eth-type',
eth_type=_type)
elif field.type == fd.IP_PROTO:
_proto = field.ip_proto
self.log.info('field-type-ip-proto',
ip_proto=_proto)
elif field.type == fd.IN_PORT:
_port = field.port
self.log.info('field-type-in-port',
in_port=_port)
elif field.type == fd.VLAN_VID:
_vlan_vid = field.vlan_vid & 0xfff
self.log.info('field-type-vlan-vid',
vlan=_vlan_vid)
elif field.type == fd.VLAN_PCP:
_vlan_pcp = field.vlan_pcp
self.log.info('field-type-vlan-pcp',
pcp=_vlan_pcp)
elif field.type == fd.UDP_DST:
_udp_dst = field.udp_dst
self.log.info('field-type-udp-dst',
udp_dst=_udp_dst)
elif field.type == fd.UDP_SRC:
_udp_src = field.udp_src
self.log.info('field-type-udp-src',
udp_src=_udp_src)
elif field.type == fd.IPV4_DST:
_ipv4_dst = field.ipv4_dst
self.log.info('field-type-ipv4-dst',
ipv4_dst=_ipv4_dst)
elif field.type == fd.IPV4_SRC:
_ipv4_src = field.ipv4_src
self.log.info('field-type-ipv4-src',
ipv4_dst=_ipv4_src)
elif field.type == fd.METADATA:
_metadata = field.table_metadata
self.log.info('field-type-metadata',
metadata=_metadata)
else:
raise NotImplementedError('field.type={}'.format(
field.type))
for action in fd.get_actions(flow):
if action.type == fd.OUTPUT:
_output = action.output.port
self.log.info('action-type-output',
output=_output, in_port=_in_port)
elif action.type == fd.POP_VLAN:
self.log.info('action-type-pop-vlan',
in_port=_in_port)
elif action.type == fd.PUSH_VLAN:
_push_tpid = action.push.ethertype
self.log.info('action-type-push-vlan',
push_tpid=_push_tpid, in_port=_in_port)
if action.push.ethertype != 0x8100:
self.log.error('unhandled-tpid',
ethertype=action.push.ethertype)
elif action.type == fd.SET_FIELD:
_field = action.set_field.field.ofb_field
assert (action.set_field.field.oxm_class ==
OFPXMC_OPENFLOW_BASIC)
self.log.info('action-type-set-field',
field=_field, in_port=_in_port)
if _field.type == fd.VLAN_VID:
_set_vlan_vid = _field.vlan_vid & 0xfff
self.log.info('set-field-type-valn-vid', _set_vlan_vid)
else:
self.log.error('unsupported-action-set-field-type',
field_type=_field.type)
else:
self.log.error('unsupported-action-type',
action_type=action.type, in_port=_in_port)
#
# All flows created from ONU adapter should be OMCI based
#
if _vlan_vid == 0 and _set_vlan_vid != None and _set_vlan_vid != 0:
# allow priority tagged packets
# Set AR - ExtendedVlanTaggingOperationConfigData
# 514 - RxVlanTaggingOperationTable - add VLAN <cvid> to priority tagged pkts - c-vid
results = yield omci.send_delete_vlan_tagging_filter_data(0x2102)
# self.send_set_vlan_tagging_filter_data(0x2102, _set_vlan_vid)
results = yield omci.send_create_vlan_tagging_filter_data(
0x2102,
_set_vlan_vid)
results = yield omci.send_set_extended_vlan_tagging_operation_vlan_configuration_data_untagged(
0x202,
0x1000,
_set_vlan_vid)
results = yield omci.send_set_extended_vlan_tagging_operation_vlan_configuration_data_single_tag(
0x202,
8,
0,
0,
1,
8,
_set_vlan_vid)
# Set AR - ExtendedVlanTaggingOperationConfigData
# 514 - RxVlanTaggingOperationTable - add VLAN <cvid> to priority tagged pkts - c-vid
'''
results = yield omci.send_set_extended_vlan_tagging_operation_vlan_configuration_data_single_tag(0x205, 8, 0, 0,
'''
except Exception as e:
self.log.exception('failed-to-install-flow', e=e, flow=flow)
@inlineCallbacks
def reboot(self):
self.log.info('rebooting', device_id=self.device_id)
self._cancel_deferred()
reregister = True
try:
# Drop registration for adapter messages
self.adapter_agent.unregister_for_inter_adapter_messages()
except KeyError:
reregister = False
# Update the operational status to ACTIVATING and connect status to
# UNREACHABLE
device = self.adapter_agent.get_device(self.device_id)
previous_oper_status = device.oper_status
previous_conn_status = device.connect_status
device.oper_status = OperStatus.ACTIVATING
device.connect_status = ConnectStatus.UNREACHABLE
device.reason = 'Attempting reboot'
self.adapter_agent.update_device(device)
# TODO: send alert and clear alert after the reboot
if not self.is_mock:
from twisted.internet.defer import TimeoutError
try:
######################################################
# MIB Reset - For ADTRAN ONU, we do not get a response
# back (because we are rebooting)
pass
yield self.omci.send_reboot(timeout=0.1)
except TimeoutError:
# This is expected
returnValue('reboot-in-progress')
except Exception as e:
self.log.exception('send-reboot', e=e)
raise
# Reboot in progress. A reboot may take up to 3 min 30 seconds
# Go ahead and pause less than that and start to look
# for it being alive
device.reason = 'reboot in progress'
self.adapter_agent.update_device(device)
self._deferred = reactor.callLater(_ONU_REBOOT_MIN,
self._finish_reboot,
previous_oper_status,
previous_conn_status,
reregister)
@inlineCallbacks
def _finish_reboot(self, previous_oper_status, previous_conn_status,
reregister):
from common.utils.asleep import asleep
if not self.is_mock:
# TODO: Do a simple poll and call this again if we timeout
# _ONU_REBOOT_RETRY
yield asleep(180) # 3 minutes ...
# Change the operational status back to its previous state. With a
# real OLT the operational state should be the state the device is
# after a reboot.
# Get the latest device reference
device = self.adapter_agent.get_device(self.device_id)
device.oper_status = previous_oper_status
device.connect_status = previous_conn_status
device.reason = ''
self.adapter_agent.update_device(device)
if reregister:
self.adapter_agent.register_for_inter_adapter_messages()
self.log.info('reboot-complete', device_id=self.device_id)
def self_test_device(self, device):
"""
This is called to Self a device based on a NBI call.
:param device: A Voltha.Device object.
:return: Will return result of self test
"""
from voltha.protos.voltha_pb2 import SelfTestResponse
self.log.info('self-test-device', device=device.id)
# TODO: Support self test?
return SelfTestResponse(result=SelfTestResponse.NOT_SUPPORTED)
def disable(self):
self.log.info('disabling', device_id=self.device_id)
self.enabled = False
# Get the latest device reference
device = self.adapter_agent.get_device(self.device_id)
# Disable all ports on that device
self.adapter_agent.disable_all_ports(self.device_id)
# Update the device operational status to UNKNOWN
device.oper_status = OperStatus.UNKNOWN
device.connect_status = ConnectStatus.UNREACHABLE
device.reason = 'Disabled'
self.adapter_agent.update_device(device)
# Remove the uni logical port from the OLT, if still present
parent_device = self.adapter_agent.get_device(device.parent_id)
assert parent_device
logical_device_id = parent_device.parent_id
assert logical_device_id
for uni in self.uni_ports:
port_id = 'uni-{}'.format(uni.port_number)
try:
port = self.adapter_agent.get_logical_port(logical_device_id,
port_id)
self.adapter_agent.delete_logical_port(logical_device_id, port)
except KeyError:
self.log.info('logical-port-not-found', device_id=self.device_id,
portid=port_id)
# Remove pon port from parent
if self._pon is not None:
self.adapter_agent.delete_port_reference_from_parent(self.device_id,
self._pon.get_port())
# Just updating the port status may be an option as well
# port.ofp_port.config = OFPPC_NO_RECV
# yield self.adapter_agent.update_logical_port(logical_device_id,
# port)
# Unregister for proxied message
self.adapter_agent.unregister_for_proxied_messages(
device.proxy_address)
# TODO:
# 1) Remove all flows from the device
# 2) Remove the device from ponsim
self.log.info('disabled', device_id=device.id)
def reenable(self):
self.log.info('re-enabling', device_id=self.device_id)
try:
# Get the latest device reference
device = self.adapter_agent.get_device(self.device_id)
self._cancel_deferred()
# First we verify that we got parent reference and proxy info
assert device.parent_id
assert device.proxy_address.device_id
# assert device.proxy_address.channel_id
# Re-register for proxied messages right away
self.proxy_address = device.proxy_address
self.adapter_agent.register_for_proxied_messages(
device.proxy_address)
# Re-enable the ports on that device
self.adapter_agent.enable_all_ports(self.device_id)
# Refresh the port reference
# self.uni_port = self._get_uni_port() deprecated
# Add the pon port reference to the parent
if self._pon is not None:
# TODO: Send 'enable' to PonPort?
self.adapter_agent.add_port_reference_to_parent(device.id,
self._pon.get_port())
# Update the connect status to REACHABLE
device.connect_status = ConnectStatus.REACHABLE
self.adapter_agent.update_device(device)
# re-add uni port to logical device
parent_device = self.adapter_agent.get_device(device.parent_id)
self.logical_device_id = parent_device.parent_id
assert self.logical_device_id, 'Invalid logical device ID'
if self.olt_created:
# vlan non-zero if created via legacy method (not xPON)
self.uni_port('deprecated').add_logical_port(device.vlan, device.vlan,
subscriber_vlan=device.vlan)
device = self.adapter_agent.get_device(device.id)
device.oper_status = OperStatus.ACTIVE
device.reason = ''
self.enabled = True
self.adapter_agent.update_device(device)
self.log.info('re-enabled', device_id=device.id)
except Exception, e:
self.log.exception('error-reenabling', e=e)
def delete(self):
self.log.info('deleting', device_id=self.device_id)
for uni in self._unis.itervalues():
uni.stop()
uni.delete()
self._pon.stop()
self._pon.delete()
# OpenOMCI cleanup
if self._omci_agent is not None:
self._omci_agent.remove_device(self.device_id, cleanup=True)
self._omci_agent = None
#
# handling needed here
# self.enabled = False
#
# # TODO: Need to implement this
# # 1) Remove all flows from the device
#
# self.log.info('deleted', device_id=self.device_id)
#
# # Drop device ID
# self.device_id = None @inlineCallbacks
# def delete_v_ont_ani(self, data):
# self.log.info('deleting-v_ont_ani')
#
# device = self.adapter_agent.get_device(self.device_id)
# # construct message
# # MIB Reset - OntData - 0
# if device.connect_status != ConnectStatus.REACHABLE:
# self.log.error('device-unreachable')
# returnValue(None)
#
# self.send_mib_reset()
# yield self.wait_for_response()
# self.proxy_address = device.proxy_address
# self.adapter_agent.unregister_for_proxied_messages(device.proxy_address)
#
# ports = self.adapter_agent.get_ports(self.device_id, Port.PON_ONU)
# if ports is not None:
# for port in ports:
# if port.label == 'PON port':
# self.adapter_agent.delete_port(self.device_id, port)
# break
def _check_for_mock_config(self, data):
# Check for MOCK configuration
description = data.get('description')
if description is not None and 'mock' in description.lower():
self._is_mock = True
def on_ont_ani_create(self, ont_ani):
"""
A new ONT-ani is being created. You can override this method to
perform custom operations as needed. If you override this method, you can add
additional items to the item dictionary to track additional implementation
key/value pairs.
:param ont_ani: (dict) new ONT-ani
:return: (dict) Updated ONT-ani dictionary, None if item should be deleted
"""
self.log.info('ont-ani-create', ont_ani=ont_ani)
self._check_for_mock_config(ont_ani)
self.enabled = ont_ani['enabled']
return ont_ani # Implement in your OLT, if needed
def on_ont_ani_modify(self, ont_ani, update, diffs):
"""
A existing ONT-ani is being updated. You can override this method to
perform custom operations as needed. If you override this method, you can add
additional items to the item dictionary to track additional implementation
key/value pairs.
:param ont_ani: (dict) existing ONT-ani item dictionary
:param update: (dict) updated (changed) ONT-ani
:param diffs: (dict) collection of items different in the update
:return: (dict) Updated ONT-ani dictionary, None if item should be deleted
"""
valid_keys = ['enabled', 'mgnt-gemport-aes'] # Modify of these keys supported
invalid_key = next((key for key in diffs.keys() if key not in valid_keys), None)
if invalid_key is not None:
raise KeyError("ont_ani leaf '{}' is read-only or write-once".format(invalid_key))
keys = [k for k in diffs.keys() if k in valid_keys]
for k in keys:
if k == 'enabled':
self.enabled = update[k]
elif k == 'mgnt-gemport-aes':
self.mgmt_gemport_aes = update[k]
return update
def on_ont_ani_delete(self, ont_ani):
"""
A existing ONT-ani is being deleted. You can override this method to
perform custom operations as needed. If you override this method, you can add
additional items to the item dictionary to track additional implementation
key/value pairs.
:param ont_ani: (dict) ONT-ani to delete
:return: (dict) None if item should be deleted
"""
# TODO: Is this ever called or is the iAdapter 'delete' called first?
return None # Implement in your OLT, if needed
def on_vont_ani_create(self, vont_ani):
self.log.info('vont-ani-create', vont_ani=vont_ani)
self._check_for_mock_config(vont_ani)
# TODO: look up PON port and update 'upstream-channel-speed'
return vont_ani # Implement in your OLT, if needed
def on_vont_ani_modify(self, vont_ani, update, diffs):
valid_keys = ['upstream-channel-speed'] # Modify of these keys supported
invalid_key = next((key for key in diffs.keys() if key not in valid_keys), None)
if invalid_key is not None:
raise KeyError("vont_ani leaf '{}' is read-only or write-once".format(invalid_key))
keys = [k for k in diffs.keys() if k in valid_keys]
for k in keys:
if k == 'upstream-channel-speed':
self.upstream_channel_speed = update[k]
return update
def on_vont_ani_delete(self, vont_ani):
return self.delete()
def on_venet_create(self, venet):
self.log.info('venet-create', venet=venet)
self._check_for_mock_config(venet)
# TODO: This first set is copied over from BroadCOM ONU. For testing, actual work
# is the last 7 lines. The 'test' code below assumes we have not registered
# any UNI ports during 'activate' but we want to create them as the vEnet
# information comes in.
# onu_device = self.adapter_agent.get_device(self.device_id)
# existing_uni_ports = self.adapter_agent.get_ports(onu_device.parent_id, Port.ETHERNET_UNI)
#
# parent_port_num = None
# for uni in existing_uni_ports:
# if uni.label == venet['name']: # TODO: was -> data.interface.name:
# parent_port_num = uni.port_no
# break
#
# # Create both the physical and logical ports for the UNI now
# parent_device = self.adapter_agent.get_device(onu_device.parent_id)
# logical_device_id = parent_device.parent_id
# assert logical_device_id, 'Invalid logical device ID'
# # self.add_uni_port(onu_device, logical_device_id, venet['name'], parent_port_num)
#
# pon_ports = self.adapter_agent.get_ports(self.device_id, Port.PON_ONU)
# if pon_ports:
# # TODO: Assumed only one PON port and UNI port per ONU.
# pon_port = pon_ports[0]
# else:
# self.log.error("No-Pon-port-configured-yet")
# return
#
# self.adapter_agent.delete_port_reference_from_parent(self.device_id, pon_port)
# pon_port.peers[0].device_id = onu_device.parent_id
# pon_port.peers[0].port_no = parent_port_num
# self.adapter_agent.add_port_reference_to_parent(self.device_id, pon_port)
#################################################################################
# Start of actual work (what actually does something)
# TODO: Clean this up. Use looked up UNI
if self._olt_created:
uni_port = self.uni_port('deprecated')
else:
# vlan non-zero if created via legacy method (not xPON). Also
# Set a random serial number since not xPON based
device = self.adapter_agent.get_device(self.device_id)
ofp_port_no, subscriber_vlan, untagged_vlan = UniPort.decode_venet(venet)
uni_port = UniPort.create(self, venet['name'],
self._next_port_number,
ofp_port_no,
subscriber_vlan,
untagged_vlan)
self._unis[uni_port.port_number] = uni_port
self.adapter_agent.add_port(device.id, uni_port.get_port())
# If the PON has already synchronized, add the logical port now
# since we know we have been activated
if self._pon is not None and self._pon.connected:
uni_port.add_logical_port(ofp_port_no, subscriber_vlan=subscriber_vlan)
# TODO: Next is just for debugging to see what this call returns after
# we add a UNI
# existing_uni_ports = self.adapter_agent.get_ports(onu_device.parent_id, Port.ETHERNET_UNI)
uni_port.enabled = venet['enabled']
return venet
def on_venet_modify(self, venet, update, diffs):
# Look up the associated UNI port
if self._olt_created:
uni_port = self.uni_port('deprecated')
else:
uni_port = self.uni_port(venet['name'])
if uni_port is not None:
valid_keys = ['enabled'] # Modify of these keys supported
invalid_key = next((key for key in diffs.keys() if key not in valid_keys), None)
if invalid_key is not None:
raise KeyError("venet leaf '{}' is read-only or write-once".format(invalid_key))
keys = [k for k in diffs.keys() if k in valid_keys]
for k in keys:
if k == 'enabled':
uni_port.enabled = update[k]
return update
def on_venet_delete(self, venet):
# Look up the associated UNI port
if self._olt_created:
uni_port = self.uni_port('deprecated')
else:
uni_port = self.uni_port(venet['name'])
if uni_port is not None:
port_no = uni_port.port_number
del self._unis[port_no]
uni_port.delete()
self._release_port_number(port_no)
return None
def on_tcont_create(self, tcont):
from onu_tcont import OnuTCont
self.log.info('create-tcont')
td = self.traffic_descriptors.get(tcont.get('td-ref'))
traffic_descriptor = td['object'] if td is not None else None
tcont['object'] = OnuTCont.create(self, tcont, traffic_descriptor,
is_mock=self.is_mock)
if self._pon is not None:
self._pon.add_tcont(tcont['object'])
return tcont
def on_tcont_modify(self, tcont, update, diffs):
valid_keys = ['td-ref'] # Modify of these keys supported
invalid_key = next((key for key in diffs.keys() if key not in valid_keys), None)
if invalid_key is not None:
raise KeyError("TCONT leaf '{}' is read-only or write-once".format(invalid_key))
tc = tcont.get('object')
assert tc is not None, 'TCONT not found'
update['object'] = tc
if self._pon is not None:
keys = [k for k in diffs.keys() if k in valid_keys]
for k in keys:
if k == 'td-ref':
td = self.traffic_descriptors.get(update['td-ref'])
if td is not None:
self._pon.update_tcont_td(tcont['alloc-id'], td)
return update
def on_tcont_delete(self, tcont):
if self._pon is not None:
self._pon.remove_tcont(tcont['alloc-id'])
return None
def on_td_create(self, traffic_disc):
from onu_traffic_descriptor import OnuTrafficDescriptor
traffic_disc['object'] = OnuTrafficDescriptor.create(traffic_disc)
return traffic_disc
def on_td_modify(self, traffic_disc, update, diffs):
from onu_traffic_descriptor import OnuTrafficDescriptor
valid_keys = ['fixed-bandwidth',
'assured-bandwidth',
'maximum-bandwidth',
'priority',
'weight',
'additional-bw-eligibility-indicator']
invalid_key = next((key for key in diffs.keys() if key not in valid_keys), None)
if invalid_key is not None:
raise KeyError("traffic-descriptor leaf '{}' is read-only or write-once".format(invalid_key))
# New traffic descriptor
update['object'] = OnuTrafficDescriptor.create(update)
td_name = traffic_disc['name']
tconts = {key: val for key, val in self.tconts.iteritems()
if val['td-ref'] == td_name and td_name is not None}
for tcont in tconts.itervalues():
if self._pon is not None:
self._pon.update_tcont_td(tcont['alloc-id'], update['object'])
return update
def on_td_delete(self, traffic_desc):
# TD may be used by more than one TCONT. Only delete if the last one
td_name = traffic_desc['name']
num_tconts = len([val for val in self.tconts.itervalues()
if val['td-ref'] == td_name and td_name is not None])
return None if num_tconts <= 1 else traffic_desc
def on_gemport_create(self, gem_port):
from onu_gem_port import OnuGemPort
assert self._pon is not None, 'No PON port'
gem_port['object'] = OnuGemPort.create(self, gem_port,
self._pon.next_gem_entity_id,
is_mock=self.is_mock)
self._pon.add_gem_port(gem_port['object'])
return gem_port
def on_gemport_modify(self, gem_port, update, diffs):
valid_keys = ['encryption',
'traffic-class'] # Modify of these keys supported
invalid_key = next((key for key in diffs.keys() if key not in valid_keys), None)
if invalid_key is not None:
raise KeyError("GEM Port leaf '{}' is read-only or write-once".format(invalid_key))
port = gem_port.get('object')
assert port is not None, 'GemPort not found'
keys = [k for k in diffs.keys() if k in valid_keys]
update['object'] = port
for k in keys:
if k == 'encryption':
port.encryption = update[k]
elif k == 'traffic-class':
pass # TODO: Implement
return update
def on_gemport_delete(self, gem_port):
if self._pon is not None:
self._pon.remove_gem_id(gem_port['gemport-id'])
return None
def on_mcast_gemport_create(self, mcast_gem_port):
return mcast_gem_port # Implement in your OLT, if needed
def on_mcast_gemport_modify(self, mcast_gem_port, update, diffs):
return mcast_gem_port # Implement in your OLT, if needed
def on_mcast_gemport_delete(self, mcast_gem_port):
return None # Implement in your OLT, if needed
def on_mcast_dist_set_create(self, dist_set):
return dist_set # Implement in your OLT, if needed
def on_mcast_dist_set_modify(self, dist_set, update, diffs):
return update # Implement in your OLT, if needed
def on_mcast_dist_set_delete(self, dist_set):
return None # Implement in your OLT, if needed
def rx_inter_adapter_message(self, msg):
if self.enabled and self._event_messages is not None:
self._event_messages.put(msg)
@inlineCallbacks
def _handle_onu_events(self):
#
# TODO: From broadcom ONU. This is from the 'receive_inter_adapter_message()'
# method.
#
event_msg = yield self._event_messages.get()
if self._event_deferred is None:
returnValue('cancelled')
if event_msg['event'] == 'activation-completed':
# if event_msg['event_data']['activation_successful']:
# for uni in self.uni_ports:
# port_no = self.proxy_address.channel_id + uni
# reactor.callLater(1,
# self.message_exchange,
# self.proxy_address.onu_id,
# self.proxy_address.onu_session_id,
# port_no)
#
# device = self.adapter_agent.get_device(self.device_id)
# device.oper_status = OperStatus.ACTIVE
# self.adapter_agent.update_device(device)
#
# else:
# device = self.adapter_agent.get_device(self.device_id)
# device.oper_status = OperStatus.FAILED
# self.adapter_agent.update_device(device)
pass
elif event_msg['event'] == 'deactivation-completed':
# device = self.adapter_agent.get_device(self.device_id)
# device.oper_status = OperStatus.DISCOVERED
# self.adapter_agent.update_device(device)
pass
elif event_msg['event'] == 'ranging-completed':
# if event_msg['event_data']['ranging_successful']:
# device = self.adapter_agent.get_device(self.device_id)
# device.oper_status = OperStatus.ACTIVATING
# self.adapter_agent.update_device(device)
#
# else:
# device = self.adapter_agent.get_device(self.device_id)
# device.oper_status = OperStatus.FAILED
# self.adapter_agent.update_device(device)
pass
# Handle next event (self._event_deferred is None if we got stopped)
self._event_deferred = reactor.callLater(0, self.handle_onu_events)