# SPDX-FileCopyrightText: 2020 The Magma Authors.
# SPDX-FileCopyrightText: 2022 Open Networking Foundation <support@opennetworking.org>
#
# SPDX-License-Identifier: BSD-3-Clause

import json
import os
from collections import namedtuple
from typing import Any, Dict, List, NamedTuple, Optional, Tuple, Union

from lte.protos.enodebd_pb2 import SingleEnodebStatus
from lte.protos.mconfig import mconfigs_pb2
from common import serialization_utils
import metrics
from data_models.data_model_parameters import ParameterName
from device_config.configuration_util import (
    find_enb_by_cell_id,
    get_enb_rf_tx_desired,
)
from exceptions import ConfigurationError
from logger import EnodebdLogger as logger
from s1ap_client import get_all_enb_state
from state_machines.enb_acs import EnodebAcsStateMachine
from state_machines.enb_acs_manager import StateMachineManager
from orc8r.protos.service303_pb2 import State

# There are 2 levels of caching for GPS coordinates from the enodeB: module
# variables (in-memory) and on disk. In the event the enodeB stops reporting
# GPS, we will continue to report the cached coordinates from the in-memory
# cached coordinates. If enodebd is restarted, this in-memory cache will be
# populated by the file

CACHED_GPS_COORD_FILE_PATH = os.path.join(
    '/var/opt/magma/enodebd',
    'gps_coords.txt',
)

# Cache GPS coordinates in memory so we don't write to the file cache if the
# coordinates have not changed. We can read directly from here instead of the
# file cache when the enodeB goes down unless these are unintialized.
_gps_lat_cached = None
_gps_lon_cached = None

EnodebStatus = NamedTuple(
    'EnodebStatus',
    [
        ('enodeb_configured', bool),
        ('gps_latitude', str),
        ('gps_longitude', str),
        ('enodeb_connected', bool),
        ('opstate_enabled', bool),
        ('rf_tx_on', bool),
        ('rf_tx_desired', bool),
        ('gps_connected', bool),
        ('ptp_connected', bool),
        ('mme_connected', bool),
        ('fsm_state', str),
        ('cell_id', int),
    ],
)

# TODO: Remove after checkins support multiple eNB status
MagmaOldEnodebdStatus = namedtuple(
    'MagmaOldEnodebdStatus',
    [
        'enodeb_serial',
        'enodeb_configured',
        'gps_latitude',
        'gps_longitude',
        'enodeb_connected',
        'opstate_enabled',
        'rf_tx_on',
        'rf_tx_desired',
        'gps_connected',
        'ptp_connected',
        'mme_connected',
        'enodeb_state',
    ],
)

MagmaEnodebdStatus = NamedTuple(
    'MagmaEnodebdStatus',
    [
        ('n_enodeb_connected', str),
        ('all_enodeb_configured', str),
        ('all_enodeb_opstate_enabled', str),
        ('all_enodeb_rf_tx_configured', str),
        ('any_enodeb_gps_connected', str),
        ('all_enodeb_ptp_connected', str),
        ('all_enodeb_mme_connected', str),
        ('gateway_gps_longitude', str),
        ('gateway_gps_latitude', str),
    ],
)


def update_status_metrics(status: EnodebStatus) -> None:
    """ Update metrics for eNodeB status """
    # Call every second
    metrics_by_stat_key = {
        'enodeb_connected': metrics.STAT_ENODEB_CONNECTED,
        'enodeb_configured': metrics.STAT_ENODEB_CONFIGURED,
        'opstate_enabled': metrics.STAT_OPSTATE_ENABLED,
        'rf_tx_on': metrics.STAT_RF_TX_ENABLED,
        'rf_tx_desired': metrics.STAT_RF_TX_DESIRED,
        'gps_connected': metrics.STAT_GPS_CONNECTED,
        'ptp_connected': metrics.STAT_PTP_CONNECTED,
        'mme_connected': metrics.STAT_MME_CONNECTED,
    }

    def get_metric_value(enodeb_status: Dict[str, str], key: str):
        # Metrics are "sticky" when synced to the cloud - if we don't
        # receive a status update from enodeb, set the metric to 0
        # to explicitly indicate that it was not received, otherwise the
        # metrics collector will continue to report the last value
        val = enodeb_status.get(key, None)
        if val is None:
            return 0
        if type(val) is not bool:
            logger.error('Could not cast metric value %s to int', val)
            return 0
        return int(val)  # val should be either True or False

    for stat_key, metric in metrics_by_stat_key.items():
        metric.set(get_metric_value(status._asdict(), stat_key))


