AETHER-3573 Simplify configuration files in enodebd
Change-Id: I048d743c0677c85244b87a6c6444f39c06f6bf4b
diff --git a/device_config/enodeb_configuration.py b/device_config/enodeb_configuration.py
index 2b35271..8a7050b 100644
--- a/device_config/enodeb_configuration.py
+++ b/device_config/enodeb_configuration.py
@@ -8,10 +8,27 @@
from data_models.data_model import DataModel
from data_models.data_model_parameters import ParameterName
+
+from collections import namedtuple
+
+from lte_utils import DuplexMode, map_earfcndl_to_band_earfcnul_mode
+
from exceptions import ConfigurationError
from logger import EnodebdLogger as logger
+SingleEnodebConfig = namedtuple(
+ 'SingleEnodebConfig',
+ [
+ 'earfcndl', 'subframe_assignment',
+ 'special_subframe_pattern',
+ 'pci', 'plmnid_list', 'tac',
+ 'bandwidth_mhz', 'cell_id',
+ 'allow_enodeb_transmit',
+ 'mme_address', 'mme_port',
+ ],
+)
+
class EnodebConfiguration():
"""
This represents the data model configuration for a single
@@ -81,6 +98,18 @@
self._assert_param_in_model(param_name)
self._param_to_value[param_name] = value
+ def set_parameter_if_present(self, parameter_name: ParameterName, value: Any) -> None:
+ """
+ Args:
+ param_name: the parameter name to configure
+ value: the value to set, formatted to be understood by enodebd
+ """
+
+ trparam_model = self.data_model
+ tr_param = trparam_model.get_parameter(parameter_name)
+ if tr_param is not None:
+ self._param_to_value[parameter_name] = value
+
def delete_parameter(self, param_name: ParameterName) -> None:
del self._param_to_value[param_name]
@@ -151,3 +180,123 @@
if tr_param is None:
logger.warning('Parameter <%s> not defined in model', param_name)
raise ConfigurationError("Parameter %s not defined in model." % param_name)
+
+ def check_desired_configuration(self, current_config, desired_config: dict) -> bool:
+ def config_assert(condition: bool, message: str = None) -> None:
+ """ To be used in place of 'assert' so that ConfigurationError is raised
+ for all config-related exceptions. """
+ if not condition:
+ raise ConfigurationError(message)
+
+ # _set_earfcn_freq_band_mode
+ # Originally:
+ # mconfig: loaded from proto definiation
+ # service_config: loaded from mconfig
+ # device_config: retrieved configuration from enodeb
+ # data_model: defined in device parameter
+
+ # device_config = config loaded from enodeb, desired_config = config loaded from file
+
+ # _check_earfcn_freqw_band_mode
+ try:
+ band, duplex_mode, _ = map_earfcndl_to_band_earfcnul_mode(desired_config["earfcn_downlink1"])
+ except ValueError as err:
+ raise ConfigurationError(err)
+
+ if current_config.has_parameter(ParameterName.DUPLEX_MODE_CAPABILITY):
+ duplex_capability = current_config.get_parameter(ParameterName.DUPLEX_MODE_CAPABILITY)
+ if duplex_mode == DuplexMode.TDD and duplex_capability != "TDDMode":
+ raise ConfigurationError("Duplex mode TDD is not supported by eNodeB")
+ elif duplex_mode == DuplexMode.FDD and duplex_capability != "FDDMode":
+ raise ConfigurationError("Duplex mode FDD is not supported by eNodeB")
+ elif duplex_mode not in [DuplexMode.TDD, DuplexMode.FDD]:
+ raise ConfigurationError("Invalid duplex mode")
+
+ if current_config.has_parameter(ParameterName.BAND_CAPABILITY):
+ band_capability = current_config.get_parameter(ParameterName.BAND_CAPABILITY).split(',')
+ if str(band) not in band_capability:
+ logger.warning("Band %d not in capability list %s", band, band_capability)
+
+ # _check_tdd_subframe_config
+ config_assert(
+ desired_config["subframe_assignment"] in range(0, 7),
+ "Invalid TDD special subframe assignment (%d)" % desired_config["subframe_assignment"],
+ )
+ config_assert(
+ desired_config["special_subframe_pattern"] in range(0, 10),
+ "Invalid TDD special subframe pattern (%d)" % desired_config["special_subframe_pattern"],
+ )
+
+ # _check_plmnids_tac
+ for char in str(desired_config["plmn_list"]):
+ config_assert(char in "0123456789, ", "Invalid PLMNID (%s)" % desired_config["plmn_list"])
+
+ # TODO - add support for multiple PLMNIDs
+ plmnid_list = str(desired_config["plmn_list"]).split(",")
+ config_assert(len(plmnid_list) == 1, "Only 1 PLMNID is supported")
+ config_assert(len(plmnid_list[0]) <= 6, "PLMNID must be length <= 6 (%s)" % plmnid_list[0])
+
+ # _check_s1_connection_configuration
+ config_assert(type(desired_config["mme_address"]) is str, "Invalid MME address")
+ config_assert(type(desired_config["mme_port"]) is int, "Invalid MME port")
+
+ def apply_desired_configuration(self, current_config, desired_config: SingleEnodebConfig) -> None:
+
+ # _set_earfcn_freq_band_mode
+ self.set_parameter(ParameterName.EARFCNDL, desired_config["earfcn_downlink1"])
+ band, duplex_mode, _ = map_earfcndl_to_band_earfcnul_mode(desired_config["earfcn_downlink1"])
+ if duplex_mode == DuplexMode.FDD:
+ self.set_parameter(ParameterName.EARFCNUL, desired_config["earfcn_uplink1"])
+ self.set_parameter_if_present(ParameterName.BAND, band)
+
+ # _set_tdd_subframe_config
+ if (current_config.has_parameter(ParameterName.DUPLEX_MODE_CAPABILITY)
+ and current_config.get_parameter(ParameterName.DUPLEX_MODE_CAPABILITY) == "TDDMode"):
+ self.set_parameter(ParameterName.SUBFRAME_ASSIGNMENT, desired_config["subframe_assignment"])
+ self.set_parameter(ParameterName.SPECIAL_SUBFRAME_PATTERN, desired_config["special_subframe_pattern"])
+
+ # _set_plmnids_tac
+ plmnid_list = str(desired_config["plmn_list"]).split(",")
+ for i in range(1, 2):
+ object_name = ParameterName.PLMN_N % i
+ enable_plmn = i == 1
+ self.add_object(object_name)
+ self.set_parameter_for_object(ParameterName.PLMN_N_ENABLE % i, enable_plmn, object_name)
+ if enable_plmn:
+ self.set_parameter_for_object(ParameterName.PLMN_N_CELL_RESERVED % i, False, object_name)
+ self.set_parameter_for_object(ParameterName.PLMN_N_PRIMARY % i, enable_plmn, object_name)
+ self.set_parameter_for_object(ParameterName.PLMN_N_PLMNID % i, plmnid_list[i - 1], object_name)
+ self.set_parameter(ParameterName.TAC1, desired_config["tac1"])
+
+ # _set_bandwidth
+ self.set_parameter(ParameterName.DL_BANDWIDTH, desired_config["downlink_bandwidth"])
+ self.set_parameter(ParameterName.UL_BANDWIDTH, desired_config["uplink_bandwidth"])
+
+ # _set_cell_id
+ self.set_parameter(ParameterName.CELL_ID, desired_config["cell_id"])
+
+ # _set_misc_static_params
+ self.set_parameter_if_present(ParameterName.LOCAL_GATEWAY_ENABLE, 0)
+ self.set_parameter_if_present(ParameterName.GPS_ENABLE, True)
+ self.set_parameter_if_present(ParameterName.IP_SEC_ENABLE, False)
+ self.set_parameter_if_present(ParameterName.CELL_RESERVED, False)
+ self.set_parameter_if_present(ParameterName.MME_POOL_ENABLE, False)
+
+ # _set_s1_connection_configuration
+ self.set_parameter(ParameterName.MME_ADDRESS, desired_config["mme_address"])
+ self.set_parameter(ParameterName.MME_PORT, desired_config["mme_port"])
+
+ # enable LTE if we should
+ self.set_parameter(ParameterName.ADMIN_STATE, desired_config["admin_state"])
+
+ # These parameters are already configured at above
+ exclude_list = [
+ "earfcn_downlink1", "earfcn_uplink1", "subframe_assignment", "special_subframe_pattern",
+ "plmnid", "tac1", "downlink_bandwidth", "uplink_bandwidth", "cell_id",
+ "mme_address", "mme_port", "admin_state"
+ ]
+
+ # Configure the additional parameters which are set in enodeb config files
+ for name, value in desired_config.items():
+ if name not in exclude_list:
+ self.set_parameter_if_present(name, value)