VOL-1344:
1) Place all of resource manager and tech profile KV store data under /service/voltha
2) Ensure flow_ids are released on the KV store when device is deleted
3) Ensure pon resources are re-used on voltha restart
4) A few other code re-organizations and bug fixes

Change-Id: Ia7bc8062d88b7a8eec5d4b87209536d81b115575
diff --git a/voltha/adapters/openolt/openolt_flow_mgr.py b/voltha/adapters/openolt/openolt_flow_mgr.py
index 1b996e7..3b08edc 100644
--- a/voltha/adapters/openolt/openolt_flow_mgr.py
+++ b/voltha/adapters/openolt/openolt_flow_mgr.py
@@ -17,6 +17,8 @@
 from twisted.internet import reactor
 import grpc
 from google.protobuf.json_format import MessageToDict
+import hashlib
+from simplejson import dumps
 
 from voltha.protos.openflow_13_pb2 import OFPXMC_OPENFLOW_BASIC, \
     ofp_flow_stats, OFPMT_OXM, Flows, FlowGroups, OFPXMT_OFB_IN_PORT, \
@@ -30,11 +32,6 @@
 
 # Flow categories
 HSIA_FLOW = "HSIA_FLOW"
-DHCP_FLOW = "DHCP_FLOW"
-EAPOL_PRIMARY_FLOW = "EAPOL_PRIMARY_FLOW"
-EAPOL_SECONDARY_FLOW = "EAPOL_SECONDARY_FLOW"
-IGMP_FLOW = "IGMP_FLOW"
-LLDP_FLOW = "LLDP_FLOW"
 
 EAP_ETH_TYPE = 0x888e
 LLDP_ETH_TYPE = 0x88cc
@@ -71,7 +68,6 @@
 TRAP_TO_HOST = 'trap_to_host'
 
 
