blob: 1d3147451808094aae3ca15c222df4fc5227c080 [file] [log] [blame]
#!/usr/bin/env python3
# Copyright 2020-present Open Networking Foundation
#
# SPDX-License-Identifier: LicenseRef-ONF-Member-Only-1.0
import sys
import os
import json
import logging
import enum
import requests
import time
import serial
import subprocess
from collections import namedtuple
'''
"Simple" script that periodically checks Aether network operational status
by controlling the attached 4G/LTE modem with AT commands and
reports the result to the central monitoring server.
'''
# Load the JSON configuration named by $CONFIG_FILE (default ./config.json).
# Every JSON object is converted to a namedtuple so settings are read as
# attributes (e.g. CONF.modem.port) instead of dictionary keys.
# The file is opened in a `with` block so the handle is closed right after
# parsing instead of being left open for the lifetime of the process.
with open(os.getenv('CONFIG_FILE', "./config.json")) as config_file:
    CONF = json.load(
        config_file,
        object_hook=lambda d: namedtuple('X', d.keys())(*d.values())
    )

# Send all log records to the configured file at the configured level.
logging.basicConfig(
    filename=CONF.log_file,
    format='%(asctime)s [%(levelname)s] %(message)s',
    level=logging.getLevelName(CONF.log_level)
)
class State(enum.Enum):
    '''
    Connection state used throughout this script.
    The values are strings; "0" and "1" are what Modem.get_state() parses
    out of the AT+CGATT? reply, "-1" marks a failure.
    The member names are what gets written into the status report.
    '''
    error = "-1"
    disconnected = "0"
    connected = "1"

    @classmethod
    def has_value(cls, value):
        '''
        Return True when value is the value of one of the members.
        '''
        known_values = cls._value2member_map_
        return value in known_values
class Modem():
    '''
    Small AT-command helper around the serial port of the attached modem.
    Typical use: construct, connect(), then write()/get_state(), then close().
    '''
    log = logging.getLogger('aether_edge_monitoring.Modem')
    # Seconds to sleep before polling the serial input buffer again when it
    # is empty (see _write).
    read_timeout = 0.1

    def __init__(self, port, baudrate):
        '''
        Remember the port and baud rate; the serial port is not opened here.
        Note that connect() replaces self.port with the result of
        get_modem_port().
        '''
        self.port = port
        self.baudrate = baudrate
        self._response = None

    def get_modem_port(self):
        '''
        Run "ls <CONF.modem.port>" through the shell and return its output
        with every occurrence of the configured string removed and the
        surrounding whitespace stripped.
        Anything "ls" prints on stderr is logged as an error; the (possibly
        empty) result is returned either way.
        '''
        cmd = "ls " + CONF.modem.port
        sp = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE, universal_newlines=True)
        # communicate() drains both pipes and waits for the child to exit.
        # Calling wait() first, as was done before, can deadlock once the
        # child fills a pipe buffer that nobody is reading.
        ret, err = sp.communicate()
        if err != "":
            logging.error("unable to find serial port " + err)
        ret = ret.replace(CONF.modem.port, "").strip()
        logging.info("Modem.get_modem_port found " + ret)
        return ret

    def connect(self):
        '''
        Resolve the port with get_modem_port() and open it.
        serial.Serial may raise if the port cannot be opened; the caller
        handles that.
        '''
        self.port = self.get_modem_port()
        logging.info("modem.connect Port: %s, BaudRate: %i", self.port, self.baudrate)
        self.serial = serial.Serial(
            port=self.port,
            baudrate=self.baudrate,
            timeout=1)

    def _write(self, command):
        '''
        Send command followed by a carriage return and collect the reply.
        Returns the reply decoded as ASCII with every "\\r\\n" replaced by a
        single space.
        '''
        # Drop stale input so it is not mistaken for the reply.
        if self.serial.inWaiting() > 0:
            self.serial.flushInput()

        self._response = b""

        self.serial.write(bytearray(command + "\r", "ascii"))
        read = self.serial.inWaiting()
        while True:
            if read > 0:
                self._response += self.serial.read(read)
            else:
                # Nothing buffered yet: give the modem read_timeout seconds.
                time.sleep(self.read_timeout)
            read = self.serial.inWaiting()
            # The reply is considered complete once a poll finds no bytes.
            if read == 0:
                break
        return self._response.decode("ascii").replace('\r\n', ' ')

    def write(self, command, wait_resp=True):
        '''
        Send an AT command and return a (success, response) tuple.
        With wait_resp true, a reply containing "ERROR" yields (False, None);
        otherwise the result is (True, <reply text>).
        '''
        response = self._write(command)
        self.log.debug("%s: %s", command, response)

        if wait_resp and "ERROR" in response:
            return False, None
        return True, response

    def get_state(self):
        '''
        Query AT+CGATT? and translate the reply into a State member.
        Returns State.error when the command fails, when the reply has no
        "CGATT:" marker, or when the reported value is not a State value.
        '''
        success, result = self.write('AT+CGATT?')
        if not success or 'CGATT:' not in result:
            return State.error
        # Split on any whitespace so both "CGATT:1" and "CGATT: 1" parse;
        # splitting on a single space yields "" for the latter form.
        fields = result.split('CGATT:')[1].split()
        value = fields[0] if fields else ""
        try:
            return State(value)
        except ValueError:
            # Unexpected value: report an error instead of raising.
            self.log.debug("Unexpected CGATT value: %r", value)
            return State.error

    def close(self):
        '''
        Close the serial port opened by connect().
        '''
        self.serial.close()
