VOL-1044 VOL-1114 VOL-1115 Openolt acts on logical flows directly and can remove flows

Change-Id: I4d71a365d1af3fa4285b2e84280afe4f7039e343
diff --git a/voltha/adapters/openolt/openolt.py b/voltha/adapters/openolt/openolt.py
index 288155c..e2df59c 100644
--- a/voltha/adapters/openolt/openolt.py
+++ b/voltha/adapters/openolt/openolt.py
@@ -41,7 +41,8 @@
         DeviceType(
             id=name,
             adapter=name,
-            accepts_bulk_flow_update=True
+            accepts_bulk_flow_update=True,
+            accepts_direct_logical_flows_update=True
         )
     ]
 
@@ -200,6 +201,18 @@
                  'implemented')
         raise NotImplementedError()
 
+    def update_logical_flows(self, device_id, flows_to_add, flows_to_remove,
+                             groups, device_rules_map):
+
+        log.info('logical-flows-update', flows_to_add=len(flows_to_add),
+                 flows_to_remove = len(flows_to_remove))
+        log.debug('logical-flows-details', flows_to_add=flows_to_add,
+                  flows_to_remove=flows_to_remove)
+        assert len(groups) == 0, "Cannot yet deal with groups"
+        handler = self.devices[device_id]
+        handler.update_logical_flows(flows_to_add, flows_to_remove,
+                                     device_rules_map)
+
     def update_pm_config(self, device, pm_configs):
         log.info('update_pm_config - Not implemented yet', device=device,
                   pm_configs=pm_configs)
diff --git a/voltha/adapters/openolt/openolt_device.py b/voltha/adapters/openolt/openolt_device.py
index c8bd97f..43bbc75 100644
--- a/voltha/adapters/openolt/openolt_device.py
+++ b/voltha/adapters/openolt/openolt_device.py
@@ -182,7 +182,8 @@
         self.adapter_agent.update_device(device)
 
         self.stub = openolt_pb2_grpc.OpenoltStub(self.channel)
-        self.flow_mgr = OpenOltFlowMgr(self.log, self.stub, self.device_id)
+        self.flow_mgr = OpenOltFlowMgr(self.log, self.stub, self.device_id,
+                                       self.logical_device_id)
         self.alarm_mgr = OpenOltAlarmMgr(self.log, self.adapter_agent,
                                          self.device_id,
                                          self.logical_device_id)
@@ -843,43 +844,40 @@
                 hex(ord(vendor_specific[3]) & 0x0f)[2:]])
 
     def update_flow_table(self, flows):
-        if not self.is_state_up() and not self.is_state_connected():
-            self.log.info('OLT is down, ignore update flow table')
-            return
+        self.log.debug('No updates here now, all is done in logical flows '
+                       'update')
 
-        device = self.adapter_agent.get_device(self.device_id)
-        self.log.debug('update flow table', number_of_flows=len(flows))
 
-        for flow in flows:
-            is_down_stream = None
-            in_port = fd.get_in_port(flow)
-            assert in_port is not None
-            # Right now there is only one NNI port. Get the NNI PORT and
-            # compare with IN_PUT port number. Need to find better way.
-            ports = self.adapter_agent.get_ports(device.id, Port.ETHERNET_NNI)
+    def update_logical_flows(self, flows_to_add, flows_to_remove,
+                             device_rules_map):
 
-            for port in ports:
-                if (port.port_no == in_port):
-                    self.log.debug('downstream-flow', in_port=in_port)
-                    is_down_stream = True
-                    break
-            if is_down_stream is None:
-                is_down_stream = False
-                self.log.debug('upstream-flow', in_port=in_port)
+        self.log.debug('logical flows update', flows_to_add=flows_to_add,
+            flows_to_remove=flows_to_remove)
 
-            for flow in flows:
-                try:
-                    self.flow_mgr.add_flow(flow, is_down_stream)
-                except grpc.RpcError as grpc_e:
-                    if grpc_e.code() == grpc.StatusCode.ALREADY_EXISTS:
-                        self.log.warn('flow already exists', e=grpc_e,
-                                      flow=flow)
-                    else:
-                        self.log.error('failed to add flow', flow=flow,
-                                       e=grpc_e)
-                except Exception as e:
-                    self.log.error('failed to add flow', flow=flow, e=e)
+        for flow in flows_to_add:
 
