VOL-1221: OpenOLT Adapter/Driver will use a Technology Profile Instance to create the OLT Upstream and Downstream Queuing and Scheduling Constructs for a Bidirectional Flow.

Change-Id: Iaf1a782529e2c459c586b158bd4f6447f548e004
diff --git a/common/pon_resource_manager/resource_manager.py b/common/pon_resource_manager/resource_manager.py
index aa1b6ca..9e249fb 100644
--- a/common/pon_resource_manager/resource_manager.py
+++ b/common/pon_resource_manager/resource_manager.py
@@ -21,13 +21,14 @@
 uses a KV store in backend to ensure resiliency of the data.
 """
 import json
+import ast
 import structlog
 from bitstring import BitArray
-from ast import literal_eval
 import shlex
 from argparse import ArgumentParser, ArgumentError
 
 from common.pon_resource_manager.resource_kv_store import ResourceKvStore
+from common.tech_profile.tech_profile import TechProfile
 
 
 # Used to parse extra arguments to OpenOlt adapter from the NBI
@@ -45,6 +46,7 @@
     ONU_ID = 'ONU_ID'
     ALLOC_ID = 'ALLOC_ID'
     GEMPORT_ID = 'GEMPORT_ID'
+    FLOW_ID = 'FLOW_ID'
 
     # Constants for passing command line arugments
     OLT_MODEL_ARG = '--olt_model'
@@ -63,6 +65,8 @@
         "alloc_id_end": 2816,
         "gemport_id_start": 1024,
         "gemport_id_end": 8960,
+        "flow_id_start": 1,
+        "flow_id_end": 16383,
         "pon_ports": 16
     }
 
@@ -78,6 +82,9 @@
     GEMPORT_ID_START_IDX = "gemport_id_start"
     GEMPORT_ID_END_IDX = "gemport_id_end"
     GEMPORT_ID_SHARED_IDX = "gemport_id_shared"
+    FLOW_ID_START_IDX = "flow_id_start"
+    FLOW_ID_END_IDX = "flow_id_end"
+    FLOW_ID_SHARED_IDX = "flow_id_shared"
     NUM_OF_PON_PORT = "pon_ports"
 
     # PON Resource range configuration on the KV store.
@@ -90,6 +97,7 @@
     ALLOC_ID_POOL_PATH = '{}/alloc_id_pool/{}'
     GEMPORT_ID_POOL_PATH = '{}/gemport_id_pool/{}'
     ONU_ID_POOL_PATH = '{}/onu_id_pool/{}'
+    FLOW_ID_POOL_PATH = '{}/flow_id_pool/{}'
 
     # Path on the KV store for storing list of alloc IDs for a given ONU
     # Format: <device_id>/<(pon_intf_id, onu_id)>/alloc_ids
@@ -99,6 +107,14 @@
     # Format: <device_id>/<(pon_intf_id, onu_id)>/gemport_ids
     GEMPORT_ID_RESOURCE_MAP_PATH = '{}/{}/gemport_ids'
 
+    # Path on the KV store for storing list of Flow IDs for a given ONU
+    # Format: <device_id>/<(pon_intf_id, onu_id)>/flow_ids
+    FLOW_ID_RESOURCE_MAP_PATH = '{}/{}/flow_ids'
+
+    # Flow Id info: Used to store additional metadata associated with the flow_id
+    # Format: <device_id>/<(pon_intf_id, onu_id)>/flow_id_info/<flow_id>
+    FLOW_ID_INFO_PATH = '{}/{}/flow_id_info/{}'
+
     # Constants for internal usage.
     PON_INTF_ID = 'pon_intf_id'
     START_IDX = 'start_idx'
@@ -133,6 +149,7 @@
 
             self._kv_store = ResourceKvStore(technology, device_id, backend,
                                              host, port)
+            self.tech_profile = TechProfile(self)
 
             # Below attribute, pon_resource_ranges, should be initialized
             # by reading from KV store.
@@ -140,16 +157,19 @@
             self.pon_resource_ranges[PONResourceManager.ONU_ID_SHARED_IDX] = None
             self.pon_resource_ranges[PONResourceManager.ALLOC_ID_SHARED_IDX] = None
             self.pon_resource_ranges[PONResourceManager.GEMPORT_ID_SHARED_IDX] = None
+            self.pon_resource_ranges[PONResourceManager.FLOW_ID_SHARED_IDX] = None
 
             self.shared_resource_mgrs = dict()
             self.shared_resource_mgrs[PONResourceManager.ONU_ID_SHARED_IDX] = None
             self.shared_resource_mgrs[PONResourceManager.ALLOC_ID_SHARED_IDX] = None
             self.shared_resource_mgrs[PONResourceManager.GEMPORT_ID_SHARED_IDX] = None
+            self.shared_resource_mgrs[PONResourceManager.FLOW_ID_SHARED_IDX] = None
 
             self.shared_idx_by_type = dict()
             self.shared_idx_by_type[PONResourceManager.ONU_ID] = PONResourceManager.ONU_ID_SHARED_IDX
             self.shared_idx_by_type[PONResourceManager.ALLOC_ID] = PONResourceManager.ALLOC_ID_SHARED_IDX
             self.shared_idx_by_type[PONResourceManager.GEMPORT_ID] = PONResourceManager.GEMPORT_ID_SHARED_IDX
+            self.shared_idx_by_type[PONResourceManager.FLOW_ID] = PONResourceManager.FLOW_ID_SHARED_IDX
 
             self.intf_ids = None
 
@@ -193,19 +213,18 @@
                                 e=e)
         return False
 
-
     def update_range_(self, start_idx, start, end_idx, end, shared_idx, shared_pool_id, shared_resource_mgr):
         if (start is not None) and \
-           (start_idx not in self.pon_resource_ranges or self.pon_resource_ranges[start_idx] < start):
-              self.pon_resource_ranges[start_idx] = start
+                (start_idx not in self.pon_resource_ranges or self.pon_resource_ranges[start_idx] < start):
+            self.pon_resource_ranges[start_idx] = start
         if (end is not None) and \
-           (end_idx not in self.pon_resource_ranges or self.pon_resource_ranges[end_idx] > end):
-              self.pon_resource_ranges[end_idx] = end
+                (end_idx not in self.pon_resource_ranges or self.pon_resource_ranges[end_idx] > end):
+            self.pon_resource_ranges[end_idx] = end
         if (shared_pool_id is not None) and \
-           (shared_idx not in self.pon_resource_ranges or self.pon_resource_ranges[shared_idx] is None):
+                (shared_idx not in self.pon_resource_ranges or self.pon_resource_ranges[shared_idx] is None):
             self.pon_resource_ranges[shared_idx] = shared_pool_id
         if (shared_resource_mgr is not None) and \
-           (shared_idx not in self.shared_resource_mgrs or self.shared_resource_mgrs[shared_idx] is None):
+                (shared_idx not in self.shared_resource_mgrs or self.shared_resource_mgrs[shared_idx] is None):
             self.shared_resource_mgrs[shared_idx] = shared_resource_mgr
 
     def update_ranges(self,
@@ -220,22 +239,31 @@
                       gemport_id_start_idx=None,
                       gemport_id_end_idx=None,
                       gemport_id_shared_pool_id=None,
-                      gemport_id_shared_resource_mgr=None):
+                      gemport_id_shared_resource_mgr=None,
+                      flow_id_start_idx=None,
+                      flow_id_end_idx=None,
+                      flow_id_shared_pool_id=None,
+                      flow_id_shared_resource_mgr=None):
 
         self.update_range_(PONResourceManager.ONU_ID_START_IDX, onu_id_start_idx,
-                          PONResourceManager.ONU_ID_END_IDX, onu_id_end_idx,
-                          PONResourceManager.ONU_ID_SHARED_IDX, onu_id_shared_pool_id,
-                          onu_id_shared_resource_mgr)
+                           PONResourceManager.ONU_ID_END_IDX, onu_id_end_idx,
+                           PONResourceManager.ONU_ID_SHARED_IDX, onu_id_shared_pool_id,
+                           onu_id_shared_resource_mgr)
 
         self.update_range_(PONResourceManager.ALLOC_ID_START_IDX, alloc_id_start_idx,
-                          PONResourceManager.ALLOC_ID_END_IDX, alloc_id_end_idx,
-                          PONResourceManager.ALLOC_ID_SHARED_IDX, alloc_id_shared_pool_id,
-                          alloc_id_shared_resource_mgr)
+                           PONResourceManager.ALLOC_ID_END_IDX, alloc_id_end_idx,
+                           PONResourceManager.ALLOC_ID_SHARED_IDX, alloc_id_shared_pool_id,
+                           alloc_id_shared_resource_mgr)
 
         self.update_range_(PONResourceManager.GEMPORT_ID_START_IDX, gemport_id_start_idx,
-                          PONResourceManager.GEMPORT_ID_END_IDX, gemport_id_end_idx,
-                          PONResourceManager.GEMPORT_ID_SHARED_IDX, gemport_id_shared_pool_id,
-                          gemport_id_shared_resource_mgr)
+                           PONResourceManager.GEMPORT_ID_END_IDX, gemport_id_end_idx,
+                           PONResourceManager.GEMPORT_ID_SHARED_IDX, gemport_id_shared_pool_id,
+                           gemport_id_shared_resource_mgr)
+
+        self.update_range_(PONResourceManager.FLOW_ID_START_IDX, flow_id_start_idx,
+                           PONResourceManager.FLOW_ID_END_IDX, flow_id_end_idx,
+                           PONResourceManager.FLOW_ID_SHARED_IDX, flow_id_shared_pool_id,
+                           flow_id_shared_resource_mgr)
 
     def init_default_pon_resource_ranges(self,
                                          onu_id_start_idx=1,
@@ -247,6 +275,9 @@
                                          gemport_id_start_idx=1024,
                                          gemport_id_end_idx=8960,
                                          gemport_id_shared_pool_id=None,
+                                         flow_id_start_idx=1,
+                                         flow_id_end_idx=16383,
+                                         flow_id_shared_pool_id=None,
                                          num_of_pon_ports=16,
                                          intf_ids=None):
         """
