VOL-1221: OpenOLT Adapter/Driver will use a Technology Profile Instance to create the OLT Upstream and Downstream Queuing and Scheduling Constructs for a Bidirectional Flow.
Change-Id: Iaf1a782529e2c459c586b158bd4f6447f548e004
diff --git a/common/pon_resource_manager/resource_manager.py b/common/pon_resource_manager/resource_manager.py
index aa1b6ca..9e249fb 100644
--- a/common/pon_resource_manager/resource_manager.py
+++ b/common/pon_resource_manager/resource_manager.py
@@ -21,13 +21,14 @@
uses a KV store in backend to ensure resiliency of the data.
"""
import json
+import ast
import structlog
from bitstring import BitArray
-from ast import literal_eval
import shlex
from argparse import ArgumentParser, ArgumentError
from common.pon_resource_manager.resource_kv_store import ResourceKvStore
+from common.tech_profile.tech_profile import TechProfile
# Used to parse extra arguments to OpenOlt adapter from the NBI
@@ -45,6 +46,7 @@
ONU_ID = 'ONU_ID'
ALLOC_ID = 'ALLOC_ID'
GEMPORT_ID = 'GEMPORT_ID'
+ FLOW_ID = 'FLOW_ID'
# Constants for passing command line arugments
OLT_MODEL_ARG = '--olt_model'
@@ -63,6 +65,8 @@
"alloc_id_end": 2816,
"gemport_id_start": 1024,
"gemport_id_end": 8960,
+ "flow_id_start": 1,
+ "flow_id_end": 16383,
"pon_ports": 16
}
@@ -78,6 +82,9 @@
GEMPORT_ID_START_IDX = "gemport_id_start"
GEMPORT_ID_END_IDX = "gemport_id_end"
GEMPORT_ID_SHARED_IDX = "gemport_id_shared"
+ FLOW_ID_START_IDX = "flow_id_start"
+ FLOW_ID_END_IDX = "flow_id_end"
+ FLOW_ID_SHARED_IDX = "flow_id_shared"
NUM_OF_PON_PORT = "pon_ports"
# PON Resource range configuration on the KV store.
@@ -90,6 +97,7 @@
ALLOC_ID_POOL_PATH = '{}/alloc_id_pool/{}'
GEMPORT_ID_POOL_PATH = '{}/gemport_id_pool/{}'
ONU_ID_POOL_PATH = '{}/onu_id_pool/{}'
+ FLOW_ID_POOL_PATH = '{}/flow_id_pool/{}'
# Path on the KV store for storing list of alloc IDs for a given ONU
# Format: <device_id>/<(pon_intf_id, onu_id)>/alloc_ids
@@ -99,6 +107,14 @@
# Format: <device_id>/<(pon_intf_id, onu_id)>/gemport_ids
GEMPORT_ID_RESOURCE_MAP_PATH = '{}/{}/gemport_ids'
+ # Path on the KV store for storing list of Flow IDs for a given ONU
+ # Format: <device_id>/<(pon_intf_id, onu_id)>/flow_ids
+ FLOW_ID_RESOURCE_MAP_PATH = '{}/{}/flow_ids'
+
+ # Flow Id info: Use to store more metadata associated with the flow_id
+ # Format: <device_id>/<(pon_intf_id, onu_id)>/flow_id_info/<flow_id>
+ FLOW_ID_INFO_PATH = '{}/{}/flow_id_info/{}'
+
# Constants for internal usage.
PON_INTF_ID = 'pon_intf_id'
START_IDX = 'start_idx'
@@ -133,6 +149,7 @@
self._kv_store = ResourceKvStore(technology, device_id, backend,
host, port)
+ self.tech_profile = TechProfile(self)
# Below attribute, pon_resource_ranges, should be initialized
# by reading from KV store.
@@ -140,16 +157,19 @@
self.pon_resource_ranges[PONResourceManager.ONU_ID_SHARED_IDX] = None
self.pon_resource_ranges[PONResourceManager.ALLOC_ID_SHARED_IDX] = None
self.pon_resource_ranges[PONResourceManager.GEMPORT_ID_SHARED_IDX] = None
+ self.pon_resource_ranges[PONResourceManager.FLOW_ID_SHARED_IDX] = None
self.shared_resource_mgrs = dict()
self.shared_resource_mgrs[PONResourceManager.ONU_ID_SHARED_IDX] = None
self.shared_resource_mgrs[PONResourceManager.ALLOC_ID_SHARED_IDX] = None
self.shared_resource_mgrs[PONResourceManager.GEMPORT_ID_SHARED_IDX] = None
+ self.shared_resource_mgrs[PONResourceManager.FLOW_ID_SHARED_IDX] = None
self.shared_idx_by_type = dict()
self.shared_idx_by_type[PONResourceManager.ONU_ID] = PONResourceManager.ONU_ID_SHARED_IDX
self.shared_idx_by_type[PONResourceManager.ALLOC_ID] = PONResourceManager.ALLOC_ID_SHARED_IDX
self.shared_idx_by_type[PONResourceManager.GEMPORT_ID] = PONResourceManager.GEMPORT_ID_SHARED_IDX
+ self.shared_idx_by_type[PONResourceManager.FLOW_ID] = PONResourceManager.FLOW_ID_SHARED_IDX
self.intf_ids = None
@@ -193,19 +213,18 @@
e=e)
return False
-
def update_range_(self, start_idx, start, end_idx, end, shared_idx, shared_pool_id, shared_resource_mgr):
if (start is not None) and \
- (start_idx not in self.pon_resource_ranges or self.pon_resource_ranges[start_idx] < start):
- self.pon_resource_ranges[start_idx] = start
+ (start_idx not in self.pon_resource_ranges or self.pon_resource_ranges[start_idx] < start):
+ self.pon_resource_ranges[start_idx] = start
if (end is not None) and \
- (end_idx not in self.pon_resource_ranges or self.pon_resource_ranges[end_idx] > end):
- self.pon_resource_ranges[end_idx] = end
+ (end_idx not in self.pon_resource_ranges or self.pon_resource_ranges[end_idx] > end):
+ self.pon_resource_ranges[end_idx] = end
if (shared_pool_id is not None) and \
- (shared_idx not in self.pon_resource_ranges or self.pon_resource_ranges[shared_idx] is None):
+ (shared_idx not in self.pon_resource_ranges or self.pon_resource_ranges[shared_idx] is None):
self.pon_resource_ranges[shared_idx] = shared_pool_id
if (shared_resource_mgr is not None) and \
- (shared_idx not in self.shared_resource_mgrs or self.shared_resource_mgrs[shared_idx] is None):
+ (shared_idx not in self.shared_resource_mgrs or self.shared_resource_mgrs[shared_idx] is None):
self.shared_resource_mgrs[shared_idx] = shared_resource_mgr
def update_ranges(self,
@@ -220,22 +239,31 @@
gemport_id_start_idx=None,
gemport_id_end_idx=None,
gemport_id_shared_pool_id=None,
- gemport_id_shared_resource_mgr=None):
+ gemport_id_shared_resource_mgr=None,
+ flow_id_start_idx=None,
+ flow_id_end_idx=None,
+ flow_id_shared_pool_id=None,
+ flow_id_shared_resource_mgr=None):
self.update_range_(PONResourceManager.ONU_ID_START_IDX, onu_id_start_idx,
- PONResourceManager.ONU_ID_END_IDX, onu_id_end_idx,
- PONResourceManager.ONU_ID_SHARED_IDX, onu_id_shared_pool_id,
- onu_id_shared_resource_mgr)
+ PONResourceManager.ONU_ID_END_IDX, onu_id_end_idx,
+ PONResourceManager.ONU_ID_SHARED_IDX, onu_id_shared_pool_id,
+ onu_id_shared_resource_mgr)
self.update_range_(PONResourceManager.ALLOC_ID_START_IDX, alloc_id_start_idx,
- PONResourceManager.ALLOC_ID_END_IDX, alloc_id_end_idx,
- PONResourceManager.ALLOC_ID_SHARED_IDX, alloc_id_shared_pool_id,
- alloc_id_shared_resource_mgr)
+ PONResourceManager.ALLOC_ID_END_IDX, alloc_id_end_idx,
+ PONResourceManager.ALLOC_ID_SHARED_IDX, alloc_id_shared_pool_id,
+ alloc_id_shared_resource_mgr)
self.update_range_(PONResourceManager.GEMPORT_ID_START_IDX, gemport_id_start_idx,
- PONResourceManager.GEMPORT_ID_END_IDX, gemport_id_end_idx,
- PONResourceManager.GEMPORT_ID_SHARED_IDX, gemport_id_shared_pool_id,
- gemport_id_shared_resource_mgr)
+ PONResourceManager.GEMPORT_ID_END_IDX, gemport_id_end_idx,
+ PONResourceManager.GEMPORT_ID_SHARED_IDX, gemport_id_shared_pool_id,
+ gemport_id_shared_resource_mgr)
+
+ self.update_range_(PONResourceManager.FLOW_ID_START_IDX, flow_id_start_idx,
+ PONResourceManager.FLOW_ID_END_IDX, flow_id_end_idx,
+ PONResourceManager.FLOW_ID_SHARED_IDX, flow_id_shared_pool_id,
+ flow_id_shared_resource_mgr)
def init_default_pon_resource_ranges(self,
onu_id_start_idx=1,
@@ -247,6 +275,9 @@
gemport_id_start_idx=1024,
gemport_id_end_idx=8960,
gemport_id_shared_pool_id=None,
+ flow_id_start_idx=1,
+ flow_id_end_idx=16383,
+ flow_id_shared_pool_id=None,
num_of_pon_ports=16,
intf_ids=None):
"""
@@ -261,6 +292,9 @@
:param gemport_id_start_idx: gemport id start index
:param gemport_id_end_idx: gemport id end index
:param gemport_id_shared_pool_id: pool idx for gemport id shared by all intfs or None for no sharing
+ :param flow_id_start_idx: flow id start index
+ :param flow_id_end_idx: flow id end index
+ :param flow_id_shared_pool_id: pool idx for flow id shared by all intfs or None for no sharing
:param num_of_pon_ports: number of PON ports
:param intf_ids: interfaces serviced by this manager
"""
@@ -268,7 +302,8 @@
self.update_ranges(onu_id_start_idx, onu_id_end_idx, onu_id_shared_pool_id, None,
alloc_id_start_idx, alloc_id_end_idx, alloc_id_shared_pool_id, None,
- gemport_id_start_idx, gemport_id_end_idx, gemport_id_shared_pool_id, None)
+ gemport_id_start_idx, gemport_id_end_idx, gemport_id_shared_pool_id, None,
+ flow_id_start_idx, flow_id_end_idx, flow_id_shared_pool_id, None)
if intf_ids is None:
intf_ids = range(0, num_of_pon_ports)
@@ -281,11 +316,12 @@
"""
self._log.info("init-device-resource-pool", technology=self.technology,
- pon_resource_ranges=self.pon_resource_ranges)
+ pon_resource_ranges=self.pon_resource_ranges)
for i in self.intf_ids:
shared_pool_id = self.pon_resource_ranges[PONResourceManager.ONU_ID_SHARED_IDX]
- if shared_pool_id is not None: i = shared_pool_id
+ if shared_pool_id is not None:
+ i = shared_pool_id
self.init_resource_id_pool(
pon_intf_id=i,
resource_type=PONResourceManager.ONU_ID,
@@ -293,11 +329,13 @@
PONResourceManager.ONU_ID_START_IDX],
end_idx=self.pon_resource_ranges[
PONResourceManager.ONU_ID_END_IDX])
- if shared_pool_id is not None: break
+ if shared_pool_id is not None:
+ break
for i in self.intf_ids:
shared_pool_id = self.pon_resource_ranges[PONResourceManager.ALLOC_ID_SHARED_IDX]
- if shared_pool_id is not None: i = shared_pool_id
+ if shared_pool_id is not None:
+ i = shared_pool_id
self.init_resource_id_pool(
pon_intf_id=i,
resource_type=PONResourceManager.ALLOC_ID,
@@ -305,11 +343,13 @@
PONResourceManager.ALLOC_ID_START_IDX],
end_idx=self.pon_resource_ranges[
PONResourceManager.ALLOC_ID_END_IDX])
- if shared_pool_id is not None: break
+ if shared_pool_id is not None:
+ break
for i in self.intf_ids:
shared_pool_id = self.pon_resource_ranges[PONResourceManager.GEMPORT_ID_SHARED_IDX]
- if shared_pool_id is not None: i = shared_pool_id
+ if shared_pool_id is not None:
+ i = shared_pool_id
self.init_resource_id_pool(
pon_intf_id=i,
resource_type=PONResourceManager.GEMPORT_ID,
@@ -317,7 +357,22 @@
PONResourceManager.GEMPORT_ID_START_IDX],
end_idx=self.pon_resource_ranges[
PONResourceManager.GEMPORT_ID_END_IDX])
- if shared_pool_id is not None: break
+ if shared_pool_id is not None:
+ break
+
+ for i in self.intf_ids:
+ shared_pool_id = self.pon_resource_ranges[PONResourceManager.FLOW_ID_SHARED_IDX]
+ if shared_pool_id is not None:
+ i = shared_pool_id
+ self.init_resource_id_pool(
+ pon_intf_id=i,
+ resource_type=PONResourceManager.FLOW_ID,
+ start_idx=self.pon_resource_ranges[
+ PONResourceManager.FLOW_ID_START_IDX],
+ end_idx=self.pon_resource_ranges[
+ PONResourceManager.FLOW_ID_END_IDX])
+ if shared_pool_id is not None:
+ break
def clear_device_resource_pool(self):
"""
@@ -325,30 +380,47 @@
"""
for i in self.intf_ids:
shared_pool_id = self.pon_resource_ranges[PONResourceManager.ONU_ID_SHARED_IDX]
- if shared_pool_id is not None: i = shared_pool_id
+ if shared_pool_id is not None:
+ i = shared_pool_id
self.clear_resource_id_pool(
pon_intf_id=i,
resource_type=PONResourceManager.ONU_ID,
)
- if shared_pool_id is not None: break
+ if shared_pool_id is not None:
+ break
for i in self.intf_ids:
shared_pool_id = self.pon_resource_ranges[PONResourceManager.ALLOC_ID_SHARED_IDX]
- if shared_pool_id is not None: i = shared_pool_id
+ if shared_pool_id is not None:
+ i = shared_pool_id
self.clear_resource_id_pool(
pon_intf_id=i,
resource_type=PONResourceManager.ALLOC_ID,
)
- if shared_pool_id is not None: break
+ if shared_pool_id is not None:
+ break
for i in self.intf_ids:
shared_pool_id = self.pon_resource_ranges[PONResourceManager.GEMPORT_ID_SHARED_IDX]
- if shared_pool_id is not None: i = shared_pool_id
+ if shared_pool_id is not None:
+ i = shared_pool_id
self.clear_resource_id_pool(
pon_intf_id=i,
resource_type=PONResourceManager.GEMPORT_ID,
)
- if shared_pool_id is not None: break
+ if shared_pool_id is not None:
+ break
+
+ for i in self.intf_ids:
+ shared_pool_id = self.pon_resource_ranges[PONResourceManager.FLOW_ID_SHARED_IDX]
+ if shared_pool_id is not None:
+ i = shared_pool_id
+ self.clear_resource_id_pool(
+ pon_intf_id=i,
+ resource_type=PONResourceManager.FLOW_ID,
+ )
+ if shared_pool_id is not None:
+ break
def init_resource_id_pool(self, pon_intf_id, resource_type, start_idx,
end_idx):
@@ -367,7 +439,7 @@
shared_resource_mgr = self.shared_resource_mgrs[self.shared_idx_by_type[resource_type]]
if shared_resource_mgr is not None and shared_resource_mgr is not self:
return shared_resource_mgr.init_resource_id_pool(pon_intf_id, resource_type,
- start_idx, end_idx)
+ start_idx, end_idx)
path = self._get_path(pon_intf_id, resource_type)
if path is None:
@@ -398,7 +470,7 @@
def get_resource_id(self, pon_intf_id, resource_type, num_of_id=1):
"""
- Create alloc/gemport/onu id for given OLT PON interface.
+ Create alloc/gemport/onu/flow id for given OLT PON interface.
:param pon_intf_id: OLT PON interface id
:param resource_type: String to identify type of resource
@@ -409,6 +481,10 @@
"""
result = None
+ if num_of_id < 1:
+ self._log.error("invalid-num-of-resources-requested")
+ return result
+
# delegate to the master instance if sharing enabled across instances
shared_resource_mgr = self.shared_resource_mgrs[self.shared_idx_by_type[resource_type]]
if shared_resource_mgr is not None and shared_resource_mgr is not self:
@@ -420,16 +496,20 @@
try:
resource = self._get_resource(path)
- if resource is not None and resource_type == \
- PONResourceManager.ONU_ID:
+ if resource is not None and \
+ (resource_type == PONResourceManager.ONU_ID or
+ resource_type == PONResourceManager.FLOW_ID):
result = self._generate_next_id(resource)
elif resource is not None and (
resource_type == PONResourceManager.GEMPORT_ID or
resource_type == PONResourceManager.ALLOC_ID):
- result = list()
- while num_of_id > 0:
- result.append(self._generate_next_id(resource))
- num_of_id -= 1
+ if num_of_id == 1:
+ result = self._generate_next_id(resource)
+ else:
+ result = list()
+ while num_of_id > 0:
+ result.append(self._generate_next_id(resource))
+ num_of_id -= 1
else:
raise Exception("get-resource-failed")
@@ -445,7 +525,7 @@
def free_resource_id(self, pon_intf_id, resource_type, release_content):
"""
- Release alloc/gemport/onu id for given OLT PON interface.
+ Release alloc/gemport/onu/flow id for given OLT PON interface.
:param pon_intf_id: OLT PON interface id
:param resource_type: String to identify type of resource
@@ -466,8 +546,9 @@
try:
resource = self._get_resource(path)
- if resource is not None and resource_type == \
- PONResourceManager.ONU_ID:
+ if resource is not None and (
+ resource_type == PONResourceManager.ONU_ID or
+ resource_type == PONResourceManager.FLOW_ID):
self._release_id(resource, release_content)
elif resource is not None and (
resource_type == PONResourceManager.ALLOC_ID or
@@ -564,6 +645,8 @@
Get currently configured alloc ids for given pon_intf_onu_id
:param pon_intf_onu_id: reference of PON interface id and onu id
+
+ :return list: List of alloc_ids if available, else None
"""
path = PONResourceManager.ALLOC_ID_RESOURCE_MAP_PATH.format(
self.device_id,
@@ -581,6 +664,8 @@
Get currently configured gemport ids for given pon_intf_onu_id
:param pon_intf_onu_id: reference of PON interface id and onu id
+
+ :return list: List of gemport IDs if available, else None
"""
path = PONResourceManager.GEMPORT_ID_RESOURCE_MAP_PATH.format(
@@ -594,11 +679,68 @@
return None
+ def get_current_flow_ids_for_onu(self, pon_intf_onu_id):
+ """
+ Get currently configured flow ids for given pon_intf_onu_id
+
+ :param pon_intf_onu_id: reference of PON interface id and onu id
+
+ :return list: List of Flow IDs if available, else None
+ """
+
+ path = PONResourceManager.FLOW_ID_RESOURCE_MAP_PATH.format(
+ self.device_id,
+ str(pon_intf_onu_id))
+ value = self._kv_store.get_from_kv_store(path)
+ if value is not None:
+ flow_id_list = json.loads(value)
+ assert(isinstance(flow_id_list, list))
+ if len(flow_id_list) > 0:
+ return flow_id_list
+
+ return None
+
+ def get_flow_id_info(self, pon_intf_onu_id, flow_id):
+ """
+ Get flow_id details configured for the ONU.
+
+ :param pon_intf_onu_id: reference of PON interface id and onu id
+ :param flow_id: Flow Id reference
+
+ :return blob: Flow data blob if available, else None
+ """
+
+ path = PONResourceManager.FLOW_ID_INFO_PATH.format(
+ self.device_id,
+ str(pon_intf_onu_id),
+ flow_id)
+ value = self._kv_store.get_from_kv_store(path)
+ if value is not None:
+ return ast.literal_eval(value)
+
+ return None
+
+ def remove_flow_id_info(self, pon_intf_onu_id, flow_id):
+ """
+ Get flow_id details configured for the ONU.
+
+ :param pon_intf_onu_id: reference of PON interface id and onu id
+ :param flow_id: Flow Id reference
+
+ """
+
+ path = PONResourceManager.FLOW_ID_INFO_PATH.format(
+ self.device_id,
+ str(pon_intf_onu_id),
+ flow_id)
+ self._kv_store.remove_from_kv_store(path)
+
def update_alloc_ids_for_onu(self, pon_intf_onu_id, alloc_ids):
"""
Update currently configured alloc ids for given pon_intf_onu_id
:param pon_intf_onu_id: reference of PON interface id and onu id
+ :param alloc_ids: list of alloc ids
"""
path = PONResourceManager.ALLOC_ID_RESOURCE_MAP_PATH.format(
self.device_id, str(pon_intf_onu_id)
@@ -612,6 +754,7 @@
Update currently configured gemport ids for given pon_intf_onu_id
:param pon_intf_onu_id: reference of PON interface id and onu id
+ :param gemport_ids: list of gem port ids
"""
path = PONResourceManager.GEMPORT_ID_RESOURCE_MAP_PATH.format(
self.device_id, str(pon_intf_onu_id)
@@ -620,6 +763,48 @@
path, json.dumps(gemport_ids)
)
+ def update_flow_id_for_onu(self, pon_intf_onu_id, flow_id, add=True):
+ """
+ Update the flow_id list of the ONU (add or remove flow_id from the list)
+
+ :param pon_intf_onu_id: reference of PON interface id and onu id
+ :param flow_id: flow ID
+ :param add: Boolean flag to indicate whether the flow_id should be
+ added or removed from the list. Defaults to adding the flow.
+ """
+ path = PONResourceManager.FLOW_ID_RESOURCE_MAP_PATH.format(
+ self.device_id, str(pon_intf_onu_id)
+ )
+ current_flow_ids = self.get_current_flow_ids_for_onu(pon_intf_onu_id)
+ if not isinstance(current_flow_ids, list):
+ # When the first flow_id is being added, the current_flow_ids is None
+ current_flow_ids = list()
+
+ if add:
+ if flow_id not in current_flow_ids:
+ current_flow_ids.append(flow_id)
+ else:
+ if flow_id in current_flow_ids:
+ current_flow_ids.remove(flow_id)
+
+ self._kv_store.update_to_kv_store(path, current_flow_ids)
+
+ def update_flow_id_info_for_onu(self, pon_intf_onu_id, flow_id, flow_data):
+ """
+ Update any metadata associated with the flow_id. The flow_data could be json
+ or any of other data structure. The resource manager doesnt care
+
+ :param pon_intf_onu_id: reference of PON interface id and onu id
+ :param flow_id: Flow ID
+ :param flow_data: Flow data blob
+ """
+ path = PONResourceManager.FLOW_ID_INFO_PATH.format(
+ self.device_id, str(pon_intf_onu_id), flow_id
+ )
+
+ if not self._kv_store.update_to_kv_store(path, flow_data):
+ self._log.error("flow-info-update-failed", path=path, flow_id=flow_id)
+
def _get_olt_model(self):
"""
Get olt model variant
@@ -673,7 +858,8 @@
"""
shared_pool_id = self.pon_resource_ranges[self.shared_idx_by_type[resource_type]]
- if shared_pool_id is not None: pon_intf_id = shared_pool_id
+ if shared_pool_id is not None:
+ pon_intf_id = shared_pool_id
path = None
if resource_type == PONResourceManager.ONU_ID:
@@ -682,10 +868,22 @@
path = self._get_alloc_id_resource_path(pon_intf_id)
elif resource_type == PONResourceManager.GEMPORT_ID:
path = self._get_gemport_id_resource_path(pon_intf_id)
+ elif resource_type == PONResourceManager.FLOW_ID:
+ path = self._get_flow_id_resource_path(pon_intf_id)
else:
self._log.error("invalid-resource-pool-identifier")
return path
+ def _get_flow_id_resource_path(self, pon_intf_id):
+ """
+ Get flow id resource path.
+
+ :param pon_intf_id: OLT PON interface id
+ :return: flow id resource path
+ """
+ return PONResourceManager.FLOW_ID_POOL_PATH.format(
+ self.device_id, pon_intf_id)
+
def _get_alloc_id_resource_path(self, pon_intf_id):
"""
Get alloc id resource path.
diff --git a/common/tech_profile/__init__.py b/common/tech_profile/__init__.py
new file mode 100644
index 0000000..b0fb0b2
--- /dev/null
+++ b/common/tech_profile/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/common/tech_profile/tech_profile.py b/common/tech_profile/tech_profile.py
new file mode 100644
index 0000000..c3a9993
--- /dev/null
+++ b/common/tech_profile/tech_profile.py
@@ -0,0 +1,587 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import json
+import ast
+from collections import namedtuple
+import structlog
+from enum import Enum
+
+from voltha.core.config.config_backend import ConsulStore
+from voltha.core.config.config_backend import EtcdStore
+from voltha.registry import registry
+from voltha.adapters.openolt.protos import openolt_pb2
+
+
+# logger
+log = structlog.get_logger()
+
+DEFAULT_TECH_PROFILE_TABLE_ID = 64
+
+# Enums used while creating TechProfileInstance
+Direction = Enum('Direction', ['UPSTREAM', 'DOWNSTREAM', 'BIDIRECTIONAL'],
+ start=0)
+SchedulingPolicy = Enum('SchedulingPolicy',
+ ['WRR', 'StrictPriority', 'Hybrid'], start=0)
+AdditionalBW = Enum('AdditionalBW', ['None', 'NA', 'BestEffort', 'Auto'],
+ start=0)
+DiscardPolicy = Enum('DiscardPolicy',
+ ['TailDrop', 'WTailDrop', 'RED', 'WRED'], start=0)
+InferredAdditionBWIndication = Enum('InferredAdditionBWIndication',
+ ['None', 'NoneAssured', 'BestEffort'],
+ start=0)
+
+
+class InstanceControl(object):
+ # Default value constants
+ ONU_DEFAULT_INSTANCE = 'multi-instance'
+ UNI_DEFAULT_INSTANCE = 'single-instance'
+ DEFAULT_NUM_GEM_PORTS = 1
+ DEFAULT_GEM_PAYLOAD_SIZE = 'auto'
+
+ def __init__(self, onu=ONU_DEFAULT_INSTANCE,
+ uni=UNI_DEFAULT_INSTANCE,
+ num_gem_ports=DEFAULT_NUM_GEM_PORTS,
+ max_gem_payload_size=DEFAULT_GEM_PAYLOAD_SIZE):
+ self.onu = onu
+ self.uni = uni
+ self.num_gem_ports = num_gem_ports
+ self.max_gem_payload_size = max_gem_payload_size
+
+
+class Scheduler(object):
+ # Default value constants
+ DEFAULT_ADDITIONAL_BW = 'auto'
+ DEFAULT_PRIORITY = 0
+ DEFAULT_WEIGHT = 0
+ DEFAULT_Q_SCHED_POLICY = 'hybrid'
+
+ def __init__(self, direction, additional_bw=DEFAULT_ADDITIONAL_BW,
+ priority=DEFAULT_PRIORITY,
+ weight=DEFAULT_WEIGHT,
+ q_sched_policy=DEFAULT_Q_SCHED_POLICY):
+ self.direction = direction
+ self.additional_bw = additional_bw
+ self.priority = priority
+ self.weight = weight
+ self.q_sched_policy = q_sched_policy
+
+
+class GemPortAttribute(object):
+ # Default value constants
+ DEFAULT_AES_ENCRYPTION = 'True'
+ DEFAULT_PRIORITY_Q = 0
+ DEFAULT_WEIGHT = 0
+ DEFAULT_MAX_Q_SIZE = 'auto'
+ DEFAULT_DISCARD_POLICY = DiscardPolicy.TailDrop.name
+
+ def __init__(self, pbit_map, discard_config,
+ aes_encryption=DEFAULT_AES_ENCRYPTION,
+ scheduling_policy=SchedulingPolicy.WRR.name,
+ priority_q=DEFAULT_PRIORITY_Q,
+ weight=DEFAULT_WEIGHT,
+ max_q_size=DEFAULT_MAX_Q_SIZE,
+ discard_policy=DiscardPolicy.TailDrop.name):
+ self.max_q_size = max_q_size
+ self.pbit_map = pbit_map
+ self.aes_encryption = aes_encryption
+ self.scheduling_policy = scheduling_policy
+ self.priority_q = priority_q
+ self.weight = weight
+ self.discard_policy = discard_policy
+ self.discard_config = discard_config
+
+
+class DiscardConfig(object):
+ # Default value constants
+ DEFAULT_MIN_THRESHOLD = 0
+ DEFAULT_MAX_THRESHOLD = 0
+ DEFAULT_MAX_PROBABILITY = 0
+
+ def __init__(self, min_threshold=DEFAULT_MIN_THRESHOLD,
+ max_threshold=DEFAULT_MAX_THRESHOLD,
+ max_probability=DEFAULT_MAX_PROBABILITY):
+ self.min_threshold = min_threshold
+ self.max_threshold = max_threshold
+ self.max_probability = max_probability
+
+
+class TechProfile(object):
+ # Constants used in default tech profile
+ DEFAULT_TECH_PROFILE_NAME = 'Default_1tcont_1gem_Profile'
+ DEFAULT_VERSION = 1.0
+ DEFAULT_GEMPORTS_COUNT = 1
+ pbits = ['0b11111111']
+
+ # Tech profile path prefix in kv store
+ KV_STORE_TECH_PROFILE_PATH_PREFIX = 'voltha/technology_profiles'
+
+ # Tech profile path in kv store
+ TECH_PROFILE_PATH = '{}/{}' # <technology>/<table_id>
+
+ # Tech profile instance path in kv store
+ # Format: <technology>/<table_id>/<uni_port_name>
+ TECH_PROFILE_INSTANCE_PATH = '{}/{}/{}'
+
+ # Tech-Profile JSON String Keys
+ NAME = 'name'
+ PROFILE_TYPE = 'profile_type'
+ VERSION = 'version'
+ NUM_GEM_PORTS = 'num_gem_ports'
+ INSTANCE_CONTROL = 'instance_control'
+ US_SCHEDULER = 'us_scheduler'
+ DS_SCHEDULER = 'ds_scheduler'
+ UPSTREAM_GEM_PORT_ATTRIBUTE_LIST = 'upstream_gem_port_attribute_list'
+ DOWNSTREAM_GEM_PORT_ATTRIBUTE_LIST = 'downstream_gem_port_attribute_list'
+ ONU = 'onu'
+ UNI = 'uni'
+ MAX_GEM_PAYLOAD_SIZE = 'max_gem_payload_size'
+ DIRECTION = 'direction'
+ ADDITIONAL_BW = 'additional_bw'
+ PRIORITY = 'priority'
+ Q_SCHED_POLICY = 'q_sched_policy'
+ WEIGHT = 'weight'
+ PBIT_MAP = 'pbit_map'
+ DISCARD_CONFIG = 'discard_config'
+ MAX_THRESHOLD = 'max_threshold'
+ MIN_THRESHOLD = 'min_threshold'
+ MAX_PROBABILITY = 'max_probability'
+ DISCARD_POLICY = 'discard_policy'
+ PRIORITY_Q = 'priority_q'
+ SCHEDULING_POLICY = 'scheduling_policy'
+ MAX_Q_SIZE = 'max_q_size'
+ AES_ENCRYPTION = 'aes_encryption'
+
+ def __init__(self, resource_mgr):
+ try:
+ self.args = registry('main').get_args()
+ self.resource_mgr = resource_mgr
+
+ if self.args.backend == 'etcd':
+ # KV store's IP Address and PORT
+ host, port = self.args.etcd.split(':', 1)
+ self._kv_store = EtcdStore(
+ host, port, TechProfile.
+ KV_STORE_TECH_PROFILE_PATH_PREFIX)
+ elif self.args.backend == 'consul':
+ # KV store's IP Address and PORT
+ host, port = self.args.consul.split(':', 1)
+ self._kv_store = ConsulStore(
+ host, port, TechProfile.
+ KV_STORE_TECH_PROFILE_PATH_PREFIX)
+
+ # self.tech_profile_instance_store = dict()
+ except Exception as e:
+ log.exception("exception-in-init")
+ raise Exception(e)
+
+ class DefaultTechProfile(object):
+ def __init__(self, name, **kwargs):
+ self.name = name
+ self.profile_type = kwargs[TechProfile.PROFILE_TYPE]
+ self.version = kwargs[TechProfile.VERSION]
+ self.num_gem_ports = kwargs[TechProfile.NUM_GEM_PORTS]
+ self.instance_control = kwargs[TechProfile.INSTANCE_CONTROL]
+ self.us_scheduler = kwargs[TechProfile.US_SCHEDULER]
+ self.ds_scheduler = kwargs[TechProfile.DS_SCHEDULER]
+ self.upstream_gem_port_attribute_list = kwargs[
+ TechProfile.UPSTREAM_GEM_PORT_ATTRIBUTE_LIST]
+ self.downstream_gem_port_attribute_list = kwargs[
+ TechProfile.DOWNSTREAM_GEM_PORT_ATTRIBUTE_LIST]
+
+ def to_json(self):
+ return json.dumps(self, default=lambda o: o.__dict__,
+ indent=4)
+
+ def get_tp_path(self, table_id, uni_port_name):
+ return TechProfile.TECH_PROFILE_INSTANCE_PATH.format(
+ self.resource_mgr.technology, table_id, uni_port_name)
+
+ def create_tech_profile_instance(self, table_id, uni_port_name, intf_id):
+ tech_profile_instance = None
+ try:
+ # Get tech profile from kv store
+ tech_profile = self._get_tech_profile_from_kv_store(table_id)
+ path = self.get_tp_path(table_id, uni_port_name)
+
+ if tech_profile is not None:
+ tech_profile = self._get_tech_profile(tech_profile)
+ log.debug(
+ "Created-tech-profile-instance-with-values-from-kvstore")
+ else:
+ tech_profile = self._default_tech_profile()
+ log.debug(
+ "Created-tech-profile-instance-with-default-values")
+
+ tech_profile_instance = TechProfileInstance(
+ uni_port_name, tech_profile, self.resource_mgr, intf_id)
+ self._add_tech_profile_instance(path,
+ tech_profile_instance.to_json())
+ except Exception as e:
+ log.exception("Create-tech-profile-instance-failed", exception=e)
+
+ return tech_profile_instance
+
+ def get_tech_profile_instance(self, table_id, uni_port_name):
+ # path to fetch tech profile instance json from kv store
+ path = TechProfile.TECH_PROFILE_INSTANCE_PATH.format(
+ self.resource_mgr.technology, table_id, uni_port_name)
+
+ try:
+ tech_profile_instance = self._kv_store[path]
+ log.debug("Tech-profile-instance-present-in-kvstore", path=path,
+ tech_profile_instance=tech_profile_instance)
+
+ # Parse JSON into an object with attributes corresponding to dict keys.
+ tech_profile_instance = json.loads(tech_profile_instance,
+ object_hook=lambda d:
+ namedtuple('tech_profile_instance',
+ d.keys())(*d.values()))
+ log.debug("Tech-profile-instance-after-json-to-object-conversion", path=path,
+ tech_profile_instance=tech_profile_instance)
+ return tech_profile_instance
+ except BaseException as e:
+ log.debug("Tech-profile-instance-not-present-in-kvstore",
+ path=path, tech_profile_instance=None, exception=e)
+ return None
+
+ def delete_tech_profile_instance(self, table_id, uni_port_name):
+ # path to delete tech profile instance json from kv store
+ path = TechProfile.TECH_PROFILE_INSTANCE_PATH.format(
+ self.resource_mgr.technology, table_id, uni_port_name)
+
+ try:
+ del self._kv_store[path]
+ log.debug("Delete-tech-profile-instance-success", path=path)
+ return True
+ except Exception as e:
+ log.debug("Delete-tech-profile-instance-failed", path=path,
+ exception=e)
+ return False
+
+ def _get_tech_profile_from_kv_store(self, table_id):
+ """
+ Get tech profile from kv store.
+
+ :param table_id: reference to get tech profile
+ :return: tech profile if present in kv store else None
+ """
+ # get tech profile from kv store
+ path = TechProfile.TECH_PROFILE_PATH.format(self.resource_mgr.technology,
+ table_id)
+ try:
+ tech_profile = self._kv_store[path]
+ if tech_profile != '':
+ log.debug("Get-tech-profile-success", tech_profile=tech_profile)
+ return json.loads(tech_profile)
+ # return ast.literal_eval(tech_profile)
+ except KeyError as e:
+ log.info("Get-tech-profile-failed", exception=e)
+ return None
+
+ def _default_tech_profile(self):
+ # Default tech profile
+ upstream_gem_port_attribute_list = list()
+ downstream_gem_port_attribute_list = list()
+ for pbit in TechProfile.pbits:
+ upstream_gem_port_attribute_list.append(
+ GemPortAttribute(pbit_map=pbit,
+ discard_config=DiscardConfig()))
+ downstream_gem_port_attribute_list.append(
+ GemPortAttribute(pbit_map=pbit,
+ discard_config=DiscardConfig()))
+
+ return TechProfile.DefaultTechProfile(
+ TechProfile.DEFAULT_TECH_PROFILE_NAME,
+ profile_type=self.resource_mgr.technology,
+ version=TechProfile.DEFAULT_VERSION,
+ num_gem_ports=TechProfile.DEFAULT_GEMPORTS_COUNT,
+ instance_control=InstanceControl(),
+ us_scheduler=Scheduler(direction=Direction.UPSTREAM.name),
+ ds_scheduler=Scheduler(direction=Direction.DOWNSTREAM.name),
+ upstream_gem_port_attribute_list=upstream_gem_port_attribute_list,
+ downstream_gem_port_attribute_list=
+ downstream_gem_port_attribute_list)
+
+ @staticmethod
+ def _get_tech_profile(tech_profile):
+ # Tech profile fetched from kv store
+ instance_control = tech_profile[TechProfile.INSTANCE_CONTROL]
+ instance_control = InstanceControl(
+ onu=instance_control[TechProfile.ONU],
+ uni=instance_control[TechProfile.UNI],
+ max_gem_payload_size=instance_control[
+ TechProfile.MAX_GEM_PAYLOAD_SIZE])
+
+ us_scheduler = tech_profile[TechProfile.US_SCHEDULER]
+ us_scheduler = Scheduler(direction=us_scheduler[TechProfile.DIRECTION],
+ additional_bw=us_scheduler[
+ TechProfile.ADDITIONAL_BW],
+ priority=us_scheduler[TechProfile.PRIORITY],
+ weight=us_scheduler[TechProfile.WEIGHT],
+ q_sched_policy=us_scheduler[
+ TechProfile.Q_SCHED_POLICY])
+ ds_scheduler = tech_profile[TechProfile.DS_SCHEDULER]
+ ds_scheduler = Scheduler(direction=ds_scheduler[TechProfile.DIRECTION],
+ additional_bw=ds_scheduler[
+ TechProfile.ADDITIONAL_BW],
+ priority=ds_scheduler[TechProfile.PRIORITY],
+ weight=ds_scheduler[TechProfile.WEIGHT],
+ q_sched_policy=ds_scheduler[
+ TechProfile.Q_SCHED_POLICY])
+
+ upstream_gem_port_attribute_list = list()
+ downstream_gem_port_attribute_list = list()
+ us_gemport_attr_list = tech_profile[
+ TechProfile.UPSTREAM_GEM_PORT_ATTRIBUTE_LIST]
+ for i in range(len(us_gemport_attr_list)):
+ upstream_gem_port_attribute_list.append(
+ GemPortAttribute(pbit_map=us_gemport_attr_list[i][TechProfile.PBIT_MAP],
+ discard_config=DiscardConfig(
+ max_threshold=
+ us_gemport_attr_list[i][TechProfile.DISCARD_CONFIG][
+ TechProfile.MAX_THRESHOLD],
+ min_threshold=
+ us_gemport_attr_list[i][TechProfile.DISCARD_CONFIG][
+ TechProfile.MIN_THRESHOLD],
+ max_probability=
+ us_gemport_attr_list[i][TechProfile.DISCARD_CONFIG][
+ TechProfile.MAX_PROBABILITY]),
+ discard_policy=us_gemport_attr_list[i][
+ TechProfile.DISCARD_POLICY],
+ priority_q=us_gemport_attr_list[i][
+ TechProfile.PRIORITY_Q],
+ weight=us_gemport_attr_list[i][TechProfile.WEIGHT],
+ scheduling_policy=us_gemport_attr_list[i][
+ TechProfile.SCHEDULING_POLICY],
+ max_q_size=us_gemport_attr_list[i][
+ TechProfile.MAX_Q_SIZE],
+ aes_encryption=us_gemport_attr_list[i][
+ TechProfile.AES_ENCRYPTION]))
+
+ ds_gemport_attr_list = tech_profile[
+ TechProfile.DOWNSTREAM_GEM_PORT_ATTRIBUTE_LIST]
+ for i in range(len(ds_gemport_attr_list)):
+ downstream_gem_port_attribute_list.append(
+ GemPortAttribute(pbit_map=ds_gemport_attr_list[i][TechProfile.PBIT_MAP],
+ discard_config=DiscardConfig(
+ max_threshold=
+ ds_gemport_attr_list[i][TechProfile.DISCARD_CONFIG][
+ TechProfile.MAX_THRESHOLD],
+ min_threshold=
+ ds_gemport_attr_list[i][TechProfile.DISCARD_CONFIG][
+ TechProfile.MIN_THRESHOLD],
+ max_probability=
+ ds_gemport_attr_list[i][TechProfile.DISCARD_CONFIG][
+ TechProfile.MAX_PROBABILITY]),
+ discard_policy=ds_gemport_attr_list[i][
+ TechProfile.DISCARD_POLICY],
+ priority_q=ds_gemport_attr_list[i][
+ TechProfile.PRIORITY_Q],
+ weight=ds_gemport_attr_list[i][TechProfile.WEIGHT],
+ scheduling_policy=ds_gemport_attr_list[i][
+ TechProfile.SCHEDULING_POLICY],
+ max_q_size=ds_gemport_attr_list[i][
+ TechProfile.MAX_Q_SIZE],
+ aes_encryption=ds_gemport_attr_list[i][
+ TechProfile.AES_ENCRYPTION]))
+
+ return TechProfile.DefaultTechProfile(
+ tech_profile[TechProfile.NAME],
+ profile_type=tech_profile[TechProfile.PROFILE_TYPE],
+ version=tech_profile[TechProfile.VERSION],
+ num_gem_ports=tech_profile[TechProfile.NUM_GEM_PORTS],
+ instance_control=instance_control,
+ us_scheduler=us_scheduler,
+ ds_scheduler=ds_scheduler,
+ upstream_gem_port_attribute_list=upstream_gem_port_attribute_list,
+ downstream_gem_port_attribute_list=
+ downstream_gem_port_attribute_list)
+
+ def _add_tech_profile_instance(self, path, tech_profile_instance):
+ """
+ Add tech profile to kv store.
+
+ :param path: path to add tech profile
+ :param tech_profile_instance: tech profile instance need to be added
+ """
+ try:
+ self._kv_store[path] = str(tech_profile_instance)
+ log.debug("Add-tech-profile-instance-success", path=path,
+ tech_profile_instance=tech_profile_instance)
+ return True
+ except BaseException as e:
+ log.exception("Add-tech-profile-instance-failed", path=path,
+ tech_profile_instance=tech_profile_instance,
+ exception=e)
+ return False
+
+ @staticmethod
+ def get_us_scheduler(tech_profile_instance):
+ # upstream scheduler
+ us_scheduler = openolt_pb2.Scheduler(
+ direction=TechProfile.get_parameter(
+ 'direction', tech_profile_instance.us_scheduler.
+ direction),
+ additional_bw=TechProfile.get_parameter(
+ 'additional_bw', tech_profile_instance.
+ us_scheduler.additional_bw),
+ priority=tech_profile_instance.us_scheduler.priority,
+ weight=tech_profile_instance.us_scheduler.weight,
+ sched_policy=TechProfile.get_parameter(
+ 'sched_policy', tech_profile_instance.
+ us_scheduler.q_sched_policy))
+
+ return us_scheduler
+
+ @staticmethod
+ def get_ds_scheduler(tech_profile_instance):
+ ds_scheduler = openolt_pb2.Scheduler(
+ direction=TechProfile.get_parameter(
+ 'direction', tech_profile_instance.ds_scheduler.
+ direction),
+ additional_bw=TechProfile.get_parameter(
+ 'additional_bw', tech_profile_instance.
+ ds_scheduler.additional_bw),
+ priority=tech_profile_instance.ds_scheduler.priority,
+ weight=tech_profile_instance.ds_scheduler.weight,
+ sched_policy=TechProfile.get_parameter(
+ 'sched_policy', tech_profile_instance.ds_scheduler.
+ q_sched_policy))
+
+ return ds_scheduler
+
+ @staticmethod
+ def get_tconts(tech_profile_instance, us_scheduler=None, ds_scheduler=None):
+ if us_scheduler is None:
+ us_scheduler = TechProfile.get_us_scheduler(tech_profile_instance)
+ if ds_scheduler is None:
+ ds_scheduler = TechProfile.get_ds_scheduler(tech_profile_instance)
+
+ tconts = [openolt_pb2.Tcont(direction=TechProfile.get_parameter(
+ 'direction',
+ tech_profile_instance.
+ us_scheduler.direction),
+ alloc_id=tech_profile_instance.
+ us_scheduler.alloc_id,
+ scheduler=us_scheduler),
+ openolt_pb2.Tcont(direction=TechProfile.get_parameter(
+ 'direction',
+ tech_profile_instance.
+ ds_scheduler.direction),
+ alloc_id=tech_profile_instance.
+ ds_scheduler.alloc_id,
+ scheduler=ds_scheduler)]
+
+ return tconts
+
+ @staticmethod
+ def get_parameter(param_type, param_value):
+ parameter = None
+ try:
+ if param_type == 'direction':
+ for direction in openolt_pb2.Direction.keys():
+ if param_value == direction:
+ parameter = direction
+ elif param_type == 'discard_policy':
+ for discard_policy in openolt_pb2.DiscardPolicy.keys():
+ if param_value == discard_policy:
+ parameter = discard_policy
+ elif param_type == 'sched_policy':
+ for sched_policy in openolt_pb2.SchedulingPolicy.keys():
+ if param_value == sched_policy:
+ parameter = sched_policy
+ elif param_type == 'additional_bw':
+ for bw_component in openolt_pb2.AdditionalBW.keys():
+ if param_value == bw_component:
+ parameter = bw_component
+ except BaseException as e:
+ log.exception(exception=e)
+ return parameter
+
+
+class TechProfileInstance(object):
+ def __init__(self, subscriber_identifier, tech_profile, resource_mgr,
+ intf_id, num_of_tconts=1):
+ if tech_profile is not None:
+ self.subscriber_identifier = subscriber_identifier
+ self.num_of_tconts = num_of_tconts
+ self.num_of_gem_ports = tech_profile.num_gem_ports
+ self.name = tech_profile.name
+ self.profile_type = tech_profile.profile_type
+ self.version = tech_profile.version
+ self.instance_control = tech_profile.instance_control
+
+ # TODO: Fixed num_of_tconts to 1 per TP Instance.
+ # This may change in future
+ assert (num_of_tconts == 1)
+ # Get alloc id and gemport id using resource manager
+ alloc_id = resource_mgr.get_resource_id(intf_id,
+ 'ALLOC_ID',
+ num_of_tconts)
+ gem_ports = resource_mgr.get_resource_id(intf_id,
+ 'GEMPORT_ID',
+ self.num_of_gem_ports)
+
+ gemport_list = list()
+ if isinstance(gem_ports, int):
+ gemport_list.append(gem_ports)
+ elif isinstance(gem_ports, list):
+ for gem in gem_ports:
+ gemport_list.append(gem)
+ else:
+ raise Exception("invalid-type")
+
+ self.us_scheduler = TechProfileInstance.IScheduler(
+ alloc_id, tech_profile.us_scheduler)
+ self.ds_scheduler = TechProfileInstance.IScheduler(
+ alloc_id, tech_profile.ds_scheduler)
+
+ self.upstream_gem_port_attribute_list = list()
+ self.downstream_gem_port_attribute_list = list()
+ for i in range(self.num_of_gem_ports):
+ self.upstream_gem_port_attribute_list.append(
+ TechProfileInstance.IGemPortAttribute(
+ gemport_list[i],
+ tech_profile.upstream_gem_port_attribute_list[
+ i]))
+ self.downstream_gem_port_attribute_list.append(
+ TechProfileInstance.IGemPortAttribute(
+ gemport_list[i],
+ tech_profile.downstream_gem_port_attribute_list[
+ i]))
+
+ class IScheduler(Scheduler):
+ def __init__(self, alloc_id, scheduler):
+ super(TechProfileInstance.IScheduler, self).__init__(
+ scheduler.direction, scheduler.additional_bw,
+ scheduler.priority,
+ scheduler.weight, scheduler.q_sched_policy)
+ self.alloc_id = alloc_id
+
+ class IGemPortAttribute(GemPortAttribute):
+ def __init__(self, gemport_id, gem_port_attribute):
+ super(TechProfileInstance.IGemPortAttribute, self).__init__(
+ gem_port_attribute.pbit_map, gem_port_attribute.discard_config,
+ gem_port_attribute.aes_encryption,
+ gem_port_attribute.scheduling_policy,
+ gem_port_attribute.priority_q, gem_port_attribute.weight,
+ gem_port_attribute.max_q_size,
+ gem_port_attribute.discard_policy)
+ self.gemport_id = gemport_id
+
+ def to_json(self):
+ return json.dumps(self, default=lambda o: o.__dict__,
+ indent=4)
diff --git a/tests/utests/common/test_pon_resource_manager.py b/tests/utests/common/test_pon_resource_manager.py
deleted file mode 100644
index 8a03294..0000000
--- a/tests/utests/common/test_pon_resource_manager.py
+++ /dev/null
@@ -1,146 +0,0 @@
-#
-# Copyright 2018 the original author or authors.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-import json
-from unittest import TestCase, main
-
-from bitstring import BitArray
-from common.pon_resource_manager.resource_manager import PONResourceManager
-from mock import Mock
-
-
-class TestResourceManager(TestCase):
- def setUp(self):
- self._rm = PONResourceManager('xgspon', 'default',
- '0001c889ee7189fb', 'consul',
- 'localhost', 8500)
- self.default_resource_range = {
- "onu_id_start": 1,
- "onu_id_end": 127,
- "alloc_id_start": 1024,
- "alloc_id_end": 2816,
- "gemport_id_start": 1024,
- "gemport_id_end": 8960,
- "pon_ports": 16
- }
-
- def tearDown(self):
- self._rm = None
- self.default_resource_range = None
-
- def test_init_pon_resource_ranges(self):
- output = json.dumps(self.default_resource_range).encode('utf-8')
- self._rm._get_olt_model = Mock(return_value='default')
- self._rm._kv_store.get_from_kv_store = Mock(return_value=output)
-
- self._rm.init_resource_ranges_from_kv_store()
- self.assertEqual(self._rm.pon_resource_ranges,
- self.default_resource_range)
-
- self._rm.init_default_pon_resource_ranges()
- self.assertEqual(self._rm.pon_resource_ranges,
- self.default_resource_range)
-
- def test_init_resource_id_pool(self):
- self._rm._kv_store.get_from_kv_store = Mock(return_value=None)
- self._rm._kv_store.update_to_kv_store = Mock(return_value=True)
- status = self._rm.init_resource_id_pool(0, 'ONU_ID', 1, 127)
- self.assertTrue(status)
- status = self._rm.init_resource_id_pool(
- 1, 'ALLOC_ID', 1024, 16383)
- self.assertTrue(status)
- status = self._rm.init_resource_id_pool(
- 2, 'GEMPORT_ID', 1023, 65534)
- self.assertTrue(status)
-
- def test_get_resource_id(self):
- # Get onu id test
- onu_id_resource = self._rm._format_resource(0, 1, 127)
- output = onu_id_resource.encode('utf-8')
- self._rm._kv_store.get_from_kv_store = Mock(return_value=output)
- self._rm._kv_store.update_to_kv_store = Mock(return_value=True)
- result = self._rm.get_resource_id(0, 'ONU_ID')
- self.assertEqual(result, 1)
-
- # Get alloc id test
- alloc_id_resource = self._rm._format_resource(1, 1024, 16383)
- output = alloc_id_resource.encode('utf-8')
- self._rm._kv_store.get_from_kv_store = Mock(return_value=output)
- result = self._rm.get_resource_id(1, 'ALLOC_ID', 1)
- self.assertEqual(result[0], 1024)
- result = self._rm.get_resource_id(1, 'ALLOC_ID', 4)
- self.assertEqual(result, [1024, 1025, 1026, 1027])
-
- # Get gemport id test
- gemport_id_resource = self._rm._format_resource(2, 1023, 65534)
- output = gemport_id_resource.encode('utf-8')
- self._rm._kv_store.get_from_kv_store = Mock(return_value=output)
- result = self._rm.get_resource_id(2, 'GEMPORT_ID', 1)
- self.assertEqual(result[0], 1023)
- result = self._rm.get_resource_id(2, 'GEMPORT_ID', 5)
- self.assertEqual(result, [1023, 1024, 1025, 1026, 1027])
-
- def test_free_resource_id(self):
- # Free onu id test
- self._rm._kv_store.update_to_kv_store = Mock(return_value=True)
- onu_id_resource = eval(self._rm._format_resource(0, 1, 127))
- onu_id_resource['pool'] = BitArray('0b' + onu_id_resource['pool'])
- self._rm._generate_next_id(onu_id_resource)
- onu_id_resource['pool'] = onu_id_resource['pool'].bin
- output = json.dumps(onu_id_resource).encode('utf-8')
- self._rm._kv_store.get_from_kv_store = Mock(return_value=output)
- result = self._rm.free_resource_id(0, 'ONU_ID', 1)
- self.assertTrue(result)
-
- # Free alloc id test
- alloc_id_resource = eval(self._rm._format_resource(1, 1024, 16383))
- alloc_id_resource['pool'] = BitArray('0b' + alloc_id_resource['pool'])
-
- for num in range(5):
- self._rm._generate_next_id(alloc_id_resource)
-
- alloc_id_resource['pool'] = alloc_id_resource['pool'].bin
- output = json.dumps(alloc_id_resource).encode('utf-8')
- self._rm._kv_store.get_from_kv_store = Mock(return_value=output)
- result = self._rm.free_resource_id(1, 'ALLOC_ID',
- [1024, 1025, 1026, 1027, 1028])
- self.assertTrue(result)
-
- # Free gemport id test
- gemport_id_resource = eval(self._rm._format_resource(2, 1023, 65534))
- gemport_id_resource['pool'] = BitArray(
- '0b' + gemport_id_resource['pool'])
-
- for num in range(6):
- self._rm._generate_next_id(gemport_id_resource)
-
- gemport_id_resource['pool'] = gemport_id_resource['pool'].bin
- output = json.dumps(gemport_id_resource).encode('utf-8')
- self._rm._kv_store.get_from_kv_store = Mock(return_value=output)
- result = self._rm.free_resource_id(2, 'GEMPORT_ID',
- [1023, 1024, 1025, 1026, 1027, 1028])
- self.assertTrue(result)
-
- def test_clear_resource_id_pool(self):
- self._rm._kv_store.remove_from_kv_store = Mock(return_value=True)
- status = self._rm.clear_resource_id_pool(0, 'ONU_ID')
- self.assertTrue(status)
- self._rm._kv_store.remove_from_kv_store = Mock(return_value=False)
- status = self._rm.clear_resource_id_pool(1, 'ALLOC_ID')
- self.assertFalse(status)
-
-
-if __name__ == '__main__':
- main()
diff --git a/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py b/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
index 94784ec..c96b27a 100644
--- a/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
+++ b/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
@@ -18,6 +18,8 @@
Broadcom OpenOMCI OLT/ONU adapter handler.
"""
+import json
+import ast
import structlog
from twisted.internet import reactor, task
from twisted.internet.defer import DeferredQueue, inlineCallbacks, returnValue, TimeoutError
@@ -30,6 +32,8 @@
from common.utils.indexpool import IndexPool
import voltha.core.flow_decomposer as fd
from voltha.registry import registry
+from voltha.core.config.config_backend import ConsulStore
+from voltha.core.config.config_backend import EtcdStore
from voltha.protos import third_party
from voltha.protos.common_pb2 import OperStatus, ConnectStatus, AdminState
from voltha.protos.openflow_13_pb2 import OFPXMC_OPENFLOW_BASIC, ofp_port
@@ -37,8 +41,9 @@
from voltha.protos.bbf_fiber_gemport_body_pb2 import GemportsConfigData
from voltha.extensions.omci.onu_configuration import OMCCVersion
from voltha.extensions.omci.onu_device_entry import OnuDeviceEvents, \
- OnuDeviceEntry, IN_SYNC_KEY
+ OnuDeviceEntry, IN_SYNC_KEY
from voltha.adapters.brcm_openomci_onu.omci.brcm_mib_download_task import BrcmMibDownloadTask
+from voltha.adapters.brcm_openomci_onu.omci.brcm_tp_service_specific_task import BrcmTpServiceSpecificTask
from voltha.adapters.brcm_openomci_onu.omci.brcm_uni_lock_task import BrcmUniLockTask
from voltha.adapters.brcm_openomci_onu.omci.brcm_vlan_filter_task import BrcmVlanFilterTask
from voltha.adapters.brcm_openomci_onu.onu_gem_port import *
@@ -54,7 +59,7 @@
log = structlog.get_logger()
_STARTUP_RETRY_WAIT = 20
-_MAXIMUM_PORT = 128 # UNI ports
+_MAXIMUM_PORT = 128 # UNI ports
class BrcmOpenomciOnuHandler(object):
@@ -83,7 +88,7 @@
self._port_number_pool = IndexPool(_MAXIMUM_PORT, 0)
self._pon = None
- #TODO: probably shouldnt be hardcoded, determine from olt maybe?
+ # TODO: probably shouldnt be hardcoded, determine from olt maybe?
self._pon_port_number = 100
self.logical_device_id = None
@@ -98,6 +103,27 @@
self._connectivity_subscription = None
self._capabilities_subscription = None
+ self.mac_bridge_service_profile_entity_id = 0x201
+ self.gal_enet_profile_entity_id = 0x1
+
+ self._tp_service_specific_task = dict()
+ self._tech_profile_download_done = dict()
+
+ # Initialize KV store client
+ self.args = registry('main').get_args()
+ if self.args.backend == 'etcd':
+ host, port = self.args.etcd.split(':', 1)
+ self.kv_client = EtcdStore(host, port, '.')
+ elif self.args.backend == 'consul':
+ host, port = self.args.consul.split(':', 1)
+ self.kv_client = ConsulStore(host, port, '.')
+ else:
+ self.log.error('Invalid-backend')
+ raise Exception("Invalid-backend-for-kv-store")
+
+ # Handle received ONU event messages
+ reactor.callLater(0, self.handle_onu_events)
+
@property
def enabled(self):
return self._enabled
@@ -160,8 +186,8 @@
self.parent_id = device.parent_id
parent_device = self.adapter_agent.get_device(self.parent_id)
if parent_device.type == 'openolt':
- self.parent_adapter = registry('adapter_loader').\
- get_agent(parent_device.adapter).adapter
+ self.parent_adapter = registry('adapter_loader'). \
+ get_agent(parent_device.adapter).adapter
if self.enabled is not True:
self.log.info('activating-new-onu')
@@ -209,7 +235,6 @@
else:
self.log.info('onu-already-activated')
-
# Called once when the adapter needs to re-create device. usually on vcore restart
def reconcile(self, device):
self.log.debug('function-entry', device=device)
@@ -241,6 +266,19 @@
else:
self.log.info('onu-already-activated')
+ @inlineCallbacks
+ def handle_onu_events(self):
+ event_msg = yield self.event_messages.get()
+ try:
+ if event_msg['event'] == 'download_tech_profile':
+ tp_path = event_msg['event_data']
+ self.load_and_configure_tech_profile(tp_path)
+
+ except Exception as e:
+ self.log.error("exception-handling-onu-event", e=e)
+
+ # Handle next event
+ reactor.callLater(0, self.handle_onu_events)
def _init_pon_state(self, device):
self.log.debug('function-entry', device=device)
@@ -275,7 +313,7 @@
self.adapter_agent.update_logical_port(logical_device_id,
logical_port)
except Exception as e:
- self.log.exception("exception-updating-port",e=e)
+ self.log.exception("exception-updating-port", e=e)
def delete(self, device):
self.log.info('delete-onu', device=device)
@@ -287,6 +325,120 @@
else:
self.log.debug("parent-adapter-not-available")
+ def _create_tconts(self, us_scheduler):
+ alloc_id = us_scheduler['alloc_id']
+ q_sched_policy = us_scheduler['q_sched_policy']
+ self.log.debug('create-tcont', us_scheduler=us_scheduler)
+
+ tcontdict = dict()
+ tcontdict['alloc-id'] = alloc_id
+ tcontdict['q_sched_policy'] = q_sched_policy
+
+ # TODO: Not sure what to do with any of this...
+ tddata = dict()
+ tddata['name'] = 'not-sure-td-profile'
+ tddata['fixed-bandwidth'] = "not-sure-fixed"
+ tddata['assured-bandwidth'] = "not-sure-assured"
+ tddata['maximum-bandwidth'] = "not-sure-max"
+ tddata['additional-bw-eligibility-indicator'] = "not-sure-additional"
+
+ td = OnuTrafficDescriptor.create(tddata)
+ tcont = OnuTCont.create(self, tcont=tcontdict, td=td)
+
+ self._pon.add_tcont(tcont)
+
+ self.log.debug('pon-add-tcont', tcont=tcont)
+
+ # Called when there is an olt up indication, providing the gem port id chosen by the olt handler
+ def _create_gemports(self, gem_ports, alloc_id_ref, direction):
+ self.log.debug('create-gemport',
+ gem_ports=gem_ports, direction=direction)
+
+ for gem_port in gem_ports:
+ gemdict = dict()
+ gemdict['gemport_id'] = gem_port['gemport_id']
+ gemdict['direction'] = direction
+ gemdict['alloc_id_ref'] = alloc_id_ref
+ gemdict['encryption'] = gem_port['aes_encryption']
+ gemdict['discard_config'] = dict()
+ gemdict['discard_config']['max_probability'] = \
+ gem_port['discard_config']['max_probability']
+ gemdict['discard_config']['max_threshold'] = \
+ gem_port['discard_config']['max_threshold']
+ gemdict['discard_config']['min_threshold'] = \
+ gem_port['discard_config']['min_threshold']
+ gemdict['discard_policy'] = gem_port['discard_policy']
+ gemdict['max_q_size'] = gem_port['max_q_size']
+ gemdict['pbit_map'] = gem_port['pbit_map']
+ gemdict['priority_q'] = gem_port['priority_q']
+ gemdict['scheduling_policy'] = gem_port['scheduling_policy']
+ gemdict['weight'] = gem_port['weight']
+
+ gem_port = OnuGemPort.create(self, gem_port=gemdict, entity_id=self._pon.next_gem_entity_id)
+
+ self._pon.add_gem_port(gem_port)
+
+ self.log.debug('pon-add-gemport', gem_port=gem_port)
+
+ def _do_tech_profile_configuration(self, tp):
+ num_of_tconts = tp['num_of_tconts']
+ us_scheduler = tp['us_scheduler']
+ alloc_id = us_scheduler['alloc_id']
+ self._create_tconts(us_scheduler)
+ upstream_gem_port_attribute_list = tp['upstream_gem_port_attribute_list']
+ self._create_gemports(upstream_gem_port_attribute_list, alloc_id, "UPSTREAM")
+ downstream_gem_port_attribute_list = tp['downstream_gem_port_attribute_list']
+ self._create_gemports(downstream_gem_port_attribute_list, alloc_id, "DOWNSTREAM")
+
+ def load_and_configure_tech_profile(self, tp_path):
+ self.log.debug("loading-tech-profile-configuration")
+ if tp_path not in self._tech_profile_download_done:
+ self._tech_profile_download_done[tp_path] = False
+
+ if not self._tech_profile_download_done[tp_path]:
+ try:
+ if tp_path in self._tp_service_specific_task:
+ self.log.info("tech-profile-config-already-in-progress",
+ tp_path=tp_path)
+ return
+
+ tp = self.kv_client[tp_path]
+ tp = ast.literal_eval(tp)
+ self.log.debug("tp-instance", tp=tp)
+ self._do_tech_profile_configuration(tp)
+
+ def success(_results):
+ self.log.info("tech-profile-config-done-successfully")
+ device = self.adapter_agent.get_device(self.device_id)
+ device.reason = 'tech-profile-config-download-success'
+ self.adapter_agent.update_device(device)
+ if tp_path in self._tp_service_specific_task:
+ del self._tp_service_specific_task[tp_path]
+ self._tech_profile_download_done[tp_path] = True
+
+ def failure(_reason):
+ self.log.warn('tech-profile-config-failure-retrying',
+ _reason=_reason)
+ device = self.adapter_agent.get_device(self.device_id)
+ device.reason = 'tech-profile-config-download-failure-retrying'
+ self.adapter_agent.update_device(device)
+ if tp_path in self._tp_service_specific_task:
+ del self._tp_service_specific_task[tp_path]
+ self._deferred = reactor.callLater(_STARTUP_RETRY_WAIT, self.load_and_configure_tech_profile,
+ tp_path)
+
+ self.log.info('downloading-tech-profile-configuration')
+ self._tp_service_specific_task[tp_path] = \
+ BrcmTpServiceSpecificTask(self.omci_agent, self)
+ self._deferred = \
+ self._onu_omci_device.task_runner.queue_task(self._tp_service_specific_task[tp_path])
+ self._deferred.addCallbacks(success, failure)
+
+ except Exception as e:
+ self.log.exception("error-loading-tech-profile", e=e)
+ else:
+ self.log.info("tech-profile-config-already-done")
+
def update_pm_config(self, device, pm_config):
# TODO: This has not been tested
self.log.info('update_pm_config', pm_config=pm_config)
@@ -296,11 +448,12 @@
# flow decomposition that ultimately comes from onos
def update_flow_table(self, device, flows):
self.log.debug('function-entry', device=device, flows=flows)
+
#
# We need to proxy through the OLT to get to the ONU
# Configuration from here should be using OMCI
#
- #self.log.info('bulk-flow-update', device_id=device.id, flows=flows)
+ # self.log.info('bulk-flow-update', device_id=device.id, flows=flows)
# no point in pushing omci flows if the device isnt reachable
if device.connect_status != ConnectStatus.REACHABLE or \
@@ -347,52 +500,52 @@
if field.type == fd.ETH_TYPE:
_type = field.eth_type
self.log.debug('field-type-eth-type',
- eth_type=_type)
+ eth_type=_type)
elif field.type == fd.IP_PROTO:
_proto = field.ip_proto
self.log.debug('field-type-ip-proto',
- ip_proto=_proto)
+ ip_proto=_proto)
elif field.type == fd.IN_PORT:
_port = field.port
self.log.debug('field-type-in-port',
- in_port=_port)
+ in_port=_port)
elif field.type == fd.VLAN_VID:
_vlan_vid = field.vlan_vid & 0xfff
self.log.debug('field-type-vlan-vid',
- vlan=_vlan_vid)
+ vlan=_vlan_vid)
elif field.type == fd.VLAN_PCP:
_vlan_pcp = field.vlan_pcp
self.log.debug('field-type-vlan-pcp',
- pcp=_vlan_pcp)
+ pcp=_vlan_pcp)
elif field.type == fd.UDP_DST:
_udp_dst = field.udp_dst
self.log.debug('field-type-udp-dst',
- udp_dst=_udp_dst)
+ udp_dst=_udp_dst)
elif field.type == fd.UDP_SRC:
_udp_src = field.udp_src
self.log.debug('field-type-udp-src',
- udp_src=_udp_src)
+ udp_src=_udp_src)
elif field.type == fd.IPV4_DST:
_ipv4_dst = field.ipv4_dst
self.log.debug('field-type-ipv4-dst',
- ipv4_dst=_ipv4_dst)
+ ipv4_dst=_ipv4_dst)
elif field.type == fd.IPV4_SRC:
_ipv4_src = field.ipv4_src
self.log.debug('field-type-ipv4-src',
- ipv4_dst=_ipv4_src)
+ ipv4_dst=_ipv4_src)
elif field.type == fd.METADATA:
_metadata = field.table_metadata
self.log.debug('field-type-metadata',
- metadata=_metadata)
+ metadata=_metadata)
else:
raise NotImplementedError('field.type={}'.format(
@@ -403,16 +556,16 @@
if action.type == fd.OUTPUT:
_output = action.output.port
self.log.debug('action-type-output',
- output=_output, in_port=_in_port)
+ output=_output, in_port=_in_port)
elif action.type == fd.POP_VLAN:
self.log.debug('action-type-pop-vlan',
- in_port=_in_port)
+ in_port=_in_port)
elif action.type == fd.PUSH_VLAN:
_push_tpid = action.push.ethertype
self.log.debug('action-type-push-vlan',
- push_tpid=_push_tpid, in_port=_in_port)
+ push_tpid=_push_tpid, in_port=_in_port)
if action.push.ethertype != 0x8100:
self.log.error('unhandled-tpid',
ethertype=action.push.ethertype)
@@ -422,7 +575,7 @@
assert (action.set_field.field.oxm_class ==
OFPXMC_OPENFLOW_BASIC)
self.log.debug('action-type-set-field',
- field=_field, in_port=_in_port)
+ field=_field, in_port=_in_port)
if _field.type == fd.VLAN_VID:
_set_vlan_vid = _field.vlan_vid & 0xfff
self.log.debug('set-field-type-vlan-vid', _set_vlan_vid)
@@ -431,7 +584,7 @@
field_type=_field.type)
else:
self.log.error('unsupported-action-type',
- action_type=action.type, in_port=_in_port)
+ action_type=action.type, in_port=_in_port)
# TODO: We only set vlan omci flows. Handle omci matching ethertypes at some point in another task
if _type is not None:
@@ -456,6 +609,7 @@
device.reason = 'omci-flows-failed-retrying'
self._vlan_filter_task = reactor.callLater(_STARTUP_RETRY_WAIT,
self._add_vlan_filter_task, device, _set_vlan_vid)
+
self.log.info('setting-vlan-tag')
self._vlan_filter_task = BrcmVlanFilterTask(self.omci_agent, self.device_id, _set_vlan_vid)
self._deferred = self._onu_omci_device.task_runner.queue_task(self._vlan_filter_task)
@@ -493,6 +647,11 @@
if oper_state == 'down':
self.log.debug('stopping-openomci-statemachine')
reactor.callLater(0, self._onu_omci_device.stop)
+
+ # Let TP download happen again
+ self._tp_service_specific_task.clear()
+ self._tech_profile_download_done.clear()
+
self.disable_ports(onu_device)
onu_device.reason = "stopping-openomci"
onu_device.connect_status = ConnectStatus.UNREACHABLE
@@ -509,35 +668,17 @@
self.log.debug('stopping-openomci-statemachine')
reactor.callLater(0, self._onu_omci_device.stop)
+
+ # Let TP download happen again
+ self._tp_service_specific_task.clear()
+ self._tech_profile_download_done.clear()
+
self.disable_ports(onu_device)
onu_device.reason = "stopping-openomci"
self.adapter_agent.update_device(onu_device)
# TODO: im sure there is more to do here
- # Called when there is an olt up indication, providing the gem port id chosen by the olt handler
- def create_gemport(self, data):
- self.log.debug('create-gemport', data=data)
- gem_portdata = GemportsConfigData()
- gem_portdata.CopyFrom(data)
-
- # TODO: fill in what i have. This needs to be provided from the OLT
- # currently its hardcoded/static
- gemdict = dict()
- gemdict['gemport-id'] = gem_portdata.gemport_id
- gemdict['encryption'] = gem_portdata.aes_indicator
- gemdict['tcont-ref'] = int(gem_portdata.tcont_ref)
- gemdict['name'] = gem_portdata.gemport_id
- gemdict['traffic-class'] = gem_portdata.traffic_class
- gemdict['traffic-class'] = gem_portdata.traffic_class
-
- gem_port = OnuGemPort.create(self, gem_port=gemdict, entity_id=self._pon.next_gem_entity_id)
-
- self._pon.add_gem_port(gem_port)
-
- self.log.debug('pon-add-gemport', gem_port=gem_port)
-
-
# Not currently called. Would be called presumably from the olt handler
def remove_gemport(self, data):
self.log.debug('remove-gemport', data=data)
@@ -548,41 +689,6 @@
self.log.error('device-unreachable')
return
- #TODO: Create a remove task that encompasses this
-
- # Called when there is an olt up indication, providing the tcont id chosen by the olt handler
- def create_tcont(self, tcont_data, traffic_descriptor_data):
- self.log.debug('create-tcont', tcont_data=tcont_data, traffic_descriptor_data=traffic_descriptor_data)
- tcontdata = TcontsConfigData()
- tcontdata.CopyFrom(tcont_data)
-
- # TODO: fill in what i have. This needs to be provided from the OLT
- # currently its hardcoded/static
- tcontdict = dict()
- tcontdict['alloc-id'] = tcontdata.alloc_id
- tcontdict['name'] = tcontdata.name
- tcontdict['vont-ani'] = tcontdata.interface_reference
-
- # TODO: Not sure what to do with any of this...
- tddata = dict()
- tddata['name'] = 'not-sure-td-profile'
- tddata['fixed-bandwidth'] = "not-sure-fixed"
- tddata['assured-bandwidth'] = "not-sure-assured"
- tddata['maximum-bandwidth'] = "not-sure-max"
- tddata['additional-bw-eligibility-indicator'] = "not-sure-additional"
-
- td = OnuTrafficDescriptor.create(tddata)
- tcont = OnuTCont.create(self, tcont=tcontdict, td=td)
-
- self._pon.add_tcont(tcont)
-
- self.log.debug('pon-add-tcont', tcont=tcont)
-
- if tcontdata.interface_reference is not None:
- self.log.debug('tcont', tcont=tcont.alloc_id)
- else:
- self.log.info('received-null-tcont-data', tcont=tcont.alloc_id)
-
# Not currently called. Would be called presumably from the olt handler
def remove_tcont(self, tcont_data, traffic_descriptor_data):
self.log.debug('remove-tcont', tcont_data=tcont_data, traffic_descriptor_data=traffic_descriptor_data)
@@ -608,6 +714,11 @@
# proceed with disable regardless if we could reach the onu. for example onu is unplugged
self.log.debug('stopping-openomci-statemachine')
reactor.callLater(0, self._onu_omci_device.stop)
+
+ # Let TP download happen again
+ self._tp_service_specific_task.clear()
+ self._tech_profile_download_done.clear()
+
self.disable_ports(device)
device.oper_status = OperStatus.UNKNOWN
device.reason = "omci-admin-lock"
@@ -657,7 +768,7 @@
def disable_ports(self, onu_device):
self.log.info('disable-ports', device_id=self.device_id,
- onu_device=onu_device)
+ onu_device=onu_device)
# Disable all ports on that device
self.adapter_agent.disable_all_ports(self.device_id)
@@ -688,7 +799,6 @@
# TODO: move to UniPort
self.update_logical_port(logical_device_id, port_id, OFPPS_LIVE)
-
# Called just before openomci state machine is started. These listen for events from selected state machines,
# most importantly, mib in sync. Which ultimately leads to downloading the mib
def _subscribe_to_events(self):
diff --git a/voltha/adapters/brcm_openomci_onu/omci/brcm_mib_download_task.py b/voltha/adapters/brcm_openomci_onu/omci/brcm_mib_download_task.py
index 3bb1e78..e597d76 100644
--- a/voltha/adapters/brcm_openomci_onu/omci/brcm_mib_download_task.py
+++ b/voltha/adapters/brcm_openomci_onu/omci/brcm_mib_download_task.py
@@ -21,6 +21,8 @@
from voltha.extensions.omci.tasks.task import Task
from voltha.extensions.omci.omci_defs import *
from voltha.adapters.brcm_openomci_onu.uni_port import *
+from voltha.adapters.brcm_openomci_onu.pon_port \
+ import BRDCM_DEFAULT_VLAN, TASK_PRIORITY, DEFAULT_TPID, DEFAULT_GEM_PAYLOAD
OP = EntityOperations
RC = ReasonCodes
@@ -54,10 +56,6 @@
Currently, the only service tech profiles expected by v2.0 will be for AT&T
residential data service and DT residential data service.
"""
- task_priority = Task.DEFAULT_PRIORITY + 10
- default_tpid = 0x8100
- default_gem_payload = 48
- BRDCM_DEFAULT_VLAN = 4091
name = "Broadcom MIB Download Example Task"
@@ -75,35 +73,42 @@
super(BrcmMibDownloadTask, self).__init__(BrcmMibDownloadTask.name,
omci_agent,
handler.device_id,
- priority=BrcmMibDownloadTask.task_priority)
+ priority=TASK_PRIORITY)
self._handler = handler
self._onu_device = omci_agent.get_device(handler.device_id)
self._local_deferred = None
# Frame size
- self._max_gem_payload = BrcmMibDownloadTask.default_gem_payload
+ self._max_gem_payload = DEFAULT_GEM_PAYLOAD
# TODO: only using a single UNI/ethernet port
self._uni_port = self._handler.uni_ports[0]
+ self._pon = handler.pon_port
+
# Port numbers
- self._pon_port_num = 3 # TODO why 3. maybe this is the ani port number. look at anis list
+ self._pon_port_num = self._pon.pon_port_num
+ self._input_tpid = DEFAULT_TPID
+ self._output_tpid = DEFAULT_TPID
- self._input_tpid = BrcmMibDownloadTask.default_tpid
- self._output_tpid = BrcmMibDownloadTask.default_tpid
-
- self._vlan_tcis_1 = self.BRDCM_DEFAULT_VLAN
- self._cvid = self.BRDCM_DEFAULT_VLAN
+ self._vlan_tcis_1 = BRDCM_DEFAULT_VLAN
+ self._cvid = BRDCM_DEFAULT_VLAN
self._vlan_config_entity_id = self._vlan_tcis_1
# Entity IDs. IDs with values can probably be most anything for most ONUs,
# IDs set to None are discovered/set
- # TODO: lots of magic numbers
- self._mac_bridge_service_profile_entity_id = 0x201
- self._ieee_mapper_service_profile_entity_id = 0x8001
- self._mac_bridge_port_ani_entity_id = 0x2102 # TODO: can we just use the entity id from the anis list?
- self._gal_enet_profile_entity_id = 0x1
+ self._mac_bridge_service_profile_entity_id = \
+ self._handler.mac_bridge_service_profile_entity_id
+ self._ieee_mapper_service_profile_entity_id = \
+ self._pon.ieee_mapper_service_profile_entity_id
+ self._mac_bridge_port_ani_entity_id = \
+ self._pon.mac_bridge_port_ani_entity_id
+ self._gal_enet_profile_entity_id = \
+ self._handler.gal_enet_profile_entity_id
+
+ self._free_ul_prior_q_entity_ids = set()
+ self._free_dl_prior_q_entity_ids = set()
def cancel_deferred(self):
self.log.debug('function-entry')
@@ -171,53 +176,45 @@
UNI ports for this device. The application of any service flows
and other characteristics are done as needed.
"""
- self.log.debug('function-entry')
- self.log.info('perform-download')
+ try:
+ self.log.debug('function-entry')
+ self.log.info('perform-download')
- device = self._handler.adapter_agent.get_device(self.device_id)
+ device = self._handler.adapter_agent.get_device(self.device_id)
- def resources_available():
- return (len(self._handler.uni_ports) > 0 and
- len(self._handler.pon_port.tconts) and
- len(self._handler.pon_port.gem_ports))
+ if self._handler.enabled and len(self._handler.uni_ports) > 0:
+ device.reason = 'performing-initial-mib-download'
+ self._handler.adapter_agent.update_device(device)
- if self._handler.enabled and resources_available():
- device.reason = 'performing-initial-mib-download'
- self._handler.adapter_agent.update_device(device)
+ try:
+ # Lock the UNI ports to prevent any alarms during initial configuration
+ # of the ONU
+ self.strobe_watchdog()
+ yield self.enable_uni(self._uni_port, True)
- try:
- # Lock the UNI ports to prevent any alarms during initial configuration
- # of the ONU
- self.strobe_watchdog()
- yield self.enable_uni(self._uni_port, True)
+ # Provision the initial bridge configuration
+ yield self.perform_initial_bridge_setup()
- # Provision the initial bridge configuration
- yield self.perform_initial_bridge_setup()
+ # And re-enable the UNIs if needed
+ yield self.enable_uni(self._uni_port, False)
- # And not all the service specific work
- yield self.perform_service_specific_steps()
+ self.deferred.callback('initial-download-success')
- # And re-enable the UNIs if needed
- yield self.enable_uni(self._uni_port, False)
+ except TimeoutError as e:
+ self.deferred.errback(failure.Failure(e))
- self.deferred.callback('initial-download-success')
-
- except TimeoutError as e:
+ else:
+ e = MibResourcesFailure('Required resources are not available',
+ len(self._handler.uni_ports))
self.deferred.errback(failure.Failure(e))
-
- else:
- e = MibResourcesFailure('Required resources are not available',
- tconts=len(self._handler.pon_port.tconts),
- gems=len(self._handler.pon_port.gem_ports),
- unis=len(self._handler.uni_ports))
- self.deferred.errback(failure.Failure(e))
+ except BaseException as e:
+ self.log.debug('@thyy_mib_check:', exception=e)
@inlineCallbacks
def perform_initial_bridge_setup(self):
self.log.debug('function-entry')
omci_cc = self._onu_device.omci_cc
- frame = None
# TODO: too many magic numbers
try:
@@ -251,12 +248,12 @@
# TODO: magic. event if static, assign to a meaningful variable name
attributes = {
'spanning_tree_ind': False,
- 'learning_ind' : True,
- 'priority' : 0x8000,
- 'max_age' : 20 * 256,
- 'hello_time' : 2 * 256,
- 'forward_delay' : 15 * 256,
- 'unknown_mac_address_discard' : True
+ 'learning_ind': True,
+ 'priority': 0x8000,
+ 'max_age': 20 * 256,
+ 'hello_time': 2 * 256,
+ 'forward_delay': 15 * 256,
+ 'unknown_mac_address_discard': True
}
msg = MacBridgeServiceProfileFrame(
self._mac_bridge_service_profile_entity_id,
@@ -303,8 +300,8 @@
msg = MacBridgePortConfigurationDataFrame(
self._mac_bridge_port_ani_entity_id,
bridge_id_pointer=self._mac_bridge_service_profile_entity_id, # Bridge Entity ID
- port_num=self._pon_port_num, # Port ID ##TODO associated with what?
- tp_type=3, # TP Type (IEEE 802.1p mapper service)
+ port_num=self._pon_port_num, # Port ID ##TODO associated with what?
+ tp_type=3, # TP Type (IEEE 802.1p mapper service)
tp_pointer=self._ieee_mapper_service_profile_entity_id # TP ID, 8021p mapper ID
)
frame = msg.create()
@@ -325,7 +322,7 @@
# TODO: magic. make a static variable for forward_op
msg = VlanTaggingFilterDataFrame(
self._mac_bridge_port_ani_entity_id, # Entity ID
- vlan_tcis=[self._vlan_tcis_1], # VLAN IDs
+ vlan_tcis=[self._vlan_tcis_1], # VLAN IDs
forward_operation=0x10
)
frame = msg.create()
@@ -333,7 +330,7 @@
results = yield omci_cc.send(frame)
self.check_status_and_state(results, 'create-vlan-tagging-filter-data')
- ################################################################################
+ ################################################################################
# UNI Specific #
################################################################################
# MAC Bridge Port config
@@ -349,7 +346,6 @@
# TODO: magic. make a static variable for tp_type
# default to PPTP
- tp_type = None
if self._uni_port.type is UniType.VEIP:
tp_type = 11
elif self._uni_port.type is UniType.PPTP:
@@ -358,11 +354,11 @@
tp_type = 1
msg = MacBridgePortConfigurationDataFrame(
- self._uni_port.entity_id, # Entity ID - This is read-only/set-by-create !!!
+ self._uni_port.entity_id, # Entity ID - This is read-only/set-by-create !!!
bridge_id_pointer=self._mac_bridge_service_profile_entity_id, # Bridge Entity ID
- port_num=self._uni_port.mac_bridge_port_num, # Port ID
- tp_type=tp_type, # PPTP Ethernet or VEIP UNI
- tp_pointer=self._uni_port.entity_id # Ethernet UNI ID
+ port_num=self._uni_port.mac_bridge_port_num, # Port ID
+ tp_type=tp_type, # PPTP Ethernet or VEIP UNI
+ tp_pointer=self._uni_port.entity_id # Ethernet UNI ID
)
frame = msg.create()
self.log.debug('openomci-msg', msg=msg)
@@ -418,7 +414,6 @@
# TODO: Need to restore on failure. Need to check status/results
yield tcont.add_to_hardware(omci_cc, free_entity_id)
-
################################################################################
# GEMS (GemPortNetworkCtp and GemInterworkingTp)
#
@@ -472,7 +467,7 @@
msg = Ieee8021pMapperServiceProfileFrame(
self._ieee_mapper_service_profile_entity_id, # 802.1p mapper Service Mapper Profile ID
- interwork_tp_pointers=gem_entity_ids # Interworking TP IDs
+ interwork_tp_pointers=gem_entity_ids # Interworking TP IDs
)
frame = msg.set()
self.log.debug('openomci-msg', msg=msg)
@@ -492,7 +487,6 @@
# TODO: magic. static variable for assoc_type
# default to PPTP
- association_type = None
if self._uni_port.type is UniType.VEIP:
association_type = 10
elif self._uni_port.type is UniType.PPTP:
@@ -501,8 +495,8 @@
association_type = 2
attributes = dict(
- association_type=association_type, # Assoc Type, PPTP/VEIP Ethernet UNI
- associated_me_pointer=self._uni_port.entity_id, # Assoc ME, PPTP/VEIP Entity Id
+ association_type=association_type, # Assoc Type, PPTP/VEIP Ethernet UNI
+ associated_me_pointer=self._uni_port.entity_id, # Assoc ME, PPTP/VEIP Entity Id
# See VOL-1311 - Need to set table during create to avoid exception
# trying to read back table during post-create-read-missing-attributes
@@ -525,7 +519,7 @@
attributes = dict(
# Specifies the TPIDs in use and that operations in the downstream direction are
# inverse to the operations in the upstream direction
- input_tpid=self._input_tpid, # input TPID
+ input_tpid=self._input_tpid, # input TPID
output_tpid=self._output_tpid, # output TPID
downstream_mode=0, # inverse of upstream
)
@@ -552,8 +546,8 @@
received_frame_vlan_tagging_operation_table=
VlanTaggingOperation(
filter_outer_priority=15, # This entry is not a double-tag rule
- filter_outer_vid=4096, # Do not filter on the outer VID value
- filter_outer_tpid_de=0, # Do not filter on the outer TPID field
+ filter_outer_vid=4096, # Do not filter on the outer VID value
+ filter_outer_tpid_de=0, # Do not filter on the outer TPID field
filter_inner_priority=15,
filter_inner_vid=4096,
@@ -614,20 +608,20 @@
try:
state = 1 if force_lock or not uni_port.enabled else 0
msg = None
- if (uni_port.type is UniType.PPTP):
+ if uni_port.type is UniType.PPTP:
msg = PptpEthernetUniFrame(uni_port.entity_id,
attributes=dict(administrative_state=state))
- elif (uni_port.type is UniType.VEIP):
+ elif uni_port.type is UniType.VEIP:
msg = VeipUniFrame(uni_port.entity_id,
attributes=dict(administrative_state=state))
else:
self.log.warn('unknown-uni-type', uni_port=uni_port)
if msg:
- frame = msg.set()
- self.log.debug('openomci-msg', msg=msg)
- results = yield omci_cc.send(frame)
- self.check_status_and_state(results, 'set-pptp-ethernet-uni-lock-restore')
+ frame = msg.set()
+ self.log.debug('openomci-msg', msg=msg)
+ results = yield omci_cc.send(frame)
+ self.check_status_and_state(results, 'set-pptp-ethernet-uni-lock-restore')
except TimeoutError as e:
self.log.warn('rx-timeout', e=e)
@@ -638,6 +632,3 @@
raise
returnValue(None)
-
-
-
diff --git a/voltha/adapters/brcm_openomci_onu/omci/brcm_tp_service_specific_task.py b/voltha/adapters/brcm_openomci_onu/omci/brcm_tp_service_specific_task.py
new file mode 100644
index 0000000..2425693
--- /dev/null
+++ b/voltha/adapters/brcm_openomci_onu/omci/brcm_tp_service_specific_task.py
@@ -0,0 +1,460 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import structlog
+from common.frameio.frameio import hexify
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, returnValue, TimeoutError, failure
+from voltha.extensions.omci.omci_me import *
+from voltha.extensions.omci.tasks.task import Task
+from voltha.extensions.omci.omci_defs import *
+from voltha.adapters.brcm_openomci_onu.uni_port import *
+from voltha.adapters.brcm_openomci_onu.pon_port \
+ import BRDCM_DEFAULT_VLAN, TASK_PRIORITY, DEFAULT_TPID, DEFAULT_GEM_PAYLOAD
+
+OP = EntityOperations
+RC = ReasonCodes
+
+
+class TechProfileDownloadFailure(Exception):
+ """
+ This error is raised by default when the download fails
+ """
+
+
+class TechProfileResourcesFailure(Exception):
+ """
+ This error is raised by when one or more resources required is not available
+ """
+
+
+class BrcmTpServiceSpecificTask(Task):
+ """
+ OpenOMCI Tech-Profile Download Task
+
+ """
+
+ name = "Broadcom Tech-Profile Download Task"
+
+ def __init__(self, omci_agent, handler):
+ """
+ Class initialization
+
+ :param omci_agent: (OmciAdapterAgent) OMCI Adapter agent
+ :param device_id: (str) ONU Device ID
+ """
+
+ self.log = structlog.get_logger(device_id=handler.device_id)
+ self.log.debug('function-entry')
+
+ super(BrcmTpServiceSpecificTask, self).__init__(BrcmTpServiceSpecificTask.name,
+ omci_agent,
+ handler.device_id,
+ priority=TASK_PRIORITY)
+ self._handler = handler
+ self._onu_device = omci_agent.get_device(handler.device_id)
+ self._local_deferred = None
+
+ # Frame size
+ self._max_gem_payload = DEFAULT_GEM_PAYLOAD
+
+ # TODO: only using a single UNI/ethernet port
+ self._uni_port = self._handler.uni_ports[0]
+ self._uni_port_num = self._uni_port.mac_bridge_port_num
+ self._ethernet_uni_entity_id = self._uni_port.entity_id
+
+ self._pon = handler.pon_port
+
+ # Port numbers
+ self._input_tpid = DEFAULT_TPID
+ self._output_tpid = DEFAULT_TPID
+
+ self._vlan_tcis_1 = BRDCM_DEFAULT_VLAN
+ self._cvid = BRDCM_DEFAULT_VLAN
+ self._vlan_config_entity_id = self._vlan_tcis_1
+
+ # Entity IDs. IDs with values can probably be most anything for most ONUs,
+ # IDs set to None are discovered/set
+
+ self._mac_bridge_service_profile_entity_id = \
+ self._handler.mac_bridge_service_profile_entity_id
+ self._ieee_mapper_service_profile_entity_id = \
+ self._pon.ieee_mapper_service_profile_entity_id
+ self._mac_bridge_port_ani_entity_id = \
+ self._pon.mac_bridge_port_ani_entity_id
+ self._gal_enet_profile_entity_id = \
+ self._handler.gal_enet_profile_entity_id
+
+ self.tcont_me_to_queue_map = dict()
+ self.uni_port_to_queue_map = dict()
+
+ def cancel_deferred(self):
+ self.log.debug('function-entry')
+ super(BrcmTpServiceSpecificTask, self).cancel_deferred()
+
+ d, self._local_deferred = self._local_deferred, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ def start(self):
+ """
+ Start the Tech-Profile Download
+ """
+ self.log.debug('function-entry')
+ super(BrcmTpServiceSpecificTask, self).start()
+ self._local_deferred = reactor.callLater(0, self.perform_service_specific_steps)
+
+ def stop(self):
+ """
+ Shutdown Tech-Profile download tasks
+ """
+ self.log.debug('function-entry')
+ self.log.debug('stopping')
+
+ self.cancel_deferred()
+ super(BrcmTpServiceSpecificTask, self).stop()
+
+ def check_status_and_state(self, results, operation=''):
+ """
+ Check the results of an OMCI response. An exception is thrown
+ if the task was cancelled or an error was detected.
+
+ :param results: (OmciFrame) OMCI Response frame
+ :param operation: (str) what operation was being performed
+ :return: True if successful, False if the entity existed (already created)
+ """
+ self.log.debug('function-entry')
+
+ omci_msg = results.fields['omci_message'].fields
+ status = omci_msg['success_code']
+ error_mask = omci_msg.get('parameter_error_attributes_mask', 'n/a')
+ failed_mask = omci_msg.get('failed_attributes_mask', 'n/a')
+ unsupported_mask = omci_msg.get('unsupported_attributes_mask', 'n/a')
+
+ self.log.debug("OMCI Result:", operation, omci_msg=omci_msg, status=status, error_mask=error_mask,
+ failed_mask=failed_mask, unsupported_mask=unsupported_mask)
+
+ if status == RC.Success:
+ self.strobe_watchdog()
+ return True
+
+ elif status == RC.InstanceExists:
+ return False
+
+ raise TechProfileDownloadFailure(
+ '{} failed with a status of {}, error_mask: {}, failed_mask: {}, unsupported_mask: {}'
+ .format(operation, status, error_mask, failed_mask, unsupported_mask))
+
+ @inlineCallbacks
+ def perform_service_specific_steps(self):
+ self.log.debug('function-entry')
+
+ omci_cc = self._onu_device.omci_cc
+
+ try:
+ ################################################################################
+ # TCONTS
+ #
+ # EntityID will be referenced by:
+ # - GemPortNetworkCtp
+ # References:
+ # - ONU created TCONT (created on ONU startup)
+
+ tcont_idents = self._onu_device.query_mib(Tcont.class_id)
+ self.log.debug('tcont-idents', tcont_idents=tcont_idents)
+
+ for tcont in self._handler.pon_port.tconts.itervalues():
+ free_entity_id = None
+ for k, v in tcont_idents.items():
+ alloc_check = v.get('attributes', {}).get('alloc_id', 0)
+ # Some onu report both to indicate an available tcont
+ if alloc_check == 0xFF or alloc_check == 0xFFFF:
+ free_entity_id = k
+ break
+ else:
+ free_entity_id = None
+
+ self.log.debug('tcont-loop', free_entity_id=free_entity_id)
+
+ if free_entity_id is None:
+ self.log.error('no-available-tconts')
+ break
+
+ # TODO: Need to restore on failure. Need to check status/results
+ results = yield tcont.add_to_hardware(omci_cc, free_entity_id)
+ self.check_status_and_state(results, 'create-tcont')
+
+ ################################################################################
+ # GEMS (GemPortNetworkCtp and GemInterworkingTp)
+ #
+ # For both of these MEs, the entity_id is the GEM Port ID. The entity id of the
+ # GemInterworkingTp ME could be different since it has an attribute to specify
+ # the GemPortNetworkCtp entity id.
+ #
+ # for the GemPortNetworkCtp ME
+ #
+ # GemPortNetworkCtp
+ # EntityID will be referenced by:
+ # - GemInterworkingTp
+ # References:
+ # - TCONT
+ # - Hardcoded upstream TM Entity ID
+ # - (Possibly in Future) Upstream Traffic descriptor profile pointer
+ #
+ # GemInterworkingTp
+ # EntityID will be referenced by:
+ # - Ieee8021pMapperServiceProfile
+ # References:
+ # - GemPortNetworkCtp
+ # - Ieee8021pMapperServiceProfile
+ # - GalEthernetProfile
+ #
+
+ onu_g = self._onu_device.query_mib(OntG.class_id)
+ # If the traffic management option attribute in the ONU-G ME is 0
+ # (priority controlled) or 2 (priority and rate controlled), this
+ # pointer specifies the priority queue ME serving this GEM port
+ # network CTP. If the traffic management option attribute is 1
+ # (rate controlled), this attribute redundantly points to the
+ # T-CONT serving this GEM port network CTP.
+ traffic_mgmt_opt = \
+ onu_g.get('attributes', {}).get('traffic_management_options', 0)
+ self.log.debug("traffic-mgmt-option", traffic_mgmt_opt=traffic_mgmt_opt)
+
+ prior_q = self._onu_device.query_mib(PriorityQueueG.class_id)
+ for k, v in prior_q.items():
+ self.log.debug("prior-q", k=k, v=v)
+
+ try:
+ _ = iter(v)
+ except TypeError:
+ continue
+
+ if 'instance_id' in v:
+ related_port = v['attributes']['related_port']
+ if v['instance_id'] & 0b1000000000000000:
+ tcont_me = (related_port & 0xffff0000) >> 16
+ if tcont_me not in self.tcont_me_to_queue_map:
+ self.log.debug("prior-q-related-port-and-tcont-me",
+ related_port=related_port,
+ tcont_me=tcont_me)
+ self.tcont_me_to_queue_map[tcont_me] = list()
+
+ self.tcont_me_to_queue_map[tcont_me].append(k)
+ else:
+ uni_port = (related_port & 0xffff0000) >> 16
+ if uni_port not in self.uni_port_to_queue_map:
+ self.log.debug("prior-q-related-port-and-uni-port-me",
+ related_port=related_port,
+ uni_port_me=uni_port)
+ self.uni_port_to_queue_map[uni_port] = list()
+
+ self.uni_port_to_queue_map[uni_port].append(k)
+
+
+ self.log.debug("ul-prior-q", ul_prior_q=self.tcont_me_to_queue_map)
+ self.log.debug("dl-prior-q", dl_prior_q=self.uni_port_to_queue_map)
+
+ for gem_port in self._handler.pon_port.gem_ports.itervalues():
+ # TODO: Traffic descriptor will be available after meter bands are available
+ tcont = gem_port.tcont
+ if tcont is None:
+ self.log.error('unknown-tcont-reference', gem_id=gem_port.gem_id)
+ continue
+
+ ul_prior_q_entity_id = None
+ dl_prior_q_entity_id = None
+ if gem_port.direction == "upstream" or \
+ gem_port.direction == "bi-directional":
+
+ # Sort the priority queue list in order of priority.
+ # 0 is highest priority and 0x0fff is lowest.
+ self.tcont_me_to_queue_map[tcont.entity_id].sort()
+ self.uni_port_to_queue_map[self._uni_port.entity_id].sort()
+ # Get the priority queue associated with p-bit that is
+ # mapped to the gem port.
+ # p-bit-7 is highest priority and p-bit-0 is lowest
+ # Gem port associated with p-bit-7 should be mapped to
+ # highest priority queue and gem port associated with p-bit-0
+ # should be mapped to lowest priority queue.
+ # The self.tcont_me_to_queue_map and self.uni_port_to_queue_map
+ # have priority queue entities ordered in descending order
+ # of priority
+ for i, p in enumerate(gem_port.pbit_map):
+ if p == '1':
+ ul_prior_q_entity_id = \
+ self.tcont_me_to_queue_map[tcont.entity_id][i]
+ dl_prior_q_entity_id = \
+ self.uni_port_to_queue_map[self._uni_port.entity_id][i]
+ break
+
+ assert ul_prior_q_entity_id is not None and \
+ dl_prior_q_entity_id is not None
+
+ # TODO: Need to restore on failure. Need to check status/results
+ results = yield gem_port.add_to_hardware(omci_cc,
+ tcont.entity_id,
+ self._ieee_mapper_service_profile_entity_id,
+ self._gal_enet_profile_entity_id,
+ ul_prior_q_entity_id, dl_prior_q_entity_id)
+ self.check_status_and_state(results, 'create-gem-port')
+ elif gem_port.direction == "downstream":
+ # Downstream is inverse of upstream
+ # TODO: could also be a case of multicast. Not supported for now
+ pass
+
+ ################################################################################
+ # Update the IEEE 802.1p Mapper Service Profile config
+ #
+ # EntityID was created prior to this call. This is a set
+ #
+ # References:
+ # - Gem Interwork TPs are set here
+ #
+
+ gem_entity_ids = [OmciNullPointer] * 8
+ for gem_port in self._handler.pon_port.gem_ports.itervalues():
+ if gem_port.direction == "upstream" or \
+ gem_port.direction == "bi-directional":
+ for i, p in enumerate(gem_port.pbit_map):
+ if p == '1':
+ gem_entity_ids[i] = gem_port.entity_id
+ elif gem_port.direction == "downstream":
+ # Downstream gem port p-bit mapper is inverse of upstream
+ # TODO: Could also be a case of multicast. Not supported for now
+ pass
+
+ msg = Ieee8021pMapperServiceProfileFrame(
+ self._ieee_mapper_service_profile_entity_id, # 802.1p mapper Service Mapper Profile ID
+ interwork_tp_pointers=gem_entity_ids # Interworking TP IDs
+ )
+ frame = msg.set()
+ self.log.debug('openomci-msg', msg=msg)
+ results = yield omci_cc.send(frame)
+ self.check_status_and_state(results, 'set-8021p-mapper-service-profile-ul')
+
+ ################################################################################
+ # Create Extended VLAN Tagging Operation config (PON-side)
+ #
+ # EntityID relates to the VLAN TCIS
+ # References:
+ # - VLAN TCIS from previously created VLAN Tagging filter data
+ # - PPTP Ethernet or VEIP UNI
+ #
+
+ # TODO: do this for all uni/ports...
+ # TODO: magic. static variable for assoc_type
+
+ # default to PPTP
+ if self._uni_port.type is UniType.VEIP:
+ association_type = 10
+ elif self._uni_port.type is UniType.PPTP:
+ association_type = 2
+ else:
+ association_type = 2
+
+ attributes = dict(
+ association_type=association_type, # Assoc Type, PPTP/VEIP Ethernet UNI
+ associated_me_pointer=self._uni_port.entity_id, # Assoc ME, PPTP/VEIP Entity Id
+
+ # See VOL-1311 - Need to set table during create to avoid exception
+ # trying to read back table during post-create-read-missing-attributes
+ # But, because this is a R/W attribute. Some ONU may not accept the
+ # value during create. It is repeated again in a set below.
+ input_tpid=self._input_tpid, # input TPID
+ output_tpid=self._output_tpid, # output TPID
+ )
+
+ msg = ExtendedVlanTaggingOperationConfigurationDataFrame(
+ self._mac_bridge_service_profile_entity_id, # Bridge Entity ID
+ attributes=attributes
+ )
+
+ frame = msg.create()
+ self.log.debug('openomci-msg', msg=msg)
+ results = yield omci_cc.send(frame)
+ self.check_status_and_state(results, 'create-extended-vlan-tagging-operation-configuration-data')
+
+ attributes = dict(
+ # Specifies the TPIDs in use and that operations in the downstream direction are
+ # inverse to the operations in the upstream direction
+ input_tpid=self._input_tpid, # input TPID
+ output_tpid=self._output_tpid, # output TPID
+ downstream_mode=0, # inverse of upstream
+ )
+
+ msg = ExtendedVlanTaggingOperationConfigurationDataFrame(
+ self._mac_bridge_service_profile_entity_id, # Bridge Entity ID
+ attributes=attributes
+ )
+
+ frame = msg.set()
+ self.log.debug('openomci-msg', msg=msg)
+ results = yield omci_cc.send(frame)
+ self.check_status_and_state(results, 'set-extended-vlan-tagging-operation-configuration-data')
+
+ attributes = dict(
+ # parameters: Entity Id ( 0x900), Filter Inner Vlan Id(0x1000-4096,do not filter on Inner vid,
+ # Treatment Inner Vlan Id : 2
+
+ # Update uni side extended vlan filter
+ # filter for untagged
+ # probably for eapol
+ # TODO: lots of magic
+ # TODO: magic 0x1000 / 4096?
+ received_frame_vlan_tagging_operation_table=
+ VlanTaggingOperation(
+ filter_outer_priority=15, # This entry is not a double-tag rule
+ filter_outer_vid=4096, # Do not filter on the outer VID value
+ filter_outer_tpid_de=0, # Do not filter on the outer TPID field
+
+ filter_inner_priority=15,
+ filter_inner_vid=4096,
+ filter_inner_tpid_de=0,
+ filter_ether_type=0,
+
+ treatment_tags_to_remove=0,
+ treatment_outer_priority=15,
+ treatment_outer_vid=0,
+ treatment_outer_tpid_de=0,
+
+ treatment_inner_priority=0,
+ treatment_inner_vid=self._cvid,
+ treatment_inner_tpid_de=4,
+ )
+ )
+
+ msg = ExtendedVlanTaggingOperationConfigurationDataFrame(
+ self._mac_bridge_service_profile_entity_id, # Bridge Entity ID
+ attributes=attributes
+ )
+
+ frame = msg.set()
+ self.log.debug('openomci-msg', msg=msg)
+ results = yield omci_cc.send(frame)
+ self.check_status_and_state(results, 'set-extended-vlan-tagging-operation-configuration-data-table')
+
+ self.deferred.callback("tech-profile-download-success")
+
+ except TimeoutError as e:
+ self.log.warn('rx-timeout-2', e=e)
+ self.deferred.errback(failure.Failure(e))
+
+ except Exception as e:
+ self.log.exception('omci-setup-2', e=e)
+ self.deferred.errback(failure.Failure(e))
diff --git a/voltha/adapters/brcm_openomci_onu/omci/brcm_vlan_filter_task.py b/voltha/adapters/brcm_openomci_onu/omci/brcm_vlan_filter_task.py
index c8b3a62..9d22723 100644
--- a/voltha/adapters/brcm_openomci_onu/omci/brcm_vlan_filter_task.py
+++ b/voltha/adapters/brcm_openomci_onu/omci/brcm_vlan_filter_task.py
@@ -44,10 +44,10 @@
:param priority: (int) OpenOMCI Task priority (0..255) 255 is the highest
"""
super(BrcmVlanFilterTask, self).__init__(BrcmVlanFilterTask.name,
- omci_agent,
- device_id,
- priority=priority,
- exclusive=False)
+ omci_agent,
+ device_id,
+ priority=priority,
+ exclusive=False)
self._device = omci_agent.get_device(device_id)
self._set_vlan_id = set_vlan_id
self._results = None
@@ -82,8 +82,7 @@
# TODO: parameterize these from the handler, or objects in the handler
# TODO: make this a member of the onu gem port or the uni port
_mac_bridge_service_profile_entity_id = 0x201
- _mac_bridge_port_ani_entity_id = 0x2102 # TODO: can we just use the entity id from the anis list?
-
+ _mac_bridge_port_ani_entity_id = 0x2102 # TODO: can we just use the entity id from the anis list?
# Delete bridge ani side vlan filter
msg = VlanTaggingFilterDataFrame(_mac_bridge_port_ani_entity_id)
frame = msg.delete()
@@ -95,7 +94,7 @@
# Re-Create bridge ani side vlan filter
msg = VlanTaggingFilterDataFrame(
_mac_bridge_port_ani_entity_id, # Entity ID
- vlan_tcis=[self._set_vlan_id], # VLAN IDs
+ vlan_tcis=[self._set_vlan_id], # VLAN IDs
forward_operation=0x10
)
frame = msg.create()
@@ -104,6 +103,8 @@
results = yield self._device.omci_cc.send(frame)
self.check_status_and_state(results, 'flow-create-vlan-tagging-filter-data')
+ # Re-Create bridge ani side vlan filter
+
# Update uni side extended vlan filter
# filter for untagged
# probably for eapol
@@ -130,7 +131,6 @@
treatment_inner_tpid_de=4
)
)
-
msg = ExtendedVlanTaggingOperationConfigurationDataFrame(
_mac_bridge_service_profile_entity_id, # Bridge Entity ID
attributes=attributes # See above
diff --git a/voltha/adapters/brcm_openomci_onu/onu_gem_port.py b/voltha/adapters/brcm_openomci_onu/onu_gem_port.py
index ae729da..a7403ed 100644
--- a/voltha/adapters/brcm_openomci_onu/onu_gem_port.py
+++ b/voltha/adapters/brcm_openomci_onu/onu_gem_port.py
@@ -14,19 +14,28 @@
# limitations under the License.
import structlog
-from common.frameio.frameio import hexify
from twisted.internet.defer import inlineCallbacks, returnValue
from voltha.extensions.omci.omci_me import *
from voltha.extensions.omci.omci_defs import *
RC = ReasonCodes
+
class OnuGemPort(object):
"""
Broadcom ONU specific implementation
"""
+
def __init__(self, gem_id, alloc_id, entity_id,
+ direction="BIDIRECTIONAL",
encryption=False,
+ discard_config=None,
+ discard_policy=None,
+ max_q_size="auto",
+ pbit_map="0b00000011",
+ priority_q=3,
+ scheduling_policy="WRR",
+ weight=8,
omci_transport=False,
multicast=False,
tcont_ref=None,
@@ -45,12 +54,28 @@
self.tcont_ref = tcont_ref
self.intf_ref = intf_ref
self.traffic_class = traffic_class
+ self._direction = None
self._encryption = encryption
+ self._discard_config = None
+ self._discard_policy = None
+ self._max_q_size = None
+ self._pbit_map = None
+ self._scheduling_policy = None
self._omci_transport = omci_transport
self.multicast = multicast
self.untagged = untagged
self._handler = handler
+ self.direction = direction
+ self.encryption = encryption
+ self.discard_config = discard_config
+ self.discard_policy = discard_policy
+ self.max_q_size = max_q_size
+ self.pbit_map = pbit_map
+ self.priority_q = priority_q
+ self.scheduling_policy = scheduling_policy
+ self.weight = weight
+
self._pon_id = None
self._onu_id = None
self._entity_id = entity_id
@@ -61,11 +86,9 @@
self.tx_packets = 0
self.tx_bytes = 0
-
def __str__(self):
- return "GemPort: {}, alloc-id: {}, gem-id: {}".format(self.name,
- self.alloc_id,
- self.gem_id)
+ return "alloc-id: {}, gem-id: {}".format(self.alloc_id, self.gem_id)
+
@property
def pon_id(self):
self.log.debug('function-entry')
@@ -91,19 +114,33 @@
@property
def alloc_id(self):
self.log.debug('function-entry')
- if self._alloc_id is None and self._handler is not None:
- try:
- self._alloc_id = self._handler.pon_port.tconts.get(self.tcont_ref).get('alloc-id')
-
- except Exception:
- pass
-
return self._alloc_id
@property
+ def direction(self):
+ self.log.debug('function-entry')
+ return self._direction
+
+ @direction.setter
+ def direction(self, direction):
+ self.log.debug('function-entry')
+ # GEM Port CTP are configured separately in UPSTREAM and DOWNSTREAM.
+ # BIDIRECTIONAL is not supported.
+ assert direction == "UPSTREAM" or direction == "DOWNSTREAM" or \
+ direction == "BIDIRECTIONAL", "invalid-direction"
+
+ # OMCI framework expects string in lower-case. Tech-Profile sends in upper-case.
+ if direction == "UPSTREAM":
+ self._direction = "upstream"
+ elif direction == "DOWNSTREAM":
+ self._direction = "downstream"
+ elif direction == "BIDIRECTIONAL":
+ self._direction = "bi-directional"
+
+ @property
def tcont(self):
self.log.debug('function-entry')
- tcont_item = self._handler.pon_port.tconts.get(self.tcont_ref)
+ tcont_item = self._handler.pon_port.tconts.get(self.alloc_id)
return tcont_item
@property
@@ -133,23 +170,102 @@
@encryption.setter
def encryption(self, value):
self.log.debug('function-entry')
+ # FIXME The encryption should come as boolean by default
+ value = eval(value)
assert isinstance(value, bool), 'encryption is a boolean'
if self._encryption != value:
self._encryption = value
+ @property
+ def discard_config(self):
+ self.log.debug('function-entry')
+ return self._discard_config
+
+ @discard_config.setter
+ def discard_config(self, discard_config):
+ self.log.debug('function-entry')
+ assert isinstance(discard_config, dict), "discard_config not dict"
+ assert 'max_probability' in discard_config, "max_probability missing"
+ assert 'max_threshold' in discard_config, "max_threshold missing"
+ assert 'min_threshold' in discard_config, "min_threshold missing"
+ self._discard_config = discard_config
+
+ @property
+ def discard_policy(self):
+ self.log.debug('function-entry')
+ return self._discard_policy
+
+ @discard_policy.setter
+ def discard_policy(self, discard_policy):
+ self.log.debug('function-entry')
+ dp = ("TailDrop", "WTailDrop", "RED", "WRED")
+ assert (isinstance(discard_policy, str))
+ assert (discard_policy in dp)
+ self._discard_policy = discard_policy
+
+ @property
+ def max_q_size(self):
+ self.log.debug('function-entry')
+ return self._max_q_size
+
+ @max_q_size.setter
+ def max_q_size(self, max_q_size):
+ self.log.debug('function-entry')
+ if isinstance(max_q_size, str):
+ assert (max_q_size == "auto")
+ else:
+ assert (isinstance(max_q_size, int))
+
+ self._max_q_size = max_q_size
+
+ @property
+ def pbit_map(self):
+ self.log.debug('function-entry')
+ return self._pbit_map
+
+ @pbit_map.setter
+ def pbit_map(self, pbit_map):
+ self.log.debug('function-entry')
+ assert (isinstance(pbit_map, str))
+ assert (len(pbit_map[2:]) == 8) # Example format of pbit_map: "0b00000101"
+ try:
+ _ = int(pbit_map[2], 2)
+ except ValueError:
+ raise Exception("pbit_map-not-binary-string-{}".format(pbit_map))
+
+ # remove '0b'
+ self._pbit_map = pbit_map[2:]
+
+ @property
+ def scheduling_policy(self):
+ self.log.debug('function-entry')
+ return self._scheduling_policy
+
+ @scheduling_policy.setter
+ def scheduling_policy(self, scheduling_policy):
+ self.log.debug('function-entry')
+ sp = ("WRR", "StrictPriority")
+ assert (isinstance(scheduling_policy, str))
+ assert (scheduling_policy in sp)
+ self._scheduling_policy = scheduling_policy
+
@staticmethod
def create(handler, gem_port, entity_id):
- log = structlog.get_logger(device_id=handler.device_id, gem_port=gem_port, entity_id=entity_id)
log.debug('function-entry', gem_port=gem_port, entity_id=entity_id)
- return OnuGemPort(gem_port['gemport-id'],
- None,
- entity_id,
+ return OnuGemPort(gem_id=gem_port['gemport_id'],
+ alloc_id=gem_port['alloc_id_ref'],
+ entity_id=entity_id,
+ direction=gem_port['direction'],
encryption=gem_port['encryption'], # aes_indicator,
- tcont_ref=gem_port['tcont-ref'],
- name=gem_port['name'],
- traffic_class=gem_port['traffic-class'],
+ discard_config=gem_port['discard_config'],
+ discard_policy=gem_port['discard_policy'],
+ max_q_size=gem_port['max_q_size'],
+ pbit_map=gem_port['pbit_map'],
+ priority_q=gem_port['priority_q'],
+ scheduling_policy=gem_port['scheduling_policy'],
+ weight=gem_port['weight'],
handler=handler,
untagged=False)
@@ -157,27 +273,32 @@
def add_to_hardware(self, omci,
tcont_entity_id,
ieee_mapper_service_profile_entity_id,
- gal_enet_profile_entity_id):
+ gal_enet_profile_entity_id,
+ ul_prior_q_entity_id,
+ dl_prior_q_entity_id):
self.log.debug('function-entry')
self.log.debug('add-to-hardware', gem_id=self.gem_id,
tcont_entity_id=tcont_entity_id,
ieee_mapper_service_profile_entity_id=ieee_mapper_service_profile_entity_id,
- gal_enet_profile_entity_id=gal_enet_profile_entity_id)
+ gal_enet_profile_entity_id=gal_enet_profile_entity_id,
+ ul_prior_q_entity_id=ul_prior_q_entity_id,
+ dl_prior_q_entity_id=dl_prior_q_entity_id)
try:
direction = "downstream" if self.multicast else "bi-directional"
assert not self.multicast, 'MCAST is not supported yet'
# TODO: magic numbers here
+ attributes = dict()
+ attributes['priority_queue_pointer_downstream'] = dl_prior_q_entity_id
msg = GemPortNetworkCtpFrame(
- self.entity_id, # same entity id as GEM port
- port_id=self.gem_id,
- tcont_id=tcont_entity_id,
- direction=direction,
- # TODO: This points to the Priority Queue ME. Class #277. Use whats discovered in relation to tcont
- upstream_tm=0x8001
- #upstream_tm=0x100
+ self.entity_id, # same entity id as GEM port
+ port_id=self.gem_id,
+ tcont_id=tcont_entity_id,
+ direction=direction,
+ upstream_tm=ul_prior_q_entity_id,
+ attributes=attributes
)
frame = msg.create()
self.log.debug('openomci-msg', msg=msg)
@@ -191,9 +312,9 @@
try:
# TODO: magic numbers here
msg = GemInterworkingTpFrame(
- self.entity_id, # same entity id as GEM port
+ self.entity_id, # same entity id as GEM port
gem_port_network_ctp_pointer=self.entity_id,
- interworking_option=5, # IEEE 802.1
+ interworking_option=5, # IEEE 802.1
service_profile_pointer=ieee_mapper_service_profile_entity_id,
interworking_tp_pointer=0x0,
pptp_counter=1,
@@ -213,7 +334,7 @@
@inlineCallbacks
def remove_from_hardware(self, omci):
self.log.debug('function-entry', omci=omci)
- self.log.debug('remove-from-hardware', gem_id=self.gem_id)
+ self.log.debug('remove-from-hardware', gem_id=self.gem_id)
try:
msg = GemInterworkingTpFrame(self.entity_id)
@@ -237,7 +358,6 @@
returnValue(results)
-
def check_status_and_state(self, results, operation=''):
self.log.debug('function-entry')
omci_msg = results.fields['omci_message'].fields
@@ -254,4 +374,3 @@
elif status == RC.InstanceExists:
return False
-
diff --git a/voltha/adapters/brcm_openomci_onu/onu_tcont.py b/voltha/adapters/brcm_openomci_onu/onu_tcont.py
index b353a7b..c1f5a89 100644
--- a/voltha/adapters/brcm_openomci_onu/onu_tcont.py
+++ b/voltha/adapters/brcm_openomci_onu/onu_tcont.py
@@ -26,16 +26,15 @@
"""
Broadcom ONU specific implementation
"""
- def __init__(self, handler, alloc_id, traffic_descriptor,
- name=None, vont_ani=None):
+ def __init__(self, handler, alloc_id, q_sched_policy, traffic_descriptor):
self.log = structlog.get_logger(device_id=handler.device_id, alloc_id=alloc_id)
self.log.debug('function-entry')
self.alloc_id = alloc_id
+ self._q_sched_policy = 0
+ self.q_sched_policy = q_sched_policy
self.traffic_descriptor = traffic_descriptor
- self.name = name
- self.vont_ani = vont_ani # (string) reference
self._handler = handler
self._entity_id = None
@@ -45,6 +44,20 @@
self.log.debug('function-entry')
return self._entity_id
+ @property
+ def q_sched_policy(self):
+ self.log.debug('function-entry')
+ return self._q_sched_policy
+
+
+ @q_sched_policy.setter
+ def q_sched_policy(self, q_sched_policy):
+ sp = ('Null', 'WRR', 'StrictPriority')
+ if q_sched_policy in sp:
+ self._q_sched_policy = sp.index(q_sched_policy)
+ else:
+ self._q_sched_policy = 0
+
@staticmethod
def create(handler, tcont, td):
log = structlog.get_logger(tcont=tcont, td=td)
@@ -52,9 +65,9 @@
return OnuTCont(handler,
tcont['alloc-id'],
- td,
- name=tcont['name'],
- vont_ani=tcont['vont-ani'])
+ tcont['q_sched_policy'],
+ td
+ )
@inlineCallbacks
def add_to_hardware(self, omci, tcont_entity_id):
@@ -64,6 +77,9 @@
self._entity_id = tcont_entity_id
try:
+ # FIXME: self.q_sched_policy seems to be READ-ONLY
+ # Ideally the READ-ONLY or NOT attribute is available from ONU-2G ME
+ #msg = TcontFrame(self.entity_id, self.alloc_id, self.q_sched_policy)
msg = TcontFrame(self.entity_id, self.alloc_id)
frame = msg.set()
self.log.debug('openomci-msg', msg=msg)
@@ -97,7 +113,6 @@
returnValue(results)
-
def check_status_and_state(self, results, operation=''):
self.log.debug('function-entry')
omci_msg = results.fields['omci_message'].fields
diff --git a/voltha/adapters/brcm_openomci_onu/pon_port.py b/voltha/adapters/brcm_openomci_onu/pon_port.py
index a555118..7fb6a9e 100644
--- a/voltha/adapters/brcm_openomci_onu/pon_port.py
+++ b/voltha/adapters/brcm_openomci_onu/pon_port.py
@@ -17,6 +17,12 @@
from twisted.internet.defer import inlineCallbacks, returnValue
from voltha.protos.common_pb2 import AdminState, OperStatus
from voltha.protos.device_pb2 import Port
+from voltha.extensions.omci.tasks.task import Task
+
+BRDCM_DEFAULT_VLAN = 4091
+TASK_PRIORITY = Task.DEFAULT_PRIORITY + 10
+DEFAULT_TPID = 0x8100
+DEFAULT_GEM_PAYLOAD = 48
class PonPort(object):
@@ -43,6 +49,11 @@
self._gem_ports = {} # gem-id -> GemPort
self._tconts = {} # alloc-id -> TCont
+ self.pon_port_num = 3 # TODO why 3. maybe this is the ani port number. look at anis list
+
+ self.ieee_mapper_service_profile_entity_id = 0x8001
+ self.mac_bridge_port_ani_entity_id = 0x2102 # TODO: can we just use the entity id from the anis list?
+
def __str__(self):
return "PonPort" # TODO: Encode current state
@@ -221,15 +232,15 @@
self.log.exception('delete', e=e)
raise
- def gem_port(self, gem_id):
+ def gem_port(self, gem_id, direction):
self.log.debug('function-entry')
- return self._gem_ports.get(gem_id)
+ return self._gem_ports.get((gem_id, direction))
@property
def gem_ids(self):
"""Get all GEM Port IDs used by this ONU"""
self.log.debug('function-entry')
- return sorted([gem_id for gem_id, gem in self._gem_ports.items()])
+ return sorted([gem_id_and_direction[0] for gem_id_and_direction, gem in self._gem_ports.items()])
def add_gem_port(self, gem_port, reflow=False):
"""
@@ -248,25 +259,26 @@
return # nop
self.log.info('add', gem_port=gem_port, reflow=reflow)
- self._gem_ports[gem_port.gem_id] = gem_port
+ self._gem_ports[(gem_port.gem_id, gem_port.direction)] = gem_port
@inlineCallbacks
- def remove_gem_id(self, gem_id):
+ def remove_gem_id(self, gem_id, direction):
"""
Remove a GEM Port from this ONU
- :param gem_port: (GemPort) GEM Port to remove
+ :param gem_id: (GemPort) GEM Port to remove
+ :param direction: Direction of the gem port
:return: deferred
"""
self.log.debug('function-entry', gem_id=gem_id)
- gem_port = self._gem_ports.get(gem_id)
+ gem_port = self._gem_ports.get((gem_id, direction))
if gem_port is None:
returnValue('nop')
try:
- del self._gem_ports[gem_id]
+ del self._gem_ports[(gem_id, direction)]
results = yield gem_port.remove_from_hardware(self._handler.openomci.omci_cc)
returnValue(results)
diff --git a/voltha/adapters/openolt/openolt_device.py b/voltha/adapters/openolt/openolt_device.py
index fc4696a..bdd7c74 100644
--- a/voltha/adapters/openolt/openolt_device.py
+++ b/voltha/adapters/openolt/openolt_device.py
@@ -192,11 +192,11 @@
self.extra_args,
device_info)
- self.flow_mgr = self.flow_mgr_class(self.log, self.stub,
- self.device_id,
+ self.flow_mgr = self.flow_mgr_class(self.adapter_agent, self.log,
+ self.stub, self.device_id,
self.logical_device_id,
- self.platform,
- self.resource_mgr)
+ self.platform, self.resource_mgr)
+
self.alarm_mgr = self.alarm_mgr_class(self.log, self.adapter_agent,
self.device_id,
self.logical_device_id,
@@ -408,21 +408,12 @@
if onu_id is None:
raise Exception("onu-id-unavailable")
- pon_intf_onu_id = (intf_id, onu_id)
- alloc_id = self.resource_mgr.get_alloc_id(
- pon_intf_onu_id)
- if alloc_id is None:
- # Free up other PON resources if are unable to
- # proceed ahead
- self.resource_mgr.free_onu_id(intf_id, onu_id)
- raise Exception("alloc-id-unavailable")
-
self.add_onu_device(
intf_id,
self.platform.intf_id_to_port_no(intf_id, Port.PON_OLT),
onu_id, serial_number)
self.activate_onu(intf_id, onu_id, serial_number,
- serial_number_str, alloc_id)
+ serial_number_str)
except Exception as e:
self.log.exception('onu-activation-failed', e=e)
@@ -451,14 +442,8 @@
onu_device.oper_status = OperStatus.DISCOVERED
self.adapter_agent.update_device(onu_device)
try:
- pon_intf_onu_id = (intf_id, onu_id)
- # The ONU is already in the VOLTHA DB and resources were
- # already allocated for this ONU. So we fetch the resource
- # from local cache and not KV store.
- alloc_id = self.resource_mgr.get_alloc_id(
- pon_intf_onu_id)
self.activate_onu(intf_id, onu_id, serial_number,
- serial_number_str, alloc_id)
+ serial_number_str)
except Exception as e:
self.log.error('onu-activation-error',
serial_number=serial_number_str, error=e)
@@ -494,11 +479,6 @@
onu_id=onu_indication.onu_id)
return
- # We will use this alloc_id, gemport_id to pass on to the onu adapter
- pon_intf_onu_id = (onu_indication.intf_id, onu_indication.onu_id)
- alloc_id = self.resource_mgr.get_alloc_id(pon_intf_onu_id)
- gemport_id = self.resource_mgr.get_gemport_id(pon_intf_onu_id)
-
if self.platform.intf_id_from_pon_port_no(onu_device.parent_port_no) \
!= onu_indication.intf_id:
self.log.warn('ONU-is-on-a-different-intf-id-now',
@@ -585,23 +565,6 @@
# Prepare onu configuration
- # tcont creation (onu)
- tcont = TcontsConfigData()
- tcont.alloc_id = alloc_id
-
- # gem port creation
- gem_port = GemportsConfigData()
- gem_port.gemport_id = gemport_id
-
- gem_port.tcont_ref = str(tcont.alloc_id)
-
- self.log.info('inject-tcont-gem-data-onu-handler',
- onu_indication=onu_indication, tcont=tcont,
- gem_port=gem_port)
-
- onu_adapter_agent.create_tcont(onu_device, tcont,
- traffic_descriptor_data=None)
- onu_adapter_agent.create_gemport(onu_device, gem_port)
onu_adapter_agent.create_interface(onu_device, onu_indication)
else:
@@ -856,14 +819,16 @@
return port_no, label
- def delete_logical_port(self, child_device_id):
+ def delete_logical_port(self, child_device):
logical_ports = self.proxy.get('/logical_devices/{}/ports'.format(
self.logical_device_id))
for logical_port in logical_ports:
- if logical_port.device_id == child_device_id:
+ if logical_port.device_id == child_device.id:
self.log.debug('delete-logical-port',
- onu_device_id=child_device_id,
+ onu_device_id=child_device.id,
logical_port=logical_port)
+ self.flow_mgr.clear_flows_and_scheduler_for_logical_port(
+ child_device, logical_port)
self.adapter_agent.delete_logical_port(
self.logical_device_id, logical_port)
return
@@ -995,15 +960,13 @@
self.log.info('openolt device reenabled')
def activate_onu(self, intf_id, onu_id, serial_number,
- serial_number_str, alloc_id):
+ serial_number_str):
pir = self.bw_mgr.pir(serial_number_str)
self.log.debug("activating-onu", intf_id=intf_id, onu_id=onu_id,
serial_number_str=serial_number_str,
- serial_number=serial_number, pir=pir,
- alloc_id=alloc_id)
+ serial_number=serial_number, pir=pir)
onu = openolt_pb2.Onu(intf_id=intf_id, onu_id=onu_id,
- serial_number=serial_number, pir=pir,
- alloc_id=alloc_id)
+ serial_number=serial_number, pir=pir)
self.stub.ActivateOnu(onu)
self.log.info('onu-activated', serial_number=serial_number_str)
@@ -1019,7 +982,7 @@
except Exception as e:
self.log.error('adapter_agent error', error=e)
try:
- self.delete_logical_port(child_device.id)
+ self.delete_logical_port(child_device)
except Exception as e:
self.log.error('logical_port delete error', error=e)
try:
diff --git a/voltha/adapters/openolt/openolt_flow_mgr.py b/voltha/adapters/openolt/openolt_flow_mgr.py
index 59c6c27..7e6c258 100644
--- a/voltha/adapters/openolt/openolt_flow_mgr.py
+++ b/voltha/adapters/openolt/openolt_flow_mgr.py
@@ -16,6 +16,7 @@
import copy
from twisted.internet import reactor
import grpc
+from google.protobuf.json_format import MessageToDict
from voltha.protos.openflow_13_pb2 import OFPXMC_OPENFLOW_BASIC, \
ofp_flow_stats, OFPMT_OXM, Flows, FlowGroups, OFPXMT_OFB_IN_PORT, \
@@ -25,27 +26,31 @@
from voltha.adapters.openolt.protos import openolt_pb2
from voltha.registry import registry
-HSIA_FLOW_INDEX = 0 # FIXME
-DHCP_FLOW_INDEX = 1 # FIXME
-DHCP_DOWNLINK_FLOW_INDEX = 6 # FIXME
-EAPOL_FLOW_INDEX = 2 # FIXME
-EAPOL_SECONDARY_FLOW_INDEX = 5 # FIXME
-DOWNSTREAM_FLOW_FOR_PACKET_OUT = 7
-LLDP_FLOW_ID = 0x3FF8 # FIXME (16376)
+from common.tech_profile.tech_profile import DEFAULT_TECH_PROFILE_TABLE_ID
+
+# Flow categories
+HSIA_FLOW = "HSIA_FLOW"
+DHCP_FLOW = "DHCP_FLOW"
+EAPOL_PRIMARY_FLOW = "EAPOL_PRIMARY_FLOW"
+EAPOL_SECONDARY_FLOW = "EAPOL_SECONDARY_FLOW"
+IGMP_FLOW = "IGMP_FLOW"
+LLDP_FLOW = "LLDP_FLOW"
EAP_ETH_TYPE = 0x888e
LLDP_ETH_TYPE = 0x88cc
+IGMP_PROTO = 2
+
# FIXME - see also BRDCM_DEFAULT_VLAN in broadcom_onu.py
DEFAULT_MGMT_VLAN = 4091
# Openolt Flow
-UPSTREAM = 'upstream'
-DOWNSTREAM = 'downstream'
-PACKET_TAG_TYPE = 'pkt_tag_type'
-UNTAGGED = 'untagged'
-SINGLE_TAG = 'single_tag'
-DOUBLE_TAG = 'double_tag'
+UPSTREAM = "upstream"
+DOWNSTREAM = "downstream"
+PACKET_TAG_TYPE = "pkt_tag_type"
+UNTAGGED = "untagged"
+SINGLE_TAG = "single_tag"
+DOUBLE_TAG = "double_tag"
# Classifier
ETH_TYPE = 'eth_type'
@@ -65,13 +70,14 @@
PUSH_VLAN = 'push_vlan'
TRAP_TO_HOST = 'trap_to_host'
-
+KV_STORE_TECH_PROFILE_PATH_PREFIX = 'voltha/technology_profiles'
class OpenOltFlowMgr(object):
- def __init__(self, log, stub, device_id, logical_device_id,
+ def __init__(self, adapter_agent, log, stub, device_id, logical_device_id,
platform, resource_mgr):
+ self.adapter_agent = adapter_agent
self.log = log
self.stub = stub
self.device_id = device_id
@@ -83,6 +89,8 @@
'/devices/{}/flows'.format(self.device_id))
self.root_proxy = registry('core').get_proxy('/')
self.resource_mgr = resource_mgr
+ self.tech_profile = dict()
+ self._populate_tech_profile_per_pon_port()
def add_flow(self, flow):
self.log.debug('add flow', flow=flow)
@@ -173,13 +181,12 @@
self.log.error('unsupported-action-type',
action_type=action.type, in_port=classifier_info[IN_PORT])
- if fd.get_goto_table_id(flow) is not None and not POP_VLAN in \
- action_info:
+ if fd.get_goto_table_id(flow) is not None and POP_VLAN not in action_info:
self.log.debug('being taken care of by ONU', flow=flow)
return
- if not OUTPUT in action_info and METADATA in classifier_info:
- #find flow in the next table
+ if OUTPUT not in action_info and METADATA in classifier_info:
+ # find flow in the next table
next_flow = self.find_next_flow(flow)
if next_flow is None:
return
@@ -188,14 +195,84 @@
if field.type == fd.VLAN_VID:
classifier_info[METADATA] = field.vlan_vid & 0xfff
-
(intf_id, onu_id) = self.platform.extract_access_from_flow(
classifier_info[IN_PORT], action_info[OUTPUT])
-
self.divide_and_add_flow(intf_id, onu_id, classifier_info,
action_info, flow)
+ def _is_uni_port(self, port_no):
+ try:
+ port = self.adapter_agent.get_logical_port(self.logical_device_id,
+ 'uni-{}'.format(port_no))
+ if port is not None:
+ return (not port.root_port), port.device_id
+ else:
+ return False, None
+ except Exception as e:
+ self.log.error("error-retrieving-port", e=e)
+ return False, None
+
+ def _clear_flow_id_from_rm(self, flow, flow_id, flow_direction):
+ uni_port_no = None
+ flow_category = HSIA_FLOW # default
+ child_device_id = None
+ if flow_direction == UPSTREAM:
+ for field in fd.get_ofb_fields(flow):
+ if field.type == fd.IN_PORT:
+ is_uni, child_device_id = self._is_uni_port(field.port)
+ if is_uni:
+ uni_port_no = field.port
+ elif field.type == fd.IP_PROTO:
+ if field.ip_proto == IGMP_PROTO:
+ flow_category = IGMP_FLOW
+ elif field.type == fd.ETH_TYPE:
+ if field.eth_type == EAP_ETH_TYPE:
+ flow_category = EAPOL_PRIMARY_FLOW
+ elif field.eth_type == LLDP_ETH_TYPE:
+ flow_category = LLDP_FLOW
+
+ elif flow_direction == DOWNSTREAM:
+ for field in fd.get_ofb_fields(flow):
+ if field.type == fd.METADATA:
+ uni_port = field.table_metadata & 0xFFFFFFFF
+ is_uni, child_device_id = self._is_uni_port(uni_port)
+ if is_uni:
+ uni_port_no = field.port
+
+ if uni_port_no is None:
+ for action in fd.get_actions(flow):
+ if action.type == fd.OUTPUT:
+ is_uni, child_device_id = \
+ self._is_uni_port(action.output.port)
+ if is_uni:
+ uni_port_no = action.output.port
+
+ if flow_category and child_device_id:
+ child_device = self.adapter_agent.get_device(child_device_id)
+ pon_intf = child_device.proxy_address.channel_id
+ onu_id = child_device.proxy_address.onu_id
+ flows = self.resource_mgr.get_flow_id_info(pon_intf, onu_id, flow_id)
+ assert (isinstance(flows, list))
+ self.log.debug("retrieved-flows", flows=flows)
+ for idx in range(len(flows)):
+ if flow_direction == flows[idx]['flow_type']:
+ flows.pop(idx)
+ self.update_flow_info_to_kv_store(pon_intf, onu_id,
+ flow_id, flows)
+ if len(flows) > 0:
+ # There are still flows referencing the same flow_id.
+ # So the flow should not be freed yet.
+ # For ex: Case of HSIA where same flow is shared
+ # between DS and US.
+ return
+
+ self.resource_mgr.free_flow_id(pon_intf, onu_id, flow_id)
+ else:
+ self.log.error("invalid-info", uni_port_no=uni_port_no,
+ flow_category=flow_category,
+ child_device_id=child_device_id)
+
def remove_flow(self, flow):
self.log.debug('trying to remove flows from logical flow :',
logical_flow=flow)
@@ -218,6 +295,10 @@
else:
raise grpc_e
+ # once we have successfully deleted the flow on the device
+ # release the flow_id on resource pool and also clear any
+ # data associated with the flow_id on KV store.
+ self._clear_flow_id_from_rm(f, id, direction)
self.log.debug('flow removed from device', flow=f,
flow_key=flow_to_remove)
@@ -233,203 +314,330 @@
flow_ids_removed=flows_ids_to_remove,
number_of_flows_removed=(len(device_flows) - len(
new_flows)), expected_flows_removed=len(
- device_flows_to_remove))
+ device_flows_to_remove))
else:
self.log.debug('no device flow to remove for this flow (normal '
'for multi table flows)', flow=flow)
+ def _get_ofp_port_name(self, intf_id, onu_id):
+ parent_port_no = self.platform.intf_id_to_port_no(intf_id, Port.PON_OLT)
+ child_device = self.adapter_agent.get_child_device(self.device_id,
+ parent_port_no=parent_port_no, onu_id=onu_id)
+ if child_device is None:
+ self.log.error("could-not-find-child-device",
+ parent_port_no=intf_id, onu_id=onu_id)
+ return None
+ # FIXME: Assumes single UNI for a ONU device which is visible at ONOS
+ ports = self.adapter_agent.get_ports(child_device.id, Port.ETHERNET_UNI)
+ logical_port = self.adapter_agent.get_logical_port(
+ self.logical_device_id, ports[0].label)
+ ofp_port_name = logical_port.ofp_port.name
+ return ofp_port_name
+
def divide_and_add_flow(self, intf_id, onu_id, classifier,
action, flow):
self.log.debug('sorting flow', intf_id=intf_id, onu_id=onu_id,
classifier=classifier, action=action)
- if IP_PROTO in classifier:
- if classifier[IP_PROTO] == 17:
- self.log.debug('dhcp flow add')
- self.add_dhcp_trap(intf_id, onu_id, classifier,
- action, flow)
- elif classifier[IP_PROTO] == 2:
- self.log.warn('igmp flow add ignored, not implemented yet')
- else:
- self.log.warn("Invalid-Classifier-to-handle",
- classifier=classifier,
- action=action)
- elif ETH_TYPE in classifier:
- if classifier[ETH_TYPE] == EAP_ETH_TYPE:
- self.log.debug('eapol flow add')
- self.add_eapol_flow(intf_id, onu_id, flow)
- vlan_id = self.get_subscriber_vlan(fd.get_in_port(flow))
- if vlan_id is not None:
- self.add_eapol_flow(
- intf_id, onu_id, flow,
- eapol_id=EAPOL_SECONDARY_FLOW_INDEX,
- vlan_id=vlan_id)
- if classifier[ETH_TYPE] == LLDP_ETH_TYPE:
- self.log.debug('lldp flow add')
- self.add_lldp_flow(flow)
+ alloc_id, gem_ports = self.create_tcont_gemport(intf_id, onu_id,
+ flow.table_id)
+ if alloc_id is None or gem_ports is None:
+ self.log.error("alloc-id-gem-ports-unavailable", alloc_id=alloc_id,
+ gem_ports=gem_ports)
+ return
- elif PUSH_VLAN in action:
- self.add_upstream_data_flow(intf_id, onu_id, classifier,
- action,
- flow)
- elif POP_VLAN in action:
- self.add_downstream_data_flow(intf_id, onu_id, classifier,
- action, flow)
- else:
- self.log.debug('Invalid-flow-type-to-handle',
- classifier=classifier,
- action=action, flow=flow)
+ self.log.debug('Generated required alloc and gemport ids',
+ alloc_id=alloc_id, gemports=gem_ports)
+
+ # Flows can't be added specific to gemport unless p-bits are received.
+ # Hence adding flows for all gemports
+ for gemport_id in gem_ports:
+ if IP_PROTO in classifier:
+ if classifier[IP_PROTO] == 17:
+ self.log.debug('dhcp flow add')
+ self.add_dhcp_trap(intf_id, onu_id, classifier,
+ action, flow, alloc_id, gemport_id)
+ elif classifier[IP_PROTO] == 2:
+ self.log.warn('igmp flow add ignored, not implemented yet')
+ else:
+ self.log.warn("Invalid-Classifier-to-handle",
+ classifier=classifier,
+ action=action)
+ elif ETH_TYPE in classifier:
+ if classifier[ETH_TYPE] == EAP_ETH_TYPE:
+ self.log.debug('eapol flow add')
+ self.add_eapol_flow(intf_id, onu_id, flow, alloc_id,
+ gemport_id)
+ vlan_id = self.get_subscriber_vlan(fd.get_in_port(flow))
+ if vlan_id is not None:
+ self.add_eapol_flow(
+ intf_id, onu_id, flow, alloc_id, gemport_id,
+ eapol_flow_category=EAPOL_SECONDARY_FLOW,
+ vlan_id=vlan_id)
+ parent_port_no = self.platform.intf_id_to_port_no(intf_id, Port.PON_OLT)
+ onu_device = self.adapter_agent.get_child_device(self.device_id,
+ onu_id=onu_id,
+ parent_port_no=parent_port_no)
+ ofp_port_name = self._get_ofp_port_name(intf_id, onu_id)
+ if ofp_port_name is None:
+ self.log.error("port-name-not-found")
+ return
+
+ # FIXME Should get Table id form the flow, as of now hardcoded to
+ # DEFAULT_TECH_PROFILE_TABLE_ID (64)
+ tp_path = KV_STORE_TECH_PROFILE_PATH_PREFIX + '/' + \
+ self.tech_profile[intf_id]. \
+ get_tp_path(DEFAULT_TECH_PROFILE_TABLE_ID,
+ ofp_port_name)
+
+ self.log.debug('Load-tech-profile-request-to-brcm-handler',
+ tp_path=tp_path)
+ msg = {'proxy_address': onu_device.proxy_address,
+ 'event': 'download_tech_profile', 'event_data': tp_path}
+
+ # Send the event message to the ONU adapter
+ self.adapter_agent.publish_inter_adapter_message(onu_device.id,
+ msg)
+
+ if classifier[ETH_TYPE] == LLDP_ETH_TYPE:
+ self.log.debug('lldp flow add')
+ self.add_lldp_flow(flow)
+
+ elif PUSH_VLAN in action:
+ self.add_upstream_data_flow(intf_id, onu_id, classifier,
+ action, flow, alloc_id, gemport_id)
+ elif POP_VLAN in action:
+ self.add_downstream_data_flow(intf_id, onu_id, classifier,
+ action, flow, alloc_id, gemport_id)
+ else:
+ self.log.debug('Invalid-flow-type-to-handle',
+ classifier=classifier,
+ action=action, flow=flow)
+
+ def create_tcont_gemport(self, intf_id, onu_id, table_id):
+ alloc_id, gem_port_ids = None, None
+ try:
+ ofp_port_name = self._get_ofp_port_name(intf_id, onu_id)
+ if ofp_port_name is None:
+ self.log.error("port-name-not-found")
+ return alloc_id, gem_port_ids
+ # FIXME: If table id is <= 63 using 64 as table id
+ if table_id < DEFAULT_TECH_PROFILE_TABLE_ID:
+ table_id = DEFAULT_TECH_PROFILE_TABLE_ID
+
+ # Check tech profile instance already exists for derived port name
+ tech_profile_instance = self.tech_profile[intf_id]. \
+ get_tech_profile_instance(table_id, ofp_port_name)
+ self.log.debug('Get-tech-profile-instance-status', tech_profile_instance=tech_profile_instance)
+
+ if tech_profile_instance is None:
+ # create tech profile instance
+ tech_profile_instance = self.tech_profile[intf_id]. \
+ create_tech_profile_instance(table_id, ofp_port_name,
+ intf_id)
+ if tech_profile_instance is not None:
+
+ # upstream scheduler
+ us_scheduler = self.tech_profile[intf_id].get_us_scheduler(
+ tech_profile_instance)
+ # downstream scheduler
+ ds_scheduler = self.tech_profile[intf_id].get_ds_scheduler(
+ tech_profile_instance)
+ # create Tcont
+ tconts = self.tech_profile[intf_id].get_tconts(tech_profile_instance,
+ us_scheduler,
+ ds_scheduler)
+
+ self.stub.CreateTconts(openolt_pb2.Tconts(intf_id=intf_id,
+ onu_id=onu_id,
+ tconts=tconts))
+ else:
+ raise Exception('Tech-profile-instance-creation-failed')
+ else:
+ self.log.debug(
+ 'Tech-profile-instance-already-exist-for-given port-name',
+ ofp_port_name=ofp_port_name)
+
+ # Fetch alloc id and gemports from tech profile instance
+ alloc_id = tech_profile_instance.us_scheduler.alloc_id
+ gem_port_ids = []
+ for i in range(len(
+ tech_profile_instance.upstream_gem_port_attribute_list)):
+ gem_port_ids.append(
+ tech_profile_instance.upstream_gem_port_attribute_list[i].
+ gemport_id)
+ except BaseException as e:
+ self.log.exception(exception=e)
+
+ # Update the allocated alloc_id and gem_port_id for the ONU to KV store
+ pon_intf_onu_id = (intf_id, onu_id)
+ self.resource_mgr.resource_mgrs[intf_id].update_alloc_ids_for_onu(
+ pon_intf_onu_id,
+ list([alloc_id])
+ )
+ self.resource_mgr.resource_mgrs[intf_id].update_gemport_ids_for_onu(
+ pon_intf_onu_id,
+ gem_port_ids
+ )
+
+ self.resource_mgr.update_gemports_ponport_to_onu_map_on_kv_store(
+ gem_port_ids, intf_id, onu_id
+ )
+
+ return alloc_id, gem_port_ids
def add_upstream_data_flow(self, intf_id, onu_id, uplink_classifier,
- uplink_action, logical_flow):
-
+ uplink_action, logical_flow, alloc_id,
+ gemport_id):
uplink_classifier[PACKET_TAG_TYPE] = SINGLE_TAG
-
self.add_hsia_flow(intf_id, onu_id, uplink_classifier,
- uplink_action, UPSTREAM, HSIA_FLOW_INDEX,
- logical_flow)
+ uplink_action, UPSTREAM,
+ logical_flow, alloc_id, gemport_id)
# Secondary EAP on the subscriber vlan
(eap_active, eap_logical_flow) = self.is_eap_enabled(intf_id, onu_id)
if eap_active:
- self.add_eapol_flow(intf_id, onu_id, eap_logical_flow,
- eapol_id=EAPOL_SECONDARY_FLOW_INDEX,
- vlan_id=uplink_classifier[VLAN_VID])
-
+ self.add_eapol_flow(intf_id, onu_id, eap_logical_flow, alloc_id,
+ gemport_id, eapol_flow_category=EAPOL_SECONDARY_FLOW,
+ vlan_id=uplink_classifier[VLAN_VID])
def add_downstream_data_flow(self, intf_id, onu_id, downlink_classifier,
- downlink_action, flow):
+ downlink_action, flow, alloc_id, gemport_id):
downlink_classifier[PACKET_TAG_TYPE] = DOUBLE_TAG
# Needed ???? It should be already there
downlink_action[POP_VLAN] = True
downlink_action[VLAN_VID] = downlink_classifier[VLAN_VID]
self.add_hsia_flow(intf_id, onu_id, downlink_classifier,
- downlink_action, DOWNSTREAM, HSIA_FLOW_INDEX,
- flow)
+ downlink_action, DOWNSTREAM,
+ flow, alloc_id, gemport_id)
- # To-Do right now only one GEM port is supported, so below method
- # will take care of handling all the p bits.
- # We need to revisit when mulitple gem port per p bits is needed.
- # Waiting for Technology profile
def add_hsia_flow(self, intf_id, onu_id, classifier, action,
- direction, hsia_id, logical_flow):
+ direction, logical_flow, alloc_id, gemport_id):
- pon_intf_onu_id = (intf_id, onu_id)
- gemport_id = self.resource_mgr.get_gemport_id(
- pon_intf_onu_id=pon_intf_onu_id
- )
- alloc_id = self.resource_mgr.get_alloc_id(
- pon_intf_onu_id=pon_intf_onu_id
- )
-
- flow_id = self.platform.mk_flow_id(intf_id, onu_id, hsia_id)
+ flow_id = self.resource_mgr.get_hsia_flow_for_onu(intf_id, onu_id, gemport_id)
+ if flow_id is None:
+ self.log.error("hsia-flow-unavailable")
+ return
flow = openolt_pb2.Flow(
- onu_id=onu_id, flow_id=flow_id, flow_type=direction,
- access_intf_id=intf_id, gemport_id=gemport_id,
- alloc_id=alloc_id,
- priority=logical_flow.priority,
- classifier=self.mk_classifier(classifier),
- action=self.mk_action(action))
+ access_intf_id=intf_id, onu_id=onu_id, flow_id=flow_id,
+ flow_type=direction, alloc_id=alloc_id, network_intf_id=0,
+ gemport_id=gemport_id,
+ classifier=self.mk_classifier(classifier),
+ action=self.mk_action(action),
+ priority=logical_flow.priority)
- self.add_flow_to_device(flow, logical_flow)
+ if self.add_flow_to_device(flow, logical_flow):
+ flow_info = self._get_flow_info_as_json_blob(flow, HSIA_FLOW)
+ self.update_flow_info_to_kv_store(flow.access_intf_id, flow.onu_id,
+ flow.flow_id, flow_info)
- def add_dhcp_trap(self, intf_id, onu_id, classifier, action, logical_flow):
+ def add_dhcp_trap(self, intf_id, onu_id, classifier, action, logical_flow,
+ alloc_id, gemport_id):
self.log.debug('add dhcp upstream trap', classifier=classifier,
intf_id=intf_id, onu_id=onu_id, action=action)
action.clear()
action[TRAP_TO_HOST] = True
+ classifier[UDP_SRC] = 68
+ classifier[UDP_DST] = 67
classifier[PACKET_TAG_TYPE] = SINGLE_TAG
classifier.pop(VLAN_VID, None)
pon_intf_onu_id = (intf_id, onu_id)
- gemport_id = self.resource_mgr.get_gemport_id(
- pon_intf_onu_id=pon_intf_onu_id
- )
- alloc_id = self.resource_mgr.get_alloc_id(
- pon_intf_onu_id=pon_intf_onu_id
- )
-
- flow_id = self.platform.mk_flow_id(intf_id, onu_id, DHCP_FLOW_INDEX)
+ flow_id = self.resource_mgr.get_flow_id(pon_intf_onu_id)
dhcp_flow = openolt_pb2.Flow(
onu_id=onu_id, flow_id=flow_id, flow_type=UPSTREAM,
access_intf_id=intf_id, gemport_id=gemport_id,
- alloc_id=alloc_id,
+ alloc_id=alloc_id, network_intf_id=0,
priority=logical_flow.priority,
classifier=self.mk_classifier(classifier),
action=self.mk_action(action))
- self.add_flow_to_device(dhcp_flow, logical_flow)
+ if self.add_flow_to_device(dhcp_flow, logical_flow):
+ flow_info = self._get_flow_info_as_json_blob(dhcp_flow, DHCP_FLOW)
+ self.update_flow_info_to_kv_store(dhcp_flow.access_intf_id,
+ dhcp_flow.onu_id,
+ dhcp_flow.flow_id,
+ flow_info)
- def add_eapol_flow(self, intf_id, onu_id, logical_flow,
- eapol_id=EAPOL_FLOW_INDEX,
+ def add_eapol_flow(self, intf_id, onu_id, logical_flow, alloc_id,
+ gemport_id, eapol_flow_category=EAPOL_PRIMARY_FLOW,
vlan_id=DEFAULT_MGMT_VLAN):
- uplink_classifier = {}
+ uplink_classifier = dict()
uplink_classifier[ETH_TYPE] = EAP_ETH_TYPE
uplink_classifier[PACKET_TAG_TYPE] = SINGLE_TAG
uplink_classifier[VLAN_VID] = vlan_id
- uplink_action = {}
+ uplink_action = dict()
uplink_action[TRAP_TO_HOST] = True
- # Add Upstream EAPOL Flow.
-
pon_intf_onu_id = (intf_id, onu_id)
- gemport_id = self.resource_mgr.get_gemport_id(
- pon_intf_onu_id=pon_intf_onu_id
- )
- alloc_id = self.resource_mgr.get_alloc_id(
- pon_intf_onu_id=pon_intf_onu_id
- )
-
- uplink_flow_id = self.platform.mk_flow_id(intf_id, onu_id, eapol_id)
+ # Add Upstream EAPOL Flow.
+ if eapol_flow_category == EAPOL_PRIMARY_FLOW:
+ uplink_flow_id = self.resource_mgr.get_flow_id(pon_intf_onu_id)
+ else:
+ uplink_flow_id = self.resource_mgr.get_flow_id(pon_intf_onu_id)
upstream_flow = openolt_pb2.Flow(
- onu_id=onu_id, flow_id=uplink_flow_id, flow_type=UPSTREAM,
- access_intf_id=intf_id, gemport_id=gemport_id,
- alloc_id=alloc_id,
- priority=logical_flow.priority,
+ access_intf_id=intf_id, onu_id=onu_id, flow_id=uplink_flow_id,
+ flow_type=UPSTREAM, alloc_id=alloc_id, network_intf_id=0,
+ gemport_id=gemport_id,
classifier=self.mk_classifier(uplink_classifier),
- action=self.mk_action(uplink_action))
+ action=self.mk_action(uplink_action),
+ priority=logical_flow.priority)
logical_flow = copy.deepcopy(logical_flow)
logical_flow.match.oxm_fields.extend(fd.mk_oxm_fields([fd.vlan_vid(
vlan_id | 0x1000)]))
logical_flow.match.type = OFPMT_OXM
- self.add_flow_to_device(upstream_flow, logical_flow)
+ if self.add_flow_to_device(upstream_flow, logical_flow):
+ if eapol_flow_category == EAPOL_PRIMARY_FLOW:
+ flow_info = self._get_flow_info_as_json_blob(upstream_flow,
+ EAPOL_PRIMARY_FLOW)
+ self.update_flow_info_to_kv_store(upstream_flow.access_intf_id,
+ upstream_flow.onu_id,
+ upstream_flow.flow_id,
+ flow_info)
+ else:
+ flow_info = self._get_flow_info_as_json_blob(upstream_flow,
+ EAPOL_SECONDARY_FLOW)
+ self.update_flow_info_to_kv_store(upstream_flow.access_intf_id,
+ upstream_flow.onu_id,
+ upstream_flow.flow_id,
+ flow_info)
if vlan_id == DEFAULT_MGMT_VLAN:
-
# Add Downstream EAPOL Flow, Only for first EAP flow (BAL
# requirement)
special_vlan_downstream_flow = 4000 - onu_id
- downlink_classifier = {}
+ downlink_classifier = dict()
downlink_classifier[PACKET_TAG_TYPE] = SINGLE_TAG
downlink_classifier[VLAN_VID] = special_vlan_downstream_flow
- downlink_action = {}
+ downlink_action = dict()
downlink_action[PUSH_VLAN] = True
downlink_action[VLAN_VID] = vlan_id
- downlink_flow_id = self.platform.mk_flow_id(
- intf_id, onu_id, DOWNSTREAM_FLOW_FOR_PACKET_OUT)
+ pon_intf_onu_id = (intf_id, onu_id)
+ downlink_flow_id = self.resource_mgr.get_flow_id(pon_intf_onu_id)
downstream_flow = openolt_pb2.Flow(
- onu_id=onu_id, flow_id=downlink_flow_id, flow_type=DOWNSTREAM,
- access_intf_id=intf_id, gemport_id=gemport_id,
- priority=logical_flow.priority,
+ access_intf_id=intf_id, onu_id=onu_id, flow_id=downlink_flow_id,
+ flow_type=DOWNSTREAM, alloc_id=alloc_id, network_intf_id=0,
+ gemport_id=gemport_id,
classifier=self.mk_classifier(downlink_classifier),
- action=self.mk_action(downlink_action))
+ action=self.mk_action(downlink_action),
+ priority=logical_flow.priority)
downstream_logical_flow = ofp_flow_stats(
id=logical_flow.id, cookie=logical_flow.cookie,
@@ -438,14 +646,20 @@
downstream_logical_flow.match.oxm_fields.extend(fd.mk_oxm_fields([
fd.in_port(fd.get_out_port(logical_flow)),
- fd.vlan_vid((special_vlan_downstream_flow) | 0x1000)]))
+ fd.vlan_vid(special_vlan_downstream_flow | 0x1000)]))
downstream_logical_flow.match.type = OFPMT_OXM
downstream_logical_flow.instructions.extend(
fd.mk_instructions_from_actions([fd.output(
self.platform.mk_uni_port_num(intf_id, onu_id))]))
- self.add_flow_to_device(downstream_flow, downstream_logical_flow)
+ if self.add_flow_to_device(downstream_flow, downstream_logical_flow):
+ flow_info = self._get_flow_info_as_json_blob(downstream_flow,
+ EAPOL_PRIMARY_FLOW)
+ self.update_flow_info_to_kv_store(downstream_flow.access_intf_id,
+ downstream_flow.onu_id,
+ downstream_flow.flow_id,
+ flow_info)
def repush_all_different_flows(self):
# Check if the device is supposed to have flows, if so add them
@@ -463,33 +677,48 @@
def reset_flows(self):
self.flows_proxy.update('/', Flows())
-
""" Add a downstream LLDP trap flow on the NNI interface
"""
+
def add_lldp_flow(self, logical_flow, network_intf_id=0):
- classifier = {}
+ classifier = dict()
classifier[ETH_TYPE] = LLDP_ETH_TYPE
classifier[PACKET_TAG_TYPE] = UNTAGGED
- action = {}
+ action = dict()
action[TRAP_TO_HOST] = True
- flow_id = LLDP_FLOW_ID # FIXME
+ # LLDP flow is installed to trap LLDP packets on the NNI port.
+ # We manage flow_id resource pool on per PON port basis.
+ # Since this situation is tricky, as a hack, we pass the NNI port
+ # index (network_intf_id) as PON port Index for the flow_id resource
+ # pool. Also, there is no ONU Id available for trapping LLDP packets
+ # on NNI port, use onu_id as -1 (invalid)
+ # ****************** CAVEAT *******************
+ # This logic works if the NNI Port Id falls within the same valid
+ # range of PON Port Ids. If this doesn't work for some OLT Vendor
+ # we need to have a re-look at this.
+ # *********************************************
+ onu_id = -1
+ intf_id_onu_id = (network_intf_id, onu_id)
+ flow_id = self.resource_mgr.get_flow_id(intf_id_onu_id)
downstream_flow = openolt_pb2.Flow(
- onu_id=-1, # onu_id not required
- gemport_id=-1, # gemport_id not required
access_intf_id=-1, # access_intf_id not required
+ onu_id=onu_id, # onu_id not required
flow_id=flow_id,
flow_type=DOWNSTREAM,
- priority=logical_flow.priority,
network_intf_id=network_intf_id,
+ gemport_id=-1, # gemport_id not required
classifier=self.mk_classifier(classifier),
- action=self.mk_action(action))
+ action=self.mk_action(action),
+ priority=logical_flow.priority)
self.log.debug('add lldp downstream trap', classifier=classifier,
action=action, flow=downstream_flow)
- self.add_flow_to_device(downstream_flow, logical_flow)
+ if self.add_flow_to_device(downstream_flow, logical_flow):
+ self.update_flow_info_to_kv_store(network_intf_id, onu_id,
+ flow_id, downstream_flow)
def mk_classifier(self, classifier_info):
@@ -562,9 +791,9 @@
intf_id=intf_id, eap_intf_id=eap_intf_id,
eap_onu_id=eap_onu_id)
if eap_flow and intf_id == eap_intf_id and onu_id == eap_onu_id:
- return (True, flow)
+ return True, flow
- return (False, None)
+ return False, None
def get_subscriber_vlan(self, port):
self.log.debug('looking from subscriber flow for port', port=port)
@@ -574,7 +803,7 @@
in_port = fd.get_in_port(flow)
out_port = fd.get_out_port(flow)
if in_port == port and \
- self.platform.intf_id_to_port_type_name(out_port) \
+ self.platform.intf_id_to_port_type_name(out_port) \
== Port.ETHERNET_NNI:
fields = fd.get_ofb_fields(flow)
self.log.debug('subscriber flow found', fields=fields)
@@ -597,8 +826,15 @@
self.log.error('failed to add flow',
logical_flow=logical_flow, flow=flow,
grpc_error=grpc_e)
+ return False
else:
self.register_flow(logical_flow, flow)
+ return True
+
+ def update_flow_info_to_kv_store(self, intf_id, onu_id, flow_id, flow):
+ pon_intf_onu_id = (intf_id, onu_id)
+ self.resource_mgr.update_flow_id_info_for_onu(pon_intf_onu_id,
+ flow_id, flow)
def register_flow(self, logical_flow, device_flow):
self.log.debug('registering flow in device',
@@ -634,7 +870,6 @@
# FIXME
if fd.get_in_port(f) == fd.get_in_port(flow) and \
fd.get_out_port(f) == metadata:
-
next_flows.append(f)
if len(next_flows) == 0:
@@ -656,6 +891,51 @@
self.root_proxy.update('/devices/{}/flow_groups'.format(
device_id), FlowGroups(items=groups.values()))
+ def clear_flows_and_scheduler_for_logical_port(self, child_device, logical_port):
+ ofp_port_name = logical_port.ofp_port.name
+ pon_port = child_device.proxy_address.channel_id
+ onu_id = child_device.proxy_address.onu_id
+ # TODO: The DEFAULT_TECH_PROFILE_ID is assumed. Right way to do,
+ # is probably to maintain a list of Tech-profile table IDs associated
+ # with the UNI logical_port. This way, when the logical port is deleted,
+ # all the associated tech-profile configuration with the UNI logical_port
+ # can be cleared.
+ tech_profile_instance = self.tech_profile[pon_port]. \
+ get_tech_profile_instance(
+ DEFAULT_TECH_PROFILE_TABLE_ID,
+ ofp_port_name)
+ flow_ids = self.resource_mgr.get_current_flow_ids_for_onu(pon_port,
+ onu_id)
+ self.log.debug("outstanding-flows-to-be-cleared", flow_ids=flow_ids)
+ for flow_id in flow_ids:
+ flow_infos = self.resource_mgr.get_flow_id_info(pon_port,
+ onu_id,
+ flow_id)
+ for flow_info in flow_infos:
+ direction = flow_info['flow_type']
+ flow_to_remove = openolt_pb2.Flow(flow_id=flow_id,
+ flow_type=direction)
+ try:
+ self.stub.FlowRemove(flow_to_remove)
+ except grpc.RpcError as grpc_e:
+ if grpc_e.code() == grpc.StatusCode.NOT_FOUND:
+ self.log.debug('This flow does not exist on the switch, '
+ 'normal after an OLT reboot',
+ flow=flow_to_remove)
+ else:
+ raise grpc_e
+
+ self.resource_mgr.free_flow_id(pon_port, onu_id, flow_id)
+
+ try:
+ tconts = self.tech_profile[pon_port].get_tconts(tech_profile_instance)
+ self.stub.RemoveTconts(openolt_pb2.Tconts(intf_id=pon_port,
+ onu_id=onu_id,
+ tconts=tconts))
+ except grpc.RpcError as grpc_e:
+ self.log.error('error-removing-tcont-scheduler-queues',
+ err=grpc_e)
+
def generate_stored_id(self, flow_id, direction):
if direction == UPSTREAM:
self.log.debug('upstream flow, shifting id')
@@ -669,6 +949,33 @@
def decode_stored_id(self, id):
if id >> 15 == 0x1:
- return (id & 0x7fff, UPSTREAM)
+ return id & 0x7fff, UPSTREAM
else:
- return (id, DOWNSTREAM)
+ return id, DOWNSTREAM
+
+ def _populate_tech_profile_per_pon_port(self):
+ for arange in self.resource_mgr.device_info.ranges:
+ for intf_id in arange.intf_ids:
+ self.tech_profile[intf_id] = \
+ self.resource_mgr.resource_mgrs[intf_id].tech_profile
+
+ # Make sure we have as many tech_profiles as there are pon ports on
+ # the device
+ assert len(self.tech_profile) == self.resource_mgr.device_info.pon_ports
+
+ def _get_flow_info_as_json_blob(self, flow, flow_category):
+ json_blob = MessageToDict(message=flow,
+ preserving_proto_field_name=True)
+ self.log.debug("flow-info", json_blob=json_blob)
+ json_blob['flow_category'] = flow_category
+ flow_info = self.resource_mgr.get_flow_id_info(flow.access_intf_id,
+ flow.onu_id, flow.flow_id)
+
+ if flow_info is None:
+ flow_info = list()
+ flow_info.append(json_blob)
+ else:
+ assert (isinstance(flow_info, list))
+ flow_info.append(json_blob)
+
+ return flow_info
diff --git a/voltha/adapters/openolt/openolt_resource_manager.py b/voltha/adapters/openolt/openolt_resource_manager.py
index 6ac8a22..7e869d4 100644
--- a/voltha/adapters/openolt/openolt_resource_manager.py
+++ b/voltha/adapters/openolt/openolt_resource_manager.py
@@ -20,12 +20,13 @@
from voltha.registry import registry
from voltha.core.config.config_backend import ConsulStore
from voltha.core.config.config_backend import EtcdStore
+from voltha.adapters.openolt.openolt_flow_mgr import *
+
from voltha.adapters.openolt.protos import openolt_pb2
+
class OpenOltResourceMgr(object):
- GEMPORT_IDS = "gemport_ids"
- ALLOC_IDS = "alloc_ids"
- BASE_PATH_KV_STORE = "openolt/{}" # openolt/<device_id>
+ BASE_PATH_KV_STORE = "openolt/{}" # openolt/<device_id>
def __init__(self, device_id, host_and_port, extra_args, device_info):
self.log = structlog.get_logger(id=device_id,
@@ -37,7 +38,6 @@
self.args = registry('main').get_args()
# KV store's IP Address and PORT
- host, port = '127.0.0.1', 8500
if self.args.backend == 'etcd':
host, port = self.args.etcd.split(':', 1)
self.kv_store = EtcdStore(host, port,
@@ -80,6 +80,12 @@
pool.end = self.device_info.gemport_id_end
pool.sharing = openolt_pb2.DeviceInfo.DeviceResourceRanges.Pool.SHARED_BY_ALL_INTF_ALL_TECH
+ pool = arange.pools.add()
+ pool.type = openolt_pb2.DeviceInfo.DeviceResourceRanges.Pool.FLOW_ID
+ pool.start = self.device_info.flow_id_start
+ pool.end = self.device_info.flow_id_end
+ pool.sharing = openolt_pb2.DeviceInfo.DeviceResourceRanges.Pool.SHARED_BY_ALL_INTF_ALL_TECH
+
# Create a separate Resource Manager instance for each range. This assumes that
# each technology is represented by only a single range
global_resource_mgr = None
@@ -87,11 +93,12 @@
technology = arange.technology
self.log.info("device-info", technology=technology)
ranges[technology] = arange
- extra_args = self.extra_args + ' ' + PONResourceManager.OLT_MODEL_ARG + ' {}'.format(self.device_info.model)
+ extra_args = self.extra_args + ' ' + PONResourceManager.OLT_MODEL_ARG + ' {}'.format(self.device_info.model)
resource_mgr = PONResourceManager(technology,
- extra_args, self.device_id, self.args.backend, host, port)
+ extra_args, self.device_id, self.args.backend, host, port)
resource_mgrs_by_tech[technology] = resource_mgr
- if global_resource_mgr is None: global_resource_mgr = resource_mgr
+ if global_resource_mgr is None:
+ global_resource_mgr = resource_mgr
for intf_id in arange.intf_ids:
self.resource_mgrs[intf_id] = resource_mgrs_by_tech[technology]
self.initialize_device_resource_range_and_pool(resource_mgr, global_resource_mgr, arange)
@@ -103,7 +110,7 @@
def __del__(self):
self.log.info("clearing-device-resource-pool")
- for key, resource_mgr in self.resource_mgrs.iteritems():
+ for key, resource_mgr in self.resource_mgrs.iteritems():
resource_mgr.clear_device_resource_pool()
def get_onu_id(self, pon_intf_id):
@@ -117,6 +124,46 @@
return onu_id
+ def get_flow_id(self, pon_intf_onu_id):
+ pon_intf = pon_intf_onu_id[0]
+ flow_id = self.resource_mgrs[pon_intf].get_resource_id(
+ pon_intf_onu_id[0], PONResourceManager.FLOW_ID)
+ if flow_id is not None:
+ self.resource_mgrs[pon_intf].update_flow_id_for_onu(pon_intf_onu_id, flow_id)
+
+ return flow_id
+
+ def get_flow_id_info(self, pon_intf_id, onu_id, flow_id):
+ pon_intf_onu_id = (pon_intf_id, onu_id)
+ return self.resource_mgrs[pon_intf_id].get_flow_id_info(pon_intf_onu_id, flow_id)
+
+ def get_current_flow_ids_for_onu(self, pon_intf_id, onu_id):
+ pon_intf_onu_id = (pon_intf_id, onu_id)
+ return self.resource_mgrs[pon_intf_id].get_current_flow_ids_for_onu(pon_intf_onu_id)
+
+ def update_flow_id_info_for_onu(self, pon_intf_onu_id, flow_id, flow_data):
+ pon_intf_id = pon_intf_onu_id[0]
+ return self.resource_mgrs[pon_intf_id].update_flow_id_info_for_onu(
+ pon_intf_onu_id, flow_id, flow_data)
+
+ def get_hsia_flow_for_onu(self, pon_intf_id, onu_id, gemport_id):
+ pon_intf_onu_id = (pon_intf_id, onu_id)
+ try:
+ flow_ids = self.resource_mgrs[pon_intf_id]. \
+ get_current_flow_ids_for_onu(pon_intf_onu_id)
+ if flow_ids is not None:
+ for flow_id in flow_ids:
+ flows = self.get_flow_id_info(pon_intf_id, onu_id, flow_id)
+ assert (isinstance(flows, list))
+ for flow in flows:
+ if flow['flow_category'] == HSIA_FLOW and \
+ flow['gemport_id'] == gemport_id:
+ return flow_id
+ except Exception as e:
+ self.log.error("error-retrieving-flow-info", e=e)
+
+ return self.get_flow_id(pon_intf_onu_id)
+
def get_alloc_id(self, pon_intf_onu_id):
# Derive the pon_intf from the pon_intf_onu_id tuple
pon_intf = pon_intf_onu_id[0]
@@ -129,27 +176,35 @@
# ONU.
return alloc_id_list[0]
- alloc_id_list = self.resource_mgrs[pon_intf].get_resource_id(
+ alloc_id = self.resource_mgrs[pon_intf].get_resource_id(
pon_intf_id=pon_intf,
resource_type=PONResourceManager.ALLOC_ID,
num_of_id=1
)
- if alloc_id_list and len(alloc_id_list) == 0:
+ if alloc_id is None:
self.log.error("no-alloc-id-available")
return None
# update the resource map on KV store with the list of alloc_id
# allocated for the pon_intf_onu_id tuple
self.resource_mgrs[pon_intf].update_alloc_ids_for_onu(pon_intf_onu_id,
- alloc_id_list)
-
- # Since we request only one alloc id, we refer the 0th
- # index
- alloc_id = alloc_id_list[0]
+ list(alloc_id))
return alloc_id
- def get_gemport_id(self, pon_intf_onu_id):
+ def get_current_gemport_ids_for_onu(self, pon_intf_onu_id):
+ pon_intf_id = pon_intf_onu_id[0]
+ return self.resource_mgrs[pon_intf_id].get_current_gemport_ids_for_onu(pon_intf_onu_id)
+
+ def update_gemports_ponport_to_onu_map_on_kv_store(self, gemport_list, pon_port, onu_id):
+ for gemport in gemport_list:
+ pon_intf_gemport = (pon_port, gemport)
+ # This information is used when packet_indication is received and
+ # we need to derive the ONU Id for which the packet arrived based
+ # on the pon_intf and gemport available in the packet_indication
+ self.kv_store[str(pon_intf_gemport)] = str(onu_id)
+
+ def get_gemport_id(self, pon_intf_onu_id, num_of_id=1):
# Derive the pon_intf and onu_id from the pon_intf_onu_id tuple
pon_intf = pon_intf_onu_id[0]
onu_id = pon_intf_onu_id[1]
@@ -157,15 +212,12 @@
gemport_id_list = self.resource_mgrs[pon_intf].get_current_gemport_ids_for_onu(
pon_intf_onu_id)
if gemport_id_list and len(gemport_id_list) > 0:
- # Since we support only one gemport_id for the ONU at the moment,
- # return the first gemport_id in the list, if available, for that
- # ONU.
- return gemport_id_list[0]
+ return gemport_id_list
gemport_id_list = self.resource_mgrs[pon_intf].get_resource_id(
pon_intf_id=pon_intf,
resource_type=PONResourceManager.GEMPORT_ID,
- num_of_id=1
+ num_of_id=num_of_id
)
if gemport_id_list and len(gemport_id_list) == 0:
@@ -175,27 +227,29 @@
# update the resource map on KV store with the list of gemport_id
# allocated for the pon_intf_onu_id tuple
self.resource_mgrs[pon_intf].update_gemport_ids_for_onu(pon_intf_onu_id,
- gemport_id_list)
+ gemport_id_list)
- # We currently use only one gemport
- gemport = gemport_id_list[0]
-
- pon_intf_gemport = (pon_intf, gemport)
- # This information is used when packet_indication is received and
- # we need to derive the ONU Id for which the packet arrived based
- # on the pon_intf and gemport available in the packet_indication
- self.kv_store[str(pon_intf_gemport)] = str(onu_id)
-
- return gemport
+ self.update_gemports_ponport_to_onu_map_on_kv_store(gemport_id_list,
+ pon_intf, onu_id)
+ return gemport_id_list
def free_onu_id(self, pon_intf_id, onu_id):
- result = self.resource_mgrs[pon_intf_id].free_resource_id(
+ _ = self.resource_mgrs[pon_intf_id].free_resource_id(
pon_intf_id, PONResourceManager.ONU_ID, onu_id)
pon_intf_onu_id = (pon_intf_id, onu_id)
self.resource_mgrs[pon_intf_id].remove_resource_map(
pon_intf_onu_id)
+ def free_flow_id(self, pon_intf_id, onu_id, flow_id):
+ self.resource_mgrs[pon_intf_id].free_resource_id(
+ pon_intf_id, PONResourceManager.FLOW_ID, flow_id)
+ pon_intf_onu_id = (pon_intf_id, onu_id)
+ self.resource_mgrs[pon_intf_id].update_flow_id_for_onu(pon_intf_onu_id,
+ flow_id, False)
+ self.resource_mgrs[pon_intf_id].remove_flow_id_info(pon_intf_onu_id,
+ flow_id)
+
def free_pon_resources_for_onu(self, pon_intf_id_onu_id):
pon_intf_id = pon_intf_id_onu_id[0]
@@ -203,18 +257,18 @@
alloc_ids = \
self.resource_mgrs[pon_intf_id].get_current_alloc_ids_for_onu(pon_intf_id_onu_id)
self.resource_mgrs[pon_intf_id].free_resource_id(pon_intf_id,
- PONResourceManager.ALLOC_ID,
- alloc_ids)
+ PONResourceManager.ALLOC_ID,
+ alloc_ids)
gemport_ids = \
self.resource_mgrs[pon_intf_id].get_current_gemport_ids_for_onu(pon_intf_id_onu_id)
self.resource_mgrs[pon_intf_id].free_resource_id(pon_intf_id,
- PONResourceManager.GEMPORT_ID,
- gemport_ids)
+ PONResourceManager.GEMPORT_ID,
+ gemport_ids)
self.resource_mgrs[pon_intf_id].free_resource_id(pon_intf_id,
- PONResourceManager.ONU_ID,
- onu_id)
+ PONResourceManager.ONU_ID,
+ onu_id)
# Clear resource map associated with (pon_intf_id, gemport_id) tuple.
self.resource_mgrs[pon_intf_id].remove_resource_map(pon_intf_id_onu_id)
@@ -242,20 +296,25 @@
onu_id_shared_pool_id = None
alloc_id_start = self.device_info.alloc_id_start
alloc_id_end = self.device_info.alloc_id_end
- alloc_id_shared = openolt_pb2.DeviceInfo.DeviceResourceRanges.Pool.SHARED_BY_ALL_INTF_ALL_TECH # TODO EdgeCore/BAL limitation
+ alloc_id_shared = openolt_pb2.DeviceInfo.DeviceResourceRanges.Pool.SHARED_BY_ALL_INTF_ALL_TECH # TODO EdgeCore/BAL limitation
alloc_id_shared_pool_id = None
gemport_id_start = self.device_info.gemport_id_start
gemport_id_end = self.device_info.gemport_id_end
- gemport_id_shared = openolt_pb2.DeviceInfo.DeviceResourceRanges.Pool.SHARED_BY_ALL_INTF_ALL_TECH # TODO EdgeCore/BAL limitation
+ gemport_id_shared = openolt_pb2.DeviceInfo.DeviceResourceRanges.Pool.SHARED_BY_ALL_INTF_ALL_TECH # TODO EdgeCore/BAL limitation
gemport_id_shared_pool_id = None
+ flow_id_start = self.device_info.flow_id_start
+ flow_id_end = self.device_info.flow_id_end
+ flow_id_shared = openolt_pb2.DeviceInfo.DeviceResourceRanges.Pool.SHARED_BY_ALL_INTF_ALL_TECH # TODO EdgeCore/BAL limitation
+ flow_id_shared_pool_id = None
global_pool_id = 0
- for first_intf_pool_id in arange.intf_ids: break;
+ for first_intf_pool_id in arange.intf_ids:
+ break
for pool in arange.pools:
shared_pool_id = global_pool_id if pool.sharing == openolt_pb2.DeviceInfo.DeviceResourceRanges.Pool.SHARED_BY_ALL_INTF_ALL_TECH else \
- first_intf_pool_id if pool.sharing == openolt_pb2.DeviceInfo.DeviceResourceRanges.Pool.SHARED_BY_ALL_INTF_SAME_TECH else \
- None
+ first_intf_pool_id if pool.sharing == openolt_pb2.DeviceInfo.DeviceResourceRanges.Pool.SHARED_BY_ALL_INTF_SAME_TECH else \
+ None
if pool.type == openolt_pb2.DeviceInfo.DeviceResourceRanges.Pool.ONU_ID:
onu_id_start = pool.start
@@ -272,40 +331,60 @@
gemport_id_end = pool.end
gemport_id_shared = pool.sharing
gemport_id_shared_pool_id = shared_pool_id
+ elif pool.type == openolt_pb2.DeviceInfo.DeviceResourceRanges.Pool.FLOW_ID:
+ flow_id_start = pool.start
+ flow_id_end = pool.end
+ flow_id_shared = pool.sharing
+ flow_id_shared_pool_id = shared_pool_id
self.log.info("device-info-init", technology=arange.technology,
- onu_id_start=onu_id_start, onu_id_end=onu_id_end, onu_id_shared_pool_id=onu_id_shared_pool_id,
- alloc_id_start=alloc_id_start, alloc_id_end=alloc_id_end, alloc_id_shared_pool_id=alloc_id_shared_pool_id,
- gemport_id_start=gemport_id_start, gemport_id_end=gemport_id_end, gemport_id_shared_pool_id=gemport_id_shared_pool_id,
- intf_ids=arange.intf_ids)
+ onu_id_start=onu_id_start, onu_id_end=onu_id_end, onu_id_shared_pool_id=onu_id_shared_pool_id,
+ alloc_id_start=alloc_id_start, alloc_id_end=alloc_id_end,
+ alloc_id_shared_pool_id=alloc_id_shared_pool_id,
+ gemport_id_start=gemport_id_start, gemport_id_end=gemport_id_end,
+ gemport_id_shared_pool_id=gemport_id_shared_pool_id,
+ flow_id_start_idx=flow_id_start,
+ flow_id_end_idx=flow_id_end,
+ flow_id_shared_pool_id=flow_id_shared_pool_id,
+ intf_ids=arange.intf_ids)
resource_mgr.init_default_pon_resource_ranges(
- onu_id_start_idx=onu_id_start,
- onu_id_end_idx=onu_id_end,
- onu_id_shared_pool_id=onu_id_shared_pool_id,
- alloc_id_start_idx=alloc_id_start,
- alloc_id_end_idx=alloc_id_end,
- alloc_id_shared_pool_id=alloc_id_shared_pool_id,
- gemport_id_start_idx=gemport_id_start,
- gemport_id_end_idx=gemport_id_end,
- gemport_id_shared_pool_id=gemport_id_shared_pool_id,
- num_of_pon_ports=self.device_info.pon_ports,
- intf_ids=arange.intf_ids
- )
+ onu_id_start_idx=onu_id_start,
+ onu_id_end_idx=onu_id_end,
+ onu_id_shared_pool_id=onu_id_shared_pool_id,
+ alloc_id_start_idx=alloc_id_start,
+ alloc_id_end_idx=alloc_id_end,
+ alloc_id_shared_pool_id=alloc_id_shared_pool_id,
+ gemport_id_start_idx=gemport_id_start,
+ gemport_id_end_idx=gemport_id_end,
+ gemport_id_shared_pool_id=gemport_id_shared_pool_id,
+ flow_id_start_idx=flow_id_start,
+ flow_id_end_idx=flow_id_end,
+ flow_id_shared_pool_id=flow_id_shared_pool_id,
+ num_of_pon_ports=self.device_info.pon_ports,
+ intf_ids=arange.intf_ids
+ )
# For global sharing, make sure to refresh both local and global resource manager instances' range
if global_resource_mgr is not self:
if onu_id_shared == openolt_pb2.DeviceInfo.DeviceResourceRanges.Pool.SHARED_BY_ALL_INTF_ALL_TECH:
global_resource_mgr.update_ranges(onu_id_start_idx=onu_id_start, onu_id_end_idx=onu_id_end)
resource_mgr.update_ranges(onu_id_start_idx=onu_id_start, onu_id_end_idx=onu_id_end,
- onu_id_shared_resource_mgr=global_resource_mgr)
+ onu_id_shared_resource_mgr=global_resource_mgr)
if alloc_id_shared == openolt_pb2.DeviceInfo.DeviceResourceRanges.Pool.SHARED_BY_ALL_INTF_ALL_TECH:
global_resource_mgr.update_ranges(alloc_id_start_idx=alloc_id_start, alloc_id_end_idx=alloc_id_end)
resource_mgr.update_ranges(alloc_id_start_idx=alloc_id_start, alloc_id_end_idx=alloc_id_end,
- alloc_id_shared_resource_mgr=global_resource_mgr)
+ alloc_id_shared_resource_mgr=global_resource_mgr)
if gemport_id_shared == openolt_pb2.DeviceInfo.DeviceResourceRanges.Pool.SHARED_BY_ALL_INTF_ALL_TECH:
- global_resource_mgr.update_ranges(gemport_id_start_idx=gemport_id_start, gemport_id_end_idx=gemport_id_end)
+ global_resource_mgr.update_ranges(gemport_id_start_idx=gemport_id_start,
+ gemport_id_end_idx=gemport_id_end)
resource_mgr.update_ranges(gemport_id_start_idx=gemport_id_start, gemport_id_end_idx=gemport_id_end,
- gemport_id_shared_resource_mgr=global_resource_mgr)
+ gemport_id_shared_resource_mgr=global_resource_mgr)
+
+ if flow_id_shared == openolt_pb2.DeviceInfo.DeviceResourceRanges.Pool.SHARED_BY_ALL_INTF_ALL_TECH:
+ global_resource_mgr.update_ranges(flow_id_start_idx=flow_id_start,
+ flow_id_end_idx=flow_id_end)
+ resource_mgr.update_ranges(flow_id_start_idx=flow_id_start, flow_id_end_idx=flow_id_end,
+ flow_id_shared_resource_mgr=global_resource_mgr)
diff --git a/voltha/adapters/openolt/protos/openolt.proto b/voltha/adapters/openolt/protos/openolt.proto
index ac9093b..8c35bba 100644
--- a/voltha/adapters/openolt/protos/openolt.proto
+++ b/voltha/adapters/openolt/protos/openolt.proto
@@ -130,6 +130,20 @@
};
}
+ rpc CreateTconts(Tconts) returns (Empty) {
+ option (google.api.http) = {
+ post: "/v1/CreateTconts"
+ body: "*"
+ };
+ }
+
+ rpc RemoveTconts(Tconts) returns (Empty) {
+ option (google.api.http) = {
+ post: "/v1/RemoveTconts"
+ body: "*"
+ };
+ }
+
rpc EnableIndication(Empty) returns (stream Indication) {}
}
@@ -219,7 +233,6 @@
fixed32 onu_id = 2;
SerialNumber serial_number = 3;
fixed32 pir = 4; // peak information rate assigned to onu
- fixed32 alloc_id = 5;
}
message OmciMsg {
@@ -443,5 +456,91 @@
fixed32 onu_id = 2;
}
+enum Direction {
+ UPSTREAM = 0;
+ DOWNSTREAM = 1;
+ BIDIRECTIONAL = 2;
+}
+
+enum SchedulingPolicy {
+ WRR = 0;
+ StrictPriority = 1;
+ Hybrid = 2;
+}
+
+enum AdditionalBW {
+ AdditionalBW_None = 0;
+ AdditionalBW_NA = 1;
+ AdditionalBW_BestEffort = 2;
+ AdditionalBW_Auto = 3;
+}
+
+enum DiscardPolicy {
+ TailDrop = 0;
+ WTailDrop = 1;
+ Red = 2;
+ WRed = 3;
+}
+
+enum InferredAdditionBWIndication {
+ InferredAdditionBWIndication_None = 0;
+ InferredAdditionBWIndication_Assured = 1;
+ InferredAdditionBWIndication_BestEffort = 2;
+}
+
+message Scheduler {
+ Direction direction = 1;
+ AdditionalBW additional_bw = 2; // Valid on for “direction == Upstream”.
+ fixed32 priority = 3;
+ fixed32 weight = 4;
+ SchedulingPolicy sched_policy = 5;
+}
+
+message TrafficShapingInfo {
+ fixed32 cir = 1;
+ fixed32 cbs = 2;
+ fixed32 pir = 3;
+ fixed32 pbs = 4;
+ fixed32 gir = 5; // only if “direction == Upstream ”
+ InferredAdditionBWIndication add_bw_ind = 6; // only if “direction == Upstream”
+}
+
+message Tcont {
+ Direction direction = 1;
+ fixed32 alloc_id = 2; // valid only if “direction == Upstream ”
+ Scheduler scheduler = 3;
+ TrafficShapingInfo traffic_shaping_info = 4;
+}
+
+message Tconts {
+ fixed32 intf_id = 1;
+ fixed32 onu_id = 2;
+ repeated Tcont tconts = 3;
+}
+
+message TailDropDiscardConfig {
+ fixed32 queue_size = 1;
+}
+
+message RedDiscardConfig {
+ fixed32 min_threshold = 1;
+ fixed32 max_threshold = 2;
+ fixed32 max_probability = 3;
+}
+
+message WRedDiscardConfig {
+ RedDiscardConfig green = 1;
+ RedDiscardConfig yellow = 2;
+ RedDiscardConfig red = 3;
+}
+
+message DiscardConfig {
+ DiscardPolicy discard_policy = 1;
+ oneof discard_config {
+ TailDropDiscardConfig tail_drop_discard_config = 2;
+ RedDiscardConfig red_discard_config = 3;
+ WRedDiscardConfig wred_discard_config = 4;
+ }
+}
message Empty {}
diff --git a/voltha/core/adapter_agent.py b/voltha/core/adapter_agent.py
index 255b2e7..381559e 100644
--- a/voltha/core/adapter_agent.py
+++ b/voltha/core/adapter_agent.py
@@ -1049,3 +1049,4 @@
def forward_onu_detect_state(self, device_id, state):
topic = self._gen_onu_detect_proxy_address_topic(device_id)
self.event_bus.publish(topic, state)
+