blob: f7f5b6f2fa8645346df530bb577d2011be042e99 [file] [log] [blame]
# SPDX-FileCopyrightText: 2020 The Magma Authors.
# SPDX-FileCopyrightText: 2022 Open Networking Foundation <support@opennetworking.org>
#
# SPDX-License-Identifier: BSD-3-Clause
import json
import os
from collections import namedtuple
from typing import Any, Dict, List, NamedTuple, Optional, Tuple, Union
from lte.protos.enodebd_pb2 import SingleEnodebStatus
from lte.protos.mconfig import mconfigs_pb2
from common import serialization_utils
import metrics
from data_models.data_model_parameters import ParameterName
from device_config.configuration_util import (
find_enb_by_cell_id,
get_enb_rf_tx_desired,
)
from exceptions import ConfigurationError
from logger import EnodebdLogger as logger
from s1ap_client import get_all_enb_state
from state_machines.enb_acs import EnodebAcsStateMachine
from state_machines.enb_acs_manager import StateMachineManager
from orc8r.protos.service303_pb2 import State
# There are 2 levels of caching for GPS coordinates from the enodeB: module
# variables (in-memory) and on disk. In the event the enodeB stops reporting
# GPS, we will continue to report the cached coordinates from the in-memory
# cached coordinates. If enodebd is restarted, this in-memory cache will be
# populated by the file
CACHED_GPS_COORD_FILE_PATH = os.path.join(
'/var/opt/magma/enodebd',
'gps_coords.txt',
)
# Cache GPS coordinates in memory so we don't write to the file cache if the
# coordinates have not changed. We can read directly from here instead of the
# file cache when the enodeB goes down unless these are unintialized.
_gps_lat_cached = None
_gps_lon_cached = None
EnodebStatus = NamedTuple(
'EnodebStatus',
[
('enodeb_configured', bool),
('gps_latitude', str),
('gps_longitude', str),
('enodeb_connected', bool),
('opstate_enabled', bool),
('rf_tx_on', bool),
('rf_tx_desired', bool),
('gps_connected', bool),
('ptp_connected', bool),
('mme_connected', bool),
('fsm_state', str),
('cell_id', int),
],
)
# TODO: Remove after checkins support multiple eNB status
MagmaOldEnodebdStatus = namedtuple(
'MagmaOldEnodebdStatus',
[
'enodeb_serial',
'enodeb_configured',
'gps_latitude',
'gps_longitude',
'enodeb_connected',
'opstate_enabled',
'rf_tx_on',
'rf_tx_desired',
'gps_connected',
'ptp_connected',
'mme_connected',
'enodeb_state',
],
)
MagmaEnodebdStatus = NamedTuple(
'MagmaEnodebdStatus',
[
('n_enodeb_connected', str),
('all_enodeb_configured', str),
('all_enodeb_opstate_enabled', str),
('all_enodeb_rf_tx_configured', str),
('any_enodeb_gps_connected', str),
('all_enodeb_ptp_connected', str),
('all_enodeb_mme_connected', str),
('gateway_gps_longitude', str),
('gateway_gps_latitude', str),
],
)
def update_status_metrics(status: EnodebStatus) -> None:
""" Update metrics for eNodeB status """
# Call every second
metrics_by_stat_key = {
'enodeb_connected': metrics.STAT_ENODEB_CONNECTED,
'enodeb_configured': metrics.STAT_ENODEB_CONFIGURED,
'opstate_enabled': metrics.STAT_OPSTATE_ENABLED,
'rf_tx_on': metrics.STAT_RF_TX_ENABLED,
'rf_tx_desired': metrics.STAT_RF_TX_DESIRED,
'gps_connected': metrics.STAT_GPS_CONNECTED,
'ptp_connected': metrics.STAT_PTP_CONNECTED,
'mme_connected': metrics.STAT_MME_CONNECTED,
}
def get_metric_value(enodeb_status: Dict[str, str], key: str):
# Metrics are "sticky" when synced to the cloud - if we don't
# receive a status update from enodeb, set the metric to 0
# to explicitly indicate that it was not received, otherwise the
# metrics collector will continue to report the last value
val = enodeb_status.get(key, None)
if val is None:
return 0
if type(val) is not bool:
logger.error('Could not cast metric value %s to int', val)
return 0
return int(val) # val should be either True or False
for stat_key, metric in metrics_by_stat_key.items():
metric.set(get_metric_value(status._asdict(), stat_key))
# TODO: Remove after checkins support multiple eNB status
def get_service_status_old(
enb_acs_manager: StateMachineManager,
) -> Dict[str, Any]:
""" Get service status compatible with older controller """
enb_status_by_serial = get_all_enb_status(enb_acs_manager)
# Since we only expect users to plug in a single eNB, generate service
# status with the first one we find that is connected
for enb_serial, enb_status in enb_status_by_serial.items():
if enb_status.enodeb_connected:
return MagmaOldEnodebdStatus(
enodeb_serial=enb_serial,
enodeb_configured=_bool_to_str(enb_status.enodeb_configured),
gps_latitude=enb_status.gps_latitude,
gps_longitude=enb_status.gps_longitude,
enodeb_connected=_bool_to_str(enb_status.enodeb_connected),
opstate_enabled=_bool_to_str(enb_status.opstate_enabled),
rf_tx_on=_bool_to_str(enb_status.rf_tx_on),
rf_tx_desired=_bool_to_str(enb_status.rf_tx_desired),
gps_connected=_bool_to_str(enb_status.gps_connected),
ptp_connected=_bool_to_str(enb_status.ptp_connected),
mme_connected=_bool_to_str(enb_status.mme_connected),
enodeb_state=enb_status.fsm_state,
)._asdict()
return MagmaOldEnodebdStatus(
enodeb_serial='N/A',
enodeb_configured='0',
gps_latitude='0.0',
gps_longitude='0.0',
enodeb_connected='0',
opstate_enabled='0',
rf_tx_on='0',
rf_tx_desired='N/A',
gps_connected='0',
ptp_connected='0',
mme_connected='0',
enodeb_state='N/A',
)._asdict()
def get_service_status(enb_acs_manager: StateMachineManager) -> Dict[str, Any]:
enodebd_status = _get_enodebd_status(enb_acs_manager)
return enodebd_status._asdict()
def _get_enodebd_status(
enb_acs_manager: StateMachineManager,
) -> MagmaEnodebdStatus:
enb_status_by_serial = get_all_enb_status(enb_acs_manager)
# Start from default values for enodebd status
n_enodeb_connected = 0
all_enodeb_configured = False
all_enodeb_opstate_enabled = False
all_enodeb_rf_tx_configured = False
any_enodeb_gps_connected = False
all_enodeb_ptp_connected = False
all_enodeb_mme_connected = False
gateway_gps_longitude = '0.0'
gateway_gps_latitude = '0.0'
def _is_rf_tx_configured(enb_status: EnodebStatus) -> bool:
return enb_status.rf_tx_on == enb_status.rf_tx_desired
if enb_status_by_serial:
enb_status_list = list(enb_status_by_serial.values())
# Aggregate all eNB status for enodebd status, repetitive but
# clearer for output purposes.
n_enodeb_connected = sum(
enb_status.enodeb_connected for enb_status in enb_status_list
)
all_enodeb_configured = all(
enb_status.enodeb_configured for enb_status in enb_status_list
)
all_enodeb_mme_connected = all(
enb_status.mme_connected for enb_status in enb_status_list
)
all_enodeb_opstate_enabled = all(
enb_status.opstate_enabled for enb_status in enb_status_list
)
all_enodeb_ptp_connected = all(
enb_status.ptp_connected for enb_status in enb_status_list
)
any_enodeb_gps_connected = any(
enb_status.gps_connected for enb_status in enb_status_list
)
all_enodeb_rf_tx_configured = all(
_is_rf_tx_configured(enb_status) for enb_status in enb_status_list
)
if n_enodeb_connected:
gateway_gps_longitude = enb_status_list[0].gps_longitude
gateway_gps_latitude = enb_status_list[0].gps_latitude
return MagmaEnodebdStatus(
n_enodeb_connected=str(n_enodeb_connected),
all_enodeb_configured=str(all_enodeb_configured),
all_enodeb_opstate_enabled=str(all_enodeb_opstate_enabled),
all_enodeb_rf_tx_configured=str(all_enodeb_rf_tx_configured),
any_enodeb_gps_connected=str(any_enodeb_gps_connected),
all_enodeb_ptp_connected=str(all_enodeb_ptp_connected),
all_enodeb_mme_connected=str(all_enodeb_mme_connected),
gateway_gps_longitude=str(gateway_gps_longitude),
gateway_gps_latitude=str(gateway_gps_latitude),
)
def get_all_enb_status(
enb_acs_manager: StateMachineManager,
) -> Dict[str, EnodebStatus]:
enb_status_by_serial = {}
serial_list = enb_acs_manager.get_connected_serial_id_list()
for enb_serial in serial_list:
handler = enb_acs_manager.get_handler_by_serial(enb_serial)
status = get_enb_status(handler)
enb_status_by_serial[enb_serial] = status
return enb_status_by_serial
def get_enb_status(enodeb: EnodebAcsStateMachine) -> EnodebStatus:
"""
Returns a dict representing the status of an enodeb
The returned dictionary will be a subset of the following keys:
- enodeb_connected
- enodeb_configured
- opstate_enabled
- rf_tx_on
- rf_tx_desired
- gps_connected
- ptp_connected
- mme_connected
- gps_latitude
- gps_longitude
- ip_address
- cell_id
The set of keys returned will depend on the connection status of the
enodeb. A missing key indicates that the value is unknown.
Returns:
Status dictionary for the enodeb state
"""
enodeb_configured = enodeb.is_enodeb_configured()
# We cache GPS coordinates so try to read them before the early return
# if the enB is not connected
gps_lat, gps_lon = _get_and_cache_gps_coords(enodeb)
enodeb_connected = enodeb.is_enodeb_connected()
opstate_enabled = _parse_param_as_bool(enodeb, ParameterName.OP_STATE)
rf_tx_on = _parse_param_as_bool(enodeb, ParameterName.RF_TX_STATUS)
rf_tx_on = rf_tx_on and enodeb_connected
try:
enb_serial = \
enodeb.device_cfg.get_parameter(ParameterName.SERIAL_NUMBER)
enb_cell_id = int(
enodeb.device_cfg.get_parameter(ParameterName.CELL_ID),
)
rf_tx_desired = get_enb_rf_tx_desired(enodeb.mconfig, enb_serial)
except (KeyError, ConfigurationError):
rf_tx_desired = False
enb_cell_id = 0
mme_connected = _parse_param_as_bool(enodeb, ParameterName.MME_STATUS)
gps_connected = _get_gps_status_as_bool(enodeb)
try:
ptp_connected = _parse_param_as_bool(enodeb, ParameterName.PTP_STATUS)
except ConfigurationError:
ptp_connected = False
return EnodebStatus(
enodeb_configured=enodeb_configured,
gps_latitude=gps_lat,
gps_longitude=gps_lon,
enodeb_connected=enodeb_connected,
opstate_enabled=opstate_enabled,
rf_tx_on=rf_tx_on,
rf_tx_desired=rf_tx_desired,
gps_connected=gps_connected,
ptp_connected=ptp_connected,
mme_connected=mme_connected,
fsm_state=enodeb.get_state(),
cell_id=enb_cell_id,
)
def get_single_enb_status(
device_serial: str,
state_machine_manager: StateMachineManager,
) -> SingleEnodebStatus:
try:
handler = state_machine_manager.get_handler_by_serial(device_serial)
except KeyError:
return _empty_enb_status()
# This namedtuple is missing IP and serial info
status = get_enb_status(handler)
# Get IP info
ip = state_machine_manager.get_ip_of_serial(device_serial)
def get_status_property(status: bool) -> SingleEnodebStatus.StatusProperty:
if status:
return SingleEnodebStatus.StatusProperty.Value('ON')
return SingleEnodebStatus.StatusProperty.Value('OFF')
# Build the message to return through gRPC
enb_status = SingleEnodebStatus()
enb_status.device_serial = device_serial
enb_status.ip_address = ip
enb_status.connected = get_status_property(status.enodeb_connected)
enb_status.configured = get_status_property(status.enodeb_configured)
enb_status.opstate_enabled = get_status_property(status.opstate_enabled)
enb_status.rf_tx_on = get_status_property(status.rf_tx_on)
enb_status.rf_tx_desired = get_status_property(status.rf_tx_desired)
enb_status.gps_connected = get_status_property(status.gps_connected)
enb_status.ptp_connected = get_status_property(status.ptp_connected)
enb_status.mme_connected = get_status_property(status.mme_connected)
enb_status.gps_longitude = status.gps_longitude
enb_status.gps_latitude = status.gps_latitude
enb_status.fsm_state = status.fsm_state
return enb_status
def get_operational_states(
enb_acs_manager: StateMachineManager,
mconfig: mconfigs_pb2.EnodebD,
) -> List[State]:
"""
Returns: A list of State with EnodebStatus encoded as JSON
"""
states = []
configured_serial_ids = []
enb_status_by_serial = get_all_enb_status(enb_acs_manager)
# Get S1 connected eNBs
enb_statuses = get_all_enb_state()
for serial_id in enb_status_by_serial:
enb_status_dict = enb_status_by_serial[serial_id]._asdict()
# Add IP address to state
enb_status_dict['ip_address'] = enb_acs_manager.get_ip_of_serial(
serial_id,
)
# Add num of UEs connected
num_ue_connected = enb_statuses.get(enb_status_dict['cell_id'], 0)
enb_status_dict['ues_connected'] = num_ue_connected
serialized = json.dumps(enb_status_dict)
state = State(
type="single_enodeb",
deviceID=serial_id,
value=serialized.encode('utf-8'),
)
configured_serial_ids.append(serial_id)
states.append(state)
# Get state for externally configured enodebs
s1_states = get_enb_s1_connected_states(
enb_statuses,
configured_serial_ids,
mconfig,
)
states.extend(s1_states)
return states
def get_enb_s1_connected_states(
enb_s1_state_map, configured_serial_ids,
mconfig,
) -> List[State]:
states = []
for enb_id in enb_s1_state_map:
enb = find_enb_by_cell_id(mconfig, enb_id)
if enb and enb.serial_num not in configured_serial_ids:
status = EnodebStatus(
enodeb_configured=False,
gps_latitude='N/A',
gps_longitude='N/A',
enodeb_connected=True,
opstate_enabled=False,
rf_tx_on=False,
rf_tx_desired=False,
gps_connected=False,
ptp_connected=False,
mme_connected=True,
fsm_state='N/A',
cell_id=enb_id,
)
status_dict = status._asdict()
# Add IP address to state
status_dict['ip_address'] = enb.config.ip_address
# Add num of UEs connected to state, use cellID from mconfig
status_dict['ues_connected'] = enb_s1_state_map.get(enb_id, 0)
serialized = json.dumps(status_dict)
state = State(
type="single_enodeb",
deviceID=enb.serial_num,
value=serialized.encode('utf-8'),
)
states.append(state)
return states
def _empty_enb_status() -> SingleEnodebStatus:
enb_status = SingleEnodebStatus()
enb_status.device_serial = 'N/A'
enb_status.ip_address = 'N/A'
enb_status.connected = '0'
enb_status.configured = '0'
enb_status.opstate_enabled = '0'
enb_status.rf_tx_on = '0'
enb_status.rf_tx_desired = 'N/A'
enb_status.gps_connected = '0'
enb_status.ptp_connected = '0'
enb_status.mme_connected = '0'
enb_status.gps_longitude = '0.0'
enb_status.gps_latitude = '0.0'
enb_status.fsm_state = 'N/A'
return enb_status
def _parse_param_as_bool(
enodeb: EnodebAcsStateMachine,
param_name: ParameterName,
) -> bool:
try:
return _format_as_bool(enodeb.get_parameter(param_name), param_name)
except (KeyError, ConfigurationError):
return False
def _format_as_bool(
param_value: Union[bool, str, int],
param_name: Optional[Union[ParameterName, str]] = None,
) -> bool:
""" Returns '1' for true, and '0' for false """
stripped_value = str(param_value).lower().strip()
if stripped_value in {'true', '1', 'enabled'}:
return True
elif stripped_value in {'false', '0', 'disabled', 'InProgress'}:
return False
else:
logger.warning(
'%s parameter not understood (%s)', param_name, param_value,
)
return False
def _get_gps_status_as_bool(enodeb: EnodebAcsStateMachine) -> bool:
try:
if not enodeb.has_parameter(ParameterName.GPS_STATUS):
return False
else:
param = enodeb.get_parameter(ParameterName.GPS_STATUS)
if isinstance(param, bool):
# No translation to do.
return param
stripped_value = param.lower().strip()
if stripped_value in ['0', '2', 'inprogress']:
# 2 = GPS locking
return False
elif stripped_value == '1':
return True
else:
logger.warning(
'GPS status parameter not understood (%s)', param,
)
return False
except (KeyError, ConfigurationError, AttributeError):
return False
def _get_and_cache_gps_coords(enodeb: EnodebAcsStateMachine) -> Tuple[
str, str,
]:
"""
Read the GPS coordinates of the enB from its configuration or the
cached coordinate file if the preceding read fails. If reading from
enB configuration succeeds, this method will cache the new coordinates.
Returns:
(str, str): GPS latitude, GPS longitude
"""
lat, lon = '', ''
try:
lat = enodeb.get_parameter(ParameterName.GPS_LAT)
lon = enodeb.get_parameter(ParameterName.GPS_LONG)
if lat != _gps_lat_cached or lon != _gps_lon_cached:
_cache_new_gps_coords(lat, lon)
return lat, lon
except (KeyError, ConfigurationError):
return _get_cached_gps_coords()
except ValueError:
logger.warning('GPS lat/long not understood (%s/%s)', lat, lon)
return '0', '0'
def _get_cached_gps_coords() -> Tuple[str, str]:
"""
Returns cached GPS coordinates if enB is disconnected or otherwise not
reporting coordinates.
Returns:
(str, str): (GPS lat, GPS lon)
"""
# pylint: disable=global-statement
global _gps_lat_cached, _gps_lon_cached
if _gps_lat_cached is None or _gps_lon_cached is None:
_gps_lat_cached, _gps_lon_cached = _read_gps_coords_from_file()
return _gps_lat_cached, _gps_lon_cached
def _read_gps_coords_from_file():
try:
with open(CACHED_GPS_COORD_FILE_PATH, encoding="utf-8") as f:
lines = f.readlines()
if len(lines) != 2:
logger.warning(
'Expected to find 2 lines in GPS '
'coordinate file but only found %d',
len(lines),
)
return '0', '0'
return tuple(map(lambda l: l.strip(), lines))
except OSError:
logger.warning('Could not open cached GPS coordinate file')
return '0', '0'
def _cache_new_gps_coords(gps_lat, gps_lon):
"""
Cache GPS coordinates in the module-level variables here and write them
to a managed file on disk.
Args:
gps_lat (str): latitude as a string
gps_lon (str): longitude as a string
"""
# pylint: disable=global-statement
global _gps_lat_cached, _gps_lon_cached
_gps_lat_cached, _gps_lon_cached = gps_lat, gps_lon
_write_gps_coords_to_file(gps_lat, gps_lon)
def _write_gps_coords_to_file(gps_lat, gps_lon):
lines = '{lat}\n{lon}'.format(lat=gps_lat, lon=gps_lon)
try:
serialization_utils.write_to_file_atomically(
CACHED_GPS_COORD_FILE_PATH,
lines,
)
except OSError:
pass
def _bool_to_str(b: bool) -> str:
if b is True:
return "1"
return "0"