blob: e35f37d8128e94ed37ccf682c5a9bc2cc4f02e4d [file] [log] [blame]
Wei-Yu Chenad55cb82022-02-15 20:07:01 +08001# SPDX-FileCopyrightText: 2020 The Magma Authors.
2# SPDX-FileCopyrightText: 2022 Open Networking Foundation <support@opennetworking.org>
3#
4# SPDX-License-Identifier: BSD-3-Clause
Wei-Yu Chen49950b92021-11-08 19:19:18 +08005
6from typing import Any, Dict, List, Optional
7
8from data_models.data_model import DataModel
9from data_models.data_model_parameters import ParameterName
10from device_config.enodeb_configuration import EnodebConfiguration
11from devices.device_utils import EnodebDeviceName, get_device_name
12from exceptions import ConfigurationError
13from logger import EnodebdLogger as logger
14from tr069 import models
15
16
def process_inform_message(
    inform: Any,
    data_model: DataModel,
    device_cfg: EnodebConfiguration,
) -> None:
    """
    Copy the parameter values reported in an Inform message into the device
    configuration.

    Every parameter name known to the data model is mapped to its path; when
    that path appears in the Inform's parameter list, the reported value is
    stored in device_cfg under the parameter name. Paths the data model does
    not know about are ignored.

    Args:
        inform: Inform Tr069 message
        data_model: Data model used to translate parameter names to paths
        device_cfg: Device configuration that receives the reported values

    Raises:
        ConfigurationError: if the Inform carries no parameter list
    """
    reported_by_path = _get_param_values_by_path(inform)

    # Resolve every name to its path first, so that nothing is written to
    # device_cfg until the whole Inform has been translated.
    name_and_path = (
        (param_name, data_model.get_parameter(param_name).path)
        for param_name in data_model.get_parameter_names()
    )
    updates = {
        param_name: reported_by_path[tr_path]
        for param_name, tr_path in name_and_path
        if tr_path in reported_by_path
    }

    for param_name, reported_value in updates.items():
        device_cfg.set_parameter(param_name, reported_value)
43
44
def get_device_name_from_inform(
    inform: models.Inform,
) -> EnodebDeviceName:
    """
    Work out which device sent an Inform message.

    The manufacturer OUI is read from inform.DeviceId.OUI when that attribute
    exists; otherwise it is looked up among the reported parameters by the
    path suffix 'DeviceInfo.ManufacturerOUI'. The software version is always
    looked up by the suffix 'DeviceInfo.SoftwareVersion'. Both values are
    then passed to get_device_name().

    Args:
        inform: Inform Tr069 message

    Returns:
        The result of get_device_name() for the OUI and software version

    Raises:
        ConfigurationError: if the Inform carries no parameter list, or a
            required value is not among the reported parameters
    """
    reported_by_path = _get_param_values_by_path(inform)

    def _lookup_by_suffix(suffix: str) -> Any:
        # The first reported path that ends with the suffix wins
        for reported_path, reported_value in reported_by_path.items():
            if reported_path.endswith(suffix):
                return reported_value
        raise ConfigurationError('Did not receive expected info in Inform')

    # Check the OUI and version number to see if the data model matches
    oui_in_device_id = hasattr(inform, 'DeviceId') \
        and hasattr(inform.DeviceId, 'OUI')
    if oui_in_device_id:
        device_oui = inform.DeviceId.OUI
    else:
        device_oui = _lookup_by_suffix('DeviceInfo.ManufacturerOUI')
    sw_version = _lookup_by_suffix('DeviceInfo.SoftwareVersion')
    return get_device_name(device_oui, sw_version)
77
78
def does_inform_have_event(
    inform: models.Inform,
    event_code: str,
) -> bool:
    """
    Tell whether an Inform message reports a given event.

    Args:
        inform: Inform Tr069 message
        event_code: Event code to look for

    Returns:
        True if any entry of inform.Event.EventStruct has an EventCode equal
        to event_code, False otherwise (including when there are no events)
    """
    return any(
        reported.EventCode == event_code
        for reported in inform.Event.EventStruct
    )
