Init commit for standalone enodebd
Change-Id: I88eeef5135dd7ba8551ddd9fb6a0695f5325337b
diff --git a/enodeb_status.py b/enodeb_status.py
new file mode 100644
index 0000000..442061b
--- /dev/null
+++ b/enodeb_status.py
@@ -0,0 +1,602 @@
+"""
+Copyright 2020 The Magma Authors.
+
+This source code is licensed under the BSD-style license found in the
+LICENSE file in the root directory of this source tree.
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import json
+import os
+from collections import namedtuple
+from typing import Any, Dict, List, NamedTuple, Optional, Tuple, Union
+
+from lte.protos.enodebd_pb2 import SingleEnodebStatus
+from lte.protos.mconfig import mconfigs_pb2
+from common import serialization_utils
+import metrics
+from data_models.data_model_parameters import ParameterName
+from device_config.configuration_util import (
+ find_enb_by_cell_id,
+ get_enb_rf_tx_desired,
+)
+from exceptions import ConfigurationError
+from logger import EnodebdLogger as logger
+from s1ap_client import get_all_enb_state
+from state_machines.enb_acs import EnodebAcsStateMachine
+from state_machines.enb_acs_manager import StateMachineManager
+from orc8r.protos.service303_pb2 import State
+
+# There are 2 levels of caching for GPS coordinates from the eNodeB: module
+# variables (in-memory) and a file on disk. If the eNodeB stops reporting
+# GPS, we continue to report the coordinates held in the in-memory cache.
+# If enodebd is restarted, the in-memory cache is repopulated from the file.
+
+CACHED_GPS_COORD_FILE_PATH = os.path.join(
+ '/var/opt/magma/enodebd',
+ 'gps_coords.txt',
+)
+
+# Cache GPS coordinates in memory so we don't write to the file cache if the
+# coordinates have not changed. When the eNodeB goes down we read directly
+# from here instead of the file cache, unless these are still uninitialized
+# (None), in which case they are first loaded from the file.
+_gps_lat_cached = None
+_gps_lon_cached = None
+
+# Status snapshot of a single eNodeB, as built by get_enb_status().
+# The GPS coordinates and the FSM state are strings, cell_id is an int and
+# every other field is a boolean flag.
+EnodebStatus = NamedTuple(
+ 'EnodebStatus',
+ [
+ ('enodeb_configured', bool),
+ ('gps_latitude', str),
+ ('gps_longitude', str),
+ ('enodeb_connected', bool),
+ ('opstate_enabled', bool),
+ ('rf_tx_on', bool),
+ ('rf_tx_desired', bool),
+ ('gps_connected', bool),
+ ('ptp_connected', bool),
+ ('mme_connected', bool),
+ ('fsm_state', str),
+ ('cell_id', int),
+ ],
+)
+
+# TODO: Remove after checkins support multiple eNB status
+# Flat, single-eNodeB service status in the layout produced by
+# get_service_status_old(). That function fills the flag fields with the
+# strings '1'/'0' (or 'N/A' placeholders) rather than booleans.
+MagmaOldEnodebdStatus = namedtuple(
+ 'MagmaOldEnodebdStatus',
+ [
+ 'enodeb_serial',
+ 'enodeb_configured',
+ 'gps_latitude',
+ 'gps_longitude',
+ 'enodeb_connected',
+ 'opstate_enabled',
+ 'rf_tx_on',
+ 'rf_tx_desired',
+ 'gps_connected',
+ 'ptp_connected',
+ 'mme_connected',
+ 'enodeb_state',
+ ],
+)
+
+# Gateway-level status aggregated over all eNodeBs returned by
+# get_all_enb_status(). Every field is a string: _get_enodebd_status()
+# converts the count and the boolean aggregates with str().
+MagmaEnodebdStatus = NamedTuple(
+ 'MagmaEnodebdStatus',
+ [
+ ('n_enodeb_connected', str),
+ ('all_enodeb_configured', str),
+ ('all_enodeb_opstate_enabled', str),
+ ('all_enodeb_rf_tx_configured', str),
+ ('any_enodeb_gps_connected', str),
+ ('all_enodeb_ptp_connected', str),
+ ('all_enodeb_mme_connected', str),
+ ('gateway_gps_longitude', str),
+ ('gateway_gps_latitude', str),
+ ],
+)
+
+
+def update_status_metrics(status: EnodebStatus) -> None:
+ """ Update metrics for eNodeB status """
+ # Call every second
+ metrics_by_stat_key = {
+ 'enodeb_connected': metrics.STAT_ENODEB_CONNECTED,
+ 'enodeb_configured': metrics.STAT_ENODEB_CONFIGURED,
+ 'opstate_enabled': metrics.STAT_OPSTATE_ENABLED,
+ 'rf_tx_on': metrics.STAT_RF_TX_ENABLED,
+ 'rf_tx_desired': metrics.STAT_RF_TX_DESIRED,
+ 'gps_connected': metrics.STAT_GPS_CONNECTED,
+ 'ptp_connected': metrics.STAT_PTP_CONNECTED,
+ 'mme_connected': metrics.STAT_MME_CONNECTED,
+ }
+
+ def get_metric_value(enodeb_status: Dict[str, str], key: str):
+ # Metrics are "sticky" when synced to the cloud - if we don't
+ # receive a status update from enodeb, set the metric to 0
+ # to explicitly indicate that it was not received, otherwise the
+ # metrics collector will continue to report the last value
+ val = enodeb_status.get(key, None)
+ if val is None:
+ return 0
+ if type(val) is not bool:
+ logger.error('Could not cast metric value %s to int', val)
+ return 0
+ return int(val) # val should be either True or False
+
+ for stat_key, metric in metrics_by_stat_key.items():
+ metric.set(get_metric_value(status._asdict(), stat_key))
+
+
+# TODO: Remove after checkins support multiple eNB status
+def get_service_status_old(
+ enb_acs_manager: StateMachineManager,
+) -> Dict[str, Any]:
+ """ Get service status compatible with older controller """
+ enb_status_by_serial = get_all_enb_status(enb_acs_manager)
+ # Since we only expect users to plug in a single eNB, generate service
+ # status with the first one we find that is connected
+ for enb_serial, enb_status in enb_status_by_serial.items():
+ if enb_status.enodeb_connected:
+ return MagmaOldEnodebdStatus(
+ enodeb_serial=enb_serial,
+ enodeb_configured=_bool_to_str(enb_status.enodeb_configured),
+ gps_latitude=enb_status.gps_latitude,
+ gps_longitude=enb_status.gps_longitude,
+ enodeb_connected=_bool_to_str(enb_status.enodeb_connected),
+ opstate_enabled=_bool_to_str(enb_status.opstate_enabled),
+ rf_tx_on=_bool_to_str(enb_status.rf_tx_on),
+ rf_tx_desired=_bool_to_str(enb_status.rf_tx_desired),
+ gps_connected=_bool_to_str(enb_status.gps_connected),
+ ptp_connected=_bool_to_str(enb_status.ptp_connected),
+ mme_connected=_bool_to_str(enb_status.mme_connected),
+ enodeb_state=enb_status.fsm_state,
+ )._asdict()
+ return MagmaOldEnodebdStatus(
+ enodeb_serial='N/A',
+ enodeb_configured='0',
+ gps_latitude='0.0',
+ gps_longitude='0.0',
+ enodeb_connected='0',
+ opstate_enabled='0',
+ rf_tx_on='0',
+ rf_tx_desired='N/A',
+ gps_connected='0',
+ ptp_connected='0',
+ mme_connected='0',
+ enodeb_state='N/A',
+ )._asdict()
+
+
+def get_service_status(enb_acs_manager: StateMachineManager) -> Dict[str, Any]:
+ enodebd_status = _get_enodebd_status(enb_acs_manager)
+ return enodebd_status._asdict()
+
+
+def _get_enodebd_status(
+ enb_acs_manager: StateMachineManager,
+) -> MagmaEnodebdStatus:
+ enb_status_by_serial = get_all_enb_status(enb_acs_manager)
+ # Start from default values for enodebd status
+ n_enodeb_connected = 0
+ all_enodeb_configured = False
+ all_enodeb_opstate_enabled = False
+ all_enodeb_rf_tx_configured = False
+ any_enodeb_gps_connected = False
+ all_enodeb_ptp_connected = False
+ all_enodeb_mme_connected = False
+ gateway_gps_longitude = '0.0'
+ gateway_gps_latitude = '0.0'
+
+ def _is_rf_tx_configured(enb_status: EnodebStatus) -> bool:
+ return enb_status.rf_tx_on == enb_status.rf_tx_desired
+
+ if enb_status_by_serial:
+ enb_status_list = list(enb_status_by_serial.values())
+ # Aggregate all eNB status for enodebd status, repetitive but
+ # clearer for output purposes.
+ n_enodeb_connected = sum(
+ enb_status.enodeb_connected for enb_status in enb_status_list
+ )
+ all_enodeb_configured = all(
+ enb_status.enodeb_configured for enb_status in enb_status_list
+ )
+ all_enodeb_mme_connected = all(
+ enb_status.mme_connected for enb_status in enb_status_list
+ )
+ all_enodeb_opstate_enabled = all(
+ enb_status.opstate_enabled for enb_status in enb_status_list
+ )
+ all_enodeb_ptp_connected = all(
+ enb_status.ptp_connected for enb_status in enb_status_list
+ )
+ any_enodeb_gps_connected = any(
+ enb_status.gps_connected for enb_status in enb_status_list
+ )
+ all_enodeb_rf_tx_configured = all(
+ _is_rf_tx_configured(enb_status) for enb_status in enb_status_list
+ )
+ if n_enodeb_connected:
+ gateway_gps_longitude = enb_status_list[0].gps_longitude
+ gateway_gps_latitude = enb_status_list[0].gps_latitude
+
+ return MagmaEnodebdStatus(
+ n_enodeb_connected=str(n_enodeb_connected),
+ all_enodeb_configured=str(all_enodeb_configured),
+ all_enodeb_opstate_enabled=str(all_enodeb_opstate_enabled),
+ all_enodeb_rf_tx_configured=str(all_enodeb_rf_tx_configured),
+ any_enodeb_gps_connected=str(any_enodeb_gps_connected),
+ all_enodeb_ptp_connected=str(all_enodeb_ptp_connected),
+ all_enodeb_mme_connected=str(all_enodeb_mme_connected),
+ gateway_gps_longitude=str(gateway_gps_longitude),
+ gateway_gps_latitude=str(gateway_gps_latitude),
+ )
+
+
+def get_all_enb_status(
+ enb_acs_manager: StateMachineManager,
+) -> Dict[str, EnodebStatus]:
+ enb_status_by_serial = {}
+ serial_list = enb_acs_manager.get_connected_serial_id_list()
+ for enb_serial in serial_list:
+ handler = enb_acs_manager.get_handler_by_serial(enb_serial)
+ status = get_enb_status(handler)
+ enb_status_by_serial[enb_serial] = status
+
+ return enb_status_by_serial
+
+
+def get_enb_status(enodeb: EnodebAcsStateMachine) -> EnodebStatus:
+ """
+ Returns a dict representing the status of an enodeb
+
+ The returned dictionary will be a subset of the following keys:
+ - enodeb_connected
+ - enodeb_configured
+ - opstate_enabled
+ - rf_tx_on
+ - rf_tx_desired
+ - gps_connected
+ - ptp_connected
+ - mme_connected
+ - gps_latitude
+ - gps_longitude
+ - ip_address
+ - cell_id
+
+ The set of keys returned will depend on the connection status of the
+ enodeb. A missing key indicates that the value is unknown.
+
+ Returns:
+ Status dictionary for the enodeb state
+ """
+ enodeb_configured = enodeb.is_enodeb_configured()
+
+ # We cache GPS coordinates so try to read them before the early return
+ # if the enB is not connected
+ gps_lat, gps_lon = _get_and_cache_gps_coords(enodeb)
+
+ enodeb_connected = enodeb.is_enodeb_connected()
+ opstate_enabled = _parse_param_as_bool(enodeb, ParameterName.OP_STATE)
+ rf_tx_on = _parse_param_as_bool(enodeb, ParameterName.RF_TX_STATUS)
+ rf_tx_on = rf_tx_on and enodeb_connected
+ try:
+ enb_serial = \
+ enodeb.device_cfg.get_parameter(ParameterName.SERIAL_NUMBER)
+ enb_cell_id = int(
+ enodeb.device_cfg.get_parameter(ParameterName.CELL_ID),
+ )
+ rf_tx_desired = get_enb_rf_tx_desired(enodeb.mconfig, enb_serial)
+ except (KeyError, ConfigurationError):
+ rf_tx_desired = False
+ enb_cell_id = 0
+
+ mme_connected = _parse_param_as_bool(enodeb, ParameterName.MME_STATUS)
+ gps_connected = _get_gps_status_as_bool(enodeb)
+ try:
+ ptp_connected = _parse_param_as_bool(enodeb, ParameterName.PTP_STATUS)
+ except ConfigurationError:
+ ptp_connected = False
+
+ return EnodebStatus(
+ enodeb_configured=enodeb_configured,
+ gps_latitude=gps_lat,
+ gps_longitude=gps_lon,
+ enodeb_connected=enodeb_connected,
+ opstate_enabled=opstate_enabled,
+ rf_tx_on=rf_tx_on,
+ rf_tx_desired=rf_tx_desired,
+ gps_connected=gps_connected,
+ ptp_connected=ptp_connected,
+ mme_connected=mme_connected,
+ fsm_state=enodeb.get_state(),
+ cell_id=enb_cell_id,
+ )
+
+
+def get_single_enb_status(
+ device_serial: str,
+ state_machine_manager: StateMachineManager,
+) -> SingleEnodebStatus:
+ try:
+ handler = state_machine_manager.get_handler_by_serial(device_serial)
+ except KeyError:
+ return _empty_enb_status()
+
+ # This namedtuple is missing IP and serial info
+ status = get_enb_status(handler)
+
+ # Get IP info
+ ip = state_machine_manager.get_ip_of_serial(device_serial)
+
+ def get_status_property(status: bool) -> SingleEnodebStatus.StatusProperty:
+ if status:
+ return SingleEnodebStatus.StatusProperty.Value('ON')
+ return SingleEnodebStatus.StatusProperty.Value('OFF')
+
+ # Build the message to return through gRPC
+ enb_status = SingleEnodebStatus()
+ enb_status.device_serial = device_serial
+ enb_status.ip_address = ip
+ enb_status.connected = get_status_property(status.enodeb_connected)
+ enb_status.configured = get_status_property(status.enodeb_configured)
+ enb_status.opstate_enabled = get_status_property(status.opstate_enabled)
+ enb_status.rf_tx_on = get_status_property(status.rf_tx_on)
+ enb_status.rf_tx_desired = get_status_property(status.rf_tx_desired)
+ enb_status.gps_connected = get_status_property(status.gps_connected)
+ enb_status.ptp_connected = get_status_property(status.ptp_connected)
+ enb_status.mme_connected = get_status_property(status.mme_connected)
+ enb_status.gps_longitude = status.gps_longitude
+ enb_status.gps_latitude = status.gps_latitude
+ enb_status.fsm_state = status.fsm_state
+ return enb_status
+
+
+def get_operational_states(
+ enb_acs_manager: StateMachineManager,
+ mconfig: mconfigs_pb2.EnodebD,
+) -> List[State]:
+ """
+ Returns: A list of State with EnodebStatus encoded as JSON
+ """
+ states = []
+ configured_serial_ids = []
+ enb_status_by_serial = get_all_enb_status(enb_acs_manager)
+
+ # Get S1 connected eNBs
+ enb_statuses = get_all_enb_state()
+
+ for serial_id in enb_status_by_serial:
+ enb_status_dict = enb_status_by_serial[serial_id]._asdict()
+
+ # Add IP address to state
+ enb_status_dict['ip_address'] = enb_acs_manager.get_ip_of_serial(
+ serial_id,
+ )
+
+ # Add num of UEs connected
+ num_ue_connected = enb_statuses.get(enb_status_dict['cell_id'], 0)
+ enb_status_dict['ues_connected'] = num_ue_connected
+
+ serialized = json.dumps(enb_status_dict)
+ state = State(
+ type="single_enodeb",
+ deviceID=serial_id,
+ value=serialized.encode('utf-8'),
+ )
+ configured_serial_ids.append(serial_id)
+ states.append(state)
+
+ # Get state for externally configured enodebs
+ s1_states = get_enb_s1_connected_states(
+ enb_statuses,
+ configured_serial_ids,
+ mconfig,
+ )
+ states.extend(s1_states)
+
+ return states
+
+
+def get_enb_s1_connected_states(
+ enb_s1_state_map, configured_serial_ids,
+ mconfig,
+) -> List[State]:
+ states = []
+ for enb_id in enb_s1_state_map:
+ enb = find_enb_by_cell_id(mconfig, enb_id)
+ if enb and enb.serial_num not in configured_serial_ids:
+ status = EnodebStatus(
+ enodeb_configured=False,
+ gps_latitude='N/A',
+ gps_longitude='N/A',
+ enodeb_connected=True,
+ opstate_enabled=False,
+ rf_tx_on=False,
+ rf_tx_desired=False,
+ gps_connected=False,
+ ptp_connected=False,
+ mme_connected=True,
+ fsm_state='N/A',
+ cell_id=enb_id,
+ )
+ status_dict = status._asdict()
+
+ # Add IP address to state
+ status_dict['ip_address'] = enb.config.ip_address
+
+ # Add num of UEs connected to state, use cellID from mconfig
+ status_dict['ues_connected'] = enb_s1_state_map.get(enb_id, 0)
+
+ serialized = json.dumps(status_dict)
+ state = State(
+ type="single_enodeb",
+ deviceID=enb.serial_num,
+ value=serialized.encode('utf-8'),
+ )
+ states.append(state)
+ return states
+
+
+def _empty_enb_status() -> SingleEnodebStatus:
+ enb_status = SingleEnodebStatus()
+ enb_status.device_serial = 'N/A'
+ enb_status.ip_address = 'N/A'
+ enb_status.connected = '0'
+ enb_status.configured = '0'
+ enb_status.opstate_enabled = '0'
+ enb_status.rf_tx_on = '0'
+ enb_status.rf_tx_desired = 'N/A'
+ enb_status.gps_connected = '0'
+ enb_status.ptp_connected = '0'
+ enb_status.mme_connected = '0'
+ enb_status.gps_longitude = '0.0'
+ enb_status.gps_latitude = '0.0'
+ enb_status.fsm_state = 'N/A'
+ return enb_status
+
+
+def _parse_param_as_bool(
+ enodeb: EnodebAcsStateMachine,
+ param_name: ParameterName,
+) -> bool:
+ try:
+ return _format_as_bool(enodeb.get_parameter(param_name), param_name)
+ except (KeyError, ConfigurationError):
+ return False
+
+
+def _format_as_bool(
+ param_value: Union[bool, str, int],
+ param_name: Optional[Union[ParameterName, str]] = None,
+) -> bool:
+ """ Returns '1' for true, and '0' for false """
+ stripped_value = str(param_value).lower().strip()
+ if stripped_value in {'true', '1'}:
+ return True
+ elif stripped_value in {'false', '0'}:
+ return False
+ else:
+ logger.warning(
+ '%s parameter not understood (%s)', param_name, param_value,
+ )
+ return False
+
+
+def _get_gps_status_as_bool(enodeb: EnodebAcsStateMachine) -> bool:
+ try:
+ if not enodeb.has_parameter(ParameterName.GPS_STATUS):
+ return False
+ else:
+ param = enodeb.get_parameter(ParameterName.GPS_STATUS)
+ if isinstance(param, bool):
+ # No translation to do.
+ return param
+ stripped_value = param.lower().strip()
+ if stripped_value == '0' or stripped_value == '2':
+ # 2 = GPS locking
+ return False
+ elif stripped_value == '1':
+ return True
+ else:
+ logger.warning(
+ 'GPS status parameter not understood (%s)', param,
+ )
+ return False
+ except (KeyError, ConfigurationError):
+ return False
+
+
+def _get_and_cache_gps_coords(enodeb: EnodebAcsStateMachine) -> Tuple[
+ str, str,
+]:
+ """
+ Read the GPS coordinates of the enB from its configuration or the
+ cached coordinate file if the preceding read fails. If reading from
+ enB configuration succeeds, this method will cache the new coordinates.
+
+ Returns:
+ (str, str): GPS latitude, GPS longitude
+ """
+ lat, lon = '', ''
+ try:
+ lat = enodeb.get_parameter(ParameterName.GPS_LAT)
+ lon = enodeb.get_parameter(ParameterName.GPS_LONG)
+
+ if lat != _gps_lat_cached or lon != _gps_lon_cached:
+ _cache_new_gps_coords(lat, lon)
+ return lat, lon
+ except (KeyError, ConfigurationError):
+ return _get_cached_gps_coords()
+ except ValueError:
+ logger.warning('GPS lat/long not understood (%s/%s)', lat, lon)
+ return '0', '0'
+
+
+def _get_cached_gps_coords() -> Tuple[str, str]:
+ """
+ Returns cached GPS coordinates if enB is disconnected or otherwise not
+ reporting coordinates.
+
+ Returns:
+ (str, str): (GPS lat, GPS lon)
+ """
+ # pylint: disable=global-statement
+ global _gps_lat_cached, _gps_lon_cached
+ if _gps_lat_cached is None or _gps_lon_cached is None:
+ _gps_lat_cached, _gps_lon_cached = _read_gps_coords_from_file()
+ return _gps_lat_cached, _gps_lon_cached
+
+
+def _read_gps_coords_from_file():
+ try:
+ with open(CACHED_GPS_COORD_FILE_PATH, encoding="utf-8") as f:
+ lines = f.readlines()
+ if len(lines) != 2:
+ logger.warning(
+ 'Expected to find 2 lines in GPS '
+ 'coordinate file but only found %d',
+ len(lines),
+ )
+ return '0', '0'
+ return tuple(map(lambda l: l.strip(), lines))
+ except OSError:
+ logger.warning('Could not open cached GPS coordinate file')
+ return '0', '0'
+
+
+def _cache_new_gps_coords(gps_lat, gps_lon):
+ """
+ Cache GPS coordinates in the module-level variables here and write them
+ to a managed file on disk.
+
+ Args:
+ gps_lat (str): latitude as a string
+ gps_lon (str): longitude as a string
+ """
+ # pylint: disable=global-statement
+ global _gps_lat_cached, _gps_lon_cached
+ _gps_lat_cached, _gps_lon_cached = gps_lat, gps_lon
+ _write_gps_coords_to_file(gps_lat, gps_lon)
+
+
+def _write_gps_coords_to_file(gps_lat, gps_lon):
+ lines = '{lat}\n{lon}'.format(lat=gps_lat, lon=gps_lon)
+ try:
+ serialization_utils.write_to_file_atomically(
+ CACHED_GPS_COORD_FILE_PATH,
+ lines,
+ )
+ except OSError:
+ pass
+
+
+def _bool_to_str(b: bool) -> str:
+ if b is True:
+ return "1"
+ return "0"