Wei-Yu Chen | 49950b9 | 2021-11-08 19:19:18 +0800 | [diff] [blame] | 1 | """ |
| 2 | Copyright 2020 The Magma Authors. |
| 3 | |
| 4 | This source code is licensed under the BSD-style license found in the |
| 5 | LICENSE file in the root directory of this source tree. |
| 6 | |
| 7 | Unless required by applicable law or agreed to in writing, software |
| 8 | distributed under the License is distributed on an "AS IS" BASIS, |
| 9 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 10 | See the License for the specific language governing permissions and |
| 11 | limitations under the License. |
| 12 | """ |
| 13 | |
| 14 | from typing import Any, Dict, List, Optional |
| 15 | |
| 16 | from data_models.data_model import DataModel |
| 17 | from data_models.data_model_parameters import ParameterName |
| 18 | from device_config.enodeb_configuration import EnodebConfiguration |
| 19 | from devices.device_utils import EnodebDeviceName, get_device_name |
| 20 | from exceptions import ConfigurationError |
| 21 | from logger import EnodebdLogger as logger |
| 22 | from tr069 import models |
| 23 | |
| 24 | |
def process_inform_message(
    inform: Any,
    data_model: DataModel,
    device_cfg: EnodebConfiguration,
) -> None:
    """
    Copy the parameter values reported in an Inform message into the
    device configuration.

    Only parameters whose data-model path appears in the Inform's
    ParameterList are touched. All values are collected first and applied
    afterwards, so a failure while reading the data model leaves the
    device configuration unmodified.

    Args:
        inform: Inform Tr069 message
        data_model: Data model used to map parameter names to TR-069 paths
        device_cfg: Device configuration that receives the reported values

    Raises:
        ConfigurationError: if the Inform carries no parameter list
    """
    reported_by_path = _get_param_values_by_path(inform)

    # Phase 1: resolve every known parameter name to its reported value
    updates = {}
    for param_name in data_model.get_parameter_names():
        param_path = data_model.get_parameter(param_name).path
        if param_path in reported_by_path:
            updates[param_name] = reported_by_path[param_path]

    # Phase 2: apply the collected values to the device configuration
    for param_name, reported_value in updates.items():
        device_cfg.set_parameter(param_name, reported_value)
| 51 | |
| 52 | |
def get_device_name_from_inform(
    inform: models.Inform,
) -> EnodebDeviceName:
    """
    Determine the device name from the OUI and software version reported
    in an Inform message.

    The OUI is taken from inform.DeviceId.OUI when that attribute exists;
    otherwise it is looked up in the parameter list by the path suffix
    'DeviceInfo.ManufacturerOUI'. The software version is always looked up
    by the path suffix 'DeviceInfo.SoftwareVersion'.

    Args:
        inform: Inform Tr069 message

    Returns:
        The result of get_device_name() for the reported OUI and version

    Raises:
        ConfigurationError: if the Inform has no parameter list, or a
            required value cannot be found in it
    """
    reported_by_path = _get_param_values_by_path(inform)

    def _value_for_suffix(suffix: str) -> Any:
        # First reported path ending in the suffix wins
        for reported_path, reported_value in reported_by_path.items():
            if reported_path.endswith(suffix):
                return reported_value
        raise ConfigurationError('Did not receive expected info in Inform')

    # Check the OUI and version number to see if the data model matches
    has_device_id_oui = (
        hasattr(inform, 'DeviceId') and hasattr(inform.DeviceId, 'OUI')
    )
    device_oui = (
        inform.DeviceId.OUI
        if has_device_id_oui
        else _value_for_suffix('DeviceInfo.ManufacturerOUI')
    )
    sw_version = _value_for_suffix('DeviceInfo.SoftwareVersion')
    return get_device_name(device_oui, sw_version)
| 85 | |
| 86 | |
def does_inform_have_event(
    inform: models.Inform,
    event_code: str,
) -> bool:
    """
    Check whether an Inform message carries a given event code.

    Args:
        inform: Inform Tr069 message
        event_code: Event code to look for

    Returns:
        True if any entry of inform.Event.EventStruct has that EventCode
    """
    return any(
        entry.EventCode == event_code
        for entry in inform.Event.EventStruct
    )
