CordMcast clears remote routes on startup to prevent being out of sync.

It also reads existing routes on startup.

Change-Id: I13b8ffae2b57d1e82181a8a745bda185d56f368d
diff --git a/src/main/java/org/onosproject/cordmcast/CordMcast.java b/src/main/java/org/onosproject/cordmcast/CordMcast.java
index 8fd533a..565d76b 100644
--- a/src/main/java/org/onosproject/cordmcast/CordMcast.java
+++ b/src/main/java/org/onosproject/cordmcast/CordMcast.java
@@ -15,7 +15,10 @@
  */
 package org.onosproject.cordmcast;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.sun.jersey.api.client.Client;
 import com.sun.jersey.api.client.WebResource;
@@ -32,6 +35,7 @@
 import org.onlab.packet.VlanId;
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.codec.CodecService;
+import org.onosproject.codec.JsonCodec;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.net.ConnectPoint;
@@ -46,7 +50,6 @@
 import org.onosproject.net.flowobjective.Objective;
 import org.onosproject.net.flowobjective.ObjectiveContext;
 import org.onosproject.net.flowobjective.ObjectiveError;
-import org.onosproject.net.group.GroupService;
 import org.onosproject.net.mcast.McastEvent;
 import org.onosproject.net.mcast.McastListener;
 import org.onosproject.net.mcast.McastRoute;
@@ -56,19 +59,24 @@
 import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
 import java.util.Dictionary;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Strings.isNullOrEmpty;
 import static com.google.common.net.MediaType.JSON_UTF_8;
 import static org.onlab.util.Tools.get;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
- * CORD multicast provisoning application. Operates by listening to
+ * CORD multicast provisioning application. Operates by listening to
  * events on the multicast rib and provisioning groups to program multicast
  * flows on the dataplane.
  */
@@ -87,9 +95,6 @@
     protected MulticastRouteService mcastService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected GroupService groupService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected FlowObjectiveService flowObjectiveService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -139,21 +144,30 @@
 
     @Activate
     public void activate(ComponentContext context) {
+        componentConfigService.registerProperties(getClass());
         modified(context);
 
         appId = coreService.registerApplication("org.onosproject.cordmcast");
-        componentConfigService.registerProperties(getClass());
-        mcastService.addListener(listener);
 
         fabricOnosUrl = "http://" + syncHost + "/onos/v1/mcast";
 
-        //TODO: obtain all existing mcast routes
+        clearRemoteRoutes();
+
+        mcastService.addListener(listener);
+
+        for (McastRoute route : mcastService.getRoutes()) {
+            Set<ConnectPoint> sinks = mcastService.fetchSinks(route);
+            if (!sinks.isEmpty()) {
+                sinks.forEach(s -> provisionGroup(route, s));
+            }
+        }
+
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
-        componentConfigService.unregisterProperties(getClass(), true);
+        componentConfigService.unregisterProperties(getClass(), false);
         mcastService.removeListener(listener);
         log.info("Stopped");
     }
