blob: b335546494523c27546eeb5de68a85d2ccd2ac27 [file] [log] [blame]
Wei-Yu Chenad55cb82022-02-15 20:07:01 +08001# SPDX-FileCopyrightText: 2020 The Magma Authors.
2# SPDX-FileCopyrightText: 2022 Open Networking Foundation <support@opennetworking.org>
3#
4# SPDX-License-Identifier: BSD-3-Clause
Wei-Yu Chen49950b92021-11-08 19:19:18 +08005
6from typing import Any, Dict, List, Optional
7
8from data_models.data_model import DataModel
9from data_models.data_model_parameters import ParameterName
10from device_config.enodeb_configuration import EnodebConfiguration
11from devices.device_utils import EnodebDeviceName, get_device_name
12from exceptions import ConfigurationError
13from logger import EnodebdLogger as logger
14from tr069 import models
15
16
def process_inform_message(
    inform: Any,
    data_model: DataModel,
    device_cfg: EnodebConfiguration,
) -> None:
    """
    Copy the parameter values reported in an Inform message into the device
    configuration.

    Only parameters whose data-model path appears in the Inform's parameter
    list are written; other entries of the configuration are not touched.

    Args:
        inform: Inform Tr069 message
        data_model: Data model used to map parameter names to their paths
        device_cfg: Device configuration that receives the reported values

    Raises:
        ConfigurationError: If the Inform does not carry a parameter list
    """
    reported = _get_param_values_by_path(inform)

    # Resolve every (name, path) pair before writing anything, so that a
    # failing data-model lookup leaves the configuration unmodified.
    name_path_pairs = (
        (param_name, data_model.get_parameter(param_name).path)
        for param_name in data_model.get_parameter_names()
    )
    updates = {
        param_name: reported[param_path]
        for param_name, param_path in name_path_pairs
        if param_path in reported
    }

    for param_name, reported_value in updates.items():
        device_cfg.set_parameter(param_name, reported_value)
43
44
def get_device_name_from_inform(
    inform: models.Inform,
) -> EnodebDeviceName:
    """
    Work out the device name from the OUI and software version carried by an
    Inform message.

    The OUI is taken from ``inform.DeviceId.OUI`` when that attribute exists;
    otherwise it is looked up in the parameter list by path suffix. The
    software version is always looked up in the parameter list.

    Args:
        inform: Inform Tr069 message

    Returns:
        The result of get_device_name() for the reported OUI and version

    Raises:
        ConfigurationError: If the parameter list is missing, or if no
            reported path ends with the suffix being looked up
    """
    reported = _get_param_values_by_path(inform)

    def _value_for_suffix(suffix: str) -> Any:
        # First reported path (in message order) ending with the suffix wins
        for reported_path, reported_value in reported.items():
            if reported_path.endswith(suffix):
                return reported_value
        raise ConfigurationError('Did not receive expected info in Inform')

    # Check the OUI and version number to see if the data model matches
    has_device_oui = (
        hasattr(inform, 'DeviceId')
        and hasattr(inform.DeviceId, 'OUI')
    )
    if has_device_oui:
        device_oui = inform.DeviceId.OUI
    else:
        device_oui = _value_for_suffix('DeviceInfo.ManufacturerOUI')
    sw_version = _value_for_suffix('DeviceInfo.SoftwareVersion')
    return get_device_name(device_oui, sw_version)
77
78
def does_inform_have_event(
    inform: models.Inform,
    event_code: str,
) -> bool:
    """ True if any event listed in the Inform carries the given event code """
    return any(
        reported_event.EventCode == event_code
        for reported_event in inform.Event.EventStruct
    )
88
89
def _get_param_values_by_path(
    inform: models.Inform,
) -> Dict[str, Any]:
    """
    Build a map from parameter path to reported value out of the Inform's
    parameter list, logging each entry at debug level.

    If the same path is listed more than once, the last entry wins.

    Raises:
        ConfigurationError: If the Inform has no ParameterList, or the
            ParameterList has no ParameterValueStruct
    """
    has_param_structs = (
        hasattr(inform, 'ParameterList')
        and hasattr(inform.ParameterList, 'ParameterValueStruct')
    )
    if not has_param_structs:
        raise ConfigurationError('Did not receive ParamterList in Inform')

    values_by_path = {}
    for entry in inform.ParameterList.ParameterValueStruct:
        reported_path, reported_value = entry.Name, entry.Value.Data
        logger.debug(
            '(Inform msg) Received parameter: %s = %s',
            reported_path, reported_value,
        )
        values_by_path[reported_path] = reported_value
    return values_by_path