-
 class OpenOltFlowMgr(object):
 
     def __init__(self, adapter_agent, log, stub, device_id, logical_device_id,
@@ -215,7 +211,6 @@
 
     def _clear_flow_id_from_rm(self, flow, flow_id, flow_direction):
         uni_port_no = None
-        flow_category = HSIA_FLOW  # default
         child_device_id = None
         if flow_direction == UPSTREAM:
             for field in fd.get_ofb_fields(flow):
@@ -223,15 +218,6 @@
                     is_uni, child_device_id = self._is_uni_port(field.port)
                     if is_uni:
                         uni_port_no = field.port
-                elif field.type == fd.IP_PROTO:
-                    if field.ip_proto == IGMP_PROTO:
-                        flow_category = IGMP_FLOW
-                elif field.type == fd.ETH_TYPE:
-                    if field.eth_type == EAP_ETH_TYPE:
-                        flow_category = EAPOL_PRIMARY_FLOW
-                    elif field.eth_type == LLDP_ETH_TYPE:
-                        flow_category = LLDP_FLOW
-
         elif flow_direction == DOWNSTREAM:
             for field in fd.get_ofb_fields(flow):
                 if field.type == fd.METADATA:
@@ -248,7 +234,7 @@
                         if is_uni:
                             uni_port_no = action.output.port
 
-        if flow_category and child_device_id:
+        if child_device_id:
             child_device = self.adapter_agent.get_device(child_device_id)
             pon_intf = child_device.proxy_address.channel_id
             onu_id = child_device.proxy_address.onu_id
@@ -270,7 +256,6 @@
             self.resource_mgr.free_flow_id_for_uni(pon_intf, onu_id, uni_id, flow_id)
         else:
             self.log.error("invalid-info", uni_port_no=uni_port_no,
-                           flow_category=flow_category,
                            child_device_id=child_device_id)
 
     def remove_flow(self, flow):
@@ -333,6 +318,22 @@
         ofp_port_name = (logical_port.ofp_port.name, logical_port.ofp_port.port_no)
         return ofp_port_name
 
+    def get_tp_path(self, intf_id, ofp_port_name):
+        # FIXME Should get Table id from the flow, as of now hardcoded to
+        # DEFAULT_TECH_PROFILE_TABLE_ID (64)
+        # 'tp_path' contains the suffix part of the tech_profile_instance path.
+        # The prefix to the 'tp_path' should be set to \
+        # TechProfile.KV_STORE_TECH_PROFILE_PATH_PREFIX by the ONU adapter.
+        return self.tech_profile[intf_id]. \
+            get_tp_path(DEFAULT_TECH_PROFILE_TABLE_ID,
+                        ofp_port_name)
+
+    def delete_tech_profile_instance(self, intf_id, onu_id, uni_id):
+        # Remove the TP instance associated with the ONU
+        ofp_port_name = self._get_ofp_port_name(intf_id, onu_id, uni_id)
+        tp_path = self.get_tp_path(intf_id, ofp_port_name)
+        return self.tech_profile[intf_id].delete_tech_profile_instance(tp_path)
+
     def divide_and_add_flow(self, intf_id, onu_id, uni_id, port_no, classifier,
                             action, flow):
 
@@ -372,7 +373,6 @@
                     if vlan_id is not None:
                         self.add_eapol_flow(
                             intf_id, onu_id, uni_id, port_no, flow, alloc_id, gemport_id,
-                            eapol_flow_category=EAPOL_SECONDARY_FLOW,
                             vlan_id=vlan_id)
                     parent_port_no = self.platform.intf_id_to_port_no(intf_id, Port.PON_OLT)
                     onu_device = self.adapter_agent.get_child_device(self.device_id,
@@ -383,14 +383,7 @@
                         self.log.error("port-name-not-found")
                         return
 
-                    # FIXME Should get Table id form the flow, as of now hardcoded to
-                    # DEFAULT_TECH_PROFILE_TABLE_ID (64)
-                    # 'tp_path' contains the suffix part of the tech_profile_instance path.
-                    # The prefix to the 'tp_path' should be set to \
-                    # TechProfile.KV_STORE_TECH_PROFILE_PATH_PREFIX by the ONU adapter.
-                    tp_path = self.tech_profile[intf_id]. \
-                                  get_tp_path(DEFAULT_TECH_PROFILE_TABLE_ID,
-                                              ofp_port_name)
+                    tp_path = self.get_tp_path(intf_id, ofp_port_name)
 
                     self.log.debug('Load-tech-profile-request-to-brcm-handler',
                                    tp_path=tp_path)
@@ -418,6 +411,16 @@
 
     def create_tcont_gemport(self, intf_id, onu_id, uni_id, table_id):
         alloc_id, gem_port_ids = None, None
+        pon_intf_onu_id = (intf_id, onu_id)
+
+        # If we already have allocated alloc_id and gem_ports earlier, render them
+        alloc_id = \
+            self.resource_mgr.get_current_alloc_ids_for_onu(pon_intf_onu_id)
+        gem_port_ids = \
+            self.resource_mgr.get_current_gemport_ids_for_onu(pon_intf_onu_id)
+        if alloc_id is not None and gem_port_ids is not None:
+            return alloc_id, gem_port_ids
+
         try:
             (ofp_port_name, ofp_port_no) = self._get_ofp_port_name(intf_id, onu_id, uni_id)
             if ofp_port_name is None:
@@ -437,31 +440,30 @@
                 tech_profile_instance = self.tech_profile[intf_id]. \
                     create_tech_profile_instance(table_id, ofp_port_name,
                                                  intf_id)
-                if tech_profile_instance is not None:
-
-                    # upstream scheduler
-                    us_scheduler = self.tech_profile[intf_id].get_us_scheduler(
-                        tech_profile_instance)
-                    # downstream scheduler
-                    ds_scheduler = self.tech_profile[intf_id].get_ds_scheduler(
-                        tech_profile_instance)
-                    # create Tcont
-                    tconts = self.tech_profile[intf_id].get_tconts(tech_profile_instance,
-                                                                   us_scheduler,
-                                                                   ds_scheduler)
-
-                    self.stub.CreateTconts(openolt_pb2.Tconts(intf_id=intf_id,
-                                                              onu_id=onu_id,
-                                                              uni_id=uni_id,
-                                                              port_no=ofp_port_no,
-                                                              tconts=tconts))
-                else:
+                if tech_profile_instance is None:
                     raise Exception('Tech-profile-instance-creation-failed')
             else:
                 self.log.debug(
                     'Tech-profile-instance-already-exist-for-given port-name',
                     ofp_port_name=ofp_port_name)
 
+            # upstream scheduler
+            us_scheduler = self.tech_profile[intf_id].get_us_scheduler(
+                tech_profile_instance)
+            # downstream scheduler
+            ds_scheduler = self.tech_profile[intf_id].get_ds_scheduler(
+                tech_profile_instance)
+            # create Tcont
+            tconts = self.tech_profile[intf_id].get_tconts(tech_profile_instance,
+                                                           us_scheduler,
+                                                           ds_scheduler)
+
+            self.stub.CreateTconts(openolt_pb2.Tconts(intf_id=intf_id,
+                                                      onu_id=onu_id,
+                                                      uni_id=uni_id,
+                                                      port_no=ofp_port_no,
+                                                      tconts=tconts))
+
             # Fetch alloc id and gemports from tech profile instance
             alloc_id = tech_profile_instance.us_scheduler.alloc_id
             gem_port_ids = []
@@ -469,7 +471,7 @@
                     tech_profile_instance.upstream_gem_port_attribute_list)):
                 gem_port_ids.append(
                     tech_profile_instance.upstream_gem_port_attribute_list[i].
-                        gemport_id)
+                    gemport_id)
         except BaseException as e:
             self.log.exception(exception=e)
 
