VOL-1366: Prevent numerous reflows to BAL

Change-Id: I3bf5d9504688d69c05e03a366a5ad70d4c8b3c64
diff --git a/voltha/adapters/openolt/openolt_flow_mgr.py b/voltha/adapters/openolt/openolt_flow_mgr.py
index e298db2..acfcbfd 100644
--- a/voltha/adapters/openolt/openolt_flow_mgr.py
+++ b/voltha/adapters/openolt/openolt_flow_mgr.py
@@ -255,7 +255,7 @@
                         # between DS and US.
                         return
 
-            self.resource_mgr.free_flow_id_for_uni(pon_intf, onu_id, uni_id, flow_id)
+            self.resource_mgr.free_flow_id(pon_intf, onu_id, uni_id, flow_id)
         else:
             self.log.error("invalid-info", uni_port_no=uni_port_no,
                            child_device_id=child_device_id)
@@ -534,35 +534,41 @@
         flow_store_cookie = self._get_flow_store_cookie(classifier,
                                                         gemport_id)
 
-        # One of the OLT platform (Broadcom BAL) requires that symmetric
-        # flows require the same flow_id to be used across UL and DL.
-        # Since HSIA flow is the only symmetric flow currently, we need to
-        # re-use the flow_id across both direction. The 'flow_category'
-        # takes priority over flow_cookie to find any available HSIA_FLOW
-        # id for the ONU.
-        flow_id = self.resource_mgr.get_flow_id(intf_id, onu_id, uni_id,
-                                                flow_store_cookie,
-                                                HSIA_FLOW)
-        if flow_id is None:
-            self.log.error("hsia-flow-unavailable")
-            return
-        flow = openolt_pb2.Flow(
-            access_intf_id=intf_id, onu_id=onu_id, uni_id=uni_id, flow_id=flow_id,
-            flow_type=direction, alloc_id=alloc_id, network_intf_id=self.get_nni_intf_id(),
-            gemport_id=gemport_id,
-            classifier=self.mk_classifier(classifier),
-            action=self.mk_action(action),
-            priority=logical_flow.priority,
-            port_no=port_no,
-            cookie=logical_flow.cookie)
+        if self.resource_mgr.is_flow_cookie_on_kv_store(intf_id, onu_id, uni_id,
+                                                        flow_store_cookie):
+            self.log.debug('flow-exists--not-re-adding')
+        else:
 
-        if self.add_flow_to_device(flow, logical_flow):
-            flow_info = self._get_flow_info_as_json_blob(flow,
-                                                         flow_store_cookie,
-                                                         HSIA_FLOW)
-            self.update_flow_info_to_kv_store(flow.access_intf_id,
-                                              flow.onu_id, flow.uni_id,
-                                              flow.flow_id, flow_info)
+            # One of the OLT platforms (Broadcom BAL) requires symmetric
+            # flows to use the same flow_id across UL and DL.
+            # Since the HSIA flow is currently the only symmetric flow, we need
+            # to re-use the flow_id across both directions. The 'flow_category'
+            # takes priority over flow_cookie to find any available HSIA_FLOW
+            # id for the ONU.
+            flow_id = self.resource_mgr.get_flow_id(intf_id, onu_id, uni_id,
+                                                    flow_store_cookie,
+                                                    HSIA_FLOW)
+            if flow_id is None:
+                self.log.error("hsia-flow-unavailable")
+                return
+
+            flow = openolt_pb2.Flow(
+                access_intf_id=intf_id, onu_id=onu_id, uni_id=uni_id, flow_id=flow_id,
+                flow_type=direction, alloc_id=alloc_id, network_intf_id=self.get_nni_intf_id(),
+                gemport_id=gemport_id,
+                classifier=self.mk_classifier(classifier),
+                action=self.mk_action(action),
+                priority=logical_flow.priority,
+                port_no=port_no,
+                cookie=logical_flow.cookie)
+
+            if self.add_flow_to_device(flow, logical_flow):
+                flow_info = self._get_flow_info_as_json_blob(flow,
+                                                             flow_store_cookie,
+                                                             HSIA_FLOW)
+                self.update_flow_info_to_kv_store(flow.access_intf_id,
+                                                  flow.onu_id, flow.uni_id,
+                                                  flow.flow_id, flow_info)
 
     def add_dhcp_trap(self, intf_id, onu_id, uni_id, port_no, classifier, action, logical_flow,
                       alloc_id, gemport_id):