+            try:
+                self.flow_mgr.add_flow(flow)
+            except grpc.RpcError as grpc_e:
+                if grpc_e.code() == grpc.StatusCode.ALREADY_EXISTS:
+                    self.log.warn('flow already exists', e=grpc_e,
+                                   flow=flow)
+                else:
+                    self.log.error('failed to add flow', flow=flow,
+                                   grpc_error=grpc_e)
+            except Exception as e:
+                self.log.error('failed to add flow', flow=flow, e=e)
+
+        self.flow_mgr.update_children_flows(device_rules_map)
+
+        for flow in flows_to_remove:
+
+            try:
+                self.flow_mgr.remove_flow(flow)
+            except Exception as e:
+                self.log.error('failed to remove flow', flow=flow, e=e)
+
+    # TODO: find a cleaner way to convert an IP address to hex
     def ip_hex(self, ip):
         octets = ip.split(".")
         hex_ip = []
diff --git a/voltha/adapters/openolt/openolt_flow_mgr.py b/voltha/adapters/openolt/openolt_flow_mgr.py
index 48c877d..ae388e0 100644
--- a/voltha/adapters/openolt/openolt_flow_mgr.py
+++ b/voltha/adapters/openolt/openolt_flow_mgr.py
@@ -13,8 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+import copy
+from twisted.internet import reactor
 
-from voltha.protos.openflow_13_pb2 import OFPXMC_OPENFLOW_BASIC
+from voltha.protos.openflow_13_pb2 import OFPXMC_OPENFLOW_BASIC, \
+    ofp_flow_stats, ofp_match, OFPMT_OXM, Flows, FlowGroups, OFPXMT_OFB_IN_PORT
 import voltha.core.flow_decomposer as fd
 import openolt_platform as platform
 from voltha.adapters.openolt.protos import openolt_pb2
@@ -37,20 +40,23 @@
 
 class OpenOltFlowMgr(object):
 
-    def __init__(self, log, stub, device_id):
+    def __init__(self, log, stub, device_id, logical_device_id):
         self.log = log
         self.stub = stub
         self.device_id = device_id
-        self.flow_proxy = registry('core').get_proxy(
+        self.logical_device_id = logical_device_id
+        self.logical_flows_proxy = registry('core').get_proxy(
+            '/logical_devices/{}/flows'.format(self.logical_device_id))
+        self.flows_proxy = registry('core').get_proxy(
             '/devices/{}/flows'.format(self.device_id))
+        self.root_proxy = registry('core').get_proxy('/')
+        self.fd_object = fd.FlowDecomposer()
 
-    def add_flow(self, flow, is_down_stream):
-        self.log.debug('add flow', flow=flow, is_down_stream=is_down_stream)
+    def add_flow(self, flow):
+        self.log.debug('add flow', flow=flow)
         classifier_info = dict()
         action_info = dict()
 
-        in_port = fd.get_in_port(flow)
-        assert in_port is not None
 
         for field in fd.get_ofb_fields(flow):
             if field.type == fd.ETH_TYPE:
@@ -104,13 +110,16 @@
                                output=action_info['output'],
                                in_port=classifier_info['in_port'])
             elif action.type == fd.POP_VLAN:
+                if fd.get_goto_table_id(flow) is None:
+                    self.log.debug('being taken care of by ONU', flow=flow)
+                    return
                 action_info['pop_vlan'] = True
-                self.log.debug('action-type-pop-vlan', in_port=in_port)
+                self.log.debug('action-type-pop-vlan', in_port=classifier_info['in_port'])
             elif action.type == fd.PUSH_VLAN:
                 action_info['push_vlan'] = True
                 action_info['tpid'] = action.push.ethertype
                 self.log.debug('action-type-push-vlan',
-                               push_tpid=action_info['tpid'], in_port=in_port)
+                               push_tpid=action_info['tpid'], in_port=classifier_info['in_port'])
                 if action.push.ethertype != 0x8100:
                     self.log.error('unhandled-tpid',
                                    ethertype=action.push.ethertype)
@@ -120,7 +129,7 @@
                 assert (action.set_field.field.oxm_class ==
                         OFPXMC_OPENFLOW_BASIC)
                 self.log.debug('action-type-set-field',
-                               field=_field, in_port=in_port)
+                               field=_field, in_port=classifier_info['in_port'])
                 if _field.type == fd.VLAN_VID:
                     self.log.debug('set-field-type-vlan-vid',
                                    vlan_vid=_field.vlan_vid & 0xfff)
@@ -130,112 +139,148 @@
                                    field_type=_field.type)
             else:
                 self.log.error('unsupported-action-type',
-                               action_type=action.type, in_port=in_port)
+                               action_type=action.type, in_port=classifier_info['in_port'])
 
