blob: e0e32cc218bd9dba2aa7c26dd43bd3d5c9328752 [file] [log] [blame]
Wei-Yu Chen49950b92021-11-08 19:19:18 +08001"""
2Copyright 2020 The Magma Authors.
3
4This source code is licensed under the BSD-style license found in the
5LICENSE file in the root directory of this source tree.
6
7Unless required by applicable law or agreed to in writing, software
8distributed under the License is distributed on an "AS IS" BASIS,
9WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10See the License for the specific language governing permissions and
11limitations under the License.
12"""
13
14from typing import Any, Dict, List, Optional
15
16from data_models.data_model import DataModel
17from data_models.data_model_parameters import ParameterName
18from device_config.enodeb_configuration import EnodebConfiguration
19from devices.device_utils import EnodebDeviceName, get_device_name
20from exceptions import ConfigurationError
21from logger import EnodebdLogger as logger
22from tr069 import models
23
24
def process_inform_message(
    inform: Any,
    data_model: DataModel,
    device_cfg: EnodebConfiguration,
) -> None:
    """
    Copy the parameter values reported in an Inform message into the device
    configuration.

    Only parameters whose data model path appears in the Inform's parameter
    list are written to device_cfg.

    Args:
        inform: Inform Tr069 message
        data_model: Data model used to map parameter names to paths
        device_cfg: Device configuration that receives the reported values
    """
    reported = _get_param_values_by_path(inform)

    # Resolve every value before touching device_cfg, so that a failure
    # while consulting the data model leaves the configuration unmodified.
    updates = {}
    for name in data_model.get_parameter_names():
        path = data_model.get_parameter(name).path
        if path not in reported:
            continue
        updates[name] = reported[path]

    for name, val in updates.items():
        device_cfg.set_parameter(name, val)
51
52
def get_device_name_from_inform(
    inform: models.Inform,
) -> EnodebDeviceName:
    """
    Work out the device name from the OUI and software version reported in
    an Inform message.

    The OUI is read from inform.DeviceId.OUI when that attribute exists, and
    otherwise from the reported parameter whose path ends with
    'DeviceInfo.ManufacturerOUI'. The software version always comes from the
    reported parameter whose path ends with 'DeviceInfo.SoftwareVersion'.

    Raises:
        ConfigurationError: if a needed parameter is not in the Inform
    """
    reported = _get_param_values_by_path(inform)

    def _value_for_suffix(suffix: str) -> Any:
        # The first reported path carrying the suffix wins
        match = next(
            (path for path in reported if path.endswith(suffix)),
            None,
        )
        if match is None:
            raise ConfigurationError('Did not receive expected info in Inform')
        return reported[match]

    # Check the OUI and version number to see if the data model matches
    has_device_id_oui = \
        hasattr(inform, 'DeviceId') and hasattr(inform.DeviceId, 'OUI')
    if not has_device_id_oui:
        device_oui = _value_for_suffix('DeviceInfo.ManufacturerOUI')
    else:
        device_oui = inform.DeviceId.OUI
    sw_version = _value_for_suffix('DeviceInfo.SoftwareVersion')
    return get_device_name(device_oui, sw_version)
85
86
def does_inform_have_event(
    inform: models.Inform,
    event_code: str,
) -> bool:
    """ True if any event in the Inform message has the given event code """
    return any(
        event.EventCode == event_code
        for event in inform.Event.EventStruct
    )
96
97
def _get_param_values_by_path(
    inform: models.Inform,
) -> Dict[str, Any]:
    """
    Build a map from parameter path to reported value for an Inform message.

    Each received parameter is also written to the debug log.

    Raises:
        ConfigurationError: if the Inform has no
            ParameterList.ParameterValueStruct
    """
    has_param_list = (
        hasattr(inform, 'ParameterList')
        and hasattr(inform.ParameterList, 'ParameterValueStruct')
    )
    if not has_param_list:
        raise ConfigurationError('Did not receive ParamterList in Inform')

    values = {}
    for entry in inform.ParameterList.ParameterValueStruct:
        name = entry.Name
        data = entry.Value.Data
        logger.debug(
            '(Inform msg) Received parameter: %s = %s', name,
            data,
        )
        values[name] = data
    return values
