Changes to ciena cordigmp onos app to reference count igmp group entries.
This allows us to test multiple subscribers with single channel.
This is required since ONOS igmp app implement unprovisioning the sink when its empty.
Thus any multicast sink provider like ciena-cordigmp onos app would end up
unprovisioning the flow in case there are other subscribers listening on the same channel.

Changes to subscriber tests to re-use the same channel for multiple subscribers
for single channel, multiple subscriber test case.

In order to use this, one has to kill the existing onos container to force a re-install
of the ciena-cordigmp app through the cord-test.sh script.
diff --git a/src/test/apps/ciena-cordigmp-1.0-SNAPSHOT.oar b/src/test/apps/ciena-cordigmp-1.0-SNAPSHOT.oar
index 6cc8de8..2d843dc 100644
--- a/src/test/apps/ciena-cordigmp-1.0-SNAPSHOT.oar
+++ b/src/test/apps/ciena-cordigmp-1.0-SNAPSHOT.oar
Binary files differ
diff --git a/src/test/apps/ciena-cordigmp/src/main/java/org/ciena/cordigmp/CordIgmp.java b/src/test/apps/ciena-cordigmp/src/main/java/org/ciena/cordigmp/CordIgmp.java
index 4183e2f..ce69b4b 100644
--- a/src/test/apps/ciena-cordigmp/src/main/java/org/ciena/cordigmp/CordIgmp.java
+++ b/src/test/apps/ciena-cordigmp/src/main/java/org/ciena/cordigmp/CordIgmp.java
@@ -20,6 +20,8 @@
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Multiset;
+import com.google.common.collect.ConcurrentHashMultiset;
 import com.sun.jersey.api.client.Client;
 import com.sun.jersey.api.client.ClientHandlerException;
 import com.sun.jersey.api.client.WebResource;
@@ -142,12 +144,12 @@
             new InternalNetworkConfigListener();
     private DeviceListener deviceListener = new InternalDeviceListener();
 
-    //TODO: move this to a ec map
-    private Map<IpAddress, Integer> groups = Maps.newConcurrentMap();
-
     //Map of IGMP groups to port
     private Map<IpAddress, IgmpPortPair> cordIgmpTranslateTable = Maps.newConcurrentMap();
 
+    //Count of group joins
+    private Multiset<IpAddress> cordIgmpCountTable = ConcurrentHashMultiset.create();
+    
     //TODO: move this to distributed atomic long
     private AtomicInteger channels = new AtomicInteger(0);
 