@@ -579,27 +585,31 @@
 
         flow_store_cookie = self._get_flow_store_cookie(classifier,
                                                         gemport_id)
+        if self.resource_mgr.is_flow_cookie_on_kv_store(intf_id, onu_id, uni_id,
+                                                        flow_store_cookie):
+            self.log.debug('flow-exists--not-re-adding')
+        else:
+            flow_id = self.resource_mgr.get_flow_id(
+                intf_id, onu_id, uni_id, flow_store_cookie
+            )
 
-        flow_id = self.resource_mgr.get_flow_id(
-            intf_id, onu_id, uni_id, flow_store_cookie
-        )
-        dhcp_flow = openolt_pb2.Flow(
-            onu_id=onu_id, uni_id=uni_id, flow_id=flow_id, flow_type=UPSTREAM,
-            access_intf_id=intf_id, gemport_id=gemport_id,
-            alloc_id=alloc_id, network_intf_id=self.get_nni_intf_id(),
-            priority=logical_flow.priority,
-            classifier=self.mk_classifier(classifier),
-            action=self.mk_action(action),
-            port_no=port_no,
-            cookie=logical_flow.cookie)
+            dhcp_flow = openolt_pb2.Flow(
+                onu_id=onu_id, uni_id=uni_id, flow_id=flow_id, flow_type=UPSTREAM,
+                access_intf_id=intf_id, gemport_id=gemport_id,
+                alloc_id=alloc_id, network_intf_id=self.get_nni_intf_id(),
+                priority=logical_flow.priority,
+                classifier=self.mk_classifier(classifier),
+                action=self.mk_action(action),
+                port_no=port_no,
+                cookie=logical_flow.cookie)
 
-        if self.add_flow_to_device(dhcp_flow, logical_flow):
-            flow_info = self._get_flow_info_as_json_blob(dhcp_flow, flow_store_cookie)
-            self.update_flow_info_to_kv_store(dhcp_flow.access_intf_id,
-                                              dhcp_flow.onu_id,
-                                              dhcp_flow.uni_id,
-                                              dhcp_flow.flow_id,
-                                              flow_info)
+            if self.add_flow_to_device(dhcp_flow, logical_flow):
+                flow_info = self._get_flow_info_as_json_blob(dhcp_flow, flow_store_cookie)
+                self.update_flow_info_to_kv_store(dhcp_flow.access_intf_id,
+                                                  dhcp_flow.onu_id,
+                                                  dhcp_flow.uni_id,
+                                                  dhcp_flow.flow_id,
+                                                  flow_info)
 
     def add_eapol_flow(self, intf_id, onu_id, uni_id, port_no, logical_flow, alloc_id,
                        gemport_id, vlan_id=DEFAULT_MGMT_VLAN):
@@ -614,34 +624,39 @@
 
         flow_store_cookie = self._get_flow_store_cookie(uplink_classifier,
                                                         gemport_id)
-        # Add Upstream EAPOL Flow.
-        uplink_flow_id = self.resource_mgr.get_flow_id(
-            intf_id, onu_id, uni_id, flow_store_cookie
-        )
 
-        upstream_flow = openolt_pb2.Flow(
-            access_intf_id=intf_id, onu_id=onu_id, uni_id=uni_id, flow_id=uplink_flow_id,
-            flow_type=UPSTREAM, alloc_id=alloc_id, network_intf_id=self.get_nni_intf_id(),
-            gemport_id=gemport_id,
-            classifier=self.mk_classifier(uplink_classifier),
-            action=self.mk_action(uplink_action),
-            priority=logical_flow.priority,
-            port_no=port_no,
-            cookie=logical_flow.cookie)
+        if self.resource_mgr.is_flow_cookie_on_kv_store(intf_id, onu_id, uni_id,
+                                                        flow_store_cookie):
+            self.log.debug('flow-exists--not-re-adding')
+        else:
+            # Add Upstream EAPOL Flow.
+            uplink_flow_id = self.resource_mgr.get_flow_id(
+                intf_id, onu_id, uni_id, flow_store_cookie
+            )
 