-        # FIXME - Why ignore downstream flows?
-        if is_down_stream is False:
-            intf_id = platform.intf_id_from_uni_port_num(
-                classifier_info['in_port'])
-            onu_id = platform.onu_id_from_port_num(
-                classifier_info['in_port'])
-            self.divide_and_add_flow(intf_id, onu_id,
-                                     flow.priority, classifier_info,
-                                     action_info)
-        # else:
-        #    self.log.info('ignore downstream flow', flow=flow,
-        #            classifier_info=classifier_info,
-        #            action_info=action_info)
+        if fd.get_goto_table_id(flow) is not None and not 'pop_vlan' in \
+                action_info:
+            self.log.debug('being taken care of by ONU', flow=flow)
 
-    # FIXME - No need for divide_and_add_flow if
-    # both upstream and downstream flows
-    # are acted upon (not just upstream flows).
-    def divide_and_add_flow(self, intf_id, onu_id, priority, classifier,
-                            action):
+        if not 'output' in action_info and 'metadata' in classifier_info:
+            # Find the corresponding flow in the next (goto) table
+            next_flow = self.find_next_flow(flow)
+            if next_flow is None:
+                return
+            action_info['output'] = fd.get_out_port(next_flow)
+            for field in fd.get_ofb_fields(next_flow):
+                if field.type == fd.VLAN_VID:
+                    classifier_info['metadata'] = field.vlan_vid & 0xfff
+
+
+        (intf_id, onu_id) = platform.extract_access_from_flow(
+            classifier_info['in_port'], action_info['output'])
+
+
+        self.divide_and_add_flow(intf_id, onu_id, classifier_info,
+                                 action_info, flow)
+
+    def remove_flow(self, flow):
+        self.log.debug('trying to remove flows from logical flow :',
+                   logical_flow=flow)
+        device_flows_to_remove = []
+        device_flows = self.flows_proxy.get('/').items
+        for f in device_flows:
+            if f.cookie == flow.id:
+                device_flows_to_remove.append(f)
+
+
+        for f in device_flows_to_remove:
+            (id, direction) = self.decode_stored_id(f.id)
+            flow_to_remove = openolt_pb2.Flow(flow_id=id, flow_type=direction)
+            self.stub.FlowRemove(flow_to_remove)
+            self.log.debug('flow removed from device', flow=f,
+                           flow_key=flow_to_remove)
+
+        if len(device_flows_to_remove) > 0:
+            new_flows = []
+            flows_ids_to_remove = [f.id for f in device_flows_to_remove]
+            for f in device_flows:
+                if f.id not in flows_ids_to_remove:
+                    new_flows.append(f)
+
+            self.flows_proxy.update('/', Flows(items=new_flows))
+            self.log.debug('flows removed from the data store',
+                           flow_ids_removed=flows_ids_to_remove,
+                           number_of_flows_removed=(len(device_flows) - len(
+                               new_flows)), expected_flows_removed=len(
+                            device_flows_to_remove))
+        else:
+            self.log.debug('no device flow to remove for this flow (normal '
+                           'for multi table flows)', flow=flow)
+
+
+    def divide_and_add_flow(self, intf_id, onu_id, classifier,
+                            action, flow):
+
+        self.log.debug('sorting flow', intf_id=intf_id, onu_id=onu_id,
+                       classifier=classifier, action=action)
+
         if 'ip_proto' in classifier:
             if classifier['ip_proto'] == 17:
                 self.log.debug('dhcp flow add')
-                self.add_dhcp_trap(intf_id, onu_id, priority, classifier,
-                                   action)
+                self.add_dhcp_trap(intf_id, onu_id, classifier,
+                                   action, flow)
             elif classifier['ip_proto'] == 2:
-                self.log.debug('igmp flow add ignored')
+                self.log.warn('igmp flow add ignored, not implemented yet')
             else:
-                self.log.debug("Invalid-Classifier-to-handle",
+                self.log.warn("Invalid-Classifier-to-handle",
                                classifier=classifier,
                                action=action)
         elif 'eth_type' in classifier:
             if classifier['eth_type'] == EAP_ETH_TYPE:
                 self.log.debug('eapol flow add')
-                self.add_eapol_flow(intf_id, onu_id, priority)
+                self.add_eapol_flow(intf_id, onu_id, flow)
 
         elif 'push_vlan' in action:
-            self.add_data_flow(intf_id, onu_id, priority, classifier, action)
+            self.add_upstream_data_flow(intf_id, onu_id, classifier, action,
+                                      flow)
+        elif 'pop_vlan' in action:
+            self.add_downstream_data_flow(intf_id, onu_id, classifier,
+                                          action, flow)
         else:
             self.log.debug('Invalid-flow-type-to-handle',
                            classifier=classifier,
-                           action=action)
+                           action=action, flow=flow)
 