| 96 | |
| 97 | |
def _get_param_values_by_path(
    inform: models.Inform,
) -> Dict[str, Any]:
    """
    Build a map from TR-069 parameter path to reported value for every
    entry in the Inform's parameter list, logging each one at debug level.

    Args:
        inform: Inform Tr069 message

    Returns:
        Map of parameter path (ParameterValueStruct.Name) to its Value.Data

    Raises:
        ConfigurationError: if the Inform has no
            ParameterList.ParameterValueStruct
    """
    has_param_list = (
        hasattr(inform, 'ParameterList')
        and hasattr(inform.ParameterList, 'ParameterValueStruct')
    )
    if not has_param_list:
        raise ConfigurationError('Did not receive ParamterList in Inform')

    values_by_path = {}
    for entry in inform.ParameterList.ParameterValueStruct:
        entry_path = entry.Name
        entry_value = entry.Value.Data
        logger.debug(
            '(Inform msg) Received parameter: %s = %s', entry_path,
            entry_value,
        )
        values_by_path[entry_path] = entry_value
    return values_by_path
| 114 | |
| 115 | |
def are_tr069_params_equal(param_a: Any, param_b: Any, type_: str) -> bool:
    """
    Compare two parameters in TR-069 format.
    The following differences are ignored:
        - Leading and trailing whitespace, commas and quotes
        - Capitalization, for booleans (true, false)
        - Representation of booleans: 1/true and 0/false are equivalent
          when type_ is 'boolean'

    Args:
        param_a: First value (any type; compared via its string form)
        param_b: Second value (any type; compared via its string form)
        type_: Data-model type of the parameter, e.g. 'boolean'

    Returns:
        True if params are the same
    """
    cmp_a, cmp_b = param_a, param_b

    # Cast to integers when one side is a '0'/'1' string, so that e.g.
    # '1' matches True or 1. The condition is parenthesised the way Python
    # already parses it ("and" binds tighter than "or"). The cast used to
    # raise when the other value was not integer-like ('1' vs 'abc',
    # '1' vs 'true'); now both values are left as they are in that case.
    if (type_ == 'boolean' and cmp_b in ('0', '1')) or cmp_a in ('0', '1'):
        try:
            cmp_a, cmp_b = int(cmp_a), int(cmp_b)
        except (TypeError, ValueError):
            # Not both integer-like: fall through to the text comparison
            pass

    cmp_a = str(cmp_a).strip(', \'"')
    cmp_b = str(cmp_b).strip(', \'"')

    if type_ == 'boolean':
        # Canonicalise textual booleans so 'true'/'True' equal '1'
        bool_digits = {'true': '1', 'false': '0'}
        cmp_a = bool_digits.get(cmp_a.lower(), cmp_a)
        cmp_b = bool_digits.get(cmp_b.lower(), cmp_b)
    elif cmp_a.lower() in ('true', 'false'):
        # Boolean-looking value of another type: ignore capitalization only
        cmp_a, cmp_b = cmp_a.lower(), cmp_b.lower()
    return cmp_a == cmp_b
| 134 | |
| 135 | |
def get_all_objects_to_add(
    desired_cfg: EnodebConfiguration,
    device_cfg: EnodebConfiguration,
) -> List[ParameterName]:
    """
    List every object name that is present in the desired configuration
    but missing from the device configuration. The order of the result is
    unspecified.

    Note: This is the expected name of the parameter once it is added
          but this is different than how to add it. For example,
          enumerated objects of the form XX.YY.N. should be added
          by calling AddObject to XX.YY. and having the CPE assign
          the index.

    Args:
        desired_cfg: Configuration we want the eNB to have
        device_cfg: Configuration the eNB currently has

    Returns:
        Object names to add; empty if there are none
    """
    wanted = set(desired_cfg.get_object_names())
    existing = set(device_cfg.get_object_names())
    missing = wanted - existing
    return list(missing)
| 153 | |
| 154 | |
def get_all_objects_to_delete(
    desired_cfg: EnodebConfiguration,
    device_cfg: EnodebConfiguration,
) -> List[ParameterName]:
    """
    List every object name that is present in the device configuration
    but absent from the desired configuration. The order of the result is
    unspecified.

    Args:
        desired_cfg: Configuration we want the eNB to have
        device_cfg: Configuration the eNB currently has

    Returns:
        Object names to delete; empty if there are none
    """
    wanted = set(desired_cfg.get_object_names())
    existing = set(device_cfg.get_object_names())
    surplus = existing - wanted
    return list(surplus)