@@ -261,6 +292,9 @@
         :param gemport_id_start_idx: gemport id start index
         :param gemport_id_end_idx: gemport id end index
         :param gemport_id_shared_pool_id: pool idx for gemport id shared by all intfs or None for no sharing
+        :param flow_id_start_idx: flow id start index
+        :param flow_id_end_idx: flow id end index
+        :param flow_id_shared_pool_id: pool idx for flow id shared by all intfs or None for no sharing
         :param num_of_pon_ports: number of PON ports
         :param intf_ids: interfaces serviced by this manager
         """
@@ -268,7 +302,8 @@
 
         self.update_ranges(onu_id_start_idx, onu_id_end_idx, onu_id_shared_pool_id, None,
                            alloc_id_start_idx, alloc_id_end_idx, alloc_id_shared_pool_id, None,
-                           gemport_id_start_idx, gemport_id_end_idx, gemport_id_shared_pool_id, None)
+                           gemport_id_start_idx, gemport_id_end_idx, gemport_id_shared_pool_id, None,
+                           flow_id_start_idx, flow_id_end_idx, flow_id_shared_pool_id, None)
 
         if intf_ids is None:
             intf_ids = range(0, num_of_pon_ports)
@@ -281,11 +316,12 @@
         """
 
         self._log.info("init-device-resource-pool", technology=self.technology,
-            pon_resource_ranges=self.pon_resource_ranges)
+                       pon_resource_ranges=self.pon_resource_ranges)
 
         for i in self.intf_ids:
             shared_pool_id = self.pon_resource_ranges[PONResourceManager.ONU_ID_SHARED_IDX]
