VOL-2623 adding update_flows_incrementally support to onu-adapter
With this task queuing of vlan_filter task is done per tp_id and
the signature of the constructor of vlan_filter task is changed to include default_vlan and vlan_pcp.

Change-Id: Ib83f0362cb5923f88d98f25c02241dd3d2019e0c
diff --git a/python/adapters/brcm_openomci_onu/brcm_openomci_onu_adapter.py b/python/adapters/brcm_openomci_onu/brcm_openomci_onu_adapter.py
index 0ce8389..2eb1533 100644
--- a/python/adapters/brcm_openomci_onu/brcm_openomci_onu_adapter.py
+++ b/python/adapters/brcm_openomci_onu/brcm_openomci_onu_adapter.py
@@ -51,7 +51,7 @@
     supported_device_types = [
         DeviceType(
             id=name,
-            vendor_ids=['OPEN', 'ALCL', 'BRCM', 'TWSH', 'ALPH', 'ISKT', 'SFAA', 'BBSM', 'SCOM'],
+            vendor_ids=['OPEN', 'ALCL', 'BRCM', 'TWSH', 'ALPH', 'ISKT', 'SFAA', 'BBSM', 'SCOM', 'ARPX', 'DACM', 'ERSN', 'HWTC', 'CIGG'],
             adapter=name,
             accepts_bulk_flow_update=True
         )
@@ -215,7 +215,20 @@
         return handler.update_flow_table(device, flows.items)
 
     def update_flows_incrementally(self, device, flow_changes, group_changes):
-        raise NotImplementedError()
+        self.log.info('incremental-flow-update', device_id=device.id,
+                 flows=flow_changes, groups=group_changes)
+        # For now, there is no support for group changes
+        assert len(group_changes.to_add.items) == 0
+        assert len(group_changes.to_remove.items) == 0
+
+        handler = self.devices_handlers[device.id]
+        # Remove flows
+        if len(flow_changes.to_remove.items) != 0:
+            handler.remove_onu_flows(device, flow_changes.to_remove.items)
+
+        # Add flows
+        if len(flow_changes.to_add.items) != 0:
+            handler.add_onu_flows(device, flow_changes.to_add.items)
 
     def send_proxied_message(self, proxy_address, msg):
         self.log.debug('send-proxied-message', proxy_address=proxy_address, msg=msg)
diff --git a/python/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py b/python/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
index deb498d..0d6ec4d 100644
--- a/python/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
+++ b/python/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
@@ -413,19 +413,22 @@
 
         return new_gem_ports
 
-    def _execute_queued_vlan_filter_tasks(self, uni_id):
+    def _execute_queued_vlan_filter_tasks(self, uni_id, tp_id):
         # During OLT Reboots, ONU Reboots, ONU Disable/Enable, it is seen that vlan_filter
         # task is scheduled even before tp task. So we queue vlan-filter task if tp_task
         # or initial-mib-download is not done. Once the tp_task is completed, we execute
         # such queued vlan-filter tasks
         try:
-            if uni_id in self._queued_vlan_filter_task:
+            if uni_id in self._queued_vlan_filter_task and tp_id in self._queued_vlan_filter_task[uni_id]:
                 self.log.info("executing-queued-vlan-filter-task",
-                              uni_id=uni_id)
-                filter_info = self._queued_vlan_filter_task[uni_id]
+                              uni_id=uni_id, tp_id=tp_id)
+                filter_info = self._queued_vlan_filter_task[uni_id][tp_id]
                 reactor.callLater(0, self._add_vlan_filter_task, filter_info.get("device"),
-                                  uni_id, filter_info.get("uni_port"), filter_info.get("set_vlan_vid"),
-                                  filter_info.get("tp_id"))
+                                  uni_id=uni_id, uni_port=filter_info.get("uni_port"),
+                                  match_vlan = filter_info.get("match_vlan"),
+                                  _set_vlan_vid= filter_info.get("set_vlan_vid"),
+                                  _set_vlan_pcp = filter_info.get("set_vlan_pcp"),
+                                  tp_id = filter_info.get("tp_id"))
                 # Now remove the entry from the dictionary
                 self._queued_vlan_filter_task[uni_id].clear()
                 self.log.debug("executed-queued-vlan-filter-task",
@@ -444,7 +447,7 @@
 
     def load_and_configure_tech_profile(self, uni_id, tp_path):
         self.log.debug("loading-tech-profile-configuration", uni_id=uni_id, tp_path=tp_path)
-
+        tp_id = int ( tp_path.split ( "/" )[1] )
         if uni_id not in self._tp_service_specific_task:
             self._tp_service_specific_task[uni_id] = dict()
 
@@ -475,7 +478,7 @@
                         del self._tp_service_specific_task[uni_id][tp_path]
                     self._tech_profile_download_done[uni_id][tp_path] = True
                     # Now execute any vlan filter tasks that were queued for later
-                    self._execute_queued_vlan_filter_tasks(uni_id)
+                    reactor.callInThread(self._execute_queued_vlan_filter_tasks, uni_id, tp_id)
                     yield self.core_proxy.device_reason_update(self.device_id, 'tech-profile-config-download-success')
 
                 @inlineCallbacks
@@ -659,6 +662,234 @@
         self.log.info('update_pm_config', pm_config=pm_config)
         self._pm_metrics.update(pm_config)
 
+    def remove_onu_flows(self, device, flows):
+        self.log.debug('remove_onu_flows', device_id=device.id)
+
+
+        # no point in removing omci flows if the device isnt reachable
+        if device.connect_status != ConnectStatus.REACHABLE or \
+           device.admin_state != AdminState.ENABLED:
+            self.log.warn("device-disabled-or-offline-skipping-remove-flow",
+                          admin=device.admin_state, connect=device.connect_status)
+            return
+
+        for flow in flows:
+            # if incoming flow contains cookie, then remove from ONU
+            if flow.cookie:
+                self.log.debug("remove-flow", device_id=device.id, flow=flow)
+
+                def is_downstream(port):
+                    return port == self._pon_port_number
+
+                def is_upstream(port):
+                    return not is_downstream(port)
+
+                try:
+                    _in_port = fd.get_in_port(flow)
+                    assert _in_port is not None
+
+                    _out_port = fd.get_out_port(flow)  # may be None
+                    _vlan_vid = fd.get_default_vlan(flow)
+
+                    if is_downstream(_in_port):
+                        self.log.debug('downstream-flow-no-need-to-remove', in_port=_in_port, out_port=_out_port,
+                                       device_id=device.id)
+                        # extended vlan tagging operation will handle it
+                        continue
+                    elif is_upstream(_in_port):
+                        self.log.debug('upstream-flow', in_port=_in_port, out_port=_out_port)
+                        if fd.is_dhcp_flow(flow):
+                            self.log.debug('The dhcp trap-to-host flow will be discarded', device_id=device.id)
+                            return
+
+                        uni_port = self.uni_port(_in_port)
+                        uni_id = _in_port & 0xF
+                    else:
+                        raise Exception('port should be 1 or 2 by our convention')
+
+                    self.log.debug('flow-ports', in_port=_in_port, out_port=_out_port, uni_port=str(uni_port))
+
+                    tp_id = self.get_tp_id_in_flow(flow)
+                    # Deleting flow from ONU.
+                    self._remove_vlan_filter_task(device, uni_id, uni_port=uni_port, _set_vlan_vid=_vlan_vid,
+                                              match_vlan=_vlan_vid, tp_id=tp_id)
+                    #TODO:Delete TD task.
+                except Exception as e:
+                    self.log.exception('failed-to-remove-flow', e=e)
+
+    def add_onu_flows(self, device, flows):
+        self.log.debug('function-entry', flows=flows)
+
+        #
+        # We need to proxy through the OLT to get to the ONU
+        # Configuration from here should be using OMCI
+        #
+        # self.log.info('bulk-flow-update', device_id=device.id, flows=flows)
+
+        # no point in pushing omci flows if the device isnt reachable
+        if device.connect_status != ConnectStatus.REACHABLE or \
+            device.admin_state != AdminState.ENABLED:
+            self.log.warn("device-disabled-or-offline-skipping-flow-update",
+                          admin=device.admin_state, connect=device.connect_status)
+            return
+        def is_downstream(port):
+            return port == self._pon_port_number
+
+        def is_upstream(port):
+            return not is_downstream(port)
+
+        for flow in flows:
+            # if incoming flow contains cookie, then add to ONU
+            if flow.cookie:
+                _type = None
+                _port = None
+                _vlan_vid = None
+                _udp_dst = None
+                _udp_src = None
+                _ipv4_dst = None
+                _ipv4_src = None
+                _metadata = None
+                _output = None
+                _push_tpid = None
+                _field = None
+                _set_vlan_vid = None
+                _set_vlan_pcp = 0
+                _tunnel_id = None
+                self.log.debug("add-flow", device_id=device.id, flow=flow)
+
+                try:
+                    _in_port = fd.get_in_port(flow)
+                    assert _in_port is not None
+
+                    _out_port = fd.get_out_port(flow)  # may be None
+                    tp_id = self.get_tp_id_in_flow(flow)
+                    if is_downstream(_in_port):
+                        self.log.debug('downstream-flow', in_port=_in_port, out_port=_out_port)
+                        # NOTE: We don't care downstream flow because we will copy vlan_id to upstream flow
+                        # uni_port = self.uni_port(_out_port)
+                        # uni_id = _out_port  & 0xF
+                        continue
+                    elif is_upstream(_in_port):
+                        self.log.debug('upstream-flow', in_port=_in_port, out_port=_out_port)
+                        uni_port = self.uni_port(_in_port)
+                        uni_id = _in_port & 0xF
+                    else:
+                        raise Exception('port should be 1 or 2 by our convention')
+
+                    self.log.debug('flow-ports', in_port=_in_port, out_port=_out_port, uni_port=str(uni_port))
+
+                    for field in fd.get_ofb_fields(flow):
+                        if field.type == fd.ETH_TYPE:
+                            _type = field.eth_type
+                            self.log.debug('field-type-eth-type',
+                                           eth_type=_type)
+
+                        elif field.type == fd.IP_PROTO:
+                            _proto = field.ip_proto
+                            self.log.debug('field-type-ip-proto',
+                                           ip_proto=_proto)
+
+                        elif field.type == fd.IN_PORT:
+                            _port = field.port
+                            self.log.debug('field-type-in-port',
+                                           in_port=_port)
+                        elif field.type == fd.TUNNEL_ID:
+                            self.log.debug('field-type-tunnel-id')
+
+                        elif field.type == fd.VLAN_VID:
+                            _vlan_vid = field.vlan_vid & 0xfff
+                            self.log.debug('field-type-vlan-vid',
+                                           vlan=_vlan_vid)
+
+                        elif field.type == fd.VLAN_PCP:
+                            _vlan_pcp = field.vlan_pcp
+                            self.log.debug('field-type-vlan-pcp',
+                                           pcp=_vlan_pcp)
+
+                        elif field.type == fd.UDP_DST:
+                            _udp_dst = field.udp_dst
+                            self.log.debug('field-type-udp-dst',
+                                           udp_dst=_udp_dst)
+
+                        elif field.type == fd.UDP_SRC:
+                            _udp_src = field.udp_src
+                            self.log.debug('field-type-udp-src',
+                                           udp_src=_udp_src)
+
+                        elif field.type == fd.IPV4_DST:
+                            _ipv4_dst = field.ipv4_dst
+                            self.log.debug('field-type-ipv4-dst',
+                                           ipv4_dst=_ipv4_dst)
+
+                        elif field.type == fd.IPV4_SRC:
+                            _ipv4_src = field.ipv4_src
+                            self.log.debug('field-type-ipv4-src',
+                                           ipv4_dst=_ipv4_src)
+
+                        elif field.type == fd.METADATA:
+                            _metadata = field.table_metadata
+                            self.log.debug('field-type-metadata',
+                                           metadata=_metadata)
+
+                        else:
+                            raise NotImplementedError('field.type={}'.format(
+                                field.type))
+
+                    for action in fd.get_actions(flow):
+
+                        if action.type == fd.OUTPUT:
+                            _output = action.output.port
+                            self.log.debug('action-type-output',
+                                           output=_output, in_port=_in_port)
+
+                        elif action.type == fd.POP_VLAN:
+                            self.log.debug('action-type-pop-vlan',
+                                           in_port=_in_port)
+
+                        elif action.type == fd.PUSH_VLAN:
+                            _push_tpid = action.push.ethertype
+                            self.log.debug('action-type-push-vlan',
+                                           push_tpid=_push_tpid, in_port=_in_port)
+                            if action.push.ethertype != 0x8100:
+                                self.log.error('unhandled-tpid',
+                                               ethertype=action.push.ethertype)
+
+                        elif action.type == fd.SET_FIELD:
+                            _field = action.set_field.field.ofb_field
+                            assert (action.set_field.field.oxm_class ==
+                                    OFPXMC_OPENFLOW_BASIC)
+                            self.log.debug('action-type-set-field',
+                                           field=_field, in_port=_in_port)
+                            if _field.type == fd.VLAN_VID:
+                                _set_vlan_vid = _field.vlan_vid & 0xfff
+                                self.log.debug('set-field-type-vlan-vid',
+                                               vlan_vid=_set_vlan_vid)
+                            elif _field.type == fd.VLAN_PCP:
+                                _set_vlan_pcp = _field.vlan_pcp
+                                self.log.debug('set-field-type-vlan-pcp',
+                                               vlan_pcp=_set_vlan_pcp)
+                            else:
+                                self.log.error('unsupported-action-set-field-type',
+                                               field_type=_field.type)
+                        else:
+                            self.log.error('unsupported-action-type',
+                                           action_type=action.type, in_port=_in_port)
+
+                    if type is not None and _vlan_vid is None:
+                        self.log.warn('ignoring-flow-with-ethType', ethType=_type)
+                    elif _set_vlan_vid is None or _set_vlan_vid == 0:
+                        self.log.warn('ignorning-flow-that-does-not-set-vlanid')
+                    else:
+                        self.log.info('set-vlanid', uni_id=uni_id, uni_port=uni_port, set_vlan_vid=_set_vlan_vid, vlan_vid=_vlan_vid,tp_id=tp_id)
+                        self._add_vlan_filter_task(device, uni_id=uni_id, uni_port=uni_port,
+                                                   _set_vlan_vid=_set_vlan_vid,
+                                                   _set_vlan_pcp=_set_vlan_pcp, match_vlan=_vlan_vid,
+                                                   tp_id=tp_id)
+
+                except Exception as e:
+                    self.log.exception('failed-to-install-flow', e=e, flow=flow)
+
+
     # Calling this assumes the onu is active/ready and had at least an initial mib downloaded.   This gets called from
     # flow decomposition that ultimately comes from onos
     def update_flow_table(self, device, flows):
@@ -696,6 +927,7 @@
             _push_tpid = None
             _field = None
             _set_vlan_vid = None
+            _set_vlan_pcp = None
             _tunnel_id = None
 
             try:
@@ -705,7 +937,7 @@
                     return
 
                 # extract tp id from flow
-                tp_id = (write_metadata >> 32) & 0xFFFF
+                tp_id= self.get_tp_id_in_flow(flow)
                 self.log.debug("tp-id-in-flow", tp_id=tp_id)
 
                 _in_port = fd.get_in_port(flow)
@@ -815,6 +1047,10 @@
                             _set_vlan_vid = _field.vlan_vid & 0xfff
                             self.log.debug('set-field-type-vlan-vid',
                                            vlan_vid=_set_vlan_vid)
+                        elif _field.type == fd.VLAN_PCP:
+                            _set_vlan_pcp = _field.vlan_pcp
+                            self.log.debug('set-field-type-vlan-pcp',
+                                           vlan_pcp=_set_vlan_pcp)
                         else:
                             self.log.error('unsupported-action-set-field-type',
                                            field_type=_field.type)
@@ -827,42 +1063,80 @@
                 if _set_vlan_vid is None or _set_vlan_vid == 0:
                     self.log.warn('ignoring-flow-that-does-not-set-vlanid')
                 else:
-                    self.log.info('set-vlanid', uni_id=uni_id, uni_port=uni_port, set_vlan_vid=_set_vlan_vid, tp_id=tp_id)
-                    self._add_vlan_filter_task(device, uni_id, uni_port, _set_vlan_vid, tp_id)
+                    self.log.info('set-vlanid', uni_id=uni_id, uni_port=uni_port, match_vlan=_vlan_vid, set_vlan_vid=_set_vlan_vid, _set_vlan_pcp=_set_vlan_pcp, ethType=_type)
+                    self._add_vlan_filter_task(device, uni_id,
+                                              uni_port=uni_port, match_vlan=_vlan_vid,
+                                              _set_vlan_vid=_set_vlan_vid, _set_vlan_pcp=_set_vlan_pcp, tp_id=tp_id)
             except Exception as e:
                 self.log.exception('failed-to-install-flow', e=e, flow=flow)
 
-    def _add_vlan_filter_task(self, device, uni_id, uni_port, _set_vlan_vid, tp_id):
+    def _add_vlan_filter_task(self, device, uni_id, uni_port=None, match_vlan=0,
+                             _set_vlan_vid=None, _set_vlan_pcp=8, tp_id=0):
+        self.log.info('_adding_vlan_filter_task', uni_port=uni_port, uni_id=uni_id, tp_id=tp_id, match_vlan=match_vlan, vlan=_set_vlan_vid, vlan_pcp=_set_vlan_pcp)
         assert uni_port is not None
         if uni_id in self._tech_profile_download_done and self._tech_profile_download_done[uni_id] != {}:
             @inlineCallbacks
             def success(_results):
-                self.log.info('vlan-tagging-success', uni_port=uni_port, vlan=_set_vlan_vid, tp_id=tp_id)
-                self._vlan_filter_task = None
+                self.log.info('vlan-tagging-success', uni_port=uni_port, vlan=_set_vlan_vid, tp_id=tp_id, set_vlan_pcp=_set_vlan_pcp)
                 yield self.core_proxy.device_reason_update(self.device_id, 'omci-flows-pushed')
 
             @inlineCallbacks
             def failure(_reason):
                 self.log.warn('vlan-tagging-failure', uni_port=uni_port, vlan=_set_vlan_vid, tp_id=tp_id)
                 retry = _STARTUP_RETRY_WAIT * (random.randint(1,5))
-                reactor.callLater(retry,
-                                  self._add_vlan_filter_task, device, uni_port.port_number,
-                                  uni_port, _set_vlan_vid, tp_id)
+                reactor.callLater(retry, 
+                                    self._add_vlan_filter_task, device, uni_id, uni_port=uni_port, 
+                                    match_vlan=match_vlan, _set_vlan_vid=_set_vlan_vid, 
+                                    _set_vlan_pcp=_set_vlan_pcp, tp_id=tp_id)
                 yield self.core_proxy.device_reason_update(self.device_id, 'omci-flows-failed-retrying')
 
-            self.log.info('setting-vlan-tag')
-            self._vlan_filter_task = BrcmVlanFilterTask(self.omci_agent, self, uni_port, _set_vlan_vid, tp_id)
-            self._deferred = self._onu_omci_device.task_runner.queue_task(self._vlan_filter_task)
+            self.log.info('setting-vlan-tag', uni_port=uni_port, uni_id=uni_id, tp_id=tp_id, match_vlan=match_vlan, vlan=_set_vlan_vid, vlan_pcp=_set_vlan_pcp)
+            vlan_filter_add_task = BrcmVlanFilterTask(self.omci_agent, self, uni_port, _set_vlan_vid,
+                                                    match_vlan, _set_vlan_pcp, add_tag=True,
+                                                    tp_id=tp_id)
+            self._deferred = self._onu_omci_device.task_runner.queue_task(vlan_filter_add_task)
             self._deferred.addCallbacks(success, failure)
         else:
             self.log.info('tp-service-specific-task-not-done-adding-request-to-local-cache',
-                          uni_id=uni_id)
-            self._queued_vlan_filter_task[uni_id] = {"device": device,
-                                                     "uni_id": uni_id,
-                                                     "uni_port": uni_port,
-                                                     "set_vlan_vid": _set_vlan_vid,
-                                                     "tp_id": tp_id}
+                          uni_id=uni_id, tp_id=tp_id)
+            if uni_id not in self._queued_vlan_filter_task:
+                self._queued_vlan_filter_task[uni_id] = dict()
+            self._queued_vlan_filter_task[uni_id][tp_id] = {"device": device,
+                                                            "uni_id": uni_id,
+                                                            "uni_port": uni_port,
+                                                            "match_vlan": match_vlan,
+                                                            "set_vlan_vid": _set_vlan_vid,
+                                                            "set_vlan_pcp": _set_vlan_pcp,
+                                                            "tp_id": tp_id}
 