def get_control_plane_state(modem):
    '''
    Turns the radio off (AT+CFUN=0), waits for the modem to report
    disconnected, turns the radio back on (AT+CFUN=1) and waits for it to
    report connected.
    Returns State.error when a command cannot be written or the modem does
    not disconnect within CONF.detach_timeout polls; otherwise the last
    polled state, with State.error reported as State.disconnected.
    '''

    def send_via_shell(echo_prefix, failure_message):
        # "echo" works more stable than serial for this action
        try:
            logging.debug(echo_prefix + modem.port)
            subprocess.check_output(echo_prefix + modem.port, shell=True)
        except subprocess.CalledProcessError:
            logging.error(failure_message)
            return False
        return True

    def poll_state(wanted, timeout, current):
        # Poll once per second until the wanted state shows up or the
        # timeout (a number of polls) is used up; returns the last state.
        attempts = 0
        while attempts < timeout:
            current = modem.get_state()
            if current is wanted:
                break
            time.sleep(1)
            attempts += 1
        return current

    # Disable radio function
    if not send_via_shell("echo 'AT+CFUN=0' > ", "Write 'AT+CFUN=0' failed"):
        return State.error

    # Wait until the modem is fully disconnected
    state = poll_state(State.disconnected, CONF.detach_timeout, None)
    if state is not State.disconnected:
        logging.error("Failed to disconnect")
        return State.error

    time.sleep(2)

    # Enable radio function
    if not send_via_shell("echo 'AT+CFUN=1' > ", "Write 'AT+CFUN=1' failed"):
        return State.error

    # Wait attach_timeout sec for the modem to be fully connected
    state = poll_state(State.connected, CONF.attach_timeout, state)

    # A failed CGATT query after re-enabling counts as "not attached"
    return State.disconnected if state is State.error else state
def get_user_plane_state(modem):
    '''
    Basic user plane check: sends 3 pings to CONF.ips.user_plane_ping_test.
    Returns State.connected when the ping command exits successfully,
    State.disconnected otherwise. The modem argument is not used.
    '''
    ping_target = CONF.ips.user_plane_ping_test
    try:
        subprocess.check_output(
            "ping -c 3 " + ping_target + ">/dev/null 2>&1",
            shell=True)
    except subprocess.CalledProcessError:
        logging.warning("User plane test failed")
        return State.disconnected
    return State.connected
def run_ping_test(ip, count):
    '''
    Runs the ping test
    Input: IP to ping, # times to ping
    Returns: dict of the min/avg/max/stddev numbers from the ping command result
    All four numbers are 0.0 when no IP is given or the test fails.
    '''
    result = {'min': 0.0,
              'avg': 0.0,
              'max': 0.0,
              'stddev': 0.0}
    # Same guard as run_iperf_test: an unset IP means "skip this test".
    # Without it a None IP raised TypeError inside the except handler below.
    if not ip:
        return result
    try:
        # The 4th field of ping's last line holds the four numbers
        # separated by "/".
        pingResult = subprocess.check_output(
            "ping -c " + str(count) + " " + ip +
            " | tail -1 | awk '{print $4}'",
            shell=True).decode("UTF-8").split("/")
        result = {'min': float(pingResult[0]),
                  'avg': float(pingResult[1]),
                  'max': float(pingResult[2]),
                  'stddev': float(pingResult[3])}
    except Exception as e:
        # Best effort: a failed test is logged and reported as all zeros.
        logging.error("Ping test failed for %s: %s", ip, e)
    return result