-            if shared_pool_id is not None: i = shared_pool_id
+            if shared_pool_id is not None:
+                i = shared_pool_id
             self.init_resource_id_pool(
                 pon_intf_id=i,
                 resource_type=PONResourceManager.ONU_ID,
@@ -293,11 +329,13 @@
                     PONResourceManager.ONU_ID_START_IDX],
                 end_idx=self.pon_resource_ranges[
                     PONResourceManager.ONU_ID_END_IDX])
-            if shared_pool_id is not None: break
+            if shared_pool_id is not None:
+                break
 
         for i in self.intf_ids:
             shared_pool_id = self.pon_resource_ranges[PONResourceManager.ALLOC_ID_SHARED_IDX]
-            if shared_pool_id is not None: i = shared_pool_id
+            if shared_pool_id is not None:
+                i = shared_pool_id
             self.init_resource_id_pool(
                 pon_intf_id=i,
                 resource_type=PONResourceManager.ALLOC_ID,
@@ -305,11 +343,13 @@
                     PONResourceManager.ALLOC_ID_START_IDX],
                 end_idx=self.pon_resource_ranges[
                     PONResourceManager.ALLOC_ID_END_IDX])
-            if shared_pool_id is not None: break
+            if shared_pool_id is not None:
+                break
 
         for i in self.intf_ids:
             shared_pool_id = self.pon_resource_ranges[PONResourceManager.GEMPORT_ID_SHARED_IDX]
