blob: 757db8b74c29aa0ae4d635929bc85d2480d48f60 [file] [log] [blame]
#!/usr/bin/env python3
# Copyright 2020-present Open Networking Foundation
#
# SPDX-License-Identifier: Apache-2.0
import sys
import os
import time
import requests
import json
import enum
import logging
from collections import namedtuple
from pyadb import ADB
'''
Check Aether network operational status and report it to
central monitoring server
1) check mobile connectivity after toggling the airplane mode
2) check if ping to 8.8.8.8 works
'''
# Load the agent configuration once, at import time.  The path is taken from
# the CONFIG_FILE environment variable and falls back to ./config.json.
# Every JSON object is turned into a namedtuple so that settings are read by
# attribute (e.g. CONF.adb.path) instead of by key.
# The file is opened in a `with` block so the handle is closed right after
# parsing instead of being left open.
with open(os.getenv('CONFIG_FILE', "./config.json")) as _config_file:
    CONF = json.load(
        _config_file,
        object_hook=lambda d: namedtuple('X', d.keys())(*d.values())
    )

# Write log records to the file named in the configuration.  The threshold is
# WARNING, so debug()/info() records are not written with this setup.
logging.basicConfig(
    filename=CONF.log_file,
    format='%(asctime)s [%(levelname)s] %(message)s',
    level=logging.WARNING
)
# Shell command lines run on the handset (through adb) to read its state.
#   apn_mode        - reads the global "airplane_mode_on" setting
#   lte_state       - first mDataConnectionState line of the
#                     telephony.registry dump
#   ping_result     - echoes "$?" after a 3-packet ping to 8.8.8.8
#   ping_dns_result - 4th field of the last line printed by a 10-packet ping
#                     to 8.8.8.8
# NOTE(review): "&>" is a bash-style redirection; confirm the device shell
# supports it.
ADB_GET_COMMANDS = dict(
    apn_mode="settings get global airplane_mode_on",
    lte_state="dumpsys telephony.registry | grep -m1 mDataConnectionState",
    ping_result="ping -c 3 8.8.8.8&>/dev/null; echo $?",
    ping_dns_result="ping -c 10 8.8.8.8 | tail -1 | awk '{print $4}'",
)
# UI actions used to toggle airplane mode.  get_control_plane_state() runs
# them in the order listed here: "home", then "setting" (opens the airplane
# mode settings screen), then "toggle" (taps the screen at the x/y location
# from the configuration).  The coordinates must be strings in the config.
ADB_APN_COMMANDS = {
    "home": "input keyevent 3",
    "setting": "am start -a android.settings.AIRPLANE_MODE_SETTINGS",
    "toggle": " ".join((
        "input tap",
        CONF.adb.apn_mode_toggle_location.x,
        CONF.adb.apn_mode_toggle_location.y,
    )),
}
class State(enum.Enum):
    """Connection states reported by the agent.

    The values are strings because they are compared directly with text
    read from the device.
    """

    error = "-1"
    disconnected = "0"
    connecting = "1"
    connected = "2"

    @classmethod
    def has_value(cls, value):
        """Return True when *value* is the value of one of the members."""
        known_values = {member.value for member in cls}
        return value in known_values
# Report payload.  main() overwrites the 'status' entries and the
# 'speedtest'/'ping' entry on every cycle, and
# report_aether_network_state() posts the whole dict as JSON.
edge_status = {
    'name': CONF.edge_name,
    'status': {'control_plane': None, 'user_plane': 'connected'},
    'speedtest': {
        'ping': {
            # ping statistics start out zeroed
            'dns': dict.fromkeys(('min', 'avg', 'max', 'stddev'), 0.0),
        },
    },
}
def _run_adb_shell(adb, command):
    """Run *command* in the device shell and return its first output item.

    Args:
        adb: object exposing ``shell_command(command)``, which returns a
            ``(result, error)`` pair.
        command: shell command line to run on the device.

    Returns:
        A ``(success, output)`` tuple.  ``success`` is always True because
        the error value is not usable (see the comment below).  ``output``
        is ``result[0]``, or None when the command produced no output.
    """
    # error is returned even when succeeded
    # so ignore error value here
    result, _error = adb.shell_command(command)
    if result:
        logging.debug(result)
    # Pause after every command before the caller issues the next one.
    time.sleep(2)
    # An empty (but not None) result has no first element; report it as
    # "no output" instead of raising IndexError.
    return True, (result[0] if result else None)
