Refactor flow to simplify logic and support enable/disable of adapter
Change-Id: Ife96e5f50ddbb1f49f7d00e95e471fdc65cfc7df
diff --git a/voltha/adapters/adtran_olt/adtran_device_handler.py b/voltha/adapters/adtran_olt/adtran_device_handler.py
index e8a64bd..04adb42 100644
--- a/voltha/adapters/adtran_olt/adtran_device_handler.py
+++ b/voltha/adapters/adtran_olt/adtran_device_handler.py
@@ -51,12 +51,12 @@
_MANAGEMENT_VLAN = 4093
_is_inband_frame = BpfProgramFilter('(ether[14:2] & 0xfff) = 0x{:03x}'.format(_PACKET_IN_VLAN))
-_DEFAULT_RESTCONF_USERNAME = ""
-_DEFAULT_RESTCONF_PASSWORD = ""
+_DEFAULT_RESTCONF_USERNAME = "ADMIN"
+_DEFAULT_RESTCONF_PASSWORD = "PASSWORD"
_DEFAULT_RESTCONF_PORT = 8081
-_DEFAULT_NETCONF_USERNAME = ""
-_DEFAULT_NETCONF_PASSWORD = ""
+_DEFAULT_NETCONF_USERNAME = "hsvroot"
+_DEFAULT_NETCONF_PASSWORD = "BOSCO"
_DEFAULT_NETCONF_PORT = 830
@@ -163,7 +163,7 @@
self.is_virtual_olt = False
# Installed flows
- self.evcs = {} # Flow ID/name -> FlowEntry
+ self._evcs = {} # Flow ID/name -> FlowEntry
def __del__(self):
# Kill any startup or heartbeat defers
@@ -200,6 +200,18 @@
def rest_client(self):
return self._rest_client
+ @property
+ def evcs(self):
+ return list(self._evcs.values())
+
+ def add_evc(self, evc):
+ if self._evcs is not None:
+ self._evcs[evc.name] = evc
+
+ def remove_evc(self, evc):
+ if self._evcs is not None and evc.name in self._evcs:
+ del self._evcs[evc.name]
+
def parse_provisioning_options(self, device):
from net.adtran_zmq import DEFAULT_ZEROMQ_OMCI_TCP_PORT
@@ -735,13 +747,17 @@
# TODO: What else (delete logical device, ???)
- @inlineCallbacks
def disable(self):
"""
This is called when a previously enabled device needs to be disabled based on a NBI call.
"""
self.log.info('disabling', device_id=self.device_id)
+ # Cancel any running enable/disable/... in progress
+ d, self.startup = self.startup, None
+ if d is not None:
+ d.cancel()
+
# Get the latest device reference
device = self.adapter_agent.get_device(self.device_id)
@@ -775,12 +791,6 @@
# Remove the peer references from this device
self.adapter_agent.delete_all_peer_references(self.device_id)
- # Disable all flows TODO: Do we want to delete them?
- # TODO: Create a bulk disable-all by device-id
-
- for evc in self.evcs.itervalues():
- evc.disable()
-
# Set all ports to disabled
self.adapter_agent.disable_all_ports(self.device_id)
@@ -791,29 +801,30 @@
for port in self.southbound_ports.itervalues():
dl.append(port.stop())
+ # NOTE: Flows removed before this method is called
+ # Wait for completion
+
self.startup = defer.gatherResults(dl)
- results = yield self.startup
- # Shutdown communications with OLT
-
- if self.netconf_client is not None:
- try:
- yield self.netconf_client.close()
- except Exception as e:
- self.log.exception('NETCONF-shutdown', e=e)
+ def _drop_netconf():
+ return self.netconf_client.close() if \
+ self.netconf_client is not None else defer.succeed('NOP')
def _null_clients():
self._netconf_client = None
self._rest_client = None
- reactor.callLater(0, _null_clients)
+ # Shutdown communications with OLT
- # Update the logice device mapping
+ self.startup.addCallbacks(_drop_netconf, _null_clients)
+ self.startup.addCallbacks(_null_clients, _null_clients)
+
+ # Update the logical device mapping
if ldi in self.adapter.logical_device_id_to_root_device_id:
del self.adapter.logical_device_id_to_root_device_id[ldi]
self.log.info('disabled', device_id=device.id)
- returnValue(results)
+ return self.startup
@inlineCallbacks
def reenable(self):
@@ -822,6 +833,11 @@
"""
self.log.info('re-enabling', device_id=self.device_id)
+ # Cancel any running enable/disable/... in progress
+ d, self.startup = self.startup, None
+ if d is not None:
+ d.cancel()
+
# Get the latest device reference
device = self.adapter_agent.get_device(self.device_id)
@@ -872,16 +888,18 @@
for port in self.southbound_ports.itervalues():
dl.append(port.start())
+ # Flows should not exist on re-enable. They are re-pushed
+ if len(self._evcs):
+ self.log.error('evcs-found', evcs=self._evcs)
+ self._evcs.clear()
+
+ # Wait for completion
+
self.startup = defer.gatherResults(dl)
results = yield self.startup
# TODO:
# 1) Restart health check / pings
- # Enable all flows
- # TODO: Create a bulk enable-all by device-id
-
- for evc in self.evcs:
- evc.enable()
# Activate in-band packets
self._activate_io_port()
@@ -897,6 +915,11 @@
"""
self.log.debug('reboot')
+ # Cancel any running enable/disable/... in progress
+ d, self.startup = self.startup, None
+ if d is not None:
+ d.cancel()
+
# Update the operational status to ACTIVATING and connect status to
# UNREACHABLE
@@ -1011,6 +1034,32 @@
# Update the child devices connect state to REACHABLE
self.adapter_agent.update_child_devices_state(self.device_id,
connect_status=ConnectStatus.REACHABLE)
+ # Restart ports to previous state
+
+ dl = []
+
+ for port in self.northbound_ports.itervalues():
+ dl.append(port.restart())
+
+ for port in self.southbound_ports.itervalues():
+ dl.append(port.restart())
+
+ try:
+ yield defer.gatherResults(dl)
+ except Exception as e:
+ self.log.exception('port-restart', e=e)
+
+ # Request reflow of any EVC/EVC-MAPs
+
+ if len(self._evcs) > 0:
+ dl = []
+ for evc in self.evcs:
+ dl.append(evc.reflow())
+
+ try:
+ yield defer.gatherResults(dl)
+ except Exception as e:
+ self.log.exception('flow-restart', e=e)
self.log.info('rebooted', device_id=self.device_id)
returnValue('Rebooted')
@@ -1036,10 +1085,11 @@
# Remove all flows from the device
# TODO: Create a bulk remove-all by device-id
- for evc in self.evcs.itervalues():
- evc.remove()
+ evcs = self._evcs()
+ self._evcs.clear()
- self.evcs.clear()
+ for evc in evcs:
+ evc.delete() # TODO: implement bulk-flow procedures
# Remove all child devices
self.adapter_agent.delete_all_child_devices(self.device_id)
diff --git a/voltha/adapters/adtran_olt/adtran_olt.py b/voltha/adapters/adtran_olt/adtran_olt.py
index 36dfaa5..a7daa52 100644
--- a/voltha/adapters/adtran_olt/adtran_olt.py
+++ b/voltha/adapters/adtran_olt/adtran_olt.py
@@ -171,7 +171,7 @@
:param device: A Voltha.Device object.
:return: (Deferred) Shall be fired to acknowledge disabling the device.
"""
- log.debug('disable_device', device=device)
+ log.info('disable-device', device=device)
reactor.callLater(0, self.devices_handlers[device.id].disable)
return device
@@ -183,7 +183,7 @@
:param device: A Voltha.Device object.
:return: (Deferred) Shall be fired to acknowledge re-enabling the device.
"""
- log.debug('reenable_device', device=device)
+ log.info('reenable-device', device=device)
reactor.callLater(0, self.devices_handlers[device.id].reenable)
return device
@@ -241,7 +241,7 @@
:param device: A Voltha.Device object.
:return: (Deferred) Shall be fired to acknowledge the deletion.
"""
- log.info('delete_device', device=device)
+ log.info('delete-device', device=device)
reactor.callLater(0, self.devices_handlers[device.id].delete)
return device
@@ -268,7 +268,7 @@
:return: (Deferred or None)
"""
log.info('bulk-flow-update', device_id=device.id, flows=flows,
- groups=groups)
+ groups=groups, num_flows=len(flows.items))
assert len(groups.items) == 0, "Cannot yet deal with groups"
handler = self.devices_handlers[device.id]
@@ -310,7 +310,7 @@
:return: (Deferred(None) or None) The return of this method should
indicate that the message was successfully *sent*.
"""
- log.info('send-proxied-message', proxy_address=proxy_address, msg=msg)
+ log.debug('send-proxied-message', proxy_address=proxy_address, msg=msg)
handler = self.devices_handlers[proxy_address.device_id]
handler.send_proxied_message(proxy_address, msg)
@@ -339,8 +339,8 @@
:param msg: actual message
:return: None
"""
- log.info('packet-out', logical_device_id=logical_device_id,
- egress_port_no=egress_port_no, msg_len=len(msg))
+ log.debug('packet-out', logical_device_id=logical_device_id,
+ egress_port_no=egress_port_no, msg_len=len(msg))
def ldi_to_di(ldi):
di = self.logical_device_id_to_root_device_id.get(ldi)
@@ -391,7 +391,7 @@
API to create various interfaces (only some PON interfaces as of now)
in the devices
"""
- log.info('create_interface', data=data)
+ log.info('create-interface', data=data)
handler = self.devices_handlers[device.id]
handler.create_interface(device, data)
@@ -400,6 +400,7 @@
API to update various interfaces (only some PON interfaces as of now)
in the devices
"""
+ log.info('update-interface', data=data)
raise NotImplementedError()
def remove_interface(self, device, data):
@@ -407,6 +408,7 @@
API to delete various interfaces (only some PON interfaces as of now)
in the devices
"""
+ log.info('remove-interface', data=data)
raise NotImplementedError()
def receive_onu_detect_state(self, device_id, state):
@@ -419,37 +421,52 @@
raise NotImplementedError()
def create_tcont(self, device, tcont_data, traffic_descriptor_data):
+ log.info('create-tcont', tcont_data=tcont_data,
+ traffic_descriptor_data=traffic_descriptor_data)
raise NotImplementedError()
def update_tcont(self, device, tcont_data, traffic_descriptor_data):
+ log.info('update-tcont', tcont_data=tcont_data,
+ traffic_descriptor_data=traffic_descriptor_data)
raise NotImplementedError()
def remove_tcont(self, device, tcont_data, traffic_descriptor_data):
+ log.info('remove-tcont', tcont_data=tcont_data,
+ traffic_descriptor_data=traffic_descriptor_data)
raise NotImplementedError()
def create_gemport(self, device, data):
+ log.info('create-gemport', data=data)
raise NotImplementedError()
def update_gemport(self, device, data):
+ log.info('update-gemport', data=data)
raise NotImplementedError()
def remove_gemport(self, device, data):
+ log.info('remove-gemport', data=data)
raise NotImplementedError()
def create_multicast_gemport(self, device, data):
+ log.info('create-mcast-gemport', data=data)
raise NotImplementedError()
def update_multicast_gemport(self, device, data):
+ log.info('update-mcast-gemport', data=data)
raise NotImplementedError()
def remove_multicast_gemport(self, device, data):
+ log.info('remove-mcast-gemport', data=data)
raise NotImplementedError()
def create_multicast_distribution_set(self, device, data):
+ log.info('create-mcast-distribution-set', data=data)
raise NotImplementedError()
def update_multicast_distribution_set(self, device, data):
+ log.info('update-mcast-distribution-set', data=data)
raise NotImplementedError()
def remove_multicast_distribution_set(self, device, data):
+ log.info('remove-mcast-distribution-set', data=data)
raise NotImplementedError()
diff --git a/voltha/adapters/adtran_olt/adtran_olt_handler.py b/voltha/adapters/adtran_olt/adtran_olt_handler.py
index be83dc4..0cd919b 100644
--- a/voltha/adapters/adtran_olt/adtran_olt_handler.py
+++ b/voltha/adapters/adtran_olt/adtran_olt_handler.py
@@ -276,11 +276,17 @@
def disable(self):
c, self.zmq_client = self.zmq_client, None
if c is not None:
- c.shutdown()
+ try:
+ c.shutdown()
+ except:
+ pass
d, self.status_poll = self.status_poll, None
if d is not None:
- d.cancel()
+ try:
+ d.cancel()
+ except:
+ pass
super(AdtranOltHandler, self).disable()
@@ -348,10 +354,10 @@
self.rest_client is not None:
uri = AdtranOltHandler.GPON_OLT_HW_STATE_URI
name = 'pon-status-poll'
- self.startup = self.rest_client.request('GET', uri, name=name)
- self.startup.addBoth(self.status_poll_complete)
+ self.status_poll = self.rest_client.request('GET', uri, name=name)
+ self.status_poll.addBoth(self.status_poll_complete)
else:
- self.startup = reactor.callLater(0, self.status_poll_complete, 'inactive')
+ self.status_poll = reactor.callLater(0, self.status_poll_complete, 'inactive')
def status_poll_complete(self, results):
"""
@@ -410,8 +416,6 @@
valid_flows = []
for flow in flows:
- # TODO: Do we get duplicates here (ie all flows re-pushed on each individual flow add?)
-
try:
# Try to create an EVC.
#
@@ -431,12 +435,7 @@
if evc is not None:
try:
evc.schedule_install()
-
- if evc.name not in self.evcs:
- self.evcs[evc.name] = evc
- else:
- # TODO: Do we get duplicates here (ie all flows re-pushed on each individual flow add?)
- pass
+ self.add_evc(evc)
except Exception as e:
evc.status = 'EVC Install Exception: {}'.format(e.message)
diff --git a/voltha/adapters/adtran_olt/flow/evc.py b/voltha/adapters/adtran_olt/flow/evc.py
index 4fa404c..acea0ee 100644
--- a/voltha/adapters/adtran_olt/flow/evc.py
+++ b/voltha/adapters/adtran_olt/flow/evc.py
@@ -15,7 +15,7 @@
import xmltodict
import re
from enum import Enum
-from twisted.internet import reactor
+from twisted.internet import reactor, defer
from twisted.internet.defer import inlineCallbacks, returnValue, succeed
from voltha.core.flow_decomposer import *
@@ -182,7 +182,7 @@
Get all EVC Maps that reference this EVC
:return: list of EVCMap
"""
- return self._evc_maps.values()
+ return list(self._evc_maps.values())
def add_evc_map(self, evc_map):
if self._evc_maps is not None:
@@ -192,10 +192,20 @@
if self._evc_maps is not None and evc_map.name in self._evc_maps:
del self._evc_maps[evc_map.name]
+ def cancel_defers(self):
+ d, self._install_deferred = self._install_deferred, None
+ if d is not None:
+ try:
+ d.cancel()
+ except:
+ pass
+
def schedule_install(self):
"""
Try to install EVC and all MAPs in a single operational sequence
"""
+ self.cancel_defers()
+
if self._valid and self._install_deferred is None:
self._install_deferred = reactor.callLater(0, self._do_install)
@@ -203,8 +213,8 @@
@staticmethod
def _xml_header(operation=None):
- return '<evcs xmlns="http://www.adtran.com/ns/yang/adtran-evcs"><evc{}>'.\
- format('' if operation is None else ' operation="{}"'.format(operation))
+ return '<evcs xmlns="http://www.adtran.com/ns/yang/adtran-evcs"{}><evc>'.\
+ format('' if operation is None else ' xc:operation="{}"'.format(operation))
@staticmethod
def _xml_trailer():
@@ -247,11 +257,7 @@
self._installed = True
results = yield self._flow.handler.netconf_client.edit_config(xml, lock_timeout=30)
self._installed = results.ok
-
- if results.ok:
- self.status = ''
- else:
- self.status = results.error # TODO: Save off error status
+ self.status = '' if results.ok else results.error
except Exception as e:
log.exception('Failed to install EVC', name=self.name, e=e)
@@ -271,96 +277,76 @@
returnValue(self._installed and self._valid)
- @inlineCallbacks
- def remove(self):
- d, self._install_deferred = self._install_deferred, None
- if d is not None:
- d.cancel()
-
- if self._installed:
- xml = EVC._xml_header('delete') + '<name>{}</name>'.format(self.name) + EVC._xml_trailer()
-
- log.debug('removing', evc=self.name, xml=xml)
-
- try:
- results = yield self._flow.handler.netconf_client.edit_config(xml, lock_timeout=30)
- self._installed = not results.ok
- if results.ok:
- self.status = ''
- else:
- self.status = results.error # TODO: Save off error status
-
- except Exception as e:
- log.exception('removing', name=self.name, e=e)
- raise
-
- # TODO: Do we remove evc-maps as well reference here or maybe have a 'delete' function?
- pass
-
- returnValue(not self._installed)
-
- @inlineCallbacks
- def enable(self):
- if self.installed and not self._enabled:
- xml = EVC._xml_header() + '<name>{}</name>'.format(self.name)
- xml += '<enabled>true</enabled>' + EVC._xml_trailer()
-
- log.debug('enabling', evc=self.name, xml=xml)
-
- try:
- results = yield self._flow.handler.netconf_client.edit_config(xml, lock_timeout=30)
- self._enabled = results.ok
- if results.ok:
- self.status = ''
- else:
- self.status = results.error # TODO: Save off error status
-
- except Exception as e:
- log.exception('enabling', name=self.name, e=e)
- raise
-
- returnValue(self.installed and self._enabled)
-
- @inlineCallbacks
- def disable(self):
- if self.installed and self._enabled:
- xml = EVC._xml_header() + '<name>{}</name>'.format(self.name)
- xml += '<enabled>false</enabled>' + EVC._xml_trailer()
-
- log.debug('disabling', evc=self.name, xml=xml)
-
- try:
- results = yield self._flow.handler.netconf_client.edit_config(xml, lock_timeout=30)
- self._enabled = not results.ok
- if results.ok:
- self.status = ''
- else:
- self.status = results.error # TODO: Save off error status
-
- except Exception as e:
- log.exception('disabling', name=self.name, e=e)
- raise
-
- returnValue(self.installed and not self._enabled)
-
- @inlineCallbacks
- def delete(self):
+ def remove(self, remove_maps=True):
"""
- Remove from hardware and delete/clean-up
+ Remove EVC (and optional associated EVC-MAPs) from hardware
+ :param remove_maps: (boolean)
+ :return: (deferred)
"""
+ self.cancel_defers()
+
+ if not self.installed:
+ return succeed('Not installed')
+
+ log.info('removing', evc=self, remove_maps=remove_maps)
+ dl = []
+
+ def _success(rpc_reply):
+ log.debug('remove-success', rpc_reply=rpc_reply)
+ self._installed = False
+
+ def _failure(results):
+ log.error('remove-failed', results=results)
+
+ xml = EVC._xml_header('delete') + '<name>{}</name>'.format(self.name) + EVC._xml_trailer()
+ d = self._flow.handler.netconf_client.edit_config(xml, lock_timeout=30)
+ d.addCallbacks(_success, _failure)
+ dl.append(d)
+
+ if remove_maps:
+ for evc_map in self.evc_maps:
+ dl.append(evc_map.remove())
+
+ return defer.gatherResults(dl)
+
+ @inlineCallbacks
+ def delete(self, delete_maps=True):
+ """
+ Remove from hardware and delete/clean-up EVC Object
+ """
+ log.info('deleting', evc=self, delete_maps=delete_maps)
+
try:
- self._valid = False
- succeeded = yield self.remove()
- # TODO: On timeout or other NETCONF error, should we schedule cleanup later?
+ dl = [self.remove()]
+ if delete_maps:
+ for evc_map in self.evc_maps:
+ dl.append(evc_map.delete()) # TODO: implement bulk-flow procedures
- except Exception:
- succeeded = False
+ yield defer.gatherResults(dl)
- finally:
- self._flow = None
- self._evc_maps = None
+ except Exception as e:
+ log.exception('removal', e=e)
- returnValue(succeeded)
+ self._evc_maps = None
+ f, self._flow = self._flow, None
+ if f is not None and f.handler is not None:
+ f.handler.remove_evc(self)
+
+ returnValue(succeed('Done'))
+
+ def reflow(self, reflow_maps=True):
+ """
+ Attempt to install/re-install a flow
+ :param reflow_maps: (boolean) Flag indication if EVC-MAPs should be reflowed as well
+ :return: (deferred)
+ """
+ self.cancel_defers()
+ self._installed = False
+ if reflow_maps:
+ for evc_map in self.evc_maps:
+ evc_map.installed = False
+
+ return self.schedule_install()
def _decode(self):
"""
@@ -404,7 +390,7 @@
</evcs>
</filter>
"""
- log.debug('query', xml=get_xml)
+ log.info('query', xml=get_xml, regex=regex_)
def request_failed(results, operation):
log.error('{}-failed'.format(operation), results=results)
@@ -428,18 +414,21 @@
and p.match(entry['name'])}
else:
names = set()
- for item in entries['evc-map'].items():
+ for item in entries['evc'].items():
if isinstance(item, tuple) and item[0] == 'name':
names.add(item[1])
break
if len(names) > 0:
- del_xml = EVC._xml_header('delete')
+ del_xml = '<evcs xmlns="http://www.adtran.com/ns/yang/adtran-evcs"' + \
+ ' xc:operation = "delete">'
for name in names:
+ del_xml += '<evc>'
del_xml += '<name>{}</name>'.format(name)
- del_xml += EVC._xml_trailer()
-
+ del_xml += '</evc>'
+ del_xml += '</evcs>'
log.debug('removing', xml=del_xml)
+
return client.edit_config(del_xml, lock_timeout=30)
return succeed('no entries')
diff --git a/voltha/adapters/adtran_olt/flow/evc_map.py b/voltha/adapters/adtran_olt/flow/evc_map.py
index 14a66b1..e3a2753 100644
--- a/voltha/adapters/adtran_olt/flow/evc_map.py
+++ b/voltha/adapters/adtran_olt/flow/evc_map.py
@@ -124,6 +124,11 @@
def installed(self):
return self._installed
+ @installed.setter
+ def installed(self, value):
+ assert not value # Can only reset
+ self._installed = False
+
@property
def name(self):
return self._name
@@ -143,8 +148,8 @@
@staticmethod
def _xml_header(operation=None):
- return '<evc-maps xmlns="http://www.adtran.com/ns/yang/adtran-evc-maps"><evc-map{}>'.\
- format('' if operation is None else ' operation="{}"'.format(operation))
+ return '<evc-maps xmlns="http://www.adtran.com/ns/yang/adtran-evc-maps"{}><evc-map>'.\
+ format('' if operation is None else ' xc:operation="{}"'.format(operation))
@staticmethod
def _xml_trailer():
@@ -152,232 +157,127 @@
@inlineCallbacks
def install(self):
- if self._gem_ids is not None:
- self.pon_install()
+ if self._valid and not self._installed:
+ def _common_xml():
+ xml = '<enabled>{}</enabled>'.format('true' if self._enabled else 'false')
+ xml += '<uni>{}</uni>'.format(self._uni_port)
- elif self._valid and not self._installed:
- xml = EVCMap._xml_header()
- xml += '<name>{}</name>'.format(self.name)
- xml += '<enabled>{}</enabled>'.format('true' if self._enabled else 'false')
- xml += '<uni>{}</uni>'.format(self._uni_port)
+ if self._evc_name is not None:
+ xml += '<evc>{}</evc>'.format(self._evc_name)
+ else:
+ xml += EVCMap.EvcConnection.xml(self._evc_connection)
- if self._evc_name is not None:
- xml += '<evc>{}</evc>'.format(self._evc_name)
- else:
- xml += EVCMap.EvcConnection.xml(self._evc_connection)
+ # if self._match_untagged:
+ # xml += '<match-untagged>True</match-untagged>'
+ if self._c_tag is not None:
+ xml += '<ctag>{}</ctag>'.format(self._c_tag)
- if self._match_untagged:
- xml += '<match-untagged>True</match-untagged>'
- elif self._c_tag is not None:
- xml += '<ctag>{}</ctag>'.format(self._c_tag)
+ # TODO: The following is not yet supported
+ # self._men_priority = EVCMap.PriorityOption.INHERIT_PRIORITY
+ # self._men_pri = 0 # If Explicit Priority
+ #
+ # self._men_ctag_priority = EVCMap.PriorityOption.INHERIT_PRIORITY
+ # self._men_ctag_pri = 0 # If Explicit Priority
+ #
+ # self._match_ce_vlan_id = None
+ # self._match_untagged = True
+ # self._match_destination_mac_address = None
+ # self._eth_type = None
+ # self._ip_protocol = None
+ # self._ipv4_dst = None
+ # self._udp_dst = None
+ # self._udp_src = None
+ return xml
- xml += EVCMap._xml_trailer()
+ def _ingress_xml():
+ from ..onu import Onu
+ xml = '<evc-maps xmlns="http://www.adtran.com/ns/yang/adtran-evc-maps">'
+ for onu_id, gem_ids in self._gem_ids.iteritems():
+ for gem_id in gem_ids:
+ xml += '<evc-map>'
+ xml += '<name>{}.{}.{}</name>'.format(self.name, onu_id, gem_id)
+ xml += '<ce-vlan-id>{}</ce-vlan-id>'.format(Onu.gem_id_to_gvid(gem_id))
+ xml += _common_xml()
+ xml += '</evc-map>'
+ xml += '</evc-maps>'
+ return xml
- log.debug('creating', name=self.name, xml=xml)
+ def _egress_xml():
+ xml = EVCMap._xml_header()
+ xml += '<name>{}</name>'.format(self.name)
+ xml += _common_xml()
+ xml += EVCMap._xml_trailer()
+ return xml
- if self._needs_acl_support:
- self._installed = True # TODO: Support ACLs
- else:
- try:
- results = yield self._flow.handler.netconf_client.edit_config(xml, lock_timeout=30)
- self._installed = results.ok
- if results.ok:
- self.status = ''
- else:
- self.status = results.error # TODO: Save off error status
+ try:
+ # TODO: create generator of XML once we have MANY to install at once
+ map_xml = _ingress_xml() if self._is_ingress_map else _egress_xml()
- except Exception as e:
- log.exception('install', name=self.name, e=e)
- raise
+ log.debug('install', xml=map_xml, name=self.name)
+ results = yield self._flow.handler.netconf_client.edit_config(map_xml,
+ lock_timeout=30)
+ self._installed = results.ok
+ self.status = '' if results.ok else results.error
- # TODO: The following is not yet supported
- # self._men_priority = EVCMap.PriorityOption.INHERIT_PRIORITY
- # self._men_pri = 0 # If Explicit Priority
- #
- # self._c_tag = None
- # self._men_ctag_priority = EVCMap.PriorityOption.INHERIT_PRIORITY
- # self._men_ctag_pri = 0 # If Explicit Priority
- #
- # self._match_ce_vlan_id = None
- # self._match_untagged = True
- # self._match_destination_mac_address = None
- # self._eth_type = None
- # self._ip_protocol = None
- # self._ipv4_dst = None
- # self._udp_dst = None
- # self._udp_src = None
+ except Exception as e:
+ log.exception('install', name=self.name, e=e)
+ raise
returnValue(self._installed and self._valid)
- @inlineCallbacks
- def pon_install(self):
- """
- Install a flow on all ONU's of a PON port
- """
- from ..onu import Onu
+ def remove(self):
+ if not self.installed:
+ return succeed('Not installed')
- if self._valid and not self._installed:
- # Install in per ONU batches
+ log.info('removing', evc_map=self)
- self._installed = True
+ def _ingress_xml():
+ xml = '<evc-maps xmlns="http://www.adtran.com/ns/yang/adtran-evc-maps"' + \
+ ' xc:operation = "delete">'
for onu_id, gem_ids in self._gem_ids.iteritems():
- xml = '<evc-maps xmlns="http://www.adtran.com/ns/yang/adtran-evc-maps">'
-
for gem_id in gem_ids:
xml += '<evc-map>'
xml += '<name>{}.{}.{}</name>'.format(self.name, onu_id, gem_id)
- xml += '<enabled>{}</enabled>'.format('true' if self._enabled else 'false')
- xml += '<uni>{}</uni>'.format(self._uni_port)
-
- if self._evc_name is not None:
- xml += '<evc>{}</evc>'.format(self._evc_name)
- else:
- xml += EVCMap.EvcConnection.xml(self._evc_connection)
-
- xml += '<ce-vlan-id>{}</ce-vlan-id>'.format(Onu.gem_id_to_gvid(gem_id))
-
- # if self._match_untagged:
- # xml += '<match-untagged>True</match-untagged>'
- if self._c_tag is not None:
- xml += '<ctag>{}</ctag>'.format(self._c_tag)
-
xml += '</evc-map>'
- xml += '</evc-maps>'
+ xml += '</evc-maps>'
- log.debug('creating', name=self.name, onu_id=onu_id, xml=xml)
+ return xml
- try:
- # Set installed to true while request is in progress
- results = yield self._flow.handler.netconf_client.edit_config(xml, lock_timeout=30)
- self._installed = results.ok # TODO: Need per-ONU results?
+ def _egress_xml():
+ return EVCMap._xml_header('delete') + \
+ '<name>{}</name>'.format(self.name) + EVCMap._xml_trailer()
- if results.ok:
- self.status = ''
- else:
- self.status = results.error # TODO: Save off error status
+ def _success(rpc_reply):
+ log.debug('remove-success', rpc_reply=rpc_reply)
+ self._installed = False
- except Exception as e:
- log.exception('install', name=self.name, onu_id=onu_id, e=e)
- self._installed = False
- raise
+ def _failure(results):
+ log.error('remove-failed', results=results)
- # TODO: The following is not yet supported
- # self._men_priority = EVCMap.PriorityOption.INHERIT_PRIORITY
- # self._men_pri = 0 # If Explicit Priority
- #
- # self._c_tag = None
- # self._men_ctag_priority = EVCMap.PriorityOption.INHERIT_PRIORITY
- # self._men_ctag_pri = 0 # If Explicit Priority
- #
- # self._match_untagged = True
- # self._match_destination_mac_address = None
- # self._eth_type = None
- # self._ip_protocol = None
- # self._ipv4_dst = None
- # self._udp_dst = None
- # self._udp_src = None
-
- returnValue(self._installed and self._valid)
-
- @inlineCallbacks
- def remove(self):
- if self._installed:
- xml = EVCMap._xml_header('remove') + '<name>{}</name>'.format(self.name) + EVCMap._xml_trailer()
-
- log.debug('removing', name=self.name, xml=xml)
-
- if self._needs_acl_support:
- self._installed = False # TODO: Support ACLs
- else:
- try:
- results = yield self._flow.handler.netconf_client.edit_config(xml,
- default_operation='remove',
- lock_timeout=30)
- self._installed = not results.ok
- if results.ok:
- self.status = ''
- else:
- self.status = results.error # TODO: Save off error status
-
- except Exception as e:
- log.exception('removing', name=self.name, e=e)
- raise
-
- # TODO: Do we remove evc reference here or maybe have a 'delete' function?
-
- returnValue(self._installed)
-
- @inlineCallbacks
- def enable(self):
- if self.installed and not self._enabled:
- xml = EVCMap._xml_header() + '<name>{}</name>'.format(self.name)
- xml += '<enabled>true</enabled>' + EVCMap._xml_trailer()
-
- log.debug('enabling', name=self.name, xml=xml)
-
- if self._needs_acl_support:
- self._enabled = True # TODO: Support ACLs
- else:
- try:
- results = yield self._flow.handler.netconf_client.edit_config(xml, lock_timeout=30)
- self._enabled = results.ok
- if results.ok:
- self.status = ''
- else:
- self.status = results.error # TODO: Save off error status
-
- except Exception as e:
- log.exception('enabling', name=self.name, e=e)
- raise
-
- returnValue(self.installed and self._enabled)
-
- @inlineCallbacks
- def disable(self):
- if self.installed and self._enabled:
- xml = EVCMap._xml_header() + '<name>{}</name>'.format(self.name)
- xml += '<enabled>false</enabled>' + EVCMap._xml_trailer()
-
- log.debug('disabling', name=self.name, xml=xml)
-
- if self._needs_acl_support:
- self._enabled = False # TODO: Support ACLs
- else:
- try:
- results = yield self._flow.handler.netconf_client.edit_config(xml, lock_timeout=30)
- self._enabled = not results.ok
- if results.ok:
- self.status = ''
- else:
- self.status = results.error # TODO: Save off error status
-
- except Exception as e:
- log.exception('disabling', name=self.name, e=e)
- raise
-
- returnValue(self.installed and not self._enabled)
+ # TODO: create generator of XML once we have MANY to install at once
+ map_xml = _ingress_xml() if self._is_ingress_map else _egress_xml()
+ d = self._flow.handler.netconf_client.edit_config(map_xml, lock_timeout=30)
+ d.addCallbacks(_success, _failure)
+ return d
@inlineCallbacks
def delete(self):
"""
- Remove from hardware and delete/clean-up
+ Remove from hardware and delete/clean-up EVC-MAP Object
"""
+ if self._evc is not None:
+ self._evc.remove_evc_map(self)
+
try:
- self._valid = False
- succeeded = yield self.remove()
- # TODO: On timeout or other NETCONF error, should we schedule cleanup later?
+ yield self.remove()
- except Exception:
- succeeded = False
+ except Exception as e:
+ log.exception('removal', e=e)
- finally:
- self._flow = None
- evc, self._evc = self._evc, None
- if evc is not None:
- evc.remove_evc_map(self)
-
- returnValue(succeeded)
+ self._flow = None
+ self._evc = None
+ returnValue('Done')
def _decode(self):
from evc import EVC
@@ -423,10 +323,10 @@
pon_port = flow.handler.get_southbound_port(flow.in_port)
if pon_port is not None:
- self._gem_ids = pon_port.gem_ids(self._needs_acl_support)
+ self._gem_ids = pon_port.gem_ids(self._flow.onu_vid, self._needs_acl_support)
# TODO: Only EAPOL ACL support for the first demo
if self._needs_acl_support and self._eth_type != FlowEntry.EtherType.EAPOL.value:
- self._gem_ids = set()
+ self._gem_ids = dict()
# if flow.vlan_id is None and flow.inner_vid is None:
# self._match_untagged = True
@@ -450,6 +350,8 @@
return True
+ # Bulk operations
+
@staticmethod
def remove_all(client, regex_=EVC_MAP_NAME_REGEX_ALL):
"""
@@ -469,7 +371,7 @@
</evc-maps>
</filter>
"""
- log.debug('query', xml=get_xml)
+ log.info('query', xml=get_xml, regex=regex_)
def request_failed(results, operation):
log.error('{}-failed'.format(operation), results=results)
@@ -499,12 +401,15 @@
break
if len(names) > 0:
- del_xml = EVCMap._xml_header('delete')
+ del_xml = '<evc-maps xmlns="http://www.adtran.com/ns/yang/adtran-evc-maps"' + \
+ ' xc:operation = "delete">'
for name in names:
+ del_xml += '<evc-map>'
del_xml += '<name>{}</name>'.format(name)
- del_xml += EVCMap._xml_trailer()
-
+ del_xml += '</evc-map>'
+ del_xml += '</evc-maps>'
log.debug('removing', xml=del_xml)
+
return client.edit_config(del_xml, lock_timeout=30)
return succeed('no entries')
diff --git a/voltha/adapters/adtran_olt/flow/flow_entry.py b/voltha/adapters/adtran_olt/flow/flow_entry.py
index ac53659..0cf59ba 100644
--- a/voltha/adapters/adtran_olt/flow/flow_entry.py
+++ b/voltha/adapters/adtran_olt/flow/flow_entry.py
@@ -19,7 +19,7 @@
import voltha.core.flow_decomposer as fd
from voltha.core.flow_decomposer import *
from voltha.protos.openflow_13_pb2 import OFPP_CONTROLLER, OFPP_LOCAL, OFPP_ANY, OFPP_MAX
-from twisted.internet.defer import returnValue, inlineCallbacks
+from twisted.internet.defer import returnValue, inlineCallbacks, succeed, gatherResults
log = structlog.get_logger()
@@ -77,6 +77,7 @@
self.evc = None # EVC this flow is part of
self.evc_map = None # EVC-MAP this flow is part of
self._flow_direction = FlowEntry.FlowDirection.OTHER
+ self.onu_vid = None
self._name = self._create_flow_name()
# A value used to locate possible related flow entries
@@ -238,10 +239,10 @@
if status:
# Determine direction of the flow
- def port_type(port):
- if port in self._handler.northbound_ports:
+ def port_type(port_number):
+ if port_number in self._handler.northbound_ports:
return FlowEntry.FlowDirection.NNI
- elif port <= OFPP_MAX:
+ elif port_number <= OFPP_MAX:
return FlowEntry.FlowDirection.UNI
return FlowEntry.FlowDirection.OTHER
@@ -271,6 +272,7 @@
inner = self.inner_vid
else:
inner = self.vlan_id if (push_len > 0 and outer is not None) else None
+ self.onu_vid = inner if self._flow_direction == FlowEntry.FlowDirection.UPSTREAM else None
self.signature = '{}'.format(dev_id)
for port in ports:
@@ -375,16 +377,14 @@
@staticmethod
def drop_missing_flows(device_id, valid_flow_ids):
flow_table = _existing_flow_entries.get(device_id, None)
+ if flow_table is None:
+ return succeed('No table')
- if flow_table is not None:
- flows_to_drop = [flow for flow_id, flow in flow_table.items() if flow_id not in valid_flow_ids]
+ flows_to_drop = [flow for flow_id, flow in flow_table.items() if flow_id not in valid_flow_ids]
+ if len(flows_to_drop) == 0:
+ return succeed('No flows')
- for flow in flows_to_drop:
- try:
- yield flow.remove()
-
- except Exception as e:
- log.exception('stale-flow', flow=flow, e=e)
+ return gatherResults([flow.remove() for flow in flows_to_drop])
@inlineCallbacks
def remove(self):
@@ -397,41 +397,35 @@
flow_id = self._flow.id
flow_table = _existing_flow_entries.get(device_id, None)
- if flow_table is not None and flow_id in flow_table:
- del flow_table[flow_id]
- if len(flow_table) == 0:
- del _existing_flow_entries[device_id]
+ if flow_table is None or flow_id not in flow_table:
+ returnValue(succeed('NOP'))
- # Remove flow from the hardware
+ del flow_table[flow_id]
+ if len(flow_table) == 0:
+ del _existing_flow_entries[device_id]
- evc_map, self.evc_map = self.evc_map, None
- evc, self.evc = self.evc, None
+ # Remove flow from the hardware
+ try:
+ dl = []
+ if self.evc_map is not None:
+ dl.append(self.evc_map.delete())
- if evc_map is not None:
- yield evc_map.delete()
+ if self.evc is not None:
+ dl.append(self.evc.delete())
- if evc is not None:
- yield evc.delete()
+ yield gatherResults(dl)
- self._flow = None
- self._handler = None
+ except Exception as e:
+ log.exception('removal', e=e)
- returnValue('done')
+ self.evc_map = None
+ self.evc = None
+ returnValue(succeed('Done'))
######################################################
# Bulk operations
@staticmethod
- def enable_all():
- # TODO: May want to be device specific or regex based
- raise NotImplemented("TODO: Implement this")
-
- @staticmethod
- def disable_all():
- # TODO: May want to be device specific or regex based
- raise NotImplemented("TODO: Implement this")
-
- @staticmethod
def remove_all():
"""
Remove all matching EVCs and associated EVC MAPs from hardware
diff --git a/voltha/adapters/adtran_olt/net/adtran_netconf.py b/voltha/adapters/adtran_olt/net/adtran_netconf.py
index e31811f..1665a70 100644
--- a/voltha/adapters/adtran_olt/net/adtran_netconf.py
+++ b/voltha/adapters/adtran_olt/net/adtran_netconf.py
@@ -282,7 +282,8 @@
# raise
try:
if config[:7] != '<config':
- config = '<config xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">' + \
+ config = '<config xmlns:xc="urn:ietf:params:xml:ns:netconf:base:1.0"' + \
+ ' xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">' + \
config + '</config>'
rpc_reply = yield threads.deferToThread(self._do_edit_config, target,
diff --git a/voltha/adapters/adtran_olt/nni_port.py b/voltha/adapters/adtran_olt/nni_port.py
index 0e20009..1867f6f 100644
--- a/voltha/adapters/adtran_olt/nni_port.py
+++ b/voltha/adapters/adtran_olt/nni_port.py
@@ -35,11 +35,10 @@
TODO: Merge this with the Port class or cleanup where possible
so we do not duplicate fields/properties/methods
"""
-
class State(Enum):
- INITIAL = 0 # Created and initialization in progress
- RUNNING = 1 # PON port contacted, ONU discovery active
- STOPPED = 2 # Disabled
+ INITIAL = 0 # Created and initialization in progress
+ RUNNING = 1 # PON port contacted, ONU discovery active
+ STOPPED = 2 # Disabled
DELETING = 3 # Cleanup
def __init__(self, parent, **kwargs):
@@ -167,6 +166,7 @@
return succeed('Running')
self.log.info('Starting NNI port')
+ self._cancel_deferred()
# TODO: Start up any watchdog/polling tasks here
@@ -182,7 +182,7 @@
if self._state != NniPort.State.INITIAL:
returnValue('Done')
- returnValue('TODO: Implement startup of each NNI port')
+ # returnValue('TODO: Implement startup of each NNI port')
if self._enabled:
self._admin_state = AdminState.ENABLED
@@ -207,8 +207,8 @@
return succeed('Stopped')
self.log.info('Stopping NNI port')
-
self._cancel_deferred()
+
# NOTE: Leave all NNI ports active (may have inband management)
# TODO: Revisit leaving NNI Ports active on disable
@@ -222,6 +222,13 @@
self._state = NniPort.State.STOPPED
return self._deferred
+ def restart(self):
+ if self._state == NniPort.State.RUNNING or self._state == NniPort.State.STOPPED:
+ start_it = (self._state == NniPort.State.RUNNING)
+ self._state = NniPort.State.INITIAL
+ return self.start() if start_it else self.stop()
+ return succeed('nop')
+
def delete(self):
"""
Parent device is being deleted. Do not change any config but
diff --git a/voltha/adapters/adtran_olt/onu.py b/voltha/adapters/adtran_olt/onu.py
index 6ba06c2..403460b 100644
--- a/voltha/adapters/adtran_olt/onu.py
+++ b/voltha/adapters/adtran_olt/onu.py
@@ -66,6 +66,7 @@
self._xpon_name = onu_info['xpon-name']
self._gem_ports = {} # gem-id -> GemPort
self._tconts = {} # alloc-id -> TCont
+ self._onu_vid = onu_info['onu-vid']
# TODO: enable and upstream-channel-speed not yet supported
@@ -110,6 +111,10 @@
return self._name
@property
+ def onu_vid(self):
+ return self._onu_vid
+
+ @property
def serial_number(self):
return self._serial_number_base64
diff --git a/voltha/adapters/adtran_olt/pon_port.py b/voltha/adapters/adtran_olt/pon_port.py
index f03ef78..48f0260 100644
--- a/voltha/adapters/adtran_olt/pon_port.py
+++ b/voltha/adapters/adtran_olt/pon_port.py
@@ -40,9 +40,9 @@
DEFAULT_ENABLED = False
class State(Enum):
- INITIAL = 0 # Created and initialization in progress
- RUNNING = 1 # PON port contacted, ONU discovery active
- STOPPED = 2 # Disabled
+ INITIAL = 0 # Created and initialization in progress
+ RUNNING = 1 # PON port contacted, ONU discovery active
+ STOPPED = 2 # Disabled
DELETING = 3 # Cleanup
_SUPPORTED_ACTIVATION_METHODS = ['autodiscovery', 'autoactivate']
@@ -387,6 +387,13 @@
returnValue('Reset complete')
+ def restart(self):
+ if self._state == PonPort.State.RUNNING or self._state == PonPort.State.STOPPED:
+ start_it = (self._state == PonPort.State.RUNNING)
+ self._state = PonPort.State.INITIAL
+ return self.start() if start_it else self.stop()
+ return succeed('nop')
+
def delete(self):
"""
Parent device is being deleted. Do not change any config but
@@ -397,15 +404,19 @@
self._cancel_deferred()
# @property
- def gem_ids(self, exception_gems):
+ def gem_ids(self, onu_vid, exception_gems):
"""
Get all GEM Port IDs used on a given PON
+ :param onu_vid: (int) ONU VLAN ID if customer ONU specific. None if for all ONUs
+ on PON
+ :param exception_gems: (boolean) Select from special purpose ACL GEM-Portas
:return: (dict) key -> onu-id, value -> frozenset of GEM Port IDs
"""
gem_ids = {}
for onu_id, onu in self._onu_by_id.iteritems():
- gem_ids[onu_id] = onu.gem_ids(exception_gems)
+ if onu_vid is None or onu_vid == onu.onu_vid:
+ gem_ids[onu_id] = onu.gem_ids(exception_gems)
return gem_ids
def get_pon_config(self):
@@ -572,6 +583,7 @@
'password': Onu.DEFAULT_PASSWORD,
't-conts': get_tconts(self.pon_id, serial_number, onu_id),
'gem-ports': get_gem_ports(self.pon_id, serial_number, onu_id),
+ 'onu-vid': self.olt.get_channel_id(self._pon_id, onu_id)
}
return onu_info
@@ -620,7 +632,7 @@
"""
olt = self.olt
adapter = self.adapter_agent
- channel_id = self.olt.get_channel_id(self._pon_id, onu.onu_id)
+ channel_id = onu.onu_vid
proxy = Device.ProxyAddress(device_id=olt.device_id, channel_id=channel_id)