[VOL-3086][VOLT-3087] Removing flows from all ONOS instances, withdrawing meters

Every instance needs to explicitly purge the local DeviceFlowTable cache because the
FlowRuleManager does not communicate to the cluster the request to remove all flows
when a device is disconnected.

With this patch, Meters also get into the PENDING_REMOVE state and get removed upon
VOLTHA/OLT re-connection. A better implementation would be to have a new API in ONOS
to forcefully remove the meters.

Change-Id: Ic7afa06e5d7e71017e9b8b795d341058ecbfd054
diff --git a/app/src/main/java/org/opencord/olt/impl/Olt.java b/app/src/main/java/org/opencord/olt/impl/Olt.java
index 78b8ecf..ad52030 100644
--- a/app/src/main/java/org/opencord/olt/impl/Olt.java
+++ b/app/src/main/java/org/opencord/olt/impl/Olt.java
@@ -50,10 +50,12 @@
 import org.onosproject.cluster.ClusterEventListener;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.LeadershipService;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.event.AbstractListenerManager;
+import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.AnnotationKeys;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.Device;
@@ -144,6 +146,12 @@
     protected ClusterService clusterService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected MastershipService mastershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected LeadershipService leadershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected FlowRuleService flowRuleService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
@@ -210,7 +218,7 @@
         // UNI ports
         Iterable<Device> devices = deviceService.getDevices();
         for (Device d : devices) {
-            if (isDeviceMine(d.id())) {
+            if (isLocalLeader(d.id())) {
                 checkAndCreateDeviceFlows(d);
             }
         }
@@ -919,19 +927,20 @@
         return subsService.get(devSerialNo);
     }
 
-    /**
-     * Determines if this instance should handle this device based on
-     * consistent hashing.
-     *
-     * @param id device ID
-     * @return true if this instance should handle the device, otherwise false
-     */
-    private boolean isDeviceMine(DeviceId id) {
-        NodeId nodeId = hasher.hash(id.toString());
-        if (log.isDebugEnabled()) {
-            log.debug("Node that will handle {} is {}", id, nodeId);
+    // Custom-built function, when the device is not available we need a fallback mechanism
+    private boolean isLocalLeader(DeviceId deviceId) {
+        if (!mastershipService.isLocalMaster(deviceId)) {
+            // When the device is available we just check the mastership
+            if (deviceService.isAvailable(deviceId)) {
+                return false;
+            }
+            // Fallback with Leadership service - device id is used as topic
+            NodeId leader = leadershipService.runForLeadership(
+                    deviceId.toString()).leaderNodeId();
+            // Verify if this node is the leader
+            return clusterService.getLocalNode().id().equals(leader);
         }
-        return nodeId.equals(clusterService.getLocalNode().id());
+        return true;
     }
 
     private boolean isNniPort(Port port) {
@@ -958,8 +967,22 @@
                     return;
                 }
 
+                boolean isLocalLeader = isLocalLeader(devId);
                 // Only handle the event if the device belongs to us
-                if (!isDeviceMine(event.subject().id())) {
+                if (!isLocalLeader && event.type().equals(DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED)
+                        && !deviceService.isAvailable(devId) && deviceService.getPorts(devId).isEmpty()) {
+                    log.info("Cleaning local state for non master instance upon " +
+                                      "device disconnection {}", devId);
+                    programmedDevices.remove(devId);
+                    // Since no mastership of the device is present upon disconnection
+                    // the method in the FlowRuleManager only empties the local copy
+                    // of the DeviceFlowTable thus this method needs to get called
+                    // on every instance, see how it's done in the InternalDeviceListener
+                    // in FlowRuleManager: no mastership check for purgeOnDisconnection
+                    flowRuleService.purgeFlowRules(devId);
+                    return;
+                } else if (!isLocalLeader) {
+                    log.info("not handling event because of not local leader for {}", devId);
                     return;
                 }
 
@@ -999,7 +1022,7 @@
                     case PORT_REMOVED:
                         if (isUniPort(dev, port)) {
                             removeSubscriber(new ConnectPoint(devId, port.number()));
-                            log.info("eapol will be send for port removed", port);
+                            log.info("eapol will be send for port {} removed", port);
                             oltFlowService.processEapolFilteringObjectives(devId, port.number(), defaultBpId,
                                                                            null,
                                                                            VlanId.vlanId(EAPOL_DEFAULT_VLAN), false);