-            if shared_pool_id is not None: i = shared_pool_id
+            if shared_pool_id is not None:
+                i = shared_pool_id
             self.init_resource_id_pool(
                 pon_intf_id=i,
                 resource_type=PONResourceManager.GEMPORT_ID,
@@ -317,7 +357,22 @@
                     PONResourceManager.GEMPORT_ID_START_IDX],
                 end_idx=self.pon_resource_ranges[
                     PONResourceManager.GEMPORT_ID_END_IDX])
-            if shared_pool_id is not None: break
+            if shared_pool_id is not None:
+                break
+
+        for i in self.intf_ids:
+            shared_pool_id = self.pon_resource_ranges[PONResourceManager.FLOW_ID_SHARED_IDX]
+            if shared_pool_id is not None:
+                i = shared_pool_id
+            self.init_resource_id_pool(
+                pon_intf_id=i,
+                resource_type=PONResourceManager.FLOW_ID,
+                start_idx=self.pon_resource_ranges[
+                    PONResourceManager.FLOW_ID_START_IDX],
+                end_idx=self.pon_resource_ranges[
+                    PONResourceManager.FLOW_ID_END_IDX])
+            if shared_pool_id is not None:
+                break
 
     def clear_device_resource_pool(self):
         """
@@ -325,30 +380,47 @@
         """
         for i in self.intf_ids:
             shared_pool_id = self.pon_resource_ranges[PONResourceManager.ONU_ID_SHARED_IDX]
-            if shared_pool_id is not None: i = shared_pool_id
+            if shared_pool_id is not None:
+                i = shared_pool_id
             self.clear_resource_id_pool(
                 pon_intf_id=i,
                 resource_type=PONResourceManager.ONU_ID,
             )
-            if shared_pool_id is not None: break
+            if shared_pool_id is not None:
+                break
 
         for i in self.intf_ids:
             shared_pool_id = self.pon_resource_ranges[PONResourceManager.ALLOC_ID_SHARED_IDX]
-            if shared_pool_id is not None: i = shared_pool_id
+            if shared_pool_id is not None:
+                i = shared_pool_id
             self.clear_resource_id_pool(
                 pon_intf_id=i,
                 resource_type=PONResourceManager.ALLOC_ID,
             )
-            if shared_pool_id is not None: break
+            if shared_pool_id is not None:
+                break
 
         for i in self.intf_ids:
             shared_pool_id = self.pon_resource_ranges[PONResourceManager.GEMPORT_ID_SHARED_IDX]
-            if shared_pool_id is not None: i = shared_pool_id
+            if shared_pool_id is not None:
+                i = shared_pool_id
             self.clear_resource_id_pool(
                 pon_intf_id=i,
                 resource_type=PONResourceManager.GEMPORT_ID,
             )
-            if shared_pool_id is not None: break
+            if shared_pool_id is not None:
+                break
+
+        for i in self.intf_ids:
+            shared_pool_id = self.pon_resource_ranges[PONResourceManager.FLOW_ID_SHARED_IDX]
+            if shared_pool_id is not None:
+                i = shared_pool_id
+            self.clear_resource_id_pool(
+                pon_intf_id=i,
+                resource_type=PONResourceManager.FLOW_ID,
+            )
+            if shared_pool_id is not None:
+                break
 
     def init_resource_id_pool(self, pon_intf_id, resource_type, start_idx,
                               end_idx):
@@ -367,7 +439,7 @@
         shared_resource_mgr = self.shared_resource_mgrs[self.shared_idx_by_type[resource_type]]
         if shared_resource_mgr is not None and shared_resource_mgr is not self:
             return shared_resource_mgr.init_resource_id_pool(pon_intf_id, resource_type,
-                start_idx, end_idx)
+                                                             start_idx, end_idx)
 
         path = self._get_path(pon_intf_id, resource_type)
         if path is None:
@@ -398,7 +470,7 @@
 
     def get_resource_id(self, pon_intf_id, resource_type, num_of_id=1):
         """
-        Create alloc/gemport/onu id for given OLT PON interface.
+        Create alloc/gemport/onu/flow id for given OLT PON interface.
 
         :param pon_intf_id: OLT PON interface id
         :param resource_type: String to identify type of resource
@@ -409,6 +481,10 @@
         """
         result = None
 
