SEBA-646 cordmcast should listen to multicast events
from the new multicast store.

- switched to ONOS 1.13.9-rc4
- started listening to events from the new multicast
  store in mcast app of ONOS
- stopped provisioning ONOS mcast application remotely
- obsolete dependencies removed from the pom.xml
- started using ONOS storage sub-system

Change-Id: I0c9e27bbe6f7c6c32a7475a2e7e4c8061c599829
diff --git a/pom.xml b/pom.xml
index 30d264c..67b5ff1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -47,61 +47,23 @@
     <dependencies>
         <dependency>
             <groupId>org.onosproject</groupId>
-            <artifactId>onos-cli</artifactId>
-            <version>${onos.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.onosproject</groupId>
-            <artifactId>onlab-misc</artifactId>
-            <version>${onos.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.onosproject</groupId>
             <artifactId>onlab-osgi</artifactId>
             <version>${onos.version}</version>
         </dependency>
-
-        <dependency>
-            <groupId>org.onosproject</groupId>
-            <artifactId>onos-incubator-api</artifactId>
-            <version>${onos.version}</version>
-        </dependency>
-
-         <dependency>
-            <groupId>org.onosproject</groupId>
-            <artifactId>onos-incubator-net</artifactId>
-            <version>${onos.version}</version>
-        </dependency>
-
         <dependency>
             <groupId>org.onosproject</groupId>
             <artifactId>onos-api</artifactId>
             <version>${onos.version}</version>
         </dependency>
-
-        <dependency>
-            <groupId>org.onosproject</groupId>
-            <artifactId>onos-core-common</artifactId>
-            <version>${onos.version}</version>
-        </dependency>
-
         <dependency>
             <groupId>org.opencord</groupId>
             <artifactId>cord-config</artifactId>
             <version>${cord.config.version}</version>
         </dependency>
-
         <dependency>
-            <groupId>org.opencord</groupId>
-            <artifactId>olt-api</artifactId>
-            <version>${olt.api.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.glassfish.jersey.core</groupId>
-            <artifactId>jersey-client</artifactId>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-apps-mcast-api</artifactId>
+            <version>${onos.version}</version>
         </dependency>
     </dependencies>
 
diff --git a/src/main/java/org/opencord/cordmcast/CordMcast.java b/src/main/java/org/opencord/cordmcast/CordMcast.java
index 99dea51..3bb4d72 100644
--- a/src/main/java/org/opencord/cordmcast/CordMcast.java
+++ b/src/main/java/org/opencord/cordmcast/CordMcast.java
@@ -15,11 +15,8 @@
  */
 package org.opencord.cordmcast;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -28,73 +25,72 @@
 import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.glassfish.jersey.client.ClientConfig;
-import org.glassfish.jersey.client.ClientProperties;
-import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature;
 import org.onlab.packet.Ethernet;
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.VlanId;
+import org.onlab.util.KryoNamespace;
 import org.onosproject.cfg.ComponentConfigService;
-import org.onosproject.codec.CodecService;
-import org.onosproject.codec.JsonCodec;
-
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.mcast.api.McastEvent;
+import org.onosproject.mcast.api.McastListener;
+import org.onosproject.mcast.api.McastRoute;
+import org.onosproject.mcast.api.MulticastRouteService;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.HostId;
 import org.onosproject.net.PortNumber;
 import org.onosproject.net.config.ConfigFactory;
 import org.onosproject.net.config.NetworkConfigEvent;
 import org.onosproject.net.config.NetworkConfigListener;
 import org.onosproject.net.config.NetworkConfigRegistry;
+import org.onosproject.net.config.basics.McastConfig;
 import org.onosproject.net.config.basics.SubjectFactories;
+import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.flow.DefaultTrafficSelector;
 import org.onosproject.net.flow.DefaultTrafficTreatment;
 import org.onosproject.net.flow.FlowRuleService;
 import org.onosproject.net.flow.TrafficSelector;
 import org.onosproject.net.flowobjective.DefaultForwardingObjective;
 import org.onosproject.net.flowobjective.DefaultNextObjective;
+import org.onosproject.net.flowobjective.DefaultObjectiveContext;
 import org.onosproject.net.flowobjective.FlowObjectiveService;
 import org.onosproject.net.flowobjective.ForwardingObjective;
 import org.onosproject.net.flowobjective.NextObjective;
 import org.onosproject.net.flowobjective.Objective;
 import org.onosproject.net.flowobjective.ObjectiveContext;
 import org.onosproject.net.flowobjective.ObjectiveError;
-import org.onosproject.net.mcast.McastEvent;
-import org.onosproject.net.mcast.McastListener;
-import org.onosproject.net.mcast.McastRoute;
-import org.onosproject.net.mcast.McastRouteInfo;
-import org.onosproject.net.mcast.MulticastRouteService;
-import org.onosproject.rest.AbstractWebResource;
-import org.opencord.cordconfig.access.AccessAgentData;
-import org.opencord.cordconfig.access.AccessDeviceData;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
 import org.opencord.cordconfig.CordConfigService;
+import org.opencord.cordconfig.access.AccessDeviceData;
 import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
-import org.onosproject.net.config.basics.McastConfig;
 
-import javax.ws.rs.ProcessingException;
-import javax.ws.rs.client.Client;
-import javax.ws.rs.client.ClientBuilder;
-import javax.ws.rs.client.Entity;
-import javax.ws.rs.client.Invocation;
-import javax.ws.rs.client.WebTarget;
-import javax.ws.rs.core.MediaType;
-import java.io.IOException;
 import java.util.Dictionary;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
-import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Strings.isNullOrEmpty;
-import static com.google.common.net.MediaType.JSON_UTF_8;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
 import static org.onlab.util.Tools.get;
+import static org.onlab.util.Tools.groupedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
 
+
 /**
  * CORD multicast provisioning application. Operates by listening to
  * events on the multicast rib and provisioning groups to program multicast
@@ -106,12 +102,8 @@
 
     private final Logger log = getLogger(getClass());
 
-    private static final int DEFAULT_REST_TIMEOUT_MS = 1000;
     private static final int DEFAULT_PRIORITY = 500;
     private static final short DEFAULT_MCAST_VLAN = 4000;
-    private static final String DEFAULT_SYNC_HOST = "";
-    private static final String DEFAULT_USER = "karaf";
-    private static final String DEFAULT_PASSWORD = "karaf";
     private static final boolean DEFAULT_VLAN_ENABLED = true;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -124,9 +116,6 @@
     protected CoreService coreService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected CodecService codecService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ComponentConfigService componentConfigService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -138,16 +127,30 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected FlowRuleService flowRuleService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected MastershipService mastershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    public DeviceService deviceService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private LeadershipService leadershipService;
+
     protected McastListener listener = new InternalMulticastListener();
     private InternalNetworkConfigListener configListener =
             new InternalNetworkConfigListener();
 
-    //TODO: move this to a ec map
-    private Map<NextKey, Integer> groups = Maps.newConcurrentMap();
+    private ConsistentMap<NextKey, NextContent> groups;
 
     private ApplicationId appId;
     private ApplicationId coreAppId;
-    private int mcastVlan = DEFAULT_MCAST_VLAN;
+    private short mcastVlan = DEFAULT_MCAST_VLAN;
 
     @Property(name = "vlanEnabled", boolValue = DEFAULT_VLAN_ENABLED,
             label = "Use vlan for multicast traffic?")
@@ -157,19 +160,6 @@
             label = "Priority for multicast rules")
     private int priority = DEFAULT_PRIORITY;
 
-    @Property(name = "syncHost", value = DEFAULT_SYNC_HOST,
-            label = "host:port to synchronize routes to")
-    private String syncHost = null;
-
-    @Property(name = "username", value = DEFAULT_USER,
-            label = "Username for REST password authentication")
-    private String user = DEFAULT_USER;
-
-    @Property(name = "password", value = DEFAULT_PASSWORD,
-            label = "Password for REST authentication")
-    private String password = DEFAULT_PASSWORD;
-
-    private String fabricOnosUrl;
     private static final Class<McastConfig> CORD_MCAST_CONFIG_CLASS =
             McastConfig.class;
 
@@ -182,6 +172,16 @@
                 }
             };
 
+    // lock to synchronize local operations
+    private final Lock mcastLock = new ReentrantLock();
+    private void mcastLock() {
+        mcastLock.lock();
+    }
+    private void mcastUnlock() {
+        mcastLock.unlock();
+    }
+    private ExecutorService eventExecutor;
+
     @Activate
     public void activate(ComponentContext context) {
         componentConfigService.registerProperties(getClass());
@@ -190,21 +190,31 @@
         appId = coreService.registerApplication(APP_NAME);
         coreAppId = coreService.registerApplication(CoreService.CORE_APP_NAME);
 
-        clearRemoteRoutes();
+        eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("cord/mcast",
+                                                                        "events-mcast-%d", log));
+
+        KryoNamespace.Builder groupsKryo = new KryoNamespace.Builder()
+                .register(KryoNamespaces.API)
+                .register(NextKey.class)
+                .register(NextContent.class);
+        groups = storageService
+                .<NextKey, NextContent>consistentMapBuilder()
+                .withName("cord-mcast-groups-store")
+                .withSerializer(Serializer.using(groupsKryo.build("CordMcast-Groups")))
+                .build();
+
         networkConfig.registerConfigFactory(cordMcastConfigFactory);
         networkConfig.addListener(configListener);
         mcastService.addListener(listener);
 
         mcastService.getRoutes().stream()
-                .map(r -> new ImmutablePair<>(r, mcastService.fetchSinks(r)))
+                .map(r -> new ImmutablePair<>(r, mcastService.sinks(r)))
                 .filter(pair -> pair.getRight() != null && !pair.getRight().isEmpty())
-                .forEach(pair -> pair.getRight().forEach(sink -> provisionGroup(pair.getLeft(),
-                        sink)));
+                .forEach(pair -> pair.getRight().forEach(sink -> addSink(pair.getLeft(),
+                                                                          sink)));
 
         McastConfig config = networkConfig.getConfig(coreAppId, CORD_MCAST_CONFIG_CLASS);
-        if (config != null) {
-            mcastVlan = config.egressVlan().toShort();
-        }
+        updateConfig(config);
 
         log.info("Started");
     }
@@ -215,16 +225,49 @@
         mcastService.removeListener(listener);
         networkConfig.removeListener(configListener);
         networkConfig.unregisterConfigFactory(cordMcastConfigFactory);
+        eventExecutor.shutdown();
         clearGroups();
+        groups.destroy();
         log.info("Stopped");
     }
 
     public void clearGroups() {
-        groups.keySet().forEach(d -> {
-            flowObjectiveService.next(d.getDevice(), nextObject(groups.get(d), PortNumber.ANY, NextType.Remove));
-        });
-        flowRuleService.removeFlowRulesById(appId);
-        groups.clear();
+        mcastLock();
+        try {
+            groups.keySet().forEach(groupInfo -> {
+                if (!isLocalLeader(groupInfo.getDevice())) {
+                    return;
+                }
+                NextContent next = groups.get(groupInfo).value();
+
+                ObjectiveContext context = new DefaultObjectiveContext(
+                        (objective) -> log.debug("Successfully remove {}",
+                                                 groupInfo.group),
+                        (objective, error) -> log.warn("Failed to remove {}: {}",
+                                                       groupInfo.group, error));
+                // remove the flow rule
+                flowObjectiveService.forward(groupInfo.getDevice(), fwdObject(next.getNextId(),
+                                                                              groupInfo.group).remove(context));
+                // remove all ports from the group
+                next.getOutPorts().stream().forEach(portNumber ->
+                    flowObjectiveService.next(groupInfo.getDevice(), nextObject(next.getNextId(),
+                                                                                portNumber, NextType.RemoveFromExisting,
+                                                                                groupInfo.group))
+                );
+
+            });
+            groups.clear();
+        } finally {
+            mcastUnlock();
+        }
+    }
+
+    private VlanId multicastVlan() {
+        return VlanId.vlanId(mcastVlan);
+    }
+
+    private VlanId assignedVlan() {
+        return vlanEnabled ? multicastVlan() : VlanId.NONE;
     }
 
     @Modified
@@ -232,84 +275,351 @@
         Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
 
         try {
-            String s = get(properties, "username");
-            user = isNullOrEmpty(s) ? DEFAULT_USER : s.trim();
-
-            s = get(properties, "password");
-            password = isNullOrEmpty(s) ? DEFAULT_PASSWORD : s.trim();
-
-            s = get(properties, "vlanEnabled");
+            String s = get(properties, "vlanEnabled");
             vlanEnabled = isNullOrEmpty(s) ? DEFAULT_VLAN_ENABLED : Boolean.parseBoolean(s.trim());
 
             s = get(properties, "priority");
             priority = isNullOrEmpty(s) ? DEFAULT_PRIORITY : Integer.parseInt(s.trim());
 
-            s = get(properties, "syncHost");
-            syncHost = isNullOrEmpty(s) ? null : s.trim();
         } catch (Exception e) {
-            user = DEFAULT_USER;
-            password = DEFAULT_PASSWORD;
-            syncHost = null;
-            mcastVlan = DEFAULT_MCAST_VLAN;
+            log.error("Unable to parse configuration parameter.", e);
             vlanEnabled = false;
             priority = DEFAULT_PRIORITY;
         }
-        fabricOnosUrl = createRemoteUrl(syncHost);
-    }
-
-    private static String createRemoteUrl(String remoteHost) {
-        return "http://" + remoteHost + "/onos/v1/mcast";
     }
 
     private class InternalMulticastListener implements McastListener {
         @Override
         public void event(McastEvent event) {
-            McastRouteInfo info = event.subject();
-            switch (event.type()) {
-                case ROUTE_ADDED:
-                    break;
-                case ROUTE_REMOVED:
-                    break;
-                case SOURCE_ADDED:
-                    break;
-                case SINK_ADDED:
-                    if (!info.sink().isPresent()) {
-                        log.warn("No sink given after sink added event: {}", info);
-                        return;
-                    }
-                    provisionGroup(info.route(), info.sink().get());
-                    break;
-                case SINK_REMOVED:
-                    unprovisionGroup(event.subject());
-                    break;
-                default:
-                    log.warn("Unknown mcast event {}", event.type());
+            eventExecutor.execute(() -> {
+                switch (event.type()) {
+                    case ROUTE_ADDED:
+                    case ROUTE_REMOVED:
+                    case SOURCES_ADDED:
+                        break;
+                    case SINKS_ADDED:
+                        addSinks(event);
+                        break;
+                    case SINKS_REMOVED:
+                        removeSinks(event);
+                        break;
+                    default:
+                        log.warn("Unknown mcast event {}", event.type());
+                }
+            });
+        }
+    }
+
+    /**
+     * Processes previous, and new sinks then finds the sinks to be removed.
+     * @param prevSinks the previous sinks to be evaluated
+     * @param newSinks the new sinks to be evaluated
+     * @returnt the set of the sinks to be removed
+     */
+    private Set<ConnectPoint> getSinksToBeRemoved(Map<HostId, Set<ConnectPoint>> prevSinks,
+                                                  Map<HostId, Set<ConnectPoint>> newSinks) {
+        return getSinksToBeProcessed(prevSinks, newSinks);
+    }
+
+
+    /**
+     * Processes previous, and new sinks then finds the sinks to be added.
+     * @param newSinks the new sinks to be processed
+     * @param allPrevSinks all previous sinks
+     * @return the set of the sinks to be added
+     */
+    private Set<ConnectPoint> getSinksToBeAdded(Map<HostId, Set<ConnectPoint>> newSinks,
+                                                Map<HostId, Set<ConnectPoint>> allPrevSinks) {
+        return getSinksToBeProcessed(newSinks, allPrevSinks);
+    }
+
+    /**
+     * Gets single-homed sinks that are in set1 but not in set2.
+     * @param sinkSet1 the first sink map
+     * @param sinkSet2 the second sink map
+     * @return a set containing all the single-homed sinks found in set1 but not in set2
+     */
+    private Set<ConnectPoint> getSinksToBeProcessed(Map<HostId, Set<ConnectPoint>> sinkSet1,
+                                                    Map<HostId, Set<ConnectPoint>> sinkSet2) {
+        final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
+        sinkSet1.forEach(((hostId, connectPoints) -> {
+            if (HostId.NONE.equals(hostId)) {
+                //assume all connect points associated with HostId.NONE are single homed sinks
+                sinksToBeProcessed.addAll(connectPoints);
+                return;
+            }
+        }));
+        Set<ConnectPoint> singleHomedSinksOfSet2 = sinkSet2.get(HostId.NONE) == null ?
+                Sets.newHashSet() :
+                sinkSet2.get(HostId.NONE);
+        return Sets.difference(sinksToBeProcessed, singleHomedSinksOfSet2);
+    };
+
+
+    private void removeSinks(McastEvent event) {
+        mcastLock();
+        try {
+            Set<ConnectPoint> sinksToBeRemoved = getSinksToBeRemoved(event.prevSubject().sinks(),
+                                                                     event.subject().sinks());
+            sinksToBeRemoved.forEach(sink -> removeSink(event.subject().route().group(), sink));
+        } finally {
+            mcastUnlock();
+        }
+    }
+
+    private void removeSink(IpAddress group, ConnectPoint sink) {
+        if (!isLocalLeader(sink.deviceId())) {
+            log.debug("Not the leader of {}. Skip sink_removed event for the sink {} and group {}",
+                      sink.deviceId(), sink, group);
+            return;
+        }
+
+        Optional<AccessDeviceData> oltInfo = cordConfigService.getAccessDevice(sink.deviceId());
+
+        if (!oltInfo.isPresent()) {
+            log.warn("Unknown OLT device : {}", sink.deviceId());
+            return;
+        }
+
+        log.debug("Removing sink {} from the group {}", sink, group);
+
+        NextKey key = new NextKey(sink.deviceId(), group);
+        groups.computeIfPresent(key, (k, v) -> {
+            flowObjectiveService.next(sink.deviceId(), nextObject(v.getNextId(), sink.port(),
+                                                                  NextType.RemoveFromExisting, group));
+
+            Set<PortNumber> outPorts = Sets.newHashSet(v.getOutPorts());
+            outPorts.remove(sink.port());
+
+            if (outPorts.isEmpty()) {
+                // this is the last sink
+                ObjectiveContext context = new DefaultObjectiveContext(
+                        (objective) -> log.debug("Successfully remove {} on {}",
+                                                 group, sink),
+                        (objective, error) -> log.warn("Failed to remove {} on {}: {}",
+                                                       group, sink, error));
+                ForwardingObjective fwdObj = fwdObject(v.getNextId(), group).remove(context);
+                flowObjectiveService.forward(sink.deviceId(), fwdObj);
+            }
+            // remove the whole entity if no out port exists in the port list
+            return outPorts.isEmpty() ? null : new NextContent(v.getNextId(),
+                                                               ImmutableSet.copyOf(outPorts));
+        });
+    }
+
+    private void addSinks(McastEvent event) {
+        mcastLock();
+        try {
+            Set<ConnectPoint> sinksToBeAdded = getSinksToBeAdded(event.subject().sinks(),
+                                                                 event.prevSubject().sinks());
+            sinksToBeAdded.forEach(sink -> addSink(event.subject().route(), sink));
+        } finally {
+            mcastUnlock();
+        }
+    }
+
+    private void addSink(McastRoute route, ConnectPoint sink) {
+        if (!isLocalLeader(sink.deviceId())) {
+            log.debug("Not the leader of {}. Skip sink_added event for the sink {} and group {}",
+                      sink.deviceId(), sink, route.group());
+            return;
+        }
+
+        Optional<AccessDeviceData> oltInfo = cordConfigService.getAccessDevice(sink.deviceId());
+
+        if (!oltInfo.isPresent()) {
+            log.warn("Unknown OLT device : {}", sink.deviceId());
+            return;
+        }
+
+        log.debug("Adding sink {} to the group {}", sink, route.group());
+
+        NextKey key = new NextKey(sink.deviceId(), route.group());
+        NextObjective newNextObj;
+
+        boolean theFirstSinkOfGroup = false;
+        if (!groups.containsKey(key)) {
+            // First time someone request this mcast group via this device
+            Integer nextId = flowObjectiveService.allocateNextId();
+            newNextObj = nextObject(nextId, sink.port(), NextType.AddNew, route.group());
+            // Store the new port
+            groups.put(key, new NextContent(nextId, ImmutableSet.of(sink.port())));
+            theFirstSinkOfGroup = true;
+        } else {
+            // This device already serves some subscribers of this mcast group
+            Versioned<NextContent> nextObj = groups.get(key);
+            if (nextObj.value().getOutPorts().contains(sink.port())) {
+                log.info("Group {} already serves the sink connected to {}", route.group(), sink);
+                return;
+            }
+            newNextObj = nextObject(nextObj.value().getNextId(), sink.port(),
+                                    NextType.AddToExisting, route.group());
+            // add new port to the group
+            Set<PortNumber> outPorts = Sets.newHashSet(nextObj.value().getOutPorts());
+            outPorts.add(sink.port());
+            groups.put(key, new NextContent(newNextObj.id(), ImmutableSet.copyOf(outPorts)));
+        }
+
+        ObjectiveContext context = new DefaultObjectiveContext(
+                (objective) -> log.debug("Successfully add {} on {}/{}, vlan {}",
+                                         route.group(), sink.deviceId(), sink.port().toLong(),
+                                         assignedVlan()),
+                (objective, error) -> {
+                    log.warn("Failed to add {} on {}/{}, vlan {}: {}",
+                             route.group(), sink.deviceId(), sink.port().toLong(), assignedVlan(),
+                             error);
+                });
+
+        flowObjectiveService.next(sink.deviceId(), newNextObj);
+
+        if (theFirstSinkOfGroup) {
+            // create the necessary flow rule if this is the first sink request for the group
+            // on this device
+            flowObjectiveService.forward(sink.deviceId(), fwdObject(newNextObj.id(),
+                                                                    route.group()).add(context));
+        }
+    }
+
+    private class InternalNetworkConfigListener implements NetworkConfigListener {
+        @Override
+        public void event(NetworkConfigEvent event) {
+            eventExecutor.execute(() -> {
+                switch (event.type()) {
+
+                    case CONFIG_ADDED:
+                    case CONFIG_UPDATED:
+                        if (event.configClass().equals(CORD_MCAST_CONFIG_CLASS)) {
+                            McastConfig config = networkConfig.getConfig(coreAppId, CORD_MCAST_CONFIG_CLASS);
+                            if (config != null) {
+                                //TODO: Simply remove flows/groups, hosts will response period query
+                                // and re-sent IGMP report, so the flows can be rebuild.
+                                // However, better to remove and re-add mcast flow rules here
+                                if (mcastVlan != config.egressVlan().toShort() && vlanEnabled) {
+                                    clearGroups();
+                                }
+                                updateConfig(config);
+                            }
+                        }
+                        break;
+                    case CONFIG_REGISTERED:
+                    case CONFIG_UNREGISTERED:
+                    case CONFIG_REMOVED:
+                        break;
+                    default:
+                        break;
+                }
+            });
+        }
+    }
+
+    private void updateConfig(McastConfig config) {
+        if (config == null) {
+            return;
+        }
+        log.debug("multicast config received: {}", config);
+
+        if (config.egressVlan() != null) {
+            mcastVlan = config.egressVlan().toShort();
+        }
+    }
+
+    private class NextKey {
+        private DeviceId device;
+        private IpAddress group;
+
+        public NextKey(DeviceId deviceId, IpAddress groupAddress) {
+            device = deviceId;
+            group = groupAddress;
+        }
+
+        public DeviceId getDevice() {
+            return device;
+        }
+
+        public int hashCode() {
+            return Objects.hash(this.device, this.group);
+        }
+
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            } else if (!(obj instanceof NextKey)) {
+                return false;
+            } else {
+                NextKey that = (NextKey) obj;
+                return this.getClass() == that.getClass() &&
+                        Objects.equals(this.device, that.device) &&
+                        Objects.equals(this.group, that.group);
+            }
+        }
+    }
+
+    private class NextContent {
+        private Integer nextId;
+        private Set<PortNumber> outPorts;
+
+        public NextContent(Integer nextId, Set<PortNumber> outPorts) {
+            this.nextId = nextId;
+            this.outPorts = outPorts;
+        }
+
+        public Integer getNextId() {
+            return nextId;
+        }
+
+        public Set<PortNumber> getOutPorts() {
+            return ImmutableSet.copyOf(outPorts);
+        }
+
+        public int hashCode() {
+            return Objects.hash(this.nextId, this.outPorts);
+        }
+
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            } else if (!(obj instanceof NextContent)) {
+                return false;
+            } else {
+                NextContent that = (NextContent) obj;
+                return this.getClass() == that.getClass() &&
+                        Objects.equals(this.nextId, that.nextId) &&
+                        Objects.equals(this.outPorts, that.outPorts);
             }
         }
     }
 
     private enum NextType { AddNew, AddToExisting, Remove, RemoveFromExisting };
 
