VOL-1250: FTTB flow handling support at OLT and ONU for DT use cases

Change-Id: I5f8b24b3e78d682201b73f440127251dfbc50e0e
diff --git a/voltha/adapters/asfvolt16_olt/asfvolt16_device_handler.py b/voltha/adapters/asfvolt16_olt/asfvolt16_device_handler.py
index 1ebae7d..4f94569 100644
--- a/voltha/adapters/asfvolt16_olt/asfvolt16_device_handler.py
+++ b/voltha/adapters/asfvolt16_olt/asfvolt16_device_handler.py
@@ -98,6 +98,8 @@
 
 ASFVOLT_HSIA_ID = 7
 
+# NOTE: Flow Ids 8 to 15 are reserved for No-L2-Modification flows. Do not use them
+
 RESERVED_VLAN_ID = 4095
 
 ASFVOLT16_NUM_PON_PORTS = 16
@@ -119,7 +121,9 @@
 # be sufficient for most practical uses cases
 MAX_ONU_ID_PER_PON_PORT = 128
 
-# traffic class specifier
+# Traffic class defined for traffic where PON passes the traffic tranparently
+TRAFFIC_CLASS_1 = 1
+# Traffic class defined for traffic where PON adds/removes tags
 TRAFFIC_CLASS_2 = 2
 
 class FlowInfo(object):
@@ -258,6 +262,17 @@
         # defaults to first NNI ports. Overriden after reading from the device 
         self.nni_intf_id = 0
 
+        # Up to 12 no L2 modificaton (transparent) flows per ONU allowed
+        # When transparent flow is to be added, an Id is popped from this set, and when
+        # the this flow is deleted, the Id is put back to the set.
+        # When the set is empty and is being popped, it raises a KeyError.
+        # Note: This is stateful data and is lot during reconciliation.
+        # There may be failure in adding/removing transparent flows after voltha restart
+        # as a result of storing this stateful data. This needs to be addressed
+        # in the future.
+        self.no_l2_mod_traffic_flow_ids = set([4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15])
+        self.num_of_no_l2_mod_flows = len(self.no_l2_mod_traffic_flow_ids)
+
     def __del__(self):
         super(Asfvolt16Handler, self).__del__()
 
@@ -355,7 +370,9 @@
         # ++++++++++++++++++++++++++++++++++++++++++++++
         # Note: Theoretical limit of onu_id is 255, but
         # practically we have upto 32 or 64 or 128 ONUs per pon port.
-        # For now limiting the ONU Id to 6 bits (or 32 ONUs)
+        # For now limiting the ONU Id to 6 bits (or 32 ONUs) to
+        # accomadate more transparent flows per ONU (this frees up more bits
+        # for flow id)
         return (onu_id << (self.FLOW_ID_BITS + self.PON_INTF_BITS)
                 | (intf_id << self.FLOW_ID_BITS) | id)
 
@@ -450,8 +467,7 @@
             if flow.traffic_class == traffic_class:
                 self.divide_and_add_flow(v_enet,
                                          flow.classifier,
-                                         flow.action,
-                                         flow.priority)
+                                         flow.action)
                 v_enet.pending_flows.remove(flow)
         return
 
@@ -2206,7 +2222,7 @@
                                    in_port=classifier_info['in_port'])
                     return
                 yield self.divide_and_add_flow(v_enet, classifier_info,
-                                               action_info, flow.priority)
+                                               action_info)
             elif is_down_stream:
                 yield self.add_downstream_only_flow(classifier_info, action_info)
 
@@ -2223,7 +2239,6 @@
                 onu_id = flow['onu_id']
                 intf_id = flow['intf_id']
                 gemport_id = flow['gemport_id']
-                priority = flow['priority']
                 queue_id = flow['queue_id']
                 ds_scheduler_id = flow['ds_scheduler_id']
                 us_scheduler_id = flow['us_scheduler_id']
@@ -2233,40 +2248,64 @@
 
                 # Deactivating and deleting downlink flow
                 is_down_stream = True