# TODO: Remove after checkins support multiple eNB status
def get_service_status_old(
        enb_acs_manager: StateMachineManager,
) -> Dict[str, Any]:
    """ Get service status compatible with older controller """
    enb_status_by_serial = get_all_enb_status(enb_acs_manager)
    # Since we only expect users to plug in a single eNB, generate service
    # status with the first one we find that is connected
    for enb_serial, enb_status in enb_status_by_serial.items():
        if enb_status.enodeb_connected:
            return MagmaOldEnodebdStatus(
                enodeb_serial=enb_serial,
                enodeb_configured=_bool_to_str(enb_status.enodeb_configured),
                gps_latitude=enb_status.gps_latitude,
                gps_longitude=enb_status.gps_longitude,
                enodeb_connected=_bool_to_str(enb_status.enodeb_connected),
                opstate_enabled=_bool_to_str(enb_status.opstate_enabled),
                rf_tx_on=_bool_to_str(enb_status.rf_tx_on),
                rf_tx_desired=_bool_to_str(enb_status.rf_tx_desired),
                gps_connected=_bool_to_str(enb_status.gps_connected),
                ptp_connected=_bool_to_str(enb_status.ptp_connected),
                mme_connected=_bool_to_str(enb_status.mme_connected),
                enodeb_state=enb_status.fsm_state,
            )._asdict()
    return MagmaOldEnodebdStatus(
        enodeb_serial='N/A',
        enodeb_configured='0',
        gps_latitude='0.0',
        gps_longitude='0.0',
        enodeb_connected='0',
        opstate_enabled='0',
        rf_tx_on='0',
        rf_tx_desired='N/A',
        gps_connected='0',
        ptp_connected='0',
        mme_connected='0',
        enodeb_state='N/A',
    )._asdict()


def get_service_status(enb_acs_manager: StateMachineManager) -> Dict[str, Any]:
    enodebd_status = _get_enodebd_status(enb_acs_manager)
    return enodebd_status._asdict()


def _get_enodebd_status(
        enb_acs_manager: StateMachineManager,
) -> MagmaEnodebdStatus:
    enb_status_by_serial = get_all_enb_status(enb_acs_manager)
    # Start from default values for enodebd status
    n_enodeb_connected = 0
    all_enodeb_configured = False
    all_enodeb_opstate_enabled = False
    all_enodeb_rf_tx_configured = False
    any_enodeb_gps_connected = False
    all_enodeb_ptp_connected = False
    all_enodeb_mme_connected = False
    gateway_gps_longitude = '0.0'
    gateway_gps_latitude = '0.0'

    def _is_rf_tx_configured(enb_status: EnodebStatus) -> bool:
        return enb_status.rf_tx_on == enb_status.rf_tx_desired

    if enb_status_by_serial:
        enb_status_list = list(enb_status_by_serial.values())
        # Aggregate all eNB status for enodebd status, repetitive but
        # clearer for output purposes.
        n_enodeb_connected = sum(
            enb_status.enodeb_connected for enb_status in enb_status_list
        )
        all_enodeb_configured = all(
            enb_status.enodeb_configured for enb_status in enb_status_list
        )
        all_enodeb_mme_connected = all(
            enb_status.mme_connected for enb_status in enb_status_list
        )
        all_enodeb_opstate_enabled = all(
            enb_status.opstate_enabled for enb_status in enb_status_list
        )
        all_enodeb_ptp_connected = all(
            enb_status.ptp_connected for enb_status in enb_status_list
        )
        any_enodeb_gps_connected = any(
            enb_status.gps_connected for enb_status in enb_status_list
        )
        all_enodeb_rf_tx_configured = all(
            _is_rf_tx_configured(enb_status) for enb_status in enb_status_list
        )
        if n_enodeb_connected:
            gateway_gps_longitude = enb_status_list[0].gps_longitude
            gateway_gps_latitude = enb_status_list[0].gps_latitude

    return MagmaEnodebdStatus(
        n_enodeb_connected=str(n_enodeb_connected),
        all_enodeb_configured=str(all_enodeb_configured),
        all_enodeb_opstate_enabled=str(all_enodeb_opstate_enabled),
        all_enodeb_rf_tx_configured=str(all_enodeb_rf_tx_configured),
        any_enodeb_gps_connected=str(any_enodeb_gps_connected),
        all_enodeb_ptp_connected=str(all_enodeb_ptp_connected),
        all_enodeb_mme_connected=str(all_enodeb_mme_connected),
        gateway_gps_longitude=str(gateway_gps_longitude),
        gateway_gps_latitude=str(gateway_gps_latitude),
    )