+    private NextObjective nextObject(Integer nextId, PortNumber port,
+                                     NextType nextType, IpAddress mcastIp) {
 
-    private NextObjective nextObject(Integer id, PortNumber port, NextType nextType) {
+        // Build the meta selector with the fwd objective info
+        TrafficSelector.Builder metadata = DefaultTrafficSelector.builder()
+                .matchIPDst(mcastIp.toIpPrefix());
+
+        if (vlanEnabled) {
+            metadata.matchVlanId(multicastVlan());
+        }
+
         DefaultNextObjective.Builder build = DefaultNextObjective.builder()
                 .fromApp(appId)
                 .addTreatment(DefaultTrafficTreatment.builder().setOutput(port).build())
                 .withType(NextObjective.Type.BROADCAST)
-                .withId(id);
+                .withId(nextId)
+                .withMeta(metadata.build());
+
         ObjectiveContext content = new ObjectiveContext() {
             @Override
             public void onSuccess(Objective objective) {
-                //TODO: change to debug
-                log.info("Next Objective {} installed", objective.id());
+                log.debug("Next Objective {} installed", objective.id());
             }
 
             @Override
             public void onError(Objective objective, ObjectiveError error) {
-                //TODO: change to debug
-                log.info("Next Objective {} failed, because {}",
-                        objective.id(),
-                        error);
+                log.debug("Next Objective {} failed, because {}",
+                          objective.id(),
+                          error);
             }
         };
 
@@ -327,251 +637,45 @@
         }
     }
 