-                yield self.bal.deactivate_flow(flow_id,
-                                               is_down_stream,
-                                               onu_id=onu_id, intf_id=intf_id,
-                                               network_int_id=self.nni_intf_id,
-                                               gemport_id=gemport_id,
-                                               priority=priority,
-                                               stag=stag, ctag=ctag,
-                                               ds_scheduler_id=ds_scheduler_id,
-                                               queue_id=queue_id)
+                if self._is_no_l2_mod_flow(flow_id):
+                    yield self.bal.deactivate_no_l2_mod_flow(flow_id,
+                                                        is_down_stream,
+                                                        onu_id=onu_id, intf_id=intf_id,
+                                                        network_int_id=self.nni_intf_id,
+                                                        gemport_id=gemport_id,
+                                                        stag=stag, ctag=ctag,
+                                                        ds_scheduler_id=ds_scheduler_id,
+                                                        queue_id=queue_id)
+                else:
+                    yield self.bal.deactivate_ftth_flow(flow_id,
+                                                        is_down_stream,
+                                                        onu_id=onu_id, intf_id=intf_id,
+                                                        network_int_id=self.nni_intf_id,
+                                                        gemport_id=gemport_id,
+                                                        stag=stag, ctag=ctag,
+                                                        ds_scheduler_id=ds_scheduler_id,
+                                                        queue_id=queue_id)
+ 
                 yield asleep(0.1)
                 yield self.bal.delete_flow(flow_id, is_down_stream)
                 yield asleep(0.1)
 
                 # Deactivating and deleting uplink flow
                 is_down_stream = False
-                yield self.bal.deactivate_flow(flow_id,
-                                               is_down_stream, onu_id=onu_id,
-                                               intf_id=intf_id,
-                                               network_int_id=self.nni_intf_id,
-                                               gemport_id=gemport_id,
-                                               priority=priority,
-                                               stag=stag, ctag=ctag,
-                                               dba_sched_id=dba_sched_id,
-                                               us_scheduler_id=us_scheduler_id)
+                if self._is_no_l2_mod_flow(flow_id):
+                    yield self.bal.deactivate_no_l2_mod_flow(flow_id,
+                                                        is_down_stream, onu_id=onu_id,
+                                                        intf_id=intf_id,
+                                                        network_int_id=self.nni_intf_id,
+                                                        gemport_id=gemport_id,
+                                                        stag=stag, ctag=ctag,
+                                                        dba_sched_id=dba_sched_id,
+                                                        us_scheduler_id=us_scheduler_id)
+                else:
+                    yield self.bal.deactivate_ftth_flow(flow_id,
+                                                        is_down_stream, onu_id=onu_id,
+                                                        intf_id=intf_id,
+                                                        network_int_id=self.nni_intf_id,
+                                                        gemport_id=gemport_id,
+                                                        stag=stag, ctag=ctag,
+                                                        dba_sched_id=dba_sched_id,
+                                                        us_scheduler_id=us_scheduler_id)
                 yield asleep(0.1)
                 yield self.bal.delete_flow(flow_id, is_down_stream)
                 yield asleep(0.1)
 
+                if self._is_no_l2_mod_flow(flow_id):
+                    id = self._get_flow_id(flow_id)
+                    self.log.debug("freeing-up-no-l2-mod-flow-id", id=id)
+                    self.no_l2_mod_traffic_flow_ids.add(id)
+
             elif flow['direction'] == "DOWNSTREAM":
                 flow_id = flow['bal_flow_id']
                 is_down_stream = True
-                # deactivate_flow has generic flow deactivation handling
+                # deactivate_ftth_flow has generic flow deactivation handling
                 # Used this to deactivate LLDP flow
