1.fix the bug that 1 IGMP group cannot be added into 2 devices
2. clear mcast groups and flows while app deactivated; mvlan changes.
Change-Id: I466e8549b82a052896b5bc0fed069114255e3ea3
diff --git a/src/main/java/org/opencord/cordmcast/CordMcast.java b/src/main/java/org/opencord/cordmcast/CordMcast.java
index c965476..078bcbf 100644
--- a/src/main/java/org/opencord/cordmcast/CordMcast.java
+++ b/src/main/java/org/opencord/cordmcast/CordMcast.java
@@ -41,6 +41,8 @@
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
import org.onosproject.net.config.ConfigFactory;
import org.onosproject.net.config.NetworkConfigEvent;
import org.onosproject.net.config.NetworkConfigListener;
@@ -48,6 +50,7 @@
import org.onosproject.net.config.basics.SubjectFactories;
import org.onosproject.net.flow.DefaultTrafficSelector;
import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.FlowRuleService;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flowobjective.DefaultForwardingObjective;
import org.onosproject.net.flowobjective.DefaultNextObjective;
@@ -81,6 +84,7 @@
import java.util.Dictionary;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -105,7 +109,7 @@
private static final int DEFAULT_REST_TIMEOUT_MS = 1000;
private static final int DEFAULT_PRIORITY = 500;
private static final short DEFAULT_MCAST_VLAN = 4000;
- private static final String DEFAULT_SYNC_HOST = "10.90.0.8:8181";
+ private static final String DEFAULT_SYNC_HOST = "";
private static final String DEFAULT_USER = "karaf";
private static final String DEFAULT_PASSWORD = "karaf";
private static final boolean DEFAULT_VLAN_ENABLED = true;
@@ -131,12 +135,15 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected NetworkConfigRegistry networkConfig;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected FlowRuleService flowRuleService;
+
protected McastListener listener = new InternalMulticastListener();
private InternalNetworkConfigListener configListener =
new InternalNetworkConfigListener();
//TODO: move this to a ec map
- private Map<IpAddress, Integer> groups = Maps.newConcurrentMap();
+ private Map<NextKey, Integer> groups = Maps.newConcurrentMap();
private ApplicationId appId;
private ApplicationId coreAppId;
@@ -207,9 +214,19 @@
componentConfigService.unregisterProperties(getClass(), false);
mcastService.removeListener(listener);
networkConfig.removeListener(configListener);
+ networkConfig.unregisterConfigFactory(cordMcastConfigFactory);
+ clearGroups();
log.info("Stopped");
}
+ public void clearGroups() {
+ groups.keySet().forEach(d -> {
+ flowObjectiveService.next(d.getDevice(), nextObject(groups.get(d), PortNumber.ANY, NextType.Remove));
+ });
+ flowRuleService.removeFlowRulesById(appId);
+ groups.clear();
+ }
+
@Modified
public void modified(ComponentContext context) {
Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
@@ -271,7 +288,47 @@
}
}
+ private enum NextType { AddNew, AddToExisting, Remove, RemoveFromExisting };
+
+
+ private NextObjective nextObject(Integer id, PortNumber port, NextType nextType) {
+ DefaultNextObjective.Builder build = DefaultNextObjective.builder()
+ .fromApp(appId)
+ .addTreatment(DefaultTrafficTreatment.builder().setOutput(port).build())
+ .withType(NextObjective.Type.BROADCAST)
+ .withId(id);
+ ObjectiveContext content = new ObjectiveContext() {
+ @Override
+ public void onSuccess(Objective objective) {
+ //TODO: change to debug
+ log.info("Next Objective {} installed", objective.id());
+ }
+
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ //TODO: change to debug
+ log.info("Next Objective {} failed, because {}",
+ objective.id(),
+ error);
+ }
+ };
+
+ switch (nextType) {
+ case AddNew:
+ return build.add(content);
+ case AddToExisting:
+ return build.addToExisting(content);
+ case Remove:
+ return build.remove(content);
+ case RemoveFromExisting:
+ return build.removeFromExisting(content);
+ default:
+ return null;
+ }
+ }
+
private void unprovisionGroup(McastRouteInfo info) {
+
if (info.sinks().isEmpty()) {
removeRemoteRoute(info.route());
}
@@ -282,28 +339,12 @@
}
ConnectPoint loc = info.sink().get();
- NextObjective next = DefaultNextObjective.builder()
- .fromApp(appId)
- .addTreatment(DefaultTrafficTreatment.builder().setOutput(loc.port()).build())
- .withType(NextObjective.Type.BROADCAST)
- .withId(groups.get(info.route().group()))
- .removeFromExisting(new ObjectiveContext() {
- @Override
- public void onSuccess(Objective objective) {
- //TODO: change to debug
- log.info("Next Objective {} installed", objective.id());
- }
-
- @Override
- public void onError(Objective objective, ObjectiveError error) {
- //TODO: change to debug
- log.info("Next Objective {} failed, because {}",
- objective.id(),
- error);
- }
- });
-
- flowObjectiveService.next(loc.deviceId(), next);
+ NextKey key = new NextKey(loc.deviceId(), info.route().group());
+ if (groups.get(key) == null) {
+ log.warn("No groups on device: {}", loc.deviceId());
+ return;
+ }
+ flowObjectiveService.next(loc.deviceId(), nextObject(groups.get(key), loc.port(), NextType.RemoveFromExisting));
}
private void provisionGroup(McastRoute route, ConnectPoint sink) {
@@ -318,37 +359,16 @@
}
final AtomicBoolean sync = new AtomicBoolean(false);
-
- Integer nextId = groups.computeIfAbsent(route.group(), (g) -> {
+ NextKey key = new NextKey(sink.deviceId(), route.group());
+ Integer nextId = groups.computeIfAbsent(key, (g) -> {
Integer id = flowObjectiveService.allocateNextId();
- NextObjective next = DefaultNextObjective.builder()
- .fromApp(appId)
- .addTreatment(DefaultTrafficTreatment.builder().setOutput(sink.port()).build())
- .withType(NextObjective.Type.BROADCAST)
- .withId(id)
- .add(new ObjectiveContext() {
- @Override
- public void onSuccess(Objective objective) {
- //TODO: change to debug
- log.info("Next Objective {} installed", objective.id());
- }
-
- @Override
- public void onError(Objective objective, ObjectiveError error) {
- //TODO: change to debug
- log.info("Next Objective {} failed, because {}",
- objective.id(),
- error);
- }
- });
-
- flowObjectiveService.next(sink.deviceId(), next);
+ flowObjectiveService.next(sink.deviceId(), nextObject(id, sink.port(), NextType.AddNew));
TrafficSelector.Builder mcast = DefaultTrafficSelector.builder()
.matchInPort(oltInfo.get().uplink())
.matchEthType(Ethernet.TYPE_IPV4)
- .matchIPDst(g.toIpPrefix());
+ .matchIPDst(route.group().toIpPrefix());
if (vlanEnabled) {
mcast.matchVlanId(VlanId.vlanId((short) mcastVlan));
@@ -383,28 +403,7 @@
});
if (!sync.get()) {
- NextObjective next = DefaultNextObjective.builder()
- .fromApp(appId)
- .addTreatment(DefaultTrafficTreatment.builder().setOutput(sink.port()).build())
- .withType(NextObjective.Type.BROADCAST)
- .withId(nextId)
- .addToExisting(new ObjectiveContext() {
- @Override
- public void onSuccess(Objective objective) {
- //TODO: change to debug
- log.info("Next Objective {} installed", objective.id());
- }
-
- @Override
- public void onError(Objective objective, ObjectiveError error) {
- //TODO: change to debug
- log.info("Next Objective {} failed, because {}",
- objective.id(),
- error);
- }
- });
-
- flowObjectiveService.next(sink.deviceId(), next);
+ flowObjectiveService.next(sink.deviceId(), nextObject(nextId, sink.port(), NextType.AddToExisting));
}
addRemoteRoute(route, sink);
@@ -524,6 +523,12 @@
if (event.configClass().equals(CORD_MCAST_CONFIG_CLASS)) {
McastConfig config = networkConfig.getConfig(coreAppId, CORD_MCAST_CONFIG_CLASS);
if (config != null) {
+ //TODO: Simply remove flows/groups, hosts will response period query
+ // and re-sent IGMP report, so the flows can be rebuild.
+ // However, better to remove and re-add mcast flow rules here
+ if (mcastVlan != config.egressVlan().toShort() && vlanEnabled) {
+ clearGroups();
+ }
mcastVlan = config.egressVlan().toShort();
}
}
@@ -537,5 +542,36 @@
}
}
}
+
+ private class NextKey {
+ private DeviceId device;
+ private IpAddress group;
+ public NextKey(DeviceId deviceId, IpAddress groupAddress) {
+ device = deviceId;
+ group = groupAddress;
+ }
+
+ public DeviceId getDevice() {
+ return device;
+ }
+
+ public int hashCode() {
+ return com.google.common.base.Objects.hashCode(new Object[]{this.device, this.group});
+ }
+
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ } else if (!(obj instanceof NextKey)) {
+ return false;
+ } else {
+ NextKey that = (NextKey) obj;
+ return this.getClass() == that.getClass() &&
+ Objects.equals(this.device, that.device) &&
+ Objects.equals(this.group, that.group);
+ }
+ }
+ }
}
+