-    def add_data_flow(self, intf_id, onu_id, priority, uplink_classifier,
-                      uplink_action):
 
-        downlink_classifier = dict(uplink_classifier)
-        downlink_action = dict(uplink_action)
+    def add_upstream_data_flow(self, intf_id, onu_id, uplink_classifier,
+                       uplink_action, logical_flow):
+
 
         uplink_classifier['pkt_tag_type'] = 'single_tag'
 
-        downlink_classifier['pkt_tag_type'] = 'double_tag'
-        downlink_classifier['vlan_vid'] = uplink_action['vlan_vid']
-        downlink_classifier['metadata'] = uplink_classifier['vlan_vid']
-        del downlink_action['push_vlan']
-        downlink_action['pop_vlan'] = True
-
-        # To-Do right now only one GEM port is supported, so below method
-        # will take care of handling all the p bits.
-        # We need to revisit when mulitple gem port per p bits is needed.
-        self.add_hsia_flow(intf_id, onu_id, priority, uplink_classifier,
-                           uplink_action, downlink_classifier, downlink_action,
-                           HSIA_FLOW_INDEX)
+        self.add_hsia_flow(intf_id, onu_id, uplink_classifier,
+                           uplink_action, 'upstream', HSIA_FLOW_INDEX,
+                           logical_flow)
 
         # Secondary EAP on the subscriber vlan
-        (eap_active, eap_priority) = self.is_eap_enabled(intf_id, onu_id)
+        (eap_active, eap_logical_flow) = self.is_eap_enabled(intf_id, onu_id)
         if eap_active:
-            self.add_eapol_flow(intf_id, onu_id, eap_priority,
+            self.add_eapol_flow(intf_id, onu_id, eap_logical_flow,
                 uplink_eapol_id=EAPOL_UPLINK_SECONDARY_FLOW_INDEX,
                 downlink_eapol_id=EAPOL_DOWNLINK_SECONDARY_FLOW_INDEX,
                 vlan_id=uplink_classifier['vlan_vid'])
 
-    def add_hsia_flow(self, intf_id, onu_id, priority, uplink_classifier,
-                      uplink_action, downlink_classifier, downlink_action,
-                      hsia_id):
+
+    def add_downstream_data_flow(self, intf_id, onu_id, downlink_classifier,
+                                 downlink_action, flow):
+        downlink_classifier['pkt_tag_type'] = 'double_tag'
+        # NOTE(review): pop_vlan should already be set by the caller -- confirm
+        downlink_action['pop_vlan'] = True
+        downlink_action['vlan_vid'] = downlink_classifier['vlan_vid']
+
+        self.add_hsia_flow(intf_id, onu_id, downlink_classifier,
+                           downlink_action, 'downstream', HSIA_FLOW_INDEX,
+                           flow)
+
+    # TODO: right now only one GEM port is supported, so the method below
+    # takes care of handling all the p-bits.
+    # We need to revisit this when multiple GEM ports per p-bit are needed.
+    def add_hsia_flow(self, intf_id, onu_id, classifier, action,
+                      direction, hsia_id, logical_flow):
 
         gemport_id = platform.mk_gemport_id(onu_id)
         flow_id = platform.mk_flow_id(intf_id, onu_id, hsia_id)
 
-        self.log.debug('add upstream flow', onu_id=onu_id,
-                       classifier=uplink_classifier, action=uplink_action,
-                       gemport_id=gemport_id, flow_id=flow_id)
-
         flow = openolt_pb2.Flow(
-            onu_id=onu_id, flow_id=flow_id, flow_type="upstream",
-            access_intf_id=intf_id, gemport_id=gemport_id, priority=priority,
-            classifier=self.mk_classifier(uplink_classifier),
-            action=self.mk_action(uplink_action))
-
-        self.stub.FlowAdd(flow)
-
-        self.log.debug('add downstream flow', classifier=downlink_classifier,
-                       action=downlink_action, gemport_id=gemport_id,
-                       flow_id=flow_id)
-
-        flow = openolt_pb2.Flow(
-                onu_id=onu_id, flow_id=flow_id, flow_type="downstream",
+                onu_id=onu_id, flow_id=flow_id, flow_type=direction,
                 access_intf_id=intf_id, gemport_id=gemport_id,
-                priority=priority,
-                classifier=self.mk_classifier(downlink_classifier),
-                action=self.mk_action(downlink_action))
+                priority=logical_flow.priority,
+                classifier=self.mk_classifier(classifier),
+                action=self.mk_action(action))
 