88
89
def _get_param_values_by_path(
    inform: models.Inform,
) -> Dict[str, Any]:
    """
    Build a map from parameter path to value out of an Inform message.

    Each entry of inform.ParameterList.ParameterValueStruct contributes its
    Name as the key and its Value.Data as the value; every entry is also
    logged at debug level. If a path is reported more than once, the last
    occurrence wins.

    Args:
        inform: Inform Tr069 message

    Returns:
        Map of reported parameter path to reported value

    Raises:
        ConfigurationError: if the Inform has no ParameterList, or the
            ParameterList has no ParameterValueStruct
    """
    has_parameter_list = hasattr(inform, 'ParameterList') \
        and hasattr(inform.ParameterList, 'ParameterValueStruct')
    if not has_parameter_list:
        raise ConfigurationError('Did not receive ParamterList in Inform')

    reported = {}
    for entry in inform.ParameterList.ParameterValueStruct:
        entry_path = entry.Name
        entry_value = entry.Value.Data
        logger.debug(
            '(Inform msg) Received parameter: %s = %s', entry_path,
            entry_value,
        )
        reported[entry_path] = entry_value
    return reported
106
107
def _tr069_bool_to_digit(value: Any) -> Any:
    """Map a textual boolean ('true'/'false', any case) to '1'/'0'."""
    text = str(value).strip(', \'"').lower()
    if text == 'true':
        return '1'
    if text == 'false':
        return '0'
    # Not a textual boolean: leave it for the plain string comparison
    return value


def are_tr069_params_equal(param_a: Any, param_b: Any, type_: str) -> bool:
    """
    Compare two parameters in TR-069 format.
    The following differences are ignored:
    - Leading and trailing whitespace, commas and quotes
    - Capitalization, for booleans (true, false)
    - For booleans, the spelling of the value ('1' vs 'true', '0' vs 'false')

    Values that cannot be normalised are compared as plain strings, so the
    function never raises because one side is not integer-like.

    Args:
        param_a: First value (any type; it is stringified for comparison)
        param_b: Second value (any type; it is stringified for comparison)
        type_: TR-069 type name of the parameter, e.g. 'boolean'

    Returns:
        True if params are the same
    """
    cmp_a, cmp_b = param_a, param_b
    # Cast booleans to integers.
    # NOTE: `and` binds tighter than `or`, so a '0'/'1' in param_a triggers
    # the integer normalisation for every type, not only for booleans. The
    # grouping is spelled out and kept so that existing results do not change.
    if (type_ == 'boolean' and cmp_b in ('0', '1')) or cmp_a in ('0', '1'):
        try:
            cmp_a, cmp_b = int(cmp_a), int(cmp_b)
        except (TypeError, ValueError):
            # The other side is not integer-like (e.g. '1' vs 'true', '1' vs
            # 'abc', '1' vs None). This used to propagate as an exception;
            # fall back to a textual comparison instead.
            if type_ == 'boolean':
                cmp_a, cmp_b = map(_tr069_bool_to_digit, (cmp_a, cmp_b))
    cmp_a, cmp_b = (str(val).strip(', \'"') for val in (cmp_a, cmp_b))
    if cmp_a.lower() in ('true', 'false'):
        cmp_a, cmp_b = cmp_a.lower(), cmp_b.lower()
    return cmp_a == cmp_b
126
127
def get_all_objects_to_add(
    desired_cfg: EnodebConfiguration,
    device_cfg: EnodebConfiguration,
) -> List[ParameterName]:
    """
    List the object names present in the desired configuration but absent
    from the device configuration, i.e. the objects that still have to be
    added to the eNB. The list is empty when nothing needs adding, and its
    order is unspecified.

    Note: These are the expected names of the objects once they are added,
          which is different from how to add them. For example,
          enumerated objects of the form XX.YY.N. should be added
          by calling AddObject to XX.YY. and having the CPE assign
          the index.
    """
    wanted = set(desired_cfg.get_object_names())
    present = set(device_cfg.get_object_names())
    missing = wanted - present
    return list(missing)
