blob: 8a7050b185b6b08afa41fd2b0d5d0fd0a2134e8d [file] [log] [blame]
Wei-Yu Chenad55cb82022-02-15 20:07:01 +08001# SPDX-FileCopyrightText: 2020 The Magma Authors.
2# SPDX-FileCopyrightText: 2022 Open Networking Foundation <support@opennetworking.org>
3#
4# SPDX-License-Identifier: BSD-3-Clause
Wei-Yu Chen49950b92021-11-08 19:19:18 +08005
6import json
7from typing import Any, List
8
9from data_models.data_model import DataModel
10from data_models.data_model_parameters import ParameterName
Wei-Yu Chen8d064162022-05-27 21:06:55 +080011
12from collections import namedtuple
13
14from lte_utils import DuplexMode, map_earfcndl_to_band_earfcnul_mode
15
Wei-Yu Chen49950b92021-11-08 19:19:18 +080016from exceptions import ConfigurationError
17from logger import EnodebdLogger as logger
18
19
Wei-Yu Chen8d064162022-05-27 21:06:55 +080020SingleEnodebConfig = namedtuple(
21 'SingleEnodebConfig',
22 [
23 'earfcndl', 'subframe_assignment',
24 'special_subframe_pattern',
25 'pci', 'plmnid_list', 'tac',
26 'bandwidth_mhz', 'cell_id',
27 'allow_enodeb_transmit',
28 'mme_address', 'mme_port',
29 ],
30)
31
Wei-Yu Chen49950b92021-11-08 19:19:18 +080032class EnodebConfiguration():
33 """
34 This represents the data model configuration for a single
35 eNodeB device. This can correspond to either the current configuration
36 of the device, or what configuration we desire to have for the device.
37 """
38
39 def __init__(self, data_model: DataModel) -> None:
40 """
41 The fields initialized in the constructor here should be enough to
42 track state across any data model configuration.
43
44 Most objects for eNodeB data models cannot be added or deleted.
45 For those objects, we just track state with a simple mapping from
46 parameter name to value.
47
48 For objects which can be added/deleted, we track them separately.
49 """
50
51 # DataModel
52 self._data_model = data_model
53
54 # Dict[ParameterName, Any]
55 self._param_to_value = {}
56
57 # Dict[ParameterName, Dict[ParameterName, Any]]
58 self._numbered_objects = {}
59 # If adding a PLMN object, then you would set something like
60 # self._numbered_objects['PLMN_1'] = {'PLMN_1_ENABLED': True}
61
62 @property
63 def data_model(self) -> DataModel:
64 """
65 The data model configuration is tied to a single data model
66 """
67 return self._data_model
68
69 def get_parameter_names(self) -> List[ParameterName]:
70 """
71 Returns: list of ParameterName
72 """
73 return list(self._param_to_value.keys())
74
75 def has_parameter(self, param_name: ParameterName) -> bool:
76 return param_name in self._param_to_value
77
78 def get_parameter(self, param_name: ParameterName) -> Any:
79 """
80 Args:
81 param_name: ParameterName
82 Returns:
83 Any, value of the parameter, formatted to be understood by enodebd
84 """
85 self._assert_param_in_model(param_name)
86 return self._param_to_value[param_name]
87
88 def set_parameter(
89 self,
90 param_name: ParameterName,
91 value: Any,
92 ) -> None:
93 """
94 Args:
95 param_name: the parameter name to configure
96 value: the value to set, formatted to be understood by enodebd
97 """
98 self._assert_param_in_model(param_name)
99 self._param_to_value[param_name] = value
100
Wei-Yu Chen8d064162022-05-27 21:06:55 +0800101 def set_parameter_if_present(self, parameter_name: ParameterName, value: Any) -> None:
102 """
103 Args:
104 param_name: the parameter name to configure
105 value: the value to set, formatted to be understood by enodebd
106 """
107
108 trparam_model = self.data_model
109 tr_param = trparam_model.get_parameter(parameter_name)
110 if tr_param is not None:
111 self._param_to_value[parameter_name] = value
112
Wei-Yu Chen49950b92021-11-08 19:19:18 +0800113 def delete_parameter(self, param_name: ParameterName) -> None:
114 del self._param_to_value[param_name]
115
116 def get_object_names(self) -> List[ParameterName]:
117 return list(self._numbered_objects.keys())
118
119 def has_object(self, param_name: ParameterName) -> bool:
120 """
121 Args:
122 param_name: The ParameterName of the object
123 Returns: True if set in configuration
124 """
125 self._assert_param_in_model(param_name)
126 return param_name in self._numbered_objects
127
128 def add_object(self, param_name: ParameterName) -> None:
129 if param_name in self._numbered_objects:
130 raise ConfigurationError("Configuration already has object")
131 self._numbered_objects[param_name] = {}
132
133 def delete_object(self, param_name: ParameterName) -> None:
134 if param_name not in self._numbered_objects:
135 raise ConfigurationError("Configuration does not have object")
136 del self._numbered_objects[param_name]
137
138 def get_parameter_for_object(
139 self,
140 param_name: ParameterName,
141 object_name: ParameterName,
142 ) -> Any:
143 return self._numbered_objects[object_name].get(param_name)
144
145 def set_parameter_for_object(
146 self,
147 param_name: ParameterName,
148 value: Any,
149 object_name: ParameterName,
150 ) -> None:
151 """
152 Args:
153 param_name: the parameter name to configure
154 value: the value to set, formatted to be understood by enodebd
155 object_name: ParameterName of object
156 """
157 self._assert_param_in_model(object_name)
158 self._assert_param_in_model(param_name)
159 self._numbered_objects[object_name][param_name] = value
160
161 def get_parameter_names_for_object(
162 self,
163 object_name: ParameterName,
164 ) -> List[ParameterName]:
165 return list(self._numbered_objects[object_name].keys())
166
167 def get_debug_info(self) -> str:
168 debug_info = 'Param values: {}, \n Object values: {}'
169 return debug_info.format(
170 json.dumps(self._param_to_value, indent=2),
171 json.dumps(
172 self._numbered_objects,
173 indent=2,
174 ),
175 )
176
177 def _assert_param_in_model(self, param_name: ParameterName) -> None:
178 trparam_model = self.data_model
179 tr_param = trparam_model.get_parameter(param_name)
180 if tr_param is None:
181 logger.warning('Parameter <%s> not defined in model', param_name)
Wei-Yu Chen5cbdfbb2021-12-02 01:10:21 +0800182 raise ConfigurationError("Parameter %s not defined in model." % param_name)
Wei-Yu Chen8d064162022-05-27 21:06:55 +0800183
184 def check_desired_configuration(self, current_config, desired_config: dict) -> bool:
185 def config_assert(condition: bool, message: str = None) -> None:
186 """ To be used in place of 'assert' so that ConfigurationError is raised
187 for all config-related exceptions. """
188 if not condition:
189 raise ConfigurationError(message)
190
191 # _set_earfcn_freq_band_mode
192 # Originally:
193 # mconfig: loaded from proto definiation
194 # service_config: loaded from mconfig
195 # device_config: retrieved configuration from enodeb
196 # data_model: defined in device parameter
197
198 # device_config = config loaded from enodeb, desired_config = config loaded from file
199
200 # _check_earfcn_freqw_band_mode
201 try:
202 band, duplex_mode, _ = map_earfcndl_to_band_earfcnul_mode(desired_config["earfcn_downlink1"])
203 except ValueError as err:
204 raise ConfigurationError(err)
205
206 if current_config.has_parameter(ParameterName.DUPLEX_MODE_CAPABILITY):
207 duplex_capability = current_config.get_parameter(ParameterName.DUPLEX_MODE_CAPABILITY)
208 if duplex_mode == DuplexMode.TDD and duplex_capability != "TDDMode":
209 raise ConfigurationError("Duplex mode TDD is not supported by eNodeB")
210 elif duplex_mode == DuplexMode.FDD and duplex_capability != "FDDMode":
211 raise ConfigurationError("Duplex mode FDD is not supported by eNodeB")
212 elif duplex_mode not in [DuplexMode.TDD, DuplexMode.FDD]:
213 raise ConfigurationError("Invalid duplex mode")
214
215 if current_config.has_parameter(ParameterName.BAND_CAPABILITY):
216 band_capability = current_config.get_parameter(ParameterName.BAND_CAPABILITY).split(',')
217 if str(band) not in band_capability:
218 logger.warning("Band %d not in capability list %s", band, band_capability)
219
220 # _check_tdd_subframe_config
221 config_assert(
222 desired_config["subframe_assignment"] in range(0, 7),
223 "Invalid TDD special subframe assignment (%d)" % desired_config["subframe_assignment"],
224 )
225 config_assert(
226 desired_config["special_subframe_pattern"] in range(0, 10),
227 "Invalid TDD special subframe pattern (%d)" % desired_config["special_subframe_pattern"],
228 )
229
230 # _check_plmnids_tac
231 for char in str(desired_config["plmn_list"]):
232 config_assert(char in "0123456789, ", "Invalid PLMNID (%s)" % desired_config["plmn_list"])
233
234 # TODO - add support for multiple PLMNIDs
235 plmnid_list = str(desired_config["plmn_list"]).split(",")
236 config_assert(len(plmnid_list) == 1, "Only 1 PLMNID is supported")
237 config_assert(len(plmnid_list[0]) <= 6, "PLMNID must be length <= 6 (%s)" % plmnid_list[0])
238
239 # _check_s1_connection_configuration
240 config_assert(type(desired_config["mme_address"]) is str, "Invalid MME address")
241 config_assert(type(desired_config["mme_port"]) is int, "Invalid MME port")
242
243 def apply_desired_configuration(self, current_config, desired_config: SingleEnodebConfig) -> None:
244
245 # _set_earfcn_freq_band_mode
246 self.set_parameter(ParameterName.EARFCNDL, desired_config["earfcn_downlink1"])
247 band, duplex_mode, _ = map_earfcndl_to_band_earfcnul_mode(desired_config["earfcn_downlink1"])
248 if duplex_mode == DuplexMode.FDD:
249 self.set_parameter(ParameterName.EARFCNUL, desired_config["earfcn_uplink1"])
250 self.set_parameter_if_present(ParameterName.BAND, band)
251
252 # _set_tdd_subframe_config
253 if (current_config.has_parameter(ParameterName.DUPLEX_MODE_CAPABILITY)
254 and current_config.get_parameter(ParameterName.DUPLEX_MODE_CAPABILITY) == "TDDMode"):
255 self.set_parameter(ParameterName.SUBFRAME_ASSIGNMENT, desired_config["subframe_assignment"])
256 self.set_parameter(ParameterName.SPECIAL_SUBFRAME_PATTERN, desired_config["special_subframe_pattern"])
257
258 # _set_plmnids_tac
259 plmnid_list = str(desired_config["plmn_list"]).split(",")
260 for i in range(1, 2):
261 object_name = ParameterName.PLMN_N % i
262 enable_plmn = i == 1
263 self.add_object(object_name)
264 self.set_parameter_for_object(ParameterName.PLMN_N_ENABLE % i, enable_plmn, object_name)
265 if enable_plmn:
266 self.set_parameter_for_object(ParameterName.PLMN_N_CELL_RESERVED % i, False, object_name)
267 self.set_parameter_for_object(ParameterName.PLMN_N_PRIMARY % i, enable_plmn, object_name)
268 self.set_parameter_for_object(ParameterName.PLMN_N_PLMNID % i, plmnid_list[i - 1], object_name)
269 self.set_parameter(ParameterName.TAC1, desired_config["tac1"])
270
271 # _set_bandwidth
272 self.set_parameter(ParameterName.DL_BANDWIDTH, desired_config["downlink_bandwidth"])
273 self.set_parameter(ParameterName.UL_BANDWIDTH, desired_config["uplink_bandwidth"])
274
275 # _set_cell_id
276 self.set_parameter(ParameterName.CELL_ID, desired_config["cell_id"])
277
278 # _set_misc_static_params
279 self.set_parameter_if_present(ParameterName.LOCAL_GATEWAY_ENABLE, 0)
280 self.set_parameter_if_present(ParameterName.GPS_ENABLE, True)
281 self.set_parameter_if_present(ParameterName.IP_SEC_ENABLE, False)
282 self.set_parameter_if_present(ParameterName.CELL_RESERVED, False)
283 self.set_parameter_if_present(ParameterName.MME_POOL_ENABLE, False)
284
285 # _set_s1_connection_configuration
286 self.set_parameter(ParameterName.MME_ADDRESS, desired_config["mme_address"])
287 self.set_parameter(ParameterName.MME_PORT, desired_config["mme_port"])
288
289 # enable LTE if we should
290 self.set_parameter(ParameterName.ADMIN_STATE, desired_config["admin_state"])
291
292 # These parameters are already configured at above
293 exclude_list = [
294 "earfcn_downlink1", "earfcn_uplink1", "subframe_assignment", "special_subframe_pattern",
295 "plmnid", "tac1", "downlink_bandwidth", "uplink_bandwidth", "cell_id",
296 "mme_address", "mme_port", "admin_state"
297 ]
298
299 # Configure the additional parameters which are set in enodeb config files
300 for name, value in desired_config.items():
301 if name not in exclude_list:
302 self.set_parameter_if_present(name, value)