SEBA-640 IgmpProxy should use distributed storage infrastructure of ONOS

Change-Id: I4b1c4d326a5501e9c0e046e3ee8d973ca5f73d70
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/StateMachineManager.java b/app/src/main/java/org/opencord/igmpproxy/impl/StateMachineManager.java
new file mode 100644
index 0000000..8c29a2f
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/StateMachineManager.java
@@ -0,0 +1,220 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.impl;
+
+import org.onlab.packet.Ip4Address;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+import org.opencord.igmpproxy.IgmpLeadershipService;
+import org.opencord.igmpproxy.statemachine.StateMachine;
+import org.opencord.igmpproxy.statemachine.StateMachineId;
+import org.opencord.igmpproxy.statemachine.StateMachineService;
+import org.opencord.igmpproxy.impl.store.machine.StateMachineStore;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Deactivate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+
+
+/**
+ * State machine for whole IGMP process. The state machine is implemented on
+ * RFC 2236 "6. Host State Diagram".
+ */
+@Component(immediate = true, service = StateMachineService.class)
+public class StateMachineManager implements StateMachineService {
+    // Only for tests purposes
+    public static boolean sendQuery = true;
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected StateMachineStore stateMachineStore;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected IgmpLeadershipService igmpLeadershipService;
+
+    @Activate
+    public void activate(ComponentContext context) {
+        log.info("Started.");
+    }
+
+    @Deactivate
+    public void deactivate(ComponentContext context) {
+        log.info("Stopped.");
+    }
+
+    private StateMachine get(DeviceId devId, Ip4Address groupIp) {
+        StateMachineId id = StateMachineId.of(devId, groupIp);
+        return stateMachineStore.getStateMachine(id);
+    }
+
+    public void destroySingle(DeviceId devId, Ip4Address groupIp) {
+        StateMachine machine = get(devId, groupIp);
+        if (machine == null) {
+            log.debug("Machine has already been destroyed. deviceId: {} and groupIp: {} ", devId, groupIp);
+            return;
+        }
+        stateMachineStore.removeStateMachine(machine.getStateMachineId());
+        stateMachineStore.removeCounter(machine.getStateMachineId());
+    }
+
+    @Override
+    public boolean join(DeviceId devId, Ip4Address groupIp, Ip4Address srcIP, PortNumber upLinkPort) {
+        StateMachine machineInstance = get(devId, groupIp);
+
+        if (machineInstance == null) {
+            machineInstance = new SingleStateMachine(devId, groupIp, srcIP, upLinkPort);
+            stateMachineStore.putStateMachine(machineInstance);
+            stateMachineStore.increaseAndGetCounter(machineInstance.getStateMachineId());
+            log.debug("Instance of machine created with id: {}", machineInstance.getStateMachineId());
+            boolean shouldSendJoin = true;
+            if (IgmpManager.isIgmpOnPodBasis() &&
+                    groupListenedByOtherDevices(devId, groupIp)) {
+                // unset the flag if igmp messages are evaluated on POD basis
+                // and there are already active members of this group
+                // across the entire POD
+                shouldSendJoin = false;
+            }
+            machineInstance.join(shouldSendJoin);
+            stateMachineStore.updateStateMachine(machineInstance);
+            return true;
+        }
+        log.debug("Instance of machine has already been created. deviceId: {} and groupIp: {}",
+                devId, groupIp);
+        stateMachineStore.increaseAndGetCounter(machineInstance.getStateMachineId());
+        return false;
+    }
+
+    @Override
+    public boolean leave(DeviceId devId, Ip4Address groupIp) {
+        StateMachine machine = get(devId, groupIp);
+        if (machine == null) {
+            log.debug("Machine has already been left and destroyed. deviceId: {} and groupIp: {} ", devId, groupIp);
+            return false;
+        }
+
+        long count = stateMachineStore.decreaseAndGetCounter(machine.getStateMachineId());
+        // make sure machine instance still exists.
+        // it may be removed by the preceding thread
+        if (count == 0) {
+            boolean shouldSendLeave = true;
+            if (IgmpManager.isIgmpOnPodBasis() &&
+                    groupListenedByOtherDevices(devId, groupIp)) {
+                // unset the flag if igmp messages are evaluated on POD basis
+                // and there are still active members of this group
+                // across the entire POD
+                shouldSendLeave = false;
+            }
+            machine.leave(shouldSendLeave);
+            destroySingle(devId, groupIp);
+            log.debug("This machine left and destroyed. deviceId: {} and groupIp: {}", devId, groupIp);
+            return true;
+        }
+        log.debug("This machine still has member/members. number of member/members: {}, deviceId: {}, groupIp: {} ",
+                count, devId, groupIp);
+        return false;
+    }
+
+    @Override
+    public void specialQuery(DeviceId devId, Ip4Address groupIp, int maxResp) {
+        StateMachine machine = get(devId, groupIp);
+        if (machine == null) {
+            log.debug("Machine is not found. deviceId: {} and groupIp: {} ", devId, groupIp);
+            return;
+        }
+        machine.query(maxResp);
+        stateMachineStore.updateStateMachine(machine);
+    }
+
+    @Override
+    public void generalQuery(DeviceId devId, int maxResp) {
+        for (StateMachine machine : stateMachineStore.getAllStateMachines()) {
+            if (devId.equals(machine.getDeviceId())) {
+                machine.query(maxResp);
+                stateMachineStore.updateStateMachine(machine);
+            }
+        }
+    }
+
+    @Override
+    public void generalQuery(int maxResp) {
+        for (StateMachine machine : stateMachineStore.getAllStateMachines()) {
+            machine.query(maxResp);
+            stateMachineStore.updateStateMachine(machine);
+        }
+    }
+
+    @Override
+    public void timeOut(DeviceId devId, Ip4Address groupIp) {
+        StateMachine machine = get(devId, groupIp);
+        if (machine == null) {
+            log.debug("Machine is not found. deviceId: {} and groupIp: {}", devId, groupIp);
+            return;
+        }
+        machine.timeOut(sendQuery);
+        stateMachineStore.updateStateMachine(machine);
+    }
+
+    @Override
+    public void timeOut1s() {
+        Collection<StateMachine> mapSet = stateMachineStore.getAllStateMachines();
+        for (StateMachine machineInfo : mapSet) {
+            if (machineInfo.getTimeOut() != null) {
+                StateMachineId machineId = machineInfo.getStateMachineId();
+                if (igmpLeadershipService.isLocalLeader(machineId.getDeviceId())) {
+                    if (machineInfo.getTimeOut() > 0) {
+                        stateMachineStore.decreaseTimeout(machineId);
+                    } else {
+                        StateMachine machine = stateMachineStore.getStateMachine(machineId);
+                        machine.timeOut(sendQuery);
+                        machine.destroyTimer();
+                        stateMachineStore.updateStateMachine(machine);
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    public void clearAllMaps() {
+        stateMachineStore.clearAllStateMachines();
+    }
+
+    /**
+     * @param devId   id of the device being excluded
+     * @param groupIp group IP address
+     * @return true if this group has at least one listener connected to
+     * any device in the map except for the device specified; false otherwise.
+     */
+    private boolean groupListenedByOtherDevices(DeviceId devId, Ip4Address groupIp) {
+        for (StateMachine machine : stateMachineStore.getAllStateMachines()) {
+            if (machine.getStateMachineId().getDeviceId().equals(devId)) {
+                continue;
+            }
+            if (machine.getStateMachineId().getGroupIp().equals(groupIp)) {
+                //means group is being listened by other peers in the domain
+                return true;
+            }
+        }
+        return false;
+    }
+}