Init commit for standalone enodebd

Change-Id: I88eeef5135dd7ba8551ddd9fb6a0695f5325337b
diff --git a/enodeb_status.py b/enodeb_status.py
new file mode 100644
index 0000000..442061b
--- /dev/null
+++ b/enodeb_status.py
@@ -0,0 +1,602 @@
+"""
+Copyright 2020 The Magma Authors.
+
+This source code is licensed under the BSD-style license found in the
+LICENSE file in the root directory of this source tree.
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import json
+import os
+from collections import namedtuple
+from typing import Any, Dict, List, NamedTuple, Optional, Tuple, Union
+
+from lte.protos.enodebd_pb2 import SingleEnodebStatus
+from lte.protos.mconfig import mconfigs_pb2
+from common import serialization_utils
+import metrics
+from data_models.data_model_parameters import ParameterName
+from device_config.configuration_util import (
+    find_enb_by_cell_id,
+    get_enb_rf_tx_desired,
+)
+from exceptions import ConfigurationError
+from logger import EnodebdLogger as logger
+from s1ap_client import get_all_enb_state
+from state_machines.enb_acs import EnodebAcsStateMachine
+from state_machines.enb_acs_manager import StateMachineManager
+from orc8r.protos.service303_pb2 import State
+
+# There are 2 levels of caching for GPS coordinates from the enodeB: module
+# variables (in-memory) and on disk. In the event the enodeB stops reporting
+# GPS, we will continue to report the cached coordinates from the in-memory
+# cached coordinates. If enodebd is restarted, this in-memory cache will be
+# populated by the file
+
+CACHED_GPS_COORD_FILE_PATH = os.path.join(
+    '/var/opt/magma/enodebd',
+    'gps_coords.txt',
+)
+
+# Cache GPS coordinates in memory so we don't write to the file cache if the
+# coordinates have not changed. We can read directly from here instead of the
+# file cache when the enodeB goes down unless these are unintialized.
+_gps_lat_cached = None
+_gps_lon_cached = None
+
+EnodebStatus = NamedTuple(
+    'EnodebStatus',
+    [
+        ('enodeb_configured', bool),
+        ('gps_latitude', str),
+        ('gps_longitude', str),
+        ('enodeb_connected', bool),
+        ('opstate_enabled', bool),
+        ('rf_tx_on', bool),
+        ('rf_tx_desired', bool),
+        ('gps_connected', bool),
+        ('ptp_connected', bool),
+        ('mme_connected', bool),
+        ('fsm_state', str),
+        ('cell_id', int),
+    ],
+)
+
+# TODO: Remove after checkins support multiple eNB status
+MagmaOldEnodebdStatus = namedtuple(
+    'MagmaOldEnodebdStatus',
+    [
+        'enodeb_serial',
+        'enodeb_configured',
+        'gps_latitude',
+        'gps_longitude',
+        'enodeb_connected',
+        'opstate_enabled',
+        'rf_tx_on',
+        'rf_tx_desired',
+        'gps_connected',
+        'ptp_connected',
+        'mme_connected',
+        'enodeb_state',
+    ],
+)
+
+MagmaEnodebdStatus = NamedTuple(
+    'MagmaEnodebdStatus',
+    [
+        ('n_enodeb_connected', str),
+        ('all_enodeb_configured', str),
+        ('all_enodeb_opstate_enabled', str),
+        ('all_enodeb_rf_tx_configured', str),
+        ('any_enodeb_gps_connected', str),
+        ('all_enodeb_ptp_connected', str),
+        ('all_enodeb_mme_connected', str),
+        ('gateway_gps_longitude', str),
+        ('gateway_gps_latitude', str),
+    ],
+)
+
+
+def update_status_metrics(status: EnodebStatus) -> None:
+    """ Update metrics for eNodeB status """
+    # Call every second
+    metrics_by_stat_key = {
+        'enodeb_connected': metrics.STAT_ENODEB_CONNECTED,
+        'enodeb_configured': metrics.STAT_ENODEB_CONFIGURED,
+        'opstate_enabled': metrics.STAT_OPSTATE_ENABLED,
+        'rf_tx_on': metrics.STAT_RF_TX_ENABLED,
+        'rf_tx_desired': metrics.STAT_RF_TX_DESIRED,
+        'gps_connected': metrics.STAT_GPS_CONNECTED,
+        'ptp_connected': metrics.STAT_PTP_CONNECTED,
+        'mme_connected': metrics.STAT_MME_CONNECTED,
+    }
+
+    def get_metric_value(enodeb_status: Dict[str, str], key: str):
+        # Metrics are "sticky" when synced to the cloud - if we don't
+        # receive a status update from enodeb, set the metric to 0
+        # to explicitly indicate that it was not received, otherwise the
+        # metrics collector will continue to report the last value
+        val = enodeb_status.get(key, None)
+        if val is None:
+            return 0
+        if type(val) is not bool:
+            logger.error('Could not cast metric value %s to int', val)
+            return 0
+        return int(val)  # val should be either True or False
+
+    for stat_key, metric in metrics_by_stat_key.items():
+        metric.set(get_metric_value(status._asdict(), stat_key))
+
+
+# TODO: Remove after checkins support multiple eNB status
+def get_service_status_old(
+        enb_acs_manager: StateMachineManager,
+) -> Dict[str, Any]:
+    """ Get service status compatible with older controller """
+    enb_status_by_serial = get_all_enb_status(enb_acs_manager)
+    # Since we only expect users to plug in a single eNB, generate service
+    # status with the first one we find that is connected
+    for enb_serial, enb_status in enb_status_by_serial.items():
+        if enb_status.enodeb_connected:
+            return MagmaOldEnodebdStatus(
+                enodeb_serial=enb_serial,
+                enodeb_configured=_bool_to_str(enb_status.enodeb_configured),
+                gps_latitude=enb_status.gps_latitude,
+                gps_longitude=enb_status.gps_longitude,
+                enodeb_connected=_bool_to_str(enb_status.enodeb_connected),
+                opstate_enabled=_bool_to_str(enb_status.opstate_enabled),
+                rf_tx_on=_bool_to_str(enb_status.rf_tx_on),
+                rf_tx_desired=_bool_to_str(enb_status.rf_tx_desired),
+                gps_connected=_bool_to_str(enb_status.gps_connected),
+                ptp_connected=_bool_to_str(enb_status.ptp_connected),
+                mme_connected=_bool_to_str(enb_status.mme_connected),
+                enodeb_state=enb_status.fsm_state,
+            )._asdict()
+    return MagmaOldEnodebdStatus(
+        enodeb_serial='N/A',
+        enodeb_configured='0',
+        gps_latitude='0.0',
+        gps_longitude='0.0',
+        enodeb_connected='0',
+        opstate_enabled='0',
+        rf_tx_on='0',
+        rf_tx_desired='N/A',
+        gps_connected='0',
+        ptp_connected='0',
+        mme_connected='0',
+        enodeb_state='N/A',
+    )._asdict()
+
+
+def get_service_status(enb_acs_manager: StateMachineManager) -> Dict[str, Any]:
+    enodebd_status = _get_enodebd_status(enb_acs_manager)
+    return enodebd_status._asdict()
+
+
+def _get_enodebd_status(
+        enb_acs_manager: StateMachineManager,
+) -> MagmaEnodebdStatus:
+    enb_status_by_serial = get_all_enb_status(enb_acs_manager)
+    # Start from default values for enodebd status
+    n_enodeb_connected = 0
+    all_enodeb_configured = False
+    all_enodeb_opstate_enabled = False
+    all_enodeb_rf_tx_configured = False
+    any_enodeb_gps_connected = False
+    all_enodeb_ptp_connected = False
+    all_enodeb_mme_connected = False
+    gateway_gps_longitude = '0.0'
+    gateway_gps_latitude = '0.0'
+
+    def _is_rf_tx_configured(enb_status: EnodebStatus) -> bool:
+        return enb_status.rf_tx_on == enb_status.rf_tx_desired
+
+    if enb_status_by_serial:
+        enb_status_list = list(enb_status_by_serial.values())
+        # Aggregate all eNB status for enodebd status, repetitive but
+        # clearer for output purposes.
+        n_enodeb_connected = sum(
+            enb_status.enodeb_connected for enb_status in enb_status_list
+        )
+        all_enodeb_configured = all(
+            enb_status.enodeb_configured for enb_status in enb_status_list
+        )
+        all_enodeb_mme_connected = all(
+            enb_status.mme_connected for enb_status in enb_status_list
+        )
+        all_enodeb_opstate_enabled = all(
+            enb_status.opstate_enabled for enb_status in enb_status_list
+        )
+        all_enodeb_ptp_connected = all(
+            enb_status.ptp_connected for enb_status in enb_status_list
+        )
+        any_enodeb_gps_connected = any(
+            enb_status.gps_connected for enb_status in enb_status_list
+        )
+        all_enodeb_rf_tx_configured = all(
+            _is_rf_tx_configured(enb_status) for enb_status in enb_status_list
+        )
+        if n_enodeb_connected:
+            gateway_gps_longitude = enb_status_list[0].gps_longitude
+            gateway_gps_latitude = enb_status_list[0].gps_latitude
+
+    return MagmaEnodebdStatus(
+        n_enodeb_connected=str(n_enodeb_connected),
+        all_enodeb_configured=str(all_enodeb_configured),
+        all_enodeb_opstate_enabled=str(all_enodeb_opstate_enabled),
+        all_enodeb_rf_tx_configured=str(all_enodeb_rf_tx_configured),
+        any_enodeb_gps_connected=str(any_enodeb_gps_connected),
+        all_enodeb_ptp_connected=str(all_enodeb_ptp_connected),
+        all_enodeb_mme_connected=str(all_enodeb_mme_connected),
+        gateway_gps_longitude=str(gateway_gps_longitude),
+        gateway_gps_latitude=str(gateway_gps_latitude),
+    )
+
+
+def get_all_enb_status(
+        enb_acs_manager: StateMachineManager,
+) -> Dict[str, EnodebStatus]:
+    enb_status_by_serial = {}
+    serial_list = enb_acs_manager.get_connected_serial_id_list()
+    for enb_serial in serial_list:
+        handler = enb_acs_manager.get_handler_by_serial(enb_serial)
+        status = get_enb_status(handler)
+        enb_status_by_serial[enb_serial] = status
+
+    return enb_status_by_serial
+
+
+def get_enb_status(enodeb: EnodebAcsStateMachine) -> EnodebStatus:
+    """
+    Returns a dict representing the status of an enodeb
+
+    The returned dictionary will be a subset of the following keys:
+        - enodeb_connected
+        - enodeb_configured
+        - opstate_enabled
+        - rf_tx_on
+        - rf_tx_desired
+        - gps_connected
+        - ptp_connected
+        - mme_connected
+        - gps_latitude
+        - gps_longitude
+        - ip_address
+        - cell_id
+
+    The set of keys returned will depend on the connection status of the
+    enodeb. A missing key indicates that the value is unknown.
+
+    Returns:
+        Status dictionary for the enodeb state
+    """
+    enodeb_configured = enodeb.is_enodeb_configured()
+
+    # We cache GPS coordinates so try to read them before the early return
+    # if the enB is not connected
+    gps_lat, gps_lon = _get_and_cache_gps_coords(enodeb)
+
+    enodeb_connected = enodeb.is_enodeb_connected()
+    opstate_enabled = _parse_param_as_bool(enodeb, ParameterName.OP_STATE)
+    rf_tx_on = _parse_param_as_bool(enodeb, ParameterName.RF_TX_STATUS)
+    rf_tx_on = rf_tx_on and enodeb_connected
+    try:
+        enb_serial = \
+            enodeb.device_cfg.get_parameter(ParameterName.SERIAL_NUMBER)
+        enb_cell_id = int(
+            enodeb.device_cfg.get_parameter(ParameterName.CELL_ID),
+        )
+        rf_tx_desired = get_enb_rf_tx_desired(enodeb.mconfig, enb_serial)
+    except (KeyError, ConfigurationError):
+        rf_tx_desired = False
+        enb_cell_id = 0
+
+    mme_connected = _parse_param_as_bool(enodeb, ParameterName.MME_STATUS)
+    gps_connected = _get_gps_status_as_bool(enodeb)
+    try:
+        ptp_connected = _parse_param_as_bool(enodeb, ParameterName.PTP_STATUS)
+    except ConfigurationError:
+        ptp_connected = False
+
+    return EnodebStatus(
+        enodeb_configured=enodeb_configured,
+        gps_latitude=gps_lat,
+        gps_longitude=gps_lon,
+        enodeb_connected=enodeb_connected,
+        opstate_enabled=opstate_enabled,
+        rf_tx_on=rf_tx_on,
+        rf_tx_desired=rf_tx_desired,
+        gps_connected=gps_connected,
+        ptp_connected=ptp_connected,
+        mme_connected=mme_connected,
+        fsm_state=enodeb.get_state(),
+        cell_id=enb_cell_id,
+    )
+
+
+def get_single_enb_status(
+        device_serial: str,
+        state_machine_manager: StateMachineManager,
+) -> SingleEnodebStatus:
+    try:
+        handler = state_machine_manager.get_handler_by_serial(device_serial)
+    except KeyError:
+        return _empty_enb_status()
+
+    # This namedtuple is missing IP and serial info
+    status = get_enb_status(handler)
+
+    # Get IP info
+    ip = state_machine_manager.get_ip_of_serial(device_serial)
+
+    def get_status_property(status: bool) -> SingleEnodebStatus.StatusProperty:
+        if status:
+            return SingleEnodebStatus.StatusProperty.Value('ON')
+        return SingleEnodebStatus.StatusProperty.Value('OFF')
+
+    # Build the message to return through gRPC
+    enb_status = SingleEnodebStatus()
+    enb_status.device_serial = device_serial
+    enb_status.ip_address = ip
+    enb_status.connected = get_status_property(status.enodeb_connected)
+    enb_status.configured = get_status_property(status.enodeb_configured)
+    enb_status.opstate_enabled = get_status_property(status.opstate_enabled)
+    enb_status.rf_tx_on = get_status_property(status.rf_tx_on)
+    enb_status.rf_tx_desired = get_status_property(status.rf_tx_desired)
+    enb_status.gps_connected = get_status_property(status.gps_connected)
+    enb_status.ptp_connected = get_status_property(status.ptp_connected)
+    enb_status.mme_connected = get_status_property(status.mme_connected)
+    enb_status.gps_longitude = status.gps_longitude
+    enb_status.gps_latitude = status.gps_latitude
+    enb_status.fsm_state = status.fsm_state
+    return enb_status
+
+
+def get_operational_states(
+    enb_acs_manager: StateMachineManager,
+    mconfig: mconfigs_pb2.EnodebD,
+) -> List[State]:
+    """
+    Returns: A list of State with EnodebStatus encoded as JSON
+    """
+    states = []
+    configured_serial_ids = []
+    enb_status_by_serial = get_all_enb_status(enb_acs_manager)
+
+    # Get S1 connected eNBs
+    enb_statuses = get_all_enb_state()
+
+    for serial_id in enb_status_by_serial:
+        enb_status_dict = enb_status_by_serial[serial_id]._asdict()
+
+        # Add IP address to state
+        enb_status_dict['ip_address'] = enb_acs_manager.get_ip_of_serial(
+            serial_id,
+        )
+
+        # Add num of UEs connected
+        num_ue_connected = enb_statuses.get(enb_status_dict['cell_id'], 0)
+        enb_status_dict['ues_connected'] = num_ue_connected
+
+        serialized = json.dumps(enb_status_dict)
+        state = State(
+            type="single_enodeb",
+            deviceID=serial_id,
+            value=serialized.encode('utf-8'),
+        )
+        configured_serial_ids.append(serial_id)
+        states.append(state)
+
+    # Get state for externally configured enodebs
+    s1_states = get_enb_s1_connected_states(
+        enb_statuses,
+        configured_serial_ids,
+        mconfig,
+    )
+    states.extend(s1_states)
+
+    return states
+
+
+def get_enb_s1_connected_states(
+    enb_s1_state_map, configured_serial_ids,
+    mconfig,
+) -> List[State]:
+    states = []
+    for enb_id in enb_s1_state_map:
+        enb = find_enb_by_cell_id(mconfig, enb_id)
+        if enb and enb.serial_num not in configured_serial_ids:
+            status = EnodebStatus(
+                enodeb_configured=False,
+                gps_latitude='N/A',
+                gps_longitude='N/A',
+                enodeb_connected=True,
+                opstate_enabled=False,
+                rf_tx_on=False,
+                rf_tx_desired=False,
+                gps_connected=False,
+                ptp_connected=False,
+                mme_connected=True,
+                fsm_state='N/A',
+                cell_id=enb_id,
+            )
+            status_dict = status._asdict()
+
+            # Add IP address to state
+            status_dict['ip_address'] = enb.config.ip_address
+
+            # Add num of UEs connected to state, use cellID from mconfig
+            status_dict['ues_connected'] = enb_s1_state_map.get(enb_id, 0)
+
+            serialized = json.dumps(status_dict)
+            state = State(
+                type="single_enodeb",
+                deviceID=enb.serial_num,
+                value=serialized.encode('utf-8'),
+            )
+            states.append(state)
+    return states
+
+
+def _empty_enb_status() -> SingleEnodebStatus:
+    enb_status = SingleEnodebStatus()
+    enb_status.device_serial = 'N/A'
+    enb_status.ip_address = 'N/A'
+    enb_status.connected = '0'
+    enb_status.configured = '0'
+    enb_status.opstate_enabled = '0'
+    enb_status.rf_tx_on = '0'
+    enb_status.rf_tx_desired = 'N/A'
+    enb_status.gps_connected = '0'
+    enb_status.ptp_connected = '0'
+    enb_status.mme_connected = '0'
+    enb_status.gps_longitude = '0.0'
+    enb_status.gps_latitude = '0.0'
+    enb_status.fsm_state = 'N/A'
+    return enb_status
+
+
+def _parse_param_as_bool(
+        enodeb: EnodebAcsStateMachine,
+        param_name: ParameterName,
+) -> bool:
+    try:
+        return _format_as_bool(enodeb.get_parameter(param_name), param_name)
+    except (KeyError, ConfigurationError):
+        return False
+
+
+def _format_as_bool(
+        param_value: Union[bool, str, int],
+        param_name: Optional[Union[ParameterName, str]] = None,
+) -> bool:
+    """ Returns '1' for true, and '0' for false """
+    stripped_value = str(param_value).lower().strip()
+    if stripped_value in {'true', '1'}:
+        return True
+    elif stripped_value in {'false', '0'}:
+        return False
+    else:
+        logger.warning(
+            '%s parameter not understood (%s)', param_name, param_value,
+        )
+        return False
+
+
+def _get_gps_status_as_bool(enodeb: EnodebAcsStateMachine) -> bool:
+    try:
+        if not enodeb.has_parameter(ParameterName.GPS_STATUS):
+            return False
+        else:
+            param = enodeb.get_parameter(ParameterName.GPS_STATUS)
+            if isinstance(param, bool):
+                # No translation to do.
+                return param
+            stripped_value = param.lower().strip()
+            if stripped_value == '0' or stripped_value == '2':
+                # 2 = GPS locking
+                return False
+            elif stripped_value == '1':
+                return True
+            else:
+                logger.warning(
+                    'GPS status parameter not understood (%s)', param,
+                )
+                return False
+    except (KeyError, ConfigurationError):
+        return False
+
+
+def _get_and_cache_gps_coords(enodeb: EnodebAcsStateMachine) -> Tuple[
+    str, str,
+]:
+    """
+    Read the GPS coordinates of the enB from its configuration or the
+    cached coordinate file if the preceding read fails. If reading from
+    enB configuration succeeds, this method will cache the new coordinates.
+
+    Returns:
+        (str, str): GPS latitude, GPS longitude
+    """
+    lat, lon = '', ''
+    try:
+        lat = enodeb.get_parameter(ParameterName.GPS_LAT)
+        lon = enodeb.get_parameter(ParameterName.GPS_LONG)
+
+        if lat != _gps_lat_cached or lon != _gps_lon_cached:
+            _cache_new_gps_coords(lat, lon)
+        return lat, lon
+    except (KeyError, ConfigurationError):
+        return _get_cached_gps_coords()
+    except ValueError:
+        logger.warning('GPS lat/long not understood (%s/%s)', lat, lon)
+        return '0', '0'
+
+
+def _get_cached_gps_coords() -> Tuple[str, str]:
+    """
+    Returns cached GPS coordinates if enB is disconnected or otherwise not
+    reporting coordinates.
+
+    Returns:
+        (str, str): (GPS lat, GPS lon)
+    """
+    # pylint: disable=global-statement
+    global _gps_lat_cached, _gps_lon_cached
+    if _gps_lat_cached is None or _gps_lon_cached is None:
+        _gps_lat_cached, _gps_lon_cached = _read_gps_coords_from_file()
+    return _gps_lat_cached, _gps_lon_cached
+
+
+def _read_gps_coords_from_file():
+    try:
+        with open(CACHED_GPS_COORD_FILE_PATH, encoding="utf-8") as f:
+            lines = f.readlines()
+            if len(lines) != 2:
+                logger.warning(
+                    'Expected to find 2 lines in GPS '
+                    'coordinate file but only found %d',
+                    len(lines),
+                )
+                return '0', '0'
+            return tuple(map(lambda l: l.strip(), lines))
+    except OSError:
+        logger.warning('Could not open cached GPS coordinate file')
+        return '0', '0'
+
+
+def _cache_new_gps_coords(gps_lat, gps_lon):
+    """
+    Cache GPS coordinates in the module-level variables here and write them
+    to a managed file on disk.
+
+    Args:
+        gps_lat (str): latitude as a string
+        gps_lon (str): longitude as a string
+    """
+    # pylint: disable=global-statement
+    global _gps_lat_cached, _gps_lon_cached
+    _gps_lat_cached, _gps_lon_cached = gps_lat, gps_lon
+    _write_gps_coords_to_file(gps_lat, gps_lon)
+
+
+def _write_gps_coords_to_file(gps_lat, gps_lon):
+    lines = '{lat}\n{lon}'.format(lat=gps_lat, lon=gps_lon)
+    try:
+        serialization_utils.write_to_file_atomically(
+            CACHED_GPS_COORD_FILE_PATH,
+            lines,
+        )
+    except OSError:
+        pass
+
+
+def _bool_to_str(b: bool) -> str:
+    if b is True:
+        return "1"
+    return "0"