+        if num_of_id < 1:
+            self._log.error("invalid-num-of-resources-requested")
+            return result
+
         # delegate to the master instance if sharing enabled across instances
         shared_resource_mgr = self.shared_resource_mgrs[self.shared_idx_by_type[resource_type]]
         if shared_resource_mgr is not None and shared_resource_mgr is not self:
@@ -420,16 +496,20 @@
 
         try:
             resource = self._get_resource(path)
-            if resource is not None and resource_type == \
-                    PONResourceManager.ONU_ID:
+            if resource is not None and \
+                    (resource_type == PONResourceManager.ONU_ID or
+                     resource_type == PONResourceManager.FLOW_ID):
                 result = self._generate_next_id(resource)
             elif resource is not None and (
                     resource_type == PONResourceManager.GEMPORT_ID or
                     resource_type == PONResourceManager.ALLOC_ID):
-                result = list()
-                while num_of_id > 0:
-                    result.append(self._generate_next_id(resource))
-                    num_of_id -= 1
+                if num_of_id == 1:
+                    result = self._generate_next_id(resource)
+                else:
+                    result = list()
+                    while num_of_id > 0:
+                        result.append(self._generate_next_id(resource))
+                        num_of_id -= 1
             else:
                 raise Exception("get-resource-failed")
 
