VOL-1221: OpenOLT Adapter/Driver will use a Technology Profile Instance to create the OLT Upstream and Downstream Queuing and Scheduling Constructs for a Bidirectional Flow.
Change-Id: Iaf1a782529e2c459c586b158bd4f6447f548e004
diff --git a/common/pon_resource_manager/resource_manager.py b/common/pon_resource_manager/resource_manager.py
index aa1b6ca..9e249fb 100644
--- a/common/pon_resource_manager/resource_manager.py
+++ b/common/pon_resource_manager/resource_manager.py
@@ -21,13 +21,14 @@
uses a KV store in backend to ensure resiliency of the data.
"""
import json
+import ast
import structlog
from bitstring import BitArray
-from ast import literal_eval
import shlex
from argparse import ArgumentParser, ArgumentError
from common.pon_resource_manager.resource_kv_store import ResourceKvStore
+from common.tech_profile.tech_profile import TechProfile
# Used to parse extra arguments to OpenOlt adapter from the NBI
@@ -45,6 +46,7 @@
ONU_ID = 'ONU_ID'
ALLOC_ID = 'ALLOC_ID'
GEMPORT_ID = 'GEMPORT_ID'
+ FLOW_ID = 'FLOW_ID'
# Constants for passing command line arugments
OLT_MODEL_ARG = '--olt_model'
@@ -63,6 +65,8 @@
"alloc_id_end": 2816,
"gemport_id_start": 1024,
"gemport_id_end": 8960,
+ "flow_id_start": 1,
+ "flow_id_end": 16383,
"pon_ports": 16
}
@@ -78,6 +82,9 @@
GEMPORT_ID_START_IDX = "gemport_id_start"
GEMPORT_ID_END_IDX = "gemport_id_end"
GEMPORT_ID_SHARED_IDX = "gemport_id_shared"
+ FLOW_ID_START_IDX = "flow_id_start"
+ FLOW_ID_END_IDX = "flow_id_end"
+ FLOW_ID_SHARED_IDX = "flow_id_shared"
NUM_OF_PON_PORT = "pon_ports"
# PON Resource range configuration on the KV store.
@@ -90,6 +97,7 @@
ALLOC_ID_POOL_PATH = '{}/alloc_id_pool/{}'
GEMPORT_ID_POOL_PATH = '{}/gemport_id_pool/{}'
ONU_ID_POOL_PATH = '{}/onu_id_pool/{}'
+ FLOW_ID_POOL_PATH = '{}/flow_id_pool/{}'
# Path on the KV store for storing list of alloc IDs for a given ONU
# Format: <device_id>/<(pon_intf_id, onu_id)>/alloc_ids
@@ -99,6 +107,14 @@
# Format: <device_id>/<(pon_intf_id, onu_id)>/gemport_ids
GEMPORT_ID_RESOURCE_MAP_PATH = '{}/{}/gemport_ids'
+ # Path on the KV store for storing list of Flow IDs for a given ONU
+ # Format: <device_id>/<(pon_intf_id, onu_id)>/flow_ids
+ FLOW_ID_RESOURCE_MAP_PATH = '{}/{}/flow_ids'
+
+ # Flow Id info: Used to store more metadata associated with the flow_id
+ # Format: <device_id>/<(pon_intf_id, onu_id)>/flow_id_info/<flow_id>
+ FLOW_ID_INFO_PATH = '{}/{}/flow_id_info/{}'
+
# Constants for internal usage.
PON_INTF_ID = 'pon_intf_id'
START_IDX = 'start_idx'
@@ -133,6 +149,7 @@
self._kv_store = ResourceKvStore(technology, device_id, backend,
host, port)
+ self.tech_profile = TechProfile(self)
# Below attribute, pon_resource_ranges, should be initialized
# by reading from KV store.
@@ -140,16 +157,19 @@
self.pon_resource_ranges[PONResourceManager.ONU_ID_SHARED_IDX] = None
self.pon_resource_ranges[PONResourceManager.ALLOC_ID_SHARED_IDX] = None
self.pon_resource_ranges[PONResourceManager.GEMPORT_ID_SHARED_IDX] = None
+ self.pon_resource_ranges[PONResourceManager.FLOW_ID_SHARED_IDX] = None
self.shared_resource_mgrs = dict()
self.shared_resource_mgrs[PONResourceManager.ONU_ID_SHARED_IDX] = None
self.shared_resource_mgrs[PONResourceManager.ALLOC_ID_SHARED_IDX] = None
self.shared_resource_mgrs[PONResourceManager.GEMPORT_ID_SHARED_IDX] = None
+ self.shared_resource_mgrs[PONResourceManager.FLOW_ID_SHARED_IDX] = None
self.shared_idx_by_type = dict()
self.shared_idx_by_type[PONResourceManager.ONU_ID] = PONResourceManager.ONU_ID_SHARED_IDX
self.shared_idx_by_type[PONResourceManager.ALLOC_ID] = PONResourceManager.ALLOC_ID_SHARED_IDX
self.shared_idx_by_type[PONResourceManager.GEMPORT_ID] = PONResourceManager.GEMPORT_ID_SHARED_IDX
+ self.shared_idx_by_type[PONResourceManager.FLOW_ID] = PONResourceManager.FLOW_ID_SHARED_IDX
self.intf_ids = None
@@ -193,19 +213,18 @@
e=e)
return False
-
def update_range_(self, start_idx, start, end_idx, end, shared_idx, shared_pool_id, shared_resource_mgr):
if (start is not None) and \
- (start_idx not in self.pon_resource_ranges or self.pon_resource_ranges[start_idx] < start):
- self.pon_resource_ranges[start_idx] = start
+ (start_idx not in self.pon_resource_ranges or self.pon_resource_ranges[start_idx] < start):
+ self.pon_resource_ranges[start_idx] = start
if (end is not None) and \
- (end_idx not in self.pon_resource_ranges or self.pon_resource_ranges[end_idx] > end):
- self.pon_resource_ranges[end_idx] = end
+ (end_idx not in self.pon_resource_ranges or self.pon_resource_ranges[end_idx] > end):
+ self.pon_resource_ranges[end_idx] = end
if (shared_pool_id is not None) and \
- (shared_idx not in self.pon_resource_ranges or self.pon_resource_ranges[shared_idx] is None):
+ (shared_idx not in self.pon_resource_ranges or self.pon_resource_ranges[shared_idx] is None):
self.pon_resource_ranges[shared_idx] = shared_pool_id
if (shared_resource_mgr is not None) and \
- (shared_idx not in self.shared_resource_mgrs or self.shared_resource_mgrs[shared_idx] is None):
+ (shared_idx not in self.shared_resource_mgrs or self.shared_resource_mgrs[shared_idx] is None):
self.shared_resource_mgrs[shared_idx] = shared_resource_mgr
def update_ranges(self,
@@ -220,22 +239,31 @@
gemport_id_start_idx=None,
gemport_id_end_idx=None,
gemport_id_shared_pool_id=None,
- gemport_id_shared_resource_mgr=None):
+ gemport_id_shared_resource_mgr=None,
+ flow_id_start_idx=None,
+ flow_id_end_idx=None,
+ flow_id_shared_pool_id=None,
+ flow_id_shared_resource_mgr=None):
self.update_range_(PONResourceManager.ONU_ID_START_IDX, onu_id_start_idx,
- PONResourceManager.ONU_ID_END_IDX, onu_id_end_idx,
- PONResourceManager.ONU_ID_SHARED_IDX, onu_id_shared_pool_id,
- onu_id_shared_resource_mgr)
+ PONResourceManager.ONU_ID_END_IDX, onu_id_end_idx,
+ PONResourceManager.ONU_ID_SHARED_IDX, onu_id_shared_pool_id,
+ onu_id_shared_resource_mgr)
self.update_range_(PONResourceManager.ALLOC_ID_START_IDX, alloc_id_start_idx,
- PONResourceManager.ALLOC_ID_END_IDX, alloc_id_end_idx,
- PONResourceManager.ALLOC_ID_SHARED_IDX, alloc_id_shared_pool_id,
- alloc_id_shared_resource_mgr)
+ PONResourceManager.ALLOC_ID_END_IDX, alloc_id_end_idx,
+ PONResourceManager.ALLOC_ID_SHARED_IDX, alloc_id_shared_pool_id,
+ alloc_id_shared_resource_mgr)
self.update_range_(PONResourceManager.GEMPORT_ID_START_IDX, gemport_id_start_idx,
- PONResourceManager.GEMPORT_ID_END_IDX, gemport_id_end_idx,
- PONResourceManager.GEMPORT_ID_SHARED_IDX, gemport_id_shared_pool_id,
- gemport_id_shared_resource_mgr)
+ PONResourceManager.GEMPORT_ID_END_IDX, gemport_id_end_idx,
+ PONResourceManager.GEMPORT_ID_SHARED_IDX, gemport_id_shared_pool_id,
+ gemport_id_shared_resource_mgr)
+
+ self.update_range_(PONResourceManager.FLOW_ID_START_IDX, flow_id_start_idx,
+ PONResourceManager.FLOW_ID_END_IDX, flow_id_end_idx,
+ PONResourceManager.FLOW_ID_SHARED_IDX, flow_id_shared_pool_id,
+ flow_id_shared_resource_mgr)
def init_default_pon_resource_ranges(self,
onu_id_start_idx=1,
@@ -247,6 +275,9 @@
gemport_id_start_idx=1024,
gemport_id_end_idx=8960,
gemport_id_shared_pool_id=None,
+ flow_id_start_idx=1,
+ flow_id_end_idx=16383,
+ flow_id_shared_pool_id=None,
num_of_pon_ports=16,
intf_ids=None):
"""
@@ -261,6 +292,9 @@
:param gemport_id_start_idx: gemport id start index
:param gemport_id_end_idx: gemport id end index
:param gemport_id_shared_pool_id: pool idx for gemport id shared by all intfs or None for no sharing
+ :param flow_id_start_idx: flow id start index
+ :param flow_id_end_idx: flow id end index
+ :param flow_id_shared_pool_id: pool idx for flow id shared by all intfs or None for no sharing
:param num_of_pon_ports: number of PON ports
:param intf_ids: interfaces serviced by this manager
"""
@@ -268,7 +302,8 @@
self.update_ranges(onu_id_start_idx, onu_id_end_idx, onu_id_shared_pool_id, None,
alloc_id_start_idx, alloc_id_end_idx, alloc_id_shared_pool_id, None,
- gemport_id_start_idx, gemport_id_end_idx, gemport_id_shared_pool_id, None)
+ gemport_id_start_idx, gemport_id_end_idx, gemport_id_shared_pool_id, None,
+ flow_id_start_idx, flow_id_end_idx, flow_id_shared_pool_id, None)
if intf_ids is None:
intf_ids = range(0, num_of_pon_ports)
@@ -281,11 +316,12 @@
"""
self._log.info("init-device-resource-pool", technology=self.technology,
- pon_resource_ranges=self.pon_resource_ranges)
+ pon_resource_ranges=self.pon_resource_ranges)
for i in self.intf_ids:
shared_pool_id = self.pon_resource_ranges[PONResourceManager.ONU_ID_SHARED_IDX]
- if shared_pool_id is not None: i = shared_pool_id
+ if shared_pool_id is not None:
+ i = shared_pool_id
self.init_resource_id_pool(
pon_intf_id=i,
resource_type=PONResourceManager.ONU_ID,
@@ -293,11 +329,13 @@
PONResourceManager.ONU_ID_START_IDX],
end_idx=self.pon_resource_ranges[
PONResourceManager.ONU_ID_END_IDX])
- if shared_pool_id is not None: break
+ if shared_pool_id is not None:
+ break
for i in self.intf_ids:
shared_pool_id = self.pon_resource_ranges[PONResourceManager.ALLOC_ID_SHARED_IDX]
- if shared_pool_id is not None: i = shared_pool_id
+ if shared_pool_id is not None:
+ i = shared_pool_id
self.init_resource_id_pool(
pon_intf_id=i,
resource_type=PONResourceManager.ALLOC_ID,
@@ -305,11 +343,13 @@
PONResourceManager.ALLOC_ID_START_IDX],
end_idx=self.pon_resource_ranges[
PONResourceManager.ALLOC_ID_END_IDX])
- if shared_pool_id is not None: break
+ if shared_pool_id is not None:
+ break
for i in self.intf_ids:
shared_pool_id = self.pon_resource_ranges[PONResourceManager.GEMPORT_ID_SHARED_IDX]
- if shared_pool_id is not None: i = shared_pool_id
+ if shared_pool_id is not None:
+ i = shared_pool_id
self.init_resource_id_pool(
pon_intf_id=i,
resource_type=PONResourceManager.GEMPORT_ID,
@@ -317,7 +357,22 @@
PONResourceManager.GEMPORT_ID_START_IDX],
end_idx=self.pon_resource_ranges[
PONResourceManager.GEMPORT_ID_END_IDX])
- if shared_pool_id is not None: break
+ if shared_pool_id is not None:
+ break
+
+ for i in self.intf_ids:
+ shared_pool_id = self.pon_resource_ranges[PONResourceManager.FLOW_ID_SHARED_IDX]
+ if shared_pool_id is not None:
+ i = shared_pool_id
+ self.init_resource_id_pool(
+ pon_intf_id=i,
+ resource_type=PONResourceManager.FLOW_ID,
+ start_idx=self.pon_resource_ranges[
+ PONResourceManager.FLOW_ID_START_IDX],
+ end_idx=self.pon_resource_ranges[
+ PONResourceManager.FLOW_ID_END_IDX])
+ if shared_pool_id is not None:
+ break
def clear_device_resource_pool(self):
"""
@@ -325,30 +380,47 @@
"""
for i in self.intf_ids:
shared_pool_id = self.pon_resource_ranges[PONResourceManager.ONU_ID_SHARED_IDX]
- if shared_pool_id is not None: i = shared_pool_id
+ if shared_pool_id is not None:
+ i = shared_pool_id
self.clear_resource_id_pool(
pon_intf_id=i,
resource_type=PONResourceManager.ONU_ID,
)
- if shared_pool_id is not None: break
+ if shared_pool_id is not None:
+ break
for i in self.intf_ids:
shared_pool_id = self.pon_resource_ranges[PONResourceManager.ALLOC_ID_SHARED_IDX]
- if shared_pool_id is not None: i = shared_pool_id
+ if shared_pool_id is not None:
+ i = shared_pool_id
self.clear_resource_id_pool(
pon_intf_id=i,
resource_type=PONResourceManager.ALLOC_ID,
)
- if shared_pool_id is not None: break
+ if shared_pool_id is not None:
+ break
for i in self.intf_ids:
shared_pool_id = self.pon_resource_ranges[PONResourceManager.GEMPORT_ID_SHARED_IDX]
- if shared_pool_id is not None: i = shared_pool_id
+ if shared_pool_id is not None:
+ i = shared_pool_id
self.clear_resource_id_pool(
pon_intf_id=i,
resource_type=PONResourceManager.GEMPORT_ID,
)
- if shared_pool_id is not None: break
+ if shared_pool_id is not None:
+ break
+
+ for i in self.intf_ids:
+ shared_pool_id = self.pon_resource_ranges[PONResourceManager.FLOW_ID_SHARED_IDX]
+ if shared_pool_id is not None:
+ i = shared_pool_id
+ self.clear_resource_id_pool(
+ pon_intf_id=i,
+ resource_type=PONResourceManager.FLOW_ID,
+ )
+ if shared_pool_id is not None:
+ break
def init_resource_id_pool(self, pon_intf_id, resource_type, start_idx,
end_idx):
@@ -367,7 +439,7 @@
shared_resource_mgr = self.shared_resource_mgrs[self.shared_idx_by_type[resource_type]]
if shared_resource_mgr is not None and shared_resource_mgr is not self:
return shared_resource_mgr.init_resource_id_pool(pon_intf_id, resource_type,
- start_idx, end_idx)
+ start_idx, end_idx)
path = self._get_path(pon_intf_id, resource_type)
if path is None:
@@ -398,7 +470,7 @@
def get_resource_id(self, pon_intf_id, resource_type, num_of_id=1):
"""
- Create alloc/gemport/onu id for given OLT PON interface.
+ Create alloc/gemport/onu/flow id for given OLT PON interface.
:param pon_intf_id: OLT PON interface id
:param resource_type: String to identify type of resource
@@ -409,6 +481,10 @@
"""
result = None
+ if num_of_id < 1:
+ self._log.error("invalid-num-of-resources-requested")
+ return result
+
# delegate to the master instance if sharing enabled across instances
shared_resource_mgr = self.shared_resource_mgrs[self.shared_idx_by_type[resource_type]]
if shared_resource_mgr is not None and shared_resource_mgr is not self:
@@ -420,16 +496,20 @@
try:
resource = self._get_resource(path)
- if resource is not None and resource_type == \
- PONResourceManager.ONU_ID:
+ if resource is not None and \
+ (resource_type == PONResourceManager.ONU_ID or
+ resource_type == PONResourceManager.FLOW_ID):
result = self._generate_next_id(resource)
elif resource is not None and (
resource_type == PONResourceManager.GEMPORT_ID or
resource_type == PONResourceManager.ALLOC_ID):
- result = list()
- while num_of_id > 0:
- result.append(self._generate_next_id(resource))
- num_of_id -= 1
+ if num_of_id == 1:
+ result = self._generate_next_id(resource)
+ else:
+ result = list()
+ while num_of_id > 0:
+ result.append(self._generate_next_id(resource))
+ num_of_id -= 1
else:
raise Exception("get-resource-failed")
@@ -445,7 +525,7 @@
def free_resource_id(self, pon_intf_id, resource_type, release_content):
"""
- Release alloc/gemport/onu id for given OLT PON interface.
+ Release alloc/gemport/onu/flow id for given OLT PON interface.
:param pon_intf_id: OLT PON interface id
:param resource_type: String to identify type of resource
@@ -466,8 +546,9 @@
try:
resource = self._get_resource(path)
- if resource is not None and resource_type == \
- PONResourceManager.ONU_ID:
+ if resource is not None and (
+ resource_type == PONResourceManager.ONU_ID or
+ resource_type == PONResourceManager.FLOW_ID):
self._release_id(resource, release_content)
elif resource is not None and (
resource_type == PONResourceManager.ALLOC_ID or
@@ -564,6 +645,8 @@
Get currently configured alloc ids for given pon_intf_onu_id
:param pon_intf_onu_id: reference of PON interface id and onu id
+
+ :return list: List of alloc_ids if available, else None
"""
path = PONResourceManager.ALLOC_ID_RESOURCE_MAP_PATH.format(
self.device_id,
@@ -581,6 +664,8 @@
Get currently configured gemport ids for given pon_intf_onu_id
:param pon_intf_onu_id: reference of PON interface id and onu id
+
+ :return list: List of gemport IDs if available, else None
"""
path = PONResourceManager.GEMPORT_ID_RESOURCE_MAP_PATH.format(
@@ -594,11 +679,68 @@
return None
+ def get_current_flow_ids_for_onu(self, pon_intf_onu_id):
+ """
+ Get currently configured flow ids for given pon_intf_onu_id
+
+ :param pon_intf_onu_id: reference of PON interface id and onu id
+
+ :return list: List of Flow IDs if available, else None
+ """
+
+ path = PONResourceManager.FLOW_ID_RESOURCE_MAP_PATH.format(
+ self.device_id,
+ str(pon_intf_onu_id))
+ value = self._kv_store.get_from_kv_store(path)
+ if value is not None:
+ flow_id_list = json.loads(value)
+ assert(isinstance(flow_id_list, list))
+ if len(flow_id_list) > 0:
+ return flow_id_list
+
+ return None
+
+ def get_flow_id_info(self, pon_intf_onu_id, flow_id):
+ """
+ Get flow_id details configured for the ONU.
+
+ :param pon_intf_onu_id: reference of PON interface id and onu id
+ :param flow_id: Flow Id reference
+
+ :return blob: Flow data blob if available, else None
+ """
+
+ path = PONResourceManager.FLOW_ID_INFO_PATH.format(
+ self.device_id,
+ str(pon_intf_onu_id),
+ flow_id)
+ value = self._kv_store.get_from_kv_store(path)
+ if value is not None:
+ return ast.literal_eval(value)
+
+ return None
+
+ def remove_flow_id_info(self, pon_intf_onu_id, flow_id):
+ """
+ Remove flow_id details configured for the ONU.
+
+ :param pon_intf_onu_id: reference of PON interface id and onu id
+ :param flow_id: Flow Id reference
+
+ """
+
+ path = PONResourceManager.FLOW_ID_INFO_PATH.format(
+ self.device_id,
+ str(pon_intf_onu_id),
+ flow_id)
+ self._kv_store.remove_from_kv_store(path)
+
def update_alloc_ids_for_onu(self, pon_intf_onu_id, alloc_ids):
"""
Update currently configured alloc ids for given pon_intf_onu_id
:param pon_intf_onu_id: reference of PON interface id and onu id
+ :param alloc_ids: list of alloc ids
"""
path = PONResourceManager.ALLOC_ID_RESOURCE_MAP_PATH.format(
self.device_id, str(pon_intf_onu_id)
@@ -612,6 +754,7 @@
Update currently configured gemport ids for given pon_intf_onu_id
:param pon_intf_onu_id: reference of PON interface id and onu id
+ :param gemport_ids: list of gem port ids
"""
path = PONResourceManager.GEMPORT_ID_RESOURCE_MAP_PATH.format(
self.device_id, str(pon_intf_onu_id)
@@ -620,6 +763,48 @@
path, json.dumps(gemport_ids)
)
+ def update_flow_id_for_onu(self, pon_intf_onu_id, flow_id, add=True):
+ """
+ Update the flow_id list of the ONU (add or remove flow_id from the list)
+
+ :param pon_intf_onu_id: reference of PON interface id and onu id
+ :param flow_id: flow ID
+ :param add: Boolean flag to indicate whether the flow_id should be
+ added or removed from the list. Defaults to adding the flow.
+ """
+ path = PONResourceManager.FLOW_ID_RESOURCE_MAP_PATH.format(
+ self.device_id, str(pon_intf_onu_id)
+ )
+ current_flow_ids = self.get_current_flow_ids_for_onu(pon_intf_onu_id)
+ if not isinstance(current_flow_ids, list):
+ # When the first flow_id is being added, the current_flow_ids is None
+ current_flow_ids = list()
+
+ if add:
+ if flow_id not in current_flow_ids:
+ current_flow_ids.append(flow_id)
+ else:
+ if flow_id in current_flow_ids:
+ current_flow_ids.remove(flow_id)
+ # Store as JSON: get_current_flow_ids_for_onu reads it back with json.loads
+ self._kv_store.update_to_kv_store(path, json.dumps(current_flow_ids))
+
+ def update_flow_id_info_for_onu(self, pon_intf_onu_id, flow_id, flow_data):
+ """
+ Update any metadata associated with the flow_id. The flow_data could be JSON
+ or any other data structure. The resource manager doesn't care
+
+ :param pon_intf_onu_id: reference of PON interface id and onu id
+ :param flow_id: Flow ID
+ :param flow_data: Flow data blob
+ """
+ path = PONResourceManager.FLOW_ID_INFO_PATH.format(
+ self.device_id, str(pon_intf_onu_id), flow_id
+ )
+
+ if not self._kv_store.update_to_kv_store(path, flow_data):
+ self._log.error("flow-info-update-failed", path=path, flow_id=flow_id)
+
def _get_olt_model(self):
"""
Get olt model variant
@@ -673,7 +858,8 @@
"""
shared_pool_id = self.pon_resource_ranges[self.shared_idx_by_type[resource_type]]
- if shared_pool_id is not None: pon_intf_id = shared_pool_id
+ if shared_pool_id is not None:
+ pon_intf_id = shared_pool_id
path = None
if resource_type == PONResourceManager.ONU_ID:
@@ -682,10 +868,22 @@
path = self._get_alloc_id_resource_path(pon_intf_id)
elif resource_type == PONResourceManager.GEMPORT_ID:
path = self._get_gemport_id_resource_path(pon_intf_id)
+ elif resource_type == PONResourceManager.FLOW_ID:
+ path = self._get_flow_id_resource_path(pon_intf_id)
else:
self._log.error("invalid-resource-pool-identifier")
return path
+ def _get_flow_id_resource_path(self, pon_intf_id):
+ """
+ Get flow id resource path.
+
+ :param pon_intf_id: OLT PON interface id
+ :return: flow id resource path
+ """
+ return PONResourceManager.FLOW_ID_POOL_PATH.format(
+ self.device_id, pon_intf_id)
+
def _get_alloc_id_resource_path(self, pon_intf_id):
"""
Get alloc id resource path.