def get_all_enb_status(
        enb_acs_manager: StateMachineManager,
) -> Dict[str, EnodebStatus]:
    enb_status_by_serial = {}
    serial_list = enb_acs_manager.get_connected_serial_id_list()
    for enb_serial in serial_list:
        handler = enb_acs_manager.get_handler_by_serial(enb_serial)
        status = get_enb_status(handler)
        enb_status_by_serial[enb_serial] = status

    return enb_status_by_serial


def get_enb_status(enodeb: EnodebAcsStateMachine) -> EnodebStatus:
    """
    Returns a dict representing the status of an enodeb

    The returned dictionary will be a subset of the following keys:
        - enodeb_connected
        - enodeb_configured
        - opstate_enabled
        - rf_tx_on
        - rf_tx_desired
        - gps_connected
        - ptp_connected
        - mme_connected
        - gps_latitude
        - gps_longitude
        - ip_address
        - cell_id

    The set of keys returned will depend on the connection status of the
    enodeb. A missing key indicates that the value is unknown.

    Returns:
        Status dictionary for the enodeb state
    """
    enodeb_configured = enodeb.is_enodeb_configured()

    # We cache GPS coordinates so try to read them before the early return
    # if the enB is not connected
    gps_lat, gps_lon = _get_and_cache_gps_coords(enodeb)

    enodeb_connected = enodeb.is_enodeb_connected()
    opstate_enabled = _parse_param_as_bool(enodeb, ParameterName.OP_STATE)
    rf_tx_on = _parse_param_as_bool(enodeb, ParameterName.RF_TX_STATUS)
    rf_tx_on = rf_tx_on and enodeb_connected
    try:
        enb_serial = \
            enodeb.device_cfg.get_parameter(ParameterName.SERIAL_NUMBER)
        enb_cell_id = int(
            enodeb.device_cfg.get_parameter(ParameterName.CELL_ID),
        )
        rf_tx_desired = get_enb_rf_tx_desired(enodeb.mconfig, enb_serial)
    except (KeyError, ConfigurationError):
        rf_tx_desired = False
        enb_cell_id = 0

    mme_connected = _parse_param_as_bool(enodeb, ParameterName.MME_STATUS)
    gps_connected = _get_gps_status_as_bool(enodeb)
    try:
        ptp_connected = _parse_param_as_bool(enodeb, ParameterName.PTP_STATUS)
    except ConfigurationError:
        ptp_connected = False

    return EnodebStatus(
        enodeb_configured=enodeb_configured,
        gps_latitude=gps_lat,
        gps_longitude=gps_lon,
        enodeb_connected=enodeb_connected,
        opstate_enabled=opstate_enabled,
        rf_tx_on=rf_tx_on,
        rf_tx_desired=rf_tx_desired,
        gps_connected=gps_connected,
        ptp_connected=ptp_connected,
        mme_connected=mme_connected,
        fsm_state=enodeb.get_state(),
        cell_id=enb_cell_id,
    )