145
146
def get_all_objects_to_delete(
    desired_cfg: EnodebConfiguration,
    device_cfg: EnodebConfiguration,
) -> List[ParameterName]:
    """
    List the object names present in the device configuration but absent
    from the desired configuration, i.e. the objects that have to be deleted
    from the eNB. The list is empty when nothing needs deleting, and its
    order is unspecified.
    """
    wanted = set(desired_cfg.get_object_names())
    present = set(device_cfg.get_object_names())
    stale = present - wanted
    return list(stale)
158
159
def get_params_to_get(
    device_cfg: EnodebConfiguration,
    data_model: DataModel,
    request_all_params: bool = False,
) -> List[ParameterName]:
    """
    Choose which parameters to request from the device.

    Args:
        device_cfg: Configuration holding the parameters already known
        data_model: Data model supplying the parameters that are present
        request_all_params: When True, skip the filtering and return every
            present parameter exactly as the data model reports it

    Returns:
        The data model's present parameters, or, when request_all_params is
        False, only those not already in device_cfg (order unspecified)
    """
    present_names = data_model.get_present_params()
    if not request_all_params:
        already_known = device_cfg.get_parameter_names()
        return list(set(present_names).difference(set(already_known)))
    return present_names
174
175
def get_object_params_to_get(
    desired_cfg: Optional[EnodebConfiguration],
    device_cfg: EnodebConfiguration,
    data_model: DataModel,
) -> List[ParameterName]:
    """
    Collect the per-PLMN object parameters that still need to be fetched.

    For every PLMN index from 1 to the NUM_PLMNS value stored in device_cfg,
    the PLMN object is added to device_cfg if it is not there yet (a side
    effect on device_cfg). The parameter names the data model lists for that
    object are then collected, minus the names desired_cfg already holds for
    the object.

    Args:
        desired_cfg: Configuration whose per-object parameter names are
            excluded from the result; None excludes nothing
        device_cfg: Configuration that supplies NUM_PLMNS and receives any
            missing PLMN objects
        data_model: Data model supplying the numbered parameter names

    Returns:
        Parameter names to request, grouped by PLMN index in ascending order
        (the order within one PLMN is unspecified)
    """
    # TODO: This might a string for some strange reason, investigate why
    plmn_count = int(device_cfg.get_parameter(ParameterName.NUM_PLMNS))

    collected = []
    for plmn_index in range(1, plmn_count + 1):
        plmn_obj = ParameterName.PLMN_N % plmn_index
        if not device_cfg.has_object(plmn_obj):
            device_cfg.add_object(plmn_obj)
        model_params = data_model.get_numbered_param_names()[plmn_obj]
        if desired_cfg is None:
            already_listed = []
        else:
            already_listed = \
                desired_cfg.get_parameter_names_for_object(plmn_obj)
        collected.extend(set(model_params) - set(already_listed))
    return collected
201
202
# We don't attempt to set these parameters on the eNB configuration:
# get_param_values_to_set() removes them from the desired parameter names
# before comparing desired and device values. The list holds the status
# parameters (operational, RF TX, GPS, PTP, MME) and the GPS latitude and
# longitude.
READ_ONLY_PARAMETERS = [
    ParameterName.OP_STATE,
    ParameterName.RF_TX_STATUS,
    ParameterName.GPS_STATUS,
    ParameterName.PTP_STATUS,
    ParameterName.MME_STATUS,
    ParameterName.GPS_LAT,
    ParameterName.GPS_LONG,
]
213
214
def get_param_values_to_set(
    desired_cfg: EnodebConfiguration,
    device_cfg: EnodebConfiguration,
    data_model: DataModel,
    exclude_admin: bool = False,
) -> Dict[ParameterName, Any]:
    """
    Work out which plain (non-object) parameters must be written to the eNB.

    A parameter is included when it is in the desired configuration, is not
    listed in READ_ONLY_PARAMETERS, and its desired value differs from the
    device's current value according to are_tr069_params_equal().

    Special parameters like admin state can be left out as well, since they
    may be set at a different time in the provisioning process than most
    parameters.

    Args:
        desired_cfg: Configuration we want the device to have
        device_cfg: Configuration the device currently has
        data_model: Data model supplying each parameter's type
        exclude_admin: When True, never include ADMIN_STATE in the result

    Returns:
        Map of parameter name to the desired value, for changed parameters
    """
    # Get the parameters we might set
    candidates = set(desired_cfg.get_parameter_names()) \
        - set(READ_ONLY_PARAMETERS)
    if exclude_admin:
        candidates = set(candidates) - {ParameterName.ADMIN_STATE}

    # Keep only the parameters whose desired value differs from the device's
    changed = {}
    for param in candidates:
        wanted = desired_cfg.get_parameter(param)
        current = device_cfg.get_parameter(param)
        param_type = data_model.get_parameter(param).type
        if are_tr069_params_equal(wanted, current, param_type):
            continue
        changed[param] = wanted
    return changed