-        self.stub.FlowAdd(flow)
+        self.add_flow_to_device(flow, logical_flow)
 
-    def add_dhcp_trap(self, intf_id, onu_id, priority, classifier, action):
+    def add_dhcp_trap(self, intf_id, onu_id, classifier, action, logical_flow):
 
         self.log.debug('add dhcp upstream trap', classifier=classifier,
                        action=action)
@@ -250,17 +295,22 @@
 
         upstream_flow = openolt_pb2.Flow(
             onu_id=onu_id, flow_id=flow_id, flow_type="upstream",
-            access_intf_id=intf_id, network_intf_id=0, gemport_id=gemport_id,
-            priority=priority, classifier=self.mk_classifier(classifier),
+            access_intf_id=intf_id, gemport_id=gemport_id,
+            priority=logical_flow.priority,
+            classifier=self.mk_classifier(classifier),
             action=self.mk_action(action))
 
-        self.stub.FlowAdd(upstream_flow)
+        self.add_flow_to_device(upstream_flow, logical_flow)
 
-        # FIXME - Fix OpenOLT handling of downstream flows instead
-        #         of duplicating the downstream flow from the upstream
-        #         flow.
+
         # FIXME - ONOS should send explicit upstream and downstream
         #         exact dhcp trap flow.
+
+        downstream_logical_flow = copy.deepcopy(logical_flow)
+        for oxm_field in downstream_logical_flow.match.oxm_fields:
+            if oxm_field.ofb_field.type == OFPXMT_OFB_IN_PORT:
+                oxm_field.ofb_field.port = 128
+
         classifier['udp_src'] = 67
         classifier['udp_dst'] = 68
         classifier['pkt_tag_type'] = 'double_tag'
@@ -272,22 +322,18 @@
         downstream_flow = openolt_pb2.Flow(
             onu_id=onu_id, flow_id=flow_id, flow_type="downstream",
             access_intf_id=intf_id, network_intf_id=0, gemport_id=gemport_id,
-            priority=priority, classifier=self.mk_classifier(classifier),
+            priority=logical_flow.priority, classifier=self.mk_classifier(
+                classifier),
             action=self.mk_action(action))
 
-        self.log.debug('add dhcp downstream trap', access_intf_id=intf_id,
-                       onu_id=onu_id, flow_id=flow_id)
-        self.stub.FlowAdd(downstream_flow)
 