106
107
def are_tr069_params_equal(param_a: Any, param_b: Any, type_: str) -> bool:
    """
    Compare two parameters in TR-069 format.

    The following differences are ignored:
    - Leading and trailing whitespace, commas and quotes
    - Capitalization, for booleans (true, false)
    - For parameters of type 'boolean' only: the numeric spelling, i.e.
      '1' is treated as true and '0' as false

    Values of any Python type are accepted; they are compared through their
    string form, so a Python ``True`` matches the TR-069 string 'true'.

    Args:
        param_a: First value to compare
        param_b: Second value to compare
        type_: TR-069 type name of the parameter, e.g. 'boolean'

    Returns:
        True if params are the same
    """
    # Normalise to text up front. This replaces an int() cast that, because
    # `and` binds tighter than `or`, also ran for non-boolean parameters and
    # raised ValueError on inputs such as ('1', 'abc') or ('true', '1').
    cmp_a, cmp_b = (
        str(param).strip(', \'"') for param in (param_a, param_b)
    )
    if type_ == 'boolean':
        # Map the numeric boolean spelling onto the textual one
        numeric_aliases = {'1': 'true', '0': 'false'}
        cmp_a = numeric_aliases.get(cmp_a, cmp_a)
        cmp_b = numeric_aliases.get(cmp_b, cmp_b)
    if cmp_a.lower() in ('true', 'false'):
        cmp_a, cmp_b = cmp_a.lower(), cmp_b.lower()
    return cmp_a == cmp_b
126
127
def get_all_objects_to_add(
    desired_cfg: EnodebConfiguration,
    device_cfg: EnodebConfiguration,
) -> List[ParameterName]:
    """
    List every object name that the desired configuration has but the
    device configuration lacks, i.e. the objects to add to the eNB.

    Note: These are the expected names of the objects once they are added,
          which is different than how to add them. For example,
          enumerated objects of the form XX.YY.N. should be added
          by calling AddObject to XX.YY. and having the CPE assign
          the index.

    The order of the returned list is unspecified.
    """
    wanted = set(desired_cfg.get_object_names())
    present = set(device_cfg.get_object_names())
    return list(wanted - present)
145
146
def get_all_objects_to_delete(
    desired_cfg: EnodebConfiguration,
    device_cfg: EnodebConfiguration,
) -> List[ParameterName]:
    """
    List every object name that the device configuration has but the
    desired configuration lacks, i.e. the objects to delete from the eNB.

    The order of the returned list is unspecified.
    """
    wanted = set(desired_cfg.get_object_names())
    present = set(device_cfg.get_object_names())
    return list(present - wanted)
158
159
def get_params_to_get(
    device_cfg: EnodebConfiguration,
    data_model: DataModel,
    request_all_params: bool = False,
) -> List[ParameterName]:
    """
    Returns the names of the parameters to request from the device.

    With request_all_params set, this is everything returned by
    data_model.get_present_params(). Otherwise it is only those present
    parameters that device_cfg has no entry for yet, in unspecified order.
    """
    present_params = data_model.get_present_params()
    if not request_all_params:
        already_known = device_cfg.get_parameter_names()
        return list(set(present_params) - set(already_known))
    return present_params
174
175
def get_object_params_to_get(
    desired_cfg: Optional[EnodebConfiguration],
    device_cfg: EnodebConfiguration,
    data_model: DataModel,
) -> List[ParameterName]:
    """
    Returns a list of parameter names for object parameters we don't know the
    current value of.

    For every PLMN index from 1 up to the NUM_PLMNS value stored in
    device_cfg, the numbered parameter names of that object are collected,
    minus the ones desired_cfg already lists for it (nothing is subtracted
    when desired_cfg is None).

    Side effect: a PLMN object missing from device_cfg is added to it.
    """
    names = []
    # TODO: This might be a string for some strange reason, investigate why
    plmn_count = int(device_cfg.get_parameter(ParameterName.NUM_PLMNS))
    for index in range(1, plmn_count + 1):
        obj_name = ParameterName.PLMN_N % index
        if not device_cfg.has_object(obj_name):
            device_cfg.add_object(obj_name)
        wanted = data_model.get_numbered_param_names()[obj_name]
        if desired_cfg is None:
            known = []
        else:
            known = desired_cfg.get_parameter_names_for_object(obj_name)
        names.extend(set(wanted) - set(known))
    return names