def get_single_enb_status(
        device_serial: str,
        state_machine_manager: StateMachineManager,
) -> SingleEnodebStatus:
    try:
        handler = state_machine_manager.get_handler_by_serial(device_serial)
    except KeyError:
        return _empty_enb_status()

    # This namedtuple is missing IP and serial info
    status = get_enb_status(handler)

    # Get IP info
    ip = state_machine_manager.get_ip_of_serial(device_serial)

    def get_status_property(status: bool) -> SingleEnodebStatus.StatusProperty:
        if status:
            return SingleEnodebStatus.StatusProperty.Value('ON')
        return SingleEnodebStatus.StatusProperty.Value('OFF')

    # Build the message to return through gRPC
    enb_status = SingleEnodebStatus()
    enb_status.device_serial = device_serial
    enb_status.ip_address = ip
    enb_status.connected = get_status_property(status.enodeb_connected)
    enb_status.configured = get_status_property(status.enodeb_configured)
    enb_status.opstate_enabled = get_status_property(status.opstate_enabled)
    enb_status.rf_tx_on = get_status_property(status.rf_tx_on)
    enb_status.rf_tx_desired = get_status_property(status.rf_tx_desired)
    enb_status.gps_connected = get_status_property(status.gps_connected)
    enb_status.ptp_connected = get_status_property(status.ptp_connected)
    enb_status.mme_connected = get_status_property(status.mme_connected)
    enb_status.gps_longitude = status.gps_longitude
    enb_status.gps_latitude = status.gps_latitude
    enb_status.fsm_state = status.fsm_state
    return enb_status


def get_operational_states(
    enb_acs_manager: StateMachineManager,
    mconfig: mconfigs_pb2.EnodebD,
) -> List[State]:
    """
    Returns: A list of State with EnodebStatus encoded as JSON
    """
    states = []
    configured_serial_ids = []
    enb_status_by_serial = get_all_enb_status(enb_acs_manager)

    # Get S1 connected eNBs
    enb_statuses = get_all_enb_state()

    for serial_id in enb_status_by_serial:
        enb_status_dict = enb_status_by_serial[serial_id]._asdict()

        # Add IP address to state
        enb_status_dict['ip_address'] = enb_acs_manager.get_ip_of_serial(
            serial_id,
        )

        # Add num of UEs connected
        num_ue_connected = enb_statuses.get(enb_status_dict['cell_id'], 0)
        enb_status_dict['ues_connected'] = num_ue_connected

        serialized = json.dumps(enb_status_dict)
        state = State(
            type="single_enodeb",
            deviceID=serial_id,
            value=serialized.encode('utf-8'),
        )
        configured_serial_ids.append(serial_id)
        states.append(state)

    # Get state for externally configured enodebs
    s1_states = get_enb_s1_connected_states(
        enb_statuses,
        configured_serial_ids,
        mconfig,
    )
    states.extend(s1_states)

    return states


def get_enb_s1_connected_states(
    enb_s1_state_map, configured_serial_ids,
    mconfig,
) -> List[State]:
    states = []
    for enb_id in enb_s1_state_map:
        enb = find_enb_by_cell_id(mconfig, enb_id)
        if enb and enb.serial_num not in configured_serial_ids:
            status = EnodebStatus(
                enodeb_configured=False,
                gps_latitude='N/A',
                gps_longitude='N/A',
                enodeb_connected=True,
                opstate_enabled=False,
                rf_tx_on=False,
                rf_tx_desired=False,
                gps_connected=False,
                ptp_connected=False,
                mme_connected=True,
                fsm_state='N/A',
                cell_id=enb_id,
            )
            status_dict = status._asdict()

            # Add IP address to state
            status_dict['ip_address'] = enb.config.ip_address

            # Add num of UEs connected to state, use cellID from mconfig
            status_dict['ues_connected'] = enb_s1_state_map.get(enb_id, 0)

            serialized = json.dumps(status_dict)
            state = State(
                type="single_enodeb",
                deviceID=enb.serial_num,
                value=serialized.encode('utf-8'),
            )
            states.append(state)
    return states


def _empty_enb_status() -> SingleEnodebStatus:
    enb_status = SingleEnodebStatus()
    enb_status.device_serial = 'N/A'
    enb_status.ip_address = 'N/A'
    enb_status.connected = '0'
    enb_status.configured = '0'
    enb_status.opstate_enabled = '0'
    enb_status.rf_tx_on = '0'
    enb_status.rf_tx_desired = 'N/A'
    enb_status.gps_connected = '0'
    enb_status.ptp_connected = '0'
    enb_status.mme_connected = '0'
    enb_status.gps_longitude = '0.0'
    enb_status.gps_latitude = '0.0'
    enb_status.fsm_state = 'N/A'
    return enb_status


