blob: 048ba08cd18d6b13093a9e86cf11fa7af022554a [file] [log] [blame]
# SPDX-FileCopyrightText: 2020 The Magma Authors.
# SPDX-FileCopyrightText: 2022 Open Networking Foundation <support@opennetworking.org>
#
# SPDX-License-Identifier: BSD-3-Clause
import logging
import os
from unittest.mock import Mock, call, patch
from lte.protos.mconfig import mconfigs_pb2
from common.service import MagmaService
from data_models.data_model_parameters import ParameterName
from devices.device_map import get_device_handler_from_name
from devices.device_utils import EnodebDeviceName
from devices.freedomfi_one import (
FreedomFiOneConfigurationInitializer,
SASParameters,
StatusParameters,
)
from tests.test_utils.config_builder import EnodebConfigBuilder
from tests.test_utils.enb_acs_builder import (
EnodebAcsStateMachineBuilder,
)
from tests.test_utils.enodeb_handler import EnodebHandlerTestCase
from tests.test_utils.tr069_msg_builder import Tr069MessageBuilder
from tr069 import models
SRC_CONFIG_DIR = os.path.join(
os.environ.get('MAGMA_ROOT'),
'lte/gateway/configs',
)
class FreedomFiOneTests(EnodebHandlerTestCase):
def _get_freedomfi_one_read_only_param_values_response(
self,
) -> models.GetParameterValuesResponse:
msg = models.GetParameterValuesResponse()
param_val_list = []
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.X_000E8F_DeviceFeature.X_000E8F_NEStatus'
'.X_000E8F_Sync_Status',
val_type='string',
data='InSync',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.X_000E8F_DeviceFeature.X_000E8F_NEStatus'
'.X_000E8F_SAS_Status',
val_type='string',
data='SUCCESS',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.X_000E8F_DeviceFeature.X_000E8F_NEStatus'
'.X_000E8F_eNB_Status',
val_type='string',
data='SUCCESS',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.X_000E8F_DeviceFeature.X_000E8F_NEStatus'
'.X_000E8F_DEFGW_Status',
val_type='string',
data='SUCCESS',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.FAP.GPS.ScanStatus',
val_type='string',
data='SUCCESS',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.FAP.GPS.LockedLongitude',
val_type='int',
data='-105272892',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.FAP.GPS.LockedLatitude',
val_type='int',
data='40019606',
),
)
msg.ParameterList = models.ParameterValueList()
msg.ParameterList.ParameterValueStruct = param_val_list
return msg
def _get_freedomfi_one_param_values_response(self):
msg = models.GetParameterValuesResponse()
param_val_list = []
msg.ParameterList = models.ParameterValueList()
msg.ParameterList.ParameterValueStruct = param_val_list
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.CellConfig.LTE.RAN.RF.EARFCNDL',
val_type='int',
data='56240',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.FAP.GPS.ScanOnBoot',
val_type='boolean',
data='1',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.FAPControl.LTE.AdminState',
val_type='boolean',
data='1',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.FAP.PerfMgmt.Config.1.Enable',
val_type='boolean',
data='1',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.FAPControl.LTE.Gateway.S1SigLinkServerList',
val_type='string',
data='10.0.2.1',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_RRMConfig.X_000E8F_Cell_Number',
val_type='int',
data='2',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.CellConfig.LTE.EPC.TAC',
val_type='int',
data='1',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.FAP.PerfMgmt.Config.1.PeriodicUploadInterval',
val_type='int',
data='60',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.DeviceInfo.SoftwareVersion',
val_type='string',
data='TEST3920@210901',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_SAS.UserContactInformation',
val_type='string',
data='M0LK4T',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_SAS.ProtectionLevel',
val_type='string',
data='GAA',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_SAS.CertSubject',
val_type='string',
data='/C=TW/O=Sercomm/OU=WInnForum CBSD Certificate/CN=P27-SCE4255W:%s',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_SAS.HeightType',
val_type='string',
data='AMSL',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_SAS.Category',
val_type='string',
data='A',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.FAP.GPS.ScanStatus',
val_type='string',
data='Success',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.ManagementServer.PeriodicInformInterval',
val_type='int',
data='60',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.CellConfig.LTE.RAN.RF.FreqBandIndicator',
val_type='unsignedInt',
data='48',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.CellConfig.LTE.RAN.Common.CellIdentity',
val_type='unsignedInt',
data='101',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.FAP.GPS.LockedLongitude',
val_type='string',
data='-105272892',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.FAP.GPS.LockedLatitude',
val_type='string',
data='40019606',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_SAS.CPIEnable',
val_type='boolean',
data='0',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_RRMConfig.X_000E8F_CA_Enable',
val_type='boolean',
data='1',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.FAP.GPS.ScanOnBoot',
val_type='boolean',
data='1',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.X_000E8F_DeviceFeature.X_000E8F_NEStatus.X_000E8F_Sync_Status',
val_type='string',
data='InSync',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.CellConfig.LTE.RAN.RF.PhyCellID',
val_type='string',
data='101,102',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.FAP.PerfMgmt.Config.1.URL',
val_type='string',
data=None,
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_SAS.Location',
val_type='string',
data='indoor',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.CellConfig.LTE.RAN.PHY.TDDFrame.SubFrameAssignment',
val_type='boolean',
data='2',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.CellConfig.LTE.EPC.PLMNList.1.IsPrimary',
val_type='boolean',
data='1',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.CellConfig.LTE.EPC.PLMNList.1.Enable',
val_type='boolean',
data='1',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_SAS.Server',
val_type='string',
data='https://spectrum-connect.federatedwireless.com/v1.2/',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.X_000E8F_DeviceFeature.X_000E8F_WebServerEnable',
val_type='boolean',
data='1',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.CellConfig.LTE.EPC.PLMNList.1.CellReservedForOperatorUse',
val_type='boolean',
data='0',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.CellConfig.LTE.Tunnel.1.TunnelRef',
val_type='string',
data='Device.IP.Interface.1.IPv4Address.1.',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.REM.X_000E8F_tfcsManagerConfig.primSrc',
val_type='string',
data='GNSS',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_SAS.Enable',
val_type='boolean',
data='1',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.ManagementServer.PeriodicInformEnable',
val_type='boolean',
data='1',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.CellConfig.LTE.EPC.PLMNListNumberOfEntries',
val_type='int',
data='1',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.CellConfig.LTE.RAN.PHY.TDDFrame.SpecialSubframePatterns',
val_type='int',
data='7',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_RRMConfig.X_000E8F_CELL_Freq_Contiguous',
val_type='int',
data='0',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.FAPControl.LTE.Gateway.S1SigLinkPort',
val_type='int',
data='36412',
),
)
return msg
def _get_service_config(self):
return {
"tr069": {
"interface": "eth1",
"port": 48080,
"perf_mgmt_port": 8081,
"public_ip": "192.88.99.142",
},
"reboot_enodeb_on_mme_disconnected": True,
"s1_interface": "eth1",
"sas": {
"sas_enabled": True,
"sas_server_url":
"https://spectrum-connect.federatedwireless.com/v1.2/",
"sas_uid": "M0LK4T",
"sas_category": "A",
"sas_channel_type": "GAA",
"sas_cert_subject": "/C=TW/O=Sercomm/OU=WInnForum CBSD "
"Certificate/CN=P27-SCE4255W:%s",
"sas_icg_group_id": "",
"sas_location": "indoor",
"sas_height_type": "AMSL",
},
}
def build_freedomfi_one_acs_state_machine(self):
service = EnodebAcsStateMachineBuilder.build_magma_service(
mconfig=EnodebConfigBuilder.get_mconfig(),
service_config=self._get_service_config(),
)
handler_class = get_device_handler_from_name(
EnodebDeviceName.FREEDOMFI_ONE,
)
acs_state_machine = handler_class(service)
return acs_state_machine
def test_provision(self) -> None:
"""
Test the basic provisioning workflow
1 - enodeb sends Inform, enodebd sends InformResponse
2 - enodeb sends empty HTTP message,
3 - enodebd sends get transient params, updates the device state.
4 - enodebd sends get param values, updates the device state
5 - enodebd, sets fields including SAS fields.
"""
logging.root.level = logging.DEBUG
acs_state_machine = self.build_freedomfi_one_acs_state_machine()
inform = Tr069MessageBuilder.get_inform(
oui="000E8F",
sw_version="TEST3920@210901",
enb_serial="2006CW5000023",
)
resp = acs_state_machine.handle_tr069_message(inform)
self.assertTrue(
isinstance(resp, models.InformResponse),
'Should respond with an InformResponse',
)
# Send an empty http request
req = models.DummyInput()
resp = acs_state_machine.handle_tr069_message(req)
# Expect a request for read-only params
self.assertTrue(
isinstance(resp, models.GetParameterValues),
'State machine should be requesting param values',
)
for tr_69_nodes in StatusParameters.STATUS_PARAMETERS.values():
self.assertIn(tr_69_nodes.path, resp.ParameterNames.string)
req = self._get_freedomfi_one_read_only_param_values_response()
get_resp = acs_state_machine.handle_tr069_message(req)
self.assertTrue(
isinstance(get_resp, models.GetParameterValues),
'State machine should be requesting param values',
)
req = self._get_freedomfi_one_param_values_response()
resp = acs_state_machine.handle_tr069_message(req)
self.assertTrue(
isinstance(resp, models.SetParameterValues),
'State machine should be setting parameters',
)
self.assertIsNotNone(
resp.ParameterKey.Data,
'ParameterKey should be set for FreedomFiOne eNB',
)
msg = models.SetParameterValuesResponse()
msg.Status = 1
get_resp = acs_state_machine.handle_tr069_message(msg)
self.assertTrue(
isinstance(get_resp, models.GetParameterValues),
'We should read back all parameters',
)
req = self._get_freedomfi_one_param_values_response()
resp = acs_state_machine.handle_tr069_message(req)
self.assertTrue(
isinstance(resp, models.DummyInput),
'Provisioning completed with Dummy response',
)
def test_manual_reboot_during_provisioning(self) -> None:
"""
Test a scenario where a Magma user goes through the enodebd CLI to
reboot the Baicells eNodeB.
This checks the scenario where the command is sent in the middle
of a TR-069 provisioning session.
"""
logging.root.level = logging.DEBUG
acs_state_machine = self.build_freedomfi_one_acs_state_machine()
# Send an Inform message, wait for an InformResponse
inform = Tr069MessageBuilder.get_inform(
oui="000E8F",
sw_version="TEST3920@210901",
enb_serial="2006CW5000023",
)
resp = acs_state_machine.handle_tr069_message(inform)
self.assertTrue(
isinstance(resp, models.InformResponse),
'Should respond with an InformResponse',
)
# Send an empty http request to kick off the rest of provisioning
req = models.DummyInput()
resp = acs_state_machine.handle_tr069_message(req)
# Expect a request for an optional parameter, three times
self.assertTrue(
isinstance(resp, models.GetParameterValues),
'State machine should be requesting param values',
)
req = Tr069MessageBuilder.get_fault()
# User uses the CLI tool to get eNodeB to reboot
acs_state_machine.reboot_asap()
resp = acs_state_machine.handle_tr069_message(req)
self.assertTrue(
isinstance(resp, models.Reboot),
'In reboot sequence, state machine should send a '
'Reboot message.',
)
req = Tr069MessageBuilder.get_reboot_response()
resp = acs_state_machine.handle_tr069_message(req)
self.assertTrue(
isinstance(resp, models.DummyInput),
'State machine should end TR-069 session after '
'receiving a RebootResponse',
)
def test_post_processing(self) -> None:
""" Test FreedomFi One specific post processing functionality"""
acs_state_machine = self.build_freedomfi_one_acs_state_machine()
cfg_desired = Mock()
acs_state_machine.device_cfg.set_parameter(
ParameterName.SERIAL_NUMBER,
"2006CW5000023",
)
cfg_init = FreedomFiOneConfigurationInitializer(acs_state_machine)
cfg_init.postprocess(
EnodebConfigBuilder.get_mconfig(),
self._get_service_config(), cfg_desired,
)
expected = [
call.delete_parameter('EARFCNDL'),
call.delete_parameter('DL bandwidth'),
call.delete_parameter('UL bandwidth'),
call.set_parameter(
'tunnel_ref',
'Device.IP.Interface.1.IPv4Address.1.',
),
call.set_parameter('prim_src', 'GNSS'),
call.set_parameter('carrier_agg_enable', True),
call.set_parameter('carrier_number', 2),
call.set_parameter('contiguous_cc', 0),
call.set_parameter('web_ui_enable', False),
call.set_parameter('sas_enabled', True),
call.set_parameter(
'sas_server_url',
'https://spectrum-connect.federatedwireless.com/v1.2/',
),
call.set_parameter('sas_uid', 'M0LK4T'),
call.set_parameter('sas_category', 'A'),
call.set_parameter('sas_channel_type', 'GAA'),
call.set_parameter(
'sas_cert_subject',
'/C=TW/O=Sercomm/OU=WInnForum CBSD Certificate/CN=P27-SCE4255W:%s',
),
call.set_parameter('sas_location', 'indoor'),
call.set_parameter('sas_height_type', 'AMSL'),
]
self.assertEqual(cfg_desired.mock_calls.sort(), expected.sort())
# Check without sas config
service_cfg = {
"tr069": {
"interface": "eth1",
"port": 48080,
"perf_mgmt_port": 8081,
"public_ip": "192.88.99.142",
},
"reboot_enodeb_on_mme_disconnected": True,
"s1_interface": "eth1",
}
cfg_desired = Mock()
cfg_init.postprocess(
EnodebConfigBuilder.get_mconfig(),
service_cfg, cfg_desired,
)
expected = [
call.delete_parameter('EARFCNDL'),
call.delete_parameter('DL bandwidth'),
call.delete_parameter('UL bandwidth'),
call.set_parameter(
'tunnel_ref',
'Device.IP.Interface.1.IPv4Address.1.',
),
call.set_parameter('prim_src', 'GNSS'),
call.set_parameter('carrier_agg_enable', True),
call.set_parameter('carrier_number', 2),
call.set_parameter('contiguous_cc', 0),
call.set_parameter('web_ui_enable', False),
]
self.assertEqual(cfg_desired.mock_calls.sort(), expected.sort())
service_cfg['web_ui_enable_list'] = ["2006CW5000023"]
expected = [
call.delete_parameter('EARFCNDL'),
call.delete_parameter('DL bandwidth'),
call.delete_parameter('UL bandwidth'),
call.set_parameter(
'tunnel_ref',
'Device.IP.Interface.1.IPv4Address.1.',
),
call.set_parameter('prim_src', 'GNSS'),
call.set_parameter('carrier_agg_enable', True),
call.set_parameter('carrier_number', 2),
call.set_parameter('contiguous_cc', 0),
call.set_parameter('web_ui_enable', False),
call.set_parameter('web_ui_enable', True),
]
cfg_desired = Mock()
cfg_init.postprocess(
EnodebConfigBuilder.get_mconfig(),
service_cfg, cfg_desired,
)
print(cfg_desired.mock_calls)
print(type(cfg_desired.mock_calls))
self.assertEqual(cfg_desired.mock_calls.sort(), expected.sort())
@patch('magma.configuration.service_configs.CONFIG_DIR', SRC_CONFIG_DIR)
def test_service_cfg_parsing(self):
""" Test the parsing of the service config file for enodebd.yml"""
service = MagmaService('enodebd', mconfigs_pb2.EnodebD())
service_cfg = service.config
service_cfg_1 = self._get_service_config()
service_cfg_1['web_ui_enable_list'] = []
service_cfg_1[FreedomFiOneConfigurationInitializer.SAS_KEY][
SASParameters.SAS_UID
] = "INVALID_ID"
service_cfg_1[FreedomFiOneConfigurationInitializer.SAS_KEY][
SASParameters.SAS_CERT_SUBJECT
] = "INVALID_CERT_SUBJECT"
self.assertEqual(service_cfg, service_cfg_1)
def test_status_nodes(self):
""" Test that the status of the node is valid"""
status = StatusParameters()
# Happy path
n1 = {
StatusParameters.DEFAULT_GW: "SUCCESS",
StatusParameters.SYNC_STATUS: "InSync",
StatusParameters.ENB_STATUS: "Success",
StatusParameters.SAS_STATUS: "Success",
StatusParameters.GPS_SCAN_STATUS: "SUCCESS",
ParameterName.GPS_LONG: "1",
ParameterName.GPS_LAT: "1",
}
device_config = Mock()
status.set_magma_device_cfg(n1, device_config)
expected = [
call.set_parameter(param_name='RF TX status', value=True),
call.set_parameter(param_name='gps_status', value=True),
call.set_parameter(param_name='PTP status', value=True),
call.set_parameter(param_name='MME status', value=True),
call.set_parameter(param_name='Opstate', value=True),
call.set_parameter('GPS lat', '1'),
call.set_parameter('GPS long', '1'),
]
self.assertEqual(expected, device_config.mock_calls)
n2 = n1.copy()
# Verify we can handle specific none params
n2[StatusParameters.DEFAULT_GW] = None
n3 = n1.copy()
n3[StatusParameters.SYNC_STATUS] = None
n4 = n1.copy()
n4[StatusParameters.ENB_STATUS] = None
n5 = n1.copy()
n5[StatusParameters.SAS_STATUS] = None
n6 = n1.copy()
n6[StatusParameters.GPS_SCAN_STATUS] = None
n7 = n1.copy()
n7[ParameterName.GPS_LONG] = None
n8 = n1.copy()
n8[ParameterName.GPS_LAT] = None
device_config = Mock()
expected = [
call.set_parameter(param_name='RF TX status', value=True),
call.set_parameter(param_name='gps_status', value=True),
call.set_parameter(param_name='PTP status', value=True),
call.set_parameter(param_name='MME status', value=False),
call.set_parameter(param_name='Opstate', value=True),
call.set_parameter('GPS lat', '1'),
call.set_parameter('GPS long', '1'),
]
status.set_magma_device_cfg(n2, device_config)
self.assertEqual(expected, device_config.mock_calls)
device_config = Mock()
expected = [
call.set_parameter(param_name='RF TX status', value=True),
call.set_parameter(param_name='gps_status', value=True),
call.set_parameter(param_name='PTP status', value=False),
call.set_parameter(param_name='MME status', value=True),
call.set_parameter(param_name='Opstate', value=True),
call.set_parameter('GPS lat', '1'),
call.set_parameter('GPS long', '1'),
]
status.set_magma_device_cfg(n3, device_config)
self.assertEqual(expected, device_config.mock_calls)
device_config = Mock()
expected = [
call.set_parameter(param_name='RF TX status', value=True),
call.set_parameter(param_name='gps_status', value=True),
call.set_parameter(param_name='PTP status', value=True),
call.set_parameter(param_name='MME status', value=True),
call.set_parameter(param_name='Opstate', value=False),
call.set_parameter('GPS lat', '1'),
call.set_parameter('GPS long', '1'),
]
status.set_magma_device_cfg(n4, device_config)
self.assertEqual(expected, device_config.mock_calls)
device_config = Mock()
expected = [
call.set_parameter(param_name='RF TX status', value=False),
call.set_parameter(param_name='gps_status', value=True),
call.set_parameter(param_name='PTP status', value=True),
call.set_parameter(param_name='MME status', value=True),
call.set_parameter(param_name='Opstate', value=True),
call.set_parameter('GPS lat', '1'),
call.set_parameter('GPS long', '1'),
]
status.set_magma_device_cfg(n5, device_config)
self.assertEqual(expected, device_config.mock_calls)
device_config = Mock()
expected = [
call.set_parameter(param_name='RF TX status', value=True),
call.set_parameter(param_name='gps_status', value=False),
call.set_parameter(param_name='PTP status', value=False),
call.set_parameter(param_name='MME status', value=True),
call.set_parameter(param_name='Opstate', value=True),
call.set_parameter('GPS lat', '1'),
call.set_parameter('GPS long', '1'),
]
status.set_magma_device_cfg(n6, device_config)
self.assertEqual(expected, device_config.mock_calls)
device_config = Mock()
expected = [
call.set_parameter(param_name='RF TX status', value=True),
call.set_parameter(param_name='gps_status', value=True),
call.set_parameter(param_name='PTP status', value=True),
call.set_parameter(param_name='MME status', value=True),
call.set_parameter(param_name='Opstate', value=True),
call.set_parameter('GPS lat', '1'),
call.set_parameter('GPS long', None),
]
status.set_magma_device_cfg(n7, device_config)
self.assertEqual(expected, device_config.mock_calls)
device_config = Mock()
expected = [
call.set_parameter(param_name='RF TX status', value=True),
call.set_parameter(param_name='gps_status', value=True),
call.set_parameter(param_name='PTP status', value=True),
call.set_parameter(param_name='MME status', value=True),
call.set_parameter(param_name='Opstate', value=True),
call.set_parameter('GPS lat', None),
call.set_parameter('GPS long', '1'),
]
status.set_magma_device_cfg(n8, device_config)
self.assertEqual(expected, device_config.mock_calls)