-        logical_flow = copy.deepcopy(logical_flow)
-        logical_flow.match.oxm_fields.extend(fd.mk_oxm_fields([fd.vlan_vid(
-            vlan_id | 0x1000)]))
-        logical_flow.match.type = OFPMT_OXM
+            upstream_flow = openolt_pb2.Flow(
+                access_intf_id=intf_id, onu_id=onu_id, uni_id=uni_id, flow_id=uplink_flow_id,
+                flow_type=UPSTREAM, alloc_id=alloc_id, network_intf_id=self.get_nni_intf_id(),
+                gemport_id=gemport_id,
+                classifier=self.mk_classifier(uplink_classifier),
+                action=self.mk_action(uplink_action),
+                priority=logical_flow.priority,
+                port_no=port_no,
+                cookie=logical_flow.cookie)
 
-        if self.add_flow_to_device(upstream_flow, logical_flow):
-            flow_info = self._get_flow_info_as_json_blob(upstream_flow,
-                                                         flow_store_cookie)
-            self.update_flow_info_to_kv_store(upstream_flow.access_intf_id,
-                                              upstream_flow.onu_id,
-                                              upstream_flow.uni_id,
-                                              upstream_flow.flow_id,
-                                              flow_info)
+            logical_flow = copy.deepcopy(logical_flow)
+            logical_flow.match.oxm_fields.extend(fd.mk_oxm_fields([fd.vlan_vid(
+                vlan_id | 0x1000)]))
+            logical_flow.match.type = OFPMT_OXM
+
+            if self.add_flow_to_device(upstream_flow, logical_flow):
+                flow_info = self._get_flow_info_as_json_blob(upstream_flow,
+                                                             flow_store_cookie)
+                self.update_flow_info_to_kv_store(upstream_flow.access_intf_id,
+                                                  upstream_flow.onu_id,
+                                                  upstream_flow.uni_id,
+                                                  upstream_flow.flow_id,
+                                                  flow_info)
 
         if vlan_id == DEFAULT_MGMT_VLAN:
             # Add Downstream EAPOL Flow, Only for first EAP flow (BAL
@@ -667,43 +682,47 @@
 
             flow_store_cookie = self._get_flow_store_cookie(downlink_classifier,
                                                             gemport_id)
+            if self.resource_mgr.is_flow_cookie_on_kv_store(intf_id, onu_id, uni_id,
+                                                            flow_store_cookie):
+                self.log.debug('flow-exists--not-re-adding')
+            else:
 
-            downlink_flow_id = self.resource_mgr.get_flow_id(
-                intf_id, onu_id, uni_id, flow_store_cookie
-            )
+                downlink_flow_id = self.resource_mgr.get_flow_id(
+                    intf_id, onu_id, uni_id, flow_store_cookie
+                )
 
-            downstream_flow = openolt_pb2.Flow(
-                access_intf_id=intf_id, onu_id=onu_id, uni_id=uni_id, flow_id=downlink_flow_id,
-                flow_type=DOWNSTREAM, alloc_id=alloc_id, network_intf_id=self.get_nni_intf_id(),
-                gemport_id=gemport_id,
-                classifier=self.mk_classifier(downlink_classifier),
-                action=self.mk_action(downlink_action),
-                priority=logical_flow.priority,
-                port_no=port_no,
-                cookie=logical_flow.cookie)
+                downstream_flow = openolt_pb2.Flow(
+                    access_intf_id=intf_id, onu_id=onu_id, uni_id=uni_id, flow_id=downlink_flow_id,
+                    flow_type=DOWNSTREAM, alloc_id=alloc_id, network_intf_id=self.get_nni_intf_id(),
+                    gemport_id=gemport_id,
+                    classifier=self.mk_classifier(downlink_classifier),
+                    action=self.mk_action(downlink_action),
+                    priority=logical_flow.priority,
+                    port_no=port_no,
+                    cookie=logical_flow.cookie)
 
