SEBA-640 IgmpProxy should use distributed storage infrastructure of ONOS
Change-Id: I4b1c4d326a5501e9c0e046e3ee8d973ca5f73d70
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/SingleStateMachine.java b/app/src/main/java/org/opencord/igmpproxy/impl/SingleStateMachine.java
index ca68bf6..83f2ef2 100644
--- a/app/src/main/java/org/opencord/igmpproxy/impl/SingleStateMachine.java
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/SingleStateMachine.java
@@ -15,182 +15,218 @@
*/
package org.opencord.igmpproxy.impl;
-import org.onlab.packet.Ethernet;
import org.onlab.packet.Ip4Address;
import org.onosproject.net.DeviceId;
import org.onosproject.net.PortNumber;
+import org.opencord.igmpproxy.impl.state.DelayMember;
+import org.opencord.igmpproxy.impl.state.IdleMember;
+import org.opencord.igmpproxy.impl.state.NonMember;
+import org.opencord.igmpproxy.statemachine.StateMachine;
+import org.opencord.igmpproxy.statemachine.State;
+import org.opencord.igmpproxy.statemachine.StateMachineId;
-import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
-/**
- * State machine for single IGMP group member. The state machine is implemented on
- * RFC 2236 "6. Host State Diagram".
- */
-public class SingleStateMachine {
- // Only for tests purposes
- static boolean sendQuery = true;
- static final int STATE_NON = 0;
- static final int STATE_DELAY = 1;
- static final int STATE_IDLE = 2;
- static final int TRANSITION_JOIN = 0;
- static final int TRANSITION_LEAVE = 1;
- static final int TRANSITION_QUERY = 2;
- static final int TRANSITION_TIMEOUT = 3;
- static final int DEFAULT_MAX_RESP = 0xfffffff;
- static final int DEFAULT_COUNT = 1;
- private DeviceId devId;
- private Ip4Address groupIp;
+/**
+ * State machine implementation.
+ */
+public final class SingleStateMachine implements StateMachine {
+ public static final int DEFAULT_MAX_RESP = 0xfffffff;
+
+ private StateMachineId stateMachineId;
+ private int currentState;
+
private Ip4Address srcIp;
private PortNumber upLinkPort;
- private AtomicInteger count = new AtomicInteger(DEFAULT_COUNT);
- private int timerId = IgmpTimer.INVALID_TIMER_ID;
- private int timeOut = DEFAULT_MAX_RESP;
- private State[] states =
- {
- new NonMember(), new DelayMember(), new IdleMember()
- };
+ private AtomicInteger timeOut; // unit is 1 second
+ private State[] states;
private int[] nonTransition =
- {STATE_DELAY, STATE_NON, STATE_NON, STATE_NON};
+ {State.STATE_DELAY, State.STATE_NON,
+ State.STATE_NON, State.STATE_NON};
private int[] delayTransition =
- {STATE_DELAY, STATE_NON, STATE_DELAY, STATE_IDLE};
+ {State.STATE_DELAY, State.STATE_NON,
+ State.STATE_DELAY, State.STATE_IDLE};
private int[] idleTransition =
- {STATE_IDLE, STATE_NON, STATE_DELAY, STATE_IDLE};
+ {State.STATE_IDLE, State.STATE_NON,
+ State.STATE_DELAY, State.STATE_IDLE};
//THE TRANSITION TABLE
private int[][] transition =
{nonTransition, delayTransition, idleTransition};
- private int currentState = STATE_NON;
- public SingleStateMachine(DeviceId devId, Ip4Address groupIp, Ip4Address src, PortNumber upLinkPort) {
- this.devId = devId;
- this.groupIp = groupIp;
- this.srcIp = src;
+ /**
+ * No-argument constructor intended for the serializer.
+ * <p>
+ * Every reference field is left {@code null}, including {@code states}.
+ * join/leave/query/timeOut dereference {@code states}, so an instance built
+ * this way cannot process events until its fields have been populated.
+ * NOTE(review): presumably the serializer fills the fields afterwards - confirm.
+ */
+ private SingleStateMachine() {
+ this.stateMachineId = null;
+ this.srcIp = null;
+ this.upLinkPort = null;
+ this.timeOut = null;
+ this.states = null;
+ }
+
+ /**
+ * Creates a new state machine for a device/group pair.
+ * <p>
+ * The machine starts in {@code State.STATE_NON} with no timeout set
+ * ({@code timeOut} is {@code null} until a timer is started).
+ *
+ * @param deviceId device id of state-machine
+ * @param groupIp group IP address of state-machine
+ * @param srcIp source ip of state-machine
+ * @param upLinkPort uplink port of state-machine
+ */
+ public SingleStateMachine(DeviceId deviceId,
+ Ip4Address groupIp,
+ Ip4Address srcIp,
+ PortNumber upLinkPort) {
+ this.stateMachineId = StateMachineId.of(deviceId, groupIp);
+ this.srcIp = srcIp;
this.upLinkPort = upLinkPort;
+ this.currentState = State.STATE_NON;
+ this.timeOut = null;
+ // Indexed by currentState, like the rows of the transition table.
+ // NOTE(review): assumes State.STATE_NON/STATE_DELAY/STATE_IDLE are 0/1/2 - confirm.
+ this.states = new State[]{new NonMember(this), new DelayMember(this), new IdleMember(this)};
}
- public Ip4Address getGroupIp() {
- return groupIp;
- }
-
- public DeviceId getDeviceId() {
- return devId;
- }
- public boolean increaseCounter() {
- count.incrementAndGet();
- return true;
- }
-
- public boolean decreaseCounter() {
- if (count.get() > 0) {
- count.decrementAndGet();
- return true;
+ /**
+ * Creates a state machine from explicitly supplied state, e.g. to rebuild
+ * an existing machine from its stored values.
+ *
+ * @param machineId id of state-machine
+ * @param srcIp source ip of state-machine
+ * @param upLinkPort uplink port of state-machine
+ * @param currentState current state of state-machine; used as an index into
+ * the state and transition tables and not validated here
+ * @param timeout timeout value of state-machine, or {@code null} when no
+ * timeout is set
+ */
+ public SingleStateMachine(StateMachineId machineId,
+ Ip4Address srcIp,
+ PortNumber upLinkPort,
+ int currentState,
+ Integer timeout) {
+ this.stateMachineId = machineId;
+ this.srcIp = srcIp;
+ this.upLinkPort = upLinkPort;
+ this.currentState = currentState;
+ if (timeout != null) {
+ createTimeOut(timeout);
} else {
- return false;
+ this.timeOut = null;
}
+ this.states = new State[]{new NonMember(this), new DelayMember(this), new IdleMember(this)};
}
- public int getCounter() {
- return count.get();
+    /**
+     * Returns the identifier of this state machine.
+     *
+     * @return state machine id
+     */
+    @Override
+    public StateMachineId getStateMachineId() {
+        return stateMachineId;
     }
+
+    /**
+     * Returns the group IP address taken from the state machine id.
+     *
+     * @return group IP address
+     */
+    @Override
+    public Ip4Address getGroupIp() {
+        final StateMachineId id = this.stateMachineId;
+        return id.getGroupIp();
+    }
+
+    /**
+     * Returns the device id taken from the state machine id.
+     *
+     * @return device id
+     */
+    @Override
+    public DeviceId getDeviceId() {
+        final StateMachineId id = this.stateMachineId;
+        return id.getDeviceId();
+    }
+
+
+    /**
+     * Returns the source IP address of this state machine.
+     *
+     * @return source IP address
+     */
+    @Override
+    public Ip4Address getSrcIp() {
+        return this.srcIp;
+    }
+
+    /**
+     * Returns the uplink port of this state machine.
+     *
+     * @return uplink port number
+     */
+    public PortNumber getUpLinkPort() {
+        return this.upLinkPort;
+    }
+
+ /**
+ * Returns the current state of this state machine; the value is the index
+ * used for both the state array and the transition table.
+ *
+ * @return current state
+ */
+ @Override
public int currentState() {
- return currentState;
+ return this.currentState;
+ }
+
+    /**
+     * Returns the current timeout value.
+     *
+     * @return timeout value, or {@code null} when no timeout is set
+     */
+    public Integer getTimeOut() {
+        if (timeOut == null) {
+            return null;
+        }
+        return timeOut.get();
+    }
+
+    /**
+     * Atomically adds one to the timeout value.
+     * <p>
+     * A timeout must already exist; this dereferences {@code timeOut}
+     * without a null check.
+     */
+    @Override
+    public void increaseTimeOut() {
+        timeOut.incrementAndGet();
+    }
+
+    /**
+     * Atomically subtracts one from the timeout value.
+     * <p>
+     * A timeout must already exist; this dereferences {@code timeOut}
+     * without a null check.
+     */
+    @Override
+    public void decreaseTimeOut() {
+        timeOut.decrementAndGet();
+    }
+
+    /**
+     * Sets the timeout value to {@link #DEFAULT_MAX_RESP}.
+     * <p>
+     * A timeout must already exist; this dereferences {@code timeOut}
+     * without a null check.
+     */
+    @Override
+    public void setMaxTimeout() {
+        timeOut.set(SingleStateMachine.DEFAULT_MAX_RESP);
+    }
+
+    /**
+     * Starts a timer by installing a fresh timeout holder with the given value,
+     * replacing any holder that was there before.
+     *
+     * @param timeout initial timeout value
+     */
+    @Override
+    public void startTimer(int timeout) {
+        this.timeOut = new AtomicInteger(timeout);
+    }
+
+    /**
+     * Resets the timeout to the given value.
+     * <p>
+     * {@code timeOut} is {@code null} until a timer has been started and again
+     * after {@link #destroyTimer()}. In that case a new timeout holder is created
+     * rather than dereferencing the null field.
+     *
+     * @param timeout new timeout value
+     */
+    @Override
+    public void resetTimer(int timeout) {
+        if (timeOut == null) {
+            createTimeOut(timeout);
+        } else {
+            setTimeOut(timeout);
+        }
+    }
+
+    /**
+     * Destroys the timer by dropping the timeout holder, after which
+     * {@link #getTimeOut()} returns {@code null}.
+     */
+    @Override
+    public void destroyTimer() {
+        this.timeOut = null;
+    }
+
+    /**
+     * Handles a join event.
+     * <p>
+     * The current state's join action runs only when outgoing messages are
+     * allowed; the JOIN transition is applied in either case.
+     *
+     * @param messageOutAllowed whether the current state's join action may run
+     */
+    @Override
+    public void join(boolean messageOutAllowed) {
+        if (messageOutAllowed) {
+            states[currentState].join();
+        }
+        next(State.TRANSITION_JOIN);
+    }
+
+    /**
+     * Handles a leave event.
+     * <p>
+     * The current state's leave action runs only when outgoing messages are
+     * allowed; the LEAVE transition is applied in either case.
+     *
+     * @param messageOutAllowed whether the current state's leave action may run
+     */
+    @Override
+    public void leave(boolean messageOutAllowed) {
+        if (messageOutAllowed) {
+            states[currentState].leave();
+        }
+        next(State.TRANSITION_LEAVE);
+    }
+
+    /**
+     * Handles a query event: runs the current state's query action, then
+     * applies the QUERY transition.
+     *
+     * @param maxResp maximum response value passed on to the current state
+     */
+    @Override
+    public void query(int maxResp) {
+        states[currentState].query(maxResp);
+        next(State.TRANSITION_QUERY);
+    }
+
+    /**
+     * Handles a timeout event.
+     * <p>
+     * The current state's timeout action runs only when {@code sendQuery} is
+     * set; the TIMEOUT transition is applied in either case.
+     *
+     * @param sendQuery whether the current state's timeout action may run
+     */
+    @Override
+    public void timeOut(boolean sendQuery) {
+        if (sendQuery) {
+            states[currentState].timeOut();
+        }
+        next(State.TRANSITION_TIMEOUT);
     }
+ /**
+ * Moves to the next state by looking up the transition table row of the
+ * current state at the column of the given transition.
+ *
+ * @param msg transition index (one of the State.TRANSITION_* values)
+ */
private void next(int msg) {
- currentState = transition[currentState][msg];
+ this.currentState = transition[currentState][msg];
}
- public void join(boolean messageOutAllowed) {
- states[currentState].join(messageOutAllowed);
- next(TRANSITION_JOIN);
+    /**
+     * Overwrites the value held by the existing timeout holder.
+     * <p>
+     * A timeout must already exist; this dereferences {@code timeOut}
+     * without a null check.
+     *
+     * @param timeout new timeout value
+     */
+    private void setTimeOut(int timeout) {
+        this.timeOut.set(timeout);
     }
- public void leave(boolean messageOutAllowed) {
- states[currentState].leave(messageOutAllowed);
- next(TRANSITION_LEAVE);
- }
-
- public void query(int maxResp) {
- states[currentState].query(maxResp);
- next(TRANSITION_QUERY);
- }
-
- public void timeOut() {
- states[currentState].timeOut();
- next(TRANSITION_TIMEOUT);
- }
-
- int getTimeOut(int maxTimeOut) {
- Random random = new Random();
- return Math.abs(random.nextInt()) % maxTimeOut;
- }
-
- protected void cancelTimer() {
- if (IgmpTimer.INVALID_TIMER_ID != timerId) {
- IgmpTimer.cancel(timerId);
+    /**
+     * Sets or clears the timeout.
+     * <p>
+     * A {@code null} argument drops the timeout holder. A non-null argument
+     * updates the existing holder, or creates one when none exists yet, so the
+     * call never dereferences a null {@code timeOut}.
+     *
+     * @param timeout new timeout value, or {@code null} to clear the timeout
+     */
+    private void setTimeOut(Integer timeout) {
+        if (timeout == null) {
+            timeOut = null;
+        } else if (timeOut == null) {
+            // No holder yet (never started, or destroyed): create one.
+            timeOut = new AtomicInteger(timeout);
+        } else {
+            timeOut.set(timeout);
         }
     }
- class State {
- public void join(boolean messageOutAllowed) {
- }
-
- public void leave(boolean messageOutAllowed) {
- if (messageOutAllowed) {
- Ethernet eth = IgmpManager.outgoingIgmpWithV3() ?
- IgmpSender.getInstance().buildIgmpV3Leave(groupIp, srcIp) :
- IgmpSender.getInstance().buildIgmpV2Leave(groupIp, srcIp);
- IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId, upLinkPort);
- }
- }
-
- public void query(int maxResp) {
- }
-
- public void timeOut() {
- }
-
- }
-
- class NonMember extends State {
- public void join(boolean messageOutAllowed) {
- if (messageOutAllowed) {
- Ethernet eth = IgmpManager.outgoingIgmpWithV3() ?
- IgmpSender.getInstance().buildIgmpV3Join(groupIp, srcIp) :
- IgmpSender.getInstance().buildIgmpV2Join(groupIp, srcIp);
- IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId, upLinkPort);
- timeOut = getTimeOut(IgmpManager.getUnsolicitedTimeout());
- timerId = IgmpTimer.start(SingleStateMachine.this, timeOut);
- }
- }
- }
-
- class DelayMember extends State {
- public void query(int maxResp) {
- if (maxResp < timeOut) {
- timeOut = getTimeOut(maxResp);
- timerId = IgmpTimer.reset(timerId, SingleStateMachine.this, timeOut);
- }
- }
-
- public void timeOut() {
- if (sendQuery) {
- Ethernet eth = IgmpManager.outgoingIgmpWithV3() ?
- IgmpSender.getInstance().buildIgmpV3ResponseQuery(groupIp, srcIp) :
- IgmpSender.getInstance().buildIgmpV2ResponseQuery(groupIp, srcIp);
- IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId, upLinkPort);
- timeOut = DEFAULT_MAX_RESP;
- }
- }
-
- }
-
- class IdleMember extends State {
- public void query(int maxResp) {
- timeOut = getTimeOut(maxResp);
- timerId = IgmpTimer.start(SingleStateMachine.this, timeOut);
- }
+    /**
+     * Installs a fresh timeout holder initialised to the given value.
+     *
+     * @param timeout initial timeout value; must not be {@code null}
+     */
+    private void createTimeOut(Integer timeout) {
+        this.timeOut = new AtomicInteger(timeout.intValue());
     }
}