Support source-specific IGMP joins.

Change-Id: I422f54f908998460ceff994f9b3bfbf3d2f81a56
diff --git a/src/main/java/org/onosproject/igmp/IgmpSnoop.java b/src/main/java/org/onosproject/igmp/IgmpSnoop.java
index c473f25..96a94cb 100644
--- a/src/main/java/org/onosproject/igmp/IgmpSnoop.java
+++ b/src/main/java/org/onosproject/igmp/IgmpSnoop.java
@@ -320,32 +320,57 @@
 
             IGMPMembership membership = (IGMPMembership) group;
 
-            // TODO allow pulling source from IGMP packet
-            IpAddress source = ssmTranslateTable.get(group.getGaddr());
-            if (source == null) {
-                log.warn("No source found in SSM translate table for {}", group.getGaddr());
-                return;
-            }
-
-            McastRoute route = new McastRoute(source,
-                    group.getGaddr(),
-                    McastRoute.Type.IGMP);
+            IpAddress groupAddress = membership.getGaddr();
 
             if (membership.getRecordType() == IGMPMembership.MODE_IS_INCLUDE ||
                     membership.getRecordType() == IGMPMembership.CHANGE_TO_INCLUDE_MODE) {
 
-                multicastService.removeSink(route, location);
-                // TODO remove route if all sinks are gone
+                if (membership.getSources().isEmpty()) {
+                    McastRoute route = ssmTranslateRoute(groupAddress);
+                    if (route != null) {
+                        removeRoute(route, location);
+                    }
+                } else {
+                    membership.getSources().stream()
+                            .map(source -> new McastRoute(source, groupAddress, McastRoute.Type.IGMP))
+                            .forEach(route -> addRoute(route, location));
+                }
             } else if (membership.getRecordType() == IGMPMembership.MODE_IS_EXCLUDE ||
                     membership.getRecordType() == IGMPMembership.CHANGE_TO_EXCLUDE_MODE) {
 
-                multicastService.add(route);
-                multicastService.addSink(route, location);
+                if (membership.getSources().isEmpty()) {
+                    McastRoute route = ssmTranslateRoute(groupAddress);
+                    if (route != null) {
+                        addRoute(route, location);
+                    }
+                } else {
+                    membership.getSources().stream()
+                            .map(source -> new McastRoute(source, groupAddress, McastRoute.Type.IGMP))
+                            .forEach(route -> removeRoute(route, location));
+                }
             }
-
         });
     }
 
+    private McastRoute ssmTranslateRoute(IpAddress group) {
+        IpAddress source = ssmTranslateTable.get(group);
+        if (source == null) {
+            log.warn("No SSM translate source found for group {}", group);
+            return null;
+        }
+        return new McastRoute(source, group, McastRoute.Type.IGMP);
+    }
+
+    private void addRoute(McastRoute route, ConnectPoint location) {
+        multicastService.add(route);
+        multicastService.addSink(route, location);
+    }
+
+    private void removeRoute(McastRoute route, ConnectPoint location) {
+        multicastService.removeSink(route, location);
+        // TODO remove route if all sinks are gone
+    }
+
     private ByteBuffer buildQueryPacket() {
         IGMP igmp = new IGMP();
         igmp.setIgmpType(IGMP.TYPE_IGMPV3_MEMBERSHIP_QUERY);