-            downstream_logical_flow = ofp_flow_stats(
-                id=logical_flow.id, cookie=logical_flow.cookie,
-                table_id=logical_flow.table_id, priority=logical_flow.priority,
-                flags=logical_flow.flags)
+                downstream_logical_flow = ofp_flow_stats(
+                    id=logical_flow.id, cookie=logical_flow.cookie,
+                    table_id=logical_flow.table_id, priority=logical_flow.priority,
+                    flags=logical_flow.flags)
 
-            downstream_logical_flow.match.oxm_fields.extend(fd.mk_oxm_fields([
-                fd.in_port(fd.get_out_port(logical_flow)),
-                fd.vlan_vid(special_vlan_downstream_flow | 0x1000)]))
-            downstream_logical_flow.match.type = OFPMT_OXM
+                downstream_logical_flow.match.oxm_fields.extend(fd.mk_oxm_fields([
+                    fd.in_port(fd.get_out_port(logical_flow)),
+                    fd.vlan_vid(special_vlan_downstream_flow | 0x1000)]))
+                downstream_logical_flow.match.type = OFPMT_OXM
 
-            downstream_logical_flow.instructions.extend(
-                fd.mk_instructions_from_actions([fd.output(
-                    self.platform.mk_uni_port_num(intf_id, onu_id, uni_id))]))
+                downstream_logical_flow.instructions.extend(
+                    fd.mk_instructions_from_actions([fd.output(
+                        self.platform.mk_uni_port_num(intf_id, onu_id, uni_id))]))
 
-            if self.add_flow_to_device(downstream_flow, downstream_logical_flow):
-                flow_info = self._get_flow_info_as_json_blob(downstream_flow,
-                                                             flow_store_cookie)
-                self.update_flow_info_to_kv_store(downstream_flow.access_intf_id,
-                                                  downstream_flow.onu_id,
-                                                  downstream_flow.uni_id,
-                                                  downstream_flow.flow_id,
-                                                  flow_info)
+                if self.add_flow_to_device(downstream_flow, downstream_logical_flow):
+                    flow_info = self._get_flow_info_as_json_blob(downstream_flow,
+                                                                 flow_store_cookie)
+                    self.update_flow_info_to_kv_store(downstream_flow.access_intf_id,
+                                                      downstream_flow.onu_id,
+                                                      downstream_flow.uni_id,
+                                                      downstream_flow.flow_id,
+                                                      flow_info)
 
     def repush_all_different_flows(self):
         # Check if the device is supposed to have flows, if so add them
@@ -746,28 +765,35 @@
         onu_id = -1
         uni_id = -1
         flow_store_cookie = self._get_flow_store_cookie(classifier)
-        flow_id = self.resource_mgr.get_flow_id(network_intf_id, onu_id, uni_id,
-                                                flow_store_cookie)
 
-        downstream_flow = openolt_pb2.Flow(
-            access_intf_id=-1,  # access_intf_id not required
-            onu_id=onu_id, # onu_id not required
-            uni_id=uni_id, # uni_id not used
-            flow_id=flow_id,
-            flow_type=DOWNSTREAM,
-            network_intf_id=network_intf_id,
-            gemport_id=-1,  # gemport_id not required
-            classifier=self.mk_classifier(classifier),
-            action=self.mk_action(action),
-            priority=logical_flow.priority,
-            port_no=port_no,
-            cookie=logical_flow.cookie)
+        if self.resource_mgr.is_flow_cookie_on_kv_store(network_intf_id, onu_id, uni_id,
+                                                            flow_store_cookie):
+            self.log.debug('flow-exists--not-re-adding')
+        else:
+            flow_id = self.resource_mgr.get_flow_id(network_intf_id, onu_id, uni_id,
+                                                    flow_store_cookie)
 