def _parse_param_as_bool(
        enodeb: EnodebAcsStateMachine,
        param_name: ParameterName,
) -> bool:
    try:
        return _format_as_bool(enodeb.get_parameter(param_name), param_name)
    except (KeyError, ConfigurationError):
        return False


def _format_as_bool(
        param_value: Union[bool, str, int],
        param_name: Optional[Union[ParameterName, str]] = None,
) -> bool:
    """ Returns '1' for true, and '0' for false """
    stripped_value = str(param_value).lower().strip()
    if stripped_value in {'true', '1', 'enabled'}:
        return True
    elif stripped_value in {'false', '0', 'disabled', 'InProgress'}:
        return False
    else:
        logger.warning(
            '%s parameter not understood (%s)', param_name, param_value,
        )
        return False


def _get_gps_status_as_bool(enodeb: EnodebAcsStateMachine) -> bool:
    try:
        if not enodeb.has_parameter(ParameterName.GPS_STATUS):
            return False
        else:
            param = enodeb.get_parameter(ParameterName.GPS_STATUS)
            if isinstance(param, bool):
                # No translation to do.
                return param
            stripped_value = param.lower().strip()
            if stripped_value in ['0', '2', 'inprogress']:
                # 2 = GPS locking
                return False
            elif stripped_value == '1':
                return True
            else:
                logger.warning(
                    'GPS status parameter not understood (%s)', param,
                )
                return False
    except (KeyError, ConfigurationError, AttributeError):
        return False


def _get_and_cache_gps_coords(enodeb: EnodebAcsStateMachine) -> Tuple[
    str, str,
]:
    """
    Read the GPS coordinates of the enB from its configuration or the
    cached coordinate file if the preceding read fails. If reading from
    enB configuration succeeds, this method will cache the new coordinates.

    Returns:
        (str, str): GPS latitude, GPS longitude
    """
    lat, lon = '', ''
    try:
        lat = enodeb.get_parameter(ParameterName.GPS_LAT)
        lon = enodeb.get_parameter(ParameterName.GPS_LONG)

        if lat != _gps_lat_cached or lon != _gps_lon_cached:
            _cache_new_gps_coords(lat, lon)
        return lat, lon
    except (KeyError, ConfigurationError):
        return _get_cached_gps_coords()
    except ValueError:
        logger.warning('GPS lat/long not understood (%s/%s)', lat, lon)
        return '0', '0'


def _get_cached_gps_coords() -> Tuple[str, str]:
    """
    Returns cached GPS coordinates if enB is disconnected or otherwise not
    reporting coordinates.

    Returns:
        (str, str): (GPS lat, GPS lon)
    """
    # pylint: disable=global-statement
    global _gps_lat_cached, _gps_lon_cached
    if _gps_lat_cached is None or _gps_lon_cached is None:
        _gps_lat_cached, _gps_lon_cached = _read_gps_coords_from_file()
    return _gps_lat_cached, _gps_lon_cached


def _read_gps_coords_from_file():
    try:
        with open(CACHED_GPS_COORD_FILE_PATH, encoding="utf-8") as f:
            lines = f.readlines()
            if len(lines) != 2:
                logger.warning(
                    'Expected to find 2 lines in GPS '
                    'coordinate file but only found %d',
                    len(lines),
                )
                return '0', '0'
            return tuple(map(lambda l: l.strip(), lines))
    except OSError:
        logger.warning('Could not open cached GPS coordinate file')
        return '0', '0'


def _cache_new_gps_coords(gps_lat, gps_lon):
    """
    Cache GPS coordinates in the module-level variables here and write them
    to a managed file on disk.

    Args:
        gps_lat (str): latitude as a string
        gps_lon (str): longitude as a string
    """
    # pylint: disable=global-statement
    global _gps_lat_cached, _gps_lon_cached
    _gps_lat_cached, _gps_lon_cached = gps_lat, gps_lon
    _write_gps_coords_to_file(gps_lat, gps_lon)


def _write_gps_coords_to_file(gps_lat, gps_lon):
    lines = '{lat}\n{lon}'.format(lat=gps_lat, lon=gps_lon)
    try:
        serialization_utils.write_to_file_atomically(
            CACHED_GPS_COORD_FILE_PATH,
            lines,
        )
    except OSError:
        pass


def _bool_to_str(b: bool) -> str:
    if b is True:
        return "1"
    return "0"