@@ -400,10 +402,6 @@
     }
 
     private void unprovisionGroup(McastRouteInfo info) {
-        if (info.sinks().isEmpty()) {
-            removeRemoteRoute(info.route());
-        }
-
         if (!info.sink().isPresent()) {
             log.warn("No sink given after sink removed event: {}", info);
             return;
@@ -419,25 +417,27 @@
             log.warn("Ignoring unprovisioning for group " + info.route().group() + " with no port map");
             return;
         }
-        final PortNumber inPort = PortNumber.portNumber(portPair.inputPort());
-        final PortNumber outPort = PortNumber.portNumber(portPair.outputPort());
-        TrafficSelector.Builder mcast = DefaultTrafficSelector.builder()
-            .matchInPort(inPort)
-            .matchEthType(Ethernet.TYPE_IPV4)
-            .matchIPDst(info.route().group().toIpPrefix());
-        TrafficTreatment.Builder treatment = DefaultTrafficTreatment.builder();
-        FlowEntry.Builder flowEntry = DefaultFlowEntry.builder();
-        treatment.add(Instructions.createOutput(outPort));
-        flowEntry.forDevice(loc.deviceId());
-        flowEntry.withPriority(priority);
-        flowEntry.withSelector(mcast.build());
-        flowEntry.withTreatment(treatment.build());
-        flowEntry.fromApp(appId);
-        flowEntry.makePermanent();
-        flowRuleService.removeFlowRules(flowEntry.build());
-        groups.remove(info.route().group());
-        log.warn("Flow rule removed for for device id " + loc.deviceId());
-        return;
+        if(cordIgmpCountTable.remove(info.route().group(), 1) <= 1) {
+            //Remove flow for last channel leave
+            final PortNumber inPort = PortNumber.portNumber(portPair.inputPort());
+            final PortNumber outPort = PortNumber.portNumber(portPair.outputPort());
+            TrafficSelector.Builder mcast = DefaultTrafficSelector.builder()
+                .matchInPort(inPort)
+                .matchEthType(Ethernet.TYPE_IPV4)
+                .matchIPDst(info.route().group().toIpPrefix());
+            TrafficTreatment.Builder treatment = DefaultTrafficTreatment.builder();
+            FlowEntry.Builder flowEntry = DefaultFlowEntry.builder();
+            treatment.add(Instructions.createOutput(outPort));
+            flowEntry.forDevice(loc.deviceId());
+            flowEntry.withPriority(priority);
+            flowEntry.withSelector(mcast.build());
+            flowEntry.withTreatment(treatment.build());
+            flowEntry.fromApp(appId);
+            flowEntry.makePermanent();
+            flowRuleService.removeFlowRules(flowEntry.build());
+            removeRemoteRoute(info.route());
+            log.warn("Flow rule removed for for device id " + loc.deviceId());
+        }
     }
 
     private void provisionGroup(McastRoute route, ConnectPoint sink) {
@@ -445,7 +445,6 @@
         checkNotNull(sink, "Sink cannot be null");
 
         AccessDeviceData oltInfo = oltData.get(sink.deviceId());
-        final AtomicBoolean sync = new AtomicBoolean(false);
         if(oltInfo != null) {
             log.warn("Ignoring provisioning mcast route for OLT device: " + sink.deviceId());
             return;
@@ -455,13 +454,14 @@
             log.warn("Ports for Group " + route.group() + " not found in cord igmp map. Skipping provisioning.");
             return;
         }
-        Integer ret = groups.computeIfAbsent(route.group(), (g) -> {
+        if(cordIgmpCountTable.count(route.group()) == 0) {
+            //First group entry. Provision the flows
             final PortNumber inPort = PortNumber.portNumber(portPair.inputPort());
             final PortNumber outPort = PortNumber.portNumber(portPair.outputPort());
             TrafficSelector.Builder mcast = DefaultTrafficSelector.builder()
                     .matchInPort(inPort)
                     .matchEthType(Ethernet.TYPE_IPV4)
-                    .matchIPDst(g.toIpPrefix());
+                    .matchIPDst(route.group().toIpPrefix());
             TrafficTreatment.Builder treatment = DefaultTrafficTreatment.builder();
             FlowEntry.Builder flowEntry = DefaultFlowEntry.builder();
             treatment.add(Instructions.createOutput(outPort));
@@ -473,11 +473,9 @@
             flowEntry.makePermanent();
             flowRuleService.applyFlowRules(flowEntry.build());
             log.warn("Flow rules applied for device id " + sink.deviceId());
-            return 0;
-        });
-        if (ret == 0) {
             addRemoteRoute(route);
         }
+        cordIgmpCountTable.add(route.group());
     }
 
     private void addRemoteRoute(McastRoute route) {
@@ -578,6 +576,7 @@
                                         CORD_IGMP_TRANSLATE_CONFIG_CLASS);
                         if (config != null) {
                             cordIgmpTranslateTable.clear();
+                            cordIgmpCountTable.clear();
                             config.getCordIgmpTranslations().forEach(
                                                                      mcastPorts -> cordIgmpTranslateTable.put(mcastPorts.group(), mcastPorts.portPair()));
                         }
diff --git a/src/test/subscriber/subscriberTest.py b/src/test/subscriber/subscriberTest.py
index 598f609..fca9335 100644
--- a/src/test/subscriber/subscriberTest.py
+++ b/src/test/subscriber/subscriberTest.py
@@ -41,8 +41,8 @@
                   self.tx_intf = self.port_map[tx_port]
                   self.rx_intf = self.port_map[rx_port]
             except:
-                  self.tx_intf = self.port_map[PORT_TX_DEFAULT]
-                  self.rx_intf = self.port_map[PORT_RX_DEFAULT]
+                  self.tx_intf = self.port_map[self.PORT_TX_DEFAULT]
+                  self.rx_intf = self.port_map[self.PORT_RX_DEFAULT]
 
             Channels.__init__(self, num, channel_start = channel_start, 
                               iface = self.rx_intf, iface_mcast = self.tx_intf, mcast_cb = mcast_cb)
@@ -299,11 +299,15 @@
                   log.info('Interface %s Join Next RX stats for subscriber %s, %s' %(subscriber.iface, subscriber.name, subscriber.join_rx_stats))
                   self.test_status = True
 
-      def generate_port_list(self, num):
+      def generate_port_list(self, subscribers, channels):
             port_list = []
-            for i in xrange(num):
-                  rx_port = 2*i+1
-                  tx_port = 2*i+2
+            for i in xrange(subscribers):
+                  if channels > 1:
+                        rx_port = 2*i+1
+                        tx_port = 2*i+2
+                  else:
+                        rx_port = Subscriber.PORT_RX_DEFAULT
+                        tx_port = Subscriber.PORT_TX_DEFAULT
                   port_list.append((tx_port, rx_port))
             return port_list
 
@@ -315,7 +319,7 @@
             self.subscriber_info = self.subscriber_db.read(num)
             self.subscriber_list = []
             if not port_list:
-                  port_list = self.generate_port_list(num)
+                  port_list = self.generate_port_list(num, num_channels)
 
             index = 0
             for info in self.subscriber_info:
@@ -326,7 +330,8 @@
                                                          channel_start = channel_start,
                                                          tx_port = port_list[index][0],
                                                          rx_port = port_list[index][1]))
