Init commit for standalone enodebd
Change-Id: I88eeef5135dd7ba8551ddd9fb6a0695f5325337b
diff --git a/tests/freedomfi_one_tests.py b/tests/freedomfi_one_tests.py
new file mode 100644
index 0000000..4d88496
--- /dev/null
+++ b/tests/freedomfi_one_tests.py
@@ -0,0 +1,805 @@
+"""
+Copyright 2020 The Magma Authors.
+
+This source code is licensed under the BSD-style license found in the
+LICENSE file in the root directory of this source tree.
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import logging
+import os
+from unittest.mock import Mock, call, patch
+
+from lte.protos.mconfig import mconfigs_pb2
+from common.service import MagmaService
+from data_models.data_model_parameters import ParameterName
+from devices.device_map import get_device_handler_from_name
+from devices.device_utils import EnodebDeviceName
+from devices.freedomfi_one import (
+ FreedomFiOneConfigurationInitializer,
+ SASParameters,
+ StatusParameters,
+)
+from tests.test_utils.config_builder import EnodebConfigBuilder
+from tests.test_utils.enb_acs_builder import (
+ EnodebAcsStateMachineBuilder,
+)
+from tests.test_utils.enodeb_handler import EnodebHandlerTestCase
+from tests.test_utils.tr069_msg_builder import Tr069MessageBuilder
+from tr069 import models
+
+SRC_CONFIG_DIR = os.path.join(
+ os.environ.get('MAGMA_ROOT'),
+ 'lte/gateway/configs',
+)
+
+
+class FreedomFiOneTests(EnodebHandlerTestCase):
+
+ def _get_freedomfi_one_read_only_param_values_response(
+ self,
+ ) -> models.GetParameterValuesResponse:
+ msg = models.GetParameterValuesResponse()
+ param_val_list = []
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.X_000E8F_DeviceFeature.X_000E8F_NEStatus'
+ '.X_000E8F_Sync_Status',
+ val_type='string',
+ data='InSync',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.X_000E8F_DeviceFeature.X_000E8F_NEStatus'
+ '.X_000E8F_SAS_Status',
+ val_type='string',
+ data='SUCCESS',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.X_000E8F_DeviceFeature.X_000E8F_NEStatus'
+ '.X_000E8F_eNB_Status',
+ val_type='string',
+ data='SUCCESS',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.X_000E8F_DeviceFeature.X_000E8F_NEStatus'
+ '.X_000E8F_DEFGW_Status',
+ val_type='string',
+ data='SUCCESS',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.FAP.GPS.ScanStatus',
+ val_type='string',
+ data='SUCCESS',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.FAP.GPS.LockedLongitude',
+ val_type='int',
+ data='-105272892',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.FAP.GPS.LockedLatitude',
+ val_type='int',
+ data='40019606',
+ ),
+ )
+ msg.ParameterList = models.ParameterValueList()
+ msg.ParameterList.ParameterValueStruct = param_val_list
+ return msg
+
+ def _get_freedomfi_one_param_values_response(self):
+ msg = models.GetParameterValuesResponse()
+ param_val_list = []
+ msg.ParameterList = models.ParameterValueList()
+ msg.ParameterList.ParameterValueStruct = param_val_list
+
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.Services.FAPService.1.CellConfig.LTE.RAN.RF.EARFCNDL',
+ val_type='int',
+ data='56240',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.FAP.GPS.ScanOnBoot',
+ val_type='boolean',
+ data='1',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.Services.FAPService.1.FAPControl.LTE.AdminState',
+ val_type='boolean',
+ data='1',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.FAP.PerfMgmt.Config.1.Enable',
+ val_type='boolean',
+ data='1',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.Services.FAPService.1.FAPControl.LTE.Gateway.S1SigLinkServerList',
+ val_type='string',
+ data='10.0.2.1',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_RRMConfig.X_000E8F_Cell_Number',
+ val_type='int',
+ data='2',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.Services.FAPService.1.CellConfig.LTE.EPC.TAC',
+ val_type='int',
+ data='1',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.FAP.PerfMgmt.Config.1.PeriodicUploadInterval',
+ val_type='int',
+ data='60',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.DeviceInfo.SoftwareVersion',
+ val_type='string',
+ data='TEST3920@210901',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_SAS.UserContactInformation',
+ val_type='string',
+ data='M0LK4T',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_SAS.ProtectionLevel',
+ val_type='string',
+ data='GAA',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_SAS.CertSubject',
+ val_type='string',
+ data='/C=TW/O=Sercomm/OU=WInnForum CBSD Certificate/CN=P27-SCE4255W:%s',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_SAS.HeightType',
+ val_type='string',
+ data='AMSL',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_SAS.Category',
+ val_type='string',
+ data='A',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.FAP.GPS.ScanStatus',
+ val_type='string',
+ data='Success',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.ManagementServer.PeriodicInformInterval',
+ val_type='int',
+ data='60',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.Services.FAPService.1.CellConfig.LTE.RAN.RF.FreqBandIndicator',
+ val_type='unsignedInt',
+ data='48',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.Services.FAPService.1.CellConfig.LTE.RAN.Common.CellIdentity',
+ val_type='unsignedInt',
+ data='101',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.FAP.GPS.LockedLongitude',
+ val_type='string',
+ data='-105272892',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.FAP.GPS.LockedLatitude',
+ val_type='string',
+ data='40019606',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_SAS.CPIEnable',
+ val_type='boolean',
+ data='0',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_RRMConfig.X_000E8F_CA_Enable',
+ val_type='boolean',
+ data='1',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.FAP.GPS.ScanOnBoot',
+ val_type='boolean',
+ data='1',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.X_000E8F_DeviceFeature.X_000E8F_NEStatus.X_000E8F_Sync_Status',
+ val_type='string',
+ data='InSync',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.Services.FAPService.1.CellConfig.LTE.RAN.RF.PhyCellID',
+ val_type='string',
+ data='101,102',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.FAP.PerfMgmt.Config.1.URL',
+ val_type='string',
+ data=None,
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_SAS.Location',
+ val_type='string',
+ data='indoor',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.Services.FAPService.1.CellConfig.LTE.RAN.PHY.TDDFrame.SubFrameAssignment',
+ val_type='boolean',
+ data='2',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.Services.FAPService.1.CellConfig.LTE.EPC.PLMNList.1.IsPrimary',
+ val_type='boolean',
+ data='1',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.Services.FAPService.1.CellConfig.LTE.EPC.PLMNList.1.Enable',
+ val_type='boolean',
+ data='1',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_SAS.Server',
+ val_type='string',
+ data='https://spectrum-connect.federatedwireless.com/v1.2/',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.X_000E8F_DeviceFeature.X_000E8F_WebServerEnable',
+ val_type='boolean',
+ data='1',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.Services.FAPService.1.CellConfig.LTE.EPC.PLMNList.1.CellReservedForOperatorUse',
+ val_type='boolean',
+ data='0',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.Services.FAPService.1.CellConfig.LTE.Tunnel.1.TunnelRef',
+ val_type='string',
+ data='Device.IP.Interface.1.IPv4Address.1.',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.Services.FAPService.1.REM.X_000E8F_tfcsManagerConfig.primSrc',
+ val_type='string',
+ data='GNSS',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_SAS.Enable',
+ val_type='boolean',
+ data='1',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.ManagementServer.PeriodicInformEnable',
+ val_type='boolean',
+ data='1',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.Services.FAPService.1.CellConfig.LTE.EPC.PLMNListNumberOfEntries',
+ val_type='int',
+ data='1',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.Services.FAPService.1.CellConfig.LTE.RAN.PHY.TDDFrame.SpecialSubframePatterns',
+ val_type='int',
+ data='7',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_RRMConfig.X_000E8F_CELL_Freq_Contiguous',
+ val_type='int',
+ data='0',
+ ),
+ )
+ param_val_list.append(
+ Tr069MessageBuilder.get_parameter_value_struct(
+ name='Device.Services.FAPService.1.FAPControl.LTE.Gateway.S1SigLinkPort',
+ val_type='int',
+ data='36412',
+ ),
+ )
+ return msg
+
+ def _get_service_config(self):
+ return {
+ "tr069": {
+ "interface": "eth1",
+ "port": 48080,
+ "perf_mgmt_port": 8081,
+ "public_ip": "192.88.99.142",
+ },
+ "reboot_enodeb_on_mme_disconnected": True,
+ "s1_interface": "eth1",
+ "sas": {
+ "sas_enabled": True,
+ "sas_server_url":
+ "https://spectrum-connect.federatedwireless.com/v1.2/",
+ "sas_uid": "M0LK4T",
+ "sas_category": "A",
+ "sas_channel_type": "GAA",
+ "sas_cert_subject": "/C=TW/O=Sercomm/OU=WInnForum CBSD "
+ "Certificate/CN=P27-SCE4255W:%s",
+ "sas_icg_group_id": "",
+ "sas_location": "indoor",
+ "sas_height_type": "AMSL",
+ },
+ }
+
+ def build_freedomfi_one_acs_state_machine(self):
+ service = EnodebAcsStateMachineBuilder.build_magma_service(
+ mconfig=EnodebConfigBuilder.get_mconfig(),
+ service_config=self._get_service_config(),
+ )
+ handler_class = get_device_handler_from_name(
+ EnodebDeviceName.FREEDOMFI_ONE,
+ )
+ acs_state_machine = handler_class(service)
+ return acs_state_machine
+
+ def test_provision(self) -> None:
+ """
+ Test the basic provisioning workflow
+ 1 - enodeb sends Inform, enodebd sends InformResponse
+ 2 - enodeb sends empty HTTP message,
+ 3 - enodebd sends get transient params, updates the device state.
+ 4 - enodebd sends get param values, updates the device state
+ 5 - enodebd, sets fields including SAS fields.
+ """
+ logging.root.level = logging.DEBUG
+ acs_state_machine = self.build_freedomfi_one_acs_state_machine()
+
+ inform = Tr069MessageBuilder.get_inform(
+ oui="000E8F",
+ sw_version="TEST3920@210901",
+ enb_serial="2006CW5000023",
+ )
+ resp = acs_state_machine.handle_tr069_message(inform)
+ self.assertTrue(
+ isinstance(resp, models.InformResponse),
+ 'Should respond with an InformResponse',
+ )
+
+ # Send an empty http request
+ req = models.DummyInput()
+ resp = acs_state_machine.handle_tr069_message(req)
+
+ # Expect a request for read-only params
+ self.assertTrue(
+ isinstance(resp, models.GetParameterValues),
+ 'State machine should be requesting param values',
+ )
+ for tr_69_nodes in StatusParameters.STATUS_PARAMETERS.values():
+ self.assertIn(tr_69_nodes.path, resp.ParameterNames.string)
+
+ req = self._get_freedomfi_one_read_only_param_values_response()
+ get_resp = acs_state_machine.handle_tr069_message(req)
+
+ self.assertTrue(
+ isinstance(get_resp, models.GetParameterValues),
+ 'State machine should be requesting param values',
+ )
+ req = self._get_freedomfi_one_param_values_response()
+ resp = acs_state_machine.handle_tr069_message(req)
+ self.assertTrue(
+ isinstance(resp, models.SetParameterValues),
+ 'State machine should be setting parameters',
+ )
+ self.assertIsNotNone(
+ resp.ParameterKey.Data,
+ 'ParameterKey should be set for FreedomFiOne eNB',
+ )
+
+ msg = models.SetParameterValuesResponse()
+ msg.Status = 1
+ get_resp = acs_state_machine.handle_tr069_message(msg)
+ self.assertTrue(
+ isinstance(get_resp, models.GetParameterValues),
+ 'We should read back all parameters',
+ )
+
+ req = self._get_freedomfi_one_param_values_response()
+ resp = acs_state_machine.handle_tr069_message(req)
+ self.assertTrue(
+ isinstance(resp, models.DummyInput),
+ 'Provisioning completed with Dummy response',
+ )
+
+ def test_manual_reboot_during_provisioning(self) -> None:
+ """
+ Test a scenario where a Magma user goes through the enodebd CLI to
+ reboot the Baicells eNodeB.
+
+ This checks the scenario where the command is sent in the middle
+ of a TR-069 provisioning session.
+ """
+ logging.root.level = logging.DEBUG
+ acs_state_machine = self.build_freedomfi_one_acs_state_machine()
+
+ # Send an Inform message, wait for an InformResponse
+ inform = Tr069MessageBuilder.get_inform(
+ oui="000E8F",
+ sw_version="TEST3920@210901",
+ enb_serial="2006CW5000023",
+ )
+ resp = acs_state_machine.handle_tr069_message(inform)
+ self.assertTrue(
+ isinstance(resp, models.InformResponse),
+ 'Should respond with an InformResponse',
+ )
+
+ # Send an empty http request to kick off the rest of provisioning
+ req = models.DummyInput()
+ resp = acs_state_machine.handle_tr069_message(req)
+
+ # Expect a request for an optional parameter, three times
+ self.assertTrue(
+ isinstance(resp, models.GetParameterValues),
+ 'State machine should be requesting param values',
+ )
+ req = Tr069MessageBuilder.get_fault()
+
+ # User uses the CLI tool to get eNodeB to reboot
+ acs_state_machine.reboot_asap()
+
+ resp = acs_state_machine.handle_tr069_message(req)
+ self.assertTrue(
+ isinstance(resp, models.Reboot),
+ 'In reboot sequence, state machine should send a '
+ 'Reboot message.',
+ )
+ req = Tr069MessageBuilder.get_reboot_response()
+ resp = acs_state_machine.handle_tr069_message(req)
+ self.assertTrue(
+ isinstance(resp, models.DummyInput),
+ 'State machine should end TR-069 session after '
+ 'receiving a RebootResponse',
+ )
+
+ def test_post_processing(self) -> None:
+ """ Test FreedomFi One specific post processing functionality"""
+
+ acs_state_machine = self.build_freedomfi_one_acs_state_machine()
+ cfg_desired = Mock()
+ acs_state_machine.device_cfg.set_parameter(
+ ParameterName.SERIAL_NUMBER,
+ "2006CW5000023",
+ )
+
+ cfg_init = FreedomFiOneConfigurationInitializer(acs_state_machine)
+ cfg_init.postprocess(
+ EnodebConfigBuilder.get_mconfig(),
+ self._get_service_config(), cfg_desired,
+ )
+ expected = [
+ call.delete_parameter('EARFCNDL'),
+ call.delete_parameter('DL bandwidth'),
+ call.delete_parameter('UL bandwidth'),
+ call.set_parameter(
+ 'tunnel_ref',
+ 'Device.IP.Interface.1.IPv4Address.1.',
+ ),
+ call.set_parameter('prim_src', 'GNSS'),
+ call.set_parameter('carrier_agg_enable', True),
+ call.set_parameter('carrier_number', 2),
+ call.set_parameter('contiguous_cc', 0),
+ call.set_parameter('web_ui_enable', False),
+ call.set_parameter('sas_enabled', True),
+ call.set_parameter(
+ 'sas_server_url',
+ 'https://spectrum-connect.federatedwireless.com/v1.2/',
+ ),
+ call.set_parameter('sas_uid', 'M0LK4T'),
+ call.set_parameter('sas_category', 'A'),
+ call.set_parameter('sas_channel_type', 'GAA'),
+ call.set_parameter(
+ 'sas_cert_subject',
+ '/C=TW/O=Sercomm/OU=WInnForum CBSD Certificate/CN=P27-SCE4255W:%s',
+ ),
+ call.set_parameter('sas_location', 'indoor'),
+ call.set_parameter('sas_height_type', 'AMSL'),
+ ]
+ self.assertEqual(cfg_desired.mock_calls.sort(), expected.sort())
+
+ # Check without sas config
+ service_cfg = {
+ "tr069": {
+ "interface": "eth1",
+ "port": 48080,
+ "perf_mgmt_port": 8081,
+ "public_ip": "192.88.99.142",
+ },
+ "reboot_enodeb_on_mme_disconnected": True,
+ "s1_interface": "eth1",
+ }
+ cfg_desired = Mock()
+ cfg_init.postprocess(
+ EnodebConfigBuilder.get_mconfig(),
+ service_cfg, cfg_desired,
+ )
+ expected = [
+ call.delete_parameter('EARFCNDL'),
+ call.delete_parameter('DL bandwidth'),
+ call.delete_parameter('UL bandwidth'),
+ call.set_parameter(
+ 'tunnel_ref',
+ 'Device.IP.Interface.1.IPv4Address.1.',
+ ),
+ call.set_parameter('prim_src', 'GNSS'),
+ call.set_parameter('carrier_agg_enable', True),
+ call.set_parameter('carrier_number', 2),
+ call.set_parameter('contiguous_cc', 0),
+ call.set_parameter('web_ui_enable', False),
+ ]
+ self.assertEqual(cfg_desired.mock_calls.sort(), expected.sort())
+
+ service_cfg['web_ui_enable_list'] = ["2006CW5000023"]
+
+ expected = [
+ call.delete_parameter('EARFCNDL'),
+ call.delete_parameter('DL bandwidth'),
+ call.delete_parameter('UL bandwidth'),
+ call.set_parameter(
+ 'tunnel_ref',
+ 'Device.IP.Interface.1.IPv4Address.1.',
+ ),
+ call.set_parameter('prim_src', 'GNSS'),
+ call.set_parameter('carrier_agg_enable', True),
+ call.set_parameter('carrier_number', 2),
+ call.set_parameter('contiguous_cc', 0),
+ call.set_parameter('web_ui_enable', False),
+ call.set_parameter('web_ui_enable', True),
+ ]
+ cfg_desired = Mock()
+ cfg_init.postprocess(
+ EnodebConfigBuilder.get_mconfig(),
+ service_cfg, cfg_desired,
+ )
+ print(cfg_desired.mock_calls)
+ print(type(cfg_desired.mock_calls))
+ self.assertEqual(cfg_desired.mock_calls.sort(), expected.sort())
+
+ @patch('magma.configuration.service_configs.CONFIG_DIR', SRC_CONFIG_DIR)
+ def test_service_cfg_parsing(self):
+ """ Test the parsing of the service config file for enodebd.yml"""
+ service = MagmaService('enodebd', mconfigs_pb2.EnodebD())
+ service_cfg = service.config
+ service_cfg_1 = self._get_service_config()
+ service_cfg_1['web_ui_enable_list'] = []
+ service_cfg_1[FreedomFiOneConfigurationInitializer.SAS_KEY][
+ SASParameters.SAS_UID
+ ] = "INVALID_ID"
+ service_cfg_1[FreedomFiOneConfigurationInitializer.SAS_KEY][
+ SASParameters.SAS_CERT_SUBJECT
+ ] = "INVALID_CERT_SUBJECT"
+ self.assertEqual(service_cfg, service_cfg_1)
+
+ def test_status_nodes(self):
+ """ Test that the status of the node is valid"""
+ status = StatusParameters()
+
+ # Happy path
+ n1 = {
+ StatusParameters.DEFAULT_GW: "SUCCESS",
+ StatusParameters.SYNC_STATUS: "InSync",
+ StatusParameters.ENB_STATUS: "Success",
+ StatusParameters.SAS_STATUS: "Success",
+ StatusParameters.GPS_SCAN_STATUS: "SUCCESS",
+ ParameterName.GPS_LONG: "1",
+ ParameterName.GPS_LAT: "1",
+ }
+ device_config = Mock()
+ status.set_magma_device_cfg(n1, device_config)
+ expected = [
+ call.set_parameter(param_name='RF TX status', value=True),
+ call.set_parameter(param_name='GPS status', value=True),
+ call.set_parameter(param_name='PTP status', value=True),
+ call.set_parameter(param_name='MME status', value=True),
+ call.set_parameter(param_name='Opstate', value=True),
+ call.set_parameter('GPS lat', '1'),
+ call.set_parameter('GPS long', '1'),
+ ]
+ self.assertEqual(expected, device_config.mock_calls)
+
+ n2 = n1.copy()
+ # Verify we can handle specific none params
+ n2[StatusParameters.DEFAULT_GW] = None
+ n3 = n1.copy()
+ n3[StatusParameters.SYNC_STATUS] = None
+ n4 = n1.copy()
+ n4[StatusParameters.ENB_STATUS] = None
+ n5 = n1.copy()
+ n5[StatusParameters.SAS_STATUS] = None
+ n6 = n1.copy()
+ n6[StatusParameters.GPS_SCAN_STATUS] = None
+ n7 = n1.copy()
+ n7[ParameterName.GPS_LONG] = None
+ n8 = n1.copy()
+ n8[ParameterName.GPS_LAT] = None
+
+ device_config = Mock()
+ expected = [
+ call.set_parameter(param_name='RF TX status', value=True),
+ call.set_parameter(param_name='GPS status', value=True),
+ call.set_parameter(param_name='PTP status', value=True),
+ call.set_parameter(param_name='MME status', value=False),
+ call.set_parameter(param_name='Opstate', value=True),
+ call.set_parameter('GPS lat', '1'),
+ call.set_parameter('GPS long', '1'),
+ ]
+ status.set_magma_device_cfg(n2, device_config)
+ self.assertEqual(expected, device_config.mock_calls)
+
+ device_config = Mock()
+ expected = [
+ call.set_parameter(param_name='RF TX status', value=True),
+ call.set_parameter(param_name='GPS status', value=True),
+ call.set_parameter(param_name='PTP status', value=False),
+ call.set_parameter(param_name='MME status', value=True),
+ call.set_parameter(param_name='Opstate', value=True),
+ call.set_parameter('GPS lat', '1'),
+ call.set_parameter('GPS long', '1'),
+ ]
+ status.set_magma_device_cfg(n3, device_config)
+ self.assertEqual(expected, device_config.mock_calls)
+
+ device_config = Mock()
+ expected = [
+ call.set_parameter(param_name='RF TX status', value=True),
+ call.set_parameter(param_name='GPS status', value=True),
+ call.set_parameter(param_name='PTP status', value=True),
+ call.set_parameter(param_name='MME status', value=True),
+ call.set_parameter(param_name='Opstate', value=False),
+ call.set_parameter('GPS lat', '1'),
+ call.set_parameter('GPS long', '1'),
+ ]
+ status.set_magma_device_cfg(n4, device_config)
+ self.assertEqual(expected, device_config.mock_calls)
+
+ device_config = Mock()
+ expected = [
+ call.set_parameter(param_name='RF TX status', value=False),
+ call.set_parameter(param_name='GPS status', value=True),
+ call.set_parameter(param_name='PTP status', value=True),
+ call.set_parameter(param_name='MME status', value=True),
+ call.set_parameter(param_name='Opstate', value=True),
+ call.set_parameter('GPS lat', '1'),
+ call.set_parameter('GPS long', '1'),
+ ]
+ status.set_magma_device_cfg(n5, device_config)
+ self.assertEqual(expected, device_config.mock_calls)
+
+ device_config = Mock()
+ expected = [
+ call.set_parameter(param_name='RF TX status', value=True),
+ call.set_parameter(param_name='GPS status', value=False),
+ call.set_parameter(param_name='PTP status', value=False),
+ call.set_parameter(param_name='MME status', value=True),
+ call.set_parameter(param_name='Opstate', value=True),
+ call.set_parameter('GPS lat', '1'),
+ call.set_parameter('GPS long', '1'),
+ ]
+ status.set_magma_device_cfg(n6, device_config)
+ self.assertEqual(expected, device_config.mock_calls)
+
+ device_config = Mock()
+ expected = [
+ call.set_parameter(param_name='RF TX status', value=True),
+ call.set_parameter(param_name='GPS status', value=True),
+ call.set_parameter(param_name='PTP status', value=True),
+ call.set_parameter(param_name='MME status', value=True),
+ call.set_parameter(param_name='Opstate', value=True),
+ call.set_parameter('GPS lat', '1'),
+ call.set_parameter('GPS long', None),
+ ]
+ status.set_magma_device_cfg(n7, device_config)
+ self.assertEqual(expected, device_config.mock_calls)
+
+ device_config = Mock()
+ expected = [
+ call.set_parameter(param_name='RF TX status', value=True),
+ call.set_parameter(param_name='GPS status', value=True),
+ call.set_parameter(param_name='PTP status', value=True),
+ call.set_parameter(param_name='MME status', value=True),
+ call.set_parameter(param_name='Opstate', value=True),
+ call.set_parameter('GPS lat', None),
+ call.set_parameter('GPS long', '1'),
+ ]
+ status.set_magma_device_cfg(n8, device_config)
+ self.assertEqual(expected, device_config.mock_calls)