blob: b167e9b949f16311e543959dcb97596864393fc4 [file] [log] [blame]
Wei-Yu Chenad55cb82022-02-15 20:07:01 +08001# SPDX-FileCopyrightText: 2020 The Magma Authors.
2# SPDX-FileCopyrightText: 2022 Open Networking Foundation <support@opennetworking.org>
3#
4# SPDX-License-Identifier: BSD-3-Clause
Wei-Yu Chen49950b92021-11-08 19:19:18 +08005
Wei-Yu Chen49950b92021-11-08 19:19:18 +08006import logging
7import os
8from unittest.mock import Mock, call, patch
9
10from lte.protos.mconfig import mconfigs_pb2
11from common.service import MagmaService
12from data_models.data_model_parameters import ParameterName
13from devices.device_map import get_device_handler_from_name
14from devices.device_utils import EnodebDeviceName
15from devices.freedomfi_one import (
16 FreedomFiOneConfigurationInitializer,
17 SASParameters,
18 StatusParameters,
19)
20from tests.test_utils.config_builder import EnodebConfigBuilder
21from tests.test_utils.enb_acs_builder import (
22 EnodebAcsStateMachineBuilder,
23)
24from tests.test_utils.enodeb_handler import EnodebHandlerTestCase
25from tests.test_utils.tr069_msg_builder import Tr069MessageBuilder
26from tr069 import models
27
28SRC_CONFIG_DIR = os.path.join(
29 os.environ.get('MAGMA_ROOT'),
30 'lte/gateway/configs',
31)
32
33
34class FreedomFiOneTests(EnodebHandlerTestCase):
35
36 def _get_freedomfi_one_read_only_param_values_response(
37 self,
38 ) -> models.GetParameterValuesResponse:
39 msg = models.GetParameterValuesResponse()
40 param_val_list = []
41 param_val_list.append(
42 Tr069MessageBuilder.get_parameter_value_struct(
43 name='Device.X_000E8F_DeviceFeature.X_000E8F_NEStatus'
44 '.X_000E8F_Sync_Status',
45 val_type='string',
46 data='InSync',
47 ),
48 )
49 param_val_list.append(
50 Tr069MessageBuilder.get_parameter_value_struct(
51 name='Device.X_000E8F_DeviceFeature.X_000E8F_NEStatus'
52 '.X_000E8F_SAS_Status',
53 val_type='string',
54 data='SUCCESS',
55 ),
56 )
57 param_val_list.append(
58 Tr069MessageBuilder.get_parameter_value_struct(
59 name='Device.X_000E8F_DeviceFeature.X_000E8F_NEStatus'
60 '.X_000E8F_eNB_Status',
61 val_type='string',
62 data='SUCCESS',
63 ),
64 )
65 param_val_list.append(
66 Tr069MessageBuilder.get_parameter_value_struct(
67 name='Device.X_000E8F_DeviceFeature.X_000E8F_NEStatus'
68 '.X_000E8F_DEFGW_Status',
69 val_type='string',
70 data='SUCCESS',
71 ),
72 )
73 param_val_list.append(
74 Tr069MessageBuilder.get_parameter_value_struct(
75 name='Device.FAP.GPS.ScanStatus',
76 val_type='string',
77 data='SUCCESS',
78 ),
79 )
80 param_val_list.append(
81 Tr069MessageBuilder.get_parameter_value_struct(
82 name='Device.FAP.GPS.LockedLongitude',
83 val_type='int',
84 data='-105272892',
85 ),
86 )
87 param_val_list.append(
88 Tr069MessageBuilder.get_parameter_value_struct(
89 name='Device.FAP.GPS.LockedLatitude',
90 val_type='int',
91 data='40019606',
92 ),
93 )
94 msg.ParameterList = models.ParameterValueList()
95 msg.ParameterList.ParameterValueStruct = param_val_list
96 return msg
97
98 def _get_freedomfi_one_param_values_response(self):
99 msg = models.GetParameterValuesResponse()
100 param_val_list = []
101 msg.ParameterList = models.ParameterValueList()
102 msg.ParameterList.ParameterValueStruct = param_val_list
103
104 param_val_list.append(
105 Tr069MessageBuilder.get_parameter_value_struct(
106 name='Device.Services.FAPService.1.CellConfig.LTE.RAN.RF.EARFCNDL',
107 val_type='int',
108 data='56240',
109 ),
110 )
111 param_val_list.append(
112 Tr069MessageBuilder.get_parameter_value_struct(
113 name='Device.FAP.GPS.ScanOnBoot',
114 val_type='boolean',
115 data='1',
116 ),
117 )
118 param_val_list.append(
119 Tr069MessageBuilder.get_parameter_value_struct(
120 name='Device.Services.FAPService.1.FAPControl.LTE.AdminState',
121 val_type='boolean',
122 data='1',
123 ),
124 )
125 param_val_list.append(
126 Tr069MessageBuilder.get_parameter_value_struct(
127 name='Device.FAP.PerfMgmt.Config.1.Enable',
128 val_type='boolean',
129 data='1',
130 ),
131 )
132 param_val_list.append(
133 Tr069MessageBuilder.get_parameter_value_struct(
134 name='Device.Services.FAPService.1.FAPControl.LTE.Gateway.S1SigLinkServerList',
135 val_type='string',
136 data='10.0.2.1',
137 ),
138 )
139 param_val_list.append(
140 Tr069MessageBuilder.get_parameter_value_struct(
141 name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_RRMConfig.X_000E8F_Cell_Number',
142 val_type='int',
143 data='2',
144 ),
145 )
146 param_val_list.append(
147 Tr069MessageBuilder.get_parameter_value_struct(
148 name='Device.Services.FAPService.1.CellConfig.LTE.EPC.TAC',
149 val_type='int',
150 data='1',
151 ),
152 )
153 param_val_list.append(
154 Tr069MessageBuilder.get_parameter_value_struct(
155 name='Device.FAP.PerfMgmt.Config.1.PeriodicUploadInterval',
156 val_type='int',
157 data='60',
158 ),
159 )
160 param_val_list.append(
161 Tr069MessageBuilder.get_parameter_value_struct(
162 name='Device.DeviceInfo.SoftwareVersion',
163 val_type='string',
164 data='TEST3920@210901',
165 ),
166 )
167 param_val_list.append(
168 Tr069MessageBuilder.get_parameter_value_struct(
169 name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_SAS.UserContactInformation',
170 val_type='string',
171 data='M0LK4T',
172 ),
173 )
174 param_val_list.append(
175 Tr069MessageBuilder.get_parameter_value_struct(
176 name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_SAS.ProtectionLevel',
177 val_type='string',
178 data='GAA',
179 ),
180 )
181 param_val_list.append(
182 Tr069MessageBuilder.get_parameter_value_struct(
183 name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_SAS.CertSubject',
184 val_type='string',
185 data='/C=TW/O=Sercomm/OU=WInnForum CBSD Certificate/CN=P27-SCE4255W:%s',
186 ),
187 )
188 param_val_list.append(
189 Tr069MessageBuilder.get_parameter_value_struct(
190 name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_SAS.HeightType',
191 val_type='string',
192 data='AMSL',
193 ),
194 )
195 param_val_list.append(
196 Tr069MessageBuilder.get_parameter_value_struct(
197 name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_SAS.Category',
198 val_type='string',
199 data='A',
200 ),
201 )
202 param_val_list.append(
203 Tr069MessageBuilder.get_parameter_value_struct(
204 name='Device.FAP.GPS.ScanStatus',
205 val_type='string',
206 data='Success',
207 ),
208 )
209 param_val_list.append(
210 Tr069MessageBuilder.get_parameter_value_struct(
211 name='Device.ManagementServer.PeriodicInformInterval',
212 val_type='int',
213 data='60',
214 ),
215 )
216 param_val_list.append(
217 Tr069MessageBuilder.get_parameter_value_struct(
218 name='Device.Services.FAPService.1.CellConfig.LTE.RAN.RF.FreqBandIndicator',
219 val_type='unsignedInt',
220 data='48',
221 ),
222 )
223 param_val_list.append(
224 Tr069MessageBuilder.get_parameter_value_struct(
225 name='Device.Services.FAPService.1.CellConfig.LTE.RAN.Common.CellIdentity',
226 val_type='unsignedInt',
227 data='101',
228 ),
229 )
230 param_val_list.append(
231 Tr069MessageBuilder.get_parameter_value_struct(
232 name='Device.FAP.GPS.LockedLongitude',
233 val_type='string',
234 data='-105272892',
235 ),
236 )
237 param_val_list.append(
238 Tr069MessageBuilder.get_parameter_value_struct(
239 name='Device.FAP.GPS.LockedLatitude',
240 val_type='string',
241 data='40019606',
242 ),
243 )
244 param_val_list.append(
245 Tr069MessageBuilder.get_parameter_value_struct(
246 name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_SAS.CPIEnable',
247 val_type='boolean',
248 data='0',
249 ),
250 )
251 param_val_list.append(
252 Tr069MessageBuilder.get_parameter_value_struct(
253 name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_RRMConfig.X_000E8F_CA_Enable',
254 val_type='boolean',
255 data='1',
256 ),
257 )
258 param_val_list.append(
259 Tr069MessageBuilder.get_parameter_value_struct(
260 name='Device.FAP.GPS.ScanOnBoot',
261 val_type='boolean',
262 data='1',
263 ),
264 )
265 param_val_list.append(
266 Tr069MessageBuilder.get_parameter_value_struct(
267 name='Device.X_000E8F_DeviceFeature.X_000E8F_NEStatus.X_000E8F_Sync_Status',
268 val_type='string',
269 data='InSync',
270 ),
271 )
272 param_val_list.append(
273 Tr069MessageBuilder.get_parameter_value_struct(
274 name='Device.Services.FAPService.1.CellConfig.LTE.RAN.RF.PhyCellID',
275 val_type='string',
276 data='101,102',
277 ),
278 )
279 param_val_list.append(
280 Tr069MessageBuilder.get_parameter_value_struct(
281 name='Device.FAP.PerfMgmt.Config.1.URL',
282 val_type='string',
283 data=None,
284 ),
285 )
286 param_val_list.append(
287 Tr069MessageBuilder.get_parameter_value_struct(
288 name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_SAS.Location',
289 val_type='string',
290 data='indoor',
291 ),
292 )
293 param_val_list.append(
294 Tr069MessageBuilder.get_parameter_value_struct(
295 name='Device.Services.FAPService.1.CellConfig.LTE.RAN.PHY.TDDFrame.SubFrameAssignment',
296 val_type='boolean',
297 data='2',
298 ),
299 )
300 param_val_list.append(
301 Tr069MessageBuilder.get_parameter_value_struct(
302 name='Device.Services.FAPService.1.CellConfig.LTE.EPC.PLMNList.1.IsPrimary',
303 val_type='boolean',
304 data='1',
305 ),
306 )
307 param_val_list.append(
308 Tr069MessageBuilder.get_parameter_value_struct(
309 name='Device.Services.FAPService.1.CellConfig.LTE.EPC.PLMNList.1.Enable',
310 val_type='boolean',
311 data='1',
312 ),
313 )
314 param_val_list.append(
315 Tr069MessageBuilder.get_parameter_value_struct(
316 name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_SAS.Server',
317 val_type='string',
318 data='https://spectrum-connect.federatedwireless.com/v1.2/',
319 ),
320 )
321 param_val_list.append(
322 Tr069MessageBuilder.get_parameter_value_struct(
323 name='Device.X_000E8F_DeviceFeature.X_000E8F_WebServerEnable',
324 val_type='boolean',
325 data='1',
326 ),
327 )
328 param_val_list.append(
329 Tr069MessageBuilder.get_parameter_value_struct(
330 name='Device.Services.FAPService.1.CellConfig.LTE.EPC.PLMNList.1.CellReservedForOperatorUse',
331 val_type='boolean',
332 data='0',
333 ),
334 )
335 param_val_list.append(
336 Tr069MessageBuilder.get_parameter_value_struct(
337 name='Device.Services.FAPService.1.CellConfig.LTE.Tunnel.1.TunnelRef',
338 val_type='string',
339 data='Device.IP.Interface.1.IPv4Address.1.',
340 ),
341 )
342 param_val_list.append(
343 Tr069MessageBuilder.get_parameter_value_struct(
344 name='Device.Services.FAPService.1.REM.X_000E8F_tfcsManagerConfig.primSrc',
345 val_type='string',
346 data='GNSS',
347 ),
348 )
349 param_val_list.append(
350 Tr069MessageBuilder.get_parameter_value_struct(
351 name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_SAS.Enable',
352 val_type='boolean',
353 data='1',
354 ),
355 )
356 param_val_list.append(
357 Tr069MessageBuilder.get_parameter_value_struct(
358 name='Device.ManagementServer.PeriodicInformEnable',
359 val_type='boolean',
360 data='1',
361 ),
362 )
363 param_val_list.append(
364 Tr069MessageBuilder.get_parameter_value_struct(
365 name='Device.Services.FAPService.1.CellConfig.LTE.EPC.PLMNListNumberOfEntries',
366 val_type='int',
367 data='1',
368 ),
369 )
370 param_val_list.append(
371 Tr069MessageBuilder.get_parameter_value_struct(
372 name='Device.Services.FAPService.1.CellConfig.LTE.RAN.PHY.TDDFrame.SpecialSubframePatterns',
373 val_type='int',
374 data='7',
375 ),
376 )
377 param_val_list.append(
378 Tr069MessageBuilder.get_parameter_value_struct(
379 name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_RRMConfig.X_000E8F_CELL_Freq_Contiguous',
380 val_type='int',
381 data='0',
382 ),
383 )
384 param_val_list.append(
385 Tr069MessageBuilder.get_parameter_value_struct(
386 name='Device.Services.FAPService.1.FAPControl.LTE.Gateway.S1SigLinkPort',
387 val_type='int',
388 data='36412',
389 ),
390 )
391 return msg
392
393 def _get_service_config(self):
394 return {
395 "tr069": {
396 "interface": "eth1",
397 "port": 48080,
398 "perf_mgmt_port": 8081,
399 "public_ip": "192.88.99.142",
400 },
401 "reboot_enodeb_on_mme_disconnected": True,
402 "s1_interface": "eth1",
403 "sas": {
404 "sas_enabled": True,
405 "sas_server_url":
406 "https://spectrum-connect.federatedwireless.com/v1.2/",
407 "sas_uid": "M0LK4T",
408 "sas_category": "A",
409 "sas_channel_type": "GAA",
410 "sas_cert_subject": "/C=TW/O=Sercomm/OU=WInnForum CBSD "
411 "Certificate/CN=P27-SCE4255W:%s",
412 "sas_icg_group_id": "",
413 "sas_location": "indoor",
414 "sas_height_type": "AMSL",
415 },
416 }
417
418 def build_freedomfi_one_acs_state_machine(self):
419 service = EnodebAcsStateMachineBuilder.build_magma_service(
420 mconfig=EnodebConfigBuilder.get_mconfig(),
421 service_config=self._get_service_config(),
422 )
423 handler_class = get_device_handler_from_name(
424 EnodebDeviceName.FREEDOMFI_ONE,
425 )
426 acs_state_machine = handler_class(service)
427 return acs_state_machine
428
429 def test_provision(self) -> None:
430 """
431 Test the basic provisioning workflow
432 1 - enodeb sends Inform, enodebd sends InformResponse
433 2 - enodeb sends empty HTTP message,
434 3 - enodebd sends get transient params, updates the device state.
435 4 - enodebd sends get param values, updates the device state
436 5 - enodebd, sets fields including SAS fields.
437 """
438 logging.root.level = logging.DEBUG
439 acs_state_machine = self.build_freedomfi_one_acs_state_machine()
440
441 inform = Tr069MessageBuilder.get_inform(
442 oui="000E8F",
443 sw_version="TEST3920@210901",
444 enb_serial="2006CW5000023",
445 )
446 resp = acs_state_machine.handle_tr069_message(inform)
447 self.assertTrue(
448 isinstance(resp, models.InformResponse),
449 'Should respond with an InformResponse',
450 )
451
452 # Send an empty http request
453 req = models.DummyInput()
454 resp = acs_state_machine.handle_tr069_message(req)
455
456 # Expect a request for read-only params
457 self.assertTrue(
458 isinstance(resp, models.GetParameterValues),
459 'State machine should be requesting param values',
460 )
461 for tr_69_nodes in StatusParameters.STATUS_PARAMETERS.values():
462 self.assertIn(tr_69_nodes.path, resp.ParameterNames.string)
463
464 req = self._get_freedomfi_one_read_only_param_values_response()
465 get_resp = acs_state_machine.handle_tr069_message(req)
466
467 self.assertTrue(
468 isinstance(get_resp, models.GetParameterValues),
469 'State machine should be requesting param values',
470 )
471 req = self._get_freedomfi_one_param_values_response()
472 resp = acs_state_machine.handle_tr069_message(req)
473 self.assertTrue(
474 isinstance(resp, models.SetParameterValues),
475 'State machine should be setting parameters',
476 )
477 self.assertIsNotNone(
478 resp.ParameterKey.Data,
479 'ParameterKey should be set for FreedomFiOne eNB',
480 )
481
482 msg = models.SetParameterValuesResponse()
483 msg.Status = 1
484 get_resp = acs_state_machine.handle_tr069_message(msg)
485 self.assertTrue(
486 isinstance(get_resp, models.GetParameterValues),
487 'We should read back all parameters',
488 )
489
490 req = self._get_freedomfi_one_param_values_response()
491 resp = acs_state_machine.handle_tr069_message(req)
492 self.assertTrue(
493 isinstance(resp, models.DummyInput),
494 'Provisioning completed with Dummy response',
495 )
496
497 def test_manual_reboot_during_provisioning(self) -> None:
498 """
499 Test a scenario where a Magma user goes through the enodebd CLI to
500 reboot the Baicells eNodeB.
501
502 This checks the scenario where the command is sent in the middle
503 of a TR-069 provisioning session.
504 """
505 logging.root.level = logging.DEBUG
506 acs_state_machine = self.build_freedomfi_one_acs_state_machine()
507
508 # Send an Inform message, wait for an InformResponse
509 inform = Tr069MessageBuilder.get_inform(
510 oui="000E8F",
511 sw_version="TEST3920@210901",
512 enb_serial="2006CW5000023",
513 )
514 resp = acs_state_machine.handle_tr069_message(inform)
515 self.assertTrue(
516 isinstance(resp, models.InformResponse),
517 'Should respond with an InformResponse',
518 )
519
520 # Send an empty http request to kick off the rest of provisioning
521 req = models.DummyInput()
522 resp = acs_state_machine.handle_tr069_message(req)
523
524 # Expect a request for an optional parameter, three times
525 self.assertTrue(
526 isinstance(resp, models.GetParameterValues),
527 'State machine should be requesting param values',
528 )
529 req = Tr069MessageBuilder.get_fault()
530
531 # User uses the CLI tool to get eNodeB to reboot
532 acs_state_machine.reboot_asap()
533
534 resp = acs_state_machine.handle_tr069_message(req)
535 self.assertTrue(
536 isinstance(resp, models.Reboot),
537 'In reboot sequence, state machine should send a '
538 'Reboot message.',
539 )
540 req = Tr069MessageBuilder.get_reboot_response()
541 resp = acs_state_machine.handle_tr069_message(req)
542 self.assertTrue(
543 isinstance(resp, models.DummyInput),
544 'State machine should end TR-069 session after '
545 'receiving a RebootResponse',
546 )
547
548 def test_post_processing(self) -> None:
549 """ Test FreedomFi One specific post processing functionality"""
550
551 acs_state_machine = self.build_freedomfi_one_acs_state_machine()
552 cfg_desired = Mock()
553 acs_state_machine.device_cfg.set_parameter(
554 ParameterName.SERIAL_NUMBER,
555 "2006CW5000023",
556 )
557
558 cfg_init = FreedomFiOneConfigurationInitializer(acs_state_machine)
559 cfg_init.postprocess(
560 EnodebConfigBuilder.get_mconfig(),
561 self._get_service_config(), cfg_desired,
562 )
563 expected = [
564 call.delete_parameter('EARFCNDL'),
565 call.delete_parameter('DL bandwidth'),
566 call.delete_parameter('UL bandwidth'),
567 call.set_parameter(
568 'tunnel_ref',
569 'Device.IP.Interface.1.IPv4Address.1.',
570 ),
571 call.set_parameter('prim_src', 'GNSS'),
572 call.set_parameter('carrier_agg_enable', True),
573 call.set_parameter('carrier_number', 2),
574 call.set_parameter('contiguous_cc', 0),
575 call.set_parameter('web_ui_enable', False),
576 call.set_parameter('sas_enabled', True),
577 call.set_parameter(
578 'sas_server_url',
579 'https://spectrum-connect.federatedwireless.com/v1.2/',
580 ),
581 call.set_parameter('sas_uid', 'M0LK4T'),
582 call.set_parameter('sas_category', 'A'),
583 call.set_parameter('sas_channel_type', 'GAA'),
584 call.set_parameter(
585 'sas_cert_subject',
586 '/C=TW/O=Sercomm/OU=WInnForum CBSD Certificate/CN=P27-SCE4255W:%s',
587 ),
588 call.set_parameter('sas_location', 'indoor'),
589 call.set_parameter('sas_height_type', 'AMSL'),
590 ]
591 self.assertEqual(cfg_desired.mock_calls.sort(), expected.sort())
592
593 # Check without sas config
594 service_cfg = {
595 "tr069": {
596 "interface": "eth1",
597 "port": 48080,
598 "perf_mgmt_port": 8081,
599 "public_ip": "192.88.99.142",
600 },
601 "reboot_enodeb_on_mme_disconnected": True,
602 "s1_interface": "eth1",
603 }
604 cfg_desired = Mock()
605 cfg_init.postprocess(
606 EnodebConfigBuilder.get_mconfig(),
607 service_cfg, cfg_desired,
608 )
609 expected = [
610 call.delete_parameter('EARFCNDL'),
611 call.delete_parameter('DL bandwidth'),
612 call.delete_parameter('UL bandwidth'),
613 call.set_parameter(
614 'tunnel_ref',
615 'Device.IP.Interface.1.IPv4Address.1.',
616 ),
617 call.set_parameter('prim_src', 'GNSS'),
618 call.set_parameter('carrier_agg_enable', True),
619 call.set_parameter('carrier_number', 2),
620 call.set_parameter('contiguous_cc', 0),
621 call.set_parameter('web_ui_enable', False),
622 ]
623 self.assertEqual(cfg_desired.mock_calls.sort(), expected.sort())
624
625 service_cfg['web_ui_enable_list'] = ["2006CW5000023"]
626
627 expected = [
628 call.delete_parameter('EARFCNDL'),
629 call.delete_parameter('DL bandwidth'),
630 call.delete_parameter('UL bandwidth'),
631 call.set_parameter(
632 'tunnel_ref',
633 'Device.IP.Interface.1.IPv4Address.1.',
634 ),
635 call.set_parameter('prim_src', 'GNSS'),
636 call.set_parameter('carrier_agg_enable', True),
637 call.set_parameter('carrier_number', 2),
638 call.set_parameter('contiguous_cc', 0),
639 call.set_parameter('web_ui_enable', False),
640 call.set_parameter('web_ui_enable', True),
641 ]
642 cfg_desired = Mock()
643 cfg_init.postprocess(
644 EnodebConfigBuilder.get_mconfig(),
645 service_cfg, cfg_desired,
646 )
647 print(cfg_desired.mock_calls)
648 print(type(cfg_desired.mock_calls))
649 self.assertEqual(cfg_desired.mock_calls.sort(), expected.sort())
650
651 @patch('magma.configuration.service_configs.CONFIG_DIR', SRC_CONFIG_DIR)
652 def test_service_cfg_parsing(self):
653 """ Test the parsing of the service config file for enodebd.yml"""
654 service = MagmaService('enodebd', mconfigs_pb2.EnodebD())
655 service_cfg = service.config
656 service_cfg_1 = self._get_service_config()
657 service_cfg_1['web_ui_enable_list'] = []
658 service_cfg_1[FreedomFiOneConfigurationInitializer.SAS_KEY][
659 SASParameters.SAS_UID
660 ] = "INVALID_ID"
661 service_cfg_1[FreedomFiOneConfigurationInitializer.SAS_KEY][
662 SASParameters.SAS_CERT_SUBJECT
663 ] = "INVALID_CERT_SUBJECT"
664 self.assertEqual(service_cfg, service_cfg_1)
665
666 def test_status_nodes(self):
667 """ Test that the status of the node is valid"""
668 status = StatusParameters()
669
670 # Happy path
671 n1 = {
672 StatusParameters.DEFAULT_GW: "SUCCESS",
673 StatusParameters.SYNC_STATUS: "InSync",
674 StatusParameters.ENB_STATUS: "Success",
675 StatusParameters.SAS_STATUS: "Success",
676 StatusParameters.GPS_SCAN_STATUS: "SUCCESS",
677 ParameterName.GPS_LONG: "1",
678 ParameterName.GPS_LAT: "1",
679 }
680 device_config = Mock()
681 status.set_magma_device_cfg(n1, device_config)
682 expected = [
683 call.set_parameter(param_name='RF TX status', value=True),
684 call.set_parameter(param_name='GPS status', value=True),
685 call.set_parameter(param_name='PTP status', value=True),
686 call.set_parameter(param_name='MME status', value=True),
687 call.set_parameter(param_name='Opstate', value=True),
688 call.set_parameter('GPS lat', '1'),
689 call.set_parameter('GPS long', '1'),
690 ]
691 self.assertEqual(expected, device_config.mock_calls)
692
693 n2 = n1.copy()
694 # Verify we can handle specific none params
695 n2[StatusParameters.DEFAULT_GW] = None
696 n3 = n1.copy()
697 n3[StatusParameters.SYNC_STATUS] = None
698 n4 = n1.copy()
699 n4[StatusParameters.ENB_STATUS] = None
700 n5 = n1.copy()
701 n5[StatusParameters.SAS_STATUS] = None
702 n6 = n1.copy()
703 n6[StatusParameters.GPS_SCAN_STATUS] = None
704 n7 = n1.copy()
705 n7[ParameterName.GPS_LONG] = None
706 n8 = n1.copy()
707 n8[ParameterName.GPS_LAT] = None
708
709 device_config = Mock()
710 expected = [
711 call.set_parameter(param_name='RF TX status', value=True),
712 call.set_parameter(param_name='GPS status', value=True),
713 call.set_parameter(param_name='PTP status', value=True),
714 call.set_parameter(param_name='MME status', value=False),
715 call.set_parameter(param_name='Opstate', value=True),
716 call.set_parameter('GPS lat', '1'),
717 call.set_parameter('GPS long', '1'),
718 ]
719 status.set_magma_device_cfg(n2, device_config)
720 self.assertEqual(expected, device_config.mock_calls)
721
722 device_config = Mock()
723 expected = [
724 call.set_parameter(param_name='RF TX status', value=True),
725 call.set_parameter(param_name='GPS status', value=True),
726 call.set_parameter(param_name='PTP status', value=False),
727 call.set_parameter(param_name='MME status', value=True),
728 call.set_parameter(param_name='Opstate', value=True),
729 call.set_parameter('GPS lat', '1'),
730 call.set_parameter('GPS long', '1'),
731 ]
732 status.set_magma_device_cfg(n3, device_config)
733 self.assertEqual(expected, device_config.mock_calls)
734
735 device_config = Mock()
736 expected = [
737 call.set_parameter(param_name='RF TX status', value=True),
738 call.set_parameter(param_name='GPS status', value=True),
739 call.set_parameter(param_name='PTP status', value=True),
740 call.set_parameter(param_name='MME status', value=True),
741 call.set_parameter(param_name='Opstate', value=False),
742 call.set_parameter('GPS lat', '1'),
743 call.set_parameter('GPS long', '1'),
744 ]
745 status.set_magma_device_cfg(n4, device_config)
746 self.assertEqual(expected, device_config.mock_calls)
747
748 device_config = Mock()
749 expected = [
750 call.set_parameter(param_name='RF TX status', value=False),
751 call.set_parameter(param_name='GPS status', value=True),
752 call.set_parameter(param_name='PTP status', value=True),
753 call.set_parameter(param_name='MME status', value=True),
754 call.set_parameter(param_name='Opstate', value=True),
755 call.set_parameter('GPS lat', '1'),
756 call.set_parameter('GPS long', '1'),
757 ]
758 status.set_magma_device_cfg(n5, device_config)
759 self.assertEqual(expected, device_config.mock_calls)
760
761 device_config = Mock()
762 expected = [
763 call.set_parameter(param_name='RF TX status', value=True),
764 call.set_parameter(param_name='GPS status', value=False),
765 call.set_parameter(param_name='PTP status', value=False),
766 call.set_parameter(param_name='MME status', value=True),
767 call.set_parameter(param_name='Opstate', value=True),
768 call.set_parameter('GPS lat', '1'),
769 call.set_parameter('GPS long', '1'),
770 ]
771 status.set_magma_device_cfg(n6, device_config)
772 self.assertEqual(expected, device_config.mock_calls)
773
774 device_config = Mock()
775 expected = [
776 call.set_parameter(param_name='RF TX status', value=True),
777 call.set_parameter(param_name='GPS status', value=True),
778 call.set_parameter(param_name='PTP status', value=True),
779 call.set_parameter(param_name='MME status', value=True),
780 call.set_parameter(param_name='Opstate', value=True),
781 call.set_parameter('GPS lat', '1'),
782 call.set_parameter('GPS long', None),
783 ]
784 status.set_magma_device_cfg(n7, device_config)
785 self.assertEqual(expected, device_config.mock_calls)
786
787 device_config = Mock()
788 expected = [
789 call.set_parameter(param_name='RF TX status', value=True),
790 call.set_parameter(param_name='GPS status', value=True),
791 call.set_parameter(param_name='PTP status', value=True),
792 call.set_parameter(param_name='MME status', value=True),
793 call.set_parameter(param_name='Opstate', value=True),
794 call.set_parameter('GPS lat', None),
795 call.set_parameter('GPS long', '1'),
796 ]
797 status.set_magma_device_cfg(n8, device_config)
798 self.assertEqual(expected, device_config.mock_calls)