def get_control_plane_state(adb):
    """Check that the Aether control plane works by toggling airplane mode.

    Reads the current airplane mode, runs the ADB_APN_COMMANDS sequence,
    taps the toggle a second time when airplane mode was initially off, and
    then polls the LTE data connection state up to 10 times.

    Args:
        adb: adb handle passed through to ``_run_adb_shell``.

    Returns:
        A ``(state, detail)`` tuple.  ``state`` is a ``State`` member:
        the last state read from the device, or ``State.error`` when a
        command failed, no state could be read within the retry budget, or
        the value read is not a known state.  ``detail`` is the output of
        the failing command for the early error returns, otherwise None.
    """
    # get the current airplane mode
    success, result = _run_adb_shell(adb, ADB_GET_COMMANDS['apn_mode'])
    if not success or result is None:
        return State.error, result
    apn_mode_on = result == "1"

    # toggle the airplane mode
    for command in ADB_APN_COMMANDS.values():
        success, result = _run_adb_shell(adb, command)
        if not success:
            return State.error, result
    # Airplane mode was off before the sequence above, so tap once more
    # (presumably to switch it back off and let the device re-attach).
    if not apn_mode_on:
        success, result = _run_adb_shell(adb, ADB_APN_COMMANDS['toggle'])
        if not success:
            return State.error, result

    # get connection state
    state = None
    # Every attempt counts against the budget, including the ones that
    # return nothing, so a silent device cannot keep this loop spinning
    # forever.
    for _ in range(10):
        success, result = _run_adb_shell(adb, ADB_GET_COMMANDS['lte_state'])
        if success and result is not None:
            fields = result.split("=")
            # A line without "=" carries no state; treat it as a failed read.
            if len(fields) > 1:
                state = fields[1].strip()
                if state == State.connected.value:
                    break
        time.sleep(1)

    # state is still None when nothing could be read; has_value() rejects it.
    if not State.has_value(state):
        return State.error, None
    return State(state), None
def get_user_plane_state(adb):
    """Check Aether user plane connectivity with a ping to 8.8.8.8.

    Returns a ``(state, detail)`` tuple: ``State.connected`` when the
    command output is "0", ``State.disconnected`` for any other output, and
    ``State.error`` (with the raw output as detail) when the command failed
    or produced no output.
    """
    ok, output = _run_adb_shell(adb, ADB_GET_COMMANDS['ping_result'])
    if ok and output is not None:
        if output == "0":
            return State.connected, None
        return State.disconnected, None
    return State.error, output
def run_ping_test(adb, ip, count):
    """Ping *ip* from the device and return the round-trip statistics.

    Args:
        adb: adb handle passed through to ``_run_adb_shell``.
        ip: address to ping.
        count: number of pings to send.

    Returns:
        dict with float ``min``/``avg``/``max``/``stddev`` entries parsed
        from the "/"-separated summary field printed by ``ping``.  All four
        are 0.0 when the command fails or its output cannot be parsed.
    """
    result = {'min': 0.0,
              'avg': 0.0,
              'max': 0.0,
              'stddev': 0.0}
    # Build the command from the arguments so that the requested target and
    # count are actually used.
    command = "ping -c {} {} | tail -1 | awk '{{print $4}}'".format(count, ip)
    try:
        _, output = _run_adb_shell(adb, command)
        fields = output.split("/")
        result = {'min': float(fields[0]),
                  'avg': float(fields[1]),
                  'max': float(fields[2]),
                  'stddev': float(fields[3])}
    except Exception as e:
        # Best effort on purpose: a failed measurement is logged and reported
        # as zeros rather than aborting the caller.
        logging.error("Ping test failed for %s: %s", ip, e)
    return result
def get_ping_test(adb):
    """Collect the ping measurements, keyed by target.

    Each entry holds the min/avg/max/stddev dict of one ping test.
    1) 'dns': ping test to Google Public DNS (8.8.8.8) for 10 iterations.
    2) # TODO: Performs ping to device on network.
    """
    return {'dns': run_ping_test(adb, "8.8.8.8", 10)}
def report_aether_network_state(timeout=30):
    """Report the Aether network state to the monitoring server.

    Posts the module-level ``edge_status`` dict as JSON to
    ``CONF.report_url``.  Transport failures and HTTP error statuses are
    logged and swallowed so the caller can carry on.

    Args:
        timeout: seconds to wait for the server before giving up, so an
            unresponsive server cannot block the agent indefinitely.
    """
    logging.info("Sending report %s", edge_status)
    try:
        response = requests.post(CONF.report_url, json=edge_status,
                                 timeout=timeout)
        # Only reached when the request itself went through; turns a
        # 4xx/5xx status into an HTTPError handled below.
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        # RequestException is the base class of ConnectionError, Timeout
        # and HTTPError.
        logging.error("Failed to report for %s", e)
def main():
    """Set up adb, then measure and report the edge status in a loop.

    Exits with status 1 when the adb binary is not found, no device is
    listed, or mobile data cannot be enabled.  Otherwise it never returns:
    each cycle keeps the device awake, measures control plane, user plane
    and ping statistics, posts the report, and sleeps for
    ``CONF.report_interval``.
    """
    adb = ADB()
    if adb.set_adb_path(CONF.adb.path) is False:
        logging.error(CONF.adb.path + " not found")
        sys.exit(1)

    devices = adb.get_devices()
    if len(devices) == 0:
        logging.error("No device found")
        sys.exit(1)
    # use the first device listed
    adb.set_target_device(devices[0])

    data_enabled, _ = _run_adb_shell(adb, "svc data enable")
    if not data_enabled:
        logging.error("Failed to turn data on")
        sys.exit(1)

    while True:
        # keep the device awake while measuring
        _run_adb_shell(adb, "svc power stayon true")

        cp_state, _ = get_control_plane_state(adb)
        up_state, _ = get_user_plane_state(adb)
        ping_stats = get_ping_test(adb)

        edge_status['status'].update(
            control_plane=cp_state.name,
            user_plane=up_state.name,
        )
        edge_status['speedtest']['ping'] = ping_stats

        report_aether_network_state()

        _run_adb_shell(adb, "svc power stayon false")
        time.sleep(CONF.report_interval)


if __name__ == "__main__":
    main()