blob: 97039f4977003ec5989483cb70112ee1c0cefb3c [file] [log] [blame]
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import subprocess
import socket
import fcntl
import struct
import os
import logging
log_test = logging.getLogger('cordTester')
test_consolehandler = logging.StreamHandler()
#test_consolehandler.setFormatter(logging.Formatter("%(levelname)s:%(message)s"))
log_test.addHandler(test_consolehandler)
# we use subprocess as commands.getstatusoutput would be deprecated
def getstatusoutput(cmd):
command = [ '/bin/bash', '-c', cmd ]
p = subprocess.Popen(command, stdout = subprocess.PIPE)
out, _ = p.communicate()
return p.returncode, out.strip()
def get_ip(iface):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
info = fcntl.ioctl(s.fileno(), 0x8915, struct.pack('256s', bytes(iface[:15])))
except:
info = None
s.close()
if info:
return '.'.join( [ str(ord(c)) for c in info[20:24] ] )
return None
def get_mac(iface = None, pad = 4):
if iface is None:
iface = os.getenv('TEST_SWITCH', 'ovsbr0')
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
info = fcntl.ioctl(s.fileno(), 0x8927, struct.pack('256s', bytes(iface[:15])))
except:
info = ['0'] * 24
s.close()
sep = ''
if pad == 0:
sep = ':'
return '0'*pad + sep.join(['%02x' %ord(char) for char in info[18:24]])
def get_default_gw():
cmd = "ip route show | grep default | head -1 | awk '{print $3}'"
cmd_dev = "ip route show | grep default | head -1 | awk '{print $NF}'"
st, gw = getstatusoutput(cmd)
st2, gw_device = getstatusoutput(cmd_dev)
if st != 0:
gw = None
if st2 != 0:
gw_device = None
return gw, gw_device
def get_controllers():
controllers = os.getenv('ONOS_CONTROLLER_IP') or 'localhost'
return controllers.split(',')
def get_controller():
controllers = get_controllers()
return controllers[0]
def running_on_pod():
"""If we are running on Ciab or inside a physical podd, key file would be set"""
return True if os.environ.get('SSH_KEY_FILE', None) else False