Implement parallel start of ONOS instances when forming/setting up a ONOS cluster.
For now, restrict it to -async or --async-mode option for cord-test.py setup or run mode.

Change-Id: If366e7b7370ede3574e070a23c5e17e723abe677
diff --git a/src/test/setup/cord-test.py b/src/test/setup/cord-test.py
index bfa2a8d..ae10f9d 100755
--- a/src/test/setup/cord-test.py
+++ b/src/test/setup/cord-test.py
@@ -400,8 +400,10 @@
 
     Container.IMAGE_PREFIX = args.prefix
     cluster_mode = True if args.onos_instances > 1 else False
+    async_mode = cluster_mode and args.async_mode
     existing_list = [ c['Names'][0][1:] for c in Container.dckr.containers() if c['Image'] == args.onos ]
     setup_cluster = False if len(existing_list) == args.onos_instances else True
+    onos_ips = []
     if onos_ip is None:
         image_names = args.onos.rsplit(':', 1)
         onos_cnt['image'] = image_names[0]
@@ -417,9 +419,14 @@
         Onos.TAG = onos_cnt['tag']
         data_volume = '{}-data'.format(Onos.NAME) if args.shared_volume else None
         onos = Onos(image = Onos.IMAGE,
-                    tag = Onos.TAG, boot_delay = 60, cluster = cluster_mode, data_volume = data_volume)
-        onos_ip = onos.ip()
-    onos_ips = [ onos_ip ]
+                    tag = Onos.TAG, boot_delay = 60, cluster = cluster_mode,
+                    data_volume = data_volume, async = async_mode)
+        if onos.running:
+            onos_ip = onos.ipaddr
+            onos_ips.append(onos_ip)
+    else:
+        onos_ips.append(onos_ip)
+
     num_onos_instances = args.onos_instances
     if num_onos_instances > 1 and onos is not None:
         onos_instances = []
@@ -427,10 +434,17 @@
         for i in range(1, num_onos_instances):
             name = '{}-{}'.format(Onos.NAME, i+1)
             data_volume = '{}-data'.format(name) if args.shared_volume else None
+            quagga_config = Onos.get_quagga_config(i)
             onos = Onos(name = name, image = Onos.IMAGE, tag = Onos.TAG, boot_delay = 60, cluster = cluster_mode,
-                        data_volume = data_volume)
+                        data_volume = data_volume, async = async_mode, quagga_config = quagga_config)
             onos_instances.append(onos)
-            onos_ips.append(onos.ipaddr)
+            if onos.running:
+                onos_ips.append(onos.ipaddr)
+        if async_mode is True:
+            Onos.start_cluster_async(onos_instances)
+        if not onos_ips:
+            for onos in onos_instances:
+                onos_ips.append(onos.ipaddr)
         try:
             for ip in onos_ips:
                 print('Installing cord tester ONOS app %s in ONOS instance %s' %(args.app,ip))
@@ -640,6 +654,7 @@
     Onos.PREFIX = args.prefix
     Onos.TAG = onos_cnt['tag']
     cluster_mode = True if args.onos_instances > 1 else False
+    async_mode = cluster_mode and args.async_mode
     existing_list = [ c['Names'][0][1:] for c in Container.dckr.containers() if c['Image'] == args.onos ]
     setup_cluster = False if len(existing_list) == args.onos_instances else True
     #cleanup existing volumes before forming a new cluster
@@ -651,24 +666,35 @@
         except: pass
 
     onos = None
+    onos_ips = []
     if onos_ip is None:
         data_volume = '{}-data'.format(Onos.NAME) if args.shared_volume else None
         onos = Onos(image = Onos.IMAGE, tag = Onos.TAG, boot_delay = 60, cluster = cluster_mode,
-                    data_volume = data_volume)
-        onos_ip = onos.ip()
+                    data_volume = data_volume, async = async_mode)
+        if onos.running:
+            onos_ip = onos.ipaddr
+            onos_ips.append(onos_ip)
+    else:
+        onos_ips.append(onos_ip)
 
     num_onos_instances = args.onos_instances
-    onos_ips = [ onos_ip ]
     if num_onos_instances > 1 and onos is not None:
         onos_instances = []
         onos_instances.append(onos)
         for i in range(1, num_onos_instances):
             name = '{}-{}'.format(Onos.NAME, i+1)
             data_volume = '{}-data'.format(name) if args.shared_volume else None
+            quagga_config = Onos.get_quagga_config(i)
             onos = Onos(name = name, image = Onos.IMAGE, tag = Onos.TAG, boot_delay = 60, cluster = cluster_mode,
-                        data_volume = data_volume)
+                        data_volume = data_volume, async = async_mode, quagga_config = quagga_config)
             onos_instances.append(onos)