def get_ping_test(modem):
    '''
    Collects the ping test results into a dict.
    Each entry holds the min/avg/max/stddev dict returned by run_ping_test.
    1) 'dns': 10 pings to the address in CONF.ips.speedtest_ping_dns.
    2) # TODO: Performs ping to device on network.
    The modem argument is not used.
    '''
    dns_target = CONF.ips.speedtest_ping_dns
    return {'dns': run_ping_test(dns_target, 10)}
def run_iperf_test(ip, port, time_duration, is_downlink):
    '''
    Runs an iperf3 client test against ip:port for time_duration seconds.
    - is_downlink adds the -R flag and selects the received rate;
      otherwise the sent rate is selected
    - The rate is read from iperf3's json output and returned in Mbps
    Returns 0.0 when ip is empty or the test fails.
    '''
    if not ip:
        return 0.0

    throughput_mbps = 0.0
    try:
        command = "".join([
            "iperf3 -c ", ip,
            " -p ", str(port),
            " -t ", str(time_duration),
            " -R " if is_downlink else "",
            " --json",
        ])
        raw_output = subprocess.check_output(command, shell=True)
        totals = json.loads(raw_output.decode("UTF-8"))['end']
        received_mbps = totals['sum_received']['bits_per_second'] / 1000000
        sent_mbps = totals['sum_sent']['bits_per_second'] / 1000000.0
        if is_downlink:
            throughput_mbps = received_mbps
        else:
            throughput_mbps = sent_mbps
    except Exception as e:
        logging.error("iperf test failed for " + ip + ": %s", e)
    return throughput_mbps
def get_iperf_test(modem):
    '''
    Runs a 10 second downlink and then a 10 second uplink iperf test
    against CONF.ips.speedtest_iperf on CONF.iperf_port.
    Returns {'cluster': {'downlink': <Mbps>, 'uplink': <Mbps>}}.
    The modem argument is not used.
    '''
    server = CONF.ips.speedtest_iperf
    port = CONF.iperf_port
    downlink = run_iperf_test(server, port, 10, True)
    uplink = run_iperf_test(server, port, 10, False)
    return {'cluster': {'downlink': downlink, 'uplink': uplink}}
def get_signal_quality(modem):
    '''
    Queries AT+CESQ and returns {'rsrq': int, 'rsrp': int}.
    The 5th and 6th comma separated fields of the reply are used as rsrq
    and rsrp; a value of 255 is reported as 0.
    Returns {'rsrq': 0, 'rsrp': 0} when the command fails or the reply
    cannot be parsed.
    '''
    success, result = modem.write('AT+CESQ')
    logging.debug("get_signal_quality success %i result %s", success, result)
    if not success or 'CESQ: ' not in result:
        logging.error("Failed to get signal quality")
        return {'rsrq': 0, 'rsrp': 0}

    logging.debug("%s", result)
    fields = result.split('CESQ:')[1].split(',')
    try:
        rsrq = int(fields[4].strip())
        # The last field is followed by the rest of the reply (e.g. "OK").
        rsrp = int(fields[5].split()[0])
    except (IndexError, ValueError):
        # A short or non-numeric reply must not take the monitor down.
        logging.error("Failed to get signal quality")
        return {'rsrq': 0, 'rsrp': 0}

    # Compare by value: "is 255" only worked through CPython's small-int
    # cache. NOTE(review): 255 presumably means "not known" - confirm
    # against the modem's AT+CESQ documentation.
    return {
        'rsrq': 0 if rsrq == 255 else rsrq,
        'rsrp': 0 if rsrp == 255 else rsrp
    }
