Add basic multicast RIB synchronization over REST

Change-Id: I75f22956b6b73427ca657f5ab58330b1417fdf43
diff --git a/pom.xml b/pom.xml
index ebd0620..b977c51 100644
--- a/pom.xml
+++ b/pom.xml
@@ -63,5 +63,10 @@
             <artifactId>org.osgi.compendium</artifactId>
             <version>5.0.0</version>
         </dependency>
+        <dependency>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-client</artifactId>
+            <version>1.19</version>
+        </dependency>
     </dependencies>
 </project>
diff --git a/src/main/java/org/onosproject/cordmcast/CordMcast.java b/src/main/java/org/onosproject/cordmcast/CordMcast.java
index 82ce28e..b0392f4 100644
--- a/src/main/java/org/onosproject/cordmcast/CordMcast.java
+++ b/src/main/java/org/onosproject/cordmcast/CordMcast.java
@@ -15,16 +15,25 @@
  */
 package org.onosproject.cordmcast;
 
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.Maps;
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.api.client.filter.HTTPBasicAuthFilter;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.onlab.packet.Ethernet;
 import org.onlab.packet.IPv4;
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.VlanId;
+import org.onlab.util.Tools;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.codec.CodecService;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.net.ConnectPoint;
@@ -42,18 +51,24 @@
 import org.onosproject.net.group.GroupService;
 import org.onosproject.net.mcast.McastEvent;
 import org.onosproject.net.mcast.McastListener;
+import org.onosproject.net.mcast.McastRoute;
 import org.onosproject.net.mcast.McastRouteInfo;
 import org.onosproject.net.mcast.MulticastRouteService;
+import org.onosproject.rest.AbstractWebResource;
+import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 
+import java.util.Dictionary;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static com.google.common.net.MediaType.JSON_UTF_8;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
  * CORD multicast provisoning application. Operates by listening to
- * events on the multicast rib and provsioning groups to program multicast
+ * events on the multicast rib and provisioning groups to program multicast
  * flows on the dataplane.
  */
 @Component(immediate = true)
@@ -75,6 +90,13 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected CoreService coreService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected CodecService codecService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ComponentConfigService componentConfigService;
+
+
     protected McastListener listener = new InternalMulticastListener();
 
 
@@ -93,20 +115,50 @@
     // TODO component config this
     private int priority = DEFAULT_PRIORITY;
 
+    private static final String DEFAULT_USER = "karaf";
+    private static final String DEFAULT_PASSWORD = "karaf";
+
+    @Property(name = "syncHost", value = "",
+            label = "host:port to synchronize routes to")
+    private String syncHost = "10.90.0.8:8181";
+
+    @Property(name = "username", value = DEFAULT_USER,
+            label = "Username for REST password authentication")
+    private String user = DEFAULT_USER;
+
+    @Property(name = "password", value = DEFAULT_PASSWORD,
+            label = "Password for REST authentication")
+    private String password = DEFAULT_PASSWORD;
+
+    private String fabricOnosUrl;
+
     @Activate
     public void activate() {
         appId = coreService.registerApplication("org.onosproject.cordmcast");
+        componentConfigService.registerProperties(getClass());
         mcastService.addListener(listener);
+
+        fabricOnosUrl = "http://" + syncHost + "/onos/v1/mcast";
+
         //TODO: obtain all existing mcast routes
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
+        componentConfigService.unregisterProperties(getClass(), true);
         mcastService.removeListener(listener);
         log.info("Stopped");
     }
 
+    @Modified
+    public void modified(ComponentContext context) {
+        Dictionary<?, ?> properties = context.getProperties();
+        user = Tools.get(properties, "username");
+        password = Tools.get(properties, "password");
+        syncHost = Tools.get(properties, "syncHost");
+    }
+
     private class InternalMulticastListener implements McastListener {
         @Override
         public void event(McastEvent event) {
@@ -135,9 +187,10 @@
         }
         ConnectPoint loc = info.sink().get();
 
+        final AtomicBoolean sync = new AtomicBoolean(false);
 
         Integer nextId = groups.computeIfAbsent(info.route().group(), (g) -> {
-            Integer id = allocateId(g);
+            Integer id = allocateId();
 
             TrafficSelector mcast = DefaultTrafficSelector.builder()
                     .matchVlanId(VlanId.vlanId(mcastVlan))
@@ -170,6 +223,8 @@
 
             flowObjectiveService.forward(loc.deviceId(), fwd);
 
+            sync.set(true);
+
            return id;
         });
 
@@ -195,10 +250,37 @@
                 });
 
         flowObjectiveService.next(loc.deviceId(), next);
+
+        if (sync.get()) {
+            syncRoute(info);
+        }
     }
 
-    private Integer allocateId(IpAddress group) {
-        Integer channel = groups.putIfAbsent(group, channels.getAndIncrement());
-        return channel == null ? groups.get(group) : channel;
+    private void syncRoute(McastRouteInfo info) {
+        if (syncHost == null) {
+            log.warn("No host configured for synchronization; route will be dropped");
+            return;
+        }
+
+        log.debug("Sending route to other ONOS: {}", info.route());
+
+        WebResource.Builder builder = getClientBuilder(fabricOnosUrl);
+
+        ObjectNode json = codecService.getCodec(McastRoute.class)
+                .encode(info.route(), new AbstractWebResource());
+        builder.post(json.toString());
     }
+
+    private Integer allocateId() {
+        return channels.getAndIncrement();
+    }
+
+    private WebResource.Builder getClientBuilder(String uri) {
+        Client client = Client.create();
+        client.addFilter(new HTTPBasicAuthFilter(user, password));
+        WebResource resource = client.resource(uri);
+        return resource.accept(JSON_UTF_8.toString())
+                .type(JSON_UTF_8.toString());
+    }
+
 }