Implement parallel start of ONOS instances when forming/setting up an ONOS cluster.
For now, this is restricted to the -async or --async-mode option of cord-test.py in setup or run mode.
Change-Id: If366e7b7370ede3574e070a23c5e17e723abe677
diff --git a/src/test/setup/cord-test.py b/src/test/setup/cord-test.py
index bfa2a8d..ae10f9d 100755
--- a/src/test/setup/cord-test.py
+++ b/src/test/setup/cord-test.py
@@ -400,8 +400,10 @@
Container.IMAGE_PREFIX = args.prefix
cluster_mode = True if args.onos_instances > 1 else False
+ async_mode = cluster_mode and args.async_mode
existing_list = [ c['Names'][0][1:] for c in Container.dckr.containers() if c['Image'] == args.onos ]
setup_cluster = False if len(existing_list) == args.onos_instances else True
+ onos_ips = []
if onos_ip is None:
image_names = args.onos.rsplit(':', 1)
onos_cnt['image'] = image_names[0]
@@ -417,9 +419,14 @@
Onos.TAG = onos_cnt['tag']
data_volume = '{}-data'.format(Onos.NAME) if args.shared_volume else None
onos = Onos(image = Onos.IMAGE,
- tag = Onos.TAG, boot_delay = 60, cluster = cluster_mode, data_volume = data_volume)
- onos_ip = onos.ip()
- onos_ips = [ onos_ip ]
+ tag = Onos.TAG, boot_delay = 60, cluster = cluster_mode,
+ data_volume = data_volume, async = async_mode)
+ if onos.running:
+ onos_ip = onos.ipaddr
+ onos_ips.append(onos_ip)
+ else:
+ onos_ips.append(onos_ip)
+
num_onos_instances = args.onos_instances
if num_onos_instances > 1 and onos is not None:
onos_instances = []
@@ -427,10 +434,17 @@
for i in range(1, num_onos_instances):
name = '{}-{}'.format(Onos.NAME, i+1)
data_volume = '{}-data'.format(name) if args.shared_volume else None
+ quagga_config = Onos.get_quagga_config(i)
onos = Onos(name = name, image = Onos.IMAGE, tag = Onos.TAG, boot_delay = 60, cluster = cluster_mode,
- data_volume = data_volume)
+ data_volume = data_volume, async = async_mode, quagga_config = quagga_config)
onos_instances.append(onos)
- onos_ips.append(onos.ipaddr)
+ if onos.running:
+ onos_ips.append(onos.ipaddr)
+ if async_mode is True:
+ Onos.start_cluster_async(onos_instances)
+ if not onos_ips:
+ for onos in onos_instances:
+ onos_ips.append(onos.ipaddr)
try:
for ip in onos_ips:
print('Installing cord tester ONOS app %s in ONOS instance %s' %(args.app,ip))
@@ -640,6 +654,7 @@
Onos.PREFIX = args.prefix
Onos.TAG = onos_cnt['tag']
cluster_mode = True if args.onos_instances > 1 else False
+ async_mode = cluster_mode and args.async_mode
existing_list = [ c['Names'][0][1:] for c in Container.dckr.containers() if c['Image'] == args.onos ]
setup_cluster = False if len(existing_list) == args.onos_instances else True
#cleanup existing volumes before forming a new cluster
@@ -651,24 +666,35 @@
except: pass
onos = None
+ onos_ips = []
if onos_ip is None:
data_volume = '{}-data'.format(Onos.NAME) if args.shared_volume else None
onos = Onos(image = Onos.IMAGE, tag = Onos.TAG, boot_delay = 60, cluster = cluster_mode,
- data_volume = data_volume)
- onos_ip = onos.ip()
+ data_volume = data_volume, async = async_mode)
+ if onos.running:
+ onos_ip = onos.ipaddr
+ onos_ips.append(onos_ip)
+ else:
+ onos_ips.append(onos_ip)
num_onos_instances = args.onos_instances
- onos_ips = [ onos_ip ]
if num_onos_instances > 1 and onos is not None:
onos_instances = []
onos_instances.append(onos)
for i in range(1, num_onos_instances):
name = '{}-{}'.format(Onos.NAME, i+1)
data_volume = '{}-data'.format(name) if args.shared_volume else None
+ quagga_config = Onos.get_quagga_config(i)
onos = Onos(name = name, image = Onos.IMAGE, tag = Onos.TAG, boot_delay = 60, cluster = cluster_mode,
- data_volume = data_volume)
+ data_volume = data_volume, async = async_mode, quagga_config = quagga_config)
onos_instances.append(onos)
- onos_ips.append(onos.ipaddr)
+ if onos.running:
+ onos_ips.append(onos.ipaddr)
+ if async_mode is True:
+ Onos.start_cluster_async(onos_instances)
+ if not onos_ips:
+ for onos in onos_instances:
+ onos_ips.append(onos.ipaddr)
if setup_cluster is True:
Onos.setup_cluster(onos_instances)
@@ -975,6 +1001,8 @@
parser_run.add_argument('-j', '--onos-instances', default=1, type=int,
help='Specify number to test onos instances to form cluster')
parser_run.add_argument('-v', '--shared-volume', action='store_true', help='Start ONOS cluster instances with shared volume')
+ parser_run.add_argument('-async', '--async-mode', action='store_true',
+ help='Start ONOS cluster instances in async mode')
parser_run.add_argument('-log', '--log-level', default=onos_log_level,
choices=['DEBUG','TRACE','ERROR','WARN','INFO'],
type=str,
@@ -1010,6 +1038,8 @@
help='Specify number of test onos instances to spawn')
parser_setup.add_argument('-v', '--shared-volume', action='store_true',
help='Start ONOS cluster instances with shared volume')
+ parser_setup.add_argument('-async', '--async-mode', action='store_true',
+ help='Start ONOS cluster instances in async mode')
parser_setup.add_argument('-f', '--foreground', action='store_true', help='Run in foreground')
parser_setup.set_defaults(func=setupCordTester)
diff --git a/src/test/utils/CordContainer.py b/src/test/utils/CordContainer.py
index 8391273..24aa6b5 100644
--- a/src/test/utils/CordContainer.py
+++ b/src/test/utils/CordContainer.py
@@ -17,13 +17,16 @@
import io
import json
import yaml
+import errno
from pyroute2 import IPRoute
+from pyroute2.netlink import NetlinkError
from itertools import chain
from nsenter import Namespace
from docker import Client
from shutil import rmtree
from OnosCtrl import OnosCtrl
from OnosLog import OnosLog
+from threadPool import ThreadPool
class docker_netns(object):
@@ -159,7 +162,14 @@
ip = IPRoute()
br = ip.link_lookup(ifname=quagga_config['bridge'])
if len(br) == 0:
- ip.link_create(ifname=quagga_config['bridge'], kind='bridge')
+ try:
+ ip.link_create(ifname=quagga_config['bridge'], kind='bridge')
+ except NetlinkError as e:
+ err, _ = e.args
+ if err == errno.EEXIST:
+ pass
+ else:
+ raise NetlinkError(*e.args)
br = ip.link_lookup(ifname=quagga_config['bridge'])
br = br[0]
ip.link('set', index=br, state='up')
@@ -299,7 +309,7 @@
class Onos(Container):
- quagga_config = ( { 'bridge' : 'quagga-br', 'ip': '10.10.0.4', 'mask' : 16 }, )
+ quagga_config = [ { 'bridge' : 'quagga-br', 'ip': '10.10.0.4', 'mask' : 16 }, ]
SYSTEM_MEMORY = (get_mem(),) * 2
INSTANCE_MEMORY = (get_mem(instances=3),) * 2
JAVA_OPTS = '-Xms{} -Xmx{} -XX:+UseConcMarkSweepGC -XX:+CMSIncrementalMode'.format(*SYSTEM_MEMORY)#-XX:+PrintGCDetails -XX:+PrintGCTimeStamps'
@@ -310,7 +320,7 @@
('igmp', '1.1-SNAPSHOT'),
#('vtn', '1.1-SNAPSHOT'),
)
- ports = [ 8181, 8101, 9876, 6653, 6633, 2000, 2620 ]
+ ports = [] #[ 8181, 8101, 9876, 6653, 6633, 2000, 2620 ]
setup_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), '..', 'setup')
host_config_dir = os.path.join(setup_dir, 'onos-config')
guest_config_dir = '/root/onos/config'
@@ -377,7 +387,7 @@
def __init__(self, name = NAME, image = IMAGE, prefix = PREFIX, tag = TAG,
boot_delay = 20, restart = False, network_cfg = None,
- cluster = False, data_volume = None):
+ cluster = False, data_volume = None, async = False, quagga_config = None):
if restart is True:
##Find the right image to restart
running_image = filter(lambda c: c['Names'][0] == '/{}'.format(name), self.dckr.containers())
@@ -388,6 +398,8 @@
tag = image_name.split(':')[1]
except: pass
+ if quagga_config is not None:
+ self.quagga_config = quagga_config
super(Onos, self).__init__(name, image, prefix = prefix, tag = tag, quagga_config = self.quagga_config)
self.boot_delay = boot_delay
self.data_map = None
@@ -422,31 +434,70 @@
json_data = json.dumps(network_cfg, indent=4)
with open('{}/network-cfg.json'.format(self.host_config_dir), 'w') as f:
f.write(json_data)
- print('Starting ONOS container %s' %self.name)
- self.start(ports = self.ports, environment = self.env,
- host_config = self.host_config, volumes = self.volumes, tty = True)
- if not restart:
- ##wait a bit before fetching IP to regenerate cluster cfg
- time.sleep(5)
- ip = self.ip()
- ##Just a quick hack/check to ensure we don't regenerate in the common case.
- ##As ONOS is usually the first test container that is started
- if cluster is False:
- if ip != self.CLUSTER_CFG_IP or not os.access(self.cluster_cfg, os.F_OK):
- print('Regenerating ONOS cluster cfg for ip %s' %ip)
- self.generate_cluster_cfg(ip)
- self.kill()
- self.remove_container(self.name, force=True)
- print('Restarting ONOS container %s' %self.name)
- self.start(ports = self.ports, environment = self.env,
- host_config = self.host_config, volumes = self.volumes, tty = True)
- print('Waiting for ONOS to boot')
- time.sleep(boot_delay)
- self.wait_for_onos_start(self.ip())
+ if cluster is False or async is False:
+ print('Starting ONOS container %s' %self.name)
+ self.start(ports = self.ports, environment = self.env,
+ host_config = self.host_config, volumes = self.volumes, tty = True)
+ if not restart:
+ ##wait a bit before fetching IP to regenerate cluster cfg
+ time.sleep(5)
+ ip = self.ip()
+ ##Just a quick hack/check to ensure we don't regenerate in the common case.
+ ##As ONOS is usually the first test container that is started
+ if cluster is False:
+ if ip != self.CLUSTER_CFG_IP or not os.access(self.cluster_cfg, os.F_OK):
+ print('Regenerating ONOS cluster cfg for ip %s' %ip)
+ self.generate_cluster_cfg(ip)
+ self.kill()
+ self.remove_container(self.name, force=True)
+ print('Restarting ONOS container %s' %self.name)
+ self.start(ports = self.ports, environment = self.env,
+ host_config = self.host_config, volumes = self.volumes, tty = True)
+ print('Waiting for ONOS to boot')
+ time.sleep(boot_delay)
+ self.wait_for_onos_start(self.ip())
+ self.running = True
+ else:
+ self.running = False
+ else:
+ self.running = True
+ if self.running:
+ self.ipaddr = self.ip()
+ if cluster is False:
+ self.install_cord_apps(self.ipaddr)
+ @classmethod
+ def get_quagga_config(cls, instance = 0):
+ quagga_config = cls.quagga_config[:]
+ if instance == 0:
+ return quagga_config
+ ip = quagga_config[0]['ip']
+ octets = ip.split('.')
+ octets[3] = str((int(octets[3]) + 1) & 255)
+ ip = '.'.join(octets)
+ quagga_config[0]['ip'] = ip
+ return quagga_config
+
+ @classmethod
+ def start_cluster_async(cls, onos_instances):
+ instances = filter(lambda o: o.running == False, onos_instances)
+ if not instances:
+ return
+ tpool = ThreadPool(len(instances), queue_size = 1, wait_timeout = 1)
+ for onos in instances:
+ tpool.addTask(onos.start_async)
+ tpool.cleanUpThreads()
+
+ def start_async(self):
+ print('Starting ONOS container %s' %self.name)
+ self.start(ports = self.ports, environment = self.env,
+ host_config = self.host_config, volumes = self.volumes, tty = True)
+ time.sleep(3)
self.ipaddr = self.ip()
- if cluster is False:
- self.install_cord_apps(self.ipaddr)
+ print('Waiting for ONOS container %s to start' %self.name)
+ self.wait_for_onos_start(self.ipaddr)
+ self.running = True
+ print('ONOS container %s started' %self.name)
@classmethod
def wait_for_onos_start(cls, ip, tries = 30):