+    def get_tp_id_in_flow(self, flow):
+        flow_metadata = fd.get_metadata_from_write_metadata ( flow )
+        tp_id = fd.get_tp_id_from_metadata ( flow_metadata )
+        return tp_id
+
+    def _remove_vlan_filter_task(self, device, uni_id, uni_port=None, match_vlan=0,
+                             _set_vlan_vid=None, _set_vlan_pcp=8, tp_id=0):
+        assert uni_port is not None
+        @inlineCallbacks
+        def success(_results):
+            self.log.info('vlan-untagging-success', _results=_results)
+            yield self.core_proxy.device_reason_update(self.device_id, 'omci-flows-deleted')
+
+        @inlineCallbacks
+        def failure(_reason):
+            self.log.warn('vlan-untagging-failure', _reason=_reason)
+            yield self.core_proxy.device_reason_update(self.device_id, 'omci-flows-deletion-failed-retrying')
+            retry = _STARTUP_RETRY_WAIT * (random.randint(1,5))
+            reactor.callLater(retry,
+                                            self._remove_vlan_filter_task, device, uni_id,
+                                            add_tag=False, uni_port=uni_port)
+
+        self.log.info("remove_vlan_filter_task", tp_id=tp_id)
+        vlan_remove_task = BrcmVlanFilterTask(self.omci_agent, self, uni_port, _set_vlan_vid,
+                                                    match_vlan, _set_vlan_pcp, add_tag=False,
+                                                    tp_id=tp_id)
+        self._deferred = self._onu_omci_device.task_runner.queue_task(vlan_remove_task)
+        self._deferred.addCallbacks(success, failure)
     def process_inter_adapter_message(self, request):
         self.log.debug('process-inter-adapter-message', type=request.header.type, from_topic=request.header.from_topic,
                        to_topic=request.header.to_topic, to_device_id=request.header.to_device_id)