@@ -445,7 +525,7 @@
 
     def free_resource_id(self, pon_intf_id, resource_type, release_content):
         """
-        Release alloc/gemport/onu id for given OLT PON interface.
+        Release alloc/gemport/onu/flow id for given OLT PON interface.
 
         :param pon_intf_id: OLT PON interface id
         :param resource_type: String to identify type of resource
@@ -466,8 +546,9 @@
 
         try:
             resource = self._get_resource(path)
-            if resource is not None and resource_type == \
-                    PONResourceManager.ONU_ID:
+            if resource is not None and (
+                    resource_type == PONResourceManager.ONU_ID or
+                    resource_type == PONResourceManager.FLOW_ID):
                 self._release_id(resource, release_content)
             elif resource is not None and (
                     resource_type == PONResourceManager.ALLOC_ID or
@@ -564,6 +645,8 @@
         Get currently configured alloc ids for given pon_intf_onu_id
 
         :param pon_intf_onu_id: reference of PON interface id and onu id
+
+        :return list: List of alloc_ids if available, else None
         """
         path = PONResourceManager.ALLOC_ID_RESOURCE_MAP_PATH.format(
             self.device_id,
@@ -581,6 +664,8 @@
         Get currently configured gemport ids for given pon_intf_onu_id
 
         :param pon_intf_onu_id: reference of PON interface id and onu id
+
+        :return list: List of gemport IDs if available, else None
         """
 
         path = PONResourceManager.GEMPORT_ID_RESOURCE_MAP_PATH.format(
@@ -594,11 +679,68 @@
 
         return None
 
+    def get_current_flow_ids_for_onu(self, pon_intf_onu_id):
+        """
+        Get currently configured flow ids for given pon_intf_onu_id
+
+        :param pon_intf_onu_id: reference of PON interface id and onu id
+
+        :return list: List of Flow IDs if available, else None
+        """
+
+        path = PONResourceManager.FLOW_ID_RESOURCE_MAP_PATH.format(
+            self.device_id,
+            str(pon_intf_onu_id))
+        value = self._kv_store.get_from_kv_store(path)
+        if value is not None:
+            flow_id_list = json.loads(value)
+            assert(isinstance(flow_id_list, list))
+            if len(flow_id_list) > 0:
+                return flow_id_list
+
+        return None
+
+    def get_flow_id_info(self, pon_intf_onu_id, flow_id):
+        """
+        Get flow_id details configured for the ONU.
+
+        :param pon_intf_onu_id: reference of PON interface id and onu id
+        :param flow_id: Flow Id reference
+
+        :return blob: Flow data blob if available, else None
+        """
+
+        path = PONResourceManager.FLOW_ID_INFO_PATH.format(
+            self.device_id,
+            str(pon_intf_onu_id),
+            flow_id)
+        value = self._kv_store.get_from_kv_store(path)
+        if value is not None:
+            return ast.literal_eval(value)
+
+        return None
+
+    def remove_flow_id_info(self, pon_intf_onu_id, flow_id):
+        """
+        Remove flow_id details configured for the ONU.
+
+        :param pon_intf_onu_id: reference of PON interface id and onu id
+        :param flow_id: Flow Id reference
+
+        """
+
+        path = PONResourceManager.FLOW_ID_INFO_PATH.format(
+            self.device_id,
+            str(pon_intf_onu_id),
+            flow_id)
+        self._kv_store.remove_from_kv_store(path)
+
     def update_alloc_ids_for_onu(self, pon_intf_onu_id, alloc_ids):
         """
         Update currently configured alloc ids for given pon_intf_onu_id
 
         :param pon_intf_onu_id: reference of PON interface id and onu id
+        :param alloc_ids: list of alloc ids
         """
         path = PONResourceManager.ALLOC_ID_RESOURCE_MAP_PATH.format(
             self.device_id, str(pon_intf_onu_id)
@@ -612,6 +754,7 @@
         Update currently configured gemport ids for given pon_intf_onu_id
 
         :param pon_intf_onu_id: reference of PON interface id and onu id
+        :param gemport_ids: list of gem port ids
         """
         path = PONResourceManager.GEMPORT_ID_RESOURCE_MAP_PATH.format(
             self.device_id, str(pon_intf_onu_id)
@@ -620,6 +763,48 @@
             path, json.dumps(gemport_ids)
         )
 
+    def update_flow_id_for_onu(self, pon_intf_onu_id, flow_id, add=True):
+        """
+        Update the flow_id list of the ONU (add or remove flow_id from the list)
+
+        :param pon_intf_onu_id: reference of PON interface id and onu id
+        :param flow_id: flow ID
+        :param add: Boolean flag to indicate whether the flow_id should be
+                    added or removed from the list. Defaults to adding the flow.
+        """
+        path = PONResourceManager.FLOW_ID_RESOURCE_MAP_PATH.format(
+            self.device_id, str(pon_intf_onu_id)
+        )
+        current_flow_ids = self.get_current_flow_ids_for_onu(pon_intf_onu_id)
+        if not isinstance(current_flow_ids, list):
+            # When the first flow_id is being added, the current_flow_ids is None
+            current_flow_ids = list()
+
+        if add:
+            if flow_id not in current_flow_ids:
+                current_flow_ids.append(flow_id)
+        else:
+            if flow_id in current_flow_ids:
+                current_flow_ids.remove(flow_id)
+
+        self._kv_store.update_to_kv_store(path, current_flow_ids)
+
+    def update_flow_id_info_for_onu(self, pon_intf_onu_id, flow_id, flow_data):
+        """
+        Update any metadata associated with the flow_id. The flow_data could be JSON
+        or any other data structure; the resource manager does not interpret it.
+
+        :param pon_intf_onu_id: reference of PON interface id and onu id
+        :param flow_id: Flow ID
+        :param flow_data: Flow data blob
+        """
+        path = PONResourceManager.FLOW_ID_INFO_PATH.format(
+            self.device_id, str(pon_intf_onu_id), flow_id
+        )
+
+        if not self._kv_store.update_to_kv_store(path, flow_data):
+            self._log.error("flow-info-update-failed", path=path, flow_id=flow_id)
+
     def _get_olt_model(self):
         """
         Get olt model variant
@@ -673,7 +858,8 @@
         """
 
         shared_pool_id = self.pon_resource_ranges[self.shared_idx_by_type[resource_type]]
-        if shared_pool_id is not None: pon_intf_id = shared_pool_id
+        if shared_pool_id is not None:
+            pon_intf_id = shared_pool_id
 
         path = None
         if resource_type == PONResourceManager.ONU_ID:
@@ -682,10 +868,22 @@
             path = self._get_alloc_id_resource_path(pon_intf_id)
         elif resource_type == PONResourceManager.GEMPORT_ID:
             path = self._get_gemport_id_resource_path(pon_intf_id)
+        elif resource_type == PONResourceManager.FLOW_ID:
+            path = self._get_flow_id_resource_path(pon_intf_id)
         else:
             self._log.error("invalid-resource-pool-identifier")
         return path
 
+    def _get_flow_id_resource_path(self, pon_intf_id):
+        """
+        Get flow id resource path.
+
+        :param pon_intf_id: OLT PON interface id
+        :return: flow id resource path
+        """
+        return PONResourceManager.FLOW_ID_POOL_PATH.format(
+            self.device_id, pon_intf_id)
+
     def _get_alloc_id_resource_path(self, pon_intf_id):
         """
         Get alloc id resource path.