Test-Voltha:
    Fixing issues in IGMP tests.

Change-Id: Ie20efb43db03f6da0c799b39fecb075d29122b95
diff --git a/src/test/utils/Channels.py b/src/test/utils/Channels.py
index 594b4f9..83706b3 100644
--- a/src/test/utils/Channels.py
+++ b/src/test/utils/Channels.py
@@ -147,6 +147,7 @@
         self.iface_mcast = iface_mcast
         self.mcast_cb = mcast_cb
         self.src_list = src_list
+        self.streams_list = []
         for c in range(self.num):
             self.channel_states[c] = [self.Idle]
         IgmpChannel.__init__(self, ssm_list = self.channels, iface=iface, src_list = src_list)
@@ -173,8 +174,19 @@
         if self.state == self.Stopped:
             if self.streams:
                 self.streams.stop()
-            self.streams = McastTraffic(self.channels, iface=self.iface_mcast, cb = self.mcast_cb)
-            self.streams.start()
+            if self.streams_list:
+               for i in range(len(self.streams_list)):
+                  self.streams_list[i].stop()
+            if self.src_list:
+               for i in range(len(self.src_list)):
+                  self.streams_list.append(McastTraffic(self.channels, iface=self.iface_mcast, cb = self.mcast_cb, src_ip = self.src_list[i]))
+                  self.streams_list[i].start()
+#               self.streams = McastTraffic(self.channels, iface=self.iface_mcast, cb = self.mcast_cb)
+#               self.streams.start()
+
+            else:
+                self.streams = McastTraffic(self.channels, iface=self.iface_mcast, cb = self.mcast_cb)
+                self.streams.start()
             self.state = self.Started
 
     def join(self, chan = None, src_list = None, record_type = None):
@@ -208,8 +220,8 @@
             self.last_chan = None
         return True
 
-    def join_next(self, chan = None, leave_flag = True):
-        if chan is None:
+    def join_next(self, chan = None, src_list = None, leave_flag = True):
+        if chan is None and self.last_chan is not None:
             chan = self.last_chan
             if chan is None:
                 return None
@@ -218,15 +230,18 @@
         else:
             leave = chan - 1
             join = chan
+        else:
+            leave = 0
+            join = 0
 
         if join >= self.num:
             join = 0
 
         if leave >= 0 and leave != join:
             if leave_flag is True:
-                self.leave(leave)
+                self.leave(leave, src_list = src_list)
 
-        return self.join(join)
+        return self.join(join, src_list = src_list)
 
     def jump(self):
         chan = self.last_chan
@@ -281,6 +296,9 @@
     def stop(self):
         if self.streams:
             self.streams.stop()
+        if self.streams_list:
+           for i in range(len(self.streams_list)):
+               self.streams_list[i].stop()
         self.state = self.Stopped
 
     def get_state(self, chan):
diff --git a/src/test/voltha/volthaTest.py b/src/test/voltha/volthaTest.py
index c35e95f..b2c08e1 100644
--- a/src/test/voltha/volthaTest.py
+++ b/src/test/voltha/volthaTest.py
@@ -1,7 +1,7 @@
 import os
 import sys
 import unittest
-import time
+import time, monotonic
 import json
 import requests
 import threading
@@ -33,6 +33,8 @@
 
 class Voltha_olt_subscribers(Channels):
 
+      STATS_RX = 0
+      STATS_TX = 1
       STATS_JOIN = 2
       STATS_LEAVE = 3
 
@@ -40,6 +42,7 @@
           self.tx_port = tx_port
           self.rx_port = rx_port
           self.src_list = src_list
+          self.num_channels = num_channels
           try:
               self.tx_intf = tx_port
               self.rx_intf = rx_port
@@ -74,13 +77,13 @@
             self.channel_join_update(chan, join_time)
             return chan
 
-      def channel_join_next(self, delay = 2, leave_flag = True):
+      def channel_join_next(self, delay = 2, src_list = None, leave_flag = True):
             '''Joins the next channel leaving the last channel'''
             if self.last_chan:
                   if self.join_map.has_key(self.last_chan):
                         del self.join_map[self.last_chan]
             self.delay = delay
-            chan, join_time = self.join_next(leave_flag = leave_flag)
+            chan, join_time = self.join_next(src_list = src_list, leave_flag = leave_flag)
             self.channel_join_update(chan, join_time)
             return chan
 
@@ -109,24 +112,28 @@
                         self.join_map[c][stats_type].update(packets = packets, t = t)
 
       def channel_receive(self, chan, cb = None, count = 1, timeout = 5, src_list = None):
-            log_test.info('Subscriber on port %s receiving from group %s, channel %d' %
+            log_test.info('Subscriber on port %s checking data traffic receiving from group %s, channel %d' %
                      (self.rx_intf, self.gaddr(chan), chan))
             r = self.recv(chan, cb = cb, count = count, timeout = timeout, src_list = src_list)
             if len(r) == 0:
                   log_test.info('Subscriber on port %s timed out' %( self.rx_intf))
                   self.test_status = False
             else:
-                  log_test.info('Subscriber on port %s received %d packets' %(self.rx_intf, len(r)))
+                  self.test_status = True
+                  pass
+#                  log_test.info('Subscriber on port %s received %d packets' %(self.rx_intf, len(r)))
             if self.recv_timeout:
                   ##Negative test case is disabled for now
+                  log_test.info('Subscriber on port %s not received %d packets' %(self.rx_intf, len(r)))
                   assert_equal(len(r), 0)
+                  self.test_status = True
             return self.test_status
 
       def recv_channel_cb(self, pkt, src_list = None):
 
             ##First verify that we have received the packet for the joined instance
-            log_test.info('Packet received for group %s, subscriber, port %s' %
-                     (pkt[IP].dst, self.rx_intf))
+            log_test.info('Packet received for group %s, subscriber, port %s showing full packet %s'%
+                     (pkt[IP].dst, self.rx_intf, pkt.show))
             if self.recv_timeout:
                   return
             chan = self.caddr(pkt[IP].dst)
@@ -209,6 +216,8 @@
     INTF_RX_DEFAULT = 'veth0'
     INTF_2_RX_DEFAULT = 'veth6'
     TESTCASE_TIMEOUT = 300
+    VOLTHA_IGMP_ITERATIONS = 10
+#    VOLTHA_CONFIG_FAKE = True
     VOLTHA_CONFIG_FAKE = False
     VOLTHA_UPLINK_VLAN_MAP = { 'of:0000000000000001' : '222' }
     VOLTHA_UPLINK_VLAN_START = 444
@@ -383,7 +392,7 @@
         cls.num_ports = cls.port_map['num_ports']
         if cls.num_ports > 1:
               cls.num_ports -= 1 ##account for the tx port
-        cls.activate_apps(cls.apps + cls.olt_apps)
+        cls.activate_apps(cls.apps + cls.olt_apps, deactivate = True)
         cls.onos_aaa_load()
 
     @classmethod
@@ -460,12 +469,17 @@
         time.sleep(3)
 
     @classmethod
-    def activate_apps(cls, apps):
+    def activate_apps(cls, apps, deactivate = False):
         for app in apps:
             onos_ctrl = OnosCtrl(app)
-            status, _ = onos_ctrl.activate()
-            assert_equal(status, True)
-            time.sleep(2)
+            if deactivate is True:
+               onos_ctrl.deactivate()
+               time.sleep(2)
+               status, _ = onos_ctrl.activate()
+               assert_equal(status, True)
+               time.sleep(2)
+
+
 
     @classmethod
     def deactivate_apps(cls, apps):
@@ -792,7 +806,7 @@
 
            if cip is not None:
               self.success =  False
-           log_test.info('ONOS dhcp server rejected client discover with invalid source mac as expected = %s '%self.success)
+           log_test.info('ONOS dhcp server rejected client discover with invalid source mac as expected self.success = %s '%self.success)
            assert_equal(cip,None)
            log_test.info('ONOS dhcp server rejected client discover with invalid source mac as expected')
            self.test_status = True
@@ -1023,28 +1037,65 @@
 
     def igmp_flow_check(self, subscriber, multiple_sub = False):
 	chan = 0
-	subscriber.channel_join(chan, delay = 0, src_list = subscriber.src_list)
-	self.num_joins += 1
-	while self.num_joins < self.num_subscribers:
-	      time.sleep(5)
-	log_test.info('All subscribers have joined the channel')
-	for i in range(10):
-	    self.test_status = subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count = 10, src_list = subscriber.src_list)
-	    log_test.info('Leaving channel %d for subscriber on port %s' %(chan, subscriber.rx_port))
-	    subscriber.channel_leave(chan, src_list = subscriber.src_list)
+        for i in range(self.VOLTHA_IGMP_ITERATIONS + subscriber.num_channels):
+            if subscriber.num_channels == 1:
+               chan = subscriber.channel_join(chan, delay = 2, src_list = subscriber.src_list)
+            else:
+               chan = subscriber.channel_join_next(delay = 2, src_list = subscriber.src_list)
+            self.num_joins += 1
+            while self.num_joins < self.num_subscribers:
+	         time.sleep(5)
+            log_test.info('All subscribers have joined the channel')
+    #        for i in range(1):
+    	    time.sleep(0.5)
+	    self.test_status = subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count = 1, src_list = subscriber.src_list)
+	    #log_test.info('Leaving channel %d for subscriber on port %s' %(chan, subscriber.rx_port))
+	    #subscriber.channel_leave(chan, src_list = subscriber.src_list)
 	    time.sleep(5)
