| # SPDX-FileCopyrightText: 2020 The Magma Authors. |
| # SPDX-FileCopyrightText: 2022 Open Networking Foundation <support@opennetworking.org> |
| # |
| # SPDX-License-Identifier: BSD-3-Clause |
| |
| import logging |
| from typing import Any, Callable, Dict, List, Optional, Type |
| |
| from common.service import MagmaService |
| from data_models import transform_for_magma |
| from data_models.data_model import ( |
| DataModel, |
| InvalidTrParamPath, |
| TrParam, |
| ) |
| from data_models.data_model_parameters import ( |
| ParameterName, |
| TrParameterType, |
| ) |
| from device_config.configuration_init import build_desired_config |
| from device_config.enodeb_config_postprocessor import EnodebConfigurationPostProcessor |
| from device_config.enodeb_configuration import EnodebConfiguration |
| from devices.device_utils import EnodebDeviceName |
| from logger import EnodebdLogger |
| from state_machines.acs_state_utils import ( |
| get_all_objects_to_add, |
| get_all_objects_to_delete, |
| get_all_param_values_to_set, |
| get_params_to_get, |
| parse_get_parameter_values_response, |
| ) |
| from state_machines.enb_acs import EnodebAcsStateMachine |
| from state_machines.enb_acs_impl import BasicEnodebAcsStateMachine |
| from state_machines.enb_acs_states import ( |
| AcsMsgAndTransition, |
| AcsReadMsgResult, |
| AddObjectsState, |
| DeleteObjectsState, |
| EnbSendRebootState, |
| EndSessionState, |
| EnodebAcsState, |
| ErrorState, |
| DownloadState, |
| WaitDownloadResponseState, |
| WaitInformTransferCompleteState, |
| GetParametersState, |
| SetParameterValuesState, |
| WaitGetParametersState, |
| WaitInformMRebootState, |
| WaitInformState, |
| WaitRebootResponseState, |
| WaitSetParameterValuesState, |
| CheckStatusState, |
| ) |
| from tr069 import models |
| |
| |
| FAPSERVICE_PATH = "Device.Services.FAPService.1." |
| FAP_CONTROL = FAPSERVICE_PATH + "FAPControl." |
| |
| |
| class SercommHandler(BasicEnodebAcsStateMachine): |
| def __init__(self, service: MagmaService,) -> None: |
| self._state_map = {} |
| super().__init__(service=service, use_param_key=True) |
| |
| def reboot_asap(self) -> None: |
| self.transition("reboot") |
| |
| def is_enodeb_connected(self) -> bool: |
| return not isinstance(self.state, WaitInformState) |
| |
| def _init_state_map(self) -> None: |
| self._state_map = { |
| # Inform comes in -> Respond with InformResponse |
| "wait_inform": WaitInformState(self, when_done="get_rpc_methods"), |
| # If first inform after boot -> GetRpc request comes in, if not |
| # empty request comes in => Transition to get_transient_params |
| "get_rpc_methods": SercommGetInitState( |
| self, when_done="get_transient_params", |
| ), |
| # Read transient readonly params. |
| "get_transient_params": SercommSendGetTransientParametersState( |
| self, when_upgrade="firmware_upgrade", when_done="get_params", |
| ), |
| "firmware_upgrade": DownloadState(self, when_done="wait_download_response"), |
| "wait_download_response": WaitDownloadResponseState( |
| self, when_done="wait_transfer_complete" |
| ), |
| "wait_transfer_complete": WaitInformTransferCompleteState( |
| self, |
| when_done="get_params", |
| when_periodic="wait_transfer_complete", |
| when_timeout="end_session", |
| ), |
| "get_params": SercommGetObjectParametersState( |
| self, |
| when_delete="delete_objs", |
| when_add="add_objs", |
| when_set="set_params", |
| when_skip="check_status", |
| ), |
| "delete_objs": DeleteObjectsState( |
| self, when_add="add_objs", when_skip="set_params", |
| ), |
| "add_objs": AddObjectsState(self, when_done="set_params"), |
| "set_params": SetParameterValuesState(self, when_done="wait_set_params",), |
| "wait_set_params": WaitSetParameterValuesState( |
| self, |
| when_done="check_get_params", |
| when_apply_invasive="check_get_params", |
| status_non_zero_allowed=True, |
| ), |
| "check_get_params": GetParametersState( |
| self, when_done="check_wait_get_params", request_all_params=True, |
| ), |
| "check_wait_get_params": WaitGetParametersState( |
| self, when_done="check_status", |
| ), |
| "check_status": CheckStatusState(self, when_done="check_status"), |
| "end_session": EndSessionState(self), |
| # These states are only entered through manual user intervention |
| "reboot": EnbSendRebootState(self, when_done="wait_reboot"), |
| "wait_reboot": WaitRebootResponseState( |
| self, when_done="wait_post_reboot_inform", |
| ), |
| "wait_post_reboot_inform": WaitInformMRebootState( |
| self, when_done="wait_empty", when_timeout="wait_inform", |
| ), |
| # The states below are entered when an unexpected message type is |
| # received |
| "unexpected_fault": ErrorState( |
| self, inform_transition_target="wait_inform", |
| ), |
| } |
| |
| @property |
| def device_name(self) -> str: |
| return EnodebDeviceName.SERCOMM |
| |
| @property |
| def data_model_class(self) -> Type[DataModel]: |
| return SercommTrDataModel |
| |
| @property |
| def config_postprocessor(self) -> EnodebConfigurationPostProcessor: |
| return SercommConfigurationInitializer(self) |
| |
| @property |
| def state_map(self) -> Dict[str, EnodebAcsState]: |
| return self._state_map |
| |
| @property |
| def disconnected_state_name(self) -> str: |
| return "wait_inform" |
| |
| @property |
| def unexpected_fault_state_name(self) -> str: |
| return "unexpected_fault" |
| |
| |
| class SASParameters: |
| """ Class modeling the SAS parameters and their TR path""" |
| |
| # For CBRS radios we set this to the limit and the SAS can reduce the |
| # power if needed. |
| |
| SAS_PARAMETERS = { |
| ParameterName.SAS_ENABLE: TrParam( |
| FAP_CONTROL + "LTE.X_SCM_SAS.Enable", |
| is_invasive=False, |
| type=TrParameterType.BOOLEAN, |
| is_optional=False, |
| ), |
| ParameterName.SAS_SERVER_URL: TrParam( |
| FAP_CONTROL + "LTE.X_SCM_SAS.Server", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| ParameterName.SAS_UID: TrParam( |
| FAP_CONTROL + "LTE.X_SCM_SAS.UserContactInformation", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| ParameterName.SAS_CATEGORY: TrParam( |
| FAP_CONTROL + "LTE.X_SCM_SAS.Category", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| ParameterName.SAS_CHANNEL_TYPE: TrParam( |
| FAP_CONTROL + "LTE.X_SCM_SAS.ProtectionLevel", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| ParameterName.SAS_CERT_SUBJECT: TrParam( |
| FAP_CONTROL + "LTE.X_SCM_SAS.CertSubject", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| # SAS_IC_GROUP_ID: TrParam( |
| # FAP_CONTROL + 'LTE.X_SCM_SAS.ICGGroupId', is_invasive=False, |
| # type=TrParameterType.STRING, False), |
| ParameterName.SAS_LOCATION: TrParam( |
| FAP_CONTROL + "LTE.X_SCM_SAS.Location", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| ParameterName.SAS_HEIGHT_TYPE: TrParam( |
| FAP_CONTROL + "LTE.X_SCM_SAS.HeightType", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| ParameterName.SAS_CPI_ENABLE: TrParam( |
| FAP_CONTROL + "LTE.X_SCM_SAS.CPIEnable", |
| is_invasive=False, |
| type=TrParameterType.BOOLEAN, |
| is_optional=False, |
| ), |
| ParameterName.SAS_CPI_IPE: TrParam( |
| FAP_CONTROL + "LTE.X_SCM_SAS.CPIInstallParamSuppliedEnable", |
| is_invasive=False, |
| type=TrParameterType.BOOLEAN, |
| is_optional=False, |
| ), |
| ParameterName.SAS_FCCID: TrParam( |
| FAP_CONTROL + "LTE.X_SCM_SAS.FCCIdentificationNumber", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| ParameterName.SAS_MEAS_CAPS: TrParam( |
| FAP_CONTROL + "LTE.X_SCM_SAS.MeasCapability", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| ParameterName.SAS_MANU_ENABLE: TrParam( |
| FAP_CONTROL + "LTE.X_SCM_SAS.ManufacturerPrefixEnable", |
| is_invasive=False, |
| type=TrParameterType.BOOLEAN, |
| is_optional=False, |
| ), |
| ParameterName.SAS_CPI_NAME: TrParam( |
| FAP_CONTROL + "LTE.X_SCM_SAS.CPIName", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| ParameterName.SAS_CPI_ID: TrParam( |
| FAP_CONTROL + "LTE.X_SCM_SAS.CPIId", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| ParameterName.SAS_ANTA_AZIMUTH: TrParam( |
| FAP_CONTROL + "LTE.X_SCM_SAS.AntennaAzimuth", |
| is_invasive=False, |
| type=TrParameterType.INT, |
| is_optional=False, |
| ), |
| ParameterName.SAS_ANTA_DOWNTILT: TrParam( |
| FAP_CONTROL + "LTE.X_SCM_SAS.AntennaDowntilt", |
| is_invasive=False, |
| type=TrParameterType.INT, |
| is_optional=False, |
| ), |
| ParameterName.SAS_ANTA_GAIN: TrParam( |
| FAP_CONTROL + "LTE.X_SCM_SAS.AntennaGain", |
| is_invasive=False, |
| type=TrParameterType.INT, |
| is_optional=False, |
| ), |
| ParameterName.SAS_ANTA_BEAMWIDTH: TrParam( |
| FAP_CONTROL + "LTE.X_SCM_SAS.AntennaBeamwidth", |
| is_invasive=False, |
| type=TrParameterType.INT, |
| is_optional=False, |
| ), |
| ParameterName.SAS_CPI_DATA: TrParam( |
| FAP_CONTROL + "LTE.X_SCM_SAS.CPISignatureData", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| } |
| |
| |
| class StatusParameters: |
| """ |
| Stateful class that converts eNB status to Magma understood status. |
| Sercomm has many status params that interact with each other. |
| This class maintains the business logic of parsing these status params |
| and converting it to Magma understood fields. |
| """ |
| |
| # Status parameters |
| DEFAULT_GW = "defaultGW" |
| SYNC_STATUS = "syncStatus" |
| SAS_STATUS = "sasStatus" |
| ENB_STATUS = "enbStatus" |
| GPS_SCAN_STATUS = "gpsScanStatus" |
| |
| STATUS_PARAMETERS = { |
| # Status nodes |
| DEFAULT_GW: TrParam( |
| "Device.X_SCM_DeviceFeature.X_SCM_NEStatus.X_SCM_DEFGW_Status", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| SAS_STATUS: TrParam( |
| "Device.Services.FAPService.1.FAPControl.LTE.X_SCM_SAS.State", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| GPS_SCAN_STATUS: TrParam( |
| "Device.FAP.GPS.ScanStatus", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| ParameterName.GPS_LAT: TrParam( |
| "Device.FAP.GPS.LockedLatitude", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| ParameterName.GPS_LONG: TrParam( |
| "Device.FAP.GPS.LockedLongitude", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| ParameterName.SW_VERSION: TrParam( |
| "Device.DeviceInfo.SoftwareVersion", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| ParameterName.SERIAL_NUMBER: TrParam( |
| "Device.DeviceInfo.SerialNumber", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| } |
| |
| # Derived status params that don't have tr69 representation. |
| DERIVED_STATUS_PARAMETERS = { |
| ParameterName.RF_TX_STATUS: TrParam( |
| "Device.X_SCM_DeviceFeature.SystemParams.RunControl.Service.4.Status", |
| is_invasive=False, |
| type=TrParameterType.BOOLEAN, |
| is_optional=False, |
| ), |
| ParameterName.GPS_STATUS: TrParam( |
| "Device.FAP.GPS.ScanStatus", |
| is_invasive=False, |
| type=TrParameterType.BOOLEAN, |
| is_optional=False, |
| ), |
| ParameterName.PTP_STATUS: TrParam( |
| InvalidTrParamPath, |
| is_invasive=False, |
| type=TrParameterType.BOOLEAN, |
| is_optional=False, |
| ), |
| ParameterName.MME_STATUS: TrParam( |
| InvalidTrParamPath, # FAP_CONTROL + "LTE.X_SCM_S1State", |
| is_invasive=False, |
| type=TrParameterType.BOOLEAN, |
| is_optional=False, |
| ), |
| ParameterName.OP_STATE: TrParam( |
| "Device.X_SCM_DeviceFeature.SystemParams.RunControl.Service.4.Status", |
| is_invasive=False, |
| type=TrParameterType.BOOLEAN, |
| is_optional=False, |
| ), |
| } |
| |
| @classmethod |
| def set_magma_device_cfg( |
| cls, name_to_val: Dict, device_cfg: EnodebConfiguration, |
| ): |
| """ |
| Convert Sercomm name_to_val representation to magma device_cfg |
| """ |
| success_str = "SUCCESS" # String constant returned by radio |
| insync_str = "INSYNC" |
| |
| if ( |
| name_to_val.get(cls.DEFAULT_GW) |
| and name_to_val[cls.DEFAULT_GW].upper() != success_str |
| ): |
| # Nothing will proceed if the eNB doesn't have an IP on the WAN |
| serial_num = "unknown" |
| if device_cfg.has_parameter(ParameterName.SERIAL_NUMBER): |
| serial_num = device_cfg.get_parameter(ParameterName.SERIAL_NUMBER,) |
| EnodebdLogger.error( |
| "Radio with serial number %s doesn't have IP address " "on WAN", |
| serial_num, |
| ) |
| device_cfg.set_parameter( |
| param_name=ParameterName.RF_TX_STATUS, value=False, |
| ) |
| device_cfg.set_parameter( |
| param_name=ParameterName.GPS_STATUS, value=False, |
| ) |
| device_cfg.set_parameter( |
| param_name=ParameterName.PTP_STATUS, value=False, |
| ) |
| device_cfg.set_parameter( |
| param_name=ParameterName.MME_STATUS, value=False, |
| ) |
| device_cfg.set_parameter( |
| param_name=ParameterName.OP_STATE, value=False, |
| ) |
| return |
| |
| if ( |
| name_to_val.get(cls.SAS_STATUS) |
| and name_to_val[cls.SAS_STATUS].upper() == success_str |
| ): |
| device_cfg.set_parameter( |
| param_name=ParameterName.RF_TX_STATUS, value=True, |
| ) |
| else: |
| # No sas grant so not transmitting. There is no explicit node for Tx |
| # in Sercomm |
| device_cfg.set_parameter( |
| param_name=ParameterName.RF_TX_STATUS, value=False, |
| ) |
| |
| if ( |
| name_to_val.get(cls.GPS_SCAN_STATUS) |
| and name_to_val[cls.GPS_SCAN_STATUS].upper() == success_str |
| ): |
| device_cfg.set_parameter( |
| param_name=ParameterName.GPS_STATUS, value=True, |
| ) |
| # Time comes through GPS so can only be insync with GPS is |
| # in sync, we use PTP_STATUS field to overload timer is in Sync. |
| if ( |
| name_to_val.get(cls.SYNC_STATUS) |
| and name_to_val[cls.SYNC_STATUS].upper() == insync_str |
| ): |
| device_cfg.set_parameter( |
| param_name=ParameterName.PTP_STATUS, value=True, |
| ) |
| else: |
| device_cfg.set_parameter( |
| param_name=ParameterName.PTP_STATUS, value=False, |
| ) |
| else: |
| device_cfg.set_parameter( |
| param_name=ParameterName.GPS_STATUS, value=False, |
| ) |
| device_cfg.set_parameter( |
| param_name=ParameterName.PTP_STATUS, value=False, |
| ) |
| |
| if ( |
| name_to_val.get(cls.DEFAULT_GW) |
| and name_to_val[cls.DEFAULT_GW].upper() == success_str |
| ): |
| device_cfg.set_parameter( |
| param_name=ParameterName.MME_STATUS, value=True, |
| ) |
| else: |
| device_cfg.set_parameter( |
| param_name=ParameterName.MME_STATUS, value=False, |
| ) |
| |
| if ( |
| name_to_val.get(cls.ENB_STATUS) |
| and name_to_val[cls.ENB_STATUS].upper() == success_str |
| ): |
| device_cfg.set_parameter( |
| param_name=ParameterName.OP_STATE, value=True, |
| ) |
| else: |
| device_cfg.set_parameter( |
| param_name=ParameterName.OP_STATE, value=False, |
| ) |
| |
| pass_through_params = [ParameterName.GPS_LAT, ParameterName.GPS_LONG] |
| |
| for name in pass_through_params: |
| device_cfg.set_parameter(name, name_to_val[name]) |
| |
| |
| class SercommMiscParameters: |
| """ |
| Default set of parameters that enable carrier aggregation and other |
| miscellaneous properties |
| """ |
| |
| MISC_PARAMETERS = { |
| ParameterName.CARRIER_AGG_ENABLE: TrParam( |
| FAP_CONTROL + "LTE.X_SCM_RRMConfig.X_SCM_CA_Enable", |
| is_invasive=False, |
| type=TrParameterType.BOOLEAN, |
| is_optional=False, |
| ), |
| ParameterName.CARRIER_NUMBER: TrParam( |
| FAP_CONTROL + "LTE.X_SCM_RRMConfig.X_SCM_Cell_Number", |
| is_invasive=False, |
| type=TrParameterType.INT, |
| is_optional=False, |
| ), |
| ParameterName.CONTIGUOUS_CC: TrParam( |
| FAP_CONTROL + "LTE.X_SCM_RRMConfig.X_SCM_CELL_Freq_Contiguous", |
| is_invasive=False, |
| type=TrParameterType.INT, |
| is_optional=False, |
| ), |
| ParameterName.PRIM_SOURCE: TrParam( |
| FAPSERVICE_PATH + "REM.X_SCM_tfcsManagerConfig.primSrc", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| ParameterName.CWMP_ENABLE: TrParam( |
| "Device.ManagementServer.EnableCWMP", |
| is_invasive=False, |
| type=TrParameterType.BOOLEAN, |
| is_optional=False, |
| ), |
| } |
| |
| |
| class SercommTrDataModel(DataModel): |
| """ |
| Class to represent relevant data model parameters from TR-196/TR-098. |
| This class is effectively read-only. |
| |
| These models have these idiosyncrasies (on account of running TR098): |
| |
| - Parameter content root is different (InternetGatewayDevice) |
| - GetParameter queries with a wildcard e.g. InternetGatewayDevice. do |
| not respond with the full tree (we have to query all parameters) |
| - MME status is not exposed - we assume the MME is connected if |
| the eNodeB is transmitting (OpState=true) |
| - Parameters such as band capability/duplex config |
| are rooted under `boardconf.` and not the device config root |
| - Num PLMNs is not reported by these units |
| """ |
| |
| PARAMETERS = { |
| # Top-level objects |
| ParameterName.DEVICE: TrParam( |
| "Device.", |
| is_invasive=False, |
| type=TrParameterType.OBJECT, |
| is_optional=False, |
| ), |
| ParameterName.FAP_SERVICE: TrParam( |
| FAP_CONTROL, |
| is_invasive=False, |
| type=TrParameterType.OBJECT, |
| is_optional=False, |
| ), |
| # Device info |
| ParameterName.IP_ADDRESS: TrParam( |
| "Device.IP.Interface.1.IPv4Address.1.IPAddress", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| # RF-related parameters |
| ParameterName.FREQ_BAND_1: TrParam( |
| FAPSERVICE_PATH + "CellConfig.LTE.RAN.RF.FreqBandIndicator", |
| is_invasive=False, |
| type=TrParameterType.UNSIGNED_INT, |
| is_optional=False, |
| ), |
| ParameterName.FREQ_BAND_2: TrParam( |
| FAPSERVICE_PATH + "CellConfig.LTE.RAN.RF.X_SCM_FreqBandIndicator2", |
| is_invasive=False, |
| type=TrParameterType.UNSIGNED_INT, |
| is_optional=False, |
| ), |
| ParameterName.FREQ_BAND_LIST: TrParam( |
| FAPSERVICE_PATH + "CellConfig.LTE.RAN.RF.X_SCM_FreqBandIndicatorConfigList", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| ParameterName.EARFCNDL: TrParam( |
| FAPSERVICE_PATH + "CellConfig.LTE.RAN.RF.EARFCNDL", |
| is_invasive=False, |
| type=TrParameterType.INT, |
| is_optional=False, |
| ), |
| ParameterName.EARFCNUL: TrParam( |
| FAPSERVICE_PATH + "CellConfig.LTE.RAN.RF.EARFCNUL", |
| is_invasive=False, |
| type=TrParameterType.INT, |
| is_optional=False, |
| ), |
| ParameterName.EARFCNDL2: TrParam( |
| FAPSERVICE_PATH + "CellConfig.LTE.RAN.RF.X_SCM_EARFCNDL2", |
| is_invasive=False, |
| type=TrParameterType.INT, |
| is_optional=False, |
| ), |
| ParameterName.EARFCNUL2: TrParam( |
| FAPSERVICE_PATH + "CellConfig.LTE.RAN.RF.X_SCM_EARFCNUL2", |
| is_invasive=False, |
| type=TrParameterType.INT, |
| is_optional=False, |
| ), |
| ParameterName.EARFCNDL_LIST: TrParam( |
| FAPSERVICE_PATH + "CellConfig.LTE.RAN.RF.X_SCM_EARFCNDLConfigList", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| ParameterName.EARFCNUL_LIST: TrParam( |
| FAPSERVICE_PATH + "CellConfig.LTE.RAN.RF.X_SCM_EARFCNULConfigList", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| ParameterName.DL_BANDWIDTH: TrParam( |
| FAPSERVICE_PATH + "CellConfig.LTE.RAN.RF.DLBandwidth", |
| is_invasive=False, |
| type=TrParameterType.INT, |
| is_optional=False, |
| ), |
| ParameterName.UL_BANDWIDTH: TrParam( |
| FAPSERVICE_PATH + "CellConfig.LTE.RAN.RF.ULBandwidth", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| ParameterName.PCI_LIST: TrParam( |
| FAPSERVICE_PATH + "CellConfig.LTE.RAN.RF.PhyCellID", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| ParameterName.SUBFRAME_ASSIGNMENT: TrParam( |
| FAPSERVICE_PATH + "CellConfig.LTE.RAN.PHY.TDDFrame.SubFrameAssignment", |
| is_invasive=False, |
| type=TrParameterType.INT, |
| is_optional=False, |
| ), |
| ParameterName.SPECIAL_SUBFRAME_PATTERN: TrParam( |
| FAPSERVICE_PATH + "CellConfig.LTE.RAN.PHY.TDDFrame.SpecialSubframePatterns", |
| is_invasive=False, |
| type=TrParameterType.INT, |
| is_optional=False, |
| ), |
| ParameterName.CELL_ENABLE64QAM: TrParam( |
| FAPSERVICE_PATH + "CellConfig.LTE.RAN.PHY.PUSCH.Enable64QAM", |
| is_invasive=False, |
| type=TrParameterType.BOOLEAN, |
| is_optional=False, |
| ), |
| ParameterName.CELL_ID: TrParam( |
| FAPSERVICE_PATH + "CellConfig.LTE.RAN.Common.CellIdentity", |
| is_invasive=False, |
| type=TrParameterType.UNSIGNED_INT, |
| is_optional=False, |
| ), |
| # Readonly LTE state |
| ParameterName.ADMIN_STATE: TrParam( |
| FAP_CONTROL + "LTE.AdminState", |
| is_invasive=False, |
| type=TrParameterType.BOOLEAN, |
| is_optional=False, |
| ), |
| ParameterName.GPS_ENABLE: TrParam( |
| "Device.FAP.GPS.ScanOnBoot", |
| is_invasive=False, |
| type=TrParameterType.BOOLEAN, |
| is_optional=False, |
| ), |
| # Core network parameters |
| ParameterName.MME_ADDRESS: TrParam( |
| FAP_CONTROL + "LTE.Gateway.S1SigLinkServerList", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| ParameterName.MME_PORT: TrParam( |
| FAP_CONTROL + "LTE.Gateway.S1SigLinkPort", |
| is_invasive=False, |
| type=TrParameterType.INT, |
| is_optional=False, |
| ), |
| ParameterName.TAC1: TrParam( |
| FAPSERVICE_PATH + "CellConfig.LTE.EPC.TAC", |
| is_invasive=False, |
| type=TrParameterType.INT, |
| is_optional=False, |
| ), |
| ParameterName.TAC2: TrParam( |
| FAPSERVICE_PATH + "CellConfig.LTE.EPC.X_SCM_TAC2", |
| is_invasive=False, |
| type=TrParameterType.INT, |
| is_optional=False, |
| ), |
| # Management server parameters |
| ParameterName.PERIODIC_INFORM_ENABLE: TrParam( |
| "Device.ManagementServer.PeriodicInformEnable", |
| is_invasive=False, |
| type=TrParameterType.BOOLEAN, |
| is_optional=False, |
| ), |
| ParameterName.PERIODIC_INFORM_INTERVAL: TrParam( |
| "Device.ManagementServer.PeriodicInformInterval", |
| is_invasive=False, |
| type=TrParameterType.INT, |
| is_optional=False, |
| ), |
| # Performance management parameters |
| ParameterName.PERF_MGMT_ENABLE: TrParam( |
| "Device.FAP.PerfMgmt.Config.1.Enable", |
| is_invasive=False, |
| type=TrParameterType.BOOLEAN, |
| is_optional=False, |
| ), |
| ParameterName.PERF_MGMT_UPLOAD_INTERVAL: TrParam( |
| "Device.FAP.PerfMgmt.Config.1.PeriodicUploadInterval", |
| is_invasive=False, |
| type=TrParameterType.INT, |
| is_optional=False, |
| ), |
| ParameterName.PERF_MGMT_UPLOAD_URL: TrParam( |
| "Device.FAP.PerfMgmt.Config.1.URL", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| ParameterName.TX_POWER: TrParam( |
| FAPSERVICE_PATH + "CellConfig.LTE.RAN.RF.X_SCM_TxPowerConfig", |
| is_invasive=True, |
| type=TrParameterType.INT, |
| is_optional=False, |
| ), |
| ParameterName.TUNNEL_TYPE: TrParam( |
| FAPSERVICE_PATH + "CellConfig.LTE.Tunnel.1.TunnelRef", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| } |
| TRANSFORMS_FOR_ENB = {} |
| NUM_PLMNS_IN_CONFIG = 1 |
| for i in range(1, NUM_PLMNS_IN_CONFIG + 1): |
| PARAMETERS[ParameterName.PLMN_N % i] = TrParam( |
| FAPSERVICE_PATH + "CellConfig.LTE.EPC.PLMNList.%d." % i, |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ) |
| PARAMETERS[ParameterName.PLMN_N_CELL_RESERVED % i] = TrParam( |
| FAPSERVICE_PATH |
| + "CellConfig.LTE.EPC.PLMNList.%d.CellReservedForOperatorUse" % i, |
| is_invasive=False, |
| type=TrParameterType.BOOLEAN, |
| is_optional=False, |
| ) |
| PARAMETERS[ParameterName.PLMN_N_ENABLE % i] = TrParam( |
| FAPSERVICE_PATH + "CellConfig.LTE.EPC.PLMNList.%d.Enable" % i, |
| is_invasive=False, |
| type=TrParameterType.BOOLEAN, |
| is_optional=False, |
| ) |
| PARAMETERS[ParameterName.PLMN_N_PRIMARY % i] = TrParam( |
| FAPSERVICE_PATH + "CellConfig.LTE.EPC.PLMNList.%d.IsPrimary" % i, |
| is_invasive=False, |
| type=TrParameterType.BOOLEAN, |
| is_optional=False, |
| ) |
| PARAMETERS[ParameterName.PLMN_N_PLMNID % i] = TrParam( |
| FAPSERVICE_PATH + "CellConfig.LTE.EPC.PLMNList.%d.PLMNID" % i, |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ) |
| |
| PARAMETERS.update(SASParameters.SAS_PARAMETERS) |
| PARAMETERS.update(SercommMiscParameters.MISC_PARAMETERS) |
| PARAMETERS.update(StatusParameters.STATUS_PARAMETERS) |
| PARAMETERS.update(StatusParameters.DERIVED_STATUS_PARAMETERS) |
| |
| TRANSFORMS_FOR_MAGMA = { |
| # We don't set these parameters |
| ParameterName.BAND_CAPABILITY: transform_for_magma.band_capability, |
| ParameterName.DUPLEX_MODE_CAPABILITY: transform_for_magma.duplex_mode, |
| } |
| |
| @classmethod |
| def get_parameter(cls, param_name: ParameterName) -> Optional[TrParam]: |
| return cls.PARAMETERS.get(param_name) |
| |
| @classmethod |
| def _get_magma_transforms(cls,) -> Dict[ParameterName, Callable[[Any], Any]]: |
| return cls.TRANSFORMS_FOR_MAGMA |
| |
| @classmethod |
| def _get_enb_transforms(cls) -> Dict[ParameterName, Callable[[Any], Any]]: |
| return cls.TRANSFORMS_FOR_ENB |
| |
| @classmethod |
| def get_load_parameters(cls) -> List[ParameterName]: |
| """ |
| Load all the parameters instead of a subset. |
| """ |
| return list(cls.PARAMETERS.keys()) |
| |
| @classmethod |
| def get_num_plmns(cls) -> int: |
| return cls.NUM_PLMNS_IN_CONFIG |
| |
| @classmethod |
| def get_parameter_names(cls) -> List[ParameterName]: |
| excluded_params = [ |
| str(ParameterName.DEVICE), |
| str(ParameterName.FAP_SERVICE), |
| ] |
| names = list( |
| filter( |
| lambda x: (not str(x).startswith("PLMN")) |
| and (str(x) not in excluded_params), |
| cls.PARAMETERS.keys(), |
| ), |
| ) |
| return names |
| |
| @classmethod |
| def get_numbered_param_names(cls,) -> Dict[ParameterName, List[ParameterName]]: |
| names = {} |
| for i in range(1, cls.NUM_PLMNS_IN_CONFIG + 1): |
| params = [ |
| ParameterName.PLMN_N_CELL_RESERVED % i, |
| ParameterName.PLMN_N_ENABLE % i, |
| ParameterName.PLMN_N_PRIMARY % i, |
| ParameterName.PLMN_N_PLMNID % i, |
| ] |
| names[ParameterName.PLMN_N % i] = params |
| |
| return names |
| |
| @classmethod |
| def get_sas_param_names(cls) -> List[ParameterName]: |
| return SASParameters.SAS_PARAMETERS.keys() |
| |
| |
| class SercommConfigurationInitializer(EnodebConfigurationPostProcessor): |
| """ |
| Class to add the sas related parameters to the desired config. |
| """ |
| |
| SAS_KEY = "sas" |
| |
| def __init__(self, acs: EnodebAcsStateMachine): |
| super().__init__() |
| self.acs = acs |
| |
| def postprocess(self, desired_cfg: EnodebConfiguration) -> None: |
| pass |
| |
| |
| class SercommSendGetTransientParametersState(EnodebAcsState): |
| """ |
| Periodically read eNodeB status. Note: keep frequency low to avoid |
| backing up large numbers of read operations if enodebd is busy. |
| Some eNB parameters are read only and updated by the eNB itself. |
| """ |
| |
| def __init__(self, acs: EnodebAcsStateMachine, when_upgrade: str, when_done: str): |
| super().__init__() |
| self.acs = acs |
| self.upgrade_transition = when_upgrade |
| self.done_transition = when_done |
| |
| def get_msg(self, message: Any) -> AcsMsgAndTransition: |
| |
| request = models.GetParameterValues() |
| request.ParameterNames = models.ParameterNames() |
| request.ParameterNames.string = [] |
| |
| for _, tr_param in StatusParameters.STATUS_PARAMETERS.items(): |
| path = tr_param.path |
| request.ParameterNames.string.append(path) |
| |
| request.ParameterNames.string.append( |
| "Device.X_SCM_DeviceFeature.SystemParams.RunControl.Service.4.Status", |
| ) |
| |
| request.ParameterNames.arrayType = "xsd:string[%d]" % len( |
| request.ParameterNames.string |
| ) |
| |
| return AcsMsgAndTransition(msg=request, next_state=None) |
| |
| def read_msg(self, message: Any) -> AcsReadMsgResult: |
| |
| name_to_val = parse_get_parameter_values_response(self.acs.data_model, message,) |
| |
| # Update device configuration |
| StatusParameters.set_magma_device_cfg( |
| name_to_val, self.acs.device_cfg, |
| ) |
| |
| if name_to_val["SW version"] != "RC3923@2202151120": |
| print("Get into Firmware Upgrade state") |
| return AcsReadMsgResult( |
| msg_handled=True, next_state=self.upgrade_transition |
| ) |
| |
| print("Skip firmware upgrade, configure the enb") |
| return AcsReadMsgResult(msg_handled=True, next_state=self.done_transition) |
| |
| def state_description(self) -> str: |
| return "Getting transient read-only parameters" |
| |
| |
| class SercommGetInitState(EnodebAcsState): |
| """ |
| After the first Inform message the following can happen: |
| 1 - eNB can try to learn the RPC method of the ACS, reply back with the |
| RPC response (happens right after boot) |
| 2 - eNB can send an empty message -> This means that the eNB is already |
| provisioned so transition to next state. Only transition to next state |
| after this message. |
| 3 - Some other method call that we don't care about so ignore. |
| expected that the eNB -> This is an unhandled state so unlikely |
| """ |
| |
| def __init__(self, acs: EnodebAcsStateMachine, when_done): |
| super().__init__() |
| self.acs = acs |
| self.done_transition = when_done |
| self._is_rpc_request = False |
| |
| def get_msg(self, message: Any) -> AcsMsgAndTransition: |
| """ |
| Return empty message response if care about this |
| message type otherwise return empty RPC methods response. |
| """ |
| if not self._is_rpc_request: |
| resp = models.DummyInput() |
| return AcsMsgAndTransition(msg=resp, next_state=None) |
| |
| resp = models.GetRPCMethodsResponse() |
| resp.MethodList = models.MethodList() |
| RPC_METHODS = ["Inform", "GetRPCMethods", "TransferComplete"] |
| resp.MethodList.arrayType = "xsd:string[%d]" % len(RPC_METHODS) |
| resp.MethodList.string = RPC_METHODS |
| # Don't transition to next state wait for the empty HTTP post |
| return AcsMsgAndTransition(msg=resp, next_state=None) |
| |
| def read_msg(self, message: Any) -> AcsReadMsgResult: |
| # If this is a regular Inform, not after a reboot we'll get an empty |
| # message, in this case transition to the next state. We consider |
| # this phase as "initialized" |
| if isinstance(message, models.DummyInput): |
| return AcsReadMsgResult(msg_handled=True, next_state=self.done_transition,) |
| if not isinstance(message, models.GetRPCMethods): |
| # Unexpected, just don't die, ignore message. |
| logging.error("Ignoring message %s", str(type(message))) |
| # Set this so get_msg will return an empty message |
| self._is_rpc_request = False |
| else: |
| # Return a valid RPC response |
| self._is_rpc_request = True |
| return AcsReadMsgResult(msg_handled=True, next_state=None) |
| |
| def state_description(self) -> str: |
| return "Initializing the post boot sequence for eNB" |
| |
| |
| class SercommGetObjectParametersState(EnodebAcsState): |
| """ |
| Get information on parameters belonging to objects that can be added or |
| removed from the configuration. |
| |
| Englewood will report a parameter value as None if it does not exist |
| in the data model, rather than replying with a Fault message like most |
| eNB devices. |
| """ |
| |
| def __init__( |
| self, |
| acs: EnodebAcsStateMachine, |
| when_delete: str, |
| when_add: str, |
| when_set: str, |
| when_skip: str, |
| ): |
| super().__init__() |
| self.acs = acs |
| self.rm_obj_transition = when_delete |
| self.add_obj_transition = when_add |
| self.set_params_transition = when_set |
| self.skip_transition = when_skip |
| |
| def get_params_to_get(self, data_model: DataModel,) -> List[ParameterName]: |
| names = [] |
| |
| # First get base params |
| names = get_params_to_get( |
| self.acs.device_cfg, self.acs.data_model, request_all_params=True, |
| ) |
| # Add object params. |
| num_plmns = data_model.get_num_plmns() |
| obj_to_params = data_model.get_numbered_param_names() |
| for i in range(1, num_plmns + 1): |
| obj_name = ParameterName.PLMN_N % i |
| desired = obj_to_params[obj_name] |
| names += desired |
| |
| print(obj_to_params) |
| return names |
| |
| def get_msg(self, message: Any) -> AcsMsgAndTransition: |
| """ Respond with GetParameterValuesRequest """ |
| names = self.get_params_to_get(self.acs.data_model,) |
| |
| # Generate the request |
| request = models.GetParameterValues() |
| request.ParameterNames = models.ParameterNames() |
| request.ParameterNames.arrayType = "xsd:string[%d]" % len(names) |
| request.ParameterNames.string = [] |
| for name in names: |
| path = self.acs.data_model.get_parameter(name).path |
| if path is not InvalidTrParamPath: |
| # Only get data elements backed by tr69 path |
| request.ParameterNames.string.append(path) |
| |
| return AcsMsgAndTransition(msg=request, next_state=None) |
| |
| def read_msg(self, message: Any) -> AcsReadMsgResult: |
| """ |
| Process GetParameterValuesResponse |
| """ |
| if not isinstance(message, models.GetParameterValuesResponse): |
| return AcsReadMsgResult(msg_handled=False, next_state=None) |
| |
| path_to_val = {} |
| for param_value_struct in message.ParameterList.ParameterValueStruct: |
| path_to_val[param_value_struct.Name] = param_value_struct.Value.Data |
| |
| EnodebdLogger.debug("Received object parameters: %s", str(path_to_val)) |
| |
| # Parse simple params |
| param_name_list = self.acs.data_model.get_parameter_names() |
| |
| for name in param_name_list: |
| path = self.acs.data_model.get_parameter(name).path |
| if path in path_to_val: |
| value = path_to_val.get(path) |
| magma_val = self.acs.data_model.transform_for_magma(name, value,) |
| self.acs.device_cfg.set_parameter(name, magma_val) |
| |
| # Parse object params |
| num_plmns = self.acs.data_model.get_num_plmns() |
| for i in range(1, num_plmns + 1): |
| obj_name = ParameterName.PLMN_N % i |
| obj_to_params = self.acs.data_model.get_numbered_param_names() |
| param_name_list = obj_to_params[obj_name] |
| for name in param_name_list: |
| path = self.acs.data_model.get_parameter(name).path |
| if path in path_to_val: |
| value = path_to_val.get(path) |
| if value is None: |
| continue |
| if obj_name not in self.acs.device_cfg.get_object_names(): |
| self.acs.device_cfg.add_object(obj_name) |
| magma_value = self.acs.data_model.transform_for_magma(name, value) |
| self.acs.device_cfg.set_parameter_for_object( |
| name, magma_value, obj_name, |
| ) |
| |
| # Now we have enough information to build the desired configuration |
| if self.acs.desired_cfg is None: |
| self.acs.desired_cfg = build_desired_config( |
| self.acs.device_cfg, self.acs.data_model, self.acs.config_postprocessor, |
| ) |
| |
| if ( |
| len(get_all_objects_to_delete(self.acs.desired_cfg, self.acs.device_cfg,),) |
| > 0 |
| ): |
| return AcsReadMsgResult( |
| msg_handled=True, next_state=self.rm_obj_transition, |
| ) |
| elif ( |
| len(get_all_objects_to_add(self.acs.desired_cfg, self.acs.device_cfg,),) > 0 |
| ): |
| return AcsReadMsgResult( |
| msg_handled=True, next_state=self.add_obj_transition, |
| ) |
| elif ( |
| len( |
| get_all_param_values_to_set( |
| self.acs.desired_cfg, self.acs.device_cfg, self.acs.data_model, |
| ), |
| ) |
| > 0 |
| ): |
| return AcsReadMsgResult( |
| msg_handled=True, next_state=self.set_params_transition, |
| ) |
| return AcsReadMsgResult(msg_handled=True, next_state=self.skip_transition,) |
| |
| def state_description(self) -> str: |
| return "Getting well known parameters" |