blob: 4d88496691db4901808279216e3418756957884b [file] [log] [blame]
"""
Copyright 2020 The Magma Authors.
This source code is licensed under the BSD-style license found in the
LICENSE file in the root directory of this source tree.
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import logging
import os
from unittest.mock import Mock, call, patch
from lte.protos.mconfig import mconfigs_pb2
from common.service import MagmaService
from data_models.data_model_parameters import ParameterName
from devices.device_map import get_device_handler_from_name
from devices.device_utils import EnodebDeviceName
from devices.freedomfi_one import (
FreedomFiOneConfigurationInitializer,
SASParameters,
StatusParameters,
)
from tests.test_utils.config_builder import EnodebConfigBuilder
from tests.test_utils.enb_acs_builder import (
EnodebAcsStateMachineBuilder,
)
from tests.test_utils.enodeb_handler import EnodebHandlerTestCase
from tests.test_utils.tr069_msg_builder import Tr069MessageBuilder
from tr069 import models
SRC_CONFIG_DIR = os.path.join(
os.environ.get('MAGMA_ROOT'),
'lte/gateway/configs',
)
class FreedomFiOneTests(EnodebHandlerTestCase):
def _get_freedomfi_one_read_only_param_values_response(
self,
) -> models.GetParameterValuesResponse:
msg = models.GetParameterValuesResponse()
param_val_list = []
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.X_000E8F_DeviceFeature.X_000E8F_NEStatus'
'.X_000E8F_Sync_Status',
val_type='string',
data='InSync',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.X_000E8F_DeviceFeature.X_000E8F_NEStatus'
'.X_000E8F_SAS_Status',
val_type='string',
data='SUCCESS',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.X_000E8F_DeviceFeature.X_000E8F_NEStatus'
'.X_000E8F_eNB_Status',
val_type='string',
data='SUCCESS',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.X_000E8F_DeviceFeature.X_000E8F_NEStatus'
'.X_000E8F_DEFGW_Status',
val_type='string',
data='SUCCESS',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.FAP.GPS.ScanStatus',
val_type='string',
data='SUCCESS',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.FAP.GPS.LockedLongitude',
val_type='int',
data='-105272892',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.FAP.GPS.LockedLatitude',
val_type='int',
data='40019606',
),
)
msg.ParameterList = models.ParameterValueList()
msg.ParameterList.ParameterValueStruct = param_val_list
return msg
def _get_freedomfi_one_param_values_response(self):
msg = models.GetParameterValuesResponse()
param_val_list = []
msg.ParameterList = models.ParameterValueList()
msg.ParameterList.ParameterValueStruct = param_val_list
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.CellConfig.LTE.RAN.RF.EARFCNDL',
val_type='int',
data='56240',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.FAP.GPS.ScanOnBoot',
val_type='boolean',
data='1',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.FAPControl.LTE.AdminState',
val_type='boolean',
data='1',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.FAP.PerfMgmt.Config.1.Enable',
val_type='boolean',
data='1',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.FAPControl.LTE.Gateway.S1SigLinkServerList',
val_type='string',
data='10.0.2.1',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_RRMConfig.X_000E8F_Cell_Number',
val_type='int',
data='2',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.CellConfig.LTE.EPC.TAC',
val_type='int',
data='1',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.FAP.PerfMgmt.Config.1.PeriodicUploadInterval',
val_type='int',
data='60',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.DeviceInfo.SoftwareVersion',
val_type='string',
data='TEST3920@210901',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_SAS.UserContactInformation',
val_type='string',
data='M0LK4T',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_SAS.ProtectionLevel',
val_type='string',
data='GAA',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_SAS.CertSubject',
val_type='string',
data='/C=TW/O=Sercomm/OU=WInnForum CBSD Certificate/CN=P27-SCE4255W:%s',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_SAS.HeightType',
val_type='string',
data='AMSL',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_SAS.Category',
val_type='string',
data='A',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.FAP.GPS.ScanStatus',
val_type='string',
data='Success',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.ManagementServer.PeriodicInformInterval',
val_type='int',
data='60',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.CellConfig.LTE.RAN.RF.FreqBandIndicator',
val_type='unsignedInt',
data='48',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.CellConfig.LTE.RAN.Common.CellIdentity',
val_type='unsignedInt',
data='101',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.FAP.GPS.LockedLongitude',
val_type='string',
data='-105272892',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.FAP.GPS.LockedLatitude',
val_type='string',
data='40019606',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_SAS.CPIEnable',
val_type='boolean',
data='0',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_RRMConfig.X_000E8F_CA_Enable',
val_type='boolean',
data='1',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.FAP.GPS.ScanOnBoot',
val_type='boolean',
data='1',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.X_000E8F_DeviceFeature.X_000E8F_NEStatus.X_000E8F_Sync_Status',
val_type='string',
data='InSync',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.CellConfig.LTE.RAN.RF.PhyCellID',
val_type='string',
data='101,102',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.FAP.PerfMgmt.Config.1.URL',
val_type='string',
data=None,
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_SAS.Location',
val_type='string',
data='indoor',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.CellConfig.LTE.RAN.PHY.TDDFrame.SubFrameAssignment',
val_type='boolean',
data='2',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.CellConfig.LTE.EPC.PLMNList.1.IsPrimary',
val_type='boolean',
data='1',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.CellConfig.LTE.EPC.PLMNList.1.Enable',
val_type='boolean',
data='1',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_SAS.Server',
val_type='string',
data='https://spectrum-connect.federatedwireless.com/v1.2/',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.X_000E8F_DeviceFeature.X_000E8F_WebServerEnable',
val_type='boolean',
data='1',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.CellConfig.LTE.EPC.PLMNList.1.CellReservedForOperatorUse',
val_type='boolean',
data='0',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.CellConfig.LTE.Tunnel.1.TunnelRef',
val_type='string',
data='Device.IP.Interface.1.IPv4Address.1.',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.REM.X_000E8F_tfcsManagerConfig.primSrc',
val_type='string',
data='GNSS',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_SAS.Enable',
val_type='boolean',
data='1',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.ManagementServer.PeriodicInformEnable',
val_type='boolean',
data='1',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.CellConfig.LTE.EPC.PLMNListNumberOfEntries',
val_type='int',
data='1',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.CellConfig.LTE.RAN.PHY.TDDFrame.SpecialSubframePatterns',
val_type='int',
data='7',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.FAPControl.LTE.X_000E8F_RRMConfig.X_000E8F_CELL_Freq_Contiguous',
val_type='int',
data='0',
),
)
param_val_list.append(
Tr069MessageBuilder.get_parameter_value_struct(
name='Device.Services.FAPService.1.FAPControl.LTE.Gateway.S1SigLinkPort',
val_type='int',
data='36412',
),
)
return msg
def _get_service_config(self):
return {
"tr069": {
"interface": "eth1",
"port": 48080,
"perf_mgmt_port": 8081,
"public_ip": "192.88.99.142",
},
"reboot_enodeb_on_mme_disconnected": True,
"s1_interface": "eth1",
"sas": {
"sas_enabled": True,
"sas_server_url":
"https://spectrum-connect.federatedwireless.com/v1.2/",
"sas_uid": "M0LK4T",
"sas_category": "A",
"sas_channel_type": "GAA",
"sas_cert_subject": "/C=TW/O=Sercomm/OU=WInnForum CBSD "
"Certificate/CN=P27-SCE4255W:%s",
"sas_icg_group_id": "",
"sas_location": "indoor",
"sas_height_type": "AMSL",
},
}
def build_freedomfi_one_acs_state_machine(self):
service = EnodebAcsStateMachineBuilder.build_magma_service(
mconfig=EnodebConfigBuilder.get_mconfig(),
service_config=self._get_service_config(),
)
handler_class = get_device_handler_from_name(
EnodebDeviceName.FREEDOMFI_ONE,
)
acs_state_machine = handler_class(service)
return acs_state_machine
def test_provision(self) -> None:
"""
Test the basic provisioning workflow
1 - enodeb sends Inform, enodebd sends InformResponse
2 - enodeb sends empty HTTP message,
3 - enodebd sends get transient params, updates the device state.
4 - enodebd sends get param values, updates the device state
5 - enodebd, sets fields including SAS fields.
"""
logging.root.level = logging.DEBUG
acs_state_machine = self.build_freedomfi_one_acs_state_machine()
inform = Tr069MessageBuilder.get_inform(
oui="000E8F",
sw_version="TEST3920@210901",
enb_serial="2006CW5000023",
)
resp = acs_state_machine.handle_tr069_message(inform)
self.assertTrue(
isinstance(resp, models.InformResponse),
'Should respond with an InformResponse',
)
# Send an empty http request
req = models.DummyInput()
resp = acs_state_machine.handle_tr069_message(req)
# Expect a request for read-only params
self.assertTrue(
isinstance(resp, models.GetParameterValues),
'State machine should be requesting param values',
)
for tr_69_nodes in StatusParameters.STATUS_PARAMETERS.values():
self.assertIn(tr_69_nodes.path, resp.ParameterNames.string)
req = self._get_freedomfi_one_read_only_param_values_response()
get_resp = acs_state_machine.handle_tr069_message(req)
self.assertTrue(
isinstance(get_resp, models.GetParameterValues),
'State machine should be requesting param values',
)
req = self._get_freedomfi_one_param_values_response()
resp = acs_state_machine.handle_tr069_message(req)
self.assertTrue(
isinstance(resp, models.SetParameterValues),
'State machine should be setting parameters',
)
self.assertIsNotNone(
resp.ParameterKey.Data,
'ParameterKey should be set for FreedomFiOne eNB',
)
msg = models.SetParameterValuesResponse()
msg.Status = 1
get_resp = acs_state_machine.handle_tr069_message(msg)
self.assertTrue(
isinstance(get_resp, models.GetParameterValues),
'We should read back all parameters',
)
req = self._get_freedomfi_one_param_values_response()
resp = acs_state_machine.handle_tr069_message(req)
self.assertTrue(
isinstance(resp, models.DummyInput),
'Provisioning completed with Dummy response',
)
def test_manual_reboot_during_provisioning(self) -> None:
"""
Test a scenario where a Magma user goes through the enodebd CLI to
reboot the Baicells eNodeB.
This checks the scenario where the command is sent in the middle
of a TR-069 provisioning session.
"""
logging.root.level = logging.DEBUG
acs_state_machine = self.build_freedomfi_one_acs_state_machine()
# Send an Inform message, wait for an InformResponse
inform = Tr069MessageBuilder.get_inform(
oui="000E8F",
sw_version="TEST3920@210901",
enb_serial="2006CW5000023",
)
resp = acs_state_machine.handle_tr069_message(inform)
self.assertTrue(
isinstance(resp, models.InformResponse),
'Should respond with an InformResponse',
)
# Send an empty http request to kick off the rest of provisioning
req = models.DummyInput()
resp = acs_state_machine.handle_tr069_message(req)
# Expect a request for an optional parameter, three times
self.assertTrue(
isinstance(resp, models.GetParameterValues),
'State machine should be requesting param values',
)
req = Tr069MessageBuilder.get_fault()
# User uses the CLI tool to get eNodeB to reboot
acs_state_machine.reboot_asap()
resp = acs_state_machine.handle_tr069_message(req)
self.assertTrue(
isinstance(resp, models.Reboot),
'In reboot sequence, state machine should send a '
'Reboot message.',
)
req = Tr069MessageBuilder.get_reboot_response()
resp = acs_state_machine.handle_tr069_message(req)
self.assertTrue(
isinstance(resp, models.DummyInput),
'State machine should end TR-069 session after '
'receiving a RebootResponse',
)
def test_post_processing(self) -> None:
""" Test FreedomFi One specific post processing functionality"""
acs_state_machine = self.build_freedomfi_one_acs_state_machine()
cfg_desired = Mock()
acs_state_machine.device_cfg.set_parameter(
ParameterName.SERIAL_NUMBER,
"2006CW5000023",
)
cfg_init = FreedomFiOneConfigurationInitializer(acs_state_machine)
cfg_init.postprocess(
EnodebConfigBuilder.get_mconfig(),
self._get_service_config(), cfg_desired,
)
expected = [
call.delete_parameter('EARFCNDL'),
call.delete_parameter('DL bandwidth'),
call.delete_parameter('UL bandwidth'),
call.set_parameter(
'tunnel_ref',
'Device.IP.Interface.1.IPv4Address.1.',
),
call.set_parameter('prim_src', 'GNSS'),
call.set_parameter('carrier_agg_enable', True),
call.set_parameter('carrier_number', 2),
call.set_parameter('contiguous_cc', 0),
call.set_parameter('web_ui_enable', False),
call.set_parameter('sas_enabled', True),
call.set_parameter(
'sas_server_url',
'https://spectrum-connect.federatedwireless.com/v1.2/',
),
call.set_parameter('sas_uid', 'M0LK4T'),
call.set_parameter('sas_category', 'A'),
call.set_parameter('sas_channel_type', 'GAA'),
call.set_parameter(
'sas_cert_subject',
'/C=TW/O=Sercomm/OU=WInnForum CBSD Certificate/CN=P27-SCE4255W:%s',
),
call.set_parameter('sas_location', 'indoor'),
call.set_parameter('sas_height_type', 'AMSL'),
]
self.assertEqual(cfg_desired.mock_calls.sort(), expected.sort())
# Check without sas config
service_cfg = {
"tr069": {
"interface": "eth1",
"port": 48080,
"perf_mgmt_port": 8081,
"public_ip": "192.88.99.142",
},
"reboot_enodeb_on_mme_disconnected": True,
"s1_interface": "eth1",
}
cfg_desired = Mock()
cfg_init.postprocess(
EnodebConfigBuilder.get_mconfig(),
service_cfg, cfg_desired,
)
expected = [
call.delete_parameter('EARFCNDL'),
call.delete_parameter('DL bandwidth'),
call.delete_parameter('UL bandwidth'),
call.set_parameter(
'tunnel_ref',
'Device.IP.Interface.1.IPv4Address.1.',
),
call.set_parameter('prim_src', 'GNSS'),
call.set_parameter('carrier_agg_enable', True),
call.set_parameter('carrier_number', 2),
call.set_parameter('contiguous_cc', 0),
call.set_parameter('web_ui_enable', False),
]
self.assertEqual(cfg_desired.mock_calls.sort(), expected.sort())
service_cfg['web_ui_enable_list'] = ["2006CW5000023"]
expected = [
call.delete_parameter('EARFCNDL'),
call.delete_parameter('DL bandwidth'),
call.delete_parameter('UL bandwidth'),
call.set_parameter(
'tunnel_ref',
'Device.IP.Interface.1.IPv4Address.1.',
),
call.set_parameter('prim_src', 'GNSS'),
call.set_parameter('carrier_agg_enable', True),
call.set_parameter('carrier_number', 2),
call.set_parameter('contiguous_cc', 0),
call.set_parameter('web_ui_enable', False),
call.set_parameter('web_ui_enable', True),
]
cfg_desired = Mock()
cfg_init.postprocess(
EnodebConfigBuilder.get_mconfig(),
service_cfg, cfg_desired,
)
print(cfg_desired.mock_calls)
print(type(cfg_desired.mock_calls))
self.assertEqual(cfg_desired.mock_calls.sort(), expected.sort())
@patch('magma.configuration.service_configs.CONFIG_DIR', SRC_CONFIG_DIR)
def test_service_cfg_parsing(self):
""" Test the parsing of the service config file for enodebd.yml"""
service = MagmaService('enodebd', mconfigs_pb2.EnodebD())
service_cfg = service.config
service_cfg_1 = self._get_service_config()
service_cfg_1['web_ui_enable_list'] = []
service_cfg_1[FreedomFiOneConfigurationInitializer.SAS_KEY][
SASParameters.SAS_UID
] = "INVALID_ID"
service_cfg_1[FreedomFiOneConfigurationInitializer.SAS_KEY][
SASParameters.SAS_CERT_SUBJECT
] = "INVALID_CERT_SUBJECT"
self.assertEqual(service_cfg, service_cfg_1)
def test_status_nodes(self):
""" Test that the status of the node is valid"""
status = StatusParameters()
# Happy path
n1 = {
StatusParameters.DEFAULT_GW: "SUCCESS",
StatusParameters.SYNC_STATUS: "InSync",
StatusParameters.ENB_STATUS: "Success",
StatusParameters.SAS_STATUS: "Success",
StatusParameters.GPS_SCAN_STATUS: "SUCCESS",
ParameterName.GPS_LONG: "1",
ParameterName.GPS_LAT: "1",
}
device_config = Mock()
status.set_magma_device_cfg(n1, device_config)
expected = [
call.set_parameter(param_name='RF TX status', value=True),
call.set_parameter(param_name='GPS status', value=True),
call.set_parameter(param_name='PTP status', value=True),
call.set_parameter(param_name='MME status', value=True),
call.set_parameter(param_name='Opstate', value=True),
call.set_parameter('GPS lat', '1'),
call.set_parameter('GPS long', '1'),
]
self.assertEqual(expected, device_config.mock_calls)
n2 = n1.copy()
# Verify we can handle specific none params
n2[StatusParameters.DEFAULT_GW] = None
n3 = n1.copy()
n3[StatusParameters.SYNC_STATUS] = None
n4 = n1.copy()
n4[StatusParameters.ENB_STATUS] = None
n5 = n1.copy()
n5[StatusParameters.SAS_STATUS] = None
n6 = n1.copy()
n6[StatusParameters.GPS_SCAN_STATUS] = None
n7 = n1.copy()
n7[ParameterName.GPS_LONG] = None
n8 = n1.copy()
n8[ParameterName.GPS_LAT] = None
device_config = Mock()
expected = [
call.set_parameter(param_name='RF TX status', value=True),
call.set_parameter(param_name='GPS status', value=True),
call.set_parameter(param_name='PTP status', value=True),
call.set_parameter(param_name='MME status', value=False),
call.set_parameter(param_name='Opstate', value=True),
call.set_parameter('GPS lat', '1'),
call.set_parameter('GPS long', '1'),
]
status.set_magma_device_cfg(n2, device_config)
self.assertEqual(expected, device_config.mock_calls)
device_config = Mock()
expected = [
call.set_parameter(param_name='RF TX status', value=True),
call.set_parameter(param_name='GPS status', value=True),
call.set_parameter(param_name='PTP status', value=False),
call.set_parameter(param_name='MME status', value=True),
call.set_parameter(param_name='Opstate', value=True),
call.set_parameter('GPS lat', '1'),
call.set_parameter('GPS long', '1'),
]
status.set_magma_device_cfg(n3, device_config)
self.assertEqual(expected, device_config.mock_calls)
device_config = Mock()
expected = [
call.set_parameter(param_name='RF TX status', value=True),
call.set_parameter(param_name='GPS status', value=True),
call.set_parameter(param_name='PTP status', value=True),
call.set_parameter(param_name='MME status', value=True),
call.set_parameter(param_name='Opstate', value=False),
call.set_parameter('GPS lat', '1'),
call.set_parameter('GPS long', '1'),
]
status.set_magma_device_cfg(n4, device_config)
self.assertEqual(expected, device_config.mock_calls)
device_config = Mock()
expected = [
call.set_parameter(param_name='RF TX status', value=False),
call.set_parameter(param_name='GPS status', value=True),
call.set_parameter(param_name='PTP status', value=True),
call.set_parameter(param_name='MME status', value=True),
call.set_parameter(param_name='Opstate', value=True),
call.set_parameter('GPS lat', '1'),
call.set_parameter('GPS long', '1'),
]
status.set_magma_device_cfg(n5, device_config)
self.assertEqual(expected, device_config.mock_calls)
device_config = Mock()
expected = [
call.set_parameter(param_name='RF TX status', value=True),
call.set_parameter(param_name='GPS status', value=False),
call.set_parameter(param_name='PTP status', value=False),
call.set_parameter(param_name='MME status', value=True),
call.set_parameter(param_name='Opstate', value=True),
call.set_parameter('GPS lat', '1'),
call.set_parameter('GPS long', '1'),
]
status.set_magma_device_cfg(n6, device_config)
self.assertEqual(expected, device_config.mock_calls)
device_config = Mock()
expected = [
call.set_parameter(param_name='RF TX status', value=True),
call.set_parameter(param_name='GPS status', value=True),
call.set_parameter(param_name='PTP status', value=True),
call.set_parameter(param_name='MME status', value=True),
call.set_parameter(param_name='Opstate', value=True),
call.set_parameter('GPS lat', '1'),
call.set_parameter('GPS long', None),
]
status.set_magma_device_cfg(n7, device_config)
self.assertEqual(expected, device_config.mock_calls)
device_config = Mock()
expected = [
call.set_parameter(param_name='RF TX status', value=True),
call.set_parameter(param_name='GPS status', value=True),
call.set_parameter(param_name='PTP status', value=True),
call.set_parameter(param_name='MME status', value=True),
call.set_parameter(param_name='Opstate', value=True),
call.set_parameter('GPS lat', None),
call.set_parameter('GPS long', '1'),
]
status.set_magma_device_cfg(n8, device_config)
self.assertEqual(expected, device_config.mock_calls)