SEBA-642 igmpproxy should use new mcast store

- switched to onos version 1.13.9-rc4
- configuration of source through IgmpProxyConfig
- new igmpproxy config property with name "enableIgmpProvisioning" added

Change-Id: I78127d2ac1bd86f88bed0fe49d89e1e5bb6d07fa
diff --git a/.gitignore b/.gitignore
index 8bc2e12..321a2aa 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,3 +4,4 @@
 .settings
 *.swp
 *.iml
+.idea/
\ No newline at end of file
diff --git a/app.xml b/app.xml
index d4abe21..a7b8e52 100644
--- a/app.xml
+++ b/app.xml
@@ -16,7 +16,7 @@
   -->
 <app name="org.opencord.igmpproxy" origin="Nokia" version="${project.version}"
         featuresRepo="mvn:${project.groupId}/${project.artifactId}/${project.version}/xml/features"
-        features="${project.artifactId}" apps="org.opencord.mcast">
+        features="${project.artifactId}">
     <description>IGMP PROXY APP</description>
     <artifact>mvn:${project.groupId}/${project.artifactId}/${project.version}</artifact>
 </app>
diff --git a/pom.xml b/pom.xml
index 5fbedac..1d115a0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,7 +22,7 @@
     <parent>
         <groupId>org.onosproject</groupId>
         <artifactId>onos-dependencies</artifactId>
-        <version>1.13.1</version>
+        <version>1.13.9-rc4</version>
         <relativePath></relativePath>
     </parent>
 
@@ -35,14 +35,14 @@
 
     <properties>
         <onos.app.name>org.opencord.igmpproxy</onos.app.name>
-        <onos.version>1.13.1</onos.version>
+        <onos.version>1.13.9-rc4</onos.version>
         <onos.app.category>Traffic Steering</onos.app.category>
         <onos.app.title>IGMP proxy app</onos.app.title>
         <onos.app.url>http://opencord.org</onos.app.url>
         <onos.app.readme>IGMP implementation.</onos.app.readme>
         <onos.app.requires>
             org.opencord.config,
-            org.opencord.mcast
+            org.onosproject.mcast
         </onos.app.requires>
         <cord.config.version>1.4.0</cord.config.version>
     </properties>
@@ -95,7 +95,13 @@
         <dependency>
             <groupId>org.opencord</groupId>
             <artifactId>cord-config</artifactId>
-	    <version>${cord.config.version}</version>
+            <version>${cord.config.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-apps-mcast-api</artifactId>
+            <version>${onos.version}</version>
         </dependency>
     </dependencies>
 
diff --git a/src/main/java/org/opencord/igmpproxy/IgmpManager.java b/src/main/java/org/opencord/igmpproxy/IgmpManager.java
index 0507ec1..40d390e 100644
--- a/src/main/java/org/opencord/igmpproxy/IgmpManager.java
+++ b/src/main/java/org/opencord/igmpproxy/IgmpManager.java
@@ -16,6 +16,7 @@
 package org.opencord.igmpproxy;
 
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -57,8 +58,8 @@
 import org.onosproject.net.flowobjective.Objective;
 import org.onosproject.net.flowobjective.ObjectiveContext;
 import org.onosproject.net.flowobjective.ObjectiveError;
-import org.onosproject.net.mcast.McastRoute;
-import org.onosproject.net.mcast.MulticastRouteService;
+import org.onosproject.mcast.api.McastRoute;
+import org.onosproject.mcast.api.MulticastRouteService;
 import org.onosproject.net.packet.InboundPacket;
 import org.onosproject.net.packet.PacketContext;
 import org.onosproject.net.packet.PacketProcessor;
@@ -70,9 +71,11 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
@@ -95,6 +98,7 @@
             IgmpproxySsmTranslateConfig.class;
     private static final Class<McastConfig> MCAST_CONFIG_CLASS =
             McastConfig.class;
+
     public static Map<String, GroupMember> groupMemberMap = Maps.newConcurrentMap();
     private static ApplicationId appId;
     private static Map<DeviceId, AccessDeviceData> oltData = new ConcurrentHashMap<>();
@@ -110,6 +114,14 @@
     private static byte igmpCos = 7;
     public static boolean connectPointMode = true;
     public static ConnectPoint connectPoint = null;
+    private static ConnectPoint sourceDeviceAndPort = null;
+    private static boolean enableIgmpProvisioning = false;
+
+    private static final Integer MAX_PRIORITY = 10000;
+    private static final String INSTALLED = "installed";
+    private static final String REMOVED = "removed";
+    private static final String INSTALLATION = "installation";
+    private static final String REMOVAL = "removal";
 
     private static boolean pimSSmInterworking = false;
     private static final String DEFAULT_PIMSSM_HOST = "127.0.0.1";
@@ -135,6 +147,7 @@
     private Logger log = LoggerFactory.getLogger(getClass());
     private ApplicationId coreAppId;
     private Map<Ip4Address, Ip4Address> ssmTranslateTable = new ConcurrentHashMap<>();
+
     private InternalNetworkConfigListener configListener =
             new InternalNetworkConfigListener();
     private DeviceListener deviceListener = new InternalDeviceListener();
@@ -155,6 +168,7 @@
                     return new IgmpproxySsmTranslateConfig();
                 }
             };