| 166 | |
| 167 | |
def get_params_to_get(
    device_cfg: EnodebConfiguration,
    data_model: DataModel,
    request_all_params: bool = False,
) -> List[ParameterName]:
    """
    Work out which of the data model's present parameters should be
    requested from the device.

    Args:
        device_cfg: Configuration holding the parameters already known
        data_model: Data model providing the list of present parameters
        request_all_params: If True, return every present parameter
            regardless of what is already known

    Returns:
        The present parameters as given by the data model when
        request_all_params is set; otherwise the present parameters that
        device_cfg does not have yet, in unspecified order
    """
    present_names = data_model.get_present_params()
    if not request_all_params:
        already_known = device_cfg.get_parameter_names()
        return list(set(present_names).difference(set(already_known)))
    return present_names
| 182 | |
| 183 | |
def get_object_params_to_get(
    desired_cfg: Optional[EnodebConfiguration],
    device_cfg: EnodebConfiguration,
    data_model: DataModel,
) -> List[ParameterName]:
    """
    Collect the names of PLMN object parameters that still need to be
    fetched, for PLMN objects 1..NUM_PLMNS.

    Side effect: any PLMN object in that range that device_cfg does not
    have yet is added to device_cfg.

    NOTE(review): the per-object names are filtered against desired_cfg
    (not device_cfg); with desired_cfg=None nothing is filtered out.
    Confirm this is the intended source.

    Args:
        desired_cfg: Configuration whose object parameter names are
            excluded from the result, or None to exclude nothing
        device_cfg: Device configuration; supplies NUM_PLMNS
        data_model: Data model providing the numbered parameter names

    Returns:
        Parameter names to request, grouped by object in index order
    """
    collected = []
    # TODO: NUM_PLMNS might be a string for some strange reason,
    # investigate why (hence the explicit int conversion)
    plmn_count = int(device_cfg.get_parameter(ParameterName.NUM_PLMNS))
    for plmn_index in range(1, plmn_count + 1):
        obj_name = ParameterName.PLMN_N % plmn_index
        if not device_cfg.has_object(obj_name):
            device_cfg.add_object(obj_name)
        expected = data_model.get_numbered_param_names()[obj_name]
        if desired_cfg is None:
            excluded = []
        else:
            excluded = desired_cfg.get_parameter_names_for_object(obj_name)
        collected.extend(set(expected) - set(excluded))
    return collected
| 209 | |
| 210 | |
# We don't attempt to set these parameters on the eNB configuration:
# get_param_values_to_set() removes them from the desired parameter names
# before working out which values need to be written.
# NOTE(review): judging by their names these are status / GPS readings
# reported by the device - confirm before adding or removing entries.
READ_ONLY_PARAMETERS = [
    ParameterName.OP_STATE,
    ParameterName.RF_TX_STATUS,
    ParameterName.GPS_STATUS,
    ParameterName.PTP_STATUS,
    ParameterName.MME_STATUS,
    ParameterName.GPS_LAT,
    ParameterName.GPS_LONG,
]
| 221 | |
| 222 | |
def get_param_values_to_set(
    desired_cfg: EnodebConfiguration,
    device_cfg: EnodebConfiguration,
    data_model: DataModel,
    exclude_admin: bool = False,
) -> Dict[ParameterName, Any]:
    """
    Get a map of param names to values for parameters that we will
    set on the eNB's configuration, excluding parameters for objects that
    can be added/removed.

    Read-only parameters are never included. The admin state can also be
    left out, since it may be set at a different time in the provisioning
    process than most parameters.

    Args:
        desired_cfg: Configuration we want the eNB to have
        device_cfg: Configuration the eNB currently has
        data_model: Data model providing each parameter's type
        exclude_admin: If True, never include ADMIN_STATE in the result

    Returns:
        Map of parameter name to desired value, for every candidate
        parameter whose desired and current values differ
    """
    # Get the parameters we might set
    candidates = set(desired_cfg.get_parameter_names()).difference(
        set(READ_ONLY_PARAMETERS),
    )
    if exclude_admin:
        candidates = set(candidates).difference({ParameterName.ADMIN_STATE})

    # Keep only those whose desired value differs from the device's
    changed = {}
    for param_name in candidates:
        desired_value = desired_cfg.get_parameter(param_name)
        current_value = device_cfg.get_parameter(param_name)
        param_type = data_model.get_parameter(param_name).type
        if are_tr069_params_equal(desired_value, current_value, param_type):
            continue
        changed[param_name] = desired_value

    return changed