@@ -504,8 +506,7 @@
         (eap_active, eap_logical_flow) = self.is_eap_enabled(intf_id, onu_id, uni_id)
         if eap_active:
             self.add_eapol_flow(intf_id, onu_id, uni_id, port_no, eap_logical_flow, alloc_id,
-                                gemport_id, eapol_flow_category=EAPOL_SECONDARY_FLOW,
-                                vlan_id=uplink_classifier[VLAN_VID])
+                                gemport_id, vlan_id=uplink_classifier[VLAN_VID])
 
     def add_downstream_data_flow(self, intf_id, onu_id, uni_id, port_no, downlink_classifier,
                                  downlink_action, flow, alloc_id, gemport_id):
@@ -521,7 +522,18 @@
     def add_hsia_flow(self, intf_id, onu_id, uni_id, port_no, classifier, action,
                       direction, logical_flow, alloc_id, gemport_id):
 
-        flow_id = self.resource_mgr.get_hsia_flow_for_uni(intf_id, onu_id, uni_id, gemport_id)
+        flow_store_cookie = self._get_flow_store_cookie(classifier,
+                                                        gemport_id)
+
+        # One of the OLT platform (Broadcom BAL) requires that symmetric
+        # flows require the same flow_id to be used across UL and DL.
+        # Since HSIA flow is the only symmetric flow currently, we need to
+        # re-use the flow_id across both direction. The 'flow_category'
+        # takes priority over flow_cookie to find any available HSIA_FLOW
+        # id for the ONU.
+        flow_id = self.resource_mgr.get_flow_id(intf_id, onu_id, uni_id,
+                                                flow_store_cookie,
+                                                HSIA_FLOW)
         if flow_id is None:
             self.log.error("hsia-flow-unavailable")
             return
@@ -537,8 +549,11 @@
             cookie=logical_flow.cookie)
 
         if self.add_flow_to_device(flow, logical_flow):
