CordMcast clears remote routes on startup to prevent being out of sync.
It also reads existing routes on startup.
Change-Id: I13b8ffae2b57d1e82181a8a745bda185d56f368d
diff --git a/src/main/java/org/onosproject/cordmcast/CordMcast.java b/src/main/java/org/onosproject/cordmcast/CordMcast.java
index 8fd533a..565d76b 100644
--- a/src/main/java/org/onosproject/cordmcast/CordMcast.java
+++ b/src/main/java/org/onosproject/cordmcast/CordMcast.java
@@ -15,7 +15,10 @@
*/
package org.onosproject.cordmcast;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.WebResource;
@@ -32,6 +35,7 @@
import org.onlab.packet.VlanId;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.codec.CodecService;
+import org.onosproject.codec.JsonCodec;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.net.ConnectPoint;
@@ -46,7 +50,6 @@
import org.onosproject.net.flowobjective.Objective;
import org.onosproject.net.flowobjective.ObjectiveContext;
import org.onosproject.net.flowobjective.ObjectiveError;
-import org.onosproject.net.group.GroupService;
import org.onosproject.net.mcast.McastEvent;
import org.onosproject.net.mcast.McastListener;
import org.onosproject.net.mcast.McastRoute;
@@ -56,19 +59,24 @@
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
import java.util.Dictionary;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.net.MediaType.JSON_UTF_8;
import static org.onlab.util.Tools.get;
import static org.slf4j.LoggerFactory.getLogger;
/**
- * CORD multicast provisoning application. Operates by listening to
+ * CORD multicast provisioning application. Operates by listening to
* events on the multicast rib and provisioning groups to program multicast
* flows on the dataplane.
*/
@@ -87,9 +95,6 @@
protected MulticastRouteService mcastService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected GroupService groupService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FlowObjectiveService flowObjectiveService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -139,21 +144,30 @@
@Activate
public void activate(ComponentContext context) {
+ componentConfigService.registerProperties(getClass());
modified(context);
appId = coreService.registerApplication("org.onosproject.cordmcast");
- componentConfigService.registerProperties(getClass());
- mcastService.addListener(listener);
fabricOnosUrl = "http://" + syncHost + "/onos/v1/mcast";
- //TODO: obtain all existing mcast routes
+ clearRemoteRoutes();
+
+ mcastService.addListener(listener);
+
+ for (McastRoute route : mcastService.getRoutes()) {
+ Set<ConnectPoint> sinks = mcastService.fetchSinks(route);
+ if (!sinks.isEmpty()) {
+ sinks.forEach(s -> provisionGroup(route, s));
+ }
+ }
+
log.info("Started");
}
@Deactivate
public void deactivate() {
- componentConfigService.unregisterProperties(getClass(), true);
+ componentConfigService.unregisterProperties(getClass(), false);
mcastService.removeListener(listener);
log.info("Stopped");
}
@@ -162,7 +176,6 @@
public void modified(ComponentContext context) {
Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
-
try {
String s = get(properties, "username");
user = isNullOrEmpty(s) ? DEFAULT_USER : s.trim();
@@ -189,13 +202,12 @@
vlanEnabled = false;
priority = DEFAULT_PRIORITY;
}
-
-
}
private class InternalMulticastListener implements McastListener {
@Override
public void event(McastEvent event) {
+ McastRouteInfo info = event.subject();
switch (event.type()) {
case ROUTE_ADDED:
break;
@@ -204,7 +216,11 @@
case SOURCE_ADDED:
break;
case SINK_ADDED:
- provisionGroup(event.subject());
+ if (!info.sink().isPresent()) {
+ log.warn("No sink given after sink added event: {}", info);
+ return;
+ }
+ provisionGroup(info.route(), info.sink().get());
break;
case SINK_REMOVED:
unprovisionGroup(event.subject());
@@ -217,7 +233,7 @@
private void unprovisionGroup(McastRouteInfo info) {
if (info.sinks().isEmpty()) {
- removeSyncedRoute(info);
+ removeRemoteRoute(info.route());
}
if (!info.sink().isPresent()) {
@@ -250,21 +266,18 @@
flowObjectiveService.next(loc.deviceId(), next);
}
- private void provisionGroup(McastRouteInfo info) {
- if (!info.sink().isPresent()) {
- log.warn("No sink given after sink added event: {}", info);
- return;
- }
- ConnectPoint loc = info.sink().get();
+ private void provisionGroup(McastRoute route, ConnectPoint sink) {
+ checkNotNull(route, "Route cannot be null");
+ checkNotNull(sink, "Sink cannot be null");
final AtomicBoolean sync = new AtomicBoolean(false);
- Integer nextId = groups.computeIfAbsent(info.route().group(), (g) -> {
+ Integer nextId = groups.computeIfAbsent(route.group(), (g) -> {
Integer id = allocateId();
NextObjective next = DefaultNextObjective.builder()
.fromApp(appId)
- .addTreatment(DefaultTrafficTreatment.builder().setOutput(loc.port()).build())
+ .addTreatment(DefaultTrafficTreatment.builder().setOutput(sink.port()).build())
.withType(NextObjective.Type.BROADCAST)
.withId(id)
.add(new ObjectiveContext() {
@@ -283,18 +296,16 @@
}
});
- flowObjectiveService.next(loc.deviceId(), next);
+ flowObjectiveService.next(sink.deviceId(), next);
TrafficSelector.Builder mcast = DefaultTrafficSelector.builder()
.matchEthType(Ethernet.TYPE_IPV4)
.matchIPDst(g.toIpPrefix());
-
if (vlanEnabled) {
mcast.matchVlanId(VlanId.vlanId((short) mcastVlan));
}
-
ForwardingObjective fwd = DefaultForwardingObjective.builder()
.fromApp(appId)
.nextStep(id)
@@ -316,7 +327,7 @@
}
});
- flowObjectiveService.forward(loc.deviceId(), fwd);
+ flowObjectiveService.forward(sink.deviceId(), fwd);
sync.set(true);
@@ -326,7 +337,7 @@
if (!sync.get()) {
NextObjective next = DefaultNextObjective.builder()
.fromApp(appId)
- .addTreatment(DefaultTrafficTreatment.builder().setOutput(loc.port()).build())
+ .addTreatment(DefaultTrafficTreatment.builder().setOutput(sink.port()).build())
.withType(NextObjective.Type.BROADCAST)
.withId(nextId)
.addToExisting(new ObjectiveContext() {
@@ -345,44 +356,73 @@
}
});
- flowObjectiveService.next(loc.deviceId(), next);
+ flowObjectiveService.next(sink.deviceId(), next);
}
- if (sync.get()) {
- syncRoute(info);
- }
+ addRemoteRoute(route);
}
- private void syncRoute(McastRouteInfo info) {
+ private void addRemoteRoute(McastRoute route) {
+ checkNotNull(route);
if (syncHost == null) {
log.warn("No host configured for synchronization; route will be dropped");
return;
}
- log.debug("Sending route to other ONOS: {}", info.route());
+ log.debug("Sending route to other ONOS: {}", route);
WebResource.Builder builder = getClientBuilder(fabricOnosUrl);
ObjectNode json = codecService.getCodec(McastRoute.class)
- .encode(info.route(), new AbstractWebResource());
+ .encode(route, new AbstractWebResource());
builder.post(json.toString());
}
- private void removeSyncedRoute(McastRouteInfo info) {
+ private void removeRemoteRoute(McastRoute route) {
if (syncHost == null) {
log.warn("No host configured for synchronization; route will be dropped");
return;
}
- log.debug("Removing route from other ONOS: {}", info.route());
+ log.debug("Removing route from other ONOS: {}", route);
WebResource.Builder builder = getClientBuilder(fabricOnosUrl);
ObjectNode json = codecService.getCodec(McastRoute.class)
- .encode(info.route(), new AbstractWebResource());
+ .encode(route, new AbstractWebResource());
builder.delete(json.toString());
}
+ private void clearRemoteRoutes() {
+ if (syncHost == null) {
+ log.warn("No host configured for synchronization");
+ return;
+ }
+
+ log.debug("Clearing remote multicast routes");
+
+ WebResource.Builder builder = getClientBuilder(fabricOnosUrl);
+
+ String response = builder
+ .accept(MediaType.APPLICATION_JSON_TYPE)
+ .get(String.class);
+
+ JsonCodec<McastRoute> routeCodec = codecService.getCodec(McastRoute.class);
+ ObjectMapper mapper = new ObjectMapper();
+ List<McastRoute> mcastRoutes = Lists.newArrayList();
+ try {
+ ObjectNode node = (ObjectNode) mapper.readTree(response);
+ ArrayNode list = (ArrayNode) node.path("routes");
+
+ list.forEach(n -> mcastRoutes.add(
+ routeCodec.decode((ObjectNode) n, new AbstractWebResource())));
+ } catch (IOException e) {
+ log.warn("Error clearing remote routes", e);
+ }
+
+ mcastRoutes.forEach(this::removeRemoteRoute);
+ }
+
private Integer allocateId() {
return channels.getAndIncrement();
}