-	    log_test.info('Interface %s Join RX stats for subscriber, %s' %(subscriber.iface,subscriber.join_rx_stats))
-	#Should not receive packets for this subscriber
-	    self.recv_timeout = True
-	    subscriber.recv_timeout = True
-	    subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count = 10, src_list = subscriber.src_list)
-	    subscriber.recv_timeout = False
-	    self.recv_timeout = False
-	    log_test.info('Joining channel %d for subscriber port %s' %(chan, subscriber.rx_port))
-	    subscriber.channel_join(chan, delay = 0, src_list = subscriber.src_list)
+#	    log_test.info('Interface %s Join RX stats for subscriber, %s' %(subscriber.iface,subscriber.join_rx_stats))
+            if subscriber.num_channels == 1:
+               pass
+            elif chan != 0:
+            #Should not receive packets for this channel
+               self.recv_timeout = True
+               subscriber.recv_timeout = True
+               subscriber.channel_receive(chan-1, cb = subscriber.recv_channel_cb, count = 1, src_list = subscriber.src_list)
+               subscriber.recv_timeout = False
+               self.recv_timeout = False
+#	    log_test.info('Joining channel %d for subscriber port %s' %(chan, subscriber.rx_port))
+#	    subscriber.channel_join(chan, delay = 2, src_list = subscriber.src_list)
+            chan = subscriber.num_channels - i
 #                  self.test_status = True
 	return self.test_status
 
+    def igmp_leave_flow_check(self, subscriber, multiple_sub = False):
+        chan = 0
+        for i in range(self.VOLTHA_IGMP_ITERATIONS):
+            subscriber.channel_join(chan, delay = 2, src_list = subscriber.src_list)
+            self.num_joins += 1
+            while self.num_joins < self.num_subscribers:
+                 time.sleep(5)
+            log_test.info('All subscribers have joined the channel')
+#            for i in range(1):
+            time.sleep(0.5)
+            self.test_status = subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count = 1, src_list = subscriber.src_list)
+            log_test.info('Leaving channel %d for subscriber on port %s' %(chan, subscriber.rx_port))
+            subscriber.channel_leave(chan, src_list = subscriber.src_list)
+            time.sleep(10)
+#           log_test.info('Interface %s Join RX stats for subscriber, %s' %(subscriber.iface,subscriber.join_rx_stats))
+        #Should not receive packets for this subscriber
+            self.recv_timeout = True
+            subscriber.recv_timeout = True
+            subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count = 1, src_list = subscriber.src_list)
+            subscriber.recv_timeout = False
+            self.recv_timeout = False
+#           log_test.info('Joining channel %d for subscriber port %s' %(chan, subscriber.rx_port))
+#           subscriber.channel_join(chan, delay = 2, src_list = subscriber.src_list)
+#                  self.test_status = True
+        return self.test_status
+
+
+
     def igmp_flow_check_join_change_to_exclude(self, subscriber, multiple_sub = False):
 	chan = 2
 	subscriber.channel_join(chan, delay = 0, src_list = subscriber.src_list)
@@ -1267,7 +1318,9 @@
 
     def voltha_subscribers(self, services, cbs = None, num_subscribers = 1, num_channels = 1, src_list = None):
           """Test subscriber join next for channel surfing"""
-          voltha = VolthaCtrl(**self.voltha_attrs)
+          voltha = VolthaCtrl(self.VOLTHA_HOST,
+                              rest_port = self.VOLTHA_REST_PORT,
+                              uplink_vlan_map = self.VOLTHA_UPLINK_VLAN_MAP)
           if self.VOLTHA_OLT_TYPE.startswith('ponsim'):
              ponsim_address = '{}:50060'.format(self.VOLTHA_HOST)
              log_test.info('Enabling ponsim olt')