-            onos_ips.append(onos.ipaddr)
+            if onos.running:
+                onos_ips.append(onos.ipaddr)
+        if async_mode is True:
+            Onos.start_cluster_async(onos_instances)
+        if not onos_ips:
+            for onos in onos_instances:
+                onos_ips.append(onos.ipaddr)
         if setup_cluster is True:
             Onos.setup_cluster(onos_instances)
 
@@ -975,6 +1001,8 @@
     parser_run.add_argument('-j', '--onos-instances', default=1, type=int,
                             help='Specify number to test onos instances to form cluster')
     parser_run.add_argument('-v', '--shared-volume', action='store_true', help='Start ONOS cluster instances with shared volume')
+    parser_run.add_argument('-async', '--async-mode', action='store_true',
+                            help='Start ONOS cluster instances in async mode')
     parser_run.add_argument('-log', '--log-level', default=onos_log_level,
                             choices=['DEBUG','TRACE','ERROR','WARN','INFO'],
                             type=str,
@@ -1010,6 +1038,8 @@
                               help='Specify number of test onos instances to spawn')
     parser_setup.add_argument('-v', '--shared-volume', action='store_true',
                               help='Start ONOS cluster instances with shared volume')
+    parser_setup.add_argument('-async', '--async-mode', action='store_true',
+                              help='Start ONOS cluster instances in async mode')
     parser_setup.add_argument('-f', '--foreground', action='store_true', help='Run in foreground')
     parser_setup.set_defaults(func=setupCordTester)
 
diff --git a/src/test/utils/CordContainer.py b/src/test/utils/CordContainer.py
index 8391273..24aa6b5 100644
--- a/src/test/utils/CordContainer.py
+++ b/src/test/utils/CordContainer.py
@@ -17,13 +17,16 @@
 import io
 import json
 import yaml
+import errno
 from pyroute2 import IPRoute
+from pyroute2.netlink import NetlinkError
 from itertools import chain
 from nsenter import Namespace
 from docker import Client
 from shutil import rmtree
 from OnosCtrl import OnosCtrl
 from OnosLog import OnosLog
+from threadPool import ThreadPool
 
 class docker_netns(object):
 
@@ -159,7 +162,14 @@
                 ip = IPRoute()
                 br = ip.link_lookup(ifname=quagga_config['bridge'])
                 if len(br) == 0:
-                    ip.link_create(ifname=quagga_config['bridge'], kind='bridge')
+                    try:
+                        ip.link_create(ifname=quagga_config['bridge'], kind='bridge')
+                    except NetlinkError as e:
+                        err, _ = e.args
+                        if err == errno.EEXIST:
+                            pass
+                        else:
+                            raise NetlinkError(*e.args)
                     br = ip.link_lookup(ifname=quagga_config['bridge'])
                 br = br[0]
                 ip.link('set', index=br, state='up')
@@ -299,7 +309,7 @@
 
 class Onos(Container):
 
-    quagga_config = ( { 'bridge' : 'quagga-br', 'ip': '10.10.0.4', 'mask' : 16 }, )
+    quagga_config = [ { 'bridge' : 'quagga-br', 'ip': '10.10.0.4', 'mask' : 16 }, ]
     SYSTEM_MEMORY = (get_mem(),) * 2
     INSTANCE_MEMORY = (get_mem(instances=3),) * 2
     JAVA_OPTS = '-Xms{} -Xmx{} -XX:+UseConcMarkSweepGC -XX:+CMSIncrementalMode'.format(*SYSTEM_MEMORY)#-XX:+PrintGCDetails -XX:+PrintGCTimeStamps'
@@ -310,7 +320,7 @@
                        ('igmp', '1.1-SNAPSHOT'),
                        #('vtn', '1.1-SNAPSHOT'),
                        )
-    ports = [ 8181, 8101, 9876, 6653, 6633, 2000, 2620 ]
+    ports = [] #[ 8181, 8101, 9876, 6653, 6633, 2000, 2620 ]
     setup_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), '..', 'setup')
     host_config_dir = os.path.join(setup_dir, 'onos-config')
     guest_config_dir = '/root/onos/config'
@@ -377,7 +387,7 @@
 
     def __init__(self, name = NAME, image = IMAGE, prefix = PREFIX, tag = TAG,
                  boot_delay = 20, restart = False, network_cfg = None,
-                 cluster = False, data_volume = None):
+                 cluster = False, data_volume = None, async = False, quagga_config = None):
         if restart is True:
             ##Find the right image to restart
             running_image = filter(lambda c: c['Names'][0] == '/{}'.format(name), self.dckr.containers())