@@ -951,7 +1225,7 @@
                        serial_number=onu_indication.serial_number)
 
         if onu_indication.oper_state == 'down' or onu_indication.oper_state == "unreachable":
-            self.log.debug('stopping-openomci-statemachine')
+            self.log.debug('stopping-openomci-statemachine', device_id=self.device_id)
             reactor.callLater(0, self._onu_omci_device.stop)
 
             # Let TP download happen again
diff --git a/python/adapters/brcm_openomci_onu/omci/brcm_vlan_filter_task.py b/python/adapters/brcm_openomci_onu/omci/brcm_vlan_filter_task.py
index 6f704fd..23cd83d 100644
--- a/python/adapters/brcm_openomci_onu/omci/brcm_vlan_filter_task.py
+++ b/python/adapters/brcm_openomci_onu/omci/brcm_vlan_filter_task.py
@@ -41,7 +41,9 @@
     task_priority = 200
     name = "Broadcom VLAN Filter/Tagging Task"
 
-    def __init__(self, omci_agent, handler, uni_port, set_vlan_id, tp_id, priority=task_priority):
+    def __init__(self, omci_agent, handler, uni_port, set_vlan_id,
+                 match_vlan=0, set_vlan_pcp=8, add_tag=True,
+                 priority=task_priority, tp_id=0):
         """
         Class initialization
 
diff --git a/python/requirements.txt b/python/requirements.txt
index daf3dd8..85c6640 100644
--- a/python/requirements.txt
+++ b/python/requirements.txt
@@ -1,2 +1,2 @@
 voltha-protos==3.2.3
-pyvoltha==2.3.14
+pyvoltha==2.3.15