-                yield self.bal.deactivate_flow(flow_id=flow_id,
+                yield self.bal.deactivate_ftth_flow(flow_id=flow_id,
                                                is_downstream=is_down_stream,
                                                network_int_id=self.nni_intf_id)
                 yield asleep(0.1)
@@ -2399,14 +2438,20 @@
     # expects down stream flows to be added to handle
     # packet_out messge from controller.
     @inlineCallbacks
-    def divide_and_add_flow(self, v_enet, classifier, action, priority):
+    def divide_and_add_flow(self, v_enet, classifier, action):
         flow_classifier_set = set(classifier.keys())
         flow_action_set = set(action.keys())
+        no_l2_modification_flow_classifier_set = set(['in_port', 'metadata'])
+        no_l2_modification_flow_action_set = set(['output'])
 
         self.log.debug('flow_classifier_set',
                        flow_classifier_set=flow_classifier_set)
+        self.log.debug('no_l2_modification_flow_classifier_set',
+                       no_l2_modification_flow_classifier_set=no_l2_modification_flow_classifier_set)
         self.log.debug('flow_action_set',
                        flow_action_set=flow_action_set)
+        self.log.debug('no_l2_modification_flow_action_set',
+                       no_l2_modification_flow_action_set=no_l2_modification_flow_action_set)
 
         if 'ip_proto' in classifier:
             if classifier['ip_proto'] == 17:
@@ -2438,7 +2483,13 @@
                 yield self.prepare_and_add_eapol_flow(classifier, action, v_enet,
                                                       ASFVOLT_EAPOL_ID_DATA_VLAN,
                                                       ASFVOLT_DOWNLINK_EAPOL_ID_DATA_VLAN)
-            yield self.add_data_flow(classifier, action, v_enet, priority)
+            yield self.add_data_flow(classifier, action, v_enet)
+        elif no_l2_modification_flow_classifier_set.issubset(flow_classifier_set) and \
+                no_l2_modification_flow_action_set.issubset(flow_action_set):
+            '''
+            No L2 modification specific flow
+            '''
+            yield self.prepare_no_l2_modification_flow(classifier, action, v_enet)
         else:
             self.log.info('Invalid-flow-type-to-handle',
                           classifier=classifier,
@@ -2751,7 +2802,7 @@
         return
 
     @inlineCallbacks
-    def add_data_flow(self, uplink_classifier, uplink_action, v_enet, priority):
+    def add_data_flow(self, uplink_classifier, uplink_action, v_enet):
 
         downlink_classifier = dict(uplink_classifier)
         downlink_action = dict(uplink_action)
@@ -2776,17 +2827,39 @@
         # We need to revisit when mulitple gem port per p bits is needed.
         yield self.add_hsia_flow(uplink_classifier, uplink_action,
                                  downlink_classifier, downlink_action,
-                                 v_enet, priority, ASFVOLT_HSIA_ID)
+                                 v_enet, ASFVOLT_HSIA_ID)
+
+    @inlineCallbacks
+    def prepare_no_l2_modification_flow(self, uplink_classifier,
+                                        uplink_action, v_enet):
+        self.log.debug('prepare_no_l2_modification_flow',
+                       ul_c=uplink_classifier, ul_a=uplink_action, v_en=v_enet)
+        uplink_classifier['pkt_tag_type'] = 'double_tag'
+        del uplink_classifier['metadata']
+        downlink_classifier = dict(uplink_classifier)
+        uplink_action = None
+        downlink_action = None
+        try:
+            no_l2_mod_flow_id = self.no_l2_mod_traffic_flow_ids.pop()
+        except KeyError as err:
+            self.log.error("no-available-flow-ids", err=err)
+            return
+        yield self.add_hsia_flow(uplink_classifier, uplink_action,
+                                 downlink_classifier, downlink_action,
+                                 v_enet, no_l2_mod_flow_id, False)
+
 
     @inlineCallbacks
     def add_hsia_flow(self, uplink_classifier, uplink_action,
                       downlink_classifier, downlink_action,
-                      v_enet, priority, hsia_id, l2_modification_flow=True):
+                      v_enet, hsia_id, l2_modification_flow=True):
         # Add Upstream Firmware Flow.
         # To-Do For a time being hard code the traffic class value.
         # Need to know how to get the traffic class info from flows.
         if l2_modification_flow:
             traffic_class = TRAFFIC_CLASS_2
+        else:
+            traffic_class = TRAFFIC_CLASS_1
 
         queue_id = ASFVOLT16_DEFAULT_QUEUE_ID_START
         gem_port = self.get_gem_port_info(v_enet, traffic_class=traffic_class)
@@ -2842,7 +2915,6 @@
                                     flow_id, gem_port.gemport_id,
                                     uplink_classifier, is_down_stream,
                                     action_info=uplink_action,
-                                    priority=priority,
                                     dba_sched_id=tcont.alloc_id,
                                     queue_id=queue_id,
                                     queue_sched_id=scheduler_id)
@@ -2880,7 +2952,6 @@
                                     downlink_flow_id, gem_port.gemport_id,
                                     downlink_classifier, is_down_stream,
                                     action_info=downlink_action,
-                                    priority=priority,
                                     queue_id=queue_id,
                                     queue_sched_id=scheduler_id)
             # To-Do. While addition of one flow is in progress,
@@ -2903,7 +2974,6 @@
         bal_id_dir_info["onu_id"] = onu_device.proxy_address.onu_id
         bal_id_dir_info["intf_id"] = onu_device.proxy_address.channel_id
         bal_id_dir_info["gemport_id"] = gem_port.gemport_id
-        bal_id_dir_info["priority"] = priority
         bal_id_dir_info["queue_id"] = queue_id
         bal_id_dir_info["queue_sched_id"] = scheduler_id
         bal_id_dir_info["dba_sched_id"] = tcont.alloc_id
