Implement parallel start of ONOS instances when forming/setting up an ONOS cluster.
For now, this is restricted to the -async or --async-mode option of cord-test.py in setup or run mode.
Change-Id: If366e7b7370ede3574e070a23c5e17e723abe677
diff --git a/src/test/utils/CordContainer.py b/src/test/utils/CordContainer.py
index 8391273..24aa6b5 100644
--- a/src/test/utils/CordContainer.py
+++ b/src/test/utils/CordContainer.py
@@ -17,13 +17,16 @@
import io
import json
import yaml
+import errno
from pyroute2 import IPRoute
+from pyroute2.netlink import NetlinkError
from itertools import chain
from nsenter import Namespace
from docker import Client
from shutil import rmtree
from OnosCtrl import OnosCtrl
from OnosLog import OnosLog
+from threadPool import ThreadPool
class docker_netns(object):
@@ -159,7 +162,14 @@
ip = IPRoute()
br = ip.link_lookup(ifname=quagga_config['bridge'])
if len(br) == 0:
- ip.link_create(ifname=quagga_config['bridge'], kind='bridge')
+ try:
+ ip.link_create(ifname=quagga_config['bridge'], kind='bridge')
+ except NetlinkError as e:
+ err, _ = e.args
+ if err == errno.EEXIST:
+ pass
+ else:
+ raise NetlinkError(*e.args)
br = ip.link_lookup(ifname=quagga_config['bridge'])
br = br[0]
ip.link('set', index=br, state='up')
@@ -299,7 +309,7 @@
class Onos(Container):
- quagga_config = ( { 'bridge' : 'quagga-br', 'ip': '10.10.0.4', 'mask' : 16 }, )
+ quagga_config = [ { 'bridge' : 'quagga-br', 'ip': '10.10.0.4', 'mask' : 16 }, ]
SYSTEM_MEMORY = (get_mem(),) * 2
INSTANCE_MEMORY = (get_mem(instances=3),) * 2
JAVA_OPTS = '-Xms{} -Xmx{} -XX:+UseConcMarkSweepGC -XX:+CMSIncrementalMode'.format(*SYSTEM_MEMORY)#-XX:+PrintGCDetails -XX:+PrintGCTimeStamps'
@@ -310,7 +320,7 @@
('igmp', '1.1-SNAPSHOT'),
#('vtn', '1.1-SNAPSHOT'),
)
- ports = [ 8181, 8101, 9876, 6653, 6633, 2000, 2620 ]
+ ports = [] #[ 8181, 8101, 9876, 6653, 6633, 2000, 2620 ]
setup_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), '..', 'setup')
host_config_dir = os.path.join(setup_dir, 'onos-config')
guest_config_dir = '/root/onos/config'
@@ -377,7 +387,7 @@
def __init__(self, name = NAME, image = IMAGE, prefix = PREFIX, tag = TAG,
boot_delay = 20, restart = False, network_cfg = None,
- cluster = False, data_volume = None):
+ cluster = False, data_volume = None, async = False, quagga_config = None):
if restart is True:
##Find the right image to restart
running_image = filter(lambda c: c['Names'][0] == '/{}'.format(name), self.dckr.containers())
@@ -388,6 +398,8 @@
tag = image_name.split(':')[1]
except: pass
+ if quagga_config is not None:
+ self.quagga_config = quagga_config
super(Onos, self).__init__(name, image, prefix = prefix, tag = tag, quagga_config = self.quagga_config)
self.boot_delay = boot_delay
self.data_map = None
@@ -422,31 +434,70 @@
json_data = json.dumps(network_cfg, indent=4)
with open('{}/network-cfg.json'.format(self.host_config_dir), 'w') as f:
f.write(json_data)
- print('Starting ONOS container %s' %self.name)
- self.start(ports = self.ports, environment = self.env,
- host_config = self.host_config, volumes = self.volumes, tty = True)
- if not restart:
- ##wait a bit before fetching IP to regenerate cluster cfg
- time.sleep(5)
- ip = self.ip()
- ##Just a quick hack/check to ensure we don't regenerate in the common case.
- ##As ONOS is usually the first test container that is started
- if cluster is False:
- if ip != self.CLUSTER_CFG_IP or not os.access(self.cluster_cfg, os.F_OK):
- print('Regenerating ONOS cluster cfg for ip %s' %ip)
- self.generate_cluster_cfg(ip)
- self.kill()
- self.remove_container(self.name, force=True)
- print('Restarting ONOS container %s' %self.name)
- self.start(ports = self.ports, environment = self.env,
- host_config = self.host_config, volumes = self.volumes, tty = True)
- print('Waiting for ONOS to boot')
- time.sleep(boot_delay)
- self.wait_for_onos_start(self.ip())
+ if cluster is False or async is False:
+ print('Starting ONOS container %s' %self.name)
+ self.start(ports = self.ports, environment = self.env,
+ host_config = self.host_config, volumes = self.volumes, tty = True)
+ if not restart:
+ ##wait a bit before fetching IP to regenerate cluster cfg
+ time.sleep(5)
+ ip = self.ip()
+ ##Just a quick hack/check to ensure we don't regenerate in the common case.
+ ##As ONOS is usually the first test container that is started
+ if cluster is False:
+ if ip != self.CLUSTER_CFG_IP or not os.access(self.cluster_cfg, os.F_OK):
+ print('Regenerating ONOS cluster cfg for ip %s' %ip)
+ self.generate_cluster_cfg(ip)
+ self.kill()
+ self.remove_container(self.name, force=True)
+ print('Restarting ONOS container %s' %self.name)
+ self.start(ports = self.ports, environment = self.env,
+ host_config = self.host_config, volumes = self.volumes, tty = True)
+ print('Waiting for ONOS to boot')
+ time.sleep(boot_delay)
+ self.wait_for_onos_start(self.ip())
+ self.running = True
+ else:
+ self.running = False
+ else:
+ self.running = True
+ if self.running:
+ self.ipaddr = self.ip()
+ if cluster is False:
+ self.install_cord_apps(self.ipaddr)
+ @classmethod
+ def get_quagga_config(cls, instance = 0):
+ quagga_config = cls.quagga_config[:]
+ if instance == 0:
+ return quagga_config
+ ip = quagga_config[0]['ip']
+ octets = ip.split('.')
+ octets[3] = str((int(octets[3]) + 1) & 255)
+ ip = '.'.join(octets)
+ quagga_config[0]['ip'] = ip
+ return quagga_config
+
+ @classmethod
+ def start_cluster_async(cls, onos_instances):
+ instances = filter(lambda o: o.running == False, onos_instances)
+ if not instances:
+ return
+ tpool = ThreadPool(len(instances), queue_size = 1, wait_timeout = 1)
+ for onos in instances:
+ tpool.addTask(onos.start_async)
+ tpool.cleanUpThreads()
+
+ def start_async(self):
+ print('Starting ONOS container %s' %self.name)
+ self.start(ports = self.ports, environment = self.env,
+ host_config = self.host_config, volumes = self.volumes, tty = True)
+ time.sleep(3)
self.ipaddr = self.ip()
- if cluster is False:
- self.install_cord_apps(self.ipaddr)
+ print('Waiting for ONOS container %s to start' %self.name)
+ self.wait_for_onos_start(self.ipaddr)
+ self.running = True
+ print('ONOS container %s started' %self.name)
@classmethod
def wait_for_onos_start(cls, ip, tries = 30):