blob: b335546494523c27546eeb5de68a85d2ccd2ac27 [file] [log] [blame]
# SPDX-FileCopyrightText: 2020 The Magma Authors.
# SPDX-FileCopyrightText: 2022 Open Networking Foundation <support@opennetworking.org>
#
# SPDX-License-Identifier: BSD-3-Clause
from typing import Any, Dict, List, Optional
from data_models.data_model import DataModel
from data_models.data_model_parameters import ParameterName
from device_config.enodeb_configuration import EnodebConfiguration
from devices.device_utils import EnodebDeviceName, get_device_name
from exceptions import ConfigurationError
from logger import EnodebdLogger as logger
from tr069 import models
def process_inform_message(
inform: Any,
data_model: DataModel,
device_cfg: EnodebConfiguration,
) -> None:
"""
Modifies the device configuration based on what is received in the Inform
message. Will raise an error if it turns out that the data model we are
using is incorrect. This is decided based on the device OUI and
software-version that is reported in the Inform message.
Args:
inform: Inform Tr069 message
device_handler: The state machine we are using for our device
"""
param_values_by_path = _get_param_values_by_path(inform)
param_name_list = data_model.get_parameter_names()
name_to_val = {}
for name in param_name_list:
path = data_model.get_parameter(name).path
if path in param_values_by_path:
value = param_values_by_path[path]
name_to_val[name] = value
for name, val in name_to_val.items():
device_cfg.set_parameter(name, val)
def get_device_name_from_inform(
inform: models.Inform,
) -> EnodebDeviceName:
def _get_param_value_from_path_suffix(
suffix: str,
path_list: List[str],
param_values_by_path: Dict[str, Any],
) -> Any:
for path in path_list:
if path.endswith(suffix):
return param_values_by_path[path]
raise ConfigurationError('Did not receive expected info in Inform')
param_values_by_path = _get_param_values_by_path(inform)
# Check the OUI and version number to see if the data model matches
path_list = list(param_values_by_path.keys())
if hasattr(inform, 'DeviceId') and \
hasattr(inform.DeviceId, 'OUI'):
device_oui = inform.DeviceId.OUI
else:
device_oui = _get_param_value_from_path_suffix(
'DeviceInfo.ManufacturerOUI',
path_list,
param_values_by_path,
)
sw_version = _get_param_value_from_path_suffix(
'DeviceInfo.SoftwareVersion',
path_list,
param_values_by_path,
)
return get_device_name(device_oui, sw_version)
def does_inform_have_event(
inform: models.Inform,
event_code: str,
) -> bool:
""" True if the Inform message contains the specified event code """
for event in inform.Event.EventStruct:
if event.EventCode == event_code:
return True
return False
def _get_param_values_by_path(
inform: models.Inform,
) -> Dict[str, Any]:
if not hasattr(inform, 'ParameterList') or \
not hasattr(inform.ParameterList, 'ParameterValueStruct'):
raise ConfigurationError('Did not receive ParamterList in Inform')
param_values_by_path = {}
for param_value in inform.ParameterList.ParameterValueStruct:
path = param_value.Name
value = param_value.Value.Data
logger.debug(
'(Inform msg) Received parameter: %s = %s', path,
value,
)
param_values_by_path[path] = value
return param_values_by_path
def are_tr069_params_equal(param_a: Any, param_b: Any, type_: str) -> bool:
"""
Compare two parameters in TR-069 format.
The following differences are ignored:
- Leading and trailing whitespace, commas and quotes
- Capitalization, for booleans (true, false)
Returns:
True if params are the same
"""
# Cast booleans to integers
cmp_a, cmp_b = param_a, param_b
if type_ == 'boolean' and cmp_b in ('0', '1') or cmp_a in ('0', '1'):
cmp_a, cmp_b = map(int, (cmp_a, cmp_b))
cmp_a, cmp_b = map(str, (cmp_a, cmp_b))
cmp_a, cmp_b = map(lambda s: s.strip(', \'"'), (cmp_a, cmp_b))
if cmp_a.lower() in ['true', 'false']:
cmp_a, cmp_b = map(lambda s: s.lower(), (cmp_a, cmp_b))
return cmp_a == cmp_b
def get_all_objects_to_add(
desired_cfg: EnodebConfiguration,
device_cfg: EnodebConfiguration,
) -> List[ParameterName]:
"""
Find a ParameterName that needs to be added to the eNB configuration,
if any
Note: This is the expected name of the parameter once it is added
but this is different than how to add it. For example,
enumerated objects of the form XX.YY.N. should be added
by calling AddObject to XX.YY. and having the CPE assign
the index.
"""
desired = desired_cfg.get_object_names()
current = device_cfg.get_object_names()
return list(set(desired).difference(set(current)))
def get_all_objects_to_delete(
desired_cfg: EnodebConfiguration,
device_cfg: EnodebConfiguration,
) -> List[ParameterName]:
"""
Find a ParameterName that needs to be deleted from the eNB configuration,
if any
"""
desired = desired_cfg.get_object_names()
current = device_cfg.get_object_names()
return list(set(current).difference(set(desired)))
def get_params_to_get(
device_cfg: EnodebConfiguration,
data_model: DataModel,
request_all_params: bool = False,
) -> List[ParameterName]:
"""
Returns the names of params not belonging to objects that are added/removed
"""
desired_names = data_model.get_present_params()
if request_all_params:
return desired_names
known_names = device_cfg.get_parameter_names()
names = list(set(desired_names) - set(known_names))
return names
def get_object_params_to_get(
desired_cfg: Optional[EnodebConfiguration],
device_cfg: EnodebConfiguration,
data_model: DataModel,
) -> List[ParameterName]:
"""
Returns a list of parameter names for object parameters we don't know the
current value of
"""
names = []
# TODO: This might a string for some strange reason, investigate why
num_plmns = \
int(device_cfg.get_parameter(ParameterName.NUM_PLMNS))
for i in range(1, num_plmns + 1):
obj_name = ParameterName.PLMN_N % i
if not device_cfg.has_object(obj_name):
device_cfg.add_object(obj_name)
obj_to_params = data_model.get_numbered_param_names()
desired = obj_to_params[obj_name]
current = []
if desired_cfg is not None:
current = desired_cfg.get_parameter_names_for_object(obj_name)
names_to_add = list(set(desired) - set(current))
names = names + names_to_add
return names
# We don't attempt to set these parameters on the eNB configuration
READ_ONLY_PARAMETERS = [
ParameterName.OP_STATE,
ParameterName.RF_TX_STATUS,
ParameterName.GPS_STATUS,
ParameterName.PTP_STATUS,
ParameterName.MME_STATUS,
ParameterName.GPS_LAT,
ParameterName.GPS_LONG,
]
def get_param_values_to_set(
desired_cfg: EnodebConfiguration,
device_cfg: EnodebConfiguration,
data_model: DataModel,
exclude_admin: bool = False,
) -> Dict[ParameterName, Any]:
"""
Get a map of param names to values for parameters that we will
set on the eNB's configuration, excluding parameters for objects that can
be added/removed.
Also exclude special parameters like admin state, since it may be set at
a different time in the provisioning process than most parameters.
"""
param_values = {}
# Get the parameters we might set
params = set(desired_cfg.get_parameter_names()) - set(READ_ONLY_PARAMETERS)
if exclude_admin:
params = set(params) - {ParameterName.ADMIN_STATE}
# Values of parameters
for name in params:
new = desired_cfg.get_parameter(name)
old = device_cfg.get_parameter(name)
_type = data_model.get_parameter(name).type
if not are_tr069_params_equal(new, old, _type):
param_values[name] = new
return param_values
def get_obj_param_values_to_set(
desired_cfg: EnodebConfiguration,
device_cfg: EnodebConfiguration,
data_model: DataModel,
) -> Dict[ParameterName, Dict[ParameterName, Any]]:
""" Returns a map from object name to (a map of param name to value) """
param_values = {}
objs = desired_cfg.get_object_names()
for obj_name in objs:
param_values[obj_name] = {}
params = desired_cfg.get_parameter_names_for_object(obj_name)
for name in params:
new = desired_cfg.get_parameter_for_object(name, obj_name)
old = device_cfg.get_parameter_for_object(name, obj_name)
_type = data_model.get_parameter(name).type
if not are_tr069_params_equal(new, old, _type):
param_values[obj_name][name] = new
return param_values
def get_all_param_values_to_set(
desired_cfg: EnodebConfiguration,
device_cfg: EnodebConfiguration,
data_model: DataModel,
exclude_admin: bool = False,
) -> Dict[ParameterName, Any]:
""" Returns a map of param names to values that we need to set """
param_values = get_param_values_to_set(
desired_cfg, device_cfg,
data_model, exclude_admin,
)
obj_param_values = get_obj_param_values_to_set(
desired_cfg, device_cfg,
data_model,
)
for _obj_name, param_map in obj_param_values.items():
for name, val in param_map.items():
param_values[name] = val
return param_values
def parse_get_parameter_values_response(
data_model: DataModel,
message: models.GetParameterValuesResponse,
) -> Dict[ParameterName, Any]:
""" Returns a map of ParameterName to the value read from the response """
param_values_by_path = {}
for param_value_struct in message.ParameterList.ParameterValueStruct:
param_values_by_path[param_value_struct.Name] = \
param_value_struct.Value.Data
param_name_list = data_model.get_parameter_names()
name_to_val = {}
for name in param_name_list:
path = data_model.get_parameter(name).path
if path in param_values_by_path:
value = param_values_by_path[path]
name_to_val[name] = value
return name_to_val
def get_optional_param_to_check(
data_model: DataModel,
) -> Optional[ParameterName]:
"""
If there is a parameter which is optional in the data model, and we do not
know if it exists or not, then return it so we can check for its presence.
"""
params = data_model.get_names_of_optional_params()
for param in params:
try:
data_model.is_parameter_present(param)
except KeyError:
return param
return None