@@ -3097,14 +3167,24 @@
         self.log.info('Deleting-Downstream-flow', flow_id=downlink_flow_id)
 
         self.log.info('Retrieving-stored-stag', stag=onu_device.vlan)
-        yield self.bal.deactivate_flow(downlink_flow_id,
-                                       is_down_stream,
-                                       onu_id=onu_id, intf_id=intf_id,
-                                       network_int_id=self.nni_intf_id,
-                                       gemport_id=gem_port.gemport_id,
-                                       stag=onu_device.vlan,
-                                       queue_id=queue_id,
-                                       ds_scheduler_id=ds_scheduler_id)
+        if self._is_no_l2_mod_flow(downlink_flow_id):
+            yield self.bal.deactivate_no_l2_mod_flow(downlink_flow_id,
+                                                is_down_stream,
+                                                onu_id=onu_id, intf_id=intf_id,
+                                                network_int_id=self.nni_intf_id,
+                                                gemport_id=gem_port.gemport_id,
+                                                stag=onu_device.vlan,
+                                                queue_id=queue_id,
+                                                ds_scheduler_id=ds_scheduler_id)
+        else:
+            yield self.bal.deactivate_ftth_flow(downlink_flow_id,
+                                           is_down_stream,
+                                           onu_id=onu_id, intf_id=intf_id,
+                                           network_int_id=self.nni_intf_id,
+                                           gemport_id=gem_port.gemport_id,
+                                           stag=onu_device.vlan,
+                                           ds_scheduler_id=ds_scheduler_id,
+                                           queue_id=queue_id)
 
         # While deletion of one flow is in progress,
         # we cannot delete an another flow. Right now use sleep
@@ -3117,14 +3197,24 @@
         is_down_stream = False
         self.log.info('deleting-Upstream-flow',
                       flow_id=uplink_flow_id)
-        yield self.bal.deactivate_flow(uplink_flow_id,
-                                       is_down_stream, onu_id=onu_id,
-                                       intf_id=intf_id,
-                                       network_int_id=self.nni_intf_id,
-                                       gemport_id=gem_port.gemport_id,
-                                       stag=onu_device.vlan,
-                                       dba_sched_id=dba_sched_id,
-                                       us_scheduler_id=us_scheduler_id)
+        if self._is_no_l2_mod_flow(uplink_flow_id):
+            yield self.bal.deactivate_no_l2_mod_flow(uplink_flow_id,
+                                                is_down_stream, onu_id=onu_id,
+                                                intf_id=intf_id,
+                                                network_int_id=self.nni_intf_id,
+                                                gemport_id=gem_port.gemport_id,
+                                                stag=onu_device.vlan,
+                                                dba_sched_id=dba_sched_id,
+                                                us_scheduler_id=us_scheduler_id)
+        else:
+            yield self.bal.deactivate_ftth_flow(uplink_flow_id,
+                                                is_down_stream, onu_id=onu_id,
+                                                intf_id=intf_id,
+                                                network_int_id=self.nni_intf_id,
+                                                gemport_id=gem_port.gemport_id,
+                                                stag=onu_device.vlan,
+                                                dba_sched_id=dba_sched_id,
+                                                us_scheduler_id=us_scheduler_id)
 
         # While deletion of one flow is in progress,
         # we cannot delete an another flow. Right now use sleep
@@ -3195,6 +3285,13 @@
             yield asleep(0.2)
         return
 
+    def _is_no_l2_mod_flow(self, flow_id):
+        id = flow_id & ((2**self.FLOW_ID_BITS) - 1)
+        if ASFVOLT_HSIA_ID < id <= (ASFVOLT_HSIA_ID + self.num_of_no_l2_mod_flows):
+            return True
+
+        return False
+
     def _get_flow_id(self, flow_id):
         return flow_id & ((2**self.FLOW_ID_BITS) - 1)
 
diff --git a/voltha/adapters/asfvolt16_olt/bal.py b/voltha/adapters/asfvolt16_olt/bal.py
index 33d3deb..17403b9 100644
--- a/voltha/adapters/asfvolt16_olt/bal.py
+++ b/voltha/adapters/asfvolt16_olt/bal.py
@@ -259,7 +259,7 @@
     @inlineCallbacks
     def add_flow(self, onu_id=None, intf_id=None, network_int_id=None,
                  flow_id=None, gem_port=None, classifier_info=None,
-                 is_downstream=None, action_info=None, priority=None,
+                 is_downstream=None, action_info=None,
                  dba_sched_id=None, queue_id=None, queue_sched_id=None):
         try:
             obj = bal_pb2.BalCfg()