-        self.log.debug('add lldp downstream trap', classifier=classifier,
-                       action=action, flow=downstream_flow, port_no=port_no)
-        if self.add_flow_to_device(downstream_flow, logical_flow):
-            self.update_flow_info_to_kv_store(network_intf_id, onu_id, uni_id,
-                                              flow_id, downstream_flow)
+            downstream_flow = openolt_pb2.Flow(
+                access_intf_id=-1,  # access_intf_id not required
+                onu_id=onu_id, # onu_id not required
+                uni_id=uni_id, # uni_id not used
+                flow_id=flow_id,
+                flow_type=DOWNSTREAM,
+                network_intf_id=network_intf_id,
+                gemport_id=-1,  # gemport_id not required
+                classifier=self.mk_classifier(classifier),
+                action=self.mk_action(action),
+                priority=logical_flow.priority,
+                port_no=port_no,
+                cookie=logical_flow.cookie)
+
+            self.log.debug('add lldp downstream trap', classifier=classifier,
+                           action=action, flow=downstream_flow, port_no=port_no)
+            if self.add_flow_to_device(downstream_flow, logical_flow):
+                flow_info = self._get_flow_info_as_json_blob(downstream_flow,
+                                                             flow_store_cookie)
+                self.update_flow_info_to_kv_store(network_intf_id, onu_id, uni_id,
+                                                  flow_id, flow_info)
 
     def mk_classifier(self, classifier_info):
 
@@ -884,8 +910,8 @@
             return True
 
     def update_flow_info_to_kv_store(self, intf_id, onu_id, uni_id, flow_id, flow):
-        self.resource_mgr.update_flow_id_info_for_uni(intf_id, onu_id, uni_id,
-                                                      flow_id, flow)
+        self.resource_mgr.update_flow_id_info(intf_id, onu_id, uni_id,
+                                              flow_id, flow)
 
     def register_flow(self, logical_flow, device_flow):
         self.log.debug('registering flow in device',
@@ -962,7 +988,7 @@
             get_tech_profile_instance(
             DEFAULT_TECH_PROFILE_TABLE_ID,
             ofp_port_name)
-        flow_ids = self.resource_mgr.get_current_flow_ids_for_uni(pon_port, onu_id, uni_id)
+        flow_ids = self.resource_mgr.get_current_flow_ids(pon_port, onu_id, uni_id)
         self.log.debug("outstanding-flows-to-be-cleared", flow_ids=flow_ids)
         for flow_id in flow_ids:
             flow_infos = self.resource_mgr.get_flow_id_info(pon_port, onu_id, uni_id, flow_id)
@@ -980,7 +1006,7 @@
                     else:
                         raise grpc_e
 
-                self.resource_mgr.free_flow_id_for_uni(pon_port, onu_id, uni_id, flow_id)
+                self.resource_mgr.free_flow_id(pon_port, onu_id, uni_id, flow_id)
 
         try:
             tconts = self.tech_profile[pon_port].get_tconts(tech_profile_instance)
@@ -1028,8 +1054,19 @@
         json_blob['flow_store_cookie'] = flow_store_cookie
         if flow_category is not None:
             json_blob['flow_category'] = flow_category
-        flow_info = self.resource_mgr.get_flow_id_info(flow.access_intf_id,
-                                                       flow.onu_id, flow.uni_id, flow.flow_id)
+
+        # For flows which trap out of the NNI, the access_intf_id is invalid (set to -1).
+        # In such cases, we need to refer to the network_intf_id.
+        if flow.access_intf_id != -1:
+            flow_info = self.resource_mgr.get_flow_id_info(flow.access_intf_id,
+                                                           flow.onu_id, flow.uni_id,
+                                                           flow.flow_id)
+        else:
+            # Case of LLDP trap flow from the NNI. We can't use flow.access_intf_id
+            # in that case, as it is invalid. We use flow.network_intf_id.
+            flow_info = self.resource_mgr.get_flow_id_info(flow.network_intf_id,
+                                                           flow.onu_id, flow.uni_id,
+                                                           flow.flow_id)
 
         if flow_info is None:
             flow_info = list()
@@ -1060,3 +1097,4 @@
         self.nni_intf_id = self.platform.intf_id_from_nni_port_num(logical_port.ofp_port.port_no)
         self.log.debug("nni-intf-d ", nni_intf_id=self.nni_intf_id)
         return self.nni_intf_id
+