| # SPDX-FileCopyrightText: 2020 The Magma Authors. |
| # SPDX-FileCopyrightText: 2022 Open Networking Foundation <support@opennetworking.org> |
| # |
| # SPDX-License-Identifier: BSD-3-Clause |
| |
| import json |
| from typing import Any, List |
| |
| from data_models.data_model import DataModel |
| from data_models.data_model_parameters import ParameterName |
| |
| from collections import namedtuple |
| |
| from lte_utils import DuplexMode, map_earfcndl_to_band_earfcnul_mode |
| |
| from exceptions import ConfigurationError |
| from logger import EnodebdLogger as logger |
| |
| |
# Immutable positional record of the settings for one eNodeB.
# The field order below is part of the tuple's positional interface, so it
# must not be rearranged.
SingleEnodebConfig = namedtuple(
    'SingleEnodebConfig',
    (
        'earfcndl',
        'subframe_assignment',
        'special_subframe_pattern',
        'pci',
        'plmnid_list',
        'tac',
        'bandwidth_mhz',
        'cell_id',
        'allow_enodeb_transmit',
        'mme_address',
        'mme_port',
    ),
)
| |
class EnodebConfiguration:
    """
    This represents the data model configuration for a single
    eNodeB device. This can correspond to either the current configuration
    of the device, or what configuration we desire to have for the device.
    """

    def __init__(self, data_model: DataModel) -> None:
        """
        The fields initialized in the constructor here should be enough to
        track state across any data model configuration.

        Most objects for eNodeB data models cannot be added or deleted.
        For those objects, we just track state with a simple mapping from
        parameter name to value.

        For objects which can be added/deleted, we track them separately.

        Args:
            data_model: the data model this configuration is validated against
        """

        # DataModel
        self._data_model = data_model

        # Dict[ParameterName, Any]
        self._param_to_value = {}

        # Dict[ParameterName, Dict[ParameterName, Any]]
        self._numbered_objects = {}
        # If adding a PLMN object, then you would set something like
        # self._numbered_objects['PLMN_1'] = {'PLMN_1_ENABLED': True}

    @property
    def data_model(self) -> DataModel:
        """
        The data model configuration is tied to a single data model
        """
        return self._data_model

    def __str__(self) -> str:
        """
        Returns: string form of the parameter-to-value mapping. Numbered
            objects are not included.
        """
        return str(self._param_to_value)

    def get_parameter_names(self) -> List[ParameterName]:
        """
        Returns: list of ParameterName currently set in this configuration
        """
        return list(self._param_to_value.keys())

    def has_parameter(self, param_name: ParameterName) -> bool:
        """
        Args:
            param_name: ParameterName
        Returns: True if a value is stored for the parameter
        """
        return param_name in self._param_to_value

    def get_parameter(self, param_name: ParameterName) -> Any:
        """
        Args:
            param_name: ParameterName
        Returns:
            Any, value of the parameter, formatted to be understood by enodebd
        Raises:
            ConfigurationError: if the parameter is not defined in the model
            KeyError: if the parameter is in the model but has no value set
        """
        self._assert_param_in_model(param_name)
        return self._param_to_value[param_name]

    def set_parameter(
        self,
        param_name: ParameterName,
        value: Any,
    ) -> None:
        """
        Args:
            param_name: the parameter name to configure
            value: the value to set, formatted to be understood by enodebd
        Raises:
            ConfigurationError: if the parameter is not defined in the model
        """
        self._assert_param_in_model(param_name)
        self._param_to_value[param_name] = value

    def set_parameter_if_present(self, parameter_name: ParameterName, value: Any) -> None:
        """
        Store the value only when the data model defines the parameter;
        unlike set_parameter(), an unknown parameter is silently ignored.

        Args:
            parameter_name: the parameter name to configure
            value: the value to set, formatted to be understood by enodebd
        """
        tr_param = self.data_model.get_parameter(parameter_name)
        if tr_param is not None:
            self._param_to_value[parameter_name] = value

    def delete_parameter(self, param_name: ParameterName) -> None:
        """
        Args:
            param_name: the parameter name to remove
        Raises:
            KeyError: if no value is stored for the parameter
        """
        del self._param_to_value[param_name]

    def get_object_names(self) -> List[ParameterName]:
        """
        Returns: list of ParameterName of the objects added so far
        """
        return list(self._numbered_objects.keys())

    def has_object(self, param_name: ParameterName) -> bool:
        """
        Args:
            param_name: The ParameterName of the object
        Returns: True if set in configuration
        Raises:
            ConfigurationError: if the object is not defined in the model
        """
        self._assert_param_in_model(param_name)
        return param_name in self._numbered_objects

    def add_object(self, param_name: ParameterName) -> None:
        """
        Args:
            param_name: The ParameterName of the object to add
        Raises:
            ConfigurationError: if the object was already added
        """
        if param_name in self._numbered_objects:
            raise ConfigurationError("Configuration already has object")
        self._numbered_objects[param_name] = {}

    def delete_object(self, param_name: ParameterName) -> None:
        """
        Args:
            param_name: The ParameterName of the object to remove
        Raises:
            ConfigurationError: if the object is not in the configuration
        """
        if param_name not in self._numbered_objects:
            raise ConfigurationError("Configuration does not have object")
        del self._numbered_objects[param_name]

    def get_parameter_for_object(
        self,
        param_name: ParameterName,
        object_name: ParameterName,
    ) -> Any:
        """
        Args:
            param_name: the parameter to read from the object
            object_name: ParameterName of object
        Returns:
            The stored value, or None if the object has no such parameter
        Raises:
            KeyError: if the object is not in the configuration
        """
        return self._numbered_objects[object_name].get(param_name)

    def set_parameter_for_object(
        self,
        param_name: ParameterName,
        value: Any,
        object_name: ParameterName,
    ) -> None:
        """
        Args:
            param_name: the parameter name to configure
            value: the value to set, formatted to be understood by enodebd
            object_name: ParameterName of object
        Raises:
            ConfigurationError: if either name is not defined in the model
            KeyError: if the object has not been added with add_object()
        """
        self._assert_param_in_model(object_name)
        self._assert_param_in_model(param_name)
        self._numbered_objects[object_name][param_name] = value

    def get_parameter_names_for_object(
        self,
        object_name: ParameterName,
    ) -> List[ParameterName]:
        """
        Args:
            object_name: ParameterName of object
        Returns: list of ParameterName set on the object
        Raises:
            KeyError: if the object is not in the configuration
        """
        return list(self._numbered_objects[object_name].keys())

    def get_debug_info(self) -> str:
        """
        Returns: the parameter values and the object values, each rendered
            as indented JSON
        """
        debug_info = 'Param values: {}, \n Object values: {}'
        return debug_info.format(
            json.dumps(self._param_to_value, indent=2),
            json.dumps(
                self._numbered_objects,
                indent=2,
            ),
        )

    def _assert_param_in_model(self, param_name: ParameterName) -> None:
        """
        Log a warning and raise ConfigurationError when the data model has
        no definition for param_name.
        """
        tr_param = self.data_model.get_parameter(param_name)
        if tr_param is None:
            logger.warning('Parameter <%s> not defined in model', param_name)
            raise ConfigurationError("Parameter %s not defined in model." % param_name)

    def check_desired_configuration(self, current_config, desired_config: dict) -> bool:
        """
        Validate the desired configuration before it is applied.

        Args:
            current_config: configuration read from the eNodeB; only its
                has_parameter()/get_parameter() methods are used, to read
                DUPLEX_MODE_CAPABILITY and BAND_CAPABILITY when present
            desired_config: mapping loaded from the config file; the keys
                "earfcn_downlink1", "subframe_assignment",
                "special_subframe_pattern", "plmn_list", "mme_address" and
                "mme_port" are read
        Returns:
            True when every check passes
        Raises:
            ConfigurationError: if any check fails
            KeyError: if one of the keys above is missing from desired_config
        """
        def config_assert(condition: bool, message: str, *args: Any) -> None:
            """ To be used in place of 'assert' so that ConfigurationError is raised
            for all config-related exceptions. The message is only formatted
            with args on failure, so a value of an unexpected type cannot
            raise a formatting error before the check itself runs. """
            if not condition:
                raise ConfigurationError(message % args if args else message)

        # _set_earfcn_freq_band_mode
        # Originally:
        #   mconfig: loaded from proto definition
        #   service_config: loaded from mconfig
        #   device_config: retrieved configuration from enodeb
        #   data_model: defined in device parameter

        # device_config = config loaded from enodeb, desired_config = config loaded from file

        # _check_earfcn_freq_band_mode
        try:
            band, duplex_mode, _ = map_earfcndl_to_band_earfcnul_mode(desired_config["earfcn_downlink1"])
        except ValueError as err:
            # Keep the original error attached as the cause for debugging
            raise ConfigurationError(err) from err

        # The duplex mode is only validated when the device reports its capability
        if current_config.has_parameter(ParameterName.DUPLEX_MODE_CAPABILITY):
            duplex_capability = current_config.get_parameter(ParameterName.DUPLEX_MODE_CAPABILITY)
            if duplex_mode == DuplexMode.TDD and duplex_capability != "TDDMode":
                raise ConfigurationError("Duplex mode TDD is not supported by eNodeB")
            if duplex_mode == DuplexMode.FDD and duplex_capability != "FDDMode":
                raise ConfigurationError("Duplex mode FDD is not supported by eNodeB")
            if duplex_mode not in (DuplexMode.TDD, DuplexMode.FDD):
                raise ConfigurationError("Invalid duplex mode")

        # A band outside the reported capability list is only logged, not rejected
        if current_config.has_parameter(ParameterName.BAND_CAPABILITY):
            raw_capability = current_config.get_parameter(ParameterName.BAND_CAPABILITY)
            # Strip entries so a list written as "1, 3" still matches band 3
            band_capability = [entry.strip() for entry in raw_capability.split(',')]
            if str(band) not in band_capability:
                logger.warning("Band %s not in capability list %s", band, band_capability)

        # _check_tdd_subframe_config
        subframe_assignment = desired_config["subframe_assignment"]
        config_assert(
            subframe_assignment in range(0, 7),
            "Invalid TDD subframe assignment (%s)", subframe_assignment,
        )
        special_subframe_pattern = desired_config["special_subframe_pattern"]
        config_assert(
            special_subframe_pattern in range(0, 10),
            "Invalid TDD special subframe pattern (%s)", special_subframe_pattern,
        )

        # _check_plmnids_tac
        plmn_text = str(desired_config["plmn_list"])
        config_assert(
            all(char in "0123456789, " for char in plmn_text),
            "Invalid PLMNID (%s)", desired_config["plmn_list"],
        )

        # TODO - add support for multiple PLMNIDs
        plmnid_list = plmn_text.split(",")
        config_assert(len(plmnid_list) == 1, "Only 1 PLMNID is supported")
        config_assert(len(plmnid_list[0]) <= 6, "PLMNID must be length <= 6 (%s)", plmnid_list[0])

        # _check_s1_connection_configuration
        mme_address = desired_config["mme_address"]
        mme_port = desired_config["mme_port"]
        config_assert(isinstance(mme_address, str), "Invalid MME address")
        # bool is a subclass of int, so reject it explicitly
        config_assert(
            isinstance(mme_port, int) and not isinstance(mme_port, bool),
            "Invalid MME port",
        )

        return True

    def apply_desired_configuration(self, current_config, desired_config: dict) -> None:
        """
        Copy the desired settings into this configuration.

        Args:
            current_config: configuration read from the eNodeB; consulted
                only for DUPLEX_MODE_CAPABILITY
            desired_config: mapping loaded from the config file. It is
                indexed by string key and iterated with items(), so it must
                be a mapping rather than a SingleEnodebConfig tuple.
        Raises:
            ConfigurationError: if a parameter is not defined in the data
                model, or if the PLMN object was already added (so calling
                this twice on the same instance fails)
            KeyError: if a required key is missing from desired_config

        Errors raised by map_earfcndl_to_band_earfcnul_mode are not caught
        here and propagate unchanged.
        """

        # _set_earfcn_freq_band_mode
        earfcn_downlink = desired_config["earfcn_downlink1"]
        self.set_parameter(ParameterName.EARFCNDL, earfcn_downlink)
        band, duplex_mode, _ = map_earfcndl_to_band_earfcnul_mode(earfcn_downlink)
        if duplex_mode == DuplexMode.FDD:
            # The uplink EARFCN comes from the config, not from the mapping result
            self.set_parameter(ParameterName.EARFCNUL, desired_config["earfcn_uplink1"])
        self.set_parameter_if_present(ParameterName.BAND, band)

        # _set_tdd_subframe_config
        # Subframe settings are written only when the device reports TDDMode
        if (
            current_config.has_parameter(ParameterName.DUPLEX_MODE_CAPABILITY)
            and current_config.get_parameter(ParameterName.DUPLEX_MODE_CAPABILITY) == "TDDMode"
        ):
            self.set_parameter(ParameterName.SUBFRAME_ASSIGNMENT, desired_config["subframe_assignment"])
            self.set_parameter(ParameterName.SPECIAL_SUBFRAME_PATTERN, desired_config["special_subframe_pattern"])

        # _set_plmnids_tac
        plmnid_list = str(desired_config["plmn_list"]).split(",")
        # Only PLMN object 1 is configured; the loop shape is kept so more
        # PLMNs can be added by widening the range
        for i in range(1, 2):
            object_name = ParameterName.PLMN_N % i
            enable_plmn = i == 1
            self.add_object(object_name)
            self.set_parameter_for_object(ParameterName.PLMN_N_ENABLE % i, enable_plmn, object_name)
            if enable_plmn:
                self.set_parameter_for_object(ParameterName.PLMN_N_CELL_RESERVED % i, False, object_name)
                self.set_parameter_for_object(ParameterName.PLMN_N_PRIMARY % i, enable_plmn, object_name)
                self.set_parameter_for_object(ParameterName.PLMN_N_PLMNID % i, plmnid_list[i - 1], object_name)
        self.set_parameter(ParameterName.TAC1, desired_config["tac1"])

        # _set_bandwidth
        # FIXME: The DL_BANDWIDTH update request wasn't able to be accepted by enodeb
        self.set_parameter(ParameterName.DL_BANDWIDTH, desired_config["downlink_bandwidth"])
        self.set_parameter(ParameterName.UL_BANDWIDTH, desired_config["uplink_bandwidth"])

        # _set_cell_id
        self.set_parameter(ParameterName.CELL_ID, desired_config["cell_id"])

        # _set_misc_static_params
        self.set_parameter_if_present(ParameterName.LOCAL_GATEWAY_ENABLE, 0)
        self.set_parameter_if_present(ParameterName.GPS_ENABLE, True)
        self.set_parameter_if_present(ParameterName.IP_SEC_ENABLE, False)
        self.set_parameter_if_present(ParameterName.CELL_RESERVED, False)
        self.set_parameter_if_present(ParameterName.MME_POOL_ENABLE, False)

        # _set_s1_connection_configuration
        self.set_parameter(ParameterName.MME_ADDRESS, desired_config["mme_address"])
        self.set_parameter(ParameterName.MME_PORT, desired_config["mme_port"])

        # enable LTE if we should
        self.set_parameter(ParameterName.ADMIN_STATE, desired_config["admin_state"])

        # These parameters are already configured above.
        # "plmn_list" is the key actually consumed above; "plmnid" is kept so
        # configs using that spelling are still skipped.
        exclude_list = [
            "earfcn_downlink1", "earfcn_uplink1", "subframe_assignment", "special_subframe_pattern",
            "plmn_list", "plmnid", "tac1", "downlink_bandwidth", "uplink_bandwidth", "cell_id",
            "mme_address", "mme_port", "admin_state",
        ]

        # Configure the additional parameters which are set in enodeb config files
        for name, value in desired_config.items():
            if name not in exclude_list:
                self.set_parameter_if_present(name, value)