@@ -388,6 +398,8 @@
                     tag = image_name.split(':')[1]
                 except: pass
 
+        if quagga_config is not None:
+            self.quagga_config = quagga_config
         super(Onos, self).__init__(name, image, prefix = prefix, tag = tag, quagga_config = self.quagga_config)
         self.boot_delay = boot_delay
         self.data_map = None
@@ -422,31 +434,70 @@
                 json_data = json.dumps(network_cfg, indent=4)
                 with open('{}/network-cfg.json'.format(self.host_config_dir), 'w') as f:
                     f.write(json_data)
-            print('Starting ONOS container %s' %self.name)
-            self.start(ports = self.ports, environment = self.env,
-                       host_config = self.host_config, volumes = self.volumes, tty = True)
-            if not restart:
-                ##wait a bit before fetching IP to regenerate cluster cfg
-                time.sleep(5)
-                ip = self.ip()
-                ##Just a quick hack/check to ensure we don't regenerate in the common case.
-                ##As ONOS is usually the first test container that is started
-                if cluster is False:
-                    if ip != self.CLUSTER_CFG_IP or not os.access(self.cluster_cfg, os.F_OK):
-                        print('Regenerating ONOS cluster cfg for ip %s' %ip)
-                        self.generate_cluster_cfg(ip)
-                        self.kill()
-                        self.remove_container(self.name, force=True)
-                        print('Restarting ONOS container %s' %self.name)
-                        self.start(ports = self.ports, environment = self.env,
-                                   host_config = self.host_config, volumes = self.volumes, tty = True)
-            print('Waiting for ONOS to boot')
-            time.sleep(boot_delay)
-            self.wait_for_onos_start(self.ip())
+            if cluster is False or async is False:
+                print('Starting ONOS container %s' %self.name)
+                self.start(ports = self.ports, environment = self.env,
+                           host_config = self.host_config, volumes = self.volumes, tty = True)
+                if not restart:
+                    ##wait a bit before fetching IP to regenerate cluster cfg
+                    time.sleep(5)
+                    ip = self.ip()
+                    ##Just a quick hack/check to ensure we don't regenerate in the common case.
+                    ##As ONOS is usually the first test container that is started
+                    if cluster is False:
+                        if ip != self.CLUSTER_CFG_IP or not os.access(self.cluster_cfg, os.F_OK):
+                            print('Regenerating ONOS cluster cfg for ip %s' %ip)
+                            self.generate_cluster_cfg(ip)
+                            self.kill()
+                            self.remove_container(self.name, force=True)
+                            print('Restarting ONOS container %s' %self.name)
+                            self.start(ports = self.ports, environment = self.env,
+                                       host_config = self.host_config, volumes = self.volumes, tty = True)
+                print('Waiting for ONOS to boot')
+                time.sleep(boot_delay)
+                self.wait_for_onos_start(self.ip())
+                self.running = True
+            else:
+                self.running = False
+        else:
+            self.running = True
+        if self.running:
+            self.ipaddr = self.ip()
+            if cluster is False:
+                self.install_cord_apps(self.ipaddr)
 
+    @classmethod
+    def get_quagga_config(cls, instance = 0):
+        quagga_config = cls.quagga_config[:]
+        if instance == 0:
+            return quagga_config
+        ip = quagga_config[0]['ip']
+        octets = ip.split('.')
+        octets[3] = str((int(octets[3]) + 1) & 255)
+        ip = '.'.join(octets)
+        quagga_config[0]['ip'] = ip
+        return quagga_config
+
+    @classmethod
+    def start_cluster_async(cls, onos_instances):
+        instances = filter(lambda o: o.running == False, onos_instances)
+        if not instances:
+            return
+        tpool = ThreadPool(len(instances), queue_size = 1, wait_timeout = 1)
+        for onos in instances:
+            tpool.addTask(onos.start_async)
+        tpool.cleanUpThreads()
+
+    def start_async(self):
+        print('Starting ONOS container %s' %self.name)
+        self.start(ports = self.ports, environment = self.env,
+                   host_config = self.host_config, volumes = self.volumes, tty = True)
+        time.sleep(3)
         self.ipaddr = self.ip()
-        if cluster is False:
-            self.install_cord_apps(self.ipaddr)
+        print('Waiting for ONOS container %s to start' %self.name)
+        self.wait_for_onos_start(self.ipaddr)
+        self.running = True
+        print('ONOS container %s started' %self.name)
 
     @classmethod
     def wait_for_onos_start(cls, ip, tries = 30):