201
202
# Parameters we never attempt to set on the eNB configuration;
# get_param_values_to_set() removes them from its candidates.
READ_ONLY_PARAMETERS = [
    ParameterName.OP_STATE, ParameterName.RF_TX_STATUS,
    ParameterName.GPS_STATUS, ParameterName.PTP_STATUS,
    ParameterName.MME_STATUS,
    ParameterName.GPS_LAT, ParameterName.GPS_LONG,
]
213
214
def get_param_values_to_set(
    desired_cfg: EnodebConfiguration,
    device_cfg: EnodebConfiguration,
    data_model: DataModel,
    exclude_admin: bool = False,
) -> Dict[ParameterName, Any]:
    """
    Get a map of param names to values for parameters that we will
    set on the eNB's configuration, excluding parameters for objects that can
    be added/removed.

    A parameter is included when its desired value differs from the value in
    device_cfg according to are_tr069_params_equal(). Entries of
    READ_ONLY_PARAMETERS are never included.

    With exclude_admin set, the admin state is left out as well, since it may
    be set at a different time in the provisioning process than most
    parameters.
    """
    # Get the parameters we might set
    candidates = (
        set(desired_cfg.get_parameter_names()) - set(READ_ONLY_PARAMETERS)
    )
    if exclude_admin:
        candidates = candidates - {ParameterName.ADMIN_STATE}

    # Keep only the ones whose desired value differs from the device's
    changed = {}
    for name in candidates:
        wanted = desired_cfg.get_parameter(name)
        reported = device_cfg.get_parameter(name)
        param_type = data_model.get_parameter(name).type
        if are_tr069_params_equal(wanted, reported, param_type):
            continue
        changed[name] = wanted
    return changed
243
244
def get_obj_param_values_to_set(
    desired_cfg: EnodebConfiguration,
    device_cfg: EnodebConfiguration,
    data_model: DataModel,
) -> Dict[ParameterName, Dict[ParameterName, Any]]:
    """
    Returns a map from object name to (a map of param name to value).

    Every object of desired_cfg gets an entry, possibly an empty one. The
    inner map holds only the parameters whose desired value differs from the
    value in device_cfg according to are_tr069_params_equal().
    """
    changes_by_object = {}
    for obj_name in desired_cfg.get_object_names():
        changed = {}
        changes_by_object[obj_name] = changed
        for name in desired_cfg.get_parameter_names_for_object(obj_name):
            wanted = desired_cfg.get_parameter_for_object(name, obj_name)
            reported = device_cfg.get_parameter_for_object(name, obj_name)
            param_type = data_model.get_parameter(name).type
            if are_tr069_params_equal(wanted, reported, param_type):
                continue
            changed[name] = wanted
    return changes_by_object
263
264
def get_all_param_values_to_set(
    desired_cfg: EnodebConfiguration,
    device_cfg: EnodebConfiguration,
    data_model: DataModel,
    exclude_admin: bool = False,
) -> Dict[ParameterName, Any]:
    """
    Returns a map of param names to values that we need to set.

    This is the result of get_param_values_to_set() with the per-object maps
    of get_obj_param_values_to_set() flattened into it. The flattened map is
    keyed by parameter name only, so when a name occurs more than once the
    value merged last is the one kept.
    """
    merged = get_param_values_to_set(
        desired_cfg, device_cfg, data_model, exclude_admin,
    )
    per_object = get_obj_param_values_to_set(
        desired_cfg, device_cfg, data_model,
    )
    for object_params in per_object.values():
        merged.update(object_params)
    return merged
285
286
def parse_get_parameter_values_response(
    data_model: DataModel,
    message: models.GetParameterValuesResponse,
) -> Dict[ParameterName, Any]:
    """
    Returns a map of ParameterName to the value read from the response.

    Only names whose data-model path is present in the response are
    included. If the response lists a path more than once, the last value
    is used.
    """
    reported = {
        entry.Name: entry.Value.Data
        for entry in message.ParameterList.ParameterValueStruct
    }

    name_path_pairs = (
        (param_name, data_model.get_parameter(param_name).path)
        for param_name in data_model.get_parameter_names()
    )
    return {
        param_name: reported[param_path]
        for param_name, param_path in name_path_pairs
        if param_path in reported
    }
306
307
def get_optional_param_to_check(
    data_model: DataModel,
) -> Optional[ParameterName]:
    """
    Return the first optional parameter of the data model whose presence is
    still unknown, so that the caller can check for it; None if there is no
    such parameter.

    Presence counts as unknown when data_model.is_parameter_present() raises
    KeyError for the parameter.
    """
    def _presence_unknown(candidate: Any) -> bool:
        try:
            data_model.is_parameter_present(candidate)
        except KeyError:
            return True
        return False

    optional_params = data_model.get_names_of_optional_params()
    return next(
        (param for param in optional_params if _presence_unknown(param)),
        None,
    )
321 return None