VOL-2623 adding update_flows_incrementally support to onu-adapter
With this task queuing of vlan_filter task is done per tp_id and
the signature of the constructor of vlan_filter task is changed to include default_vlan and vlan_pcp.
Change-Id: Ib83f0362cb5923f88d98f25c02241dd3d2019e0c
diff --git a/python/adapters/brcm_openomci_onu/brcm_openomci_onu_adapter.py b/python/adapters/brcm_openomci_onu/brcm_openomci_onu_adapter.py
index 0ce8389..2eb1533 100644
--- a/python/adapters/brcm_openomci_onu/brcm_openomci_onu_adapter.py
+++ b/python/adapters/brcm_openomci_onu/brcm_openomci_onu_adapter.py
@@ -51,7 +51,7 @@
supported_device_types = [
DeviceType(
id=name,
- vendor_ids=['OPEN', 'ALCL', 'BRCM', 'TWSH', 'ALPH', 'ISKT', 'SFAA', 'BBSM', 'SCOM'],
+ vendor_ids=['OPEN', 'ALCL', 'BRCM', 'TWSH', 'ALPH', 'ISKT', 'SFAA', 'BBSM', 'SCOM', 'ARPX', 'DACM', 'ERSN', 'HWTC', 'CIGG'],
adapter=name,
accepts_bulk_flow_update=True
)
@@ -215,7 +215,20 @@
return handler.update_flow_table(device, flows.items)
def update_flows_incrementally(self, device, flow_changes, group_changes):
- raise NotImplementedError()
+ self.log.info('incremental-flow-update', device_id=device.id,
+ flows=flow_changes, groups=group_changes)
+ # For now, there is no support for group changes
+ assert len(group_changes.to_add.items) == 0
+ assert len(group_changes.to_remove.items) == 0
+
+ handler = self.devices_handlers[device.id]
+ # Remove flows
+ if len(flow_changes.to_remove.items) != 0:
+ handler.remove_onu_flows(device, flow_changes.to_remove.items)
+
+ # Add flows
+ if len(flow_changes.to_add.items) != 0:
+ handler.add_onu_flows(device, flow_changes.to_add.items)
def send_proxied_message(self, proxy_address, msg):
self.log.debug('send-proxied-message', proxy_address=proxy_address, msg=msg)
diff --git a/python/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py b/python/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
index deb498d..0d6ec4d 100644
--- a/python/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
+++ b/python/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
@@ -413,19 +413,22 @@
return new_gem_ports
- def _execute_queued_vlan_filter_tasks(self, uni_id):
+ def _execute_queued_vlan_filter_tasks(self, uni_id, tp_id):
# During OLT Reboots, ONU Reboots, ONU Disable/Enable, it is seen that vlan_filter
# task is scheduled even before tp task. So we queue vlan-filter task if tp_task
# or initial-mib-download is not done. Once the tp_task is completed, we execute
# such queued vlan-filter tasks
try:
- if uni_id in self._queued_vlan_filter_task:
+ if uni_id in self._queued_vlan_filter_task and tp_id in self._queued_vlan_filter_task[uni_id]:
self.log.info("executing-queued-vlan-filter-task",
- uni_id=uni_id)
- filter_info = self._queued_vlan_filter_task[uni_id]
+ uni_id=uni_id, tp_id=tp_id)
+ filter_info = self._queued_vlan_filter_task[uni_id][tp_id]
reactor.callLater(0, self._add_vlan_filter_task, filter_info.get("device"),
- uni_id, filter_info.get("uni_port"), filter_info.get("set_vlan_vid"),
- filter_info.get("tp_id"))
+ uni_id=uni_id, uni_port=filter_info.get("uni_port"),
+ match_vlan = filter_info.get("match_vlan"),
+ _set_vlan_vid= filter_info.get("set_vlan_vid"),
+ _set_vlan_pcp = filter_info.get("set_vlan_pcp"),
+ tp_id = filter_info.get("tp_id"))
# Now remove the entry from the dictionary
self._queued_vlan_filter_task[uni_id].clear()
self.log.debug("executed-queued-vlan-filter-task",
@@ -444,7 +447,7 @@
def load_and_configure_tech_profile(self, uni_id, tp_path):
self.log.debug("loading-tech-profile-configuration", uni_id=uni_id, tp_path=tp_path)
-
+ tp_id = int ( tp_path.split ( "/" )[1] )
if uni_id not in self._tp_service_specific_task:
self._tp_service_specific_task[uni_id] = dict()
@@ -475,7 +478,7 @@
del self._tp_service_specific_task[uni_id][tp_path]
self._tech_profile_download_done[uni_id][tp_path] = True
# Now execute any vlan filter tasks that were queued for later
- self._execute_queued_vlan_filter_tasks(uni_id)
+ reactor.callInThread(self._execute_queued_vlan_filter_tasks, uni_id, tp_id)
yield self.core_proxy.device_reason_update(self.device_id, 'tech-profile-config-download-success')
@inlineCallbacks
@@ -659,6 +662,234 @@
self.log.info('update_pm_config', pm_config=pm_config)
self._pm_metrics.update(pm_config)
+ def remove_onu_flows(self, device, flows):
+ self.log.debug('remove_onu_flows', device_id=device.id)
+
+
+ # no point in removing omci flows if the device isnt reachable
+ if device.connect_status != ConnectStatus.REACHABLE or \
+ device.admin_state != AdminState.ENABLED:
+ self.log.warn("device-disabled-or-offline-skipping-remove-flow",
+ admin=device.admin_state, connect=device.connect_status)
+ return
+
+ for flow in flows:
+ # if incoming flow contains cookie, then remove from ONU
+ if flow.cookie:
+ self.log.debug("remove-flow", device_id=device.id, flow=flow)
+
+ def is_downstream(port):
+ return port == self._pon_port_number
+
+ def is_upstream(port):
+ return not is_downstream(port)
+
+ try:
+ _in_port = fd.get_in_port(flow)
+ assert _in_port is not None
+
+ _out_port = fd.get_out_port(flow) # may be None
+ _vlan_vid = fd.get_default_vlan(flow)
+
+ if is_downstream(_in_port):
+ self.log.debug('downstream-flow-no-need-to-remove', in_port=_in_port, out_port=_out_port,
+ device_id=device.id)
+ # extended vlan tagging operation will handle it
+ continue
+ elif is_upstream(_in_port):
+ self.log.debug('upstream-flow', in_port=_in_port, out_port=_out_port)
+ if fd.is_dhcp_flow(flow):
+ self.log.debug('The dhcp trap-to-host flow will be discarded', device_id=device.id)
+ return
+
+ uni_port = self.uni_port(_in_port)
+ uni_id = _in_port & 0xF
+ else:
+ raise Exception('port should be 1 or 2 by our convention')
+
+ self.log.debug('flow-ports', in_port=_in_port, out_port=_out_port, uni_port=str(uni_port))
+
+ tp_id = self.get_tp_id_in_flow(flow)
+ # Deleting flow from ONU.
+ self._remove_vlan_filter_task(device, uni_id, uni_port=uni_port, _set_vlan_vid=_vlan_vid,
+ match_vlan=_vlan_vid, tp_id=tp_id)
+ #TODO:Delete TD task.
+ except Exception as e:
+ self.log.exception('failed-to-remove-flow', e=e)
+
+ def add_onu_flows(self, device, flows):
+ self.log.debug('function-entry', flows=flows)
+
+ #
+ # We need to proxy through the OLT to get to the ONU
+ # Configuration from here should be using OMCI
+ #
+ # self.log.info('bulk-flow-update', device_id=device.id, flows=flows)
+
+ # no point in pushing omci flows if the device isnt reachable
+ if device.connect_status != ConnectStatus.REACHABLE or \
+ device.admin_state != AdminState.ENABLED:
+ self.log.warn("device-disabled-or-offline-skipping-flow-update",
+ admin=device.admin_state, connect=device.connect_status)
+ return
+ def is_downstream(port):
+ return port == self._pon_port_number
+
+ def is_upstream(port):
+ return not is_downstream(port)
+
+ for flow in flows:
+ # if incoming flow contains cookie, then add to ONU
+ if flow.cookie:
+ _type = None
+ _port = None
+ _vlan_vid = None
+ _udp_dst = None
+ _udp_src = None
+ _ipv4_dst = None
+ _ipv4_src = None
+ _metadata = None
+ _output = None
+ _push_tpid = None
+ _field = None
+ _set_vlan_vid = None
+ _set_vlan_pcp = 0
+ _tunnel_id = None
+ self.log.debug("add-flow", device_id=device.id, flow=flow)
+
+ try:
+ _in_port = fd.get_in_port(flow)
+ assert _in_port is not None
+
+ _out_port = fd.get_out_port(flow) # may be None
+ tp_id = self.get_tp_id_in_flow(flow)
+ if is_downstream(_in_port):
+ self.log.debug('downstream-flow', in_port=_in_port, out_port=_out_port)
+ # NOTE: We don't care downstream flow because we will copy vlan_id to upstream flow
+ # uni_port = self.uni_port(_out_port)
+ # uni_id = _out_port & 0xF
+ continue
+ elif is_upstream(_in_port):
+ self.log.debug('upstream-flow', in_port=_in_port, out_port=_out_port)
+ uni_port = self.uni_port(_in_port)
+ uni_id = _in_port & 0xF
+ else:
+ raise Exception('port should be 1 or 2 by our convention')
+
+ self.log.debug('flow-ports', in_port=_in_port, out_port=_out_port, uni_port=str(uni_port))
+
+ for field in fd.get_ofb_fields(flow):
+ if field.type == fd.ETH_TYPE:
+ _type = field.eth_type
+ self.log.debug('field-type-eth-type',
+ eth_type=_type)
+
+ elif field.type == fd.IP_PROTO:
+ _proto = field.ip_proto
+ self.log.debug('field-type-ip-proto',
+ ip_proto=_proto)
+
+ elif field.type == fd.IN_PORT:
+ _port = field.port
+ self.log.debug('field-type-in-port',
+ in_port=_port)
+ elif field.type == fd.TUNNEL_ID:
+ self.log.debug('field-type-tunnel-id')
+
+ elif field.type == fd.VLAN_VID:
+ _vlan_vid = field.vlan_vid & 0xfff
+ self.log.debug('field-type-vlan-vid',
+ vlan=_vlan_vid)
+
+ elif field.type == fd.VLAN_PCP:
+ _vlan_pcp = field.vlan_pcp
+ self.log.debug('field-type-vlan-pcp',
+ pcp=_vlan_pcp)
+
+ elif field.type == fd.UDP_DST:
+ _udp_dst = field.udp_dst
+ self.log.debug('field-type-udp-dst',
+ udp_dst=_udp_dst)
+
+ elif field.type == fd.UDP_SRC:
+ _udp_src = field.udp_src
+ self.log.debug('field-type-udp-src',
+ udp_src=_udp_src)
+
+ elif field.type == fd.IPV4_DST:
+ _ipv4_dst = field.ipv4_dst
+ self.log.debug('field-type-ipv4-dst',
+ ipv4_dst=_ipv4_dst)
+
+ elif field.type == fd.IPV4_SRC:
+ _ipv4_src = field.ipv4_src
+ self.log.debug('field-type-ipv4-src',
+ ipv4_dst=_ipv4_src)
+
+ elif field.type == fd.METADATA:
+ _metadata = field.table_metadata
+ self.log.debug('field-type-metadata',
+ metadata=_metadata)
+
+ else:
+ raise NotImplementedError('field.type={}'.format(
+ field.type))
+
+ for action in fd.get_actions(flow):
+
+ if action.type == fd.OUTPUT:
+ _output = action.output.port
+ self.log.debug('action-type-output',
+ output=_output, in_port=_in_port)
+
+ elif action.type == fd.POP_VLAN:
+ self.log.debug('action-type-pop-vlan',
+ in_port=_in_port)
+
+ elif action.type == fd.PUSH_VLAN:
+ _push_tpid = action.push.ethertype
+ self.log.debug('action-type-push-vlan',
+ push_tpid=_push_tpid, in_port=_in_port)
+ if action.push.ethertype != 0x8100:
+ self.log.error('unhandled-tpid',
+ ethertype=action.push.ethertype)
+
+ elif action.type == fd.SET_FIELD:
+ _field = action.set_field.field.ofb_field
+ assert (action.set_field.field.oxm_class ==
+ OFPXMC_OPENFLOW_BASIC)
+ self.log.debug('action-type-set-field',
+ field=_field, in_port=_in_port)
+ if _field.type == fd.VLAN_VID:
+ _set_vlan_vid = _field.vlan_vid & 0xfff
+ self.log.debug('set-field-type-vlan-vid',
+ vlan_vid=_set_vlan_vid)
+ elif _field.type == fd.VLAN_PCP:
+ _set_vlan_pcp = _field.vlan_pcp
+ self.log.debug('set-field-type-vlan-pcp',
+ vlan_pcp=_set_vlan_pcp)
+ else:
+ self.log.error('unsupported-action-set-field-type',
+ field_type=_field.type)
+ else:
+ self.log.error('unsupported-action-type',
+ action_type=action.type, in_port=_in_port)
+
+ if type is not None and _vlan_vid is None:
+ self.log.warn('ignoring-flow-with-ethType', ethType=_type)
+ elif _set_vlan_vid is None or _set_vlan_vid == 0:
+ self.log.warn('ignorning-flow-that-does-not-set-vlanid')
+ else:
+ self.log.info('set-vlanid', uni_id=uni_id, uni_port=uni_port, set_vlan_vid=_set_vlan_vid, vlan_vid=_vlan_vid,tp_id=tp_id)
+ self._add_vlan_filter_task(device, uni_id=uni_id, uni_port=uni_port,
+ _set_vlan_vid=_set_vlan_vid,
+ _set_vlan_pcp=_set_vlan_pcp, match_vlan=_vlan_vid,
+ tp_id=tp_id)
+
+ except Exception as e:
+ self.log.exception('failed-to-install-flow', e=e, flow=flow)
+
+
# Calling this assumes the onu is active/ready and had at least an initial mib downloaded. This gets called from
# flow decomposition that ultimately comes from onos
def update_flow_table(self, device, flows):
@@ -696,6 +927,7 @@
_push_tpid = None
_field = None
_set_vlan_vid = None
+ _set_vlan_pcp = None
_tunnel_id = None
try:
@@ -705,7 +937,7 @@
return
# extract tp id from flow
- tp_id = (write_metadata >> 32) & 0xFFFF
+ tp_id= self.get_tp_id_in_flow(flow)
self.log.debug("tp-id-in-flow", tp_id=tp_id)
_in_port = fd.get_in_port(flow)
@@ -815,6 +1047,10 @@
_set_vlan_vid = _field.vlan_vid & 0xfff
self.log.debug('set-field-type-vlan-vid',
vlan_vid=_set_vlan_vid)
+ elif _field.type == fd.VLAN_PCP:
+ _set_vlan_pcp = _field.vlan_pcp
+ self.log.debug('set-field-type-vlan-pcp',
+ vlan_pcp=_set_vlan_pcp)
else:
self.log.error('unsupported-action-set-field-type',
field_type=_field.type)
@@ -827,42 +1063,80 @@
if _set_vlan_vid is None or _set_vlan_vid == 0:
self.log.warn('ignoring-flow-that-does-not-set-vlanid')
else:
- self.log.info('set-vlanid', uni_id=uni_id, uni_port=uni_port, set_vlan_vid=_set_vlan_vid, tp_id=tp_id)
- self._add_vlan_filter_task(device, uni_id, uni_port, _set_vlan_vid, tp_id)
+ self.log.info('set-vlanid', uni_id=uni_id, uni_port=uni_port, match_vlan=_vlan_vid, set_vlan_vid=_set_vlan_vid, _set_vlan_pcp=_set_vlan_pcp, ethType=_type)
+ self._add_vlan_filter_task(device, uni_id,
+ uni_port=uni_port, match_vlan=_vlan_vid,
+ _set_vlan_vid=_set_vlan_vid, _set_vlan_pcp=_set_vlan_pcp, tp_id=tp_id)
except Exception as e:
self.log.exception('failed-to-install-flow', e=e, flow=flow)
- def _add_vlan_filter_task(self, device, uni_id, uni_port, _set_vlan_vid, tp_id):
+ def _add_vlan_filter_task(self, device, uni_id, uni_port=None, match_vlan=0,
+ _set_vlan_vid=None, _set_vlan_pcp=8, tp_id=0):
+ self.log.info('_adding_vlan_filter_task', uni_port=uni_port, uni_id=uni_id, tp_id=tp_id, match_vlan=match_vlan, vlan=_set_vlan_vid, vlan_pcp=_set_vlan_pcp)
assert uni_port is not None
if uni_id in self._tech_profile_download_done and self._tech_profile_download_done[uni_id] != {}:
@inlineCallbacks
def success(_results):
- self.log.info('vlan-tagging-success', uni_port=uni_port, vlan=_set_vlan_vid, tp_id=tp_id)
- self._vlan_filter_task = None
+ self.log.info('vlan-tagging-success', uni_port=uni_port, vlan=_set_vlan_vid, tp_id=tp_id, set_vlan_pcp=_set_vlan_pcp)
yield self.core_proxy.device_reason_update(self.device_id, 'omci-flows-pushed')
@inlineCallbacks
def failure(_reason):
self.log.warn('vlan-tagging-failure', uni_port=uni_port, vlan=_set_vlan_vid, tp_id=tp_id)
retry = _STARTUP_RETRY_WAIT * (random.randint(1,5))
- reactor.callLater(retry,
- self._add_vlan_filter_task, device, uni_port.port_number,
- uni_port, _set_vlan_vid, tp_id)
+ reactor.callLater(retry,
+ self._add_vlan_filter_task, device, uni_id, uni_port=uni_port,
+ match_vlan=match_vlan, _set_vlan_vid=_set_vlan_vid,
+ _set_vlan_pcp=_set_vlan_pcp, tp_id=tp_id)
yield self.core_proxy.device_reason_update(self.device_id, 'omci-flows-failed-retrying')
- self.log.info('setting-vlan-tag')
- self._vlan_filter_task = BrcmVlanFilterTask(self.omci_agent, self, uni_port, _set_vlan_vid, tp_id)
- self._deferred = self._onu_omci_device.task_runner.queue_task(self._vlan_filter_task)
+ self.log.info('setting-vlan-tag', uni_port=uni_port, uni_id=uni_id, tp_id=tp_id, match_vlan=match_vlan, vlan=_set_vlan_vid, vlan_pcp=_set_vlan_pcp)
+ vlan_filter_add_task = BrcmVlanFilterTask(self.omci_agent, self, uni_port, _set_vlan_vid,
+ match_vlan, _set_vlan_pcp, add_tag=True,
+ tp_id=tp_id)
+ self._deferred = self._onu_omci_device.task_runner.queue_task(vlan_filter_add_task)
self._deferred.addCallbacks(success, failure)
else:
self.log.info('tp-service-specific-task-not-done-adding-request-to-local-cache',
- uni_id=uni_id)
- self._queued_vlan_filter_task[uni_id] = {"device": device,
- "uni_id": uni_id,
- "uni_port": uni_port,
- "set_vlan_vid": _set_vlan_vid,
- "tp_id": tp_id}
+ uni_id=uni_id, tp_id=tp_id)
+ if uni_id not in self._queued_vlan_filter_task:
+ self._queued_vlan_filter_task[uni_id] = dict()
+ self._queued_vlan_filter_task[uni_id][tp_id] = {"device": device,
+ "uni_id": uni_id,
+ "uni_port": uni_port,
+ "match_vlan": match_vlan,
+ "set_vlan_vid": _set_vlan_vid,
+ "set_vlan_pcp": _set_vlan_pcp,
+ "tp_id": tp_id}
+ def get_tp_id_in_flow(self, flow):
+ flow_metadata = fd.get_metadata_from_write_metadata ( flow )
+ tp_id = fd.get_tp_id_from_metadata ( flow_metadata )
+ return tp_id
+
+ def _remove_vlan_filter_task(self, device, uni_id, uni_port=None, match_vlan=0,
+ _set_vlan_vid=None, _set_vlan_pcp=8, tp_id=0):
+ assert uni_port is not None
+ @inlineCallbacks
+ def success(_results):
+ self.log.info('vlan-untagging-success', _results=_results)
+ yield self.core_proxy.device_reason_update(self.device_id, 'omci-flows-deleted')
+
+ @inlineCallbacks
+ def failure(_reason):
+ self.log.warn('vlan-untagging-failure', _reason=_reason)
+ yield self.core_proxy.device_reason_update(self.device_id, 'omci-flows-deletion-failed-retrying')
+ retry = _STARTUP_RETRY_WAIT * (random.randint(1,5))
+ reactor.callLater(retry,
+ self._remove_vlan_filter_task, device, uni_id,
+ add_tag=False, uni_port=uni_port)
+
+ self.log.info("remove_vlan_filter_task", tp_id=tp_id)
+ vlan_remove_task = BrcmVlanFilterTask(self.omci_agent, self, uni_port, _set_vlan_vid,
+ match_vlan, _set_vlan_pcp, add_tag=False,
+ tp_id=tp_id)
+ self._deferred = self._onu_omci_device.task_runner.queue_task(vlan_remove_task)
+ self._deferred.addCallbacks(success, failure)
def process_inter_adapter_message(self, request):
self.log.debug('process-inter-adapter-message', type=request.header.type, from_topic=request.header.from_topic,
to_topic=request.header.to_topic, to_device_id=request.header.to_device_id)
@@ -951,7 +1225,7 @@
serial_number=onu_indication.serial_number)
if onu_indication.oper_state == 'down' or onu_indication.oper_state == "unreachable":
- self.log.debug('stopping-openomci-statemachine')
+ self.log.debug('stopping-openomci-statemachine', device_id=self.device_id)
reactor.callLater(0, self._onu_omci_device.stop)
# Let TP download happen again
diff --git a/python/adapters/brcm_openomci_onu/omci/brcm_vlan_filter_task.py b/python/adapters/brcm_openomci_onu/omci/brcm_vlan_filter_task.py
index 6f704fd..23cd83d 100644
--- a/python/adapters/brcm_openomci_onu/omci/brcm_vlan_filter_task.py
+++ b/python/adapters/brcm_openomci_onu/omci/brcm_vlan_filter_task.py
@@ -41,7 +41,9 @@
task_priority = 200
name = "Broadcom VLAN Filter/Tagging Task"
- def __init__(self, omci_agent, handler, uni_port, set_vlan_id, tp_id, priority=task_priority):
+ def __init__(self, omci_agent, handler, uni_port, set_vlan_id,
+ match_vlan=0, set_vlan_pcp=8, add_tag=True,
+ priority=task_priority, tp_id=0):
"""
Class initialization
diff --git a/python/requirements.txt b/python/requirements.txt
index daf3dd8..85c6640 100644
--- a/python/requirements.txt
+++ b/python/requirements.txt
@@ -1,2 +1,2 @@
voltha-protos==3.2.3
-pyvoltha==2.3.14
+pyvoltha==2.3.15