-    def add_eapol_flow(self, intf_id, onu_id, priority,
+        self.add_flow_to_device(downstream_flow, downstream_logical_flow)
+
+    def add_eapol_flow(self, intf_id, onu_id, logical_flow,
                        uplink_eapol_id=EAPOL_FLOW_INDEX,
                        downlink_eapol_id=EAPOL_DOWNLINK_FLOW_INDEX,
                        vlan_id=DEFAULT_MGMT_VLAN):
 
-        # self.log.debug('add eapol flow pre-process',
-        #                classifier=uplink_classifier)
-        #                #action=uplink_action)
-
         downlink_classifier = {}
         downlink_classifier['eth_type'] = EAP_ETH_TYPE
         downlink_classifier['pkt_tag_type'] = 'single_tag'
@@ -313,11 +359,17 @@
 
         upstream_flow = openolt_pb2.Flow(
             onu_id=onu_id, flow_id=uplink_flow_id, flow_type="upstream",
-            access_intf_id=intf_id, gemport_id=gemport_id, priority=priority,
+            access_intf_id=intf_id, gemport_id=gemport_id,
+            priority=logical_flow.priority,
             classifier=self.mk_classifier(uplink_classifier),
             action=self.mk_action(uplink_action))
 
-        self.stub.FlowAdd(upstream_flow)
+        logical_flow = copy.deepcopy(logical_flow)
+        logical_flow.match.oxm_fields.extend(fd.mk_oxm_fields([fd.vlan_vid(
+            vlan_id | 0x1000)]))
+        logical_flow.match.type = OFPMT_OXM
+
+        self.add_flow_to_device(upstream_flow, logical_flow)
 
         # Add Downstream EAPOL Flow.
         downlink_flow_id = platform.mk_flow_id(intf_id, onu_id,
@@ -326,13 +378,26 @@
         downstream_flow = openolt_pb2.Flow(
             onu_id=onu_id, flow_id=downlink_flow_id, flow_type="downstream",
             access_intf_id=intf_id, gemport_id=gemport_id,
+            priority=logical_flow.priority,
             classifier=self.mk_classifier(downlink_classifier),
             action=self.mk_action(downlink_action))
 
-        self.stub.FlowAdd(downstream_flow)
+        downstream_logical_flow = ofp_flow_stats(id=logical_flow.id,
+             cookie=logical_flow.cookie, table_id=logical_flow.table_id,
+             priority=logical_flow.priority, flags=logical_flow.flags)
 
-        self.log.debug('eap flows', upstream_flow=upstream_flow,
-                       downstream_flow=downstream_flow)
+        downstream_logical_flow.match.oxm_fields.extend(fd.mk_oxm_fields([
+            fd.in_port(fd.get_out_port(logical_flow)),
+            fd.eth_type(EAP_ETH_TYPE), fd.vlan_vid(vlan_id | 0x1000)]))
+        downstream_logical_flow.match.type = OFPMT_OXM
+
+        downstream_logical_flow.instructions.extend(
+            fd.mk_instructions_from_actions([fd.output(
+            platform.mk_uni_port_num(intf_id, onu_id))]))
+
+        self.add_flow_to_device(downstream_flow, downstream_logical_flow)
+
+
 
     def mk_classifier(self, classifier_info):
 
@@ -385,7 +450,7 @@
         return action
 
     def is_eap_enabled(self, intf_id, onu_id):
-        flows = self.flow_proxy.get('/').items
+        flows = self.logical_flows_proxy.get('/').items
 
         for flow in flows:
             eap_flow = False
@@ -405,6 +470,81 @@
                                intf_id=intf_id, eap_intf_id=eap_intf_id,
                                eap_onu_id=eap_onu_id)
             if eap_flow and intf_id == eap_intf_id and onu_id == eap_onu_id:
-                return (True, flow.priority)
+                return (True, flow)
 
-        return (False, 0)
+        return (False, None)
+
+    def add_flow_to_device(self, flow, logical_flow):
+        self.log.debug('pushing flow to device', flow=flow)
+        self.stub.FlowAdd(flow)
+        self.register_flow(logical_flow, flow)
+
+    def register_flow(self, logical_flow, device_flow):
+        self.log.debug('registering flow in device',
+                       logical_flow=logical_flow, device_flow=device_flow)
+        stored_flow = copy.deepcopy(logical_flow)
+        stored_flow.id = self.generate_stored_id(device_flow.flow_id,
+                                                 device_flow.flow_type)
+        self.log.debug('generated device flow id', id=stored_flow.id,
+                       flow_id=device_flow.flow_id,
+                       direction=device_flow.flow_type)
+        stored_flow.cookie = logical_flow.id
+        flows = self.flows_proxy.get('/')
+        flows.items.extend([stored_flow])
+        self.flows_proxy.update('/', flows)
+
+
+    def find_next_flow(self, flow):
+        table_id = fd.get_goto_table_id(flow)
+        metadata = 0
+        for field in fd.get_ofb_fields(flow):
+            if field.type == fd.METADATA:
+                metadata = field.table_metadata
+        if table_id is None:
+            return None
+        flows = self.logical_flows_proxy.get('/').items
+        next_flows = []
+        for f in flows:
+            if f.table_id == table_id:
+                # FIXME: matching only on in_port/metadata may be too loose
+                if fd.get_in_port(f) == fd.get_in_port(flow) and \
+                        fd.get_out_port(f) == metadata:
+
+                    next_flows.append(f)
+
+
+        if len(next_flows) == 0:
+            self.log.warning('no next flow found, it may be a timing issue',
+                             flow=flow, number_of_flows=len(flows))
+            reactor.callLater(5, self.add_flow, flow)
+            return None
+
+        next_flows.sort(key=lambda f:f.priority, reverse=True)
+
+        return next_flows[0]
+
+    def update_children_flows(self, device_rules_map):
+
+        for device_id, (flows, groups) in device_rules_map.iteritems():
+            if device_id != self.device_id:
+                self.root_proxy.update('/devices/{}/flows'.format(device_id),
+                                       Flows(items=flows.values()))
+                self.root_proxy.update('/devices/{}/flow_groups'.format(
+                    device_id), FlowGroups(items=groups.values()))
+
+    def generate_stored_id(self, flow_id, direction):
+        if direction == 'upstream':
+            self.log.debug('upstream flow, shifting id')
+            return 0x1 << 15 | flow_id
+        elif direction == 'downstream':
+            self.log.debug('downstream flow, not shifting id')
+            return flow_id
+        else:
+            self.log.warn('Unrecognized direction', direction=direction)
+            return flow_id
+
+    def decode_stored_id(self, id):
+        if id >> 15 == 0x1:
+            return (id & 0x7fff, 'upstream')
+        else:
+            return (id, 'downstream')
\ No newline at end of file
diff --git a/voltha/adapters/openolt/openolt_platform.py b/voltha/adapters/openolt/openolt_platform.py
index fc5756e..c2a9b5e 100644
--- a/voltha/adapters/openolt/openolt_platform.py
+++ b/voltha/adapters/openolt/openolt_platform.py
@@ -162,3 +162,22 @@
     except Exception as err:
         raise Exception(err)
 
+def extract_access_from_flow(in_port, out_port):
+    if is_upstream(in_port, out_port):
+        return (intf_id_from_uni_port_num(in_port), onu_id_from_port_num(
+            in_port))
+    else:
+        return (intf_id_from_uni_port_num(out_port), onu_id_from_port_num(
+            out_port))
+
+def is_upstream(in_port, out_port):
+    # FIXME: do not hard-code NNI/controller port numbers here
+    if out_port in [128, 129, 130, 131, 0xfffd, 0xfffffffd]:
+        return True
+    # if in_port not in [128, 129, 130, 131]:
+    #     return True
+
+    return False
+
+def is_downstream(in_port, out_port):
+    return not is_upstream(in_port, out_port)
\ No newline at end of file
diff --git a/voltha/adapters/openolt/protos/openolt.proto b/voltha/adapters/openolt/protos/openolt.proto
index c4d4fd0..00509f6 100644
--- a/voltha/adapters/openolt/protos/openolt.proto
+++ b/voltha/adapters/openolt/protos/openolt.proto
@@ -81,6 +81,13 @@
         };
     }
 
