VOL-1344:
1) Place all of resource manager and tech profile KV store data under /service/voltha
2) Ensure flow_ids are released on the KV store when device is deleted
3) Ensure pon resources are re-used on voltha restart
4) A few other code re-organizations and bug fixes

Change-Id: Ia7bc8062d88b7a8eec5d4b87209536d81b115575
diff --git a/common/pon_resource_manager/resource_kv_store.py b/common/pon_resource_manager/resource_kv_store.py
index a1a5c14..307b0cd 100644
--- a/common/pon_resource_manager/resource_kv_store.py
+++ b/common/pon_resource_manager/resource_kv_store.py
@@ -21,7 +21,7 @@
 from voltha.core.config.config_backend import EtcdStore

 

 # KV store uses this prefix to store resource info

-PATH_PREFIX = 'resource_manager/{}'

+PATH_PREFIX = 'service/voltha/resource_manager/{}'

 

 

 class ResourceKvStore(object):

diff --git a/common/pon_resource_manager/resource_manager.py b/common/pon_resource_manager/resource_manager.py
index a88b407..bdb45ee 100644
--- a/common/pon_resource_manager/resource_manager.py
+++ b/common/pon_resource_manager/resource_manager.py
@@ -578,7 +578,17 @@
                          else False
         """
         status = False
-
+        known_resource_types = [PONResourceManager.ONU_ID,
+                                PONResourceManager.ALLOC_ID,
+                                PONResourceManager.GEMPORT_ID,
+                                PONResourceManager.FLOW_ID]
+        if resource_type not in known_resource_types:
+            self._log.error("unknown-resource-type",
+                            resource_type=resource_type)
+            return status
+        if release_content is None:
+            self._log.debug("nothing-to-release")
+            return status
         # delegate to the master instance if sharing enabled across instances
         shared_resource_mgr = self.shared_resource_mgrs[self.shared_idx_by_type[resource_type]]
         if shared_resource_mgr is not None and shared_resource_mgr is not self:
@@ -590,17 +600,13 @@
 
         try:
             resource = self._get_resource(path)
-            if resource is not None and (
-                    resource_type == PONResourceManager.ONU_ID or
-                    resource_type == PONResourceManager.FLOW_ID):
-                self._release_id(resource, release_content)
-            elif resource is not None and (
-                    resource_type == PONResourceManager.ALLOC_ID or
-                    resource_type == PONResourceManager.GEMPORT_ID):
+            if resource is None:
+                raise Exception("get-resource-failed")
+            if isinstance(release_content, list):
                 for content in release_content:
                     self._release_id(resource, content)
             else:
-                raise Exception("get-resource-failed")
+                self._release_id(resource, release_content)
 
             self._log.debug("Free-" + resource_type + "-success", path=path)
 
@@ -673,16 +679,40 @@
         :param pon_intf_onu_id: reference of PON interface id and onu id
         """
         # remove pon_intf_onu_id tuple to alloc_ids map
-        alloc_id_path = PONResourceManager.ALLOC_ID_RESOURCE_MAP_PATH.format(
-            self.device_id, str(pon_intf_onu_id)
-        )
-        self._kv_store.remove_from_kv_store(alloc_id_path)
+        try:
+            alloc_id_path = PONResourceManager.ALLOC_ID_RESOURCE_MAP_PATH.format(
+                self.device_id, str(pon_intf_onu_id)
+            )
+            self._kv_store.remove_from_kv_store(alloc_id_path)
+        except Exception as e:
+            self._log.error("error-removing-alloc-id", e=e)
 
