VOL-1233: Change pon resource manager to use blocking KV client library
VOL-1232: PON Resource Manager allocates same alloc id across different pon ports, and is not acceptable by asfvolt16 platform
VOL-1245: When OpenOlt device is delete, there is Unhandled error during the deletion of child brcm_openomci ONUs
Change-Id: I3f939cb12e0456b2014a133968a77fba1c4c6e63
diff --git a/common/pon_resource_manager/resource_kv_store.py b/common/pon_resource_manager/resource_kv_store.py
index 0ccbed8..a1a5c14 100644
--- a/common/pon_resource_manager/resource_kv_store.py
+++ b/common/pon_resource_manager/resource_kv_store.py
@@ -21,7 +21,7 @@
from voltha.core.config.config_backend import EtcdStore
# KV store uses this prefix to store resource info
-PATH_PREFIX = 'resource_manager/{}/{}'
+PATH_PREFIX = 'resource_manager/{}'
class ResourceKvStore(object):
@@ -44,14 +44,12 @@
# logger
self._log = structlog.get_logger()
- path = PATH_PREFIX.format(technology, device_id)
+ path = PATH_PREFIX.format(technology)
try:
if backend == 'consul':
self._kv_store = ConsulStore(host, port, path)
- self._recurse = '?recurse'
elif backend == 'etcd':
self._kv_store = EtcdStore(host, port, path)
- self._recurse = ''
else:
self._log.error('Invalid-backend')
raise Exception("Invalid-backend-for-kv-store")
@@ -99,7 +97,6 @@
:param path: path to remove the resource
"""
- path = path + self._recurse
try:
del self._kv_store[path]
self._log.debug("Resource-deleted-in-kv-store", path=path)
diff --git a/common/pon_resource_manager/resource_manager.py b/common/pon_resource_manager/resource_manager.py
index 4abead3..86f9a96 100644
--- a/common/pon_resource_manager/resource_manager.py
+++ b/common/pon_resource_manager/resource_manager.py
@@ -23,10 +23,19 @@
import json
import structlog
from bitstring import BitArray
-from twisted.internet.defer import returnValue, inlineCallbacks
+from ast import literal_eval
+import shlex
+from argparse import ArgumentParser, ArgumentError
-from common.kvstore.kvstore import create_kv_client
-from common.utils.asleep import asleep
+from common.pon_resource_manager.resource_kv_store import ResourceKvStore
+
+
+# Used to parse extra arguments to OpenOlt adapter from the NBI
+class OltVendorArgumentParser(ArgumentParser):
+ # Must override the exit command to prevent it from
+ # calling sys.exit(). Return exception instead.
+ def exit(self, status=0, message=None):
+ raise Exception(message)
class PONResourceManager(object):
@@ -44,34 +53,45 @@
# they are expected to be stored in the following format.
# Note: All parameters are MANDATORY for now.
'''
- {
- "onu_start_idx": 1,
- "onu_end_idx": 127,
- "alloc_id_start_idx": 1024,
- "alloc_id_end_idx": 65534,
- "gem_port_id_start_idx": 1024,
- "gem_port_id_end_idx": 16383,
- "num_of_pon_port": 16
- }
+ {
+ "onu_id_start": 1,
+ "onu_id_end": 127,
+ "alloc_id_start": 1024,
+ "alloc_id_end": 2816,
+ "gemport_id_start": 1024,
+ "gemport_id_end": 8960,
+ "pon_ports": 16
+ }
+
'''
# constants used as keys to reference the resource range parameters from
# and external KV store.
- ONU_START_IDX = "onu_start_idx"
- ONU_END_IDX = "onu_end_idx"
- ALLOC_ID_START_IDX = "alloc_id_start_idx"
- ALLOC_ID_END_IDX = "alloc_id_end_idx"
- GEM_PORT_ID_START_IDX = "gem_port_id_start_idx"
- GEM_PORT_ID_END_IDX = "gem_port_id_end_idx"
- NUM_OF_PON_PORT = "num_of_pon_port"
+ ONU_START_IDX = "onu_id_start"
+ ONU_END_IDX = "onu_id_end"
+ ALLOC_ID_START_IDX = "alloc_id_start"
+ ALLOC_ID_END_IDX = "alloc_id_end"
+ GEM_PORT_ID_START_IDX = "gemport_id_start"
+ GEM_PORT_ID_END_IDX = "gemport_id_end"
+ NUM_OF_PON_PORT = "pon_ports"
# PON Resource range configuration on the KV store.
# Format: 'resource_manager/<technology>/resource_ranges/<olt_vendor_type>'
- PON_RESOURCE_RANGE_CONFIG_PATH = 'resource_manager/{}/resource_ranges/{}'
+ # The KV store backend is initialized with a path prefix and we need to
+ # provide only the suffix.
+ PON_RESOURCE_RANGE_CONFIG_PATH = 'resource_ranges/{}'
- # resource path in kv store
- ALLOC_ID_POOL_PATH = 'resource_manager/{}/{}/alloc_id_pool/{}'
- GEMPORT_ID_POOL_PATH = 'resource_manager/{}/{}/gemport_id_pool/{}'
- ONU_ID_POOL_PATH = 'resource_manager/{}/{}/onu_id_pool/{}'
+ # resource path suffix
+ ALLOC_ID_POOL_PATH = '{}/alloc_id_pool/{}'
+ GEMPORT_ID_POOL_PATH = '{}/gemport_id_pool/{}'
+ ONU_ID_POOL_PATH = '{}/onu_id_pool/{}'
+
+ # Path on the KV store for storing list of alloc IDs for a given ONU
+ # Format: <device_id>/<(pon_intf_id, onu_id)>/alloc_ids
+ ALLOC_ID_RESOURCE_MAP_PATH = '{}/{}/alloc_ids'
+
+ # Path on the KV store for storing list of gemport IDs for a given ONU
+ # Format: <device_id>/<(pon_intf_id, onu_id)>/gemport_ids
+ GEMPORT_ID_RESOURCE_MAP_PATH = '{}/{}/gemport_ids'
# Constants for internal usage.
PON_INTF_ID = 'pon_intf_id'
@@ -79,15 +99,14 @@
END_IDX = 'end_idx'
POOL = 'pool'
- def __init__(self, technology, olt_vendor_type, device_id,
+ def __init__(self, technology, extra_args, device_id,
backend, host, port):
"""
Create PONResourceManager object.
:param technology: PON technology
- :param: olt_vendor_type: This string defines the OLT vendor type
- and is used as a Key to load the resource range configuration from
- KV store location.
+ :param: extra_args: This string contains extra arguments passed during
+ pre-provisioning of OLT and specifies the OLT Vendor type
:param device_id: OLT device id
:param backend: backend store
:param host: ip of backend store
@@ -98,10 +117,15 @@
self._log = structlog.get_logger()
try:
- self._kv_store = create_kv_client(backend, host, port)
self.technology = technology
- self.olt_vendor_type = olt_vendor_type
+ self.extra_args = extra_args
self.device_id = device_id
+ self.backend = backend
+ self.host = host
+ self.port = port
+ self.olt_vendor = None
+ self._kv_store = ResourceKvStore(technology, device_id, backend,
+ host, port)
# Below attribute, pon_resource_ranges, should be initialized
# by reading from KV store.
self.pon_resource_ranges = dict()
@@ -109,51 +133,79 @@
self._log.exception("exception-in-init")
raise Exception(e)
- @inlineCallbacks
- def init_pon_resource_ranges(self):
- # Try to initialize the PON Resource Ranges from KV store if available
- status = yield self.init_resource_ranges_from_kv_store()
- # If reading from KV store fails, initialize to default values.
- if not status:
- self._log.error("failed-to-read-resource-ranges-from-kv-store")
- self.init_default_pon_resource_ranges()
-
- @inlineCallbacks
def init_resource_ranges_from_kv_store(self):
- path = self.PON_RESOURCE_RANGE_CONFIG_PATH.format(
- self.technology, self.olt_vendor_type)
- # get resource from kv store
- result = yield self._kv_store.get(path)
- resource_range_config = result[0]
+ """
+ Initialize PON resource ranges with config fetched from kv store.
- if resource_range_config is not None:
- self.pon_resource_ranges = eval(resource_range_config.value)
- self._log.debug("Init-resource-ranges-from-kvstore-success",
- pon_resource_ranges=self.pon_resource_ranges,
- path=path)
- returnValue(True)
+ :return boolean: True if PON resource ranges initialized else false
+ """
+ self.olt_vendor = self._get_olt_vendor()
+ # Try to initialize the PON Resource Ranges from KV store based on the
+ # OLT vendor key, if available
+ if self.olt_vendor is None:
+ self._log.info("olt-vendor-unavailable--not-reading-from-kv-store")
+ return False
- returnValue(False)
+ path = self.PON_RESOURCE_RANGE_CONFIG_PATH.format(self.olt_vendor)
+ try:
+ # get resource from kv store
+ result = self._kv_store.get_from_kv_store(path)
+
+ if result is None:
+ self._log.debug("resource-range-config-unavailable-on-kvstore")
+ return False
+
+ resource_range_config = result
+
+ if resource_range_config is not None:
+ self.pon_resource_ranges = json.loads(resource_range_config)
+ self._log.debug("Init-resource-ranges-from-kvstore-success",
+ pon_resource_ranges=self.pon_resource_ranges,
+ path=path)
+ return True
+
+ except Exception as e:
+ self._log.exception("error-initializing-resource-range-from-kv-store",
+ e=e)
+ return False
def init_default_pon_resource_ranges(self, onu_start_idx=1,
onu_end_idx=127,
alloc_id_start_idx=1024,
- alloc_id_end_idx=65534,
+ alloc_id_end_idx=2816,
gem_port_id_start_idx=1024,
- gem_port_id_end_idx=16383,
+ gem_port_id_end_idx=8960,
num_of_pon_ports=16):
+ """
+ Initialize default PON resource ranges
+
+ :param onu_start_idx: onu id start index
+ :param onu_end_idx: onu id end index
+ :param alloc_id_start_idx: alloc id start index
+ :param alloc_id_end_idx: alloc id end index
+ :param gem_port_id_start_idx: gemport id start index
+ :param gem_port_id_end_idx: gemport id end index
+ :param num_of_pon_ports: number of PON ports
+ """
self._log.info("initialize-default-resource-range-values")
- self.pon_resource_ranges[PONResourceManager.ONU_START_IDX] = onu_start_idx
+ self.pon_resource_ranges[
+ PONResourceManager.ONU_START_IDX] = onu_start_idx
self.pon_resource_ranges[PONResourceManager.ONU_END_IDX] = onu_end_idx
- self.pon_resource_ranges[PONResourceManager.ALLOC_ID_START_IDX] = alloc_id_start_idx
- self.pon_resource_ranges[PONResourceManager.ALLOC_ID_END_IDX] = alloc_id_end_idx
+ self.pon_resource_ranges[
+ PONResourceManager.ALLOC_ID_START_IDX] = alloc_id_start_idx
+ self.pon_resource_ranges[
+ PONResourceManager.ALLOC_ID_END_IDX] = alloc_id_end_idx
self.pon_resource_ranges[
PONResourceManager.GEM_PORT_ID_START_IDX] = gem_port_id_start_idx
self.pon_resource_ranges[
PONResourceManager.GEM_PORT_ID_END_IDX] = gem_port_id_end_idx
- self.pon_resource_ranges[PONResourceManager.NUM_OF_PON_PORT] = num_of_pon_ports
+ self.pon_resource_ranges[
+ PONResourceManager.NUM_OF_PON_PORT] = num_of_pon_ports
def init_device_resource_pool(self):
+ """
+ Initialize resource pool for all PON ports.
+ """
i = 0
while i < self.pon_resource_ranges[PONResourceManager.NUM_OF_PON_PORT]:
self.init_resource_id_pool(
@@ -164,43 +216,49 @@
end_idx=self.pon_resource_ranges[
PONResourceManager.ONU_END_IDX])
- self.init_resource_id_pool(
- pon_intf_id=i,
- resource_type=PONResourceManager.ALLOC_ID,
- start_idx=self.pon_resource_ranges[
- PONResourceManager.ALLOC_ID_START_IDX],
- end_idx=self.pon_resource_ranges[
- PONResourceManager.ALLOC_ID_END_IDX])
-
- self.init_resource_id_pool(
- pon_intf_id=i,
- resource_type=PONResourceManager.GEMPORT_ID,
- start_idx=self.pon_resource_ranges[
- PONResourceManager.GEM_PORT_ID_START_IDX],
- end_idx=self.pon_resource_ranges[
- PONResourceManager.GEM_PORT_ID_END_IDX])
i += 1
+ # TODO: ASFvOLT16 platform requires alloc and gemport ID to be unique
+ # across OLT. To keep it simple, a single pool (POOL 0) is maintained
+ # for both the resource types. This may need to change later.
+ self.init_resource_id_pool(
+ pon_intf_id=0,
+ resource_type=PONResourceManager.ALLOC_ID,
+ start_idx=self.pon_resource_ranges[
+ PONResourceManager.ALLOC_ID_START_IDX],
+ end_idx=self.pon_resource_ranges[
+ PONResourceManager.ALLOC_ID_END_IDX])
+
+ self.init_resource_id_pool(
+ pon_intf_id=0,
+ resource_type=PONResourceManager.GEMPORT_ID,
+ start_idx=self.pon_resource_ranges[
+ PONResourceManager.GEM_PORT_ID_START_IDX],
+ end_idx=self.pon_resource_ranges[
+ PONResourceManager.GEM_PORT_ID_END_IDX])
+
def clear_device_resource_pool(self):
+ """
+ Clear resource pool of all PON ports.
+ """
i = 0
while i < self.pon_resource_ranges[PONResourceManager.NUM_OF_PON_PORT]:
self.clear_resource_id_pool(
pon_intf_id=i,
resource_type=PONResourceManager.ONU_ID,
)
-
- self.clear_resource_id_pool(
- pon_intf_id=i,
- resource_type=PONResourceManager.ALLOC_ID,
- )
-
- self.clear_resource_id_pool(
- pon_intf_id=i,
- resource_type=PONResourceManager.GEMPORT_ID,
- )
i += 1
- @inlineCallbacks
+ self.clear_resource_id_pool(
+ pon_intf_id=0,
+ resource_type=PONResourceManager.ALLOC_ID,
+ )
+
+ self.clear_resource_id_pool(
+ pon_intf_id=0,
+ resource_type=PONResourceManager.GEMPORT_ID,
+ )
+
def init_resource_id_pool(self, pon_intf_id, resource_type, start_idx,
end_idx):
"""
@@ -215,26 +273,31 @@
status = False
path = self._get_path(pon_intf_id, resource_type)
if path is None:
- returnValue(status)
+ return status
- # In case of adapter reboot and reconciliation resource in kv store
- # checked for its presence if not kv store update happens
- resource = yield self._get_resource(path)
+ try:
+ # In case of adapter reboot and reconciliation resource in kv store
+ # checked for its presence if not kv store update happens
+ resource = self._get_resource(path)
- if resource is not None:
- self._log.info("Resource-already-present-in-store", path=path)
- status = True
- else:
- resource = self._format_resource(pon_intf_id, start_idx, end_idx)
- self._log.info("Resource-initialized", path=path)
-
- # Add resource as json in kv store.
- result = yield self._kv_store.put(path, resource)
- if result is None:
+ if resource is not None:
+ self._log.info("Resource-already-present-in-store", path=path)
status = True
- returnValue(status)
+ else:
+ resource = self._format_resource(pon_intf_id, start_idx,
+ end_idx)
+ self._log.info("Resource-initialized", path=path)
- @inlineCallbacks
+ # Add resource as json in kv store.
+ result = self._kv_store.update_to_kv_store(path, resource)
+ if result is True:
+ status = True
+
+ except Exception as e:
+ self._log.exception("error-initializing-resource-pool", e=e)
+
+ return status
+
def get_resource_id(self, pon_intf_id, resource_type, num_of_id=1):
"""
Create alloc/gemport/onu id for given OLT PON interface.
@@ -247,12 +310,21 @@
respectively
"""
result = None
+
+ # TODO: ASFvOLT16 platform requires alloc and gemport ID to be unique
+ # across OLT. To keep it simple, a single pool (POOL 0) is maintained
+ # for both the resource types. This may need to change later.
+ # Override the incoming pon_intf_id to PON0
+ if resource_type == PONResourceManager.GEMPORT_ID or \
+ resource_type == PONResourceManager.ALLOC_ID:
+ pon_intf_id = 0
+
path = self._get_path(pon_intf_id, resource_type)
if path is None:
- returnValue(result)
+ return result
- resource = yield self._get_resource(path)
try:
+ resource = self._get_resource(path)
if resource is not None and resource_type == \
PONResourceManager.ONU_ID:
result = self._generate_next_id(resource)
@@ -263,18 +335,19 @@
while num_of_id > 0:
result.append(self._generate_next_id(resource))
num_of_id -= 1
+ else:
+ raise Exception("get-resource-failed")
+ self._log.debug("Get-" + resource_type + "-success", result=result,
+ path=path)
# Update resource in kv store
self._update_resource(path, resource)
- except BaseException:
+ except Exception as e:
self._log.exception("Get-" + resource_type + "-id-failed",
- path=path)
- self._log.debug("Get-" + resource_type + "-success", result=result,
- path=path)
- returnValue(result)
+ path=path, e=e)
+ return result
- @inlineCallbacks
def free_resource_id(self, pon_intf_id, resource_type, release_content):
"""
Release alloc/gemport/onu id for given OLT PON interface.
@@ -286,12 +359,21 @@
else False
"""
status = False
+
+ # TODO: ASFvOLT16 platform requires alloc and gemport ID to be unique
+ # across OLT. To keep it simple, a single pool (POOL 0) is maintained
+ # for both the resource types. This may need to change later.
+ # Override the incoming pon_intf_id to PON0
+ if resource_type == PONResourceManager.GEMPORT_ID or \
+ resource_type == PONResourceManager.ALLOC_ID:
+ pon_intf_id = 0
+
path = self._get_path(pon_intf_id, resource_type)
if path is None:
- returnValue(status)
+ return status
- resource = yield self._get_resource(path)
try:
+ resource = self._get_resource(path)
if resource is not None and resource_type == \
PONResourceManager.ONU_ID:
self._release_id(resource, release_content)
@@ -300,16 +382,19 @@
resource_type == PONResourceManager.GEMPORT_ID):
for content in release_content:
self._release_id(resource, content)
+ else:
+ raise Exception("get-resource-failed")
+
self._log.debug("Free-" + resource_type + "-success", path=path)
# Update resource in kv store
- status = yield self._update_resource(path, resource)
+ status = self._update_resource(path, resource)
- except BaseException:
- self._log.exception("Free-" + resource_type + "-failed", path=path)
- returnValue(status)
+ except Exception as e:
+ self._log.exception("Free-" + resource_type + "-failed",
+ path=path, e=e)
+ return status
- @inlineCallbacks
def clear_resource_id_pool(self, pon_intf_id, resource_type):
"""
Clear Resource Pool for a given Resource Type on a given PON Port.
@@ -318,16 +403,147 @@
"""
path = self._get_path(pon_intf_id, resource_type)
if path is None:
- returnValue(False)
+ return False
- result = yield self._kv_store.delete(path)
- if result is None:
- self._log.debug("Resource-pool-cleared", device_id=self.device_id,
- path=path)
- returnValue(True)
+ try:
+ result = self._kv_store.remove_from_kv_store(path)
+ if result is True:
+ self._log.debug("Resource-pool-cleared",
+ device_id=self.device_id,
+ path=path)
+ return True
+ except Exception as e:
+ self._log.exception("error-clearing-resource-pool", e=e)
+
self._log.error("Clear-resource-pool-failed", device_id=self.device_id,
path=path)
- returnValue(False)
+ return False
+
+ def init_resource_map(self, pon_intf_onu_id):
+ """
+ Initialize resource map
+
+ :param pon_intf_onu_id: reference of PON interface id and onu id
+ """
+ # initialize pon_intf_onu_id tuple to alloc_ids map
+ alloc_id_path = PONResourceManager.ALLOC_ID_RESOURCE_MAP_PATH.format(
+ self.device_id, str(pon_intf_onu_id)
+ )
+ alloc_ids = list()
+ self._kv_store.update_to_kv_store(
+ alloc_id_path, json.dumps(alloc_ids)
+ )
+
+ # initialize pon_intf_onu_id tuple to gemport_ids map
+ gemport_id_path = PONResourceManager.GEMPORT_ID_RESOURCE_MAP_PATH.format(
+ self.device_id, str(pon_intf_onu_id)
+ )
+ gemport_ids = list()
+ self._kv_store.update_to_kv_store(
+ gemport_id_path, json.dumps(gemport_ids)
+ )
+
+ def remove_resource_map(self, pon_intf_onu_id):
+ """
+ Remove resource map
+
+ :param pon_intf_onu_id: reference of PON interface id and onu id
+ """
+ # remove pon_intf_onu_id tuple to alloc_ids map
+ alloc_id_path = PONResourceManager.ALLOC_ID_RESOURCE_MAP_PATH.format(
+ self.device_id, str(pon_intf_onu_id)
+ )
+ self._kv_store.remove_from_kv_store(alloc_id_path)
+
+ # remove pon_intf_onu_id tuple to gemport_ids map
+ gemport_id_path = PONResourceManager.GEMPORT_ID_RESOURCE_MAP_PATH.format(
+ self.device_id, str(pon_intf_onu_id)
+ )
+ self._kv_store.remove_from_kv_store(gemport_id_path)
+
+ def get_current_alloc_ids_for_onu(self, pon_intf_onu_id):
+ """
+ Get currently configured alloc ids for given pon_intf_onu_id
+
+ :param pon_intf_onu_id: reference of PON interface id and onu id
+ """
+ path = PONResourceManager.ALLOC_ID_RESOURCE_MAP_PATH.format(
+ self.device_id,
+ str(pon_intf_onu_id))
+ value = self._kv_store.get_from_kv_store(path)
+ if value is not None:
+ alloc_id_list = json.loads(value)
+ if len(alloc_id_list) > 0:
+ return alloc_id_list
+
+ return None
+
+ def get_current_gemport_ids_for_onu(self, pon_intf_onu_id):
+ """
+ Get currently configured gemport ids for given pon_intf_onu_id
+
+ :param pon_intf_onu_id: reference of PON interface id and onu id
+ """
+
+ path = PONResourceManager.GEMPORT_ID_RESOURCE_MAP_PATH.format(
+ self.device_id,
+ str(pon_intf_onu_id))
+ value = self._kv_store.get_from_kv_store(path)
+ if value is not None:
+ gemport_id_list = json.loads(value)
+ if len(gemport_id_list) > 0:
+ return gemport_id_list
+
+ return None
+
+ def update_alloc_ids_for_onu(self, pon_intf_onu_id, alloc_ids):
+ """
+ Update currently configured alloc ids for given pon_intf_onu_id
+
+ :param pon_intf_onu_id: reference of PON interface id and onu id
+ """
+ path = PONResourceManager.ALLOC_ID_RESOURCE_MAP_PATH.format(
+ self.device_id, str(pon_intf_onu_id)
+ )
+ self._kv_store.update_to_kv_store(
+ path, json.dumps(alloc_ids)
+ )
+
+ def update_gemport_ids_for_onu(self, pon_intf_onu_id, gemport_ids):
+ """
+ Update currently configured gemport ids for given pon_intf_onu_id
+
+ :param pon_intf_onu_id: reference of PON interface id and onu id
+ """
+ path = PONResourceManager.GEMPORT_ID_RESOURCE_MAP_PATH.format(
+ self.device_id, str(pon_intf_onu_id)
+ )
+ self._kv_store.update_to_kv_store(
+ path, json.dumps(gemport_ids)
+ )
+
+ def _get_olt_vendor(self):
+ """
+ Get olt vendor variant
+
+ :return: type of olt vendor
+ """
+ olt_vendor = None
+ if self.extra_args and len(self.extra_args) > 0:
+ parser = OltVendorArgumentParser(add_help=False)
+ parser.add_argument('--olt_vendor', '-o', action='store',
+ choices=['default', 'asfvolt16'],
+ default='default')
+ try:
+ args = parser.parse_args(shlex.split(self.extra_args))
+ self._log.debug('parsing-extra-arguments', args=args)
+ olt_vendor = args.olt_vendor
+ except ArgumentError as e:
+ self._log.exception('invalid-arguments: {}', e=e)
+ except Exception as e:
+ self._log.exception('option-parsing-error: {}', e=e)
+
+ return olt_vendor
def _generate_next_id(self, resource):
"""
@@ -377,7 +593,7 @@
:return: alloc id resource path
"""
return PONResourceManager.ALLOC_ID_POOL_PATH.format(
- self.technology, self.device_id, pon_intf_id)
+ self.device_id, pon_intf_id)
def _get_gemport_id_resource_path(self, pon_intf_id):
"""
@@ -387,7 +603,7 @@
:return: gemport id resource path
"""
return PONResourceManager.GEMPORT_ID_POOL_PATH.format(
- self.technology, self.device_id, pon_intf_id)
+ self.device_id, pon_intf_id)
def _get_onu_id_resource_path(self, pon_intf_id):
"""
@@ -397,9 +613,8 @@
:return: onu id resource path
"""
return PONResourceManager.ONU_ID_POOL_PATH.format(
- self.technology, self.device_id, pon_intf_id)
+ self.device_id, pon_intf_id)
- @inlineCallbacks
def _update_resource(self, path, resource):
"""
Update resource in resource kv store.
@@ -410,12 +625,11 @@
"""
resource[PONResourceManager.POOL] = \
resource[PONResourceManager.POOL].bin
- result = yield self._kv_store.put(path, json.dumps(resource))
- if result is None:
- returnValue(True)
- returnValue(False)
+ result = self._kv_store.update_to_kv_store(path, json.dumps(resource))
+ if result is True:
+ return True
+ return False
- @inlineCallbacks
def _get_resource(self, path):
"""
Get resource from kv store.
@@ -424,12 +638,15 @@
:return: resource if resource present in kv store else None
"""
# get resource from kv store
- result = yield self._kv_store.get(path)
- resource = result[0]
+ result = self._kv_store.get_from_kv_store(path)
+ if result is None:
+ return result
+ self._log.info("dumping resource", result=result)
+ resource = result
if resource is not None:
# decode resource fetched from backend store to dictionary
- resource = eval(resource.value)
+ resource = json.loads(resource)
# resource pool in backend store stored as binary string whereas to
# access the pool to generate/release IDs it need to be converted
@@ -437,7 +654,7 @@
resource[PONResourceManager.POOL] = \
BitArray('0b' + resource[PONResourceManager.POOL])
- returnValue(resource)
+ return resource
def _format_resource(self, pon_intf_id, start_idx, end_idx):
"""
diff --git a/tests/utests/common/test_pon_resource_manager.py b/tests/utests/common/test_pon_resource_manager.py
index 74c4736..6205ee4 100644
--- a/tests/utests/common/test_pon_resource_manager.py
+++ b/tests/utests/common/test_pon_resource_manager.py
@@ -17,10 +17,8 @@
from unittest import TestCase, main
from bitstring import BitArray
-from common.kvstore.kv_client import KVPair
from common.pon_resource_manager.resource_manager import PONResourceManager
from mock import Mock
-from twisted.internet.defer import inlineCallbacks
class TestResourceManager(TestCase):
@@ -29,97 +27,81 @@
'0001c889ee7189fb', 'consul',
'localhost', 8500)
self.default_resource_range = {
- "onu_start_idx": 1,
- "onu_end_idx": 127,
- "alloc_id_start_idx": 1024,
- "alloc_id_end_idx": 65534,
- "gem_port_id_start_idx": 1024,
- "gem_port_id_end_idx": 16383,
- "num_of_pon_port": 16
+ "onu_id_start": 1,
+ "onu_id_end": 127,
+ "alloc_id_start": 1024,
+ "alloc_id_end": 2816,
+ "gemport_id_start": 1024,
+ "gemport_id_end": 8960,
+ "pon_ports": 16
}
def tearDown(self):
self._rm = None
self.default_resource_range = None
- @inlineCallbacks
def test_init_pon_resource_ranges(self):
- key = PONResourceManager.PON_RESOURCE_RANGE_CONFIG_PATH.format(
- 'xgspon', 'default')
- value = json.dumps(self.default_resource_range).encode('utf-8')
- output = KVPair(key, value, None)
- self._rm._kv_store.get = Mock(return_value=(output, None))
+ output = json.dumps(self.default_resource_range).encode('utf-8')
+ self._rm._get_olt_vendor = Mock(return_value='default')
+ self._rm._kv_store.get_from_kv_store = Mock(return_value=output)
- yield self._rm.init_pon_resource_ranges()
+ self._rm.init_resource_ranges_from_kv_store()
self.assertEqual(self._rm.pon_resource_ranges,
self.default_resource_range)
- self._rm._kv_store.get = Mock(return_value=(None, None))
-
- yield self._rm.init_pon_resource_ranges()
+ self._rm.init_default_pon_resource_ranges()
self.assertEqual(self._rm.pon_resource_ranges,
self.default_resource_range)
- @inlineCallbacks
def test_init_resource_id_pool(self):
- self._rm._kv_store.get = Mock(return_value=(None, None))
- self._rm._kv_store.put = Mock(return_value=None)
- status = yield self._rm.init_resource_id_pool(0, 'ONU_ID', 1, 127)
+ self._rm._kv_store.get_from_kv_store = Mock(return_value=None)
+ self._rm._kv_store.update_to_kv_store = Mock(return_value=True)
+ status = self._rm.init_resource_id_pool(0, 'ONU_ID', 1, 127)
self.assertTrue(status)
- status = yield self._rm.init_resource_id_pool(
+ status = self._rm.init_resource_id_pool(
1, 'ALLOC_ID', 1024, 16383)
self.assertTrue(status)
- status = yield self._rm.init_resource_id_pool(
+ status = self._rm.init_resource_id_pool(
2, 'GEMPORT_ID', 1023, 65534)
self.assertTrue(status)
- @inlineCallbacks
def test_get_resource_id(self):
# Get onu id test
onu_id_resource = self._rm._format_resource(0, 1, 127)
- key = self._rm._get_path(0, PONResourceManager.ONU_ID)
- value = onu_id_resource.encode('utf-8')
- output = KVPair(key, value, None)
- self._rm._kv_store.get = Mock(return_value=(output, None))
- self._rm._kv_store.put = Mock(return_value=None)
- result = yield self._rm.get_resource_id(0, 'ONU_ID')
+ output = onu_id_resource.encode('utf-8')
+ self._rm._kv_store.get_from_kv_store = Mock(return_value=output)
+ self._rm._kv_store.update_to_kv_store = Mock(return_value=True)
+ result = self._rm.get_resource_id(0, 'ONU_ID')
self.assertEqual(result, 1)
# Get alloc id test
alloc_id_resource = self._rm._format_resource(1, 1024, 16383)
- key = self._rm._get_path(1, PONResourceManager.ALLOC_ID)
- value = alloc_id_resource.encode('utf-8')
- output = KVPair(key, value, None)
- self._rm._kv_store.get = Mock(return_value=(output, None))
- result = yield self._rm.get_resource_id(1, 'ALLOC_ID', 1)
+ output = alloc_id_resource.encode('utf-8')
+ self._rm._kv_store.get_from_kv_store = Mock(return_value=output)
+ result = self._rm.get_resource_id(1, 'ALLOC_ID', 1)
self.assertEqual(result[0], 1024)
- result = yield self._rm.get_resource_id(1, 'ALLOC_ID', 4)
+ result = self._rm.get_resource_id(1, 'ALLOC_ID', 4)
self.assertEqual(result, [1024, 1025, 1026, 1027])
# Get gemport id test
gemport_id_resource = self._rm._format_resource(2, 1023, 65534)
- key = self._rm._get_path(2, PONResourceManager.GEMPORT_ID)
- value = gemport_id_resource.encode('utf-8')
- output = KVPair(key, value, None)
- self._rm._kv_store.get = Mock(return_value=(output, None))
- result = yield self._rm.get_resource_id(2, 'GEMPORT_ID', 1)
+ output = gemport_id_resource.encode('utf-8')
+ self._rm._kv_store.get_from_kv_store = Mock(return_value=output)
+ result = self._rm.get_resource_id(2, 'GEMPORT_ID', 1)
self.assertEqual(result[0], 1023)
- result = yield self._rm.get_resource_id(2, 'GEMPORT_ID', 5)
+ result = self._rm.get_resource_id(2, 'GEMPORT_ID', 5)
self.assertEqual(result, [1023, 1024, 1025, 1026, 1027])
- @inlineCallbacks
def test_free_resource_id(self):
# Free onu id test
- self._rm._kv_store.put = Mock(return_value=None)
+ self._rm._kv_store.update_to_kv_store = Mock(return_value=True)
onu_id_resource = eval(self._rm._format_resource(0, 1, 127))
onu_id_resource['pool'] = BitArray('0b' + onu_id_resource['pool'])
self._rm._generate_next_id(onu_id_resource)
onu_id_resource['pool'] = onu_id_resource['pool'].bin
- key = self._rm._get_path(0, PONResourceManager.ONU_ID)
- value = json.dumps(onu_id_resource).encode('utf-8')
- output = KVPair(key, value, None)
- self._rm._kv_store.get = Mock(return_value=(output, None))
- result = yield self._rm.free_resource_id(0, 'ONU_ID', 1)
+ output = json.dumps(onu_id_resource).encode('utf-8')
+ self._rm._kv_store.get_from_kv_store = Mock(return_value=output)
+ result = self._rm.free_resource_id(0, 'ONU_ID', 1)
self.assertTrue(result)
# Free alloc id test
@@ -130,10 +112,8 @@
self._rm._generate_next_id(alloc_id_resource)
alloc_id_resource['pool'] = alloc_id_resource['pool'].bin
- key = self._rm._get_path(0, PONResourceManager.ALLOC_ID)
- value = json.dumps(alloc_id_resource).encode('utf-8')
- output = KVPair(key, value, None)
- self._rm._kv_store.get = Mock(return_value=(output, None))
+ output = json.dumps(alloc_id_resource).encode('utf-8')
+ self._rm._kv_store.get_from_kv_store = Mock(return_value=output)
result = self._rm.free_resource_id(1, 'ALLOC_ID',
[1024, 1025, 1026, 1027, 1028])
self.assertTrue(result)
@@ -147,21 +127,18 @@
self._rm._generate_next_id(gemport_id_resource)
gemport_id_resource['pool'] = gemport_id_resource['pool'].bin
- key = self._rm._get_path(0, PONResourceManager.GEMPORT_ID)
- value = json.dumps(gemport_id_resource).encode('utf-8')
- output = KVPair(key, value, None)
- self._rm._kv_store.get = Mock(return_value=(output, None))
+ output = json.dumps(gemport_id_resource).encode('utf-8')
+ self._rm._kv_store.get_from_kv_store = Mock(return_value=output)
result = self._rm.free_resource_id(2, 'GEMPORT_ID',
[1023, 1024, 1025, 1026, 1027, 1028])
self.assertTrue(result)
- @inlineCallbacks
def test_clear_resource_id_pool(self):
- self._rm._kv_store.delete = Mock(return_value=None)
- status = yield self._rm.clear_resource_id_pool(0, 'ONU_ID')
+ self._rm._kv_store.remove_from_kv_store = Mock(return_value=True)
+ status = self._rm.clear_resource_id_pool(0, 'ONU_ID')
self.assertTrue(status)
- self._rm._kv_store.delete = Mock(return_value="error")
- status = yield self._rm.clear_resource_id_pool(1, 'ALLOC_ID')
+ self._rm._kv_store.remove_from_kv_store = Mock(return_value=False)
+ status = self._rm.clear_resource_id_pool(1, 'ALLOC_ID')
self.assertFalse(status)
diff --git a/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py b/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
index 4e8b2dd..fa2a2dc 100644
--- a/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
+++ b/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
@@ -70,6 +70,8 @@
self.log.debug('function-entry')
self.adapter = adapter
self.adapter_agent = adapter.adapter_agent
+ self.parent_adapter = None
+ self.parent_id = None
self.device_id = device_id
self.incoming_messages = DeferredQueue()
self.event_messages = DeferredQueue()
@@ -153,6 +155,11 @@
# register for proxied messages right away
self.proxy_address = device.proxy_address
self.adapter_agent.register_for_proxied_messages(device.proxy_address)
+ self.parent_id = device.parent_id
+ parent_device = self.adapter_agent.get_device(self.parent_id)
+ if parent_device.type == 'openolt':
+ self.parent_adapter = registry('adapter_loader').\
+ get_agent(parent_device.adapter).adapter
if self.enabled is not True:
self.log.info('activating-new-onu')
@@ -239,20 +246,16 @@
except Exception as e:
self.log.exception("exception-updating-port",e=e)
- @inlineCallbacks
def delete(self, device):
self.log.info('delete-onu', device=device)
-
- parent_device = self.adapter_agent.get_device(device.parent_id)
- if parent_device.type == 'openolt':
- parent_adapter = registry('adapter_loader').get_agent(parent_device.adapter).adapter
- self.log.debug('parent-adapter-delete-onu', onu_device=device,
- parent_device=parent_device,
- parent_adapter=parent_adapter)
+ if self.parent_adapter:
try:
- parent_adapter.delete_child_device(parent_device.id, device)
+ self.parent_adapter.delete_child_device(self.parent_id, device)
except AttributeError:
self.log.debug('parent-device-delete-child-not-implemented')
+ else:
+ self.log.debug("parent-adapter-not-available")
+
# Calling this assumes the onu is active/ready and had at least an initial mib downloaded. This gets called from
# flow decomposition that ultimately comes from onos
diff --git a/voltha/adapters/openolt/README.md b/voltha/adapters/openolt/README.md
index 5e174d3..8f3d5e6 100644
--- a/voltha/adapters/openolt/README.md
+++ b/voltha/adapters/openolt/README.md
@@ -28,13 +28,13 @@
To specify ASFvOLT16 OLT device specific resource ranges, first create a JSON file `asfvolt16_resource_range.json` with the following entry
```bash
{
- "onu_start_idx": 1,
- "onu_end_idx": 127,
- "alloc_id_start_idx": 1024,
- "alloc_id_end_idx": 65534,
- "gem_port_id_start_idx": 1024,
- "gem_port_id_end_idx": 16383,
- "num_of_pon_port": 16
+ "onu_id_start": 1,
+ "onu_id_end": 127,
+ "alloc_id_start": 1024,
+ "alloc_id_end": 2816,
+ "gemport_id_start": 1024,
+ "gemport_id_end": 8960,
+ "pon_ports": 16
}
```
This data should be put on the KV store location `resource_manager/xgspon/resource_ranges/asfvolt16`
diff --git a/voltha/adapters/openolt/openolt.py b/voltha/adapters/openolt/openolt.py
index 7ccec65..137096d 100644
--- a/voltha/adapters/openolt/openolt.py
+++ b/voltha/adapters/openolt/openolt.py
@@ -34,6 +34,7 @@
from voltha.adapters.openolt.openolt_statistics import OpenOltStatisticsMgr
from voltha.adapters.openolt.openolt_bw import OpenOltBW
from voltha.adapters.openolt.openolt_platform import OpenOltPlatform
+from voltha.adapters.openolt.openolt_resource_manager import OpenOltResourceMgr
_ = third_party
log = structlog.get_logger()
@@ -42,6 +43,7 @@
OpenOltDefaults = {
'support_classes': {
'platform': OpenOltPlatform,
+ 'resource_mgr': OpenOltResourceMgr,
'flow_mgr': OpenOltFlowMgr,
'alarm_mgr': OpenOltAlarmMgr,
'stats_mgr': OpenOltStatisticsMgr,
@@ -122,6 +124,7 @@
def reconcile_device(self, device):
log.info('reconcile-device', device=device)
kwargs = {
+ 'support_classes': OpenOltDefaults['support_classes'],
'adapter_agent': self.adapter_agent,
'device': device,
'device_num': self.num_devices + 1,
diff --git a/voltha/adapters/openolt/openolt_device.py b/voltha/adapters/openolt/openolt_device.py
index e5bf92f..943fb66 100644
--- a/voltha/adapters/openolt/openolt_device.py
+++ b/voltha/adapters/openolt/openolt_device.py
@@ -87,6 +87,7 @@
device = kwargs['device']
self.platform_class = kwargs['support_classes']['platform']
+ self.resource_mgr_class = kwargs['support_classes']['resource_mgr']
self.flow_mgr_class = kwargs['support_classes']['flow_mgr']
self.alarm_mgr_class = kwargs['support_classes']['alarm_mgr']
self.stats_mgr_class = kwargs['support_classes']['stats_mgr']
@@ -122,10 +123,10 @@
n_buffers=256, # TODO fake for now
n_tables=2, # TODO ditto
capabilities=( # TODO and ditto
- OFPC_FLOW_STATS
- | OFPC_TABLE_STATS
- | OFPC_PORT_STATS
- | OFPC_GROUP_STATS
+ OFPC_FLOW_STATS
+ | OFPC_TABLE_STATS
+ | OFPC_PORT_STATS
+ | OFPC_GROUP_STATS
)
),
desc=ofp_desc(
@@ -185,11 +186,16 @@
self.log.info('Device connected', device_info=device_info)
self.platform = self.platform_class(self.log, device_info)
+ self.resource_mgr = self.resource_mgr_class(self.device_id,
+ self.host_and_port,
+ self.extra_args,
+ device_info)
self.flow_mgr = self.flow_mgr_class(self.log, self.stub,
self.device_id,
self.logical_device_id,
- self.platform)
+ self.platform,
+ self.resource_mgr)
self.alarm_mgr = self.alarm_mgr_class(self.log, self.adapter_agent,
self.device_id,
self.logical_device_id,
@@ -201,6 +207,10 @@
device.model = device_info.model
device.hardware_version = device_info.hardware_version
device.firmware_version = device_info.firmware_version
+
+
+ # TODO: check for uptime and reboot if too long (VOL-1192)
+
device.connect_status = ConnectStatus.REACHABLE
self.adapter_agent.update_device(device)
@@ -312,12 +322,12 @@
reactor.callFromThread(self.packet_indication, ind.pkt_ind)
elif ind.HasField('port_stats'):
reactor.callFromThread(
- self.stats_mgr.port_statistics_indication,
- ind.port_stats)
+ self.stats_mgr.port_statistics_indication,
+ ind.port_stats)
elif ind.HasField('flow_stats'):
reactor.callFromThread(
- self.stats_mgr.flow_statistics_indication,
- ind.flow_stats)
+ self.stats_mgr.flow_statistics_indication,
+ ind.flow_stats)
elif ind.HasField('alarm_ind'):
reactor.callFromThread(self.alarm_mgr.process_alarms,
ind.alarm_ind)
@@ -396,23 +406,36 @@
onu_id = self.new_onu_id(intf_id)
try:
+ onu_id = self.resource_mgr.get_onu_id(intf_id)
+ if onu_id is None:
+ raise Exception("onu-id-unavailable")
+
+ pon_intf_onu_id = (intf_id, onu_id)
+ alloc_id = self.resource_mgr.get_alloc_id(
+ pon_intf_onu_id)
+ if alloc_id is None:
+ # Free up other PON resources if are unable to
+ # proceed ahead
+ self.resource_mgr.free_onu_id(intf_id, onu_id)
+ raise Exception("alloc-id-unavailable")
+
self.add_onu_device(
intf_id,
self.platform.intf_id_to_port_no(intf_id, Port.PON_OLT),
onu_id, serial_number)
self.activate_onu(intf_id, onu_id, serial_number,
- serial_number_str)
+ serial_number_str, alloc_id)
except Exception as e:
self.log.exception('onu-activation-failed', e=e)
else:
if onu_device.connect_status != ConnectStatus.REACHABLE:
- onu_device.connect_status = ConnectStatus.REACHABLE
- self.adapter_agent.update_device(onu_device)
+ onu_device.connect_status = ConnectStatus.REACHABLE
+ self.adapter_agent.update_device(onu_device)
onu_id = onu_device.proxy_address.onu_id
if onu_device.oper_status == OperStatus.DISCOVERED \
- or onu_device.oper_status == OperStatus.ACTIVATING:
+ or onu_device.oper_status == OperStatus.ACTIVATING:
self.log.debug("ignore onu discovery indication, \
the onu has been discovered and should be \
activating shorlty", intf_id=intf_id,
@@ -430,8 +453,14 @@
onu_device.oper_status = OperStatus.DISCOVERED
self.adapter_agent.update_device(onu_device)
try:
+ pon_intf_onu_id = (intf_id, onu_id)
+ # The ONU is already in the VOLTHA DB and resources were
+ # already allocated for this ONU. So we fetch the resource
+ # from local cache and not KV store.
+ alloc_id = self.resource_mgr.get_alloc_id(
+ pon_intf_onu_id)
self.activate_onu(intf_id, onu_id, serial_number,
- serial_number_str)
+ serial_number_str, alloc_id)
except Exception as e:
self.log.error('onu-activation-error',
serial_number=serial_number_str, error=e)
@@ -453,20 +482,25 @@
if serial_number_str is not None:
onu_device = self.adapter_agent.get_child_device(
- self.device_id,
- serial_number=serial_number_str)
+ self.device_id,
+ serial_number=serial_number_str)
else:
onu_device = self.adapter_agent.get_child_device(
- self.device_id,
- parent_port_no=self.platform.intf_id_to_port_no(
- onu_indication.intf_id, Port.PON_OLT),
- onu_id=onu_indication.onu_id)
+ self.device_id,
+ parent_port_no=self.platform.intf_id_to_port_no(
+ onu_indication.intf_id, Port.PON_OLT),
+ onu_id=onu_indication.onu_id)
if onu_device is None:
self.log.error('onu not found', intf_id=onu_indication.intf_id,
onu_id=onu_indication.onu_id)
return
+ # We will use this alloc_id and gemport_id to pass on to the onu adapter
+ pon_intf_onu_id = (onu_indication.intf_id, onu_indication.onu_id)
+ alloc_id = self.resource_mgr.get_alloc_id(pon_intf_onu_id)
+ gemport_id = self.resource_mgr.get_gemport_id(pon_intf_onu_id)
+
if self.platform.intf_id_from_pon_port_no(onu_device.parent_port_no) \
!= onu_indication.intf_id:
self.log.warn('ONU-is-on-a-different-intf-id-now',
@@ -555,14 +589,11 @@
# tcont creation (onu)
tcont = TcontsConfigData()
- tcont.alloc_id = self.platform.mk_alloc_id(
- onu_indication.intf_id, onu_indication.onu_id)
+ tcont.alloc_id = alloc_id
# gem port creation
gem_port = GemportsConfigData()
- gem_port.gemport_id = self.platform.mk_gemport_id(
- onu_indication.intf_id,
- onu_indication.onu_id)
+ gem_port.gemport_id = gemport_id
gem_port.tcont_ref = str(tcont.alloc_id)
@@ -638,7 +669,16 @@
flow_id=pkt_indication.flow_id)
if pkt_indication.intf_type == "pon":
- onu_id = self.platform.onu_id_from_gemport_id(pkt_indication.gemport_id)
+ pon_intf_gemport = (pkt_indication.intf_id, pkt_indication.gemport_id)
+ try:
+ onu_id = int(self.resource_mgr.kv_store[pon_intf_gemport])
+ if onu_id is None:
+ raise Exception("onu-id-none")
+ except Exception as e:
+ self.log.error("no-onu-reference-for-gem",
+ gemport_id=pkt_indication.gemport_id, e=e)
+ return
+
logical_port_num = self.platform.mk_uni_port_num(pkt_indication.intf_id,
onu_id)
elif pkt_indication.intf_type == "nni":
@@ -673,8 +713,8 @@
if isinstance(outer_shim.payload, Dot1Q):
# If double tag, remove the outer tag
payload = (
- Ether(src=pkt.src, dst=pkt.dst, type=outer_shim.type) /
- outer_shim.payload
+ Ether(src=pkt.src, dst=pkt.dst, type=outer_shim.type) /
+ outer_shim.payload
)
else:
payload = pkt
@@ -940,6 +980,10 @@
self.log.info('deleting-olt', device_id=self.device_id,
logical_device_id=self.logical_device_id)
+ # Clears up the data from the resource manager KV store
+ # for the device
+ del self.resource_mgr
+
try:
# Rebooting to reset the state
self.reboot()
@@ -966,13 +1010,15 @@
self.log.info('openolt device reenabled')
def activate_onu(self, intf_id, onu_id, serial_number,
- serial_number_str):
+ serial_number_str, alloc_id):
pir = self.bw_mgr.pir(serial_number_str)
self.log.debug("activating-onu", intf_id=intf_id, onu_id=onu_id,
serial_number_str=serial_number_str,
- serial_number=serial_number, pir=pir)
+ serial_number=serial_number, pir=pir,
+ alloc_id=alloc_id)
onu = openolt_pb2.Onu(intf_id=intf_id, onu_id=onu_id,
- serial_number=serial_number, pir=pir)
+ serial_number=serial_number, pir=pir,
+ alloc_id=alloc_id)
self.stub.ActivateOnu(onu)
self.log.info('onu-activated', serial_number=serial_number_str)
@@ -997,9 +1043,16 @@
self.log.error('port delete error', error=e)
serial_number = self.destringify_serial_number(
child_device.serial_number)
+ pon_intf_id_onu_id = (child_device.proxy_address.channel_id,
+ child_device.proxy_address.onu_id)
+ alloc_id = self.resource_mgr.get_alloc_id(pon_intf_id_onu_id)
+ # Free any PON resources that were reserved for the ONU
+ self.resource_mgr.free_pon_resources_for_onu(pon_intf_id_onu_id)
+
onu = openolt_pb2.Onu(intf_id=child_device.proxy_address.channel_id,
onu_id=child_device.proxy_address.onu_id,
- serial_number=serial_number)
+ serial_number=serial_number,
+ alloc_id=alloc_id)
self.stub.DeleteOnu(onu)
def reboot(self):
diff --git a/voltha/adapters/openolt/openolt_flow_mgr.py b/voltha/adapters/openolt/openolt_flow_mgr.py
index 9e21510..d098065 100644
--- a/voltha/adapters/openolt/openolt_flow_mgr.py
+++ b/voltha/adapters/openolt/openolt_flow_mgr.py
@@ -72,7 +72,7 @@
class OpenOltFlowMgr(object):
def __init__(self, log, stub, device_id, logical_device_id,
- platform):
+ platform, resource_mgr):
self.log = log
self.stub = stub
self.device_id = device_id
@@ -83,6 +83,7 @@
self.flows_proxy = registry('core').get_proxy(
'/devices/{}/flows'.format(self.device_id))
self.root_proxy = registry('core').get_proxy('/')
+ self.resource_mgr = resource_mgr
def add_flow(self, flow):
self.log.debug('add flow', flow=flow)
@@ -318,8 +319,14 @@
def add_hsia_flow(self, intf_id, onu_id, classifier, action,
direction, hsia_id, logical_flow):
- gemport_id = self.platform.mk_gemport_id(intf_id, onu_id)
- alloc_id = self.platform.mk_alloc_id(intf_id, onu_id)
+ pon_intf_onu_id = (intf_id, onu_id)
+ gemport_id = self.resource_mgr.get_gemport_id(
+ pon_intf_onu_id=pon_intf_onu_id
+ )
+ alloc_id = self.resource_mgr.get_alloc_id(
+ pon_intf_onu_id=pon_intf_onu_id
+ )
+
flow_id = self.platform.mk_flow_id(intf_id, onu_id, hsia_id)
flow = openolt_pb2.Flow(
@@ -342,8 +349,14 @@
classifier[PACKET_TAG_TYPE] = SINGLE_TAG
classifier.pop(VLAN_VID, None)
- gemport_id = self.platform.mk_gemport_id(intf_id, onu_id)
- alloc_id = self.platform.mk_alloc_id(intf_id, onu_id)
+ pon_intf_onu_id = (intf_id, onu_id)
+ gemport_id = self.resource_mgr.get_gemport_id(
+ pon_intf_onu_id=pon_intf_onu_id
+ )
+ alloc_id = self.resource_mgr.get_alloc_id(
+ pon_intf_onu_id=pon_intf_onu_id
+ )
+
flow_id = self.platform.mk_flow_id(intf_id, onu_id, DHCP_FLOW_INDEX)
upstream_flow = openolt_pb2.Flow(
@@ -396,8 +409,14 @@
# Add Upstream EAPOL Flow.
- gemport_id = self.platform.mk_gemport_id(intf_id, onu_id)
- alloc_id = self.platform.mk_alloc_id(intf_id, onu_id)
+ pon_intf_onu_id = (intf_id, onu_id)
+ gemport_id = self.resource_mgr.get_gemport_id(
+ pon_intf_onu_id=pon_intf_onu_id
+ )
+ alloc_id = self.resource_mgr.get_alloc_id(
+ pon_intf_onu_id=pon_intf_onu_id
+ )
+
uplink_flow_id = self.platform.mk_flow_id(intf_id, onu_id, eapol_id)
upstream_flow = openolt_pb2.Flow(
diff --git a/voltha/adapters/openolt/openolt_resource_manager.py b/voltha/adapters/openolt/openolt_resource_manager.py
new file mode 100644
index 0000000..64232d7
--- /dev/null
+++ b/voltha/adapters/openolt/openolt_resource_manager.py
@@ -0,0 +1,211 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import structlog
+
+from common.pon_resource_manager.resource_manager import PONResourceManager
+from voltha.registry import registry
+from voltha.core.config.config_backend import ConsulStore
+from voltha.core.config.config_backend import EtcdStore
+
+
+class OpenOltResourceMgr(object):
+ GEMPORT_IDS = "gemport_ids"
+ ALLOC_IDS = "alloc_ids"
+ BASE_PATH_KV_STORE = "openolt/{}" # openolt/<device_id>
+
+ def __init__(self, device_id, host_and_port, extra_args, device_info):
+ self.log = structlog.get_logger(id=device_id,
+ ip=host_and_port)
+ self.device_id = device_id
+ self.host_and_port = host_and_port
+ self.extra_args = extra_args
+ self.device_info = device_info
+ self.args = registry('main').get_args()
+
+ # KV store's IP Address and PORT
+ host, port = '127.0.0.1', 8500
+ if self.args.backend == 'etcd':
+ host, port = self.args.etcd.split(':', 1)
+ self.kv_store = EtcdStore(host, port,
+ OpenOltResourceMgr.BASE_PATH_KV_STORE.format(device_id))
+ elif self.args.backend == 'consul':
+ host, port = self.args.consul.split(':', 1)
+ self.kv_store = ConsulStore(host, port,
+ OpenOltResourceMgr.BASE_PATH_KV_STORE.format(device_id))
+ else:
+ self.log.error('Invalid-backend')
+ raise Exception("Invalid-backend-for-kv-store")
+
+ self.resource_mgr = PONResourceManager(
+ self.device_info.technology,
+ self.extra_args,
+ self.device_id, self.args.backend,
+ host, port
+ )
+
+ # Flag to indicate whether information fetched from device should
+ # be used to intialize PON Resource Ranges
+ self.use_device_info = False
+
+ self.initialize_device_resource_range_and_pool()
+
+ def __del__(self):
+ self.log.info("clearing-device-resource-pool")
+ self.resource_mgr.clear_device_resource_pool()
+
+ def get_onu_id(self, pon_intf_id):
+ onu_id = self.resource_mgr.get_resource_id(
+ pon_intf_id, PONResourceManager.ONU_ID, 1)
+
+ if onu_id is not None:
+ pon_intf_onu_id = (pon_intf_id, onu_id)
+ self.resource_mgr.init_resource_map(
+ pon_intf_onu_id)
+
+ return onu_id
+
+ def get_alloc_id(self, pon_intf_onu_id):
+ # Derive the pon_intf from the pon_intf_onu_id tuple
+ pon_intf = pon_intf_onu_id[0]
+ alloc_id_list = self.resource_mgr.get_current_alloc_ids_for_onu(
+ pon_intf_onu_id)
+
+ if alloc_id_list and len(alloc_id_list) > 0:
+ # Since we support only one alloc_id for the ONU at the moment,
+ # return the first alloc_id in the list, if available, for that
+ # ONU.
+ return alloc_id_list[0]
+
+ alloc_id_list = self.resource_mgr.get_resource_id(
+ pon_intf_id=pon_intf,
+ resource_type=PONResourceManager.ALLOC_ID,
+ num_of_id=1
+ )
+ if alloc_id_list and len(alloc_id_list) == 0:
+ self.log.error("no-alloc-id-available")
+ return None
+
+ # update the resource map on KV store with the list of alloc_id
+ # allocated for the pon_intf_onu_id tuple
+ self.resource_mgr.update_alloc_ids_for_onu(pon_intf_onu_id,
+ alloc_id_list)
+
+ # Since we request only one alloc id, we refer the 0th
+ # index
+ alloc_id = alloc_id_list[0]
+
+ return alloc_id
+
+ def get_gemport_id(self, pon_intf_onu_id):
+ # Derive the pon_intf and onu_id from the pon_intf_onu_id tuple
+ pon_intf = pon_intf_onu_id[0]
+ onu_id = pon_intf_onu_id[1]
+
+ gemport_id_list = self.resource_mgr.get_current_gemport_ids_for_onu(
+ pon_intf_onu_id)
+ if gemport_id_list and len(gemport_id_list) > 0:
+ # Since we support only one gemport_id for the ONU at the moment,
+ # return the first gemport_id in the list, if available, for that
+ # ONU.
+ return gemport_id_list[0]
+
+ gemport_id_list = self.resource_mgr.get_resource_id(
+ pon_intf_id=pon_intf,
+ resource_type=PONResourceManager.GEMPORT_ID,
+ num_of_id=1
+ )
+
+ if gemport_id_list and len(gemport_id_list) == 0:
+ self.log.error("no-gemport-id-available")
+ return None
+
+ # update the resource map on KV store with the list of gemport_id
+ # allocated for the pon_intf_onu_id tuple
+ self.resource_mgr.update_gemport_ids_for_onu(pon_intf_onu_id,
+ gemport_id_list)
+
+ # We currently use only one gemport
+ gemport = gemport_id_list[0]
+
+ pon_intf_gemport = (pon_intf, gemport)
+ # This information is used when packet_indication is received and
+ # we need to derive the ONU Id for which the packet arrived based
+ # on the pon_intf and gemport available in the packet_indication
+ self.kv_store[str(pon_intf_gemport)] = str(onu_id)
+
+ return gemport
+
+ def free_onu_id(self, pon_intf_id, onu_id):
+ result = self.resource_mgr.free_resource_id(
+ pon_intf_id, PONResourceManager.ONU_ID, onu_id)
+
+ pon_intf_onu_id = (pon_intf_id, onu_id)
+ self.resource_mgr.remove_resource_map(
+ pon_intf_onu_id)
+
+ def free_pon_resources_for_onu(self, pon_intf_id_onu_id):
+
+ alloc_ids = \
+ self.resource_mgr.get_current_alloc_ids_for_onu(pon_intf_id_onu_id)
+ pon_intf_id = pon_intf_id_onu_id[0]
+ onu_id = pon_intf_id_onu_id[1]
+ self.resource_mgr.free_resource_id(pon_intf_id,
+ PONResourceManager.ALLOC_ID,
+ alloc_ids)
+
+ gemport_ids = \
+ self.resource_mgr.get_current_gemport_ids_for_onu(pon_intf_id_onu_id)
+ self.resource_mgr.free_resource_id(pon_intf_id,
+ PONResourceManager.GEMPORT_ID,
+ gemport_ids)
+
+ self.resource_mgr.free_resource_id(pon_intf_id,
+ PONResourceManager.ONU_ID,
+ onu_id)
+
+ # Clear resource map associated with (pon_intf_id, gemport_id) tuple.
+ self.resource_mgr.remove_resource_map(pon_intf_id_onu_id)
+
+ # Clear the ONU Id associated with the (pon_intf_id, gemport_id) tuple.
+ for gemport_id in gemport_ids:
+ del self.kv_store[str((pon_intf_id, gemport_id))]
+
+ def initialize_device_resource_range_and_pool(self):
+ if not self.use_device_info:
+ status = self.resource_mgr.init_resource_ranges_from_kv_store()
+ if not status:
+ self.log.error("failed-to-load-resource-range-from-kv-store")
+ # When we have failed to read the PON Resource ranges from KV
+ # store, use the information fetched from device.
+ self.use_device_info = True
+
+ if self.use_device_info:
+ self.log.info("using-device-info-to-init-pon-resource-ranges")
+ self.resource_mgr.init_default_pon_resource_ranges(
+ self.device_info.onu_id_start,
+ self.device_info.onu_id_end,
+ self.device_info.alloc_id_start,
+ self.device_info.alloc_id_end,
+ self.device_info.gemport_id_start,
+ self.device_info.gemport_id_end,
+ self.device_info.pon_ports
+ )
+
+ # After we have initialized resource ranges, initialize the
+ # resource pools accordingly.
+ self.resource_mgr.init_device_resource_pool()
+