114
115
def are_tr069_params_equal(param_a: Any, param_b: Any, type_: str) -> bool:
    """
    Compare two parameters in TR-069 format.
    The following differences are ignored:
    - Leading and trailing whitespace, commas and quotes
    - Capitalization, for booleans (true, false)
    - Numeric versus literal spelling of booleans ('1' and true, '0' and
      false), when type_ is 'boolean'

    Args:
        param_a: First value; compared through its str() form
        param_b: Second value; compared through its str() form
        type_: Type name of the parameter, e.g. 'boolean'

    Returns:
        True if params are the same
    """
    def _as_bool_int(value: Any) -> Any:
        # Map the known spellings of a boolean onto 0/1. Anything that is
        # not recognised is returned unchanged, so the textual comparison
        # below decides instead of int() raising ValueError.
        if isinstance(value, str):
            text = value.strip(', \'"').lower()
            if text in ('true', 'false'):
                return int(text == 'true')
        try:
            return int(value)
        except (TypeError, ValueError):
            return value

    cmp_a, cmp_b = param_a, param_b
    # Cast booleans to integers. The parentheses are required: without them
    # `and` binds tighter than `or`, and the cast would be attempted for
    # every type whenever param_a happens to be '0' or '1'.
    if type_ == 'boolean' and (cmp_a in ('0', '1') or cmp_b in ('0', '1')):
        cmp_a, cmp_b = map(_as_bool_int, (cmp_a, cmp_b))
    cmp_a, cmp_b = (str(val).strip(', \'"') for val in (cmp_a, cmp_b))
    if cmp_a.lower() in ('true', 'false'):
        cmp_a, cmp_b = cmp_a.lower(), cmp_b.lower()
    return cmp_a == cmp_b
134
135
def get_all_objects_to_add(
    desired_cfg: EnodebConfiguration,
    device_cfg: EnodebConfiguration,
) -> List[ParameterName]:
    """
    List the object names that are in the desired configuration but not in
    the device configuration, i.e. the objects that still need to be added
    to the eNB. The list is built from a set, so its order is unspecified.

    Note: These are the expected names of the objects once they are added,
          which is different from how to add them. For example,
          enumerated objects of the form XX.YY.N. should be added
          by calling AddObject to XX.YY. and having the CPE assign
          the index.
    """
    wanted = desired_cfg.get_object_names()
    present = device_cfg.get_object_names()
    return list(set(wanted) - set(present))
153
154
def get_all_objects_to_delete(
    desired_cfg: EnodebConfiguration,
    device_cfg: EnodebConfiguration,
) -> List[ParameterName]:
    """
    List the object names that are in the device configuration but not in
    the desired configuration, i.e. the objects that should be deleted from
    the eNB. The list is built from a set, so its order is unspecified.
    """
    wanted = desired_cfg.get_object_names()
    present = device_cfg.get_object_names()
    return list(set(present) - set(wanted))
166
167
def get_params_to_get(
    device_cfg: EnodebConfiguration,
    data_model: DataModel,
    request_all_params: bool = False,
) -> List[ParameterName]:
    """
    List the parameter names to request from the device.

    With request_all_params set, the result of
    data_model.get_present_params() is returned as is. Otherwise only the
    names from that result which are not among
    device_cfg.get_parameter_names() are returned, in unspecified order.
    """
    present = data_model.get_present_params()
    if not request_all_params:
        known = device_cfg.get_parameter_names()
        return list(set(present).difference(set(known)))
    return present
182
183
def get_object_params_to_get(
    desired_cfg: Optional[EnodebConfiguration],
    device_cfg: EnodebConfiguration,
    data_model: DataModel,
) -> List[ParameterName]:
    """
    Collect the PLMN object parameter names that still have to be requested.

    For every PLMN index from 1 up to the NUM_PLMNS value held in
    device_cfg, the object is added to device_cfg if it is not there yet.
    The data model's numbered parameter names for that object are then
    collected, leaving out any names desired_cfg already lists for the
    object (nothing is left out when desired_cfg is None).
    """
    # TODO: NUM_PLMNS might be a string for some strange reason (hence the
    #       int() conversion), investigate why
    plmn_count = int(device_cfg.get_parameter(ParameterName.NUM_PLMNS))
    names = []
    for index in range(1, plmn_count + 1):
        obj_name = ParameterName.PLMN_N % index
        if not device_cfg.has_object(obj_name):
            device_cfg.add_object(obj_name)
        wanted = data_model.get_numbered_param_names()[obj_name]
        if desired_cfg is None:
            known = []
        else:
            known = desired_cfg.get_parameter_names_for_object(obj_name)
        names.extend(list(set(wanted) - set(known)))
    return names
