[VOL-3016] Removing group entirely due to last bucket getting removed

Change-Id: Ic6a3aac80e53b580a4e2666d70c2e438bd151ffa
diff --git a/app/src/main/java/org/opencord/cordmcast/impl/CordMcast.java b/app/src/main/java/org/opencord/cordmcast/impl/CordMcast.java
index d107f05..c47f9d1 100644
--- a/app/src/main/java/org/opencord/cordmcast/impl/CordMcast.java
+++ b/app/src/main/java/org/opencord/cordmcast/impl/CordMcast.java
@@ -84,6 +84,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
 
 import static com.google.common.base.Strings.isNullOrEmpty;
 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
@@ -251,7 +252,8 @@
         networkConfig.removeListener(configListener);
         networkConfig.unregisterConfigFactory(cordMcastConfigFactory);
         eventExecutor.shutdown();
-        clearGroups();
+        eventExecutor = null;
+        groups.clear();
         groups.destroy();
         log.info("Stopped");
     }
@@ -264,35 +266,37 @@
         cordMcastStatisticsService.setInnerVlanValue(assignedInnerVlan());
     }
 
-    public void clearGroups() {
+    private void clearGroups() {
         mcastLock();
         try {
             groups.keySet().forEach(groupInfo -> {
+                NextContent next = groups.get(groupInfo).value();
                 if (!isLocalLeader(groupInfo.getDevice())) {
                     return;
                 }
-                NextContent next = groups.get(groupInfo).value();
-
                 if (next != null) {
-                    ObjectiveContext context = new DefaultObjectiveContext(
-                            (objective) -> log.debug("Successfully remove {}",
-                                                     groupInfo.group),
-                            (objective, error) -> log.warn("Failed to remove {}: {}",
-                                                           groupInfo.group, error));
+                    //On Success of removing the fwd objective we remove also the group.
+                    Consumer<Objective> onSuccess = (objective) -> {
+                        log.debug("Successfully removed fwd objective for {} on {}, " +
+                                          "removing next objective {}", groupInfo.group,
+                                  groupInfo.getDevice(), next.getNextId());
+                        eventExecutor.submit(() -> flowObjectiveService.next(groupInfo.getDevice(),
+                                                                             nextObject(next.getNextId(),
+                                                             null,
+                                                             NextType.Remove, groupInfo.group)));
+                    };
+
+                    ObjectiveContext context =
+                            new DefaultObjectiveContext(onSuccess, (objective, error) ->
+                                    log.warn("Failed to remove {} on {}: {}",
+                                             groupInfo.group, next.getNextId(), error));
                     // remove the flow rule
-                    flowObjectiveService.forward(groupInfo.getDevice(), fwdObject(next.getNextId(),
-                                                                                  groupInfo.group).remove(context));
-                    // remove all ports from the group
-                    next.getOutPorts().stream().forEach(portNumber ->
-                        flowObjectiveService.next(groupInfo.getDevice(), nextObject(next.getNextId(),
-                                                                                    portNumber,
-                                                                                    NextType.RemoveFromExisting,
-                                                                                    groupInfo.group))
-                    );
+                    flowObjectiveService.forward(groupInfo.getDevice(),
+                                                 fwdObject(next.getNextId(),
+                                                           groupInfo.group).remove(context));
 
                 }
             });
-            groups.clear();
         } finally {
             mcastUnlock();
         }
@@ -407,21 +411,36 @@
 
         NextKey key = new NextKey(sink.deviceId(), group);
         groups.computeIfPresent(key, (k, v) -> {
-            flowObjectiveService.next(sink.deviceId(), nextObject(v.getNextId(), sink.port(),
-                                                                  NextType.RemoveFromExisting, group));
 
             Set<PortNumber> outPorts = Sets.newHashSet(v.getOutPorts());
             outPorts.remove(sink.port());
 
             if (outPorts.isEmpty()) {
+                log.debug("No more output ports for group {}, removing next and fwd objectives", group);
+
+                //On Success of removing the fwd objective we remove also the group.
+                Consumer<Objective> onSuccess = (objective) -> {
+                    log.debug("Successfully removed fwd objective for {} on {}, " +
+                                      "removing next objective {}", group, sink, v.getNextId());
+                    eventExecutor.execute(() -> {
+                        //No port is needed since it's a remove Operation
+                        flowObjectiveService.next(sink.deviceId(), nextObject(v.getNextId(),
+                                                                              null,
+                                                                              NextType.Remove, group));
+                    });
+                };
+
                 // this is the last sink
-                ObjectiveContext context = new DefaultObjectiveContext(
-                        (objective) -> log.debug("Successfully remove {} on {}",
-                                                 group, sink),
+                ObjectiveContext context = new DefaultObjectiveContext(onSuccess,
                         (objective, error) -> log.warn("Failed to remove {} on {}: {}",
                                                        group, sink, error));
                 ForwardingObjective fwdObj = fwdObject(v.getNextId(), group).remove(context);
                 flowObjectiveService.forward(sink.deviceId(), fwdObj);
+            } else {
+                log.debug("Group {} has remaining {} ports, removing just {} " +
+                                 "from it's sinks", group, outPorts, sink.port());
+                flowObjectiveService.next(sink.deviceId(), nextObject(v.getNextId(), sink.port(),
+                                                                      NextType.RemoveFromExisting, group));
             }
             // remove the whole entity if no out port exists in the port list
             return outPorts.isEmpty() ? null : new NextContent(v.getNextId(),
@@ -668,17 +687,23 @@
             }
         }
 
-        DefaultNextObjective.Builder build = DefaultNextObjective.builder()
+        DefaultNextObjective.Builder builder = DefaultNextObjective.builder()
                 .fromApp(appId)
-                .addTreatment(DefaultTrafficTreatment.builder().setOutput(port).build())
                 .withType(NextObjective.Type.BROADCAST)
                 .withId(nextId)
                 .withMeta(metadata.build());
 
-        ObjectiveContext content = new ObjectiveContext() {
+        if (port == null && !nextType.equals(NextType.Remove)) {
+            log.error("Port can't be null with operation {}", nextType);
+            return null;
+        } else if (port != null && !nextType.equals(NextType.Remove)) {
+            builder.addTreatment(DefaultTrafficTreatment.builder().setOutput(port).build());
+        }
+
+        ObjectiveContext context = new ObjectiveContext() {
             @Override
             public void onSuccess(Objective objective) {
-                log.debug("Next Objective {} installed", objective.id());
+                log.debug("Success for operation {} on Next Objective {}", objective.id(), nextType);
             }
 
             @Override
@@ -691,13 +716,13 @@
 
         switch (nextType) {
             case AddNew:
-                return build.add(content);
+                return builder.add(context);
             case AddToExisting:
-                return build.addToExisting(content);
+                return builder.addToExisting(context);
             case Remove:
-                return build.remove(content);
+                return builder.remove(context);
             case RemoveFromExisting:
-                return build.removeFromExisting(content);
+                return builder.removeFromExisting(context);
             default:
                 return null;
         }
diff --git a/app/src/test/java/org/opencord/cordmcast/impl/McastTest.java b/app/src/test/java/org/opencord/cordmcast/impl/McastTest.java
index 9e64dca..84eb986 100644
--- a/app/src/test/java/org/opencord/cordmcast/impl/McastTest.java
+++ b/app/src/test/java/org/opencord/cordmcast/impl/McastTest.java
@@ -346,19 +346,9 @@
        McastEvent event = new McastEvent(McastEvent.Type.SINKS_REMOVED, previousSubject, currentSubject);
        cordMcast.listener.event(event);
 
-       // Operation will be REMOVE_FROM_EXISTING and nextMap will be updated.  None --> { }
+       // Operation will be REMOVE and nextMap will be updated.  None --> { }
        assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
-       assertTrue(nextMap.get(DEVICE_ID_OF_A).op() == Objective.Operation.REMOVE_FROM_EXISTING));
-
-       // Output port number will be changed to 24 i.e. PORT_C
-       Collection<TrafficTreatment> traffictreatMentCollection = nextMap.get(DEVICE_ID_OF_A).next();
-       assertTrue(1 == traffictreatMentCollection.size());
-       OutputInstruction output = null;
-       for (TrafficTreatment trafficTreatment : traffictreatMentCollection) {
-          output = outputPort(trafficTreatment);
-       }
-       assertNotNull(output);
-       assertTrue(PORT_C == output.port());
+       assertTrue(nextMap.get(DEVICE_ID_OF_A).op() == Objective.Operation.REMOVE));
   }
 
   @Test
diff --git a/app/src/test/java/org/opencord/cordmcast/impl/McastTestBase.java b/app/src/test/java/org/opencord/cordmcast/impl/McastTestBase.java
index 4ae2825..9271976 100644
--- a/app/src/test/java/org/opencord/cordmcast/impl/McastTestBase.java
+++ b/app/src/test/java/org/opencord/cordmcast/impl/McastTestBase.java
@@ -131,6 +131,9 @@
           public void forward(DeviceId deviceId, ForwardingObjective forwardingObjective) {
               synchronized (forwardMap) {
                 forwardMap.put(deviceId, forwardingObjective);
+                forwardingObjective.context().ifPresent(context -> {
+                    context.onSuccess(forwardingObjective);
+                });
                 forwardMap.notify();
               }
           }