-        # remove pon_intf_onu_id tuple to gemport_ids map
-        gemport_id_path = PONResourceManager.GEMPORT_ID_RESOURCE_MAP_PATH.format(
-            self.device_id, str(pon_intf_onu_id)
-        )
-        self._kv_store.remove_from_kv_store(gemport_id_path)
+        try:
+            # remove pon_intf_onu_id tuple to gemport_ids map
+            gemport_id_path = PONResourceManager.GEMPORT_ID_RESOURCE_MAP_PATH.format(
+                self.device_id, str(pon_intf_onu_id)
+            )
+            self._kv_store.remove_from_kv_store(gemport_id_path)
+        except Exception as e:
+            self._log.error("error-removing-gem-ports", e=e)
+
+        flow_id_path = PONResourceManager.FLOW_ID_RESOURCE_MAP_PATH.format(
+            self.device_id, str(pon_intf_onu_id))
+        flow_ids = self._kv_store.get_from_kv_store(flow_id_path)
+
+        if flow_ids and isinstance(flow_ids, list):
+            for flow_id in flow_ids:
+                try:
+                    flow_id_info_path = PONResourceManager.FLOW_ID_INFO_PATH.format(
+                                        self.device_id, str(pon_intf_onu_id), flow_id)
+                    self._kv_store.remove_from_kv_store(flow_id_info_path)
+                except Exception as e:
+                    self._log.error("error-removing-flow-info", flow_id=flow_id, e=e)
+                    continue
+        try:
+            self._kv_store.remove_from_kv_store(flow_id_path)
+        except Exception as e:
+            self._log.error("error-removing-flow-ids", e=e)
 
     def get_current_alloc_ids_for_onu(self, pon_intf_onu_id):
         """
diff --git a/common/tech_profile/tech_profile.py b/common/tech_profile/tech_profile.py
index c3a9993..150667e 100644
--- a/common/tech_profile/tech_profile.py
+++ b/common/tech_profile/tech_profile.py
@@ -24,7 +24,6 @@
 from voltha.registry import registry

 from voltha.adapters.openolt.protos import openolt_pb2

 

-

 # logger

 log = structlog.get_logger()

 

@@ -126,7 +125,7 @@
     pbits = ['0b11111111']

 

     # Tech profile path prefix in kv store

-    KV_STORE_TECH_PROFILE_PATH_PREFIX = 'voltha/technology_profiles'

+    KV_STORE_TECH_PROFILE_PATH_PREFIX = 'service/voltha/technology_profiles'

 

     # Tech profile path in kv store

     TECH_PROFILE_PATH = '{}/{}'  # <technology>/<table_id>

@@ -174,13 +173,13 @@
                 host, port = self.args.etcd.split(':', 1)

                 self._kv_store = EtcdStore(

                     host, port, TechProfile.

-                        KV_STORE_TECH_PROFILE_PATH_PREFIX)

+                    KV_STORE_TECH_PROFILE_PATH_PREFIX)

             elif self.args.backend == 'consul':

                 # KV store's IP Address and PORT

                 host, port = self.args.consul.split(':', 1)

                 self._kv_store = ConsulStore(

                     host, port, TechProfile.

-                        KV_STORE_TECH_PROFILE_PATH_PREFIX)

+                    KV_STORE_TECH_PROFILE_PATH_PREFIX)

 

             # self.tech_profile_instance_store = dict()

         except Exception as e:

@@ -257,17 +256,14 @@
                       path=path, tech_profile_instance=None, exception=e)

             return None

 

-    def delete_tech_profile_instance(self, table_id, uni_port_name):

-        # path to delete tech profile instance json from kv store

-        path = TechProfile.TECH_PROFILE_INSTANCE_PATH.format(

-            self.resource_mgr.technology, table_id, uni_port_name)

+    def delete_tech_profile_instance(self, tp_path):

 

         try:

-            del self._kv_store[path]

-            log.debug("Delete-tech-profile-instance-success", path=path)

+            del self._kv_store[tp_path]

+            log.debug("Delete-tech-profile-instance-success", path=tp_path)

             return True

         except Exception as e:

-            log.debug("Delete-tech-profile-instance-failed", path=path,

+            log.debug("Delete-tech-profile-instance-failed", path=tp_path,

                       exception=e)

             return False

 

@@ -537,7 +533,7 @@
 

             gemport_list = list()

             if isinstance(gem_ports, int):

-               gemport_list.append(gem_ports)

+                gemport_list.append(gem_ports)

             elif isinstance(gem_ports, list):

                 for gem in gem_ports:

                     gemport_list.append(gem)

diff --git a/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py b/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
index fde2591..45bc45e 100644
--- a/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
+++ b/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
@@ -664,8 +664,10 @@
             reactor.callLater(0, self._onu_omci_device.stop)
 
             # Let TP download happen again
-            for i in self._tp_service_specific_task: i.clear()
-            for i in self._tech_profile_download_done: i.clear()
+            for uni_id in self._tp_service_specific_task:
+                self._tp_service_specific_task[uni_id].clear()
+            for uni_id in self._tech_profile_download_done:
+                self._tech_profile_download_done[uni_id].clear()
 
             self.disable_ports(onu_device)
             onu_device.reason = "stopping-openomci"
@@ -685,8 +687,10 @@
         reactor.callLater(0, self._onu_omci_device.stop)
 
         # Let TP download happen again
-        for i in self._tp_service_specific_task: i.clear()
-        for i in self._tech_profile_download_done: i.clear()
+        for uni_id in self._tp_service_specific_task:
+            self._tp_service_specific_task[uni_id].clear()
+        for uni_id in self._tech_profile_download_done:
+            self._tech_profile_download_done[uni_id].clear()
 
         self.disable_ports(onu_device)
         onu_device.reason = "stopping-openomci"
@@ -731,8 +735,10 @@
                 reactor.callLater(0, self._onu_omci_device.stop)
 
                 # Let TP download happen again
-                for i in self._tp_service_specific_task: i.clear()
-                for i in self._tech_profile_download_done: i.clear()
+                for uni_id in self._tp_service_specific_task:
+                    self._tp_service_specific_task[uni_id].clear()
+                for uni_id in self._tech_profile_download_done:
+                    self._tech_profile_download_done[uni_id].clear()
 
                 self.disable_ports(device)
                 device.oper_status = OperStatus.UNKNOWN
@@ -1035,4 +1041,4 @@
         self._pon._port = pon_port
 
         self.adapter_agent.add_port_reference_to_parent(self.device_id,
-                                                        pon_port)
\ No newline at end of file
+                                                        pon_port)
diff --git a/voltha/adapters/openolt/openolt_device.py b/voltha/adapters/openolt/openolt_device.py
index 0858005..7332c4a 100644
--- a/voltha/adapters/openolt/openolt_device.py
+++ b/voltha/adapters/openolt/openolt_device.py
@@ -1007,17 +1007,20 @@
         # TODO FIXME - For each uni.
         # TODO FIXME - Flows are not deleted
         uni_id = 0  # FIXME
+        self.flow_mgr.delete_tech_profile_instance(
+                    child_device.proxy_address.channel_id,
+                    child_device.proxy_address.onu_id,
+                    uni_id
+        )
         pon_intf_id_onu_id = (child_device.proxy_address.channel_id,
                               child_device.proxy_address.onu_id,
                               uni_id)
-        alloc_id = self.resource_mgr.get_alloc_id(pon_intf_id_onu_id)
         # Free any PON resources that were reserved for the ONU
         self.resource_mgr.free_pon_resources_for_onu(pon_intf_id_onu_id)
 
         onu = openolt_pb2.Onu(intf_id=child_device.proxy_address.channel_id,
                               onu_id=child_device.proxy_address.onu_id,
-                              serial_number=serial_number,
-                              alloc_id=alloc_id)
+                              serial_number=serial_number)
         self.stub.DeleteOnu(onu)
 
     def reboot(self):
diff --git a/voltha/adapters/openolt/openolt_flow_mgr.py b/voltha/adapters/openolt/openolt_flow_mgr.py
index 1b996e7..3b08edc 100644
--- a/voltha/adapters/openolt/openolt_flow_mgr.py
+++ b/voltha/adapters/openolt/openolt_flow_mgr.py
@@ -17,6 +17,8 @@
 from twisted.internet import reactor
 import grpc
 from google.protobuf.json_format import MessageToDict
+import hashlib
+from simplejson import dumps
 
 from voltha.protos.openflow_13_pb2 import OFPXMC_OPENFLOW_BASIC, \
     ofp_flow_stats, OFPMT_OXM, Flows, FlowGroups, OFPXMT_OFB_IN_PORT, \
@@ -30,11 +32,6 @@
 
 # Flow categories
 HSIA_FLOW = "HSIA_FLOW"
-DHCP_FLOW = "DHCP_FLOW"
-EAPOL_PRIMARY_FLOW = "EAPOL_PRIMARY_FLOW"
-EAPOL_SECONDARY_FLOW = "EAPOL_SECONDARY_FLOW"
-IGMP_FLOW = "IGMP_FLOW"
-LLDP_FLOW = "LLDP_FLOW"
 
 EAP_ETH_TYPE = 0x888e
 LLDP_ETH_TYPE = 0x88cc
@@ -71,7 +68,6 @@
 TRAP_TO_HOST = 'trap_to_host'
 
 
-
 class OpenOltFlowMgr(object):
 
     def __init__(self, adapter_agent, log, stub, device_id, logical_device_id,
@@ -215,7 +211,6 @@
 
     def _clear_flow_id_from_rm(self, flow, flow_id, flow_direction):
         uni_port_no = None
-        flow_category = HSIA_FLOW  # default
         child_device_id = None
         if flow_direction == UPSTREAM:
             for field in fd.get_ofb_fields(flow):
@@ -223,15 +218,6 @@
                     is_uni, child_device_id = self._is_uni_port(field.port)
                     if is_uni:
                         uni_port_no = field.port
-                elif field.type == fd.IP_PROTO:
-                    if field.ip_proto == IGMP_PROTO:
-                        flow_category = IGMP_FLOW
-                elif field.type == fd.ETH_TYPE:
-                    if field.eth_type == EAP_ETH_TYPE:
-                        flow_category = EAPOL_PRIMARY_FLOW
-                    elif field.eth_type == LLDP_ETH_TYPE:
-                        flow_category = LLDP_FLOW
-
         elif flow_direction == DOWNSTREAM:
             for field in fd.get_ofb_fields(flow):
                 if field.type == fd.METADATA:
@@ -248,7 +234,7 @@
                         if is_uni:
                             uni_port_no = action.output.port
 
-        if flow_category and child_device_id:
+        if child_device_id:
             child_device = self.adapter_agent.get_device(child_device_id)
             pon_intf = child_device.proxy_address.channel_id
             onu_id = child_device.proxy_address.onu_id
@@ -270,7 +256,6 @@
             self.resource_mgr.free_flow_id_for_uni(pon_intf, onu_id, uni_id, flow_id)
         else:
             self.log.error("invalid-info", uni_port_no=uni_port_no,
-                           flow_category=flow_category,
                            child_device_id=child_device_id)
 
     def remove_flow(self, flow):
@@ -333,6 +318,22 @@
         ofp_port_name = (logical_port.ofp_port.name, logical_port.ofp_port.port_no)
         return ofp_port_name
 
+    def get_tp_path(self, intf_id, ofp_port_name):
+        # FIXME Should get Table id form the flow, as of now hardcoded to
+        # DEFAULT_TECH_PROFILE_TABLE_ID (64)
+        # 'tp_path' contains the suffix part of the tech_profile_instance path.
+        # The prefix to the 'tp_path' should be set to \
+        # TechProfile.KV_STORE_TECH_PROFILE_PATH_PREFIX by the ONU adapter.
+        return self.tech_profile[intf_id]. \
+            get_tp_path(DEFAULT_TECH_PROFILE_TABLE_ID,
+                        ofp_port_name)
+
+    def delete_tech_profile_instance(self, intf_id, onu_id, uni_id):
+        # Remove the TP instance associated with the ONU
+        ofp_port_name = self._get_ofp_port_name(intf_id, onu_id, uni_id)
+        tp_path = self.get_tp_path(intf_id, ofp_port_name)
+        return self.tech_profile[intf_id].delete_tech_profile_instance(tp_path)
+
     def divide_and_add_flow(self, intf_id, onu_id, uni_id, port_no, classifier,
                             action, flow):
 
@@ -372,7 +373,6 @@
                     if vlan_id is not None:
                         self.add_eapol_flow(
                             intf_id, onu_id, uni_id, port_no, flow, alloc_id, gemport_id,
-                            eapol_flow_category=EAPOL_SECONDARY_FLOW,
                             vlan_id=vlan_id)
                     parent_port_no = self.platform.intf_id_to_port_no(intf_id, Port.PON_OLT)
                     onu_device = self.adapter_agent.get_child_device(self.device_id,
@@ -383,14 +383,7 @@
                         self.log.error("port-name-not-found")
                         return
 
-                    # FIXME Should get Table id form the flow, as of now hardcoded to
-                    # DEFAULT_TECH_PROFILE_TABLE_ID (64)
-                    # 'tp_path' contains the suffix part of the tech_profile_instance path.
-                    # The prefix to the 'tp_path' should be set to \
-                    # TechProfile.KV_STORE_TECH_PROFILE_PATH_PREFIX by the ONU adapter.
-                    tp_path = self.tech_profile[intf_id]. \
-                                  get_tp_path(DEFAULT_TECH_PROFILE_TABLE_ID,
-                                              ofp_port_name)
+                    tp_path = self.get_tp_path(intf_id, ofp_port_name)
 
                     self.log.debug('Load-tech-profile-request-to-brcm-handler',
                                    tp_path=tp_path)
@@ -418,6 +411,16 @@
 
     def create_tcont_gemport(self, intf_id, onu_id, uni_id, table_id):
         alloc_id, gem_port_ids = None, None
+        pon_intf_onu_id = (intf_id, onu_id)
+
+        # If we already have allocated alloc_id and gem_ports earlier, render them
+        alloc_id = \
+            self.resource_mgr.get_current_alloc_ids_for_onu(pon_intf_onu_id)
+        gem_port_ids = \
+            self.resource_mgr.get_current_gemport_ids_for_onu(pon_intf_onu_id)
+        if alloc_id is not None and gem_port_ids is not None:
+            return alloc_id, gem_port_ids
+
         try:
             (ofp_port_name, ofp_port_no) = self._get_ofp_port_name(intf_id, onu_id, uni_id)
             if ofp_port_name is None:
@@ -437,31 +440,30 @@
                 tech_profile_instance = self.tech_profile[intf_id]. \
                     create_tech_profile_instance(table_id, ofp_port_name,
                                                  intf_id)
-                if tech_profile_instance is not None:
-
-                    # upstream scheduler
-                    us_scheduler = self.tech_profile[intf_id].get_us_scheduler(
-                        tech_profile_instance)
-                    # downstream scheduler
-                    ds_scheduler = self.tech_profile[intf_id].get_ds_scheduler(
-                        tech_profile_instance)
-                    # create Tcont
-                    tconts = self.tech_profile[intf_id].get_tconts(tech_profile_instance,
-                                                                   us_scheduler,
-                                                                   ds_scheduler)
-
-                    self.stub.CreateTconts(openolt_pb2.Tconts(intf_id=intf_id,
-                                                              onu_id=onu_id,
-                                                              uni_id=uni_id,
-                                                              port_no=ofp_port_no,
-                                                              tconts=tconts))
-                else:
+                if tech_profile_instance is None:
                     raise Exception('Tech-profile-instance-creation-failed')
             else:
                 self.log.debug(
                     'Tech-profile-instance-already-exist-for-given port-name',
                     ofp_port_name=ofp_port_name)
 
+            # upstream scheduler
+            us_scheduler = self.tech_profile[intf_id].get_us_scheduler(
+                tech_profile_instance)
+            # downstream scheduler
+            ds_scheduler = self.tech_profile[intf_id].get_ds_scheduler(
+                tech_profile_instance)
+            # create Tcont
+            tconts = self.tech_profile[intf_id].get_tconts(tech_profile_instance,
+                                                           us_scheduler,
+                                                           ds_scheduler)
+
+            self.stub.CreateTconts(openolt_pb2.Tconts(intf_id=intf_id,
+                                                      onu_id=onu_id,
+                                                      uni_id=uni_id,
+                                                      port_no=ofp_port_no,
+                                                      tconts=tconts))
+
             # Fetch alloc id and gemports from tech profile instance
             alloc_id = tech_profile_instance.us_scheduler.alloc_id
             gem_port_ids = []
@@ -469,7 +471,7 @@
                     tech_profile_instance.upstream_gem_port_attribute_list)):
                 gem_port_ids.append(
                     tech_profile_instance.upstream_gem_port_attribute_list[i].
-                        gemport_id)
+                    gemport_id)
         except BaseException as e:
             self.log.exception(exception=e)
 
@@ -504,8 +506,7 @@
         (eap_active, eap_logical_flow) = self.is_eap_enabled(intf_id, onu_id, uni_id)
         if eap_active:
             self.add_eapol_flow(intf_id, onu_id, uni_id, port_no, eap_logical_flow, alloc_id,
-                                gemport_id, eapol_flow_category=EAPOL_SECONDARY_FLOW,
-                                vlan_id=uplink_classifier[VLAN_VID])
+                                gemport_id, vlan_id=uplink_classifier[VLAN_VID])
 
     def add_downstream_data_flow(self, intf_id, onu_id, uni_id, port_no, downlink_classifier,
                                  downlink_action, flow, alloc_id, gemport_id):
@@ -521,7 +522,18 @@
     def add_hsia_flow(self, intf_id, onu_id, uni_id, port_no, classifier, action,
                       direction, logical_flow, alloc_id, gemport_id):
 
-        flow_id = self.resource_mgr.get_hsia_flow_for_uni(intf_id, onu_id, uni_id, gemport_id)
+        flow_store_cookie = self._get_flow_store_cookie(classifier,
+                                                        gemport_id)
+
+        # One of the OLT platform (Broadcom BAL) requires that symmetric
+        # flows require the same flow_id to be used across UL and DL.
+        # Since HSIA flow is the only symmetric flow currently, we need to
+        # re-use the flow_id across both direction. The 'flow_category'
+        # takes priority over flow_cookie to find any available HSIA_FLOW
+        # id for the ONU.
+        flow_id = self.resource_mgr.get_flow_id(intf_id, onu_id, uni_id,
+                                                flow_store_cookie,
+                                                HSIA_FLOW)
         if flow_id is None:
             self.log.error("hsia-flow-unavailable")
             return
@@ -537,8 +549,11 @@
             cookie=logical_flow.cookie)
 
         if self.add_flow_to_device(flow, logical_flow):
-            flow_info = self._get_flow_info_as_json_blob(flow, HSIA_FLOW)
-            self.update_flow_info_to_kv_store(flow.access_intf_id, flow.onu_id, flow.uni_id,
+            flow_info = self._get_flow_info_as_json_blob(flow,
+                                                         flow_store_cookie,
+                                                         HSIA_FLOW)
+            self.update_flow_info_to_kv_store(flow.access_intf_id,
+                                              flow.onu_id, flow.uni_id,
                                               flow.flow_id, flow_info)
 
     def add_dhcp_trap(self, intf_id, onu_id, uni_id, port_no, classifier, action, logical_flow,
@@ -554,7 +569,12 @@
         classifier[PACKET_TAG_TYPE] = SINGLE_TAG
         classifier.pop(VLAN_VID, None)
 
-        flow_id = self.resource_mgr.get_flow_id(intf_id, onu_id, uni_id)
+        flow_store_cookie = self._get_flow_store_cookie(classifier,
+                                                        gemport_id)
+
+        flow_id = self.resource_mgr.get_flow_id(
+            intf_id, onu_id, uni_id, flow_store_cookie
+        )
 
         dhcp_flow = openolt_pb2.Flow(
             onu_id=onu_id, uni_id=uni_id, flow_id=flow_id, flow_type=UPSTREAM,
@@ -567,7 +587,7 @@
             cookie=logical_flow.cookie)
 
         if self.add_flow_to_device(dhcp_flow, logical_flow):
-            flow_info = self._get_flow_info_as_json_blob(dhcp_flow, DHCP_FLOW)
+            flow_info = self._get_flow_info_as_json_blob(dhcp_flow, flow_store_cookie)
             self.update_flow_info_to_kv_store(dhcp_flow.access_intf_id,
                                               dhcp_flow.onu_id,
                                               dhcp_flow.uni_id,
@@ -575,8 +595,7 @@
                                               flow_info)
 
     def add_eapol_flow(self, intf_id, onu_id, uni_id, port_no, logical_flow, alloc_id,
-                       gemport_id, eapol_flow_category=EAPOL_PRIMARY_FLOW,
-                       vlan_id=DEFAULT_MGMT_VLAN):
+                       gemport_id, vlan_id=DEFAULT_MGMT_VLAN):
 
         uplink_classifier = dict()
         uplink_classifier[ETH_TYPE] = EAP_ETH_TYPE
@@ -586,11 +605,12 @@
         uplink_action = dict()
         uplink_action[TRAP_TO_HOST] = True
 
+        flow_store_cookie = self._get_flow_store_cookie(uplink_classifier,
+                                                        gemport_id)
         # Add Upstream EAPOL Flow.
-        if eapol_flow_category == EAPOL_PRIMARY_FLOW:
-            uplink_flow_id = self.resource_mgr.get_flow_id(intf_id, onu_id, uni_id)
-        else:
-            uplink_flow_id = self.resource_mgr.get_flow_id(intf_id, onu_id, uni_id)
+        uplink_flow_id = self.resource_mgr.get_flow_id(
+            intf_id, onu_id, uni_id, flow_store_cookie
+        )
 
         upstream_flow = openolt_pb2.Flow(
             access_intf_id=intf_id, onu_id=onu_id, uni_id=uni_id, flow_id=uplink_flow_id,
@@ -608,29 +628,26 @@
         logical_flow.match.type = OFPMT_OXM
 
         if self.add_flow_to_device(upstream_flow, logical_flow):
-            if eapol_flow_category == EAPOL_PRIMARY_FLOW:
-                flow_info = self._get_flow_info_as_json_blob(upstream_flow,
-                                                             EAPOL_PRIMARY_FLOW)
-                self.update_flow_info_to_kv_store(upstream_flow.access_intf_id,
-                                                  upstream_flow.onu_id,
-                                                  upstream_flow.uni_id,
-                                                  upstream_flow.flow_id,
-                                                  flow_info)
-            else:
-                flow_info = self._get_flow_info_as_json_blob(upstream_flow,
-                                                             EAPOL_SECONDARY_FLOW)
-                self.update_flow_info_to_kv_store(upstream_flow.access_intf_id,
-                                                  upstream_flow.onu_id,
-                                                  upstream_flow.uni_id,
-                                                  upstream_flow.flow_id,
-                                                  flow_info)
+            flow_info = self._get_flow_info_as_json_blob(upstream_flow,
+                                                         flow_store_cookie)
+            self.update_flow_info_to_kv_store(upstream_flow.access_intf_id,
+                                              upstream_flow.onu_id,
+                                              upstream_flow.uni_id,
+                                              upstream_flow.flow_id,
+                                              flow_info)
 
         if vlan_id == DEFAULT_MGMT_VLAN:
             # Add Downstream EAPOL Flow, Only for first EAP flow (BAL
             # requirement)
-            # FIXME this is actually not used. But, here to be consistent with the upstream tagging from ONU.
-            # It needs refactored to be completely removed
-            special_vlan_downstream_flow = 4091 # 4000 - onu_id
+            # On one of the platforms (Broadcom BAL), when same DL classifier
+            # vlan was used across multiple ONUs, eapol flow re-adds after
+            # flow delete (cases of onu reboot/disable) fails.
+            # In order to generate unique vlan, a combination of intf_id
+            # onu_id and uni_id is used.
+            # uni_id defaults to 0, so add 1 to it.
+            special_vlan_downstream_flow = 4090 - intf_id * onu_id * (uni_id+1)
+            # Assert that we do not generate invalid vlans under any condition
+            assert special_vlan_downstream_flow >= 2, 'invalid-vlan-generated'
 
             downlink_classifier = dict()
             downlink_classifier[PACKET_TAG_TYPE] = SINGLE_TAG
@@ -640,7 +657,13 @@
             downlink_action[PUSH_VLAN] = True
             downlink_action[VLAN_VID] = vlan_id
 
-            downlink_flow_id = self.resource_mgr.get_flow_id(intf_id, onu_id, uni_id)
+
+            flow_store_cookie = self._get_flow_store_cookie(downlink_classifier,
+                                                            gemport_id)
+
+            downlink_flow_id = self.resource_mgr.get_flow_id(
+                intf_id, onu_id, uni_id, flow_store_cookie
+            )
 
             downstream_flow = openolt_pb2.Flow(
                 access_intf_id=intf_id, onu_id=onu_id, uni_id=uni_id, flow_id=downlink_flow_id,
@@ -668,7 +691,7 @@
 
             if self.add_flow_to_device(downstream_flow, downstream_logical_flow):
                 flow_info = self._get_flow_info_as_json_blob(downstream_flow,
-                                                             EAPOL_PRIMARY_FLOW)
+                                                             flow_store_cookie)
                 self.update_flow_info_to_kv_store(downstream_flow.access_intf_id,
                                                   downstream_flow.onu_id,
                                                   downstream_flow.uni_id,
@@ -715,7 +738,9 @@
         # *********************************************
         onu_id = -1
         uni_id = -1
-        flow_id = self.resource_mgr.get_flow_id(network_intf_id, onu_id, uni_id)
+        flow_store_cookie = self._get_flow_store_cookie(classifier)
+        flow_id = self.resource_mgr.get_flow_id(network_intf_id, onu_id, uni_id,
+                                                flow_store_cookie)
 
         downstream_flow = openolt_pb2.Flow(
             access_intf_id=-1,  # access_intf_id not required
@@ -822,7 +847,7 @@
         for flow in flows:
             in_port = fd.get_in_port(flow)
             out_port = fd.get_out_port(flow)
-            if in_port == port and \
+            if in_port == port and out_port is not None and \
                     self.platform.intf_id_to_port_type_name(out_port) \
                     == Port.ETHERNET_NNI:
                 fields = fd.get_ofb_fields(flow)
@@ -984,11 +1009,14 @@
         # the device
         assert len(self.tech_profile) == self.resource_mgr.device_info.pon_ports
 
-    def _get_flow_info_as_json_blob(self, flow, flow_category):
+    def _get_flow_info_as_json_blob(self, flow, flow_store_cookie,
+                                    flow_category=None):
         json_blob = MessageToDict(message=flow,
                                   preserving_proto_field_name=True)
         self.log.debug("flow-info", json_blob=json_blob)
-        json_blob['flow_category'] = flow_category
+        json_blob['flow_store_cookie'] = flow_store_cookie
+        if flow_category is not None:
+            json_blob['flow_category'] = flow_category
         flow_info = self.resource_mgr.get_flow_id_info(flow.access_intf_id,
                                                        flow.onu_id, flow.uni_id, flow.flow_id)
 
@@ -1000,3 +1028,13 @@
             flow_info.append(json_blob)
 
         return flow_info
+
+    @staticmethod
+    def _get_flow_store_cookie(classifier, gem_port=None):
+        assert isinstance(classifier, dict)
+        # We need unique flows per gem_port
+        if gem_port is not None:
+            to_hash = dumps(classifier, sort_keys=True) + str(gem_port)
+        else:
+            to_hash = dumps(classifier, sort_keys=True)
+        return hashlib.md5(to_hash).hexdigest()[:12]
diff --git a/voltha/adapters/openolt/openolt_resource_manager.py b/voltha/adapters/openolt/openolt_resource_manager.py
index 49b353d..2b6e432 100644
--- a/voltha/adapters/openolt/openolt_resource_manager.py
+++ b/voltha/adapters/openolt/openolt_resource_manager.py
@@ -27,7 +27,7 @@
 
 
 class OpenOltResourceMgr(object):
-    BASE_PATH_KV_STORE = "openolt/{}"  # openolt/<device_id>
+    BASE_PATH_KV_STORE = "service/voltha/openolt/{}"  # service/voltha/openolt/<device_id>
 
     def __init__(self, device_id, host_and_port, extra_args, device_info):
         self.log = structlog.get_logger(id=device_id,
@@ -137,12 +137,33 @@
 
         return onu_id
 
-    def get_flow_id(self, pon_intf_id, onu_id, uni_id):
+    def get_flow_id(self, pon_intf_id, onu_id, uni_id, flow_store_cookie,
+                    flow_category=None):
         pon_intf_onu_id = (pon_intf_id, onu_id, uni_id)
+        try:
+            flow_ids = self.resource_mgrs[pon_intf_id]. \
+                get_current_flow_ids_for_onu(pon_intf_onu_id)
+            if flow_ids is not None:
+                for flow_id in flow_ids:
+                    flows = self.get_flow_id_info(pon_intf_id, onu_id, uni_id, flow_id)
+                    assert (isinstance(flows, list))
+                    for flow in flows:
+
+                        if flow_category is not None and \
+                                'flow_category' in flow and \
+                                flow['flow_category'] == flow_category:
+                            return flow_id
+                        if flow['flow_store_cookie'] == flow_store_cookie:
+                            return flow_id
+        except Exception as e:
+            self.log.error("error-retrieving-flow-info", e=e)
+
         flow_id = self.resource_mgrs[pon_intf_id].get_resource_id(
-            pon_intf_id, PONResourceManager.FLOW_ID)
+            pon_intf_onu_id[0], PONResourceManager.FLOW_ID)
         if flow_id is not None:
-            self.resource_mgrs[pon_intf_id].update_flow_id_for_onu(pon_intf_onu_id, flow_id)
+            self.resource_mgrs[pon_intf_id].update_flow_id_for_onu(
+                pon_intf_onu_id, flow_id
+            )
 
         return flow_id
 
@@ -159,24 +180,6 @@
         return self.resource_mgrs[pon_intf_id].update_flow_id_info_for_onu(
             pon_intf_onu_id, flow_id, flow_data)
 
-    def get_hsia_flow_for_uni(self, pon_intf_id, onu_id, uni_id, gemport_id):
-        pon_intf_onu_id = (pon_intf_id, onu_id, uni_id)
-        try:
-            flow_ids = self.resource_mgrs[pon_intf_id]. \
-                get_current_flow_ids_for_onu(pon_intf_onu_id)
-            if flow_ids is not None:
-                for flow_id in flow_ids:
-                    flows = self.get_flow_id_info(pon_intf_id, onu_id, uni_id, flow_id)
-                    assert (isinstance(flows, list))
-                    for flow in flows:
-                        if flow['flow_category'] == HSIA_FLOW and \
-                                flow['gemport_id'] == gemport_id:
-                            return flow_id
-        except Exception as e:
-            self.log.error("error-retrieving-flow-info", e=e)
-
-        return self.get_flow_id(pon_intf_id, onu_id, uni_id)
-
     def get_alloc_id(self, pon_intf_onu_id):
         # Derive the pon_intf from the pon_intf_onu_id tuple
         pon_intf = pon_intf_onu_id[0]
@@ -207,9 +210,16 @@
 
     def get_current_gemport_ids_for_onu(self, pon_intf_onu_id):
         pon_intf_id = pon_intf_onu_id[0]
-        assert False, 'unused function'
         return self.resource_mgrs[pon_intf_id].get_current_gemport_ids_for_onu(pon_intf_onu_id)
 
+    def get_current_alloc_ids_for_onu(self, pon_intf_onu_id):
+        pon_intf_id = pon_intf_onu_id[0]
+        alloc_ids = self.resource_mgrs[pon_intf_id].get_current_alloc_ids_for_onu(pon_intf_onu_id)
+        if alloc_ids is None:
+            return None
+        # We support only one tcont at the moment
+        return alloc_ids[0]
+
     def update_gemports_ponport_to_onu_map_on_kv_store(self, gemport_list, pon_port, onu_id, uni_id):
         for gemport in gemport_list:
             pon_intf_gemport = (pon_port, gemport)
@@ -286,6 +296,12 @@
                                                          PONResourceManager.GEMPORT_ID,
                                                          gemport_ids)
 
+        flow_ids = \
+            self.resource_mgrs[pon_intf_id].get_current_flow_ids_for_onu(pon_intf_id_onu_id)
+        self.resource_mgrs[pon_intf_id].free_resource_id(pon_intf_id,
+                                                         PONResourceManager.FLOW_ID,
+                                                         flow_ids)
+
         self.resource_mgrs[pon_intf_id].free_resource_id(pon_intf_id,
                                                          PONResourceManager.ONU_ID,
                                                          onu_id)