@@ -291,8 +291,6 @@
                 obj.flow.data.sub_term_id = onu_id
             if gem_port:
                 obj.flow.data.svc_port_id = gem_port
-            if priority:
-                obj.flow.data.priority = priority
             obj.flow.data.classifier.presence_mask = 0
 
             if classifier_info is None:
@@ -396,19 +394,18 @@
         return
 
     @inlineCallbacks
-    def deactivate_flow(self, flow_id,
-                        is_downstream,
-                        onu_id=None,
-                        intf_id=None,
-                        network_int_id=None,
-                        gemport_id=None,
-                        priority=None,
-                        stag=None,
-                        ctag=None,
-                        dba_sched_id=None,
-                        us_scheduler_id=None,
-                        ds_scheduler_id=None,
-                        queue_id=None):
+    def deactivate_ftth_flow(self, flow_id,
+                             is_downstream,
+                             onu_id=None,
+                             intf_id=None,
+                             network_int_id=None,
+                             gemport_id=None,
+                             stag=None,
+                             ctag=None,
+                             dba_sched_id=None,
+                             us_scheduler_id=None,
+                             ds_scheduler_id=None,
+                             queue_id=None):
         try:
             obj = bal_pb2.BalCfg()
             # Fill Header details
@@ -426,8 +423,6 @@
                 obj.flow.data.sub_term_id = onu_id
             if gemport_id is not None:
                 obj.flow.data.svc_port_id = gemport_id
-            if priority is not None:
-                obj.flow.data.priority = priority
 
             if is_downstream is True:
                 obj.flow.key.flow_type = \
@@ -491,6 +486,73 @@
         return
 
     @inlineCallbacks