def report_status(signal_quality, cp_state=None, up_state=None, speedtest_ping=None, speedtest_iperf=None):
    '''
    Builds the status report, posts it as JSON to CONF.report_url and then
    sleeps for CONF.report_interval seconds.
    Input: signal_quality is always included as given; every other argument
    left as None keeps its default ("disconnected" / zeroed results).
    Also increments the module level "cycles" counter.
    A failed post is logged and otherwise ignored.
    '''
    global cycles

    report = {
        'name': CONF.edge_name,
        'status': {
            'control_plane': "disconnected",
            'user_plane': "disconnected"
        },
        'speedtest': {
            'ping': {
                'dns': {
                    'min': 0.0,
                    'avg': 0.0,
                    'max': 0.0,
                    'stddev': 0.0
                }
            },
            'iperf': {
                'cluster': {
                    'downlink': 0.0,
                    'uplink': 0.0
                }
            }
        },
        'signal_quality': {
            'rsrq': 0,
            'rsrp': 0
        }
    }

    if cp_state is not None:
        report['status']['control_plane'] = cp_state.name
    if up_state is not None:
        report['status']['user_plane'] = up_state.name
    if speedtest_ping is not None:
        report['speedtest']['ping'] = speedtest_ping
    if speedtest_iperf is not None:
        report['speedtest']['iperf'] = speedtest_iperf
    report['signal_quality'] = signal_quality

    logging.info("Sending report %s", report)
    cycles += 1
    logging.info("Number of cycles since modem restart %i", cycles)

    # Post and status check share one try block: previously a failed post
    # left "result" unbound and the following raise_for_status() call
    # crashed with UnboundLocalError. RequestException is the base class of
    # both ConnectionError and HTTPError.
    try:
        result = requests.post(CONF.report_url, json=report)
        result.raise_for_status()
    except requests.exceptions.RequestException as e:
        logging.error("Failed to report for %s", e)

    time.sleep(CONF.report_interval)
def reset_usb():
    '''
    Power cycles the USB hubs with uhubctl and resets the module level
    "cycles" counter to 0.
    uhubctl options: -a 0 = power off, -a 1 = power on,
    -l 2 = location, bus 2 on pi controls power to all hubs.
    '''
    global cycles

    steps = (
        ("/usr/sbin/uhubctl -a 0 -l 2", "Shutting down usb hub 2 results %s"),
        ("/usr/sbin/uhubctl -a 1 -l 2", "Starting up usb hub 2 results %s"),
    )
    for command, message in steps:
        exit_code = subprocess.call(command, shell=True)
        logging.info(message, exit_code)
        # 10 seconds after each step: lets the power down settle out,
        # then allows dbus to finish after power up
        time.sleep(10)

    cycles = 0
def main():
    '''
    Entry point of the monitoring script.
    1) Routes every configured test IP through the modem (CONF.modem.ip_addr).
    2) Opens the modem serial port.
    3) Loops forever: signal quality, control plane test, user plane test,
       speed tests, then report_status(), which also sleeps between rounds.
    Exits with status 1 when routes cannot be added, the modem cannot be
    opened, or the control plane test returns State.error.
    '''
    global cycles
    cycles = 0

    for ip in CONF.ips:
        if not ip:
            continue
        try:
            subprocess.check_output("sudo ip route replace {}/32 via {}".format(
                ip, CONF.modem.ip_addr), shell=True)
        except subprocess.CalledProcessError as e:
            # The message needs placeholders for its arguments; without
            # them the logging call itself failed and the details were lost.
            logging.error("Failed to add routes: returncode=%s output=%s",
                          e.returncode, e.output)
            reset_usb()
            sys.exit(1)

    modem = Modem(CONF.modem.port, CONF.modem.baud)
    try:
        modem.connect()
    except serial.serialutil.SerialException as e:
        logging.error("Failed to connect the modem for %s", e)
        sys.exit(1)

    # The loop only ends through sys.exit() or an exception, so the serial
    # port is closed in "finally"; a close() placed after the loop is
    # never reached.
    try:
        while True:
            signal_quality = get_signal_quality(modem)

            cp_state = get_control_plane_state(modem)
            if cp_state is State.error:
                logging.error("Modem is in error state.")
                reset_usb()
                sys.exit(1)
            if cp_state is State.disconnected:
                # Failed to attach, don't need to run other tests
                report_status(signal_quality)
                continue

            up_state = get_user_plane_state(modem)
            if up_state is State.disconnected:
                # Basic user plane test failed, don't need to run the rest of tests
                report_status(signal_quality, cp_state)
                continue

            speedtest_ping = get_ping_test(modem)
            speedtest_iperf = get_iperf_test(modem)

            report_status(signal_quality, cp_state, up_state, speedtest_ping, speedtest_iperf)
    finally:
        modem.close()
# Start the monitoring loop only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()