[VOL-3016] Removing group entirely due to last bucket getting removed
Change-Id: Ic6a3aac80e53b580a4e2666d70c2e438bd151ffa
diff --git a/app/src/main/java/org/opencord/cordmcast/impl/CordMcast.java b/app/src/main/java/org/opencord/cordmcast/impl/CordMcast.java
index d107f05..c47f9d1 100644
--- a/app/src/main/java/org/opencord/cordmcast/impl/CordMcast.java
+++ b/app/src/main/java/org/opencord/cordmcast/impl/CordMcast.java
@@ -84,6 +84,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
@@ -251,7 +252,8 @@
networkConfig.removeListener(configListener);
networkConfig.unregisterConfigFactory(cordMcastConfigFactory);
eventExecutor.shutdown();
- clearGroups();
+ eventExecutor = null;
+ groups.clear();
groups.destroy();
log.info("Stopped");
}
@@ -264,35 +266,37 @@
cordMcastStatisticsService.setInnerVlanValue(assignedInnerVlan());
}
- public void clearGroups() {
+ private void clearGroups() {
mcastLock();
try {
groups.keySet().forEach(groupInfo -> {
+ NextContent next = groups.get(groupInfo).value();
if (!isLocalLeader(groupInfo.getDevice())) {
return;
}
- NextContent next = groups.get(groupInfo).value();
-
if (next != null) {
- ObjectiveContext context = new DefaultObjectiveContext(
- (objective) -> log.debug("Successfully remove {}",
- groupInfo.group),
- (objective, error) -> log.warn("Failed to remove {}: {}",
- groupInfo.group, error));
+ //On Success of removing the fwd objective we remove also the group.
+ Consumer<Objective> onSuccess = (objective) -> {
+ log.debug("Successfully removed fwd objective for {} on {}, " +
+ "removing next objective {}", groupInfo.group,
+ groupInfo.getDevice(), next.getNextId());
+ eventExecutor.submit(() -> flowObjectiveService.next(groupInfo.getDevice(),
+ nextObject(next.getNextId(),
+ null,
+ NextType.Remove, groupInfo.group)));
+ };
+
+ ObjectiveContext context =
+ new DefaultObjectiveContext(onSuccess, (objective, error) ->
+ log.warn("Failed to remove {} on {}: {}",
+ groupInfo.group, next.getNextId(), error));
// remove the flow rule
- flowObjectiveService.forward(groupInfo.getDevice(), fwdObject(next.getNextId(),
- groupInfo.group).remove(context));
- // remove all ports from the group
- next.getOutPorts().stream().forEach(portNumber ->
- flowObjectiveService.next(groupInfo.getDevice(), nextObject(next.getNextId(),
- portNumber,
- NextType.RemoveFromExisting,
- groupInfo.group))
- );
+ flowObjectiveService.forward(groupInfo.getDevice(),
+ fwdObject(next.getNextId(),
+ groupInfo.group).remove(context));
}
});
- groups.clear();
} finally {
mcastUnlock();
}
@@ -407,21 +411,36 @@
NextKey key = new NextKey(sink.deviceId(), group);
groups.computeIfPresent(key, (k, v) -> {
- flowObjectiveService.next(sink.deviceId(), nextObject(v.getNextId(), sink.port(),
- NextType.RemoveFromExisting, group));
Set<PortNumber> outPorts = Sets.newHashSet(v.getOutPorts());
outPorts.remove(sink.port());
if (outPorts.isEmpty()) {
+ log.debug("No more output ports for group {}, removing next and fwd objectives", group);
+
+ //On Success of removing the fwd objective we remove also the group.
+ Consumer<Objective> onSuccess = (objective) -> {
+ log.debug("Successfully removed fwd objective for {} on {}, " +
+ "removing next objective {}", group, sink, v.getNextId());
+ eventExecutor.execute(() -> {
+ //No port is needed since it's a remove Operation
+ flowObjectiveService.next(sink.deviceId(), nextObject(v.getNextId(),
+ null,
+ NextType.Remove, group));
+ });
+ };
+
// this is the last sink
- ObjectiveContext context = new DefaultObjectiveContext(
- (objective) -> log.debug("Successfully remove {} on {}",
- group, sink),
+ ObjectiveContext context = new DefaultObjectiveContext(onSuccess,
(objective, error) -> log.warn("Failed to remove {} on {}: {}",
group, sink, error));
ForwardingObjective fwdObj = fwdObject(v.getNextId(), group).remove(context);
flowObjectiveService.forward(sink.deviceId(), fwdObj);
+ } else {
+ log.debug("Group {} has remaining {} ports, removing just {} " +
+ "from it's sinks", group, outPorts, sink.port());
+ flowObjectiveService.next(sink.deviceId(), nextObject(v.getNextId(), sink.port(),
+ NextType.RemoveFromExisting, group));
}
// remove the whole entity if no out port exists in the port list
return outPorts.isEmpty() ? null : new NextContent(v.getNextId(),
@@ -668,17 +687,23 @@
}
}
- DefaultNextObjective.Builder build = DefaultNextObjective.builder()
+ DefaultNextObjective.Builder builder = DefaultNextObjective.builder()
.fromApp(appId)
- .addTreatment(DefaultTrafficTreatment.builder().setOutput(port).build())
.withType(NextObjective.Type.BROADCAST)
.withId(nextId)
.withMeta(metadata.build());
- ObjectiveContext content = new ObjectiveContext() {
+ if (port == null && !nextType.equals(NextType.Remove)) {
+ log.error("Port can't be null with operation {}", nextType);
+ return null;
+ } else if (port != null && !nextType.equals(NextType.Remove)) {
+ builder.addTreatment(DefaultTrafficTreatment.builder().setOutput(port).build());
+ }
+
+ ObjectiveContext context = new ObjectiveContext() {
@Override
public void onSuccess(Objective objective) {
- log.debug("Next Objective {} installed", objective.id());
+ log.debug("Success for operation {} on Next Objective {}", objective.id(), nextType);
}
@Override
@@ -691,13 +716,13 @@
switch (nextType) {
case AddNew:
- return build.add(content);
+ return builder.add(context);
case AddToExisting:
- return build.addToExisting(content);
+ return builder.addToExisting(context);
case Remove:
- return build.remove(content);
+ return builder.remove(context);
case RemoveFromExisting:
- return build.removeFromExisting(content);
+ return builder.removeFromExisting(context);
default:
return null;
}
diff --git a/app/src/test/java/org/opencord/cordmcast/impl/McastTest.java b/app/src/test/java/org/opencord/cordmcast/impl/McastTest.java
index 9e64dca..84eb986 100644
--- a/app/src/test/java/org/opencord/cordmcast/impl/McastTest.java
+++ b/app/src/test/java/org/opencord/cordmcast/impl/McastTest.java
@@ -346,19 +346,9 @@
McastEvent event = new McastEvent(McastEvent.Type.SINKS_REMOVED, previousSubject, currentSubject);
cordMcast.listener.event(event);
- // Operation will be REMOVE_FROM_EXISTING and nextMap will be updated. None --> { }
+ // Operation will be REMOVE and nextMap will be updated. None --> { }
assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
- assertTrue(nextMap.get(DEVICE_ID_OF_A).op() == Objective.Operation.REMOVE_FROM_EXISTING));
-
- // Output port number will be changed to 24 i.e. PORT_C
- Collection<TrafficTreatment> traffictreatMentCollection = nextMap.get(DEVICE_ID_OF_A).next();
- assertTrue(1 == traffictreatMentCollection.size());
- OutputInstruction output = null;
- for (TrafficTreatment trafficTreatment : traffictreatMentCollection) {
- output = outputPort(trafficTreatment);
- }
- assertNotNull(output);
- assertTrue(PORT_C == output.port());
+ assertTrue(nextMap.get(DEVICE_ID_OF_A).op() == Objective.Operation.REMOVE));
}
@Test
diff --git a/app/src/test/java/org/opencord/cordmcast/impl/McastTestBase.java b/app/src/test/java/org/opencord/cordmcast/impl/McastTestBase.java
index 4ae2825..9271976 100644
--- a/app/src/test/java/org/opencord/cordmcast/impl/McastTestBase.java
+++ b/app/src/test/java/org/opencord/cordmcast/impl/McastTestBase.java
@@ -131,6 +131,9 @@
public void forward(DeviceId deviceId, ForwardingObjective forwardingObjective) {
synchronized (forwardMap) {
forwardMap.put(deviceId, forwardingObjective);
+ forwardingObjective.context().ifPresent(context -> {
+ context.onSuccess(forwardingObjective);
+ });
forwardMap.notify();
}
}