| 251 | |
| 252 | |
def get_obj_param_values_to_set(
    desired_cfg: EnodebConfiguration,
    device_cfg: EnodebConfiguration,
    data_model: DataModel,
) -> Dict[ParameterName, Dict[ParameterName, Any]]:
    """
    Returns a map from object name to (a map of param name to value) for
    object parameters whose desired value differs from the device's.

    Every object in the desired configuration gets an entry, which is an
    empty map when none of its parameters need to change.

    Args:
        desired_cfg: Configuration we want the eNB to have
        device_cfg: Configuration the eNB currently has
        data_model: Data model providing each parameter's type
    """
    values_by_object = {}
    for obj_name in desired_cfg.get_object_names():
        changed = {}
        values_by_object[obj_name] = changed
        for param_name in desired_cfg.get_parameter_names_for_object(obj_name):
            desired_value = desired_cfg.get_parameter_for_object(
                param_name, obj_name,
            )
            current_value = device_cfg.get_parameter_for_object(
                param_name, obj_name,
            )
            param_type = data_model.get_parameter(param_name).type
            if are_tr069_params_equal(
                desired_value, current_value, param_type,
            ):
                continue
            changed[param_name] = desired_value
    return values_by_object
| 271 | |
| 272 | |
def get_all_param_values_to_set(
    desired_cfg: EnodebConfiguration,
    device_cfg: EnodebConfiguration,
    data_model: DataModel,
    exclude_admin: bool = False,
) -> Dict[ParameterName, Any]:
    """
    Returns a map of param names to values that we need to set, combining
    the regular parameters with the parameters of every object.

    Object parameters are merged into one flat map, so a later entry with
    the same name overwrites an earlier one.

    Args:
        desired_cfg: Configuration we want the eNB to have
        device_cfg: Configuration the eNB currently has
        data_model: Data model providing each parameter's type
        exclude_admin: Passed through to get_param_values_to_set
    """
    merged = get_param_values_to_set(
        desired_cfg, device_cfg,
        data_model, exclude_admin,
    )
    per_object = get_obj_param_values_to_set(
        desired_cfg, device_cfg,
        data_model,
    )
    # Flatten: the object name itself is not part of the result
    for object_values in per_object.values():
        merged.update(object_values)
    return merged
| 292 | |
| 293 | |
def parse_get_parameter_values_response(
    data_model: DataModel,
    message: models.GetParameterValuesResponse,
) -> Dict[ParameterName, Any]:
    """
    Returns a map of ParameterName to the value read from the response.

    Only parameters known to the data model whose path appears in the
    response are included; other entries of the response are ignored.

    Args:
        data_model: Data model used to map parameter names to paths
        message: GetParameterValuesResponse Tr069 message
    """
    # Index the response by TR-069 path
    values_by_path = {
        entry.Name: entry.Value.Data
        for entry in message.ParameterList.ParameterValueStruct
    }

    # Translate paths back to data-model parameter names
    values_by_name = {}
    for param_name in data_model.get_parameter_names():
        param_path = data_model.get_parameter(param_name).path
        if param_path not in values_by_path:
            continue
        values_by_name[param_name] = values_by_path[param_path]

    return values_by_name
| 313 | |
| 314 | |
def get_optional_param_to_check(
    data_model: DataModel,
) -> Optional[ParameterName]:
    """
    If there is a parameter which is optional in the data model, and we do
    not know if it exists or not, then return it so we can check for its
    presence.

    A parameter counts as unknown when data_model.is_parameter_present()
    raises KeyError for it.

    Args:
        data_model: Data model listing the optional parameters

    Returns:
        The first optional parameter with unknown presence, or None if
        there is no such parameter
    """
    def _presence_unknown(param: ParameterName) -> bool:
        try:
            data_model.is_parameter_present(param)
        except KeyError:
            return True
        return False

    optional_params = data_model.get_names_of_optional_params()
    # Lazy generator: stops probing at the first unknown parameter
    return next(
        (param for param in optional_params if _presence_unknown(param)),
        None,
    )