243
244
def get_obj_param_values_to_set(
    desired_cfg: EnodebConfiguration,
    device_cfg: EnodebConfiguration,
    data_model: DataModel,
) -> Dict[ParameterName, Dict[ParameterName, Any]]:
    """
    Work out which object parameters must be written to the eNB.

    Every object named in the desired configuration gets an entry in the
    result, even when none of its parameters differ (the entry is then an
    empty map). A parameter is listed under its object when the desired value
    differs from the device's value according to are_tr069_params_equal().

    Returns:
        Map from object name to (a map of param name to desired value)
    """
    changes_by_object = {}
    for obj in desired_cfg.get_object_names():
        changed = {}
        changes_by_object[obj] = changed
        for param in desired_cfg.get_parameter_names_for_object(obj):
            wanted = desired_cfg.get_parameter_for_object(param, obj)
            current = device_cfg.get_parameter_for_object(param, obj)
            param_type = data_model.get_parameter(param).type
            if are_tr069_params_equal(wanted, current, param_type):
                continue
            changed[param] = wanted
    return changes_by_object
263
264
def get_all_param_values_to_set(
    desired_cfg: EnodebConfiguration,
    device_cfg: EnodebConfiguration,
    data_model: DataModel,
    exclude_admin: bool = False,
) -> Dict[ParameterName, Any]:
    """
    Combine the plain and the per-object parameters that need to be set into
    one flat map of param name to value.

    The plain parameters come from get_param_values_to_set() and the object
    parameters from get_obj_param_values_to_set(). Object parameters are
    merged in afterwards, so on a name clash the object's value replaces the
    earlier one.
    """
    merged = get_param_values_to_set(
        desired_cfg, device_cfg,
        data_model, exclude_admin,
    )
    per_object = get_obj_param_values_to_set(
        desired_cfg, device_cfg,
        data_model,
    )
    # Flatten: the object names themselves are not part of the result
    for object_params in per_object.values():
        merged.update(object_params)
    return merged
284
285
def parse_get_parameter_values_response(
    data_model: DataModel,
    message: models.GetParameterValuesResponse,
) -> Dict[ParameterName, Any]:
    """
    Translate a GetParameterValuesResponse into a map of ParameterName to the
    value read from the response.

    The response is first indexed by parameter path (Name -> Value.Data).
    Each parameter name known to the data model is then included when its
    path occurs in the response; paths the data model does not know are
    dropped.
    """
    reported_by_path = {
        entry.Name: entry.Value.Data
        for entry in message.ParameterList.ParameterValueStruct
    }

    name_and_path = (
        (param_name, data_model.get_parameter(param_name).path)
        for param_name in data_model.get_parameter_names()
    )
    return {
        param_name: reported_by_path[tr_path]
        for param_name, tr_path in name_and_path
        if tr_path in reported_by_path
    }
305
306
def get_optional_param_to_check(
    data_model: DataModel,
) -> Optional[ParameterName]:
    """
    Pick the next optional parameter whose presence has not been determined.

    The data model's optional parameters are tried in order; the first one
    for which is_parameter_present() raises KeyError is returned so that the
    caller can check for its presence. None is returned when no optional
    parameter raises.
    """
    def _presence_is_unknown(name: Any) -> bool:
        # Only the KeyError matters here; the returned presence flag does not
        try:
            data_model.is_parameter_present(name)
        except KeyError:
            return True
        return False

    optional_names = data_model.get_names_of_optional_params()
    return next(
        (name for name in optional_names if _presence_is_unknown(name)),
        None,
    )