+    def deactivate_no_l2_mod_flow(self, flow_id,
+                             is_downstream,
+                             onu_id=None,
+                             intf_id=None,
+                             network_int_id=None,
+                             gemport_id=None,
+                             stag=None,
+                             ctag=None,
+                             dba_sched_id=None,
+                             us_scheduler_id=None,
+                             ds_scheduler_id=None,
+                             queue_id=None):
+        try:
+            obj = bal_pb2.BalCfg()
+            # Fill Header details
+            obj.device_id = self.device_id.encode('ascii', 'ignore')
+            obj.hdr.obj_type = bal_model_ids_pb2.BAL_OBJ_ID_FLOW
+            # Fill Access Terminal Details
+            # To-DO flow ID need to be retrieved from flow details
+            obj.flow.key.flow_id = flow_id
+            obj.flow.data.admin_state = bal_model_types_pb2.BAL_STATE_DOWN
+            if intf_id is not None:
+                obj.flow.data.access_int_id = intf_id
+            if network_int_id is not None:
+                obj.flow.data.network_int_id = network_int_id
+            if onu_id is not None:
+                obj.flow.data.sub_term_id = onu_id
+            if gemport_id is not None:
+                obj.flow.data.svc_port_id = gemport_id
+            if is_downstream is True:
+                obj.flow.key.flow_type = \
+                    bal_model_types_pb2.BAL_FLOW_TYPE_DOWNSTREAM
+            else:
+                obj.flow.key.flow_type = \
+                    bal_model_types_pb2.BAL_FLOW_TYPE_UPSTREAM
+
+            obj.flow.data.classifier.pkt_tag_type = \
+                bal_model_types_pb2.BAL_PKT_TAG_TYPE_DOUBLE_TAG
+            obj.flow.data.classifier.presence_mask |= \
+                bal_model_types_pb2.BAL_CLASSIFIER_ID_PKT_TAG_TYPE
+            obj.flow.data.classifier.o_vid = stag
+            if ctag != RESERVED_VLAN_ID:
+                obj.flow.data.classifier.o_vid = ctag
+                obj.flow.data.classifier.presence_mask |= \
+                    bal_model_types_pb2.BAL_CLASSIFIER_ID_O_VID
+
+            if dba_sched_id is not None:
+                obj.flow.data.dba_tm_sched_id = dba_sched_id
+
+            if queue_id is not None:
+                obj.flow.data.queue.queue_id = queue_id
+                if ds_scheduler_id is not None:
+                    obj.flow.data.queue.sched_id = ds_scheduler_id
+            else:
+                obj.flow.data.queue.queue_id = 0
+                if us_scheduler_id is not None:
+                    obj.flow.data.queue.sched_id = us_scheduler_id
+
+            self.log.info('deactivating-flows-from-OLT-Device',
+                          flow_details=obj)
+            yield self.stub.BalCfgSet(obj, timeout=GRPC_TIMEOUT)
+        except Exception as e:
+            self.log.exception('deactivate_flow-exception',
+                          flow_id, onu_id, exc=str(e))
+        return
+
+    @inlineCallbacks
     def deactivate_eapol_flow(self, flow_id, is_downstream,
                               onu_id=None,
                               intf_id=None,
@@ -674,30 +736,6 @@
         return
 
     @inlineCallbacks
-    def delete_scheduler(self, id, direction):
-        try:
-            obj = bal_pb2.BalKey()
-            obj.hdr.obj_type = bal_model_ids_pb2.BAL_OBJ_ID_TM_SCHED
-            # Fill Access Terminal Details
-            if direction == 'downstream':
-                obj.tm_sched_key.dir =\
-                    bal_model_types_pb2.BAL_TM_SCHED_DIR_DS
-            else:
-                obj.tm_sched_key.dir = \
-                    bal_model_types_pb2.BAL_TM_SCHED_DIR_US
-            obj.tm_sched_key.id = id
-            self.log.info('Deleting Scheduler',
-                          scheduler_details=obj)
-            yield self.stub.BalCfgClear(obj, timeout=GRPC_TIMEOUT)
-        except Exception as e:
-            self.log.info('creat-scheduler-exception',
-                          olt=self.olt.olt_id,
-                          sched_id=id,
-                          direction=direction,
-                          exc=str(e))
-        return
-
-    @inlineCallbacks
     def delete_flow(self, flow_id, is_downstream):
         try:
             obj = bal_pb2.BalKey()
@@ -832,7 +870,6 @@
             Need to fetch schd_type then assign either one of them
             '''
             if weight is not None:
-                #obj.tm_queue_cfg.data.priority = priority
                 obj.tm_queue_cfg.data.weight = weight
 
             if rate_info is not None:
diff --git a/voltha/adapters/broadcom_onu/broadcom_onu.py b/voltha/adapters/broadcom_onu/broadcom_onu.py
index 3cfde23..35b2537 100644
--- a/voltha/adapters/broadcom_onu/broadcom_onu.py
+++ b/voltha/adapters/broadcom_onu/broadcom_onu.py
@@ -846,16 +846,26 @@
 
     def send_set_8021p_mapper_service_profile(self,
                                               entity_id,
-                                              interwork_tp_id=None):
+                                              interwork_tp_id_0_6=None,
+                                              interwork_tp_id_7=None):
         data = dict()
-        data['interwork_tp_pointer_for_p_bit_priority_0']=interwork_tp_id
-        data['interwork_tp_pointer_for_p_bit_priority_1']=interwork_tp_id
-        data['interwork_tp_pointer_for_p_bit_priority_2']=interwork_tp_id
-        data['interwork_tp_pointer_for_p_bit_priority_3']=interwork_tp_id
-        data['interwork_tp_pointer_for_p_bit_priority_4']=interwork_tp_id
-        data['interwork_tp_pointer_for_p_bit_priority_5']=interwork_tp_id
-        data['interwork_tp_pointer_for_p_bit_priority_6']=interwork_tp_id
-        data['interwork_tp_pointer_for_p_bit_priority_7']=interwork_tp_id
+        if interwork_tp_id_0_6 is not None:
+            data['interwork_tp_pointer_for_p_bit_priority_0']=interwork_tp_id_0_6
+            data['interwork_tp_pointer_for_p_bit_priority_1']=interwork_tp_id_0_6
+            data['interwork_tp_pointer_for_p_bit_priority_2']=interwork_tp_id_0_6
+            data['interwork_tp_pointer_for_p_bit_priority_3']=interwork_tp_id_0_6
+            data['interwork_tp_pointer_for_p_bit_priority_4']=interwork_tp_id_0_6
+            data['interwork_tp_pointer_for_p_bit_priority_5']=interwork_tp_id_0_6
+            data['interwork_tp_pointer_for_p_bit_priority_6']=interwork_tp_id_0_6
+        if interwork_tp_id_7 is not None:
+            data['interwork_tp_pointer_for_p_bit_priority_0']=interwork_tp_id_7
+            data['interwork_tp_pointer_for_p_bit_priority_1']=interwork_tp_id_7
+            data['interwork_tp_pointer_for_p_bit_priority_2']=interwork_tp_id_7
+            data['interwork_tp_pointer_for_p_bit_priority_3']=interwork_tp_id_7
+            data['interwork_tp_pointer_for_p_bit_priority_4']=interwork_tp_id_7
+            data['interwork_tp_pointer_for_p_bit_priority_5']=interwork_tp_id_7
+            data['interwork_tp_pointer_for_p_bit_priority_6']=interwork_tp_id_7
+            data['interwork_tp_pointer_for_p_bit_priority_7']=interwork_tp_id_7
 
         frame = OmciFrame(
             transaction_id=self.get_tx_id(),
@@ -1455,6 +1465,18 @@
             yield self.wait_for_response()
 
             # Set AR - ExtendedVlanTaggingOperationConfigData
+            #          514 - RxVlanTaggingOperationTable - add VLAN <cvid> to priority tagged pkts - c-vid
+            # To be removed once DPU supports sending vlan4090p3 instead of vlan4090p0
+            self.send_set_extended_vlan_tagging_operation_vlan_configuration_data_single_tag(0x200 + port_id, 0, mcvid, 0, 1, 7, mcvid)
+            yield self.wait_for_response()
+
+
+            # Set AR - ExtendedVlanTaggingOperationConfigData
+            #          514 - RxVlanTaggingOperationTable - add VLAN <cvid> to priority tagged pkts - c-vid
+            self.send_set_extended_vlan_tagging_operation_vlan_configuration_data_single_tag(0x200 + port_id, 3, mcvid, 0, 1, 7, mcvid)
+            yield self.wait_for_response()
+
+            # Set AR - ExtendedVlanTaggingOperationConfigData
             #          514 - RxVlanTaggingOperationTable - add VLAN <cvid> to untagged pkts - c-vid
             self.send_set_extended_vlan_tagging_operation_vlan_configuration_data_untagged(0x200 + port_id, 0x1000, cvid)
             yield self.wait_for_response()
@@ -1689,7 +1711,12 @@
             # Set AR - 802.1pMapperServiceProfile - Mapper_ profile_id -
             #                                       gem_port_tp pointer
 
-            self.send_set_8021p_mapper_service_profile(0x8001, interwork_tp_id=gem_port.gemport_id)
+            if gem_port.traffic_class == 2:
+                self.send_set_8021p_mapper_service_profile(0x8001,
+                                                           interwork_tp_id_7=gem_port.gemport_id)
+            else:
+                self.send_set_8021p_mapper_service_profile(0x8001,
+                                                           interwork_tp_id_0_6=gem_port.gemport_id)
                
             yield self.wait_for_response()
 
@@ -1704,7 +1731,9 @@
             self.log.error('device-unreachable')
             return
 
-        self.send_set_8021p_mapper_service_profile(0x8001, interwork_tp_id=0xFFFF)
+        self.send_set_8021p_mapper_service_profile(0x8001,
+                                                   interwork_tp_id_0_6=0xFFFF,
+                                                   interwork_tp_id_7=0xFFFF)
         yield self.wait_for_response()
 
         self.send_delete_omci_mesage(GemInterworkingTp.class_id,
diff --git a/voltha/core/flow_decomposer.py b/voltha/core/flow_decomposer.py
index d98c1a2..125bfe4 100644
--- a/voltha/core/flow_decomposer.py
+++ b/voltha/core/flow_decomposer.py
@@ -702,25 +702,62 @@
                     ))
 
                 else:
-                    assert out_port_no is not None
-                    fl_lst, _ = device_rules.setdefault(
-                        egress_hop.device.id, ([], []))
-                    fl_lst.append(mk_flow_stat(
-                        priority=flow.priority,
-                        cookie=flow.cookie,
-                        match_fields=[
-                            in_port(egress_hop.ingress_port.port_no),
-                        ] + [
-                            field for field in get_ofb_fields(flow)
-                            if field.type not in (IN_PORT, )
-                        ],
-                        actions=[
-                            action for action in get_actions(flow)
-                            if action.type != OUTPUT
-                        ] + [
-                            output(egress_hop.egress_port.port_no)
-                        ]
-                    ))
+
+                    actions = [action.type for action in get_actions(flow)]
+                    # Transparent ONU and OLT case (No-L2-Modification flow)
+                    if len(actions) == 1 and OUTPUT in actions:
+                        child_device_flow_lst, _ = device_rules.setdefault(
+                            ingress_hop.device.id, ([], []))
+                        parent_device_flow_lst, _ = device_rules.setdefault(
+                            egress_hop.device.id, ([], []))
+
+                        child_device_flow_lst.append(mk_flow_stat(
+                                            priority = flow.priority,
+                                            cookie = flow.cookie,
+                                            match_fields=[
+                                                in_port(ingress_hop.ingress_port.port_no)
+                                            ] + [
+                                                field for field in get_ofb_fields(flow)
+                                                if field.type not in (IN_PORT,)
+                                            ],
+                                            actions=[
+                                                output(ingress_hop.egress_port.port_no)
+                                            ]
+                        ))
+
+                        parent_device_flow_lst.append(mk_flow_stat(
+                                            priority = flow.priority,
+                                            cookie=flow.cookie,
+                                            match_fields=[
+                                                in_port(egress_hop.ingress_port.port_no),
+                                            ] + [
+                                                field for field in get_ofb_fields(flow)
+                                                if field.type not in (IN_PORT,)
+                                            ],
+                                            actions=[
+                                                output(egress_hop.egress_port.port_no)
+                                            ]
+                        ))
+                    else:
+                        assert out_port_no is not None
+                        fl_lst, _ = device_rules.setdefault(
+                            egress_hop.device.id, ([], []))
+                        fl_lst.append(mk_flow_stat(
+                            priority=flow.priority,
+                            cookie=flow.cookie,
+                            match_fields=[
+                                in_port(egress_hop.ingress_port.port_no),
+                            ] + [
+                                field for field in get_ofb_fields(flow)
+                                if field.type not in (IN_PORT, )
+                            ],
+                            actions=[
+                                action for action in get_actions(flow)
+                                if action.type != OUTPUT
+                            ] + [
+                                output(egress_hop.egress_port.port_no)
+                            ]
+                        ))
 
             else:  # downstream
                 if has_next_table(flow):
@@ -790,25 +827,62 @@
                         ))
 
                 elif out_port_no is not None:  # unicast case
-                    fl_lst, _ = device_rules.setdefault(
-                        egress_hop.device.id, ([], []))
-                    fl_lst.append(mk_flow_stat(
-                        priority=flow.priority,
-                        cookie=flow.cookie,
-                        match_fields=[
-                            in_port(egress_hop.ingress_port.port_no)
-                        ] + [
-                            field for field in get_ofb_fields(flow)
-                            if field.type not in (IN_PORT,)
-                        ],
-                        actions=[
-                            action for action in get_actions(flow)
-                            if action.type not in (OUTPUT,)
-                        ] + [
-                            output(egress_hop.egress_port.port_no)
-                        ]
 
-                    ))
+                    actions = [action.type for action in get_actions(flow)]
+                    # Transparent ONU and OLT case (No-L2-Modification flow)
+                    if len(actions) == 1 and OUTPUT in actions:
+                        parent_device_flow_lst, _ = device_rules.setdefault(
+                                                ingress_hop.device.id, ([], []))
+                        child_device_flow_lst, _ = device_rules.setdefault(
+                                                egress_hop.device.id, ([], []))
+
+                        parent_device_flow_lst.append(mk_flow_stat(
+                                            priority=flow.priority,
+                                            cookie=flow.cookie,
+                                            match_fields=[
+                                                in_port(ingress_hop.ingress_port.port_no)
+                                            ] + [
+                                                field for field in get_ofb_fields(flow)
+                                                if field.type not in (IN_PORT,)
+                                            ],
+                                            actions=[
+                                                 output(ingress_hop.egress_port.port_no)
+                                            ]
+                                            ))
+
+                        child_device_flow_lst.append(mk_flow_stat(
+                                            priority = flow.priority,
+                                            cookie=flow.cookie,
+                                            match_fields = [
+                                                 in_port(egress_hop.ingress_port.port_no),
+                                            ] + [
+                                                 field for field in get_ofb_fields(flow)
+                                                 if field.type not in (IN_PORT, )
+                                            ],
+                                            actions=[
+                                                 output(egress_hop.egress_port.port_no)
+                                            ]
+                                            ))
+                    else:
+                        fl_lst, _ = device_rules.setdefault(
+                            egress_hop.device.id, ([], []))
+                        fl_lst.append(mk_flow_stat(
+                            priority=flow.priority,
+                            cookie=flow.cookie,
+                            match_fields=[
+                                in_port(egress_hop.ingress_port.port_no)
+                            ] + [
+                                field for field in get_ofb_fields(flow)
+                                if field.type not in (IN_PORT,)
+                            ],
+                            actions=[
+                                action for action in get_actions(flow)
+                                if action.type not in (OUTPUT,)
+                            ] + [
+                                output(egress_hop.egress_port.port_no)
+                            ]
+
+                        ))
 
                 else:
                     grp_id = get_group(flow)