+    rpc FlowRemove(Flow) returns (Empty) {
+        option (google.api.http) = {
+          post: "/v1/FlowRemove"
+          body: "*"
+        };
+    }
+
     rpc HeartbeatCheck(Empty) returns (Heartbeat) {
         option (google.api.http) = {
           post: "/v1/HeartbeatCheck"
diff --git a/voltha/core/flow_decomposer.py b/voltha/core/flow_decomposer.py
index 726e8df..f4d978e 100644
--- a/voltha/core/flow_decomposer.py
+++ b/voltha/core/flow_decomposer.py
@@ -335,6 +335,22 @@
 def has_group(flow):
     return get_group(flow) is not None
 
+def mk_oxm_fields(match_fields):
+    oxm_fields=[
+        ofp.ofp_oxm_field(
+            oxm_class=ofp.OFPXMC_OPENFLOW_BASIC,
+            ofb_field=field
+        ) for field in match_fields
+        ]
+
+    return oxm_fields
+
+def mk_instructions_from_actions(actions):
+    instructions_action = ofp.ofp_instruction_actions()
+    instructions_action.actions.extend(actions)
+    instruction = ofp.ofp_instruction(type=ofp.OFPIT_APPLY_ACTIONS,
+                                      actions=instructions_action)
+    return [instruction]
 
 def mk_simple_flow_mod(match_fields, actions, command=ofp.OFPFC_ADD,
                        next_table_id=None, **kw):
diff --git a/voltha/core/logical_device_agent.py b/voltha/core/logical_device_agent.py
index eaf301a..11afee4 100644
--- a/voltha/core/logical_device_agent.py
+++ b/voltha/core/logical_device_agent.py
@@ -23,6 +23,7 @@
 
 from common.event_bus import EventBusClient
 from common.frameio.frameio import hexify
+from voltha.registry import registry
 from voltha.core.config.config_proxy import CallbackType
 from voltha.core.device_graph import DeviceGraph
 from voltha.core.flow_decomposer import FlowDecomposer, \
@@ -80,6 +81,30 @@
 
             self._routes = None
             self._no_flow_changes_required = False
+            self._flows_ids_to_add = []
+            self._flows_ids_to_remove = []
+            self._flows_to_remove = []
+
+            self.accepts_direct_logical_flows = False
+            self.device_id = self.self_proxy.get('/').root_device_id
+            device_adapter_type = self.root_proxy.get('/devices/{}'.format(
+                self.device_id)).adapter
+            device_type = self.root_proxy.get('/device_types/{}'.format(
+                device_adapter_type))
+
+            if device_type is not None:
+                self.accepts_direct_logical_flows = \
+                    device_type.accepts_direct_logical_flows_update
+
+            if self.accepts_direct_logical_flows:
+
+                self.device_adapter_agent = registry(
+                    'adapter_loader').get_agent(device_adapter_type).adapter
+
+                self.log.debug('this device accepts direct logical flows',
+                               device_adapter_type=device_adapter_type)
+
+
 
         except Exception, e:
             self.log.exception('init-error', e=e)
@@ -546,15 +571,24 @@
         current_flow_ids = set(f.id for f in current_flows.items)
         desired_flow_ids = set(f.id for f in flows.items)
 
-        ids_to_add = desired_flow_ids.difference(current_flow_ids)
-        ids_to_del = current_flow_ids.difference(desired_flow_ids)
+        self._flows_ids_to_add = desired_flow_ids.difference(current_flow_ids)
+        self._flows_ids_to_remove = current_flow_ids.difference(desired_flow_ids)
+        self._flows_to_remove = []
+        for f in current_flows.items:
+            if f.id in self._flows_ids_to_remove:
+                self._flows_to_remove.append(f)
 
-        if len(ids_to_add) + len(ids_to_del) == 0:
+        if len(self._flows_ids_to_add) + len(self._flows_ids_to_remove) == 0:
             # No changes of flows, just stats are changing
             self._no_flow_changes_required = True
         else:
             self._no_flow_changes_required = False
 
+        self.log.debug('flows-preprocess-output', current_flows=len(
+            current_flow_ids), new_flows=len(desired_flow_ids),
+                      adding_flows=len(self._flows_ids_to_add),
+                      removing_flows=len(self._flows_ids_to_remove))
+
 
     def _flow_table_updated(self, flows):
         self.log.debug('flow-table-updated',
@@ -565,18 +599,51 @@
             self.log.debug('flow-stats-update')
         else:
 
+            groups = self.groups_proxy.get('/').items
+            device_rules_map = self.decompose_rules(flows.items, groups)
+
             # TODO we have to evolve this into a policy-based, event based pattern
             # This is a raw implementation of the specific use-case with certain
             # built-in assumptions, and not yet device vendor specific. The policy-
             # based refinement will be introduced that later.
 
-            groups = self.groups_proxy.get('/').items
-            device_rules_map = self.decompose_rules(flows.items, groups)
-            for device_id, (flows, groups) in device_rules_map.iteritems():
-                self.root_proxy.update('/devices/{}/flows'.format(device_id),
-                                       Flows(items=flows.values()))
-                self.root_proxy.update('/devices/{}/flow_groups'.format(device_id),
-                                       FlowGroups(items=groups.values()))
+
+            # Temporary bypass for openolt
+
+            if self.accepts_direct_logical_flows:
+                # Give the logical flows directly to the adapter
+                self.log.debug('direct-logical-flow-bypass')
+                if self.device_adapter_agent is None:
+                    self.log.error('No device adapter agent',
+                                   device_id=self.device_id,
+                                   logical_device_id=self.logical_device_id)
+                    return
+
+                flows_to_add = []
+                for f in flows.items:
+                    if f.id in self._flows_ids_to_add:
+                        flows_to_add.append(f)
+
+
+                self.log.debug('flows-to-remove',
+                               flows_to_remove=self._flows_to_remove,
+                               flows_ids=self._flows_ids_to_remove)
+
+                try:
+                    self.device_adapter_agent.update_logical_flows(
+                        self.device_id, flows_to_add, self._flows_to_remove,
+                        groups, device_rules_map)
+                except Exception as e:
+                    self.log.error('logical flows bypass error', error=e,
+                                   flows=flows)
+            else:
+
+                for device_id, (flows, groups) in device_rules_map.iteritems():
+
+                    self.root_proxy.update('/devices/{}/flows'.format(device_id),
+                                           Flows(items=flows.values()))
+                    self.root_proxy.update('/devices/{}/flow_groups'.format(device_id),
+                                           FlowGroups(items=groups.values()))
 
     # ~~~~~~~~~~~~~~~~~~~~ GROUP TABLE UPDATE HANDLING ~~~~~~~~~~~~~~~~~~~~~~~~
 
diff --git a/voltha/protos/device.proto b/voltha/protos/device.proto
index e0f306f..d8b4b23 100644
--- a/voltha/protos/device.proto
+++ b/voltha/protos/device.proto
@@ -30,6 +30,7 @@
 
     bool accepts_bulk_flow_update = 3;
     bool accepts_add_remove_flow_updates = 4;
+    bool accepts_direct_logical_flows_update = 7;
 
 }