+
     private int maxResp = 10; //unit is 1 sec
     private int keepAliveInterval = 120; //unit is 1 sec
 
@@ -349,11 +363,28 @@
                 } else {
                     groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, false);
                 }
+
+                Optional<ConnectPoint> sourceConfigured = getSource();
+                if (!sourceConfigured.isPresent()) {
+                    log.warn("Unable to process IGMP Join from {} since no source " +
+                                     "configuration is found.", deviceId);
+                    return;
+                }
+                HashSet<ConnectPoint> sourceConnectPoints = Sets.newHashSet(sourceConfigured.get());
+
                 StateMachine.join(deviceId, groupIp, srcIp);
                 groupMemberMap.put(groupMemberKey, groupMember);
                 groupMember.updateList(recordType, sourceList);
-                groupMember.getSourceList().forEach(source -> multicastService.addSink(
-                        new McastRoute(source, groupIp, McastRoute.Type.IGMP), cp));
+                groupMember.getSourceList().forEach(source -> {
+                    McastRoute route = new McastRoute(source, groupIp, McastRoute.Type.IGMP);
+                    //add route
+                    multicastService.add(route);
+                    //add source to the route
+                    multicastService.addSources(route, Sets.newHashSet(sourceConnectPoints));
+                    //add sink to the route
+                    multicastService.addSinks(route, Sets.newHashSet(cp));
+                });
+
             }
             groupMember.resetAllTimers();
             groupMember.updateList(recordType, sourceList);
@@ -377,9 +408,9 @@
     private void leaveAction(GroupMember groupMember) {
         ConnectPoint cp = new ConnectPoint(groupMember.getDeviceId(), groupMember.getPortNumber());
         StateMachine.leave(groupMember.getDeviceId(), groupMember.getGroupIp());
-        groupMember.getSourceList().forEach(source -> multicastService.removeSink(
+        groupMember.getSourceList().forEach(source -> multicastService.removeSinks(
                 new McastRoute(source, groupMember.getGroupIp(),
-                        McastRoute.Type.IGMP), cp));
+                               McastRoute.Type.IGMP), Sets.newHashSet(cp)));
         groupMemberMap.remove(groupMember.getId());
     }
 
@@ -394,35 +425,12 @@
         IgmpSender.getInstance().sendIgmpPacket(ethpkt, groupMember.getDeviceId(), groupMember.getPortNumber());
     }
 
