| """ |
| Copyright 2020 The Magma Authors. |
| |
| This source code is licensed under the BSD-style license found in the |
| LICENSE file in the root directory of this source tree. |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| """ |
| import logging |
| from typing import Any, Callable, Dict, List, Optional, Type |
| |
| from common.service import MagmaService |
| from data_models import transform_for_magma |
| from data_models.data_model import ( |
| DataModel, |
| InvalidTrParamPath, |
| TrParam, |
| ) |
| from data_models.data_model_parameters import ( |
| ParameterName, |
| TrParameterType, |
| ) |
| from configuration.service_configs import load_enb_config |
| from device_config.configuration_init import build_desired_config |
| from device_config.enodeb_config_postprocessor import EnodebConfigurationPostProcessor |
| from device_config.enodeb_configuration import EnodebConfiguration |
| from devices.device_utils import EnodebDeviceName |
| from logger import EnodebdLogger |
| from state_machines.acs_state_utils import ( |
| get_all_objects_to_add, |
| get_all_objects_to_delete, |
| get_all_param_values_to_set, |
| get_params_to_get, |
| parse_get_parameter_values_response, |
| ) |
| from state_machines.enb_acs import EnodebAcsStateMachine |
| from state_machines.enb_acs_impl import BasicEnodebAcsStateMachine |
| from state_machines.enb_acs_states import ( |
| AcsMsgAndTransition, |
| AcsReadMsgResult, |
| AddObjectsState, |
| DeleteObjectsState, |
| EnbSendRebootState, |
| EndSessionState, |
| EnodebAcsState, |
| ErrorState, |
| GetParametersState, |
| SetParameterValuesState, |
| WaitGetParametersState, |
| WaitInformMRebootState, |
| WaitInformState, |
| WaitRebootResponseState, |
| WaitSetParameterValuesState, |
| ) |
| from tr069 import models |
| |
| |
| class SASParameters: |
| """ Class modeling the SAS parameters and their TR path""" |
| |
| # SAS parameters for FreedomFiOne |
| FAP_CONTROL = "Device.Services.FAPService.1.FAPControl." |
| FAPSERVICE_PATH = "Device.Services.FAPService.1." |
| |
| # Sas management parameters |
| SAS_ENABLE = "sas_enabled" |
| SAS_SERVER_URL = "sas_server_url" |
| SAS_UID = "sas_uid" |
| SAS_CATEGORY = "sas_category" |
| SAS_CHANNEL_TYPE = "sas_channel_type" |
| SAS_CERT_SUBJECT = "sas_cert_subject" |
| SAS_IC_GROUP_ID = "sas_icg_group_id" |
| SAS_LOCATION = "sas_location" |
| SAS_HEIGHT_TYPE = "sas_height_type" |
| SAS_CPI_ENABLE = "sas_cpi_enable" |
| SAS_CPI_IPE = "sas_cpi_ipe" # Install param supplied enable |
| FREQ_BAND_1 = "freq_band_1" |
| FREQ_BAND_2 = "freq_band_2" |
| # For CBRS radios we set this to the limit and the SAS can reduce the |
| # power if needed. |
| TX_POWER_CONFIG = "tx_power_config" |
| |
| SAS_PARAMETERS = { |
| SAS_ENABLE: TrParam( |
| FAP_CONTROL + "LTE.X_000E8F_SAS.Enable", |
| is_invasive=False, |
| type=TrParameterType.BOOLEAN, |
| is_optional=False, |
| ), |
| SAS_SERVER_URL: TrParam( |
| FAP_CONTROL + "LTE.X_000E8F_SAS.Server", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| SAS_UID: TrParam( |
| FAP_CONTROL + "LTE.X_000E8F_SAS.UserContactInformation", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| SAS_CATEGORY: TrParam( |
| FAP_CONTROL + "LTE.X_000E8F_SAS.Category", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| SAS_CHANNEL_TYPE: TrParam( |
| FAP_CONTROL + "LTE.X_000E8F_SAS.ProtectionLevel", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| SAS_CERT_SUBJECT: TrParam( |
| FAP_CONTROL + "LTE.X_000E8F_SAS.CertSubject", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| # SAS_IC_GROUP_ID: TrParam( |
| # FAP_CONTROL + 'LTE.X_000E8F_SAS.ICGGroupId', is_invasive=False, |
| # type=TrParameterType.STRING, False), |
| SAS_LOCATION: TrParam( |
| FAP_CONTROL + "LTE.X_000E8F_SAS.Location", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| SAS_HEIGHT_TYPE: TrParam( |
| FAP_CONTROL + "LTE.X_000E8F_SAS.HeightType", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| SAS_CPI_ENABLE: TrParam( |
| FAP_CONTROL + "LTE.X_000E8F_SAS.CPIEnable", |
| is_invasive=False, |
| type=TrParameterType.BOOLEAN, |
| is_optional=False, |
| ), |
| SAS_CPI_IPE: TrParam( |
| FAP_CONTROL + "LTE.X_000E8F_SAS.CPIInstallParamSuppliedEnable", |
| is_invasive=False, |
| type=TrParameterType.BOOLEAN, |
| is_optional=False, |
| ), |
| FREQ_BAND_1: TrParam( |
| FAPSERVICE_PATH + "CellConfig.LTE.RAN.RF.FreqBandIndicator", |
| is_invasive=False, |
| type=TrParameterType.UNSIGNED_INT, |
| is_optional=False, |
| ), |
| FREQ_BAND_2: TrParam( |
| FAPSERVICE_PATH + "CellConfig.LTE.RAN.RF.X_000E8F_FreqBandIndicator2", |
| is_invasive=False, |
| type=TrParameterType.UNSIGNED_INT, |
| is_optional=False, |
| ), |
| TX_POWER_CONFIG: TrParam( |
| FAPSERVICE_PATH + "CellConfig.LTE.RAN.RF.X_000E8F_TxPowerConfig", |
| is_invasive=True, |
| type=TrParameterType.INT, |
| is_optional=False, |
| ), |
| } |
| |
| |
| class StatusParameters: |
| """ |
| Stateful class that converts eNB status to Magma understood status. |
| FreedomFiOne has many status params that interact with each other. |
| This class maintains the business logic of parsing these status params |
| and converting it to Magma understood fields. |
| """ |
| |
| DEFGW_STATUS_PATH = "Device.X_SCM_DeviceFeature.X_SCM_NEStatus.X_SCM_DEFGW_Status" |
| SAS_STATUS_PATH = "Device.Services.FAPService.1.FAPControl.LTE.X_SCM_SAS.State" |
| ENB_STATUS_PATH = "Device.X_SCM_DeviceFeature.X_SCM_NEStatus.X_SCM_eNB_Status" |
| |
| # Status parameters |
| DEFAULT_GW = "defaultGW" |
| SYNC_STATUS = "syncStatus" |
| SAS_STATUS = "sasStatus" |
| ENB_STATUS = "enbStatus" |
| GPS_SCAN_STATUS = "gpsScanStatus" |
| |
| STATUS_PARAMETERS = { |
| # Status nodes |
| # This works |
| DEFAULT_GW: TrParam( |
| DEFGW_STATUS_PATH, |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| # SYNC_STATUS: TrParam( |
| # STATUS_PATH + 'X_000E8F_Sync_Status', is_invasive=False, |
| # type=TrParameterType.STRING, is_optional=False, |
| # ), |
| # This works |
| SAS_STATUS: TrParam( |
| SAS_STATUS_PATH, |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| # This doesn't work |
| # ENB_STATUS: TrParam( |
| # ENB_STATUS_PATH, is_invasive=False, |
| # type=TrParameterType.STRING, is_optional=False, |
| # ), |
| # GPS status, lat, long |
| GPS_SCAN_STATUS: TrParam( |
| "Device.FAP.GPS.ScanStatus", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| ParameterName.GPS_LAT: TrParam( |
| "Device.FAP.GPS.LockedLatitude", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| ParameterName.GPS_LONG: TrParam( |
| "Device.FAP.GPS.LockedLongitude", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| } |
| |
| # Derived status params that don't have tr69 representation. |
| DERIVED_STATUS_PARAMETERS = { |
| ParameterName.RF_TX_STATUS: TrParam( |
| InvalidTrParamPath, |
| is_invasive=False, |
| type=TrParameterType.BOOLEAN, |
| is_optional=False, |
| ), |
| ParameterName.GPS_STATUS: TrParam( |
| InvalidTrParamPath, |
| is_invasive=False, |
| type=TrParameterType.BOOLEAN, |
| is_optional=False, |
| ), |
| ParameterName.PTP_STATUS: TrParam( |
| InvalidTrParamPath, |
| is_invasive=False, |
| type=TrParameterType.BOOLEAN, |
| is_optional=False, |
| ), |
| ParameterName.MME_STATUS: TrParam( |
| InvalidTrParamPath, |
| is_invasive=False, |
| type=TrParameterType.BOOLEAN, |
| is_optional=False, |
| ), |
| ParameterName.OP_STATE: TrParam( |
| InvalidTrParamPath, |
| is_invasive=False, |
| type=TrParameterType.BOOLEAN, |
| is_optional=False, |
| ), |
| } |
| |
| @classmethod |
| def set_magma_device_cfg( |
| cls, name_to_val: Dict, device_cfg: EnodebConfiguration, |
| ): |
| """ |
| Convert FreedomFiOne name_to_val representation to magma device_cfg |
| """ |
| success_str = "SUCCESS" # String constant returned by radio |
| insync_str = "INSYNC" |
| |
| if ( |
| name_to_val.get(cls.DEFAULT_GW) |
| and name_to_val[cls.DEFAULT_GW].upper() != success_str |
| ): |
| # Nothing will proceed if the eNB doesn't have an IP on the WAN |
| serial_num = "unknown" |
| if device_cfg.has_parameter(ParameterName.SERIAL_NUMBER): |
| serial_num = device_cfg.get_parameter(ParameterName.SERIAL_NUMBER,) |
| EnodebdLogger.error( |
| "Radio with serial number %s doesn't have IP address " "on WAN", |
| serial_num, |
| ) |
| device_cfg.set_parameter( |
| param_name=ParameterName.RF_TX_STATUS, value=False, |
| ) |
| device_cfg.set_parameter( |
| param_name=ParameterName.GPS_STATUS, value=False, |
| ) |
| device_cfg.set_parameter( |
| param_name=ParameterName.PTP_STATUS, value=False, |
| ) |
| device_cfg.set_parameter( |
| param_name=ParameterName.MME_STATUS, value=False, |
| ) |
| device_cfg.set_parameter( |
| param_name=ParameterName.OP_STATE, value=False, |
| ) |
| return |
| |
| if ( |
| name_to_val.get(cls.SAS_STATUS) |
| and name_to_val[cls.SAS_STATUS].upper() == success_str |
| ): |
| device_cfg.set_parameter( |
| param_name=ParameterName.RF_TX_STATUS, value=True, |
| ) |
| else: |
| # No sas grant so not transmitting. There is no explicit node for Tx |
| # in FreedomFiOne |
| device_cfg.set_parameter( |
| param_name=ParameterName.RF_TX_STATUS, value=False, |
| ) |
| |
| if ( |
| name_to_val.get(cls.GPS_SCAN_STATUS) |
| and name_to_val[cls.GPS_SCAN_STATUS].upper() == success_str |
| ): |
| device_cfg.set_parameter( |
| param_name=ParameterName.GPS_STATUS, value=True, |
| ) |
| # Time comes through GPS so can only be insync with GPS is |
| # in sync, we use PTP_STATUS field to overload timer is in Sync. |
| if ( |
| name_to_val.get(cls.SYNC_STATUS) |
| and name_to_val[cls.SYNC_STATUS].upper() == insync_str |
| ): |
| device_cfg.set_parameter( |
| param_name=ParameterName.PTP_STATUS, value=True, |
| ) |
| else: |
| device_cfg.set_parameter( |
| param_name=ParameterName.PTP_STATUS, value=False, |
| ) |
| else: |
| device_cfg.set_parameter( |
| param_name=ParameterName.GPS_STATUS, value=False, |
| ) |
| device_cfg.set_parameter( |
| param_name=ParameterName.PTP_STATUS, value=False, |
| ) |
| |
| if ( |
| name_to_val.get(cls.DEFAULT_GW) |
| and name_to_val[cls.DEFAULT_GW].upper() == success_str |
| ): |
| device_cfg.set_parameter( |
| param_name=ParameterName.MME_STATUS, value=True, |
| ) |
| else: |
| device_cfg.set_parameter( |
| param_name=ParameterName.MME_STATUS, value=False, |
| ) |
| |
| if ( |
| name_to_val.get(cls.ENB_STATUS) |
| and name_to_val[cls.ENB_STATUS].upper() == success_str |
| ): |
| device_cfg.set_parameter( |
| param_name=ParameterName.OP_STATE, value=True, |
| ) |
| else: |
| device_cfg.set_parameter( |
| param_name=ParameterName.OP_STATE, value=False, |
| ) |
| |
| pass_through_params = [ParameterName.GPS_LAT, ParameterName.GPS_LONG] |
| for name in pass_through_params: |
| device_cfg.set_parameter(name, name_to_val[name]) |
| |
| |
| class FreedomFiOneMiscParameters: |
| """ |
| Default set of parameters that enable carrier aggregation and other |
| miscellaneous properties |
| """ |
| |
| FAP_CONTROL = "Device.Services.FAPService.1.FAPControl." |
| FAPSERVICE_PATH = "Device.Services.FAPService.1." |
| |
| # Tunnel ref format clobber it to non IPSEC as we don't support |
| # IPSEC |
| TUNNEL_REF = "tunnel_ref" |
| PRIM_SOURCE = "prim_src" |
| |
| # Carrier aggregation |
| CARRIER_AGG_ENABLE = "carrier_agg_enable" |
| CARRIER_NUMBER = "carrier_number" # Carrier aggregation params |
| CONTIGUOUS_CC = "contiguous_cc" |
| WEB_UI_ENABLE = "web_ui_enable" # Enable or disable local enb UI |
| |
| MISC_PARAMETERS = { |
| # WEB_UI_ENABLE: TrParam( |
| # 'Device.X_000E8F_DeviceFeature.X_000E8F_WebServerEnable', |
| # is_invasive=False, |
| # type=TrParameterType.BOOLEAN, is_optional=False, |
| # ), |
| CARRIER_AGG_ENABLE: TrParam( |
| FAP_CONTROL + "LTE.X_000E8F_RRMConfig.X_000E8F_CA_Enable", |
| is_invasive=False, |
| type=TrParameterType.BOOLEAN, |
| is_optional=False, |
| ), |
| CARRIER_NUMBER: TrParam( |
| FAP_CONTROL + "LTE.X_000E8F_RRMConfig.X_000E8F_Cell_Number", |
| is_invasive=False, |
| type=TrParameterType.INT, |
| is_optional=False, |
| ), |
| CONTIGUOUS_CC: TrParam( |
| FAP_CONTROL + "LTE.X_000E8F_RRMConfig.X_000E8F_CELL_Freq_Contiguous", |
| is_invasive=False, |
| type=TrParameterType.INT, |
| is_optional=False, |
| ), |
| TUNNEL_REF: TrParam( |
| FAPSERVICE_PATH + "CellConfig.LTE.Tunnel.1.TunnelRef", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| PRIM_SOURCE: TrParam( |
| FAPSERVICE_PATH + "REM.X_000E8F_tfcsManagerConfig.primSrc", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| } |
| |
| # Hardcoded defaults |
| defaults = { |
| # Use IPV4 only |
| TUNNEL_REF: "Device.IP.Interface.1.IPv4Address.1.", |
| # Only synchronize with GPS |
| PRIM_SOURCE: "FREE_RUNNING", |
| # Always enable carrier aggregation for the CBRS bands |
| CARRIER_AGG_ENABLE: False, |
| CARRIER_NUMBER: 1, # CBRS has two carriers |
| CONTIGUOUS_CC: 0, # Its not contiguous carrier |
| } |
| |
| |
| class FreedomFiOneHandler(BasicEnodebAcsStateMachine): |
| def __init__(self, service: MagmaService,) -> None: |
| self._state_map = {} |
| super().__init__(service=service, use_param_key=True) |
| |
| def reboot_asap(self) -> None: |
| self.transition("reboot") |
| |
| def is_enodeb_connected(self) -> bool: |
| return not isinstance(self.state, WaitInformState) |
| |
| def _init_state_map(self) -> None: |
| self._state_map = { |
| # Inform comes in -> Respond with InformResponse |
| "wait_inform": WaitInformState(self, when_done="get_rpc_methods"), |
| # If first inform after boot -> GetRpc request comes in, if not |
| # empty request comes in => Transition to get_transient_params |
| "get_rpc_methods": FreedomFiOneGetInitState( |
| self, when_done="get_transient_params", |
| ), |
| # Read transient readonly params. |
| "get_transient_params": FreedomFiOneSendGetTransientParametersState( |
| self, when_done="get_params", |
| ), |
| "get_params": FreedomFiOneGetObjectParametersState( |
| self, |
| when_delete="delete_objs", |
| when_add="add_objs", |
| when_set="set_params", |
| when_skip="end_session", |
| ), |
| "delete_objs": DeleteObjectsState( |
| self, when_add="add_objs", when_skip="set_params", |
| ), |
| "add_objs": AddObjectsState(self, when_done="set_params"), |
| "set_params": SetParameterValuesState(self, when_done="wait_set_params",), |
| "wait_set_params": WaitSetParameterValuesState( |
| self, |
| when_done="check_get_params", |
| when_apply_invasive="check_get_params", |
| status_non_zero_allowed=True, |
| ), |
| "check_get_params": GetParametersState( |
| self, when_done="check_wait_get_params", request_all_params=True, |
| ), |
| "check_wait_get_params": WaitGetParametersState( |
| self, when_done="end_session", |
| ), |
| "end_session": EndSessionState(self), |
| # These states are only entered through manual user intervention |
| "reboot": EnbSendRebootState(self, when_done="wait_reboot"), |
| "wait_reboot": WaitRebootResponseState( |
| self, when_done="wait_post_reboot_inform", |
| ), |
| "wait_post_reboot_inform": WaitInformMRebootState( |
| self, when_done="wait_empty", when_timeout="wait_inform", |
| ), |
| # The states below are entered when an unexpected message type is |
| # received |
| "unexpected_fault": ErrorState( |
| self, inform_transition_target="wait_inform", |
| ), |
| } |
| |
| @property |
| def device_name(self) -> str: |
| return EnodebDeviceName.FREEDOMFI_ONE |
| |
| @property |
| def data_model_class(self) -> Type[DataModel]: |
| return FreedomFiOneTrDataModel |
| |
| @property |
| def config_postprocessor(self) -> EnodebConfigurationPostProcessor: |
| return FreedomFiOneConfigurationInitializer(self) |
| |
| @property |
| def state_map(self) -> Dict[str, EnodebAcsState]: |
| return self._state_map |
| |
| @property |
| def disconnected_state_name(self) -> str: |
| return "wait_inform" |
| |
| @property |
| def unexpected_fault_state_name(self) -> str: |
| return "unexpected_fault" |
| |
| |
| class FreedomFiOneTrDataModel(DataModel): |
| """ |
| Class to represent relevant data model parameters from TR-196/TR-098. |
| This class is effectively read-only. |
| |
| These models have these idiosyncrasies (on account of running TR098): |
| |
| - Parameter content root is different (InternetGatewayDevice) |
| - GetParameter queries with a wildcard e.g. InternetGatewayDevice. do |
| not respond with the full tree (we have to query all parameters) |
| - MME status is not exposed - we assume the MME is connected if |
| the eNodeB is transmitting (OpState=true) |
| - Parameters such as band capability/duplex config |
| are rooted under `boardconf.` and not the device config root |
| - Num PLMNs is not reported by these units |
| """ |
| |
| # Mapping of TR parameter paths to aliases |
| DEVICE_PATH = "Device." |
| FAPSERVICE_PATH = DEVICE_PATH + "Services.FAPService.1." |
| FAP_CONTROL = FAPSERVICE_PATH + "FAPControl." |
| BCCH = FAPSERVICE_PATH + "REM.LTE.Cell.1.BCCH." |
| |
| PARAMETERS = { |
| # Top-level objects |
| ParameterName.DEVICE: TrParam( |
| DEVICE_PATH, |
| is_invasive=False, |
| type=TrParameterType.OBJECT, |
| is_optional=False, |
| ), |
| ParameterName.FAP_SERVICE: TrParam( |
| FAP_CONTROL, |
| is_invasive=False, |
| type=TrParameterType.OBJECT, |
| is_optional=False, |
| ), |
| # Device info |
| ParameterName.SW_VERSION: TrParam( |
| DEVICE_PATH + "DeviceInfo.SoftwareVersion", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| ParameterName.SERIAL_NUMBER: TrParam( |
| DEVICE_PATH + "DeviceInfo.SerialNumber", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| # RF-related parameters |
| ParameterName.EARFCNDL: TrParam( |
| FAPSERVICE_PATH + "CellConfig.LTE.RAN.RF.EARFCNDL", |
| is_invasive=False, |
| type=TrParameterType.INT, |
| is_optional=False, |
| ), |
| ParameterName.DL_BANDWIDTH: TrParam( |
| FAPSERVICE_PATH + "CellConfig.LTE.RAN.RF.DLBandwidth", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| ParameterName.UL_BANDWIDTH: TrParam( |
| FAPSERVICE_PATH + "CellConfig.LTE.RAN.RF.ULBandwidth", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| ParameterName.PCI: TrParam( |
| FAPSERVICE_PATH + "CellConfig.LTE.RAN.RF.PhyCellID", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| ParameterName.SUBFRAME_ASSIGNMENT: TrParam( |
| FAPSERVICE_PATH + "CellConfig.LTE.RAN.PHY.TDDFrame.SubFrameAssignment", |
| is_invasive=False, |
| type=TrParameterType.BOOLEAN, |
| is_optional=False, |
| ), |
| ParameterName.SPECIAL_SUBFRAME_PATTERN: TrParam( |
| FAPSERVICE_PATH + "CellConfig.LTE.RAN.PHY.TDDFrame.SpecialSubframePatterns", |
| is_invasive=False, |
| type=TrParameterType.INT, |
| is_optional=False, |
| ), |
| ParameterName.CELL_ID: TrParam( |
| FAPSERVICE_PATH + "CellConfig.LTE.RAN.Common.CellIdentity", |
| is_invasive=False, |
| type=TrParameterType.UNSIGNED_INT, |
| is_optional=False, |
| ), |
| # Readonly LTE state |
| ParameterName.ADMIN_STATE: TrParam( |
| FAP_CONTROL + "LTE.AdminState", |
| is_invasive=False, |
| type=TrParameterType.BOOLEAN, |
| is_optional=False, |
| ), |
| ParameterName.GPS_ENABLE: TrParam( |
| DEVICE_PATH + "FAP.GPS.ScanOnBoot", |
| is_invasive=False, |
| type=TrParameterType.BOOLEAN, |
| is_optional=False, |
| ), |
| # Core network parameters |
| ParameterName.MME_IP: TrParam( |
| FAP_CONTROL + "LTE.Gateway.S1SigLinkServerList", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| ParameterName.MME_PORT: TrParam( |
| FAP_CONTROL + "LTE.Gateway.S1SigLinkPort", |
| is_invasive=False, |
| type=TrParameterType.INT, |
| is_optional=False, |
| ), |
| # It may not work, comment out first |
| # ParameterName.NUM_PLMNS: TrParam( |
| # FAPSERVICE_PATH + 'CellConfig.LTE.EPC.PLMNListNumberOfEntries', |
| # is_invasive=False, |
| # type=TrParameterType.INT, is_optional=False, |
| # ), |
| ParameterName.TAC: TrParam( |
| FAPSERVICE_PATH + "CellConfig.LTE.EPC.TAC", |
| is_invasive=False, |
| type=TrParameterType.INT, |
| is_optional=False, |
| ), |
| # Management server parameters |
| ParameterName.PERIODIC_INFORM_ENABLE: TrParam( |
| DEVICE_PATH + "ManagementServer.PeriodicInformEnable", |
| is_invasive=False, |
| type=TrParameterType.BOOLEAN, |
| is_optional=False, |
| ), |
| ParameterName.PERIODIC_INFORM_INTERVAL: TrParam( |
| DEVICE_PATH + "ManagementServer.PeriodicInformInterval", |
| is_invasive=False, |
| type=TrParameterType.INT, |
| is_optional=False, |
| ), |
| # Performance management parameters |
| ParameterName.PERF_MGMT_ENABLE: TrParam( |
| DEVICE_PATH + "FAP.PerfMgmt.Config.1.Enable", |
| is_invasive=False, |
| type=TrParameterType.BOOLEAN, |
| is_optional=False, |
| ), |
| ParameterName.PERF_MGMT_UPLOAD_INTERVAL: TrParam( |
| DEVICE_PATH + "FAP.PerfMgmt.Config.1.PeriodicUploadInterval", |
| is_invasive=False, |
| type=TrParameterType.INT, |
| is_optional=False, |
| ), |
| ParameterName.PERF_MGMT_UPLOAD_URL: TrParam( |
| DEVICE_PATH + "FAP.PerfMgmt.Config.1.URL", |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ), |
| } |
| TRANSFORMS_FOR_ENB = {} |
| NUM_PLMNS_IN_CONFIG = 1 |
| for i in range(1, NUM_PLMNS_IN_CONFIG + 1): |
| PARAMETERS[ParameterName.PLMN_N % i] = TrParam( |
| FAPSERVICE_PATH + "CellConfig.LTE.EPC.PLMNList.%d." % i, |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ) |
| PARAMETERS[ParameterName.PLMN_N_CELL_RESERVED % i] = TrParam( |
| FAPSERVICE_PATH |
| + "CellConfig.LTE.EPC.PLMNList.%d.CellReservedForOperatorUse" % i, |
| is_invasive=False, |
| type=TrParameterType.BOOLEAN, |
| is_optional=False, |
| ) |
| PARAMETERS[ParameterName.PLMN_N_ENABLE % i] = TrParam( |
| FAPSERVICE_PATH + "CellConfig.LTE.EPC.PLMNList.%d.Enable" % i, |
| is_invasive=False, |
| type=TrParameterType.BOOLEAN, |
| is_optional=False, |
| ) |
| PARAMETERS[ParameterName.PLMN_N_PRIMARY % i] = TrParam( |
| FAPSERVICE_PATH + "CellConfig.LTE.EPC.PLMNList.%d.IsPrimary" % i, |
| is_invasive=False, |
| type=TrParameterType.BOOLEAN, |
| is_optional=False, |
| ) |
| PARAMETERS[ParameterName.PLMN_N_PLMNID % i] = TrParam( |
| FAPSERVICE_PATH + "CellConfig.LTE.EPC.PLMNList.%d.PLMNID" % i, |
| is_invasive=False, |
| type=TrParameterType.STRING, |
| is_optional=False, |
| ) |
| |
| PARAMETERS.update(SASParameters.SAS_PARAMETERS) |
| PARAMETERS.update(FreedomFiOneMiscParameters.MISC_PARAMETERS) |
| PARAMETERS.update(StatusParameters.STATUS_PARAMETERS) |
| # These are stateful parameters that have no tr-69 representation |
| PARAMETERS.update(StatusParameters.DERIVED_STATUS_PARAMETERS) |
| |
| TRANSFORMS_FOR_MAGMA = { |
| # We don't set these parameters |
| ParameterName.BAND_CAPABILITY: transform_for_magma.band_capability, |
| ParameterName.DUPLEX_MODE_CAPABILITY: transform_for_magma.duplex_mode, |
| } |
| |
| @classmethod |
| def get_parameter(cls, param_name: ParameterName) -> Optional[TrParam]: |
| return cls.PARAMETERS.get(param_name) |
| |
| @classmethod |
| def _get_magma_transforms(cls,) -> Dict[ParameterName, Callable[[Any], Any]]: |
| return cls.TRANSFORMS_FOR_MAGMA |
| |
| @classmethod |
| def _get_enb_transforms(cls) -> Dict[ParameterName, Callable[[Any], Any]]: |
| return cls.TRANSFORMS_FOR_ENB |
| |
| @classmethod |
| def get_load_parameters(cls) -> List[ParameterName]: |
| """ |
| Load all the parameters instead of a subset. |
| """ |
| return list(cls.PARAMETERS.keys()) |
| |
| @classmethod |
| def get_num_plmns(cls) -> int: |
| return cls.NUM_PLMNS_IN_CONFIG |
| |
| @classmethod |
| def get_parameter_names(cls) -> List[ParameterName]: |
| excluded_params = [ |
| str(ParameterName.DEVICE), |
| str(ParameterName.FAP_SERVICE), |
| ] |
| names = list( |
| filter( |
| lambda x: (not str(x).startswith("PLMN")) |
| and (str(x) not in excluded_params), |
| cls.PARAMETERS.keys(), |
| ), |
| ) |
| return names |
| |
| @classmethod |
| def get_numbered_param_names(cls,) -> Dict[ParameterName, List[ParameterName]]: |
| names = {} |
| for i in range(1, cls.NUM_PLMNS_IN_CONFIG + 1): |
| params = [ |
| ParameterName.PLMN_N_CELL_RESERVED % i, |
| ParameterName.PLMN_N_ENABLE % i, |
| ParameterName.PLMN_N_PRIMARY % i, |
| ParameterName.PLMN_N_PLMNID % i, |
| ] |
| names[ParameterName.PLMN_N % i] = params |
| |
| return names |
| |
| @classmethod |
| def get_sas_param_names(cls) -> List[ParameterName]: |
| return SASParameters.SAS_PARAMETERS.keys() |
| |
| |
| class FreedomFiOneConfigurationInitializer(EnodebConfigurationPostProcessor): |
| """ |
| Class to add the sas related parameters to the desired config. |
| """ |
| |
| SAS_KEY = "sas" |
| WEB_UI_ENABLE_LIST_KEY = "web_ui_enable_list" |
| |
| def __init__(self, acs: EnodebAcsStateMachine): |
| super().__init__() |
| self.acs = acs |
| |
| def postprocess( |
| self, mconfig: Any, service_cfg: Any, desired_cfg: EnodebConfiguration, |
| ) -> None: |
| |
| desired_cfg.delete_parameter(ParameterName.EARFCNDL) |
| desired_cfg.delete_parameter(ParameterName.DL_BANDWIDTH) |
| desired_cfg.delete_parameter(ParameterName.UL_BANDWIDTH) |
| |
| # go through misc parameters and set them to default. |
| for name, val in FreedomFiOneMiscParameters.defaults.items(): |
| desired_cfg.set_parameter(name, val) |
| |
| # Bump up the parameter key version |
| self.acs.parameter_version_inc() |
| |
| if self.WEB_UI_ENABLE_LIST_KEY in service_cfg: |
| serial_nos = service_cfg.get(self.WEB_UI_ENABLE_LIST_KEY) |
| if self.acs.device_cfg.has_parameter(ParameterName.SERIAL_NUMBER,): |
| if self.acs.get_parameter(ParameterName.SERIAL_NUMBER) in serial_nos: |
| desired_cfg.set_parameter( |
| FreedomFiOneMiscParameters.WEB_UI_ENABLE, True, |
| ) |
| else: |
| # This should not happen |
| EnodebdLogger.error("Serial number unknown for device") |
| |
| # Load eNB customized configuration from "./magma_config/serial_number/" |
| # and configure each connected eNB based on serial number |
| enbcfg = load_enb_config() |
| sn = self.acs.get_parameter(ParameterName.SERIAL_NUMBER) |
| |
| for name, val in enbcfg.get(sn, {}).items(): |
| # The SAS configuration for eNodeB |
| if name in ["sas", "cell"]: |
| for subname, subval in val.items(): |
| print("Config %s updated to: %s" % (subname, subval)) |
| desired_cfg.set_parameter(subname, subval) |
| |
| print(desired_cfg) |
| |
| |
| class FreedomFiOneSendGetTransientParametersState(EnodebAcsState): |
| """ |
| Periodically read eNodeB status. Note: keep frequency low to avoid |
| backing up large numbers of read operations if enodebd is busy. |
| Some eNB parameters are read only and updated by the eNB itself. |
| """ |
| |
| def __init__(self, acs: EnodebAcsStateMachine, when_done: str): |
| super().__init__() |
| self.acs = acs |
| self.done_transition = when_done |
| |
| def get_msg(self, message: Any) -> AcsMsgAndTransition: |
| |
| request = models.GetParameterValues() |
| request.ParameterNames = models.ParameterNames() |
| request.ParameterNames.string = [] |
| |
| # request = models.GetParameterNames() |
| # request.ParameterPath = "Device." |
| # request.NextLevel = False |
| |
| # Get the status parameters which was defined in Line 171 |
| for _, tr_param in StatusParameters.STATUS_PARAMETERS.items(): |
| path = tr_param.path |
| request.ParameterNames.string.append(path) |
| |
| request.ParameterNames.arrayType = "xsd:string[%d]" % len( |
| request.ParameterNames.string |
| ) |
| |
| return AcsMsgAndTransition(msg=request, next_state=None) |
| |
| def read_msg(self, message: Any) -> AcsReadMsgResult: |
| |
| if not isinstance(message, models.GetParameterValuesResponse): |
| return AcsReadMsgResult(msg_handled=False, next_state=None) |
| # Current values of the fetched parameters |
| name_to_val = parse_get_parameter_values_response(self.acs.data_model, message,) |
| EnodebdLogger.debug("Received Parameters: %s", str(name_to_val)) |
| |
| # Update device configuration |
| StatusParameters.set_magma_device_cfg( |
| name_to_val, self.acs.device_cfg, |
| ) |
| |
| return AcsReadMsgResult(msg_handled=True, next_state=self.done_transition,) |
| |
| def state_description(self) -> str: |
| return "Getting transient read-only parameters" |
| |
| |
| class FreedomFiOneGetInitState(EnodebAcsState): |
| """ |
| After the first Inform message the following can happen: |
| 1 - eNB can try to learn the RPC method of the ACS, reply back with the |
| RPC response (happens right after boot) |
| 2 - eNB can send an empty message -> This means that the eNB is already |
| provisioned so transition to next state. Only transition to next state |
| after this message. |
| 3 - Some other method call that we don't care about so ignore. |
| expected that the eNB -> This is an unhandled state so unlikely |
| """ |
| |
| def __init__(self, acs: EnodebAcsStateMachine, when_done): |
| super().__init__() |
| self.acs = acs |
| self.done_transition = when_done |
| self._is_rpc_request = False |
| |
| def get_msg(self, message: Any) -> AcsMsgAndTransition: |
| """ |
| Return empty message response if care about this |
| message type otherwise return empty RPC methods response. |
| """ |
| if not self._is_rpc_request: |
| resp = models.DummyInput() |
| return AcsMsgAndTransition(msg=resp, next_state=None) |
| |
| resp = models.GetRPCMethodsResponse() |
| resp.MethodList = models.MethodList() |
| RPC_METHODS = ["Inform", "GetRPCMethods", "TransferComplete"] |
| resp.MethodList.arrayType = "xsd:string[%d]" % len(RPC_METHODS) |
| resp.MethodList.string = RPC_METHODS |
| # Don't transition to next state wait for the empty HTTP post |
| return AcsMsgAndTransition(msg=resp, next_state=None) |
| |
| def read_msg(self, message: Any) -> AcsReadMsgResult: |
| # If this is a regular Inform, not after a reboot we'll get an empty |
| # message, in this case transition to the next state. We consider |
| # this phase as "initialized" |
| if isinstance(message, models.DummyInput): |
| return AcsReadMsgResult(msg_handled=True, next_state=self.done_transition,) |
| if not isinstance(message, models.GetRPCMethods): |
| # Unexpected, just don't die, ignore message. |
| logging.error("Ignoring message %s", str(type(message))) |
| # Set this so get_msg will return an empty message |
| self._is_rpc_request = False |
| else: |
| # Return a valid RPC response |
| self._is_rpc_request = True |
| return AcsReadMsgResult(msg_handled=True, next_state=None) |
| |
| def state_description(self) -> str: |
| return "Initializing the post boot sequence for eNB" |
| |
| |
| class FreedomFiOneGetObjectParametersState(EnodebAcsState): |
| """ |
| Get information on parameters belonging to objects that can be added or |
| removed from the configuration. |
| |
| Englewood will report a parameter value as None if it does not exist |
| in the data model, rather than replying with a Fault message like most |
| eNB devices. |
| """ |
| |
| def __init__( |
| self, |
| acs: EnodebAcsStateMachine, |
| when_delete: str, |
| when_add: str, |
| when_set: str, |
| when_skip: str, |
| ): |
| super().__init__() |
| self.acs = acs |
| self.rm_obj_transition = when_delete |
| self.add_obj_transition = when_add |
| self.set_params_transition = when_set |
| self.skip_transition = when_skip |
| |
| def get_params_to_get(self, data_model: DataModel,) -> List[ParameterName]: |
| names = [] |
| |
| # First get base params |
| names = get_params_to_get( |
| self.acs.device_cfg, self.acs.data_model, request_all_params=True, |
| ) |
| # Add object params. |
| num_plmns = data_model.get_num_plmns() |
| obj_to_params = data_model.get_numbered_param_names() |
| for i in range(1, num_plmns + 1): |
| obj_name = ParameterName.PLMN_N % i |
| desired = obj_to_params[obj_name] |
| names += desired |
| return names |
| |
| def get_msg(self, message: Any) -> AcsMsgAndTransition: |
| """ Respond with GetParameterValuesRequest """ |
| names = self.get_params_to_get(self.acs.data_model,) |
| |
| # Generate the request |
| request = models.GetParameterValues() |
| request.ParameterNames = models.ParameterNames() |
| request.ParameterNames.arrayType = "xsd:string[%d]" % len(names) |
| request.ParameterNames.string = [] |
| for name in names: |
| path = self.acs.data_model.get_parameter(name).path |
| if path is not InvalidTrParamPath: |
| # Only get data elements backed by tr69 path |
| request.ParameterNames.string.append(path) |
| |
| return AcsMsgAndTransition(msg=request, next_state=None) |
| |
| def read_msg(self, message: Any) -> AcsReadMsgResult: |
| """ |
| Process GetParameterValuesResponse |
| """ |
| if not isinstance(message, models.GetParameterValuesResponse): |
| return AcsReadMsgResult(msg_handled=False, next_state=None) |
| |
| path_to_val = {} |
| for param_value_struct in message.ParameterList.ParameterValueStruct: |
| path_to_val[param_value_struct.Name] = param_value_struct.Value.Data |
| |
| EnodebdLogger.debug("Received object parameters: %s", str(path_to_val)) |
| |
| # Parse simple params |
| param_name_list = self.acs.data_model.get_parameter_names() |
| |
| for name in param_name_list: |
| path = self.acs.data_model.get_parameter(name).path |
| if path in path_to_val: |
| value = path_to_val.get(path) |
| magma_val = self.acs.data_model.transform_for_magma(name, value,) |
| self.acs.device_cfg.set_parameter(name, magma_val) |
| |
| # Parse object params |
| num_plmns = self.acs.data_model.get_num_plmns() |
| for i in range(1, num_plmns + 1): |
| obj_name = ParameterName.PLMN_N % i |
| obj_to_params = self.acs.data_model.get_numbered_param_names() |
| param_name_list = obj_to_params[obj_name] |
| for name in param_name_list: |
| path = self.acs.data_model.get_parameter(name).path |
| if path in path_to_val: |
| value = path_to_val.get(path) |
| if value is None: |
| continue |
| if obj_name not in self.acs.device_cfg.get_object_names(): |
| self.acs.device_cfg.add_object(obj_name) |
| magma_value = self.acs.data_model.transform_for_magma(name, value) |
| self.acs.device_cfg.set_parameter_for_object( |
| name, magma_value, obj_name, |
| ) |
| |
| # Now we have enough information to build the desired configuration |
| if self.acs.desired_cfg is None: |
| self.acs.desired_cfg = build_desired_config( |
| self.acs.mconfig, |
| self.acs.service_config, |
| self.acs.device_cfg, |
| self.acs.data_model, |
| self.acs.config_postprocessor, |
| ) |
| |
| if ( |
| len(get_all_objects_to_delete(self.acs.desired_cfg, self.acs.device_cfg,),) |
| > 0 |
| ): |
| return AcsReadMsgResult( |
| msg_handled=True, next_state=self.rm_obj_transition, |
| ) |
| elif ( |
| len(get_all_objects_to_add(self.acs.desired_cfg, self.acs.device_cfg,),) > 0 |
| ): |
| return AcsReadMsgResult( |
| msg_handled=True, next_state=self.add_obj_transition, |
| ) |
| elif ( |
| len( |
| get_all_param_values_to_set( |
| self.acs.desired_cfg, self.acs.device_cfg, self.acs.data_model, |
| ), |
| ) |
| > 0 |
| ): |
| return AcsReadMsgResult( |
| msg_handled=True, next_state=self.set_params_transition, |
| ) |
| return AcsReadMsgResult(msg_handled=True, next_state=self.skip_transition,) |
| |
| def state_description(self) -> str: |
| return "Getting well known parameters" |