SEBA-640 IgmpProxy should use distributed storage infrastructure of ONOS
Change-Id: I4b1c4d326a5501e9c0e046e3ee8d973ca5f73d70
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/StateMachineManager.java b/app/src/main/java/org/opencord/igmpproxy/impl/StateMachineManager.java
new file mode 100644
index 0000000..8c29a2f
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/StateMachineManager.java
@@ -0,0 +1,220 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.impl;
+
+import org.onlab.packet.Ip4Address;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+import org.opencord.igmpproxy.IgmpLeadershipService;
+import org.opencord.igmpproxy.statemachine.StateMachine;
+import org.opencord.igmpproxy.statemachine.StateMachineId;
+import org.opencord.igmpproxy.statemachine.StateMachineService;
+import org.opencord.igmpproxy.impl.store.machine.StateMachineStore;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Deactivate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+
+
+/**
+ * State machine for whole IGMP process. The state machine is implemented on
+ * RFC 2236 "6. Host State Diagram".
+ */
+@Component(immediate = true, service = StateMachineService.class)
+public class StateMachineManager implements StateMachineService {
+ // Only for tests purposes
+ public static boolean sendQuery = true;
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected StateMachineStore stateMachineStore;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected IgmpLeadershipService igmpLeadershipService;
+
+ @Activate
+ public void activate(ComponentContext context) {
+ log.info("Started.");
+ }
+
+ @Deactivate
+ public void deactivate(ComponentContext context) {
+ log.info("Stopped.");
+ }
+
+ private StateMachine get(DeviceId devId, Ip4Address groupIp) {
+ StateMachineId id = StateMachineId.of(devId, groupIp);
+ return stateMachineStore.getStateMachine(id);
+ }
+
+ public void destroySingle(DeviceId devId, Ip4Address groupIp) {
+ StateMachine machine = get(devId, groupIp);
+ if (machine == null) {
+ log.debug("Machine has already been destroyed. deviceId: {} and groupIp: {} ", devId, groupIp);
+ return;
+ }
+ stateMachineStore.removeStateMachine(machine.getStateMachineId());
+ stateMachineStore.removeCounter(machine.getStateMachineId());
+ }
+
+ @Override
+ public boolean join(DeviceId devId, Ip4Address groupIp, Ip4Address srcIP, PortNumber upLinkPort) {
+ StateMachine machineInstance = get(devId, groupIp);
+
+ if (machineInstance == null) {
+ machineInstance = new SingleStateMachine(devId, groupIp, srcIP, upLinkPort);
+ stateMachineStore.putStateMachine(machineInstance);
+ stateMachineStore.increaseAndGetCounter(machineInstance.getStateMachineId());
+ log.debug("Instance of machine created with id: {}", machineInstance.getStateMachineId());
+ boolean shouldSendJoin = true;
+ if (IgmpManager.isIgmpOnPodBasis() &&
+ groupListenedByOtherDevices(devId, groupIp)) {
+ // unset the flag if igmp messages are evaluated on POD basis
+ // and there are already active members of this group
+ // across the entire POD
+ shouldSendJoin = false;
+ }
+ machineInstance.join(shouldSendJoin);
+ stateMachineStore.updateStateMachine(machineInstance);
+ return true;
+ }
+ log.debug("Instance of machine has already been created. deviceId: {} and groupIp: {}",
+ devId, groupIp);
+ stateMachineStore.increaseAndGetCounter(machineInstance.getStateMachineId());
+ return false;
+ }
+
+ @Override
+ public boolean leave(DeviceId devId, Ip4Address groupIp) {
+ StateMachine machine = get(devId, groupIp);
+ if (machine == null) {
+ log.debug("Machine has already been left and destroyed. deviceId: {} and groupIp: {} ", devId, groupIp);
+ return false;
+ }
+
+ long count = stateMachineStore.decreaseAndGetCounter(machine.getStateMachineId());
+ // make sure machine instance still exists.
+ // it may be removed by the preceding thread
+ if (count == 0) {
+ boolean shouldSendLeave = true;
+ if (IgmpManager.isIgmpOnPodBasis() &&
+ groupListenedByOtherDevices(devId, groupIp)) {
+ // unset the flag if igmp messages are evaluated on POD basis
+ // and there are still active members of this group
+ // across the entire POD
+ shouldSendLeave = false;
+ }
+ machine.leave(shouldSendLeave);
+ destroySingle(devId, groupIp);
+ log.debug("This machine left and destroyed. deviceId: {} and groupIp: {}", devId, groupIp);
+ return true;
+ }
+ log.debug("This machine still has member/members. number of member/members: {}, deviceId: {}, groupIp: {} ",
+ count, devId, groupIp);
+ return false;
+ }
+
+ @Override
+ public void specialQuery(DeviceId devId, Ip4Address groupIp, int maxResp) {
+ StateMachine machine = get(devId, groupIp);
+ if (machine == null) {
+ log.debug("Machine is not found. deviceId: {} and groupIp: {} ", devId, groupIp);
+ return;
+ }
+ machine.query(maxResp);
+ stateMachineStore.updateStateMachine(machine);
+ }
+
+ @Override
+ public void generalQuery(DeviceId devId, int maxResp) {
+ for (StateMachine machine : stateMachineStore.getAllStateMachines()) {
+ if (devId.equals(machine.getDeviceId())) {
+ machine.query(maxResp);
+ stateMachineStore.updateStateMachine(machine);
+ }
+ }
+ }
+
+ @Override
+ public void generalQuery(int maxResp) {
+ for (StateMachine machine : stateMachineStore.getAllStateMachines()) {
+ machine.query(maxResp);
+ stateMachineStore.updateStateMachine(machine);
+ }
+ }
+
+ @Override
+ public void timeOut(DeviceId devId, Ip4Address groupIp) {
+ StateMachine machine = get(devId, groupIp);
+ if (machine == null) {
+ log.debug("Machine is not found. deviceId: {} and groupIp: {}", devId, groupIp);
+ return;
+ }
+ machine.timeOut(sendQuery);
+ stateMachineStore.updateStateMachine(machine);
+ }
+
+ @Override
+ public void timeOut1s() {
+ Collection<StateMachine> mapSet = stateMachineStore.getAllStateMachines();
+ for (StateMachine machineInfo : mapSet) {
+ if (machineInfo.getTimeOut() != null) {
+ StateMachineId machineId = machineInfo.getStateMachineId();
+ if (igmpLeadershipService.isLocalLeader(machineId.getDeviceId())) {
+ if (machineInfo.getTimeOut() > 0) {
+ stateMachineStore.decreaseTimeout(machineId);
+ } else {
+ StateMachine machine = stateMachineStore.getStateMachine(machineId);
+ machine.timeOut(sendQuery);
+ machine.destroyTimer();
+ stateMachineStore.updateStateMachine(machine);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void clearAllMaps() {
+ stateMachineStore.clearAllStateMachines();
+ }
+
+ /**
+ * @param devId id of the device being excluded
+ * @param groupIp group IP address
+ * @return true if this group has at least one listener connected to
+ * any device in the map except for the device specified; false otherwise.
+ */
+ private boolean groupListenedByOtherDevices(DeviceId devId, Ip4Address groupIp) {
+ for (StateMachine machine : stateMachineStore.getAllStateMachines()) {
+ if (machine.getStateMachineId().getDeviceId().equals(devId)) {
+ continue;
+ }
+ if (machine.getStateMachineId().getGroupIp().equals(groupIp)) {
+ //means group is being listened by other peers in the domain
+ return true;
+ }
+ }
+ return false;
+ }
+}