-    private void processFilterObjective(DeviceId devId, Port port, boolean remove) {
-
-        //TODO migrate to packet requests when packet service uses filtering objectives
-        DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
-
-        builder = remove ? builder.deny() : builder.permit();
-
-        FilteringObjective igmp = builder
-                .withKey(Criteria.matchInPort(port.number()))
-                .addCondition(Criteria.matchEthType(EthType.EtherType.IPV4.ethType()))
-                .addCondition(Criteria.matchIPProtocol(IPv4.PROTOCOL_IGMP))
-                .withMeta(DefaultTrafficTreatment.builder().setOutput(PortNumber.CONTROLLER).build())
-                .fromApp(appId)
-                .withPriority(10000)
-                .add(new ObjectiveContext() {
-                    @Override
-                    public void onSuccess(Objective objective) {
-                        log.info("IgmpProxy filter for {} on {} installed.",
-                                devId, port);
-                    }
-
-                    @Override
-                    public void onError(Objective objective, ObjectiveError error) {
-                        log.info("IgmpProxy filter for {} on {} failed because {}.",
-                                devId, port, error);
-                    }
-                });
-
-        flowObjectiveService.filter(devId, igmp);
+    /**
+     * @return connect point of the source if configured; and empty Optional otherwise.
+     */
+    public static Optional<ConnectPoint> getSource() {
+        return sourceDeviceAndPort == null ? Optional.empty() :
+                Optional.of(sourceDeviceAndPort);
     }
 
     /**
@@ -584,6 +592,10 @@
     }
 
     private void processFilterObjective(DeviceId devId, PortNumber port, boolean remove) {
+        if (!enableIgmpProvisioning) {
+            log.debug("IGMP trap rules won't be installed since enableIgmpProvisioning flag is not set");
+            return;
+        }
         //TODO migrate to packet requests when packet service uses filtering objectives
         DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
 
@@ -595,22 +607,24 @@
                 .addCondition(Criteria.matchIPProtocol(IPv4.PROTOCOL_IGMP))
                 .withMeta(DefaultTrafficTreatment.builder().setOutput(PortNumber.CONTROLLER).build())
                 .fromApp(appId)
-                .withPriority(10000)
+                .withPriority(MAX_PRIORITY)
                 .add(new ObjectiveContext() {
                     @Override
                     public void onSuccess(Objective objective) {
-                        log.info("IgmpProxy filter for {} on {} installed.",
-                                devId, port);
+                        log.info("Igmp filter for {} on {} {}.",
+                                 devId, port, (remove) ? REMOVED : INSTALLED);
                     }
 
                     @Override
                     public void onError(Objective objective, ObjectiveError error) {
-                        log.info("IgmpProxy filter for {} on {} failed because {}.",
-                                devId, port, error);
+                        log.info("Igmp filter {} for device {} on port {} failed because of {}",
+                                 (remove) ? INSTALLATION : REMOVAL, devId, port,
+                                 error);
                     }
                 });
 
         flowObjectiveService.filter(devId, igmp);
+
     }
 
     private boolean isConnectPoint(DeviceId device, PortNumber port) {
@@ -699,12 +713,7 @@
     private class InternalNetworkConfigListener implements NetworkConfigListener {
 
         private void reconfigureNetwork(IgmpproxyConfig cfg) {
-            IgmpproxyConfig newCfg;
-            if (cfg == null) {
-                newCfg = new IgmpproxyConfig();
-            } else {
-                newCfg = cfg;
-            }
+            IgmpproxyConfig newCfg = cfg == null ? new IgmpproxyConfig() : cfg;
 
             unSolicitedTimeout = newCfg.unsolicitedTimeOut();
             maxResp = newCfg.maxResp();
@@ -718,6 +727,7 @@
             periodicQuery = newCfg.periodicQuery();
             fastLeave = newCfg.fastLeave();
             pimSSmInterworking = newCfg.pimSsmInterworking();
+            enableIgmpProvisioning = newCfg.enableIgmpProvisioning();
 
             if (connectPointMode != newCfg.connectPointMode() ||
                     connectPoint != newCfg.connectPoint()) {
@@ -732,16 +742,24 @@
                 }
             }
             if (connectPoint != null) {
-                log.info("connect point :" + connectPoint.toString());
+                log.info("connect point : {}", connectPoint);
             }
-            log.info(" mode: " + connectPointMode);
+            log.info("mode: {}", connectPointMode);
+
+            getSourceConnectPoint(newCfg);
 
             IgmpSender.getInstance().setIgmpCos(igmpCos);
             IgmpSender.getInstance().setMaxResp(maxResp);
             IgmpSender.getInstance().setMvlan(mvlan);
             IgmpSender.getInstance().setWithRADownlink(withRADownlink);
             IgmpSender.getInstance().setWithRAUplink(withRAUplink);
+        }
 
+        void getSourceConnectPoint(IgmpproxyConfig cfg) {
+            sourceDeviceAndPort = cfg.getSourceDeviceAndPort();
+            if (sourceDeviceAndPort != null) {
+                log.debug("source parameter configured to {}", sourceDeviceAndPort);
+            }
         }
 
         public void reconfigureSsmTable(IgmpproxySsmTranslateConfig cfg) {
@@ -750,7 +768,7 @@
             }
             Collection<McastRoute> translations = cfg.getSsmTranslations();
             for (McastRoute route : translations) {
-                ssmTranslateTable.put(route.group().getIp4Address(), route.source().getIp4Address());
+                ssmTranslateTable.put(route.group().getIp4Address(), route.source().get().getIp4Address());
             }
         }
 
@@ -772,6 +790,7 @@
                     if (event.configClass().equals(IGMPPROXY_CONFIG_CLASS)) {
                         IgmpproxyConfig config = networkConfig.getConfig(appId, IGMPPROXY_CONFIG_CLASS);
                         if (config != null) {
+                            log.info("igmpproxy config received. {}", config);
                             reconfigureNetwork(config);
                         }
                     }
diff --git a/src/main/java/org/opencord/igmpproxy/IgmpproxyConfig.java b/src/main/java/org/opencord/igmpproxy/IgmpproxyConfig.java
index efd39ba..2e36a50 100644
--- a/src/main/java/org/opencord/igmpproxy/IgmpproxyConfig.java
+++ b/src/main/java/org/opencord/igmpproxy/IgmpproxyConfig.java
@@ -37,6 +37,7 @@
     protected static final String DEFAULT_WITH_RA_DOWNLINK = "true";
     private static final Boolean DEFAULT_CONNECT_POINT_MODE = true;
     private static final Boolean DEFAULT_PIMSSM_INTERWORKING = false;
+    private static final Boolean DEFAULT_IGMP_PROVISIONING_SUPPORT = Boolean.FALSE;
 
     protected static final String CONNECT_POINT_MODE = "globalConnectPointMode";
     protected static final String CONNECT_POINT = "globalConnectPoint";
@@ -52,6 +53,9 @@
     private static final String WITH_RA_UPLINK = "withRAUpLink";
     private static final String WITH_RA_DOWN_LINK = "withRADownLink";
     private static final String PIMSSM_INTERWORKING = "pimSSmInterworking";
+    private static final String SOURCE_DEV_PORT = "sourceDeviceAndPort";
+    private static final String ENABLE_IGMP_PROVISIONING = "enableIgmpProvisioning";
+
 
     /**
      * Gets the value of a string property, protecting for an empty
@@ -181,4 +185,24 @@
         }
         return Boolean.parseBoolean(getStringProperty(PIMSSM_INTERWORKING, DEFAULT_PIMSSM_INTERWORKING.toString()));
     }
+
+    public ConnectPoint getSourceDeviceAndPort() {
+        if (object == null || object.path(SOURCE_DEV_PORT) == null) {
+            return null;
+        }
+
+        try {
+            return ConnectPoint.deviceConnectPoint(getStringProperty(SOURCE_DEV_PORT, ""));
+        } catch (Exception ex) {
+            return null;
+        }
+    }
+
+    public boolean enableIgmpProvisioning() {
+        if (object == null || object.path(ENABLE_IGMP_PROVISIONING) == null) {
+            return DEFAULT_IGMP_PROVISIONING_SUPPORT;
+        }
+        return Boolean.parseBoolean(getStringProperty(ENABLE_IGMP_PROVISIONING,
+                                                      DEFAULT_IGMP_PROVISIONING_SUPPORT.toString()));
+    }
 }
diff --git a/src/main/java/org/opencord/igmpproxy/IgmpproxySsmTranslateConfig.java b/src/main/java/org/opencord/igmpproxy/IgmpproxySsmTranslateConfig.java
index 9c6da25..8565a1b 100644
--- a/src/main/java/org/opencord/igmpproxy/IgmpproxySsmTranslateConfig.java
+++ b/src/main/java/org/opencord/igmpproxy/IgmpproxySsmTranslateConfig.java
@@ -19,8 +19,8 @@
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.onlab.packet.IpAddress;
 import org.onosproject.core.ApplicationId;
+import org.onosproject.mcast.api.McastRoute;
 import org.onosproject.net.config.Config;
-import org.onosproject.net.mcast.McastRoute;
 
 import java.util.ArrayList;
 import java.util.List;