209
210
# We don't attempt to set these parameters on the eNB configuration.
# get_param_values_to_set() removes every name listed here from the set of
# desired parameters before it compares desired and device values.
READ_ONLY_PARAMETERS = [
    ParameterName.OP_STATE,
    ParameterName.RF_TX_STATUS,
    ParameterName.GPS_STATUS,
    ParameterName.PTP_STATUS,
    ParameterName.MME_STATUS,
    ParameterName.GPS_LAT,
    ParameterName.GPS_LONG,
]
221
222
def get_param_values_to_set(
    desired_cfg: EnodebConfiguration,
    device_cfg: EnodebConfiguration,
    data_model: DataModel,
    exclude_admin: bool = False,
) -> Dict[ParameterName, Any]:
    """
    Build a map of param names to desired values for every parameter whose
    desired value differs from the value in the device configuration.

    Names in READ_ONLY_PARAMETERS are never included. Parameters of objects
    that can be added/removed are not handled here.

    With exclude_admin set, the admin state is left out as well, since it
    may be set at a different time in the provisioning process than most
    parameters.
    """
    # Candidate names: everything desired, minus what must not be written
    desired_names = set(desired_cfg.get_parameter_names())
    candidates = desired_names - set(READ_ONLY_PARAMETERS)
    if exclude_admin:
        candidates = set(candidates) - {ParameterName.ADMIN_STATE}

    # Keep only the candidates whose value actually has to change
    changed = {}
    for name in candidates:
        desired_val = desired_cfg.get_parameter(name)
        device_val = device_cfg.get_parameter(name)
        param_type = data_model.get_parameter(name).type
        if are_tr069_params_equal(desired_val, device_val, param_type):
            continue
        changed[name] = desired_val
    return changed
251
252
def get_obj_param_values_to_set(
    desired_cfg: EnodebConfiguration,
    device_cfg: EnodebConfiguration,
    data_model: DataModel,
) -> Dict[ParameterName, Dict[ParameterName, Any]]:
    """
    Returns a map from object name to (a map of param name to desired value)
    holding the object parameters whose desired value differs from the value
    in the device configuration. Every object in the desired configuration
    gets an entry, which is empty when nothing differs.
    """
    by_object = {}
    for obj_name in desired_cfg.get_object_names():
        changed = {}
        by_object[obj_name] = changed
        for name in desired_cfg.get_parameter_names_for_object(obj_name):
            desired_val = desired_cfg.get_parameter_for_object(name, obj_name)
            device_val = device_cfg.get_parameter_for_object(name, obj_name)
            param_type = data_model.get_parameter(name).type
            if are_tr069_params_equal(desired_val, device_val, param_type):
                continue
            changed[name] = desired_val
    return by_object
271
272
def get_all_param_values_to_set(
    desired_cfg: EnodebConfiguration,
    device_cfg: EnodebConfiguration,
    data_model: DataModel,
    exclude_admin: bool = False,
) -> Dict[ParameterName, Any]:
    """
    Returns a single map of param names to values that we need to set,
    combining the result of get_param_values_to_set with the per-object
    maps from get_obj_param_values_to_set.
    """
    merged = get_param_values_to_set(
        desired_cfg, device_cfg,
        data_model, exclude_admin,
    )
    per_object = get_obj_param_values_to_set(
        desired_cfg, device_cfg,
        data_model,
    )
    # Flatten: the object names themselves are not part of the result
    for obj_params in per_object.values():
        merged.update(obj_params)
    return merged
292
293
def parse_get_parameter_values_response(
    data_model: DataModel,
    message: models.GetParameterValuesResponse,
) -> Dict[ParameterName, Any]:
    """
    Returns a map of ParameterName to the value read from the response.

    Values in the response whose path does not belong to any parameter of
    the data model are dropped; data model parameters whose path is not in
    the response get no entry.
    """
    values_by_path = {
        struct.Name: struct.Value.Data
        for struct in message.ParameterList.ParameterValueStruct
    }
    names_and_paths = (
        (name, data_model.get_parameter(name).path)
        for name in data_model.get_parameter_names()
    )
    return {
        name: values_by_path[path]
        for name, path in names_and_paths
        if path in values_by_path
    }
313
314
def get_optional_param_to_check(
    data_model: DataModel,
) -> Optional[ParameterName]:
    """
    Return the first optional parameter of the data model for which we do
    not yet know whether it exists, so that its presence can be checked.
    Returns None when there is no such parameter.
    """
    def _presence_unknown(param: Any) -> bool:
        # A KeyError from the data model is taken to mean that the presence
        # of this parameter is not known yet.
        try:
            data_model.is_parameter_present(param)
        except KeyError:
            return True
        return False

    optional_params = data_model.get_names_of_optional_params()
    return next(
        (param for param in optional_params if _presence_unknown(param)),
        None,
    )
328 return None