@@ -1328,9 +1381,8 @@
              port_list = self.generate_port_list(num_subscribers, num_channels)
           subscriber_tx_rx_ports = []
           for i in range(num_subscribers):
-              #subscriber_tx_rx_ports.append((self.port_map['ports'][port_list[i][0]], self.port_map['ports'][port_list[i][1]]))
-              subscriber_tx_rx_ports.append(Voltha_olt_subscribers(tx_port = self.port_map['ports'][port_list[i][0]],
-                                                                   rx_port = self.port_map['ports'][port_list[i][1]],
+              subscriber_tx_rx_ports.append(Voltha_olt_subscribers(tx_port = self.port_map[port_list[i][0]],
+                                                                   rx_port = self.port_map[port_list[i][1]],
                                                                    num_channels = num_channels,src_list = src_list,))
           self.onos_aaa_load()
           #load the ssm list for all subscriber channels
@@ -1351,10 +1403,27 @@
                 cbs = (self.tls_flow_check, self.dhcp_flow_check, self.igmp_flow_check)
                 chan_leave = True
           for subscriber in subscriber_tx_rx_ports:
+                if 'IGMP' in services:
+#                   if src_list:
+#                      for i in range(len(src_list)):
+#                          subscriber.start(src_ip = src_list[i])
+#                   else:
+#                      subscriber.start()
+                    subscriber.start()
                 sub_loop_count = sub_loop_count - 1
                 pool_object = voltha_subscriber_pool(subscriber, cbs)
                 self.thread_pool.addTask(pool_object.pool_cb)
           self.thread_pool.cleanUpThreads()
+          for subscriber in subscriber_tx_rx_ports:
+                if services and 'IGMP' in services:
+#                  if src_list:
+#                     for i in range(len(src_list)):
+#                         subscriber.stop(src_ip = src_list[i])
+#                  else:
+#                     subscriber.stop()
+                   subscriber.stop()
+                if chan_leave is True:
+                      subscriber.channel_leave(0)
           subscribers_count = 0
           return self.test_status
 
@@ -4488,7 +4557,7 @@
         num_subscribers = 1
         num_channels = 1
         services = ('IGMP')
-        cbs = (self.igmp_flow_check, None, None)
+        cbs = (self.igmp_leave_flow_check, None, None)
         self.voltha_subscribers(services, cbs = cbs,
                                     num_subscribers = num_subscribers,
                                     num_channels = num_channels)
@@ -4511,12 +4580,12 @@
         num_subscribers = 1
         num_channels = 1
         services = ('IGMP')
-        cbs = (self.igmp_flow_check, None, None)
+        cbs = (self.igmp_leave_flow_check, None, None)
         self.voltha_subscribers(services, cbs = cbs,
                                     num_subscribers = num_subscribers,
                                     num_channels = num_channels)
 
-    def test_subscriber_with_voltha_for_igmp_2_groups_joins_verifying_traffic(self):
+    def test_subscriber_with_voltha_for_igmp_5_groups_joins_verifying_traffic(self):
         """
         Test Method:
         0. Make sure that voltha is up and running on CORD-POD setup.
@@ -4529,14 +4598,14 @@
         """
         """Test subscriber join next for channel surfing with 3 subscribers browsing 3 channels each"""
         num_subscribers = 1
-        num_channels = 2
+        num_channels = 5
         services = ('IGMP')
         cbs = (self.igmp_flow_check, None, None)
         self.voltha_subscribers(services, cbs = cbs,
                                     num_subscribers = num_subscribers,
                                     num_channels = num_channels)
 
-    def test_subscriber_with_voltha_for_igmp_2_groups_joins_and_leave_for_one_group_verifying_traffic(self):
+    def test_subscriber_with_voltha_for_igmp_5_groups_joins_and_leave_for_one_group_verifying_traffic(self):
         """
         Test Method:
         0. Make sure that voltha is up and running on CORD-POD setup.
@@ -4552,7 +4621,7 @@
         """
         """Test subscriber join next for channel surfing with 3 subscribers browsing 3 channels each"""
         num_subscribers = 1
-        num_channels = 2
+        num_channels = 5
         services = ('IGMP')
         cbs = (self.igmp_flow_check, None, None)
         self.voltha_subscribers(services, cbs = cbs,