SEBA-646 cordmcast should listen to multicast events
from the new multicast store.
- switched to ONOS 1.13.9-rc4
- started listening to events from the new multicast
store in mcast app of ONOS
- stopped provisioning ONOS mcast application remotely
- obsolete dependencies removed from the pom.xml
- started using ONOS storage sub-system
Change-Id: I0c9e27bbe6f7c6c32a7475a2e7e4c8061c599829
diff --git a/pom.xml b/pom.xml
index 74ef231..9fb771b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.onosproject</groupId>
<artifactId>onos-dependencies</artifactId>
- <version>1.13.1</version>
+ <version>1.13.9-rc4</version>
<relativePath></relativePath>
</parent>
@@ -35,7 +35,7 @@
<properties>
<onos.app.name>org.opencord.mcast</onos.app.name>
- <onos.version>1.13.1</onos.version>
+ <onos.version>1.13.9-rc4</onos.version>
<onos.app.category>Traffic Steering</onos.app.category>
<onos.app.title>CORD Multicast App</onos.app.title>
<onos.app.url>http://opencord.org</onos.app.url>
@@ -47,61 +47,23 @@
<dependencies>
<dependency>
<groupId>org.onosproject</groupId>
- <artifactId>onos-cli</artifactId>
- <version>${onos.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.onosproject</groupId>
- <artifactId>onlab-misc</artifactId>
- <version>${onos.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.onosproject</groupId>
<artifactId>onlab-osgi</artifactId>
<version>${onos.version}</version>
</dependency>
-
- <dependency>
- <groupId>org.onosproject</groupId>
- <artifactId>onos-incubator-api</artifactId>
- <version>${onos.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.onosproject</groupId>
- <artifactId>onos-incubator-net</artifactId>
- <version>${onos.version}</version>
- </dependency>
-
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-api</artifactId>
<version>${onos.version}</version>
</dependency>
-
- <dependency>
- <groupId>org.onosproject</groupId>
- <artifactId>onos-core-common</artifactId>
- <version>${onos.version}</version>
- </dependency>
-
<dependency>
<groupId>org.opencord</groupId>
<artifactId>cord-config</artifactId>
<version>${cord.config.version}</version>
</dependency>
-
<dependency>
- <groupId>org.opencord</groupId>
- <artifactId>olt-api</artifactId>
- <version>${olt.api.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.glassfish.jersey.core</groupId>
- <artifactId>jersey-client</artifactId>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-apps-mcast-api</artifactId>
+ <version>${onos.version}</version>
</dependency>
</dependencies>
@@ -110,35 +72,16 @@
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
- <version>3.0.1</version>
+ <version>4.1.0</version>
<extensions>true</extensions>
+ <inherited>true</inherited>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
- <version>2.5.1</version>
+ <version>3.8.0</version>
<configuration>
- <source>1.8</source>
- <target>1.8</target>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.felix</groupId>
- <artifactId>maven-scr-plugin</artifactId>
- <version>1.21.0</version>
- <executions>
- <execution>
- <id>generate-scr-srcdescriptor</id>
- <goals>
- <goal>scr</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <supportedProjectTypes>
- <supportedProjectType>bundle</supportedProjectType>
- <supportedProjectType>war</supportedProjectType>
- </supportedProjectTypes>
+ <release>11</release>
</configuration>
</plugin>
<plugin>
diff --git a/src/main/java/org/opencord/cordmcast/CordMcast.java b/src/main/java/org/opencord/cordmcast/CordMcast.java
index 99dea51..3bb4d72 100644
--- a/src/main/java/org/opencord/cordmcast/CordMcast.java
+++ b/src/main/java/org/opencord/cordmcast/CordMcast.java
@@ -15,11 +15,8 @@
*/
package org.opencord.cordmcast;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -28,73 +25,72 @@
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.glassfish.jersey.client.ClientConfig;
-import org.glassfish.jersey.client.ClientProperties;
-import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature;
import org.onlab.packet.Ethernet;
import org.onlab.packet.IpAddress;
import org.onlab.packet.VlanId;
+import org.onlab.util.KryoNamespace;
import org.onosproject.cfg.ComponentConfigService;
-import org.onosproject.codec.CodecService;
-import org.onosproject.codec.JsonCodec;
-
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.mcast.api.McastEvent;
+import org.onosproject.mcast.api.McastListener;
+import org.onosproject.mcast.api.McastRoute;
+import org.onosproject.mcast.api.MulticastRouteService;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.HostId;
import org.onosproject.net.PortNumber;
import org.onosproject.net.config.ConfigFactory;
import org.onosproject.net.config.NetworkConfigEvent;
import org.onosproject.net.config.NetworkConfigListener;
import org.onosproject.net.config.NetworkConfigRegistry;
+import org.onosproject.net.config.basics.McastConfig;
import org.onosproject.net.config.basics.SubjectFactories;
+import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.DefaultTrafficSelector;
import org.onosproject.net.flow.DefaultTrafficTreatment;
import org.onosproject.net.flow.FlowRuleService;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flowobjective.DefaultForwardingObjective;
import org.onosproject.net.flowobjective.DefaultNextObjective;
+import org.onosproject.net.flowobjective.DefaultObjectiveContext;
import org.onosproject.net.flowobjective.FlowObjectiveService;
import org.onosproject.net.flowobjective.ForwardingObjective;
import org.onosproject.net.flowobjective.NextObjective;
import org.onosproject.net.flowobjective.Objective;
import org.onosproject.net.flowobjective.ObjectiveContext;
import org.onosproject.net.flowobjective.ObjectiveError;
-import org.onosproject.net.mcast.McastEvent;
-import org.onosproject.net.mcast.McastListener;
-import org.onosproject.net.mcast.McastRoute;
-import org.onosproject.net.mcast.McastRouteInfo;
-import org.onosproject.net.mcast.MulticastRouteService;
-import org.onosproject.rest.AbstractWebResource;
-import org.opencord.cordconfig.access.AccessAgentData;
-import org.opencord.cordconfig.access.AccessDeviceData;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
import org.opencord.cordconfig.CordConfigService;
+import org.opencord.cordconfig.access.AccessDeviceData;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
-import org.onosproject.net.config.basics.McastConfig;
-import javax.ws.rs.ProcessingException;
-import javax.ws.rs.client.Client;
-import javax.ws.rs.client.ClientBuilder;
-import javax.ws.rs.client.Entity;
-import javax.ws.rs.client.Invocation;
-import javax.ws.rs.client.WebTarget;
-import javax.ws.rs.core.MediaType;
-import java.io.IOException;
import java.util.Dictionary;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
-import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Strings.isNullOrEmpty;
-import static com.google.common.net.MediaType.JSON_UTF_8;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.get;
+import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
+
/**
* CORD multicast provisioning application. Operates by listening to
* events on the multicast rib and provisioning groups to program multicast
@@ -106,12 +102,8 @@
private final Logger log = getLogger(getClass());
- private static final int DEFAULT_REST_TIMEOUT_MS = 1000;
private static final int DEFAULT_PRIORITY = 500;
private static final short DEFAULT_MCAST_VLAN = 4000;
- private static final String DEFAULT_SYNC_HOST = "";
- private static final String DEFAULT_USER = "karaf";
- private static final String DEFAULT_PASSWORD = "karaf";
private static final boolean DEFAULT_VLAN_ENABLED = true;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -124,9 +116,6 @@
protected CoreService coreService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected CodecService codecService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ComponentConfigService componentConfigService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -138,16 +127,30 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FlowRuleService flowRuleService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected MastershipService mastershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ public DeviceService deviceService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ private ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ private LeadershipService leadershipService;
+
protected McastListener listener = new InternalMulticastListener();
private InternalNetworkConfigListener configListener =
new InternalNetworkConfigListener();
- //TODO: move this to a ec map
- private Map<NextKey, Integer> groups = Maps.newConcurrentMap();
+ private ConsistentMap<NextKey, NextContent> groups;
private ApplicationId appId;
private ApplicationId coreAppId;
- private int mcastVlan = DEFAULT_MCAST_VLAN;
+ private short mcastVlan = DEFAULT_MCAST_VLAN;
@Property(name = "vlanEnabled", boolValue = DEFAULT_VLAN_ENABLED,
label = "Use vlan for multicast traffic?")
@@ -157,19 +160,6 @@
label = "Priority for multicast rules")
private int priority = DEFAULT_PRIORITY;
- @Property(name = "syncHost", value = DEFAULT_SYNC_HOST,
- label = "host:port to synchronize routes to")
- private String syncHost = null;
-
- @Property(name = "username", value = DEFAULT_USER,
- label = "Username for REST password authentication")
- private String user = DEFAULT_USER;
-
- @Property(name = "password", value = DEFAULT_PASSWORD,
- label = "Password for REST authentication")
- private String password = DEFAULT_PASSWORD;
-
- private String fabricOnosUrl;
private static final Class<McastConfig> CORD_MCAST_CONFIG_CLASS =
McastConfig.class;
@@ -182,6 +172,16 @@
}
};
+ // lock to synchronize local operations
+ private final Lock mcastLock = new ReentrantLock();
+ private void mcastLock() {
+ mcastLock.lock();
+ }
+ private void mcastUnlock() {
+ mcastLock.unlock();
+ }
+ private ExecutorService eventExecutor;
+
@Activate
public void activate(ComponentContext context) {
componentConfigService.registerProperties(getClass());
@@ -190,21 +190,31 @@
appId = coreService.registerApplication(APP_NAME);
coreAppId = coreService.registerApplication(CoreService.CORE_APP_NAME);
- clearRemoteRoutes();
+ eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("cord/mcast",
+ "events-mcast-%d", log));
+
+ KryoNamespace.Builder groupsKryo = new KryoNamespace.Builder()
+ .register(KryoNamespaces.API)
+ .register(NextKey.class)
+ .register(NextContent.class);
+ groups = storageService
+ .<NextKey, NextContent>consistentMapBuilder()
+ .withName("cord-mcast-groups-store")
+ .withSerializer(Serializer.using(groupsKryo.build("CordMcast-Groups")))
+ .build();
+
networkConfig.registerConfigFactory(cordMcastConfigFactory);
networkConfig.addListener(configListener);
mcastService.addListener(listener);
mcastService.getRoutes().stream()
- .map(r -> new ImmutablePair<>(r, mcastService.fetchSinks(r)))
+ .map(r -> new ImmutablePair<>(r, mcastService.sinks(r)))
.filter(pair -> pair.getRight() != null && !pair.getRight().isEmpty())
- .forEach(pair -> pair.getRight().forEach(sink -> provisionGroup(pair.getLeft(),
- sink)));
+ .forEach(pair -> pair.getRight().forEach(sink -> addSink(pair.getLeft(),
+ sink)));
McastConfig config = networkConfig.getConfig(coreAppId, CORD_MCAST_CONFIG_CLASS);
- if (config != null) {
- mcastVlan = config.egressVlan().toShort();
- }
+ updateConfig(config);
log.info("Started");
}
@@ -215,16 +225,49 @@
mcastService.removeListener(listener);
networkConfig.removeListener(configListener);
networkConfig.unregisterConfigFactory(cordMcastConfigFactory);
+ eventExecutor.shutdown();
clearGroups();
+ groups.destroy();
log.info("Stopped");
}
public void clearGroups() {
- groups.keySet().forEach(d -> {
- flowObjectiveService.next(d.getDevice(), nextObject(groups.get(d), PortNumber.ANY, NextType.Remove));
- });
- flowRuleService.removeFlowRulesById(appId);
- groups.clear();
+ mcastLock();
+ try {
+ groups.keySet().forEach(groupInfo -> {
+ if (!isLocalLeader(groupInfo.getDevice())) {
+ return;
+ }
+ NextContent next = groups.get(groupInfo).value();
+
+ ObjectiveContext context = new DefaultObjectiveContext(
+ (objective) -> log.debug("Successfully remove {}",
+ groupInfo.group),
+ (objective, error) -> log.warn("Failed to remove {}: {}",
+ groupInfo.group, error));
+ // remove the flow rule
+ flowObjectiveService.forward(groupInfo.getDevice(), fwdObject(next.getNextId(),
+ groupInfo.group).remove(context));
+ // remove all ports from the group
+ next.getOutPorts().stream().forEach(portNumber ->
+ flowObjectiveService.next(groupInfo.getDevice(), nextObject(next.getNextId(),
+ portNumber, NextType.RemoveFromExisting,
+ groupInfo.group))
+ );
+
+ });
+ groups.clear();
+ } finally {
+ mcastUnlock();
+ }
+ }
+
+ private VlanId multicastVlan() {
+ return VlanId.vlanId(mcastVlan);
+ }
+
+ private VlanId assignedVlan() {
+ return vlanEnabled ? multicastVlan() : VlanId.NONE;
}
@Modified
@@ -232,84 +275,351 @@
Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
try {
- String s = get(properties, "username");
- user = isNullOrEmpty(s) ? DEFAULT_USER : s.trim();
-
- s = get(properties, "password");
- password = isNullOrEmpty(s) ? DEFAULT_PASSWORD : s.trim();
-
- s = get(properties, "vlanEnabled");
+ String s = get(properties, "vlanEnabled");
vlanEnabled = isNullOrEmpty(s) ? DEFAULT_VLAN_ENABLED : Boolean.parseBoolean(s.trim());
s = get(properties, "priority");
priority = isNullOrEmpty(s) ? DEFAULT_PRIORITY : Integer.parseInt(s.trim());
- s = get(properties, "syncHost");
- syncHost = isNullOrEmpty(s) ? null : s.trim();
} catch (Exception e) {
- user = DEFAULT_USER;
- password = DEFAULT_PASSWORD;
- syncHost = null;
- mcastVlan = DEFAULT_MCAST_VLAN;
+ log.error("Unable to parse configuration parameter.", e);
vlanEnabled = false;
priority = DEFAULT_PRIORITY;
}
- fabricOnosUrl = createRemoteUrl(syncHost);
- }
-
- private static String createRemoteUrl(String remoteHost) {
- return "http://" + remoteHost + "/onos/v1/mcast";
}
private class InternalMulticastListener implements McastListener {
@Override
public void event(McastEvent event) {
- McastRouteInfo info = event.subject();
- switch (event.type()) {
- case ROUTE_ADDED:
- break;
- case ROUTE_REMOVED:
- break;
- case SOURCE_ADDED:
- break;
- case SINK_ADDED:
- if (!info.sink().isPresent()) {
- log.warn("No sink given after sink added event: {}", info);
- return;
- }
- provisionGroup(info.route(), info.sink().get());
- break;
- case SINK_REMOVED:
- unprovisionGroup(event.subject());
- break;
- default:
- log.warn("Unknown mcast event {}", event.type());
+ eventExecutor.execute(() -> {
+ switch (event.type()) {
+ case ROUTE_ADDED:
+ case ROUTE_REMOVED:
+ case SOURCES_ADDED:
+ break;
+ case SINKS_ADDED:
+ addSinks(event);
+ break;
+ case SINKS_REMOVED:
+ removeSinks(event);
+ break;
+ default:
+ log.warn("Unknown mcast event {}", event.type());
+ }
+ });
+ }
+ }
+
+ /**
+ * Processes previous, and new sinks then finds the sinks to be removed.
+ * @param prevSinks the previous sinks to be evaluated
+ * @param newSinks the new sinks to be evaluated
+ * @returnt the set of the sinks to be removed
+ */
+ private Set<ConnectPoint> getSinksToBeRemoved(Map<HostId, Set<ConnectPoint>> prevSinks,
+ Map<HostId, Set<ConnectPoint>> newSinks) {
+ return getSinksToBeProcessed(prevSinks, newSinks);
+ }
+
+
+ /**
+ * Processes previous, and new sinks then finds the sinks to be added.
+ * @param newSinks the new sinks to be processed
+ * @param allPrevSinks all previous sinks
+ * @return the set of the sinks to be added
+ */
+ private Set<ConnectPoint> getSinksToBeAdded(Map<HostId, Set<ConnectPoint>> newSinks,
+ Map<HostId, Set<ConnectPoint>> allPrevSinks) {
+ return getSinksToBeProcessed(newSinks, allPrevSinks);
+ }
+
+ /**
+ * Gets single-homed sinks that are in set1 but not in set2.
+ * @param sinkSet1 the first sink map
+ * @param sinkSet2 the second sink map
+ * @return a set containing all the single-homed sinks found in set1 but not in set2
+ */
+ private Set<ConnectPoint> getSinksToBeProcessed(Map<HostId, Set<ConnectPoint>> sinkSet1,
+ Map<HostId, Set<ConnectPoint>> sinkSet2) {
+ final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
+ sinkSet1.forEach(((hostId, connectPoints) -> {
+ if (HostId.NONE.equals(hostId)) {
+ //assume all connect points associated with HostId.NONE are single homed sinks
+ sinksToBeProcessed.addAll(connectPoints);
+ return;
+ }
+ }));
+ Set<ConnectPoint> singleHomedSinksOfSet2 = sinkSet2.get(HostId.NONE) == null ?
+ Sets.newHashSet() :
+ sinkSet2.get(HostId.NONE);
+ return Sets.difference(sinksToBeProcessed, singleHomedSinksOfSet2);
+ };
+
+
+ private void removeSinks(McastEvent event) {
+ mcastLock();
+ try {
+ Set<ConnectPoint> sinksToBeRemoved = getSinksToBeRemoved(event.prevSubject().sinks(),
+ event.subject().sinks());
+ sinksToBeRemoved.forEach(sink -> removeSink(event.subject().route().group(), sink));
+ } finally {
+ mcastUnlock();
+ }
+ }
+
+ private void removeSink(IpAddress group, ConnectPoint sink) {
+ if (!isLocalLeader(sink.deviceId())) {
+ log.debug("Not the leader of {}. Skip sink_removed event for the sink {} and group {}",
+ sink.deviceId(), sink, group);
+ return;
+ }
+
+ Optional<AccessDeviceData> oltInfo = cordConfigService.getAccessDevice(sink.deviceId());
+
+ if (!oltInfo.isPresent()) {
+ log.warn("Unknown OLT device : {}", sink.deviceId());
+ return;
+ }
+
+ log.debug("Removing sink {} from the group {}", sink, group);
+
+ NextKey key = new NextKey(sink.deviceId(), group);
+ groups.computeIfPresent(key, (k, v) -> {
+ flowObjectiveService.next(sink.deviceId(), nextObject(v.getNextId(), sink.port(),
+ NextType.RemoveFromExisting, group));
+
+ Set<PortNumber> outPorts = Sets.newHashSet(v.getOutPorts());
+ outPorts.remove(sink.port());
+
+ if (outPorts.isEmpty()) {
+ // this is the last sink
+ ObjectiveContext context = new DefaultObjectiveContext(
+ (objective) -> log.debug("Successfully remove {} on {}",
+ group, sink),
+ (objective, error) -> log.warn("Failed to remove {} on {}: {}",
+ group, sink, error));
+ ForwardingObjective fwdObj = fwdObject(v.getNextId(), group).remove(context);
+ flowObjectiveService.forward(sink.deviceId(), fwdObj);
+ }
+ // remove the whole entity if no out port exists in the port list
+ return outPorts.isEmpty() ? null : new NextContent(v.getNextId(),
+ ImmutableSet.copyOf(outPorts));
+ });
+ }
+
+ private void addSinks(McastEvent event) {
+ mcastLock();
+ try {
+ Set<ConnectPoint> sinksToBeAdded = getSinksToBeAdded(event.subject().sinks(),
+ event.prevSubject().sinks());
+ sinksToBeAdded.forEach(sink -> addSink(event.subject().route(), sink));
+ } finally {
+ mcastUnlock();
+ }
+ }
+
+ private void addSink(McastRoute route, ConnectPoint sink) {
+ if (!isLocalLeader(sink.deviceId())) {
+ log.debug("Not the leader of {}. Skip sink_added event for the sink {} and group {}",
+ sink.deviceId(), sink, route.group());
+ return;
+ }
+
+ Optional<AccessDeviceData> oltInfo = cordConfigService.getAccessDevice(sink.deviceId());
+
+ if (!oltInfo.isPresent()) {
+ log.warn("Unknown OLT device : {}", sink.deviceId());
+ return;
+ }
+
+ log.debug("Adding sink {} to the group {}", sink, route.group());
+
+ NextKey key = new NextKey(sink.deviceId(), route.group());
+ NextObjective newNextObj;
+
+ boolean theFirstSinkOfGroup = false;
+ if (!groups.containsKey(key)) {
+ // First time someone request this mcast group via this device
+ Integer nextId = flowObjectiveService.allocateNextId();
+ newNextObj = nextObject(nextId, sink.port(), NextType.AddNew, route.group());
+ // Store the new port
+ groups.put(key, new NextContent(nextId, ImmutableSet.of(sink.port())));
+ theFirstSinkOfGroup = true;
+ } else {
+ // This device already serves some subscribers of this mcast group
+ Versioned<NextContent> nextObj = groups.get(key);
+ if (nextObj.value().getOutPorts().contains(sink.port())) {
+ log.info("Group {} already serves the sink connected to {}", route.group(), sink);
+ return;
+ }
+ newNextObj = nextObject(nextObj.value().getNextId(), sink.port(),
+ NextType.AddToExisting, route.group());
+ // add new port to the group
+ Set<PortNumber> outPorts = Sets.newHashSet(nextObj.value().getOutPorts());
+ outPorts.add(sink.port());
+ groups.put(key, new NextContent(newNextObj.id(), ImmutableSet.copyOf(outPorts)));
+ }
+
+ ObjectiveContext context = new DefaultObjectiveContext(
+ (objective) -> log.debug("Successfully add {} on {}/{}, vlan {}",
+ route.group(), sink.deviceId(), sink.port().toLong(),
+ assignedVlan()),
+ (objective, error) -> {
+ log.warn("Failed to add {} on {}/{}, vlan {}: {}",
+ route.group(), sink.deviceId(), sink.port().toLong(), assignedVlan(),
+ error);
+ });
+
+ flowObjectiveService.next(sink.deviceId(), newNextObj);
+
+ if (theFirstSinkOfGroup) {
+ // create the necessary flow rule if this is the first sink request for the group
+ // on this device
+ flowObjectiveService.forward(sink.deviceId(), fwdObject(newNextObj.id(),
+ route.group()).add(context));
+ }
+ }
+
+ private class InternalNetworkConfigListener implements NetworkConfigListener {
+ @Override
+ public void event(NetworkConfigEvent event) {
+ eventExecutor.execute(() -> {
+ switch (event.type()) {
+
+ case CONFIG_ADDED:
+ case CONFIG_UPDATED:
+ if (event.configClass().equals(CORD_MCAST_CONFIG_CLASS)) {
+ McastConfig config = networkConfig.getConfig(coreAppId, CORD_MCAST_CONFIG_CLASS);
+ if (config != null) {
+ //TODO: Simply remove flows/groups, hosts will response period query
+ // and re-sent IGMP report, so the flows can be rebuild.
+ // However, better to remove and re-add mcast flow rules here
+ if (mcastVlan != config.egressVlan().toShort() && vlanEnabled) {
+ clearGroups();
+ }
+ updateConfig(config);
+ }
+ }
+ break;
+ case CONFIG_REGISTERED:
+ case CONFIG_UNREGISTERED:
+ case CONFIG_REMOVED:
+ break;
+ default:
+ break;
+ }
+ });
+ }
+ }
+
+ private void updateConfig(McastConfig config) {
+ if (config == null) {
+ return;
+ }
+ log.debug("multicast config received: {}", config);
+
+ if (config.egressVlan() != null) {
+ mcastVlan = config.egressVlan().toShort();
+ }
+ }
+
+ private class NextKey {
+ private DeviceId device;
+ private IpAddress group;
+
+ public NextKey(DeviceId deviceId, IpAddress groupAddress) {
+ device = deviceId;
+ group = groupAddress;
+ }
+
+ public DeviceId getDevice() {
+ return device;
+ }
+
+ public int hashCode() {
+ return Objects.hash(this.device, this.group);
+ }
+
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ } else if (!(obj instanceof NextKey)) {
+ return false;
+ } else {
+ NextKey that = (NextKey) obj;
+ return this.getClass() == that.getClass() &&
+ Objects.equals(this.device, that.device) &&
+ Objects.equals(this.group, that.group);
+ }
+ }
+ }
+
+ private class NextContent {
+ private Integer nextId;
+ private Set<PortNumber> outPorts;
+
+ public NextContent(Integer nextId, Set<PortNumber> outPorts) {
+ this.nextId = nextId;
+ this.outPorts = outPorts;
+ }
+
+ public Integer getNextId() {
+ return nextId;
+ }
+
+ public Set<PortNumber> getOutPorts() {
+ return ImmutableSet.copyOf(outPorts);
+ }
+
+ public int hashCode() {
+ return Objects.hash(this.nextId, this.outPorts);
+ }
+
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ } else if (!(obj instanceof NextContent)) {
+ return false;
+ } else {
+ NextContent that = (NextContent) obj;
+ return this.getClass() == that.getClass() &&
+ Objects.equals(this.nextId, that.nextId) &&
+ Objects.equals(this.outPorts, that.outPorts);
}
}
}
private enum NextType { AddNew, AddToExisting, Remove, RemoveFromExisting };
+ private NextObjective nextObject(Integer nextId, PortNumber port,
+ NextType nextType, IpAddress mcastIp) {
- private NextObjective nextObject(Integer id, PortNumber port, NextType nextType) {
+ // Build the meta selector with the fwd objective info
+ TrafficSelector.Builder metadata = DefaultTrafficSelector.builder()
+ .matchIPDst(mcastIp.toIpPrefix());
+
+ if (vlanEnabled) {
+ metadata.matchVlanId(multicastVlan());
+ }
+
DefaultNextObjective.Builder build = DefaultNextObjective.builder()
.fromApp(appId)
.addTreatment(DefaultTrafficTreatment.builder().setOutput(port).build())
.withType(NextObjective.Type.BROADCAST)
- .withId(id);
+ .withId(nextId)
+ .withMeta(metadata.build());
+
ObjectiveContext content = new ObjectiveContext() {
@Override
public void onSuccess(Objective objective) {
- //TODO: change to debug
- log.info("Next Objective {} installed", objective.id());
+ log.debug("Next Objective {} installed", objective.id());
}
@Override
public void onError(Objective objective, ObjectiveError error) {
- //TODO: change to debug
- log.info("Next Objective {} failed, because {}",
- objective.id(),
- error);
+ log.debug("Next Objective {} failed, because {}",
+ objective.id(),
+ error);
}
};
@@ -327,251 +637,45 @@
}
}
- private void unprovisionGroup(McastRouteInfo info) {
+ private ForwardingObjective.Builder fwdObject(int nextId, IpAddress mcastIp) {
+ TrafficSelector.Builder mcast = DefaultTrafficSelector.builder()
+ .matchEthType(Ethernet.TYPE_IPV4)
+ .matchIPDst(mcastIp.toIpPrefix());
- if (info.sinks().isEmpty()) {
- removeRemoteRoute(info.route());
+ //build the meta selector
+ TrafficSelector.Builder metabuilder = DefaultTrafficSelector.builder();
+ if (vlanEnabled) {
+ metabuilder.matchVlanId(multicastVlan());
}
- if (!info.sink().isPresent()) {
- log.warn("No sink given after sink removed event: {}", info);
- return;
- }
- ConnectPoint loc = info.sink().get();
+ ForwardingObjective.Builder fwdBuilder = DefaultForwardingObjective.builder()
+ .fromApp(appId)
+ .nextStep(nextId)
+ .makePermanent()
+ .withFlag(ForwardingObjective.Flag.SPECIFIC)
+ .withPriority(priority)
+ .withSelector(mcast.build())
+ .withMeta(metabuilder.build());
- NextKey key = new NextKey(loc.deviceId(), info.route().group());
- if (groups.get(key) == null) {
- log.warn("No groups on device: {}", loc.deviceId());
- return;
- }
- flowObjectiveService.next(loc.deviceId(), nextObject(groups.get(key), loc.port(), NextType.RemoveFromExisting));
+ return fwdBuilder;
}
- private void provisionGroup(McastRoute route, ConnectPoint sink) {
- checkNotNull(route, "Route cannot be null");
- checkNotNull(sink, "Sink cannot be null");
-
- Optional<AccessDeviceData> oltInfo = cordConfigService.getAccessDevice(sink.deviceId());
-
- if (!oltInfo.isPresent()) {
- log.warn("Unknown OLT device : {}", sink.deviceId());
- return;
- }
-
- final AtomicBoolean sync = new AtomicBoolean(false);
- NextKey key = new NextKey(sink.deviceId(), route.group());
- Integer nextId = groups.computeIfAbsent(key, (g) -> {
- Integer id = flowObjectiveService.allocateNextId();
-
- flowObjectiveService.next(sink.deviceId(), nextObject(id, sink.port(), NextType.AddNew));
-
- TrafficSelector.Builder mcast = DefaultTrafficSelector.builder()
- .matchInPort(oltInfo.get().uplink())
- .matchEthType(Ethernet.TYPE_IPV4)
- .matchIPDst(route.group().toIpPrefix());
-
- if (vlanEnabled) {
- mcast.matchVlanId(VlanId.vlanId((short) mcastVlan));
- }
-
- ForwardingObjective fwd = DefaultForwardingObjective.builder()
- .fromApp(appId)
- .nextStep(id)
- .makePermanent()
- .withFlag(ForwardingObjective.Flag.VERSATILE)
- .withPriority(priority)
- .withSelector(mcast.build())
- .add(new ObjectiveContext() {
- @Override
- public void onSuccess(Objective objective) {
- //TODO: change to debug
- log.info("Forwarding objective installed {}", objective);
- }
-
- @Override
- public void onError(Objective objective, ObjectiveError error) {
- //TODO: change to debug
- log.info("Forwarding objective failed {}", objective);
- }
- });
-
- flowObjectiveService.forward(sink.deviceId(), fwd);
-
- sync.set(true);
-
- return id;
- });
-
- if (!sync.get()) {
- flowObjectiveService.next(sink.deviceId(), nextObject(nextId, sink.port(), NextType.AddToExisting));
- }
-
- addRemoteRoute(route, sink);
- }
-
- private void addRemoteRoute(McastRoute route, ConnectPoint inPort) {
- checkNotNull(route);
- if (syncHost == null) {
- log.warn("No host configured for synchronization; route will be dropped");
- return;
- }
-
- Optional<AccessAgentData> accessAgent = cordConfigService.getAccessAgent(inPort.deviceId());
- if (!accessAgent.isPresent()) {
- log.warn("No accessAgent config found for in port {}", inPort);
- return;
- }
-
- if (!accessAgent.get().getOltConnectPoint(inPort).isPresent()) {
- log.warn("No OLT configured for in port {}", inPort);
- return;
- }
-
- ConnectPoint oltConnectPoint = accessAgent.get().getOltConnectPoint(inPort).get();
-
- log.debug("Sending route {} to other ONOS {}", route, fabricOnosUrl);
-
- Invocation.Builder builder = getClientBuilder(fabricOnosUrl);
-
- ObjectNode json = codecService.getCodec(McastRoute.class)
- .encode(route, new AbstractWebResource());
-
- try {
- builder.post(Entity.json(json.toString()));
-
- builder = getClientBuilder(fabricOnosUrl + "/sinks/" + route.group() + "/" + route.source());
- ObjectMapper mapper = new ObjectMapper();
- ObjectNode obj = mapper.createObjectNode();
- obj.putArray("sinks").add(oltConnectPoint.deviceId() + "/" + oltConnectPoint.port());
-
- builder.post(Entity.json(obj.toString()));
- } catch (ProcessingException e) {
- log.warn("Unable to send route to remote controller: {}", e.getMessage());
- }
- }
-
- private void removeRemoteRoute(McastRoute route) {
- if (syncHost == null) {
- log.warn("No host configured for synchronization; route will be dropped");
- return;
- }
-
- log.debug("Removing route {} from other ONOS {}", route, fabricOnosUrl);
-
- Invocation.Builder builder = getClientBuilder(fabricOnosUrl)
- .property(ClientProperties.SUPPRESS_HTTP_COMPLIANCE_VALIDATION, true);
-
- ObjectNode json = codecService.getCodec(McastRoute.class)
- .encode(route, new AbstractWebResource());
-
- builder.method("DELETE", Entity.entity(json.asText(),
- MediaType.APPLICATION_OCTET_STREAM));
- }
-
- private void clearRemoteRoutes() {
- if (syncHost == null) {
- log.warn("No host configured for synchronization");
- return;
- }
-
- log.debug("Clearing remote multicast routes from {}", fabricOnosUrl);
-
- Invocation.Builder builder = getClientBuilder(fabricOnosUrl);
- List<McastRoute> mcastRoutes = Lists.newArrayList();
-
- try {
- String response = builder
- .accept(MediaType.APPLICATION_JSON_TYPE)
- .get(String.class);
-
- JsonCodec<McastRoute> routeCodec = codecService.getCodec(McastRoute.class);
- ObjectMapper mapper = new ObjectMapper();
-
-
- ObjectNode node = (ObjectNode) mapper.readTree(response);
- ArrayNode list = (ArrayNode) node.path("routes");
-
- list.forEach(n -> mcastRoutes.add(
- routeCodec.decode((ObjectNode) n, new AbstractWebResource())));
-
- } catch (IOException | ProcessingException e) {
- log.warn("Error clearing remote routes", e);
- }
-
- mcastRoutes.forEach(this::removeRemoteRoute);
- }
-
- private Invocation.Builder getClientBuilder(String uri) {
- ClientConfig config = new ClientConfig();
- Client client = ClientBuilder.newClient(config);
-
- client.property(ClientProperties.CONNECT_TIMEOUT, DEFAULT_REST_TIMEOUT_MS);
- client.property(ClientProperties.READ_TIMEOUT, DEFAULT_REST_TIMEOUT_MS);
- client.register(HttpAuthenticationFeature.basic(user, password));
-
- WebTarget wt = client.target(uri);
- return wt.request(JSON_UTF_8.toString());
- }
-
- private class InternalNetworkConfigListener implements NetworkConfigListener {
- @Override
- public void event(NetworkConfigEvent event) {
- switch (event.type()) {
-
- case CONFIG_ADDED:
- case CONFIG_UPDATED:
- if (event.configClass().equals(CORD_MCAST_CONFIG_CLASS)) {
- McastConfig config = networkConfig.getConfig(coreAppId, CORD_MCAST_CONFIG_CLASS);
- if (config != null) {
- //TODO: Simply remove flows/groups, hosts will response period query
- // and re-sent IGMP report, so the flows can be rebuild.
- // However, better to remove and re-add mcast flow rules here
- if (mcastVlan != config.egressVlan().toShort() && vlanEnabled) {
- clearGroups();
- }
- mcastVlan = config.egressVlan().toShort();
- }
- }
- break;
- case CONFIG_REGISTERED:
- case CONFIG_UNREGISTERED:
- case CONFIG_REMOVED:
- break;
- default:
- break;
- }
- }
- }
-
- private class NextKey {
- private DeviceId device;
- private IpAddress group;
- public NextKey(DeviceId deviceId, IpAddress groupAddress) {
- device = deviceId;
- group = groupAddress;
- }
-
- public DeviceId getDevice() {
- return device;
- }
-
- public int hashCode() {
- return com.google.common.base.Objects.hashCode(new Object[]{this.device, this.group});
- }
-
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- } else if (!(obj instanceof NextKey)) {
+ // Custom-built function, when the device is not available we need a fallback mechanism
+ private boolean isLocalLeader(DeviceId deviceId) {
+ if (!mastershipService.isLocalMaster(deviceId)) {
+ // When the device is available we just check the mastership
+ if (deviceService.isAvailable(deviceId)) {
return false;
- } else {
- NextKey that = (NextKey) obj;
- return this.getClass() == that.getClass() &&
- Objects.equals(this.device, that.device) &&
- Objects.equals(this.group, that.group);
}
+ // Fallback with Leadership service - device id is used as topic
+ NodeId leader = leadershipService.runForLeadership(
+ deviceId.toString()).leaderNodeId();
+ // Verify if this node is the leader
+ return clusterService.getLocalNode().id().equals(leader);
}
+ return true;
}
+
}