VOL-1221: OpenOLT Adapter/Driver will use a Technology Profile Instance to create the OLT Upstream and Downstream Queuing and Scheduling Constructs for a Bidirectional Flow.
Change-Id: Iaf1a782529e2c459c586b158bd4f6447f548e004
diff --git a/voltha/adapters/openolt/openolt_resource_manager.py b/voltha/adapters/openolt/openolt_resource_manager.py
index 6ac8a22..7e869d4 100644
--- a/voltha/adapters/openolt/openolt_resource_manager.py
+++ b/voltha/adapters/openolt/openolt_resource_manager.py
@@ -20,12 +20,13 @@
from voltha.registry import registry
from voltha.core.config.config_backend import ConsulStore
from voltha.core.config.config_backend import EtcdStore
+from voltha.adapters.openolt.openolt_flow_mgr import *
+
from voltha.adapters.openolt.protos import openolt_pb2
+
class OpenOltResourceMgr(object):
- GEMPORT_IDS = "gemport_ids"
- ALLOC_IDS = "alloc_ids"
- BASE_PATH_KV_STORE = "openolt/{}" # openolt/<device_id>
+ BASE_PATH_KV_STORE = "openolt/{}" # openolt/<device_id>
def __init__(self, device_id, host_and_port, extra_args, device_info):
self.log = structlog.get_logger(id=device_id,
@@ -37,7 +38,6 @@
self.args = registry('main').get_args()
# KV store's IP Address and PORT
- host, port = '127.0.0.1', 8500
if self.args.backend == 'etcd':
host, port = self.args.etcd.split(':', 1)
self.kv_store = EtcdStore(host, port,
@@ -80,6 +80,12 @@
pool.end = self.device_info.gemport_id_end
pool.sharing = openolt_pb2.DeviceInfo.DeviceResourceRanges.Pool.SHARED_BY_ALL_INTF_ALL_TECH
+ pool = arange.pools.add()
+ pool.type = openolt_pb2.DeviceInfo.DeviceResourceRanges.Pool.FLOW_ID
+ pool.start = self.device_info.flow_id_start
+ pool.end = self.device_info.flow_id_end
+ pool.sharing = openolt_pb2.DeviceInfo.DeviceResourceRanges.Pool.SHARED_BY_ALL_INTF_ALL_TECH
+
# Create a separate Resource Manager instance for each range. This assumes that
# each technology is represented by only a single range
global_resource_mgr = None
@@ -87,11 +93,12 @@
technology = arange.technology
self.log.info("device-info", technology=technology)
ranges[technology] = arange
- extra_args = self.extra_args + ' ' + PONResourceManager.OLT_MODEL_ARG + ' {}'.format(self.device_info.model)
+ extra_args = self.extra_args + ' ' + PONResourceManager.OLT_MODEL_ARG + ' {}'.format(self.device_info.model)
resource_mgr = PONResourceManager(technology,
- extra_args, self.device_id, self.args.backend, host, port)
+ extra_args, self.device_id, self.args.backend, host, port)
resource_mgrs_by_tech[technology] = resource_mgr
- if global_resource_mgr is None: global_resource_mgr = resource_mgr
+ if global_resource_mgr is None:
+ global_resource_mgr = resource_mgr
for intf_id in arange.intf_ids:
self.resource_mgrs[intf_id] = resource_mgrs_by_tech[technology]
self.initialize_device_resource_range_and_pool(resource_mgr, global_resource_mgr, arange)
@@ -103,7 +110,7 @@
def __del__(self):
self.log.info("clearing-device-resource-pool")
- for key, resource_mgr in self.resource_mgrs.iteritems():
+ for key, resource_mgr in self.resource_mgrs.iteritems():
resource_mgr.clear_device_resource_pool()
def get_onu_id(self, pon_intf_id):
@@ -117,6 +124,46 @@
return onu_id
+ def get_flow_id(self, pon_intf_onu_id):
+ pon_intf = pon_intf_onu_id[0]
+ flow_id = self.resource_mgrs[pon_intf].get_resource_id(
+ pon_intf_onu_id[0], PONResourceManager.FLOW_ID)
+ if flow_id is not None:
+ self.resource_mgrs[pon_intf].update_flow_id_for_onu(pon_intf_onu_id, flow_id)
+
+ return flow_id
+
+ def get_flow_id_info(self, pon_intf_id, onu_id, flow_id):
+ pon_intf_onu_id = (pon_intf_id, onu_id)
+ return self.resource_mgrs[pon_intf_id].get_flow_id_info(pon_intf_onu_id, flow_id)
+
+ def get_current_flow_ids_for_onu(self, pon_intf_id, onu_id):
+ pon_intf_onu_id = (pon_intf_id, onu_id)
+ return self.resource_mgrs[pon_intf_id].get_current_flow_ids_for_onu(pon_intf_onu_id)
+
+ def update_flow_id_info_for_onu(self, pon_intf_onu_id, flow_id, flow_data):
+ pon_intf_id = pon_intf_onu_id[0]
+ return self.resource_mgrs[pon_intf_id].update_flow_id_info_for_onu(
+ pon_intf_onu_id, flow_id, flow_data)
+
+ def get_hsia_flow_for_onu(self, pon_intf_id, onu_id, gemport_id):
+ pon_intf_onu_id = (pon_intf_id, onu_id)
+ try:
+ flow_ids = self.resource_mgrs[pon_intf_id]. \
+ get_current_flow_ids_for_onu(pon_intf_onu_id)
+ if flow_ids is not None:
+ for flow_id in flow_ids:
+ flows = self.get_flow_id_info(pon_intf_id, onu_id, flow_id)
+ assert (isinstance(flows, list))
+ for flow in flows:
+ if flow['flow_category'] == HSIA_FLOW and \
+ flow['gemport_id'] == gemport_id:
+ return flow_id
+ except Exception as e:
+ self.log.error("error-retrieving-flow-info", e=e)
+
+ return self.get_flow_id(pon_intf_onu_id)
+
def get_alloc_id(self, pon_intf_onu_id):
# Derive the pon_intf from the pon_intf_onu_id tuple
pon_intf = pon_intf_onu_id[0]
@@ -129,27 +176,35 @@
# ONU.
return alloc_id_list[0]
- alloc_id_list = self.resource_mgrs[pon_intf].get_resource_id(
+ alloc_id = self.resource_mgrs[pon_intf].get_resource_id(
pon_intf_id=pon_intf,
resource_type=PONResourceManager.ALLOC_ID,
num_of_id=1
)
- if alloc_id_list and len(alloc_id_list) == 0:
+ if alloc_id is None:
self.log.error("no-alloc-id-available")
return None
# update the resource map on KV store with the list of alloc_id
# allocated for the pon_intf_onu_id tuple
self.resource_mgrs[pon_intf].update_alloc_ids_for_onu(pon_intf_onu_id,
- alloc_id_list)
-
- # Since we request only one alloc id, we refer the 0th
- # index
- alloc_id = alloc_id_list[0]
+ list(alloc_id))
return alloc_id
- def get_gemport_id(self, pon_intf_onu_id):
+ def get_current_gemport_ids_for_onu(self, pon_intf_onu_id):
+ pon_intf_id = pon_intf_onu_id[0]
+ return self.resource_mgrs[pon_intf_id].get_current_gemport_ids_for_onu(pon_intf_onu_id)
+
+ def update_gemports_ponport_to_onu_map_on_kv_store(self, gemport_list, pon_port, onu_id):
+ for gemport in gemport_list:
+ pon_intf_gemport = (pon_port, gemport)
+ # Persist the (pon_intf, gemport) -> onu_id mapping. It is read back when a
+ # packet_indication is received, to derive the ONU ID the packet arrived
+ # for from the pon_intf and gemport carried in that packet_indication.
+ self.kv_store[str(pon_intf_gemport)] = str(onu_id)
+
+ def get_gemport_id(self, pon_intf_onu_id, num_of_id=1):
# Derive the pon_intf and onu_id from the pon_intf_onu_id tuple
pon_intf = pon_intf_onu_id[0]
onu_id = pon_intf_onu_id[1]
@@ -157,15 +212,12 @@
gemport_id_list = self.resource_mgrs[pon_intf].get_current_gemport_ids_for_onu(
pon_intf_onu_id)
if gemport_id_list and len(gemport_id_list) > 0:
- # Since we support only one gemport_id for the ONU at the moment,
- # return the first gemport_id in the list, if available, for that
- # ONU.
- return gemport_id_list[0]
+ return gemport_id_list
gemport_id_list = self.resource_mgrs[pon_intf].get_resource_id(
pon_intf_id=pon_intf,
resource_type=PONResourceManager.GEMPORT_ID,
- num_of_id=1
+ num_of_id=num_of_id
)
if gemport_id_list and len(gemport_id_list) == 0:
@@ -175,27 +227,29 @@
# update the resource map on KV store with the list of gemport_id
# allocated for the pon_intf_onu_id tuple
self.resource_mgrs[pon_intf].update_gemport_ids_for_onu(pon_intf_onu_id,
- gemport_id_list)
+ gemport_id_list)
- # We currently use only one gemport
- gemport = gemport_id_list[0]
-
- pon_intf_gemport = (pon_intf, gemport)
- # This information is used when packet_indication is received and
- # we need to derive the ONU Id for which the packet arrived based
- # on the pon_intf and gemport available in the packet_indication
- self.kv_store[str(pon_intf_gemport)] = str(onu_id)
-
- return gemport
+ self.update_gemports_ponport_to_onu_map_on_kv_store(gemport_id_list,
+ pon_intf, onu_id)
+ return gemport_id_list
def free_onu_id(self, pon_intf_id, onu_id):
- result = self.resource_mgrs[pon_intf_id].free_resource_id(
+ _ = self.resource_mgrs[pon_intf_id].free_resource_id(
pon_intf_id, PONResourceManager.ONU_ID, onu_id)
pon_intf_onu_id = (pon_intf_id, onu_id)
self.resource_mgrs[pon_intf_id].remove_resource_map(
pon_intf_onu_id)
+ def free_flow_id(self, pon_intf_id, onu_id, flow_id):
+ self.resource_mgrs[pon_intf_id].free_resource_id(
+ pon_intf_id, PONResourceManager.FLOW_ID, flow_id)
+ pon_intf_onu_id = (pon_intf_id, onu_id)
+ self.resource_mgrs[pon_intf_id].update_flow_id_for_onu(pon_intf_onu_id,
+ flow_id, False)
+ self.resource_mgrs[pon_intf_id].remove_flow_id_info(pon_intf_onu_id,
+ flow_id)
+
def free_pon_resources_for_onu(self, pon_intf_id_onu_id):
pon_intf_id = pon_intf_id_onu_id[0]
@@ -203,18 +257,18 @@
alloc_ids = \
self.resource_mgrs[pon_intf_id].get_current_alloc_ids_for_onu(pon_intf_id_onu_id)
self.resource_mgrs[pon_intf_id].free_resource_id(pon_intf_id,
- PONResourceManager.ALLOC_ID,
- alloc_ids)
+ PONResourceManager.ALLOC_ID,
+ alloc_ids)
gemport_ids = \
self.resource_mgrs[pon_intf_id].get_current_gemport_ids_for_onu(pon_intf_id_onu_id)
self.resource_mgrs[pon_intf_id].free_resource_id(pon_intf_id,
- PONResourceManager.GEMPORT_ID,
- gemport_ids)
+ PONResourceManager.GEMPORT_ID,
+ gemport_ids)
self.resource_mgrs[pon_intf_id].free_resource_id(pon_intf_id,
- PONResourceManager.ONU_ID,
- onu_id)
+ PONResourceManager.ONU_ID,
+ onu_id)
# Clear resource map associated with (pon_intf_id, gemport_id) tuple.
self.resource_mgrs[pon_intf_id].remove_resource_map(pon_intf_id_onu_id)
@@ -242,20 +296,25 @@
onu_id_shared_pool_id = None
alloc_id_start = self.device_info.alloc_id_start
alloc_id_end = self.device_info.alloc_id_end
- alloc_id_shared = openolt_pb2.DeviceInfo.DeviceResourceRanges.Pool.SHARED_BY_ALL_INTF_ALL_TECH # TODO EdgeCore/BAL limitation
+ alloc_id_shared = openolt_pb2.DeviceInfo.DeviceResourceRanges.Pool.SHARED_BY_ALL_INTF_ALL_TECH # TODO EdgeCore/BAL limitation
alloc_id_shared_pool_id = None
gemport_id_start = self.device_info.gemport_id_start
gemport_id_end = self.device_info.gemport_id_end
- gemport_id_shared = openolt_pb2.DeviceInfo.DeviceResourceRanges.Pool.SHARED_BY_ALL_INTF_ALL_TECH # TODO EdgeCore/BAL limitation
+ gemport_id_shared = openolt_pb2.DeviceInfo.DeviceResourceRanges.Pool.SHARED_BY_ALL_INTF_ALL_TECH # TODO EdgeCore/BAL limitation
gemport_id_shared_pool_id = None
+ flow_id_start = self.device_info.flow_id_start
+ flow_id_end = self.device_info.flow_id_end
+ flow_id_shared = openolt_pb2.DeviceInfo.DeviceResourceRanges.Pool.SHARED_BY_ALL_INTF_ALL_TECH # TODO EdgeCore/BAL limitation
+ flow_id_shared_pool_id = None
global_pool_id = 0
- for first_intf_pool_id in arange.intf_ids: break;
+ for first_intf_pool_id in arange.intf_ids:
+ break
for pool in arange.pools:
shared_pool_id = global_pool_id if pool.sharing == openolt_pb2.DeviceInfo.DeviceResourceRanges.Pool.SHARED_BY_ALL_INTF_ALL_TECH else \
- first_intf_pool_id if pool.sharing == openolt_pb2.DeviceInfo.DeviceResourceRanges.Pool.SHARED_BY_ALL_INTF_SAME_TECH else \
- None
+ first_intf_pool_id if pool.sharing == openolt_pb2.DeviceInfo.DeviceResourceRanges.Pool.SHARED_BY_ALL_INTF_SAME_TECH else \
+ None
if pool.type == openolt_pb2.DeviceInfo.DeviceResourceRanges.Pool.ONU_ID:
onu_id_start = pool.start
@@ -272,40 +331,60 @@
gemport_id_end = pool.end
gemport_id_shared = pool.sharing
gemport_id_shared_pool_id = shared_pool_id
+ elif pool.type == openolt_pb2.DeviceInfo.DeviceResourceRanges.Pool.FLOW_ID:
+ flow_id_start = pool.start
+ flow_id_end = pool.end
+ flow_id_shared = pool.sharing
+ flow_id_shared_pool_id = shared_pool_id
self.log.info("device-info-init", technology=arange.technology,
- onu_id_start=onu_id_start, onu_id_end=onu_id_end, onu_id_shared_pool_id=onu_id_shared_pool_id,
- alloc_id_start=alloc_id_start, alloc_id_end=alloc_id_end, alloc_id_shared_pool_id=alloc_id_shared_pool_id,
- gemport_id_start=gemport_id_start, gemport_id_end=gemport_id_end, gemport_id_shared_pool_id=gemport_id_shared_pool_id,
- intf_ids=arange.intf_ids)
+ onu_id_start=onu_id_start, onu_id_end=onu_id_end, onu_id_shared_pool_id=onu_id_shared_pool_id,
+ alloc_id_start=alloc_id_start, alloc_id_end=alloc_id_end,
+ alloc_id_shared_pool_id=alloc_id_shared_pool_id,
+ gemport_id_start=gemport_id_start, gemport_id_end=gemport_id_end,
+ gemport_id_shared_pool_id=gemport_id_shared_pool_id,
+ flow_id_start_idx=flow_id_start,
+ flow_id_end_idx=flow_id_end,
+ flow_id_shared_pool_id=flow_id_shared_pool_id,
+ intf_ids=arange.intf_ids)
resource_mgr.init_default_pon_resource_ranges(
- onu_id_start_idx=onu_id_start,
- onu_id_end_idx=onu_id_end,
- onu_id_shared_pool_id=onu_id_shared_pool_id,
- alloc_id_start_idx=alloc_id_start,
- alloc_id_end_idx=alloc_id_end,
- alloc_id_shared_pool_id=alloc_id_shared_pool_id,
- gemport_id_start_idx=gemport_id_start,
- gemport_id_end_idx=gemport_id_end,
- gemport_id_shared_pool_id=gemport_id_shared_pool_id,
- num_of_pon_ports=self.device_info.pon_ports,
- intf_ids=arange.intf_ids
- )
+ onu_id_start_idx=onu_id_start,
+ onu_id_end_idx=onu_id_end,
+ onu_id_shared_pool_id=onu_id_shared_pool_id,
+ alloc_id_start_idx=alloc_id_start,
+ alloc_id_end_idx=alloc_id_end,
+ alloc_id_shared_pool_id=alloc_id_shared_pool_id,
+ gemport_id_start_idx=gemport_id_start,
+ gemport_id_end_idx=gemport_id_end,
+ gemport_id_shared_pool_id=gemport_id_shared_pool_id,
+ flow_id_start_idx=flow_id_start,
+ flow_id_end_idx=flow_id_end,
+ flow_id_shared_pool_id=flow_id_shared_pool_id,
+ num_of_pon_ports=self.device_info.pon_ports,
+ intf_ids=arange.intf_ids
+ )
# For global sharing, make sure to refresh both local and global resource manager instances' range
if global_resource_mgr is not self:
if onu_id_shared == openolt_pb2.DeviceInfo.DeviceResourceRanges.Pool.SHARED_BY_ALL_INTF_ALL_TECH:
global_resource_mgr.update_ranges(onu_id_start_idx=onu_id_start, onu_id_end_idx=onu_id_end)
resource_mgr.update_ranges(onu_id_start_idx=onu_id_start, onu_id_end_idx=onu_id_end,
- onu_id_shared_resource_mgr=global_resource_mgr)
+ onu_id_shared_resource_mgr=global_resource_mgr)
if alloc_id_shared == openolt_pb2.DeviceInfo.DeviceResourceRanges.Pool.SHARED_BY_ALL_INTF_ALL_TECH:
global_resource_mgr.update_ranges(alloc_id_start_idx=alloc_id_start, alloc_id_end_idx=alloc_id_end)
resource_mgr.update_ranges(alloc_id_start_idx=alloc_id_start, alloc_id_end_idx=alloc_id_end,
- alloc_id_shared_resource_mgr=global_resource_mgr)
+ alloc_id_shared_resource_mgr=global_resource_mgr)
if gemport_id_shared == openolt_pb2.DeviceInfo.DeviceResourceRanges.Pool.SHARED_BY_ALL_INTF_ALL_TECH:
- global_resource_mgr.update_ranges(gemport_id_start_idx=gemport_id_start, gemport_id_end_idx=gemport_id_end)
+ global_resource_mgr.update_ranges(gemport_id_start_idx=gemport_id_start,
+ gemport_id_end_idx=gemport_id_end)
resource_mgr.update_ranges(gemport_id_start_idx=gemport_id_start, gemport_id_end_idx=gemport_id_end,
- gemport_id_shared_resource_mgr=global_resource_mgr)
+ gemport_id_shared_resource_mgr=global_resource_mgr)
+
+ if flow_id_shared == openolt_pb2.DeviceInfo.DeviceResourceRanges.Pool.SHARED_BY_ALL_INTF_ALL_TECH:
+ global_resource_mgr.update_ranges(flow_id_start_idx=flow_id_start,
+ flow_id_end_idx=flow_id_end)
+ resource_mgr.update_ranges(flow_id_start_idx=flow_id_start, flow_id_end_idx=flow_id_end,
+ flow_id_shared_resource_mgr=global_resource_mgr)