-    private void unprovisionGroup(McastRouteInfo info) {
+    private ForwardingObjective.Builder fwdObject(int nextId, IpAddress mcastIp) {
+        TrafficSelector.Builder mcast = DefaultTrafficSelector.builder()
+                .matchEthType(Ethernet.TYPE_IPV4)
+                .matchIPDst(mcastIp.toIpPrefix());
 
-        if (info.sinks().isEmpty()) {
-            removeRemoteRoute(info.route());
+        //build the meta selector
+        TrafficSelector.Builder metabuilder = DefaultTrafficSelector.builder();
+        if (vlanEnabled) {
+            metabuilder.matchVlanId(multicastVlan());
         }
 
-        if (!info.sink().isPresent()) {
-            log.warn("No sink given after sink removed event: {}", info);
-            return;
-        }
-        ConnectPoint loc = info.sink().get();
+        ForwardingObjective.Builder fwdBuilder = DefaultForwardingObjective.builder()
+                .fromApp(appId)
+                .nextStep(nextId)
+                .makePermanent()
+                .withFlag(ForwardingObjective.Flag.SPECIFIC)
+                .withPriority(priority)
+                .withSelector(mcast.build())
+                .withMeta(metabuilder.build());
 
-        NextKey key = new NextKey(loc.deviceId(), info.route().group());
-        if (groups.get(key) == null) {
-            log.warn("No groups on device: {}", loc.deviceId());
-            return;
-        }
-        flowObjectiveService.next(loc.deviceId(), nextObject(groups.get(key), loc.port(), NextType.RemoveFromExisting));
+        return fwdBuilder;
     }
 
-    private void provisionGroup(McastRoute route, ConnectPoint sink) {
-        checkNotNull(route, "Route cannot be null");
-        checkNotNull(sink, "Sink cannot be null");
-
-        Optional<AccessDeviceData> oltInfo = cordConfigService.getAccessDevice(sink.deviceId());
-
-        if (!oltInfo.isPresent()) {
-            log.warn("Unknown OLT device : {}", sink.deviceId());
-            return;
-        }
-
-        final AtomicBoolean sync = new AtomicBoolean(false);
-        NextKey key = new NextKey(sink.deviceId(), route.group());
-        Integer nextId = groups.computeIfAbsent(key, (g) -> {
-            Integer id = flowObjectiveService.allocateNextId();
-
-            flowObjectiveService.next(sink.deviceId(), nextObject(id, sink.port(), NextType.AddNew));
-
-            TrafficSelector.Builder mcast = DefaultTrafficSelector.builder()
-                    .matchInPort(oltInfo.get().uplink())
-                    .matchEthType(Ethernet.TYPE_IPV4)
-                    .matchIPDst(route.group().toIpPrefix());
-
-            if (vlanEnabled) {
-                mcast.matchVlanId(VlanId.vlanId((short) mcastVlan));
-            }
-
-            ForwardingObjective fwd = DefaultForwardingObjective.builder()
-                    .fromApp(appId)
-                    .nextStep(id)
-                    .makePermanent()
-                    .withFlag(ForwardingObjective.Flag.VERSATILE)
-                    .withPriority(priority)
-                    .withSelector(mcast.build())
-                    .add(new ObjectiveContext() {
-                        @Override
-                        public void onSuccess(Objective objective) {
-                            //TODO: change to debug
-                            log.info("Forwarding objective installed {}", objective);
-                        }
-
-                        @Override
-                        public void onError(Objective objective, ObjectiveError error) {
-                            //TODO: change to debug
-                            log.info("Forwarding objective failed {}", objective);
-                        }
-                    });
-
-            flowObjectiveService.forward(sink.deviceId(), fwd);
-
-            sync.set(true);
-
-            return id;
-        });
-
-        if (!sync.get()) {
-            flowObjectiveService.next(sink.deviceId(), nextObject(nextId, sink.port(), NextType.AddToExisting));
-        }
-
-        addRemoteRoute(route, sink);
-    }
-
-    private void addRemoteRoute(McastRoute route, ConnectPoint inPort) {
-        checkNotNull(route);
-        if (syncHost == null) {
-            log.warn("No host configured for synchronization; route will be dropped");
-            return;
-        }
-
-        Optional<AccessAgentData> accessAgent = cordConfigService.getAccessAgent(inPort.deviceId());
-        if (!accessAgent.isPresent()) {
-            log.warn("No accessAgent config found for in port {}", inPort);
-            return;
-        }
-
-        if (!accessAgent.get().getOltConnectPoint(inPort).isPresent()) {
-            log.warn("No OLT configured for in port {}", inPort);
-            return;
-        }
-
-        ConnectPoint oltConnectPoint = accessAgent.get().getOltConnectPoint(inPort).get();
-
-        log.debug("Sending route {} to other ONOS {}", route, fabricOnosUrl);
-
-        Invocation.Builder builder = getClientBuilder(fabricOnosUrl);
-
-        ObjectNode json = codecService.getCodec(McastRoute.class)
-                .encode(route, new AbstractWebResource());
-
-        try {
-            builder.post(Entity.json(json.toString()));
-
-            builder = getClientBuilder(fabricOnosUrl + "/sinks/" + route.group() + "/" + route.source());
-            ObjectMapper mapper = new ObjectMapper();
-            ObjectNode obj = mapper.createObjectNode();
-            obj.putArray("sinks").add(oltConnectPoint.deviceId() + "/" + oltConnectPoint.port());
-
-            builder.post(Entity.json(obj.toString()));
-        } catch (ProcessingException e) {
-            log.warn("Unable to send route to remote controller: {}", e.getMessage());
-        }
-    }
-
-    private void removeRemoteRoute(McastRoute route) {
-        if (syncHost == null) {
-            log.warn("No host configured for synchronization; route will be dropped");
-            return;
-        }
-
-        log.debug("Removing route {} from other ONOS {}", route, fabricOnosUrl);
-
-        Invocation.Builder builder = getClientBuilder(fabricOnosUrl)
-                .property(ClientProperties.SUPPRESS_HTTP_COMPLIANCE_VALIDATION, true);
-
-        ObjectNode json = codecService.getCodec(McastRoute.class)
-                .encode(route, new AbstractWebResource());
-
-        builder.method("DELETE", Entity.entity(json.asText(),
-                MediaType.APPLICATION_OCTET_STREAM));
-    }
-
-    private void clearRemoteRoutes() {
-        if (syncHost == null) {
-            log.warn("No host configured for synchronization");
-            return;
-        }
-
-        log.debug("Clearing remote multicast routes from {}", fabricOnosUrl);
-
-        Invocation.Builder builder = getClientBuilder(fabricOnosUrl);
-        List<McastRoute> mcastRoutes = Lists.newArrayList();
-
-        try {
-            String response = builder
-                    .accept(MediaType.APPLICATION_JSON_TYPE)
-                    .get(String.class);
-
-            JsonCodec<McastRoute> routeCodec = codecService.getCodec(McastRoute.class);
-            ObjectMapper mapper = new ObjectMapper();
-
-
-            ObjectNode node = (ObjectNode) mapper.readTree(response);
-            ArrayNode list = (ArrayNode) node.path("routes");
-
-            list.forEach(n -> mcastRoutes.add(
-                    routeCodec.decode((ObjectNode) n, new AbstractWebResource())));
-
-        } catch (IOException | ProcessingException e) {
-            log.warn("Error clearing remote routes", e);
-        }
-
-        mcastRoutes.forEach(this::removeRemoteRoute);
-    }
-
-    private Invocation.Builder getClientBuilder(String uri) {
-        ClientConfig config = new ClientConfig();
-        Client client = ClientBuilder.newClient(config);
-
-        client.property(ClientProperties.CONNECT_TIMEOUT, DEFAULT_REST_TIMEOUT_MS);
-        client.property(ClientProperties.READ_TIMEOUT,    DEFAULT_REST_TIMEOUT_MS);
-        client.register(HttpAuthenticationFeature.basic(user, password));
-
-        WebTarget wt = client.target(uri);
-        return wt.request(JSON_UTF_8.toString());
-    }
-
-    private class InternalNetworkConfigListener implements NetworkConfigListener {
-        @Override
-        public void event(NetworkConfigEvent event) {
-            switch (event.type()) {
-
-                case CONFIG_ADDED:
-                case CONFIG_UPDATED:
-                    if (event.configClass().equals(CORD_MCAST_CONFIG_CLASS)) {
-                        McastConfig config = networkConfig.getConfig(coreAppId, CORD_MCAST_CONFIG_CLASS);
-                        if (config != null) {
-                            //TODO: Simply remove flows/groups, hosts will response period query
-                            // and re-sent IGMP report, so the flows can be rebuild.
-                            // However, better to remove and re-add mcast flow rules here
-                            if (mcastVlan != config.egressVlan().toShort() && vlanEnabled) {
-                                clearGroups();
-                            }
-                            mcastVlan = config.egressVlan().toShort();
-                        }
-                    }
-                    break;
-                case CONFIG_REGISTERED:
-                case CONFIG_UNREGISTERED:
-                case CONFIG_REMOVED:
-                    break;
-                default:
-                    break;
-            }
-        }
-    }
-
-    private class NextKey {
-        private DeviceId device;
-        private IpAddress group;
-        public NextKey(DeviceId deviceId, IpAddress groupAddress) {
-            device = deviceId;
-            group = groupAddress;
-        }
-
-        public DeviceId getDevice() {
-            return device;
-        }
-
-        public int hashCode() {
-            return com.google.common.base.Objects.hashCode(new Object[]{this.device, this.group});
-        }
-
-        public boolean equals(Object obj) {
-            if (this == obj) {
-                return true;
-            } else if (!(obj instanceof NextKey)) {
+    // Custom-built function, when the device is not available we need a fallback mechanism
+    private boolean isLocalLeader(DeviceId deviceId) {
+        if (!mastershipService.isLocalMaster(deviceId)) {
+            // When the device is available we just check the mastership
+            if (deviceService.isAvailable(deviceId)) {
                 return false;
-            } else {
-                NextKey that = (NextKey) obj;
-                return this.getClass() == that.getClass() &&
-                        Objects.equals(this.device, that.device) &&
-                        Objects.equals(this.group, that.group);
             }
+            // Fallback with Leadership service - device id is used as topic
+            NodeId leader = leadershipService.runForLeadership(
+                    deviceId.toString()).leaderNodeId();
+            // Verify if this node is the leader
+            return clusterService.getLocalNode().id().equals(leader);
         }
+        return true;
     }
+
 }