-                  channel_start += num_channels
+                  if num_channels > 1:
+                        channel_start += num_channels
                   index += 1
 
             #load the ssm list for all subscriber channels
@@ -364,24 +369,28 @@
       def test_subscriber_join_recv(self):
           """Test subscriber join and receive"""
           num_subscribers = 50
+          num_channels = 1
           test_status = self.subscriber_join_verify(num_subscribers = num_subscribers, 
-                                                    num_channels = 1, port_list = self.generate_port_list(num_subscribers))
+                                                    num_channels = num_channels,
+                                                    port_list = self.generate_port_list(num_subscribers, num_channels))
           assert_equal(test_status, True)
 
       def test_subscriber_join_jump(self):
           """Test subscriber join and receive for channel surfing""" 
           num_subscribers = 5
+          num_channels = 50
           test_status = self.subscriber_join_verify(num_subscribers = num_subscribers, 
-                                                    num_channels = 50,
+                                                    num_channels = num_channels,
                                                     cbs = (self.tls_verify, self.dhcp_jump_verify, self.igmp_jump_verify),
-                                                    port_list = self.generate_port_list(num_subscribers))
+                                                    port_list = self.generate_port_list(num_subscribers, num_channels))
           assert_equal(test_status, True)
 
       def test_subscriber_join_next(self):
           """Test subscriber join next for channels"""
           num_subscribers = 5
+          num_channels = 50
           test_status = self.subscriber_join_verify(num_subscribers = num_subscribers, 
-                                                    num_channels = 50,
+                                                    num_channels = num_channels,
                                                     cbs = (self.tls_verify, self.dhcp_next_verify, self.igmp_next_verify),
-                                                    port_list = self.generate_port_list(num_subscribers))
+                                                    port_list = self.generate_port_list(num_subscribers, num_channels))
           assert_equal(test_status, True)