Wei-Yu Chen | ad55cb8 | 2022-02-15 20:07:01 +0800 | [diff] [blame] | 1 | # SPDX-FileCopyrightText: 2020 The Magma Authors. |
| 2 | # SPDX-FileCopyrightText: 2022 Open Networking Foundation <support@opennetworking.org> |
| 3 | # |
| 4 | # SPDX-License-Identifier: BSD-3-Clause |
Wei-Yu Chen | 49950b9 | 2021-11-08 19:19:18 +0800 | [diff] [blame] | 5 | |
| 6 | from typing import Any, Dict, List, Optional |
| 7 | |
| 8 | from data_models.data_model import DataModel |
| 9 | from data_models.data_model_parameters import ParameterName |
| 10 | from device_config.enodeb_configuration import EnodebConfiguration |
| 11 | from devices.device_utils import EnodebDeviceName, get_device_name |
| 12 | from exceptions import ConfigurationError |
| 13 | from logger import EnodebdLogger as logger |
| 14 | from tr069 import models |
| 15 | |
| 16 | |
def process_inform_message(
    inform: Any,
    data_model: DataModel,
    device_cfg: EnodebConfiguration,
) -> None:
    """
    Copy the parameter values reported in an Inform message into the device
    configuration.

    Only parameters whose data-model path appears in the Inform's parameter
    list are written; every other parameter is left untouched.

    Args:
        inform: Inform Tr069 message
        data_model: Data model used to map parameter names to their paths
        device_cfg: Device configuration that receives the reported values

    Raises:
        ConfigurationError: If the Inform does not carry a parameter list
    """
    reported = _get_param_values_by_path(inform)
    # Lazily pair every known parameter name with its data-model path
    name_path_pairs = (
        (name, data_model.get_parameter(name).path)
        for name in data_model.get_parameter_names()
    )
    # Gather all matches first, so device_cfg is only written to after the
    # whole lookup pass has completed
    updates = {
        name: reported[path]
        for name, path in name_path_pairs
        if path in reported
    }
    for name, value in updates.items():
        device_cfg.set_parameter(name, value)
| 43 | |
| 44 | |
def get_device_name_from_inform(
    inform: models.Inform,
) -> EnodebDeviceName:
    """
    Work out the device name from the OUI and software version in an Inform.

    The OUI is read from inform.DeviceId.OUI when that attribute exists, and
    otherwise from the reported parameter whose path ends with
    'DeviceInfo.ManufacturerOUI'. The software version always comes from the
    reported parameter whose path ends with 'DeviceInfo.SoftwareVersion'.

    Raises:
        ConfigurationError: If the Inform has no parameter list, or a needed
            parameter is not among the reported values
    """
    reported = _get_param_values_by_path(inform)

    def _value_for_suffix(suffix: str) -> Any:
        # The first reported path with a matching tail wins
        matching_paths = (path for path in reported if path.endswith(suffix))
        first_match = next(matching_paths, None)
        if first_match is None:
            raise ConfigurationError(
                'Did not receive expected info in Inform',
            )
        return reported[first_match]

    # Prefer the OUI from the DeviceId structure; fall back to the
    # parameter list when the structure or its OUI field is absent
    device_id = getattr(inform, 'DeviceId', None)
    if hasattr(device_id, 'OUI'):
        device_oui = inform.DeviceId.OUI
    else:
        device_oui = _value_for_suffix('DeviceInfo.ManufacturerOUI')
    sw_version = _value_for_suffix('DeviceInfo.SoftwareVersion')
    return get_device_name(device_oui, sw_version)
| 77 | |
| 78 | |
def does_inform_have_event(
    inform: models.Inform,
    event_code: str,
) -> bool:
    """ Report whether any event in the Inform carries the given code """
    return any(
        event.EventCode == event_code
        for event in inform.Event.EventStruct
    )
| 88 | |
| 89 | |
| 90 | def _get_param_values_by_path( |
| 91 | inform: models.Inform, |
| 92 | ) -> Dict[str, Any]: |
| 93 | if not hasattr(inform, 'ParameterList') or \ |
| 94 | not hasattr(inform.ParameterList, 'ParameterValueStruct'): |
| 95 | raise ConfigurationError('Did not receive ParamterList in Inform') |
| 96 | param_values_by_path = {} |
| 97 | for param_value in inform.ParameterList.ParameterValueStruct: |
| 98 | path = param_value.Name |
| 99 | value = param_value.Value.Data |
| 100 | logger.debug( |
| 101 | '(Inform msg) Received parameter: %s = %s', path, |
| 102 | value, |
| 103 | ) |
| 104 | param_values_by_path[path] = value |
| 105 | return param_values_by_path |
| 106 | |
| 107 | |
def are_tr069_params_equal(param_a: Any, param_b: Any, type_: str) -> bool:
    """
    Compare two parameters in TR-069 format.

    The following differences are ignored:
    - Leading and trailing whitespace, commas and quotes
    - Capitalization, for booleans (true, false)
    - Spelling of booleans: for type 'boolean', true and 1 are treated as
      the same value, and so are false and 0

    Args:
        param_a: First value; converted with str() before comparing
        param_b: Second value; converted with str() before comparing
        type_: Data-model type of the parameter; only 'boolean' receives
            special handling

    Returns:
        True if params are the same
    """
    text_a, text_b = (
        str(param).strip(', \'"') for param in (param_a, param_b)
    )

    if type_ == 'boolean':
        # Fold every accepted boolean spelling onto '0'/'1'. Anything that
        # is not a boolean literal passes through unchanged, so a malformed
        # value compares as different instead of raising from int().
        literals = {'true': '1', '1': '1', 'false': '0', '0': '0'}
        text_a, text_b = (
            literals.get(text.lower(), text) for text in (text_a, text_b)
        )
    elif text_a.lower() in ('true', 'false'):
        # Boolean-looking values of other types are still compared
        # case-insensitively
        text_a, text_b = text_a.lower(), text_b.lower()
    return text_a == text_b
| 126 | |
| 127 | |
def get_all_objects_to_add(
    desired_cfg: EnodebConfiguration,
    device_cfg: EnodebConfiguration,
) -> List[ParameterName]:
    """
    List the object names that exist in the desired configuration but not
    in the device configuration, i.e. the objects still to be added to the
    eNB.

    Note: Each entry is the name the object is expected to have once it has
    been added, which is not the same as how it gets added. For example,
    enumerated objects of the form XX.YY.N. should be added by calling
    AddObject on XX.YY. and letting the CPE assign the index.
    """
    wanted = set(desired_cfg.get_object_names())
    present = set(device_cfg.get_object_names())
    return list(wanted - present)
| 145 | |
| 146 | |
def get_all_objects_to_delete(
    desired_cfg: EnodebConfiguration,
    device_cfg: EnodebConfiguration,
) -> List[ParameterName]:
    """
    List the object names that exist in the device configuration but not in
    the desired configuration, i.e. the objects to be deleted from the eNB.
    """
    wanted = set(desired_cfg.get_object_names())
    present = set(device_cfg.get_object_names())
    return list(present - wanted)
| 158 | |
| 159 | |
def get_params_to_get(
    device_cfg: EnodebConfiguration,
    data_model: DataModel,
    request_all_params: bool = False,
) -> List[ParameterName]:
    """
    Pick the parameters whose values should be requested from the device.

    Args:
        device_cfg: Configuration holding the parameters already known
        data_model: Data model supplying the list of present parameters
        request_all_params: When true, every present parameter is returned,
            whether or not device_cfg already has it

    Returns:
        The data model's present parameters, minus those that device_cfg
        already holds (unless request_all_params is set)
    """
    present_params = data_model.get_present_params()
    if not request_all_params:
        already_known = device_cfg.get_parameter_names()
        return list(set(present_params) - set(already_known))
    return present_params
| 174 | |
| 175 | |
def get_object_params_to_get(
    desired_cfg: Optional[EnodebConfiguration],
    device_cfg: EnodebConfiguration,
    data_model: DataModel,
) -> List[ParameterName]:
    """
    Collect the per-PLMN object parameters that should be fetched.

    For every PLMN index from 1 up to the NUM_PLMNS value stored in
    device_cfg, the PLMN object is added to device_cfg if it is missing.
    The data model's parameters for that object are then gathered, leaving
    out any that desired_cfg already lists for the object.

    Args:
        desired_cfg: Desired configuration, or None to exclude nothing
        device_cfg: Device configuration; may gain missing PLMN objects
        data_model: Data model supplying the parameters of numbered objects

    Returns:
        The gathered parameter names across all PLMN objects
    """
    # The stored count is passed through int() because it may arrive as a
    # string (TODO: investigate why)
    plmn_count = int(device_cfg.get_parameter(ParameterName.NUM_PLMNS))
    collected = []
    for index in range(1, plmn_count + 1):
        obj_name = ParameterName.PLMN_N % index
        if not device_cfg.has_object(obj_name):
            device_cfg.add_object(obj_name)
        model_params = data_model.get_numbered_param_names()[obj_name]
        # NOTE(review): names are excluded based on desired_cfg rather than
        # device_cfg - confirm this is intended
        if desired_cfg is None:
            excluded = []
        else:
            excluded = desired_cfg.get_parameter_names_for_object(obj_name)
        collected.extend(set(model_params) - set(excluded))
    return collected
| 201 | |
| 202 | |
# Parameters that we never attempt to set on the eNB configuration:
# get_param_values_to_set() removes them from its candidate parameters
READ_ONLY_PARAMETERS = [
    ParameterName.OP_STATE,
    ParameterName.RF_TX_STATUS,
    ParameterName.GPS_STATUS,
    ParameterName.PTP_STATUS,
    ParameterName.MME_STATUS,
    ParameterName.GPS_LAT,
    ParameterName.GPS_LONG,
]
| 213 | |
| 214 | |
def get_param_values_to_set(
    desired_cfg: EnodebConfiguration,
    device_cfg: EnodebConfiguration,
    data_model: DataModel,
    exclude_admin: bool = False,
) -> Dict[ParameterName, Any]:
    """
    Build the map of parameter name to value for everything that has to be
    set on the eNB's configuration, excluding parameters for objects that
    can be added/removed.

    Read-only parameters are never included. Admin state can be left out as
    well, since it may be set at a different time in the provisioning
    process than most parameters.

    Args:
        desired_cfg: Configuration holding the target values
        device_cfg: Configuration holding the values currently known
        data_model: Data model supplying each parameter's type
        exclude_admin: When true, ADMIN_STATE is not considered

    Returns:
        The desired value of every considered parameter that differs from
        the device's value
    """
    candidates = (
        set(desired_cfg.get_parameter_names()) - set(READ_ONLY_PARAMETERS)
    )
    if exclude_admin:
        candidates = set(candidates) - {ParameterName.ADMIN_STATE}

    changed = {}
    for name in candidates:
        wanted = desired_cfg.get_parameter(name)
        reported = device_cfg.get_parameter(name)
        param_type = data_model.get_parameter(name).type
        if are_tr069_params_equal(wanted, reported, param_type):
            # Device already matches; nothing to send for this one
            continue
        changed[name] = wanted
    return changed
| 243 | |
| 244 | |
def get_obj_param_values_to_set(
    desired_cfg: EnodebConfiguration,
    device_cfg: EnodebConfiguration,
    data_model: DataModel,
) -> Dict[ParameterName, Dict[ParameterName, Any]]:
    """
    Build a map from object name to (a map of param name to value) for the
    object parameters whose desired value differs from the device's value.

    Every object in desired_cfg gets an entry, which is empty when none of
    its parameters differ.
    """
    result = {}
    for obj_name in desired_cfg.get_object_names():
        changed = {}
        for name in desired_cfg.get_parameter_names_for_object(obj_name):
            wanted = desired_cfg.get_parameter_for_object(name, obj_name)
            reported = device_cfg.get_parameter_for_object(name, obj_name)
            param_type = data_model.get_parameter(name).type
            if not are_tr069_params_equal(wanted, reported, param_type):
                changed[name] = wanted
        result[obj_name] = changed
    return result
| 263 | |
| 264 | |
def get_all_param_values_to_set(
    desired_cfg: EnodebConfiguration,
    device_cfg: EnodebConfiguration,
    data_model: DataModel,
    exclude_admin: bool = False,
) -> Dict[ParameterName, Any]:
    """
    Build one flat map of param names to values that we need to set,
    combining the plain parameters with the parameters of every object.

    Object parameters are merged in last, so they replace any plain
    parameter entry that has the same name.
    """
    merged = get_param_values_to_set(
        desired_cfg,
        device_cfg,
        data_model,
        exclude_admin,
    )
    per_object = get_obj_param_values_to_set(
        desired_cfg,
        device_cfg,
        data_model,
    )
    # The object names themselves are dropped; only their params are kept
    for object_params in per_object.values():
        merged.update(object_params)
    return merged
| 284 | |
| 285 | |
def parse_get_parameter_values_response(
    data_model: DataModel,
    message: models.GetParameterValuesResponse,
) -> Dict[ParameterName, Any]:
    """
    Build a map of ParameterName to the value read from the response.

    Only parameters whose data-model path appears in the response are
    included; response entries with a path the data model does not know are
    ignored.
    """
    reported = {
        entry.Name: entry.Value.Data
        for entry in message.ParameterList.ParameterValueStruct
    }

    values_by_name = {}
    for name in data_model.get_parameter_names():
        path = data_model.get_parameter(name).path
        try:
            values_by_name[name] = reported[path]
        except KeyError:
            # This parameter was not part of the response
            continue
    return values_by_name
| 305 | |
| 306 | |
def get_optional_param_to_check(
    data_model: DataModel,
) -> Optional[ParameterName]:
    """
    Find the first optional parameter of the data model for which we do not
    yet know whether it exists, so that its presence can be checked.

    Returns:
        That parameter, or None when the presence of every optional
        parameter is already known
    """
    def _presence_unknown(param) -> bool:
        # A KeyError from the data model is taken to mean that the
        # parameter's presence has not been determined
        try:
            data_model.is_parameter_present(param)
        except KeyError:
            return True
        return False

    optional_params = data_model.get_names_of_optional_params()
    return next(filter(_presence_unknown, optional_params), None)