@@ -162,7 +176,6 @@
     public void modified(ComponentContext context) {
         Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
 
-
         try {
             String s = get(properties, "username");
             user = isNullOrEmpty(s) ? DEFAULT_USER : s.trim();
@@ -189,13 +202,12 @@
             vlanEnabled = false;
             priority = DEFAULT_PRIORITY;
         }
-
-
     }
 
     private class InternalMulticastListener implements McastListener {
         @Override
         public void event(McastEvent event) {
+            McastRouteInfo info = event.subject();
             switch (event.type()) {
                 case ROUTE_ADDED:
                     break;
@@ -204,7 +216,11 @@
                 case SOURCE_ADDED:
                     break;
                 case SINK_ADDED:
-                    provisionGroup(event.subject());
+                    if (!info.sink().isPresent()) {
+                        log.warn("No sink given after sink added event: {}", info);
+                        return;
+                    }
+                    provisionGroup(info.route(), info.sink().get());
                     break;
                 case SINK_REMOVED:
                     unprovisionGroup(event.subject());
@@ -217,7 +233,7 @@
 
     private void unprovisionGroup(McastRouteInfo info) {
         if (info.sinks().isEmpty()) {
-            removeSyncedRoute(info);
+            removeRemoteRoute(info.route());
         }
 
         if (!info.sink().isPresent()) {
@@ -250,21 +266,18 @@
         flowObjectiveService.next(loc.deviceId(), next);
     }
 
-    private void provisionGroup(McastRouteInfo info) {
-        if (!info.sink().isPresent()) {
-            log.warn("No sink given after sink added event: {}", info);
-            return;
-        }
-        ConnectPoint loc = info.sink().get();
+    private void provisionGroup(McastRoute route, ConnectPoint sink) {
+        checkNotNull(route, "Route cannot be null");
+        checkNotNull(sink, "Sink cannot be null");
 
         final AtomicBoolean sync = new AtomicBoolean(false);
 
-        Integer nextId = groups.computeIfAbsent(info.route().group(), (g) -> {
+        Integer nextId = groups.computeIfAbsent(route.group(), (g) -> {
             Integer id = allocateId();
 
             NextObjective next = DefaultNextObjective.builder()
                     .fromApp(appId)
-                    .addTreatment(DefaultTrafficTreatment.builder().setOutput(loc.port()).build())
+                    .addTreatment(DefaultTrafficTreatment.builder().setOutput(sink.port()).build())
                     .withType(NextObjective.Type.BROADCAST)
                     .withId(id)
                     .add(new ObjectiveContext() {
@@ -283,18 +296,16 @@
                         }
                     });
 
-            flowObjectiveService.next(loc.deviceId(), next);
+            flowObjectiveService.next(sink.deviceId(), next);
 
             TrafficSelector.Builder mcast = DefaultTrafficSelector.builder()
                     .matchEthType(Ethernet.TYPE_IPV4)
                     .matchIPDst(g.toIpPrefix());
 
-
             if (vlanEnabled) {
                 mcast.matchVlanId(VlanId.vlanId((short) mcastVlan));
             }
 
-
             ForwardingObjective fwd = DefaultForwardingObjective.builder()
                     .fromApp(appId)
                     .nextStep(id)
@@ -316,7 +327,7 @@
                         }
                     });
 
-            flowObjectiveService.forward(loc.deviceId(), fwd);
+            flowObjectiveService.forward(sink.deviceId(), fwd);
 
             sync.set(true);
 
@@ -326,7 +337,7 @@
         if (!sync.get()) {
             NextObjective next = DefaultNextObjective.builder()
                     .fromApp(appId)
-                    .addTreatment(DefaultTrafficTreatment.builder().setOutput(loc.port()).build())
+                    .addTreatment(DefaultTrafficTreatment.builder().setOutput(sink.port()).build())
                     .withType(NextObjective.Type.BROADCAST)
                     .withId(nextId)
                     .addToExisting(new ObjectiveContext() {
@@ -345,44 +356,73 @@
                         }
                     });
 
-            flowObjectiveService.next(loc.deviceId(), next);
+            flowObjectiveService.next(sink.deviceId(), next);
         }
 
-        if (sync.get()) {
-            syncRoute(info);
-        }
+        addRemoteRoute(route);
     }
 
-    private void syncRoute(McastRouteInfo info) {
+    private void addRemoteRoute(McastRoute route) {
+        checkNotNull(route);
         if (syncHost == null) {
             log.warn("No host configured for synchronization; route will be dropped");
             return;
         }
 
-        log.debug("Sending route to other ONOS: {}", info.route());
+        log.debug("Sending route to other ONOS: {}", route);
 
         WebResource.Builder builder = getClientBuilder(fabricOnosUrl);
 
         ObjectNode json = codecService.getCodec(McastRoute.class)
-                .encode(info.route(), new AbstractWebResource());
+                .encode(route, new AbstractWebResource());
         builder.post(json.toString());
     }
 
-    private void removeSyncedRoute(McastRouteInfo info) {
+    private void removeRemoteRoute(McastRoute route) {
         if (syncHost == null) {
             log.warn("No host configured for synchronization; route will be dropped");
             return;
         }
 
-        log.debug("Removing route from other ONOS: {}", info.route());
+        log.debug("Removing route from other ONOS: {}", route);
 
         WebResource.Builder builder = getClientBuilder(fabricOnosUrl);
 
         ObjectNode json = codecService.getCodec(McastRoute.class)
-                .encode(info.route(), new AbstractWebResource());
+                .encode(route, new AbstractWebResource());
         builder.delete(json.toString());
     }
 
+    private void clearRemoteRoutes() {
+        if (syncHost == null) {
+            log.warn("No host configured for synchronization");
+            return;
+        }
+
+        log.debug("Clearing remote multicast routes");
+
+        WebResource.Builder builder = getClientBuilder(fabricOnosUrl);
+
+        String response = builder
+                .accept(MediaType.APPLICATION_JSON_TYPE)
+                .get(String.class);
+
+        JsonCodec<McastRoute> routeCodec = codecService.getCodec(McastRoute.class);
+        ObjectMapper mapper = new ObjectMapper();
+        List<McastRoute> mcastRoutes = Lists.newArrayList();
+        try {
+            ObjectNode node = (ObjectNode) mapper.readTree(response);
+            ArrayNode list = (ArrayNode) node.path("routes");
+
+            list.forEach(n -> mcastRoutes.add(
+                    routeCodec.decode((ObjectNode) n, new AbstractWebResource())));
+        } catch (IOException e) {
+            log.warn("Error clearing remote routes", e);
+        }
+
+        mcastRoutes.forEach(this::removeRemoteRoute);
+    }
+
     private Integer allocateId() {
         return channels.getAndIncrement();
     }