-            flow_info = self._get_flow_info_as_json_blob(flow, HSIA_FLOW)
-            self.update_flow_info_to_kv_store(flow.access_intf_id, flow.onu_id, flow.uni_id,
+            flow_info = self._get_flow_info_as_json_blob(flow,
+                                                         flow_store_cookie,
+                                                         HSIA_FLOW)
+            self.update_flow_info_to_kv_store(flow.access_intf_id,
+                                              flow.onu_id, flow.uni_id,
                                               flow.flow_id, flow_info)
 
     def add_dhcp_trap(self, intf_id, onu_id, uni_id, port_no, classifier, action, logical_flow,
@@ -554,7 +569,12 @@
         classifier[PACKET_TAG_TYPE] = SINGLE_TAG
         classifier.pop(VLAN_VID, None)
 
-        flow_id = self.resource_mgr.get_flow_id(intf_id, onu_id, uni_id)
+        flow_store_cookie = self._get_flow_store_cookie(classifier,
+                                                        gemport_id)
+
+        flow_id = self.resource_mgr.get_flow_id(
+            intf_id, onu_id, uni_id, flow_store_cookie
+        )
 
         dhcp_flow = openolt_pb2.Flow(
             onu_id=onu_id, uni_id=uni_id, flow_id=flow_id, flow_type=UPSTREAM,
@@ -567,7 +587,7 @@
             cookie=logical_flow.cookie)
 
         if self.add_flow_to_device(dhcp_flow, logical_flow):
-            flow_info = self._get_flow_info_as_json_blob(dhcp_flow, DHCP_FLOW)
+            flow_info = self._get_flow_info_as_json_blob(dhcp_flow, flow_store_cookie)
             self.update_flow_info_to_kv_store(dhcp_flow.access_intf_id,
                                               dhcp_flow.onu_id,
                                               dhcp_flow.uni_id,
@@ -575,8 +595,7 @@
                                               flow_info)
 
     def add_eapol_flow(self, intf_id, onu_id, uni_id, port_no, logical_flow, alloc_id,
-                       gemport_id, eapol_flow_category=EAPOL_PRIMARY_FLOW,
-                       vlan_id=DEFAULT_MGMT_VLAN):
+                       gemport_id, vlan_id=DEFAULT_MGMT_VLAN):
 
         uplink_classifier = dict()
         uplink_classifier[ETH_TYPE] = EAP_ETH_TYPE
@@ -586,11 +605,12 @@
         uplink_action = dict()
         uplink_action[TRAP_TO_HOST] = True
 
+        flow_store_cookie = self._get_flow_store_cookie(uplink_classifier,
+                                                        gemport_id)
         # Add Upstream EAPOL Flow.
-        if eapol_flow_category == EAPOL_PRIMARY_FLOW:
-            uplink_flow_id = self.resource_mgr.get_flow_id(intf_id, onu_id, uni_id)
-        else:
-            uplink_flow_id = self.resource_mgr.get_flow_id(intf_id, onu_id, uni_id)
+        uplink_flow_id = self.resource_mgr.get_flow_id(
+            intf_id, onu_id, uni_id, flow_store_cookie
+        )
 
         upstream_flow = openolt_pb2.Flow(
             access_intf_id=intf_id, onu_id=onu_id, uni_id=uni_id, flow_id=uplink_flow_id,
@@ -608,29 +628,26 @@
         logical_flow.match.type = OFPMT_OXM
 
         if self.add_flow_to_device(upstream_flow, logical_flow):
-            if eapol_flow_category == EAPOL_PRIMARY_FLOW:
-                flow_info = self._get_flow_info_as_json_blob(upstream_flow,
-                                                             EAPOL_PRIMARY_FLOW)
-                self.update_flow_info_to_kv_store(upstream_flow.access_intf_id,
-                                                  upstream_flow.onu_id,
-                                                  upstream_flow.uni_id,
-                                                  upstream_flow.flow_id,
-                                                  flow_info)
-            else:
-                flow_info = self._get_flow_info_as_json_blob(upstream_flow,
-                                                             EAPOL_SECONDARY_FLOW)
-                self.update_flow_info_to_kv_store(upstream_flow.access_intf_id,
-                                                  upstream_flow.onu_id,
-                                                  upstream_flow.uni_id,
-                                                  upstream_flow.flow_id,
-                                                  flow_info)
+            flow_info = self._get_flow_info_as_json_blob(upstream_flow,
+                                                         flow_store_cookie)
+            self.update_flow_info_to_kv_store(upstream_flow.access_intf_id,
+                                              upstream_flow.onu_id,
+                                              upstream_flow.uni_id,
+                                              upstream_flow.flow_id,
+                                              flow_info)
 
         if vlan_id == DEFAULT_MGMT_VLAN:
             # Add Downstream EAPOL Flow, Only for first EAP flow (BAL
             # requirement)
-            # FIXME this is actually not used. But, here to be consistent with the upstream tagging from ONU.
-            # It needs refactored to be completely removed
-            special_vlan_downstream_flow = 4091 # 4000 - onu_id
+            # On one of the platforms (Broadcom BAL), when same DL classifier
+            # vlan was used across multiple ONUs, eapol flow re-adds after
+            # flow delete (cases of onu reboot/disable) fails.
+            # In order to generate unique vlan, a combination of intf_id
+            # onu_id and uni_id is used.
+            # uni_id defaults to 0, so add 1 to it.
+            special_vlan_downstream_flow = 4090 - intf_id * onu_id * (uni_id+1)
+            # Assert that we do not generate invalid vlans under no condition
+            assert special_vlan_downstream_flow >= 2, 'invalid-vlan-generated'
 
             downlink_classifier = dict()
             downlink_classifier[PACKET_TAG_TYPE] = SINGLE_TAG
@@ -640,7 +657,13 @@
             downlink_action[PUSH_VLAN] = True
             downlink_action[VLAN_VID] = vlan_id
 
-            downlink_flow_id = self.resource_mgr.get_flow_id(intf_id, onu_id, uni_id)
+
+            flow_store_cookie = self._get_flow_store_cookie(downlink_classifier,
+                                                            gemport_id)
+
+            downlink_flow_id = self.resource_mgr.get_flow_id(
+                intf_id, onu_id, uni_id, flow_store_cookie
+            )
 
             downstream_flow = openolt_pb2.Flow(
                 access_intf_id=intf_id, onu_id=onu_id, uni_id=uni_id, flow_id=downlink_flow_id,
@@ -668,7 +691,7 @@
 
             if self.add_flow_to_device(downstream_flow, downstream_logical_flow):
                 flow_info = self._get_flow_info_as_json_blob(downstream_flow,
-                                                             EAPOL_PRIMARY_FLOW)
+                                                             flow_store_cookie)
                 self.update_flow_info_to_kv_store(downstream_flow.access_intf_id,
                                                   downstream_flow.onu_id,
                                                   downstream_flow.uni_id,
@@ -715,7 +738,9 @@
         # *********************************************
         onu_id = -1
         uni_id = -1
-        flow_id = self.resource_mgr.get_flow_id(network_intf_id, onu_id, uni_id)
+        flow_store_cookie = self._get_flow_store_cookie(classifier)
+        flow_id = self.resource_mgr.get_flow_id(network_intf_id, onu_id, uni_id,
+                                                flow_store_cookie)
 
         downstream_flow = openolt_pb2.Flow(
             access_intf_id=-1,  # access_intf_id not required
@@ -822,7 +847,7 @@
         for flow in flows:
             in_port = fd.get_in_port(flow)
             out_port = fd.get_out_port(flow)
-            if in_port == port and \
+            if in_port == port and out_port is not None and \
                     self.platform.intf_id_to_port_type_name(out_port) \
                     == Port.ETHERNET_NNI:
                 fields = fd.get_ofb_fields(flow)
@@ -984,11 +1009,14 @@
         # the device
         assert len(self.tech_profile) == self.resource_mgr.device_info.pon_ports
 
-    def _get_flow_info_as_json_blob(self, flow, flow_category):
+    def _get_flow_info_as_json_blob(self, flow, flow_store_cookie,
+                                    flow_category=None):
         json_blob = MessageToDict(message=flow,
                                   preserving_proto_field_name=True)
         self.log.debug("flow-info", json_blob=json_blob)
-        json_blob['flow_category'] = flow_category
+        json_blob['flow_store_cookie'] = flow_store_cookie
+        if flow_category is not None:
+            json_blob['flow_category'] = flow_category
         flow_info = self.resource_mgr.get_flow_id_info(flow.access_intf_id,
                                                        flow.onu_id, flow.uni_id, flow.flow_id)
 
@@ -1000,3 +1028,13 @@
             flow_info.append(json_blob)
 
         return flow_info
+
+    @staticmethod
+    def _get_flow_store_cookie(classifier, gem_port=None):
+        assert isinstance(classifier, dict)
+        # We need unique flows per gem_port
+        if gem_port is not None:
+            to_hash = dumps(classifier, sort_keys=True) + str(gem_port)
+        else:
+            to_hash = dumps(classifier, sort_keys=True)
+        return hashlib.md5(to_hash).hexdigest()[:12]