Test: Changes to subscriber multitable changes for single channel, N subscriber tests.
Changes to subscriber onos app to remove metabuilder constraints for next flow objectives.

Change-Id: I6fb91e9706df122c42fce7eb4d6598f3a56b4d71
diff --git a/src/test/apps/ciena-cordigmp-multitable-2.0-SNAPSHOT.oar b/src/test/apps/ciena-cordigmp-multitable-2.0-SNAPSHOT.oar
index 892d066..d22de1b 100644
--- a/src/test/apps/ciena-cordigmp-multitable-2.0-SNAPSHOT.oar
+++ b/src/test/apps/ciena-cordigmp-multitable-2.0-SNAPSHOT.oar
Binary files differ
diff --git a/src/test/apps/ciena-cordigmp.multi-table/src/main/java/org/ciena/cordigmp/CordIgmp.java b/src/test/apps/ciena-cordigmp.multi-table/src/main/java/org/ciena/cordigmp/CordIgmp.java
index 441a0a4..a2ed133 100644
--- a/src/test/apps/ciena-cordigmp.multi-table/src/main/java/org/ciena/cordigmp/CordIgmp.java
+++ b/src/test/apps/ciena-cordigmp.multi-table/src/main/java/org/ciena/cordigmp/CordIgmp.java
@@ -233,23 +233,18 @@
             return;
         }
         ConnectPoint loc = info.sink().get();
-        TrafficSelector.Builder metabuilder = DefaultTrafficSelector.builder()
-            .matchEthType(Ethernet.TYPE_IPV4)
-            .matchIPDst(info.route().group().toIpPrefix());
-        if (vlanEnabled) {
-            metabuilder.matchVlanId(VlanId.vlanId((short) mcastVlan));
-        }
+        log.info("Removing flow for subscriber port: {}, group {}",
+                loc.port(), info.route().group());
         NextObjective next = DefaultNextObjective.builder()
                 .fromApp(appId)
                 .addTreatment(DefaultTrafficTreatment.builder().setOutput(loc.port()).build())
                 .withType(NextObjective.Type.BROADCAST)
-                .withMeta(metabuilder.build())
                 .withId(groups.get(info.route().group()))
                 .removeFromExisting(new ObjectiveContext() {
                     @Override
                     public void onSuccess(Objective objective) {
                         //TODO: change to debug
-                        log.info("Next Objective {} installed", objective.id());
+                        log.info("Next Objective {} removed", objective.id());
                     }
 
                     @Override
@@ -281,17 +276,10 @@
 
         Integer nextId = groups.computeIfAbsent(route.group(), (g) -> {
             Integer id = flowObjectiveService.allocateNextId();
-            TrafficSelector.Builder metabuilder = DefaultTrafficSelector.builder()
-                    .matchEthType(Ethernet.TYPE_IPV4)
-                    .matchIPDst(g.toIpPrefix());
-            if (vlanEnabled) {
-                metabuilder.matchVlanId(VlanId.vlanId((short) mcastVlan));
-            }
             NextObjective next = DefaultNextObjective.builder()
                     .fromApp(appId)
                     .addTreatment(DefaultTrafficTreatment.builder().setOutput(sink.port()).build())
                     .withType(NextObjective.Type.BROADCAST)
-                    .withMeta(metabuilder.build())
                     .withId(id)
                     .add(new ObjectiveContext() {
                         @Override
@@ -303,7 +291,7 @@
                         @Override
                         public void onError(Objective objective, ObjectiveError error) {
                             //TODO: change to debug
-                            log.info("Next Objective {} failed, because {}",
+                            log.info("Next Objective {} failed to add, because {}",
                                     objective.id(),
                                     error);
                         }
@@ -312,9 +300,9 @@
             flowObjectiveService.next(sink.deviceId(), next);
 
             TrafficSelector.Builder mcast = DefaultTrafficSelector.builder()
-                    .matchInPort(oltInfo.uplink())
-                    .matchEthType(Ethernet.TYPE_IPV4)
-                    .matchIPDst(g.toIpPrefix());
+                .matchInPort(oltInfo.uplink())
+                .matchEthType(Ethernet.TYPE_IPV4)
+                .matchIPDst(g.toIpPrefix());
 
             if (vlanEnabled) {
                 mcast.matchVlanId(VlanId.vlanId((short) mcastVlan));
@@ -344,31 +332,28 @@
             flowObjectiveService.forward(sink.deviceId(), fwd);
 
             sync.set(true);
-            log.info("Installed flows for device: {}, id {}, ip {}", sink.deviceId(), id, g.toIpPrefix());
+            log.info("Installed flows for device: {}, id {}, ip {}, port {}",
+                    sink.deviceId(), id, g.toIpPrefix(), sink.port());
             return id;
         });
 
         if (!sync.get()) {
-            TrafficSelector.Builder metabuilder = DefaultTrafficSelector.builder()
-                    .matchEthType(Ethernet.TYPE_IPV4)
-                    .matchIPDst(route.group().toIpPrefix());
             NextObjective next = DefaultNextObjective.builder()
                     .fromApp(appId)
                     .addTreatment(DefaultTrafficTreatment.builder().setOutput(sink.port()).build())
                     .withType(NextObjective.Type.BROADCAST)
                     .withId(nextId)
-                    .withMeta(metabuilder.build())
                     .addToExisting(new ObjectiveContext() {
                         @Override
                         public void onSuccess(Objective objective) {
                             //TODO: change to debug
-                            log.info("Next Objective {} installed", objective.id());
+                            log.info("Next Objective {} installed to existing", objective.id());
                         }
 
                         @Override
                         public void onError(Objective objective, ObjectiveError error) {
                             //TODO: change to debug
-                            log.info("Next Objective {} failed, because {}",
+                            log.info("Next Objective {} failed to install to existing, because {}",
                                     objective.id(),
                                     error);
                         }
@@ -376,8 +361,8 @@
 
             flowObjectiveService.next(sink.deviceId(), next);
 
-            log.info("Append flows for device {}, id {}, ip {}", sink.deviceId(), nextId,
-                     route.group().toIpPrefix());
+            log.info("Append flows for device {}, id {}, ip {}, port {}", sink.deviceId(), nextId,
+                    route.group().toIpPrefix(), sink.port());
         }
 
     }
diff --git a/src/test/setup/olt_config.json b/src/test/setup/olt_config.json
index 2840af6..88380e6 100644
--- a/src/test/setup/olt_config.json
+++ b/src/test/setup/olt_config.json
@@ -1,2 +1,2 @@
-{ "olt" : false , "port_map" : { "ports" : [ "veth0", "veth2" ], "tx" : "veth2", "rx" : "veth0", "host" : "enp0s8", "start_vlan" : 1000 }, "uplink" : 2, "vlan" : 0 }
-  
+{ "olt" : false , "port_map" : { "ports" : [ "veth0", "veth2" ], "tx" : "veth2", "rx" : "veth0", "host" : "eth0", "start_vlan" : 1000 }, "uplink" : 2, "vlan" : 0 }
+
diff --git a/src/test/subscriberMultiTable/subscriberMultiTableTest.py b/src/test/subscriberMultiTable/subscriberMultiTableTest.py
index c245bd0..10f61bd 100644
--- a/src/test/subscriberMultiTable/subscriberMultiTableTest.py
+++ b/src/test/subscriberMultiTable/subscriberMultiTableTest.py
@@ -48,6 +48,7 @@
                   self.tx_intf = self.port_map[self.PORT_TX_DEFAULT]
                   self.rx_intf = self.port_map[self.PORT_RX_DEFAULT]
 
+            log.info('Subscriber %s, rx interface %s, uplink interface %s' %(name, self.rx_intf, self.tx_intf))
             Channels.__init__(self, num, channel_start = channel_start,
                               iface = self.rx_intf, iface_mcast = self.tx_intf, mcast_cb = mcast_cb)
             self.name = name
@@ -61,6 +62,7 @@
             self.join_map = {}
             ##accumulated join recv stats
             self.join_rx_stats = Stats()
+            self.recv_timeout = False
 
       def has_service(self, service):
             if self.service_map.has_key(service):
@@ -116,13 +118,20 @@
                   if self.join_map.has_key(c):
                         self.join_map[c][stats_type].update(packets = packets, t = t)
 
-      def channel_receive(self, chan, cb = None, count = 1):
-            log.info('Subscriber %s receiving from group %s, channel %d' %(self.name, self.gaddr(chan), chan))
-            self.recv(chan, cb = cb, count = count)
+      def channel_receive(self, chan, cb = None, count = 1, timeout = 5):
+            log.info('Subscriber %s on port %s receiving from group %s, channel %d' %
+                     (self.name, self.rx_intf, self.gaddr(chan), chan))
+            r = self.recv(chan, cb = cb, count = count, timeout = timeout)
+            if self.recv_timeout:
+                  ##Negative test case is disabled for now
+                  assert_equal(len(r), 0)
 
       def recv_channel_cb(self, pkt):
             ##First verify that we have received the packet for the joined instance
-            log.debug('Packet received for group %s, subscriber %s' %(pkt[IP].dst, self.name))
+            log.info('Packet received for group %s, subscriber %s, port %s' %
+                     (pkt[IP].dst, self.name, self.rx_intf))
+            if self.recv_timeout:
+                  return
             chan = self.caddr(pkt[IP].dst)
             assert_equal(chan in self.join_map.keys(), True)
             recv_time = monotonic.monotonic() * 1000000
@@ -179,6 +188,10 @@
                   },
               }
       test_services = ('IGMP',)
+      num_joins = 0
+      num_subscribers = 0
+      num_channels = 0
+      recv_timeout = False
 
       @classmethod
       def setUpClass(cls):
@@ -250,14 +263,7 @@
             else:
                   config = network_cfg
             log.info('Restarting ONOS with new network configuration')
-            cfg = json.dumps(config)
-            with open('{}/network-cfg.json'.format(cls.onos_config_path), 'w') as f:
-                  f.write(cfg)
-
-            try:
-                  return cord_test_onos_restart()
-            except:
-                  return False
+            return cord_test_onos_restart(config = config)
 
       @classmethod
       def remove_onos_config(cls):
@@ -379,14 +385,28 @@
       def igmp_verify(self, subscriber):
             chan = 0
             if subscriber.has_service('IGMP'):
-                  for i in range(5):
-                        log.info('Joining channel %d for subscriber %s' %(chan, subscriber.name))
-                        subscriber.channel_join(chan, delay = 0)
-                        subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count = 1)
+                  ##We wait for all the subscribers to join before triggering leaves
+                  if subscriber.rx_port > 1:
+                        time.sleep(5)
+                  subscriber.channel_join(chan, delay = 0)
+                  self.num_joins += 1
+                  while self.num_joins < self.num_subscribers:
+                        time.sleep(5)
+                  log.info('All subscribers have joined the channel')
+                  for i in range(10):
+                        subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count = 10)
                         log.info('Leaving channel %d for subscriber %s' %(chan, subscriber.name))
                         subscriber.channel_leave(chan)
-                        time.sleep(3)
+                        time.sleep(5)
                         log.info('Interface %s Join RX stats for subscriber %s, %s' %(subscriber.iface, subscriber.name,subscriber.join_rx_stats))
+                        #Should not receive packets for this subscriber
+                        self.recv_timeout = True
+                        subscriber.recv_timeout = True
+                        subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count = 10)
+                        subscriber.recv_timeout = False
+                        self.recv_timeout = False
+                        log.info('Joining channel %d for subscriber %s' %(chan, subscriber.name))
+                        subscriber.channel_join(chan, delay = 0)
                   self.test_status = True
 
       def igmp_jump_verify(self, subscriber):
@@ -454,8 +474,10 @@
                                num_channels = num_channels, channel_start = channel_start, port_list = port_list)
           self.onos_aaa_load()
           self.thread_pool = ThreadPool(min(100, self.num_subscribers), queue_size=1, wait_timeout=1)
+          chan_leave = False #for single channel, multiple subscribers
           if cbs is None:
                 cbs = (self.tls_verify, self.dhcp_verify, self.igmp_verify)
+                chan_leave = True
           for subscriber in self.subscriber_list:
                 subscriber.start()
                 pool_object = subscriber_pool(subscriber, cbs)
@@ -463,33 +485,39 @@
           self.thread_pool.cleanUpThreads()
           for subscriber in self.subscriber_list:
                 subscriber.stop()
+                if chan_leave is True:
+                      subscriber.channel_leave(0)
+          self.num_subscribers = 0
           return self.test_status
 
       def test_subscriber_join_recv(self):
           """Test subscriber join and receive"""
-          num_subscribers = 5
-          num_channels = 1
-          test_status = self.subscriber_join_verify(num_subscribers = num_subscribers,
-                                                    num_channels = num_channels,
-                                                    port_list = self.generate_port_list(num_subscribers, num_channels))
+          self.num_subscribers = 5
+          self.num_channels = 1
+          test_status = self.subscriber_join_verify(num_subscribers = self.num_subscribers,
+                                                    num_channels = self.num_channels,
+                                                    port_list = self.generate_port_list(self.num_subscribers,
+                                                                                        self.num_channels))
           assert_equal(test_status, True)
 
       def test_subscriber_join_jump(self):
           """Test subscriber join and receive for channel surfing"""
-          num_subscribers = 5
-          num_channels = 5
-          test_status = self.subscriber_join_verify(num_subscribers = num_subscribers,
-                                                    num_channels = num_channels,
+          self.num_subscribers = 5
+          self.num_channels = 10
+          test_status = self.subscriber_join_verify(num_subscribers = self.num_subscribers,
+                                                    num_channels = self.num_channels,
                                                     cbs = (self.tls_verify, self.dhcp_jump_verify, self.igmp_jump_verify),
-                                                    port_list = self.generate_port_list(num_subscribers, num_channels))
+                                                    port_list = self.generate_port_list(self.num_subscribers,
+                                                                                        self.num_channels))
           assert_equal(test_status, True)
 
       def test_subscriber_join_next(self):
           """Test subscriber join next for channels"""
-          num_subscribers = 5
-          num_channels = 5
-          test_status = self.subscriber_join_verify(num_subscribers = num_subscribers,
-                                                    num_channels = num_channels,
+          self.num_subscribers = 5
+          self.num_channels = 10
+          test_status = self.subscriber_join_verify(num_subscribers = self.num_subscribers,
+                                                    num_channels = self.num_channels,
                                                     cbs = (self.tls_verify, self.dhcp_next_verify, self.igmp_next_verify),
-                                                    port_list = self.generate_port_list(num_subscribers, num_channels))
+                                                    port_list = self.generate_port_list(self.num_subscribers,
+                                                                                        self.num_channels))
           assert_equal(test_status, True)
diff --git a/src/test/utils/Channels.py b/src/test/utils/Channels.py
index ccfebb5..f0f5960 100644
--- a/src/test/utils/Channels.py
+++ b/src/test/utils/Channels.py
@@ -1,12 +1,12 @@
-# 
+#
 # Copyright 2016-present Ciena Corporation
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # You may obtain a copy of the License at
-# 
+#
 # http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -36,7 +36,7 @@
     IP_DST = '224.0.1.1'
     igmp_eth = Ether(dst = IGMP_DST_MAC, src = IGMP_SRC_MAC, type = ETH_P_IP)
     igmp_ip = IP(dst = IP_DST, src = IP_SRC)
-    ssm_list = [] 
+    ssm_list = []
 
     def __init__(self, iface = 'veth0', ssm_list = [], src_list = ['1.2.3.4'], delay = 2):
         self.iface = iface
@@ -45,7 +45,7 @@
         self.delay = delay
         self.onos_ctrl = OnosCtrl('org.onosproject.igmp')
         self.onos_ctrl.activate()
-    
+
     def igmp_load_ssm_config(self, ssm_list = []):
         if not ssm_list:
             ssm_list = self.ssm_list
@@ -124,13 +124,12 @@
         self.streams = None
         self.channel_states = {}
         self.last_chan = None
-        self.recv_sock = L2Socket(iface = iface, type = ETH_P_IP)
         self.iface_mcast = iface_mcast
         self.mcast_cb = mcast_cb
         for c in range(self.num):
             self.channel_states[c] = [self.Idle]
         IgmpChannel.__init__(self, ssm_list = self.channels, iface=iface)
-        
+
     def generate(self, num, channel_start = 0):
         start = (225 << 24) | ( ( (channel_start >> 16) & 0xff) << 16 ) | \
             ( ( (channel_start >> 8) & 0xff ) << 8 ) | (channel_start) & 0xff
@@ -187,7 +186,7 @@
         if chan == self.last_chan:
             self.last_chan = None
         return True
-    
+
     def join_next(self, chan = None):
         if chan is None:
             chan = self.last_chan
@@ -198,7 +197,7 @@
         else:
             leave = chan - 1
             join = chan
-        
+
         if join >= self.num:
             join = 0
 
@@ -238,7 +237,7 @@
         recv_time = monotonic.monotonic()
         log.debug('Packet received in %.3f usecs' %(recv_time - send_time))
 
-    def recv(self, chan, cb = None, count = 1):
+    def recv(self, chan, cb = None, count = 1, timeout = 5):
         if chan is None:
             return None
         if type(chan) == type([]) or type(chan) == type(()):
@@ -248,7 +247,8 @@
             groups = (self.gaddr(chan),)
         if cb is None:
             cb = self.recv_cb
-        sniff(prn = cb, count=count, lfilter = lambda p: IP in p and p[IP].dst in groups, opened_socket = self.recv_sock)
+        return sniff(prn = cb, count=count, timeout = timeout,
+                     lfilter = lambda p: IP in p and p[IP].dst in groups, iface = self.iface)
 
     def stop(self):
         if self.streams: