SEBA-640 IgmpProxy should use distributed storage infrastructure of ONOS
Change-Id: I4b1c4d326a5501e9c0e046e3ee8d973ca5f73d70
diff --git a/api/src/main/java/org/opencord/igmpproxy/GroupMemberId.java b/api/src/main/java/org/opencord/igmpproxy/GroupMemberId.java
new file mode 100644
index 0000000..137a259
--- /dev/null
+++ b/api/src/main/java/org/opencord/igmpproxy/GroupMemberId.java
@@ -0,0 +1,115 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.igmpproxy;
+
+import com.google.common.base.MoreObjects;
+import org.onlab.packet.Ip4Address;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+
+import java.util.Objects;
+
+/**
+ * Identification of group member.
+ */
+public final class GroupMemberId {
+ private Ip4Address groupIp;
+ private DeviceId deviceId;
+ private PortNumber portNum;
+
+ /**
+ * Constructor for serializer.
+ */
+ private GroupMemberId() {
+ this.groupIp = null;
+ this.deviceId = null;
+ this.portNum = null;
+ }
+
+ private GroupMemberId(Ip4Address groupIp, DeviceId deviceId, PortNumber portNum) {
+ this.groupIp = groupIp;
+ this.deviceId = deviceId;
+ this.portNum = portNum;
+ }
+
+ /**
+ * Creates new group member identification.
+ *
+ * @param groupIp group ip of member
+ * @param deviceId device id
+ * @param portNum port number of connect point
+ * @return new identification of group-member
+ */
+ public static GroupMemberId of(Ip4Address groupIp, DeviceId deviceId, PortNumber portNum) {
+ return new GroupMemberId(groupIp, deviceId, portNum);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("groupIp", groupIp)
+ .add("deviceId", deviceId)
+ .add("portNumber", portNum).toString();
+ }
+
+ /**
+ * Get group ip of group.
+ *
+ * @return group ip
+ */
+ public Ip4Address getGroupIp() {
+ return groupIp;
+ }
+
+ /**
+ * Get device id of connect point.
+ *
+ * @return device id
+ */
+ public DeviceId getDeviceId() {
+ return deviceId;
+ }
+
+ /**
+ * Get port number of connect point.
+ *
+ * @return port number
+ */
+ public PortNumber getPortNum() {
+ return portNum;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ GroupMemberId that = (GroupMemberId) o;
+ return Objects.equals(groupIp, that.groupIp) &&
+ Objects.equals(deviceId, that.deviceId) &&
+ Objects.equals(portNum, that.portNum);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(groupIp, deviceId, portNum);
+ }
+}
+
diff --git a/api/src/main/java/org/opencord/igmpproxy/GroupMemberIdSerializer.java b/api/src/main/java/org/opencord/igmpproxy/GroupMemberIdSerializer.java
new file mode 100644
index 0000000..370148a
--- /dev/null
+++ b/api/src/main/java/org/opencord/igmpproxy/GroupMemberIdSerializer.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.onlab.packet.Ip4Address;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+
+/**
+ * Custom serializer for {@link GroupMemberId}.
+ */
+public class GroupMemberIdSerializer extends Serializer<GroupMemberId> {
+ /**
+ * Creates serializer instance.
+ */
+ public GroupMemberIdSerializer() {
+ // non-null, immutable
+ super(false, true);
+ }
+
+ @Override
+ public void write(Kryo kryo, Output output, GroupMemberId groupMemberId) {
+ output.writeString(groupMemberId.getDeviceId().toString());
+ kryo.writeClassAndObject(output, groupMemberId.getGroupIp());
+ kryo.writeClassAndObject(output, groupMemberId.getPortNum());
+ }
+
+ @Override
+ public GroupMemberId read(Kryo kryo, Input input, Class<GroupMemberId> aClass) {
+ DeviceId deviceId = DeviceId.deviceId(input.readString());
+ Ip4Address groupIp = (Ip4Address) kryo.readClassAndObject(input);
+ PortNumber portNum = (PortNumber) kryo.readClassAndObject(input);
+ return GroupMemberId.of(groupIp, deviceId, portNum);
+ }
+}
diff --git a/api/src/main/java/org/opencord/igmpproxy/IgmpLeadershipService.java b/api/src/main/java/org/opencord/igmpproxy/IgmpLeadershipService.java
new file mode 100644
index 0000000..1b8d0fe
--- /dev/null
+++ b/api/src/main/java/org/opencord/igmpproxy/IgmpLeadershipService.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy;
+
+import org.onosproject.net.DeviceId;
+
+/**
+ * Leadership control service.
+ */
+public interface IgmpLeadershipService {
+ /**
+ * Makes leadership control.
+ *
+ * @param deviceId received deviceId
+ * @return if it is leadership of this device, return true
+ */
+ boolean isLocalLeader(DeviceId deviceId);
+}
diff --git a/api/src/main/java/org/opencord/igmpproxy/package-info.java b/api/src/main/java/org/opencord/igmpproxy/package-info.java
index a2f9362..f261bd7 100644
--- a/api/src/main/java/org/opencord/igmpproxy/package-info.java
+++ b/api/src/main/java/org/opencord/igmpproxy/package-info.java
@@ -15,6 +15,6 @@
*/
/**
- * Created by onos on 17-3-9.
+ * Package of igmpproxy.
*/
package org.opencord.igmpproxy;
diff --git a/api/src/main/java/org/opencord/igmpproxy/statemachine/State.java b/api/src/main/java/org/opencord/igmpproxy/statemachine/State.java
new file mode 100644
index 0000000..022d509
--- /dev/null
+++ b/api/src/main/java/org/opencord/igmpproxy/statemachine/State.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.statemachine;
+
+/**
+ * State of machine.
+ */
+public interface State {
+ /**
+ * Default state.
+ */
+ int STATE_NON = 0;
+ /**
+ * Delay state.
+ */
+ int STATE_DELAY = 1;
+ /**
+ * Idle state.
+ */
+ int STATE_IDLE = 2;
+
+ /**
+ * Transition to join.
+ */
+ int TRANSITION_JOIN = 0;
+ /**
+ * Transition to leave.
+ */
+ int TRANSITION_LEAVE = 1;
+ /**
+ * Transition to query.
+ */
+ int TRANSITION_QUERY = 2;
+ /**
+ * Transition to timeout.
+ */
+ int TRANSITION_TIMEOUT = 3;
+
+ /**
+ * Makes the requirements of join request.
+ */
+ void join();
+
+ /**
+ * Makes the requirements of leave request.
+ */
+ void leave();
+
+ /**
+ * Makes the requirements of query request.
+ *
+ * @param maxResp maximum resp
+ */
+ void query(int maxResp);
+
+ /**
+ * Makes the requirements of timeout .
+ */
+ void timeOut();
+}
diff --git a/api/src/main/java/org/opencord/igmpproxy/statemachine/StateMachine.java b/api/src/main/java/org/opencord/igmpproxy/statemachine/StateMachine.java
new file mode 100644
index 0000000..ae79931
--- /dev/null
+++ b/api/src/main/java/org/opencord/igmpproxy/statemachine/StateMachine.java
@@ -0,0 +1,136 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.statemachine;
+
+import org.onlab.packet.Ip4Address;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+
+/**
+ * State machine object.
+ */
+public interface StateMachine {
+ /**
+ * Returns identification of state-machine.
+ *
+ * @return identification
+ */
+ StateMachineId getStateMachineId();
+
+ /**
+ * Returns group ip of state-machine.
+ *
+ * @return ip of group
+ */
+ Ip4Address getGroupIp();
+
+ /**
+ * Returns device id of state-machine.
+ *
+ * @return device id
+ */
+ DeviceId getDeviceId();
+
+ /**
+ * Returns source ip of state-machine.
+ *
+ * @return source ip
+ */
+ Ip4Address getSrcIp();
+
+ /**
+ * Returns up-link port of state-machine.
+ *
+ * @return up-link port
+ */
+ PortNumber getUpLinkPort();
+
+ /**
+ * Returns timeout. it is nullable.
+ *
+ * @return timeout
+ */
+ Integer getTimeOut();
+
+ /**
+ * Set timer to max timeout.
+ */
+ void setMaxTimeout();
+
+ /**
+ * Start timer.
+ *
+ * @param timeout timeout
+ */
+ void startTimer(int timeout);
+
+ /**
+ * Resets timer.
+ *
+ * @param timeout timeout of timer
+ */
+ void resetTimer(int timeout);
+
+ /**
+ * Destroy timeout.
+ */
+ void destroyTimer();
+
+ /**
+ * Increases timeout of timer.
+ */
+ void increaseTimeOut();
+
+ /**
+ * Decreases timeout of timer.
+ */
+ void decreaseTimeOut();
+
+ /**
+ * Returns current state.
+ *
+ * @return current state
+ */
+ int currentState();
+
+ /**
+ * Makes the requirements of join request and transition to next station.
+ *
+ * @param messageOutAllowed message out allowed
+ */
+ void join(boolean messageOutAllowed);
+
+ /**
+ * Makes the requirements of leave request and transition to next station.
+ *
+ * @param messageOutAllowed message out allowed
+ */
+ void leave(boolean messageOutAllowed);
+
+ /**
+ * Makes the requirements of query request and transition to next station.
+ *
+ * @param maxResp maximum resp
+ */
+ void query(int maxResp);
+
+ /**
+ * Makes the requirements of timeout request and transition to next station.
+ *
+ * @param sendQuery send query for test
+ */
+ void timeOut(boolean sendQuery);
+}
diff --git a/api/src/main/java/org/opencord/igmpproxy/statemachine/StateMachineId.java b/api/src/main/java/org/opencord/igmpproxy/statemachine/StateMachineId.java
new file mode 100644
index 0000000..38d2bf1
--- /dev/null
+++ b/api/src/main/java/org/opencord/igmpproxy/statemachine/StateMachineId.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.statemachine;
+
+import com.google.common.base.MoreObjects;
+import org.onlab.packet.Ip4Address;
+import org.onosproject.net.DeviceId;
+
+import java.util.Objects;
+
+/**
+ * Identification of state machine.
+ */
+public final class StateMachineId {
+ private DeviceId deviceId;
+ private Ip4Address groupIp;
+
+ /**
+ * Constructor for serializer.
+ */
+ private StateMachineId() {
+ this.deviceId = null;
+ this.groupIp = null;
+ }
+
+ private StateMachineId(DeviceId deviceId, Ip4Address groupIp) {
+ this.deviceId = deviceId;
+ this.groupIp = groupIp;
+ }
+
+ /**
+ * Creates new state-machine identification using device-id and group-ip.
+ *
+ * @param deviceId device id of state-machie
+ * @param groupIp group ip of state-machine
+ * @return created identification
+ */
+ public static StateMachineId of(DeviceId deviceId, Ip4Address groupIp) {
+ return new StateMachineId(deviceId, groupIp);
+ }
+
+ /**
+ * Returns device id of identification.
+ *
+ * @return device id
+ */
+ public DeviceId getDeviceId() {
+ return deviceId;
+ }
+
+ /**
+ * Returns group ip of identification.
+ *
+ * @return group ip
+ */
+ public Ip4Address getGroupIp() {
+ return groupIp;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("deviceId", deviceId)
+ .add("groupIp", groupIp)
+ .toString();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ StateMachineId that = (StateMachineId) o;
+ return Objects.equals(deviceId, that.deviceId) &&
+ Objects.equals(groupIp, that.groupIp);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(deviceId, groupIp);
+ }
+}
diff --git a/api/src/main/java/org/opencord/igmpproxy/statemachine/StateMachineIdSerializer.java b/api/src/main/java/org/opencord/igmpproxy/statemachine/StateMachineIdSerializer.java
new file mode 100644
index 0000000..f7c747e
--- /dev/null
+++ b/api/src/main/java/org/opencord/igmpproxy/statemachine/StateMachineIdSerializer.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.statemachine;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.onlab.packet.Ip4Address;
+import org.onosproject.net.DeviceId;
+
+/**
+ * Custom serializer for {@link StateMachineId}.
+ */
+public class StateMachineIdSerializer extends Serializer<StateMachineId> {
+ /**
+ * Creates serializer instance.
+ */
+ public StateMachineIdSerializer() {
+ // non-null, immutable
+ super(false, true);
+ }
+
+ @Override
+ public void write(Kryo kryo, Output output, StateMachineId stateMachineId) {
+ output.writeString(stateMachineId.getDeviceId().toString());
+ kryo.writeClassAndObject(output, stateMachineId.getGroupIp());
+ }
+
+ @Override
+ public StateMachineId read(Kryo kryo, Input input, Class<StateMachineId> aClass) {
+ DeviceId deviceId = DeviceId.deviceId(input.readString());
+ Ip4Address groupIp = (Ip4Address) kryo.readClassAndObject(input);
+ return StateMachineId.of(deviceId, groupIp);
+ }
+}
\ No newline at end of file
diff --git a/api/src/main/java/org/opencord/igmpproxy/statemachine/StateMachineService.java b/api/src/main/java/org/opencord/igmpproxy/statemachine/StateMachineService.java
new file mode 100644
index 0000000..c8ed49e
--- /dev/null
+++ b/api/src/main/java/org/opencord/igmpproxy/statemachine/StateMachineService.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.statemachine;
+
+import org.onlab.packet.Ip4Address;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+
+/**
+ * Manage State Machine and Igmp Timer.
+ */
+public interface StateMachineService {
+
+ /**
+ * Makes the requirements of igmp-join request.
+ *
+ * @param devId device id of connect point
+ * @param groupIp group ip of igmp group
+ * @param srcIP source ip of device
+ * @param upLinkPort uplink port of device
+ * @return if this is first join from the device and group-ip return true
+ */
+ boolean join(DeviceId devId, Ip4Address groupIp, Ip4Address srcIP, PortNumber upLinkPort);
+
+ /**
+ * Makes the requirements of igmp-leave request.
+ *
+ * @param devId device id of connect point
+ * @param groupIp group ip of group-member
+ * @return If there are no more members of that device and group returns true
+ */
+ boolean leave(DeviceId devId, Ip4Address groupIp);
+
+ /**
+ * clear map of state-machine.
+ */
+ void clearAllMaps();
+
+ /**
+ * Makes the requirements of special query.
+ *
+ * @param devId device id
+ * @param groupIp group ip of igmp group
+ * @param maxResp maximum resp of igmp
+ */
+ void specialQuery(DeviceId devId, Ip4Address groupIp, int maxResp);
+
+ /**
+ * Makes the requirements of igmp-query request.
+ *
+ * @param devId device id
+ * @param maxResp maximum resp of igmp
+ */
+ void generalQuery(DeviceId devId, int maxResp);
+
+ /**
+ * Makes the requirements of igmp-query request.
+ *
+ * @param maxResp maximum resp of igmp
+ */
+ void generalQuery(int maxResp);
+
+ /**
+ * Makes the requirements of timeout.
+ *
+ * @param devId device id
+ * @param groupIp group ip of igmp group
+ */
+ void timeOut(DeviceId devId, Ip4Address groupIp);
+
+ /**
+ * Checks all state-machines periodically.
+ */
+ void timeOut1s();
+}
diff --git a/api/src/main/java/org/opencord/igmpproxy/statemachine/package-info.java b/api/src/main/java/org/opencord/igmpproxy/statemachine/package-info.java
new file mode 100644
index 0000000..3270750
--- /dev/null
+++ b/api/src/main/java/org/opencord/igmpproxy/statemachine/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package of state machine.
+ */
+package org.opencord.igmpproxy.statemachine;
\ No newline at end of file
diff --git a/app/pom.xml b/app/pom.xml
index 8848b78..f9b38ea 100644
--- a/app/pom.xml
+++ b/app/pom.xml
@@ -61,6 +61,19 @@
<dependency>
<groupId>org.onosproject</groupId>
+ <artifactId>onos-core-common</artifactId>
+ <version>${onos.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-core-serializers</artifactId>
+ <version>${onos.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.onosproject</groupId>
<artifactId>onlab-osgi</artifactId>
<version>${onos.version}</version>
<scope>provided</scope>
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpLeadershipManager.java b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpLeadershipManager.java
new file mode 100644
index 0000000..4f2e7bb
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpLeadershipManager.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.impl;
+
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceService;
+import org.opencord.igmpproxy.IgmpLeadershipService;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Deactivate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Leadership service implementation.
+ */
+@Component(immediate = true, service = IgmpLeadershipService.class)
+public class IgmpLeadershipManager implements IgmpLeadershipService {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected DeviceService deviceService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected LeadershipService leadershipService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ClusterService clusterService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected MastershipService mastershipService;
+
+ @Activate
+ public void activate(ComponentContext context) {
+ log.info("Started.");
+ }
+
+ @Deactivate
+ public void deactivate(ComponentContext context) {
+ log.info("Stopped.");
+ }
+
+ @Override
+ public boolean isLocalLeader(DeviceId deviceId) {
+ if (!mastershipService.isLocalMaster(deviceId)) {
+ if (deviceService.isAvailable(deviceId)) {
+ return false;
+ }
+ NodeId leader = leadershipService.runForLeadership(
+ deviceId.toString()).leaderNodeId();
+ return clusterService.getLocalNode().id().equals(leader);
+ }
+ return true;
+ }
+}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpManager.java b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpManager.java
index b985b14..97e97cc 100644
--- a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpManager.java
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpManager.java
@@ -15,10 +15,14 @@
*/
package org.opencord.igmpproxy.impl;
-import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.onosproject.net.Device;
+import org.opencord.igmpproxy.IgmpLeadershipService;
import org.opencord.igmpproxy.IgmpStatisticsService;
+import org.opencord.igmpproxy.impl.store.groupmember.GroupMember;
+import org.opencord.igmpproxy.GroupMemberId;
+import org.opencord.igmpproxy.impl.store.groupmember.GroupMemberStore;
+import org.opencord.igmpproxy.statemachine.StateMachineService;
import org.opencord.sadis.BaseInformationService;
import org.opencord.sadis.SadisService;
import org.opencord.sadis.SubscriberAndDeviceInformation;
@@ -114,7 +118,6 @@
private static final Class<McastConfig> MCAST_CONFIG_CLASS =
McastConfig.class;
- public static Map<String, GroupMember> groupMemberMap = Maps.newConcurrentMap();
private static ApplicationId appId;
private static int unSolicitedTimeout = 3; // unit is 1 sec
@@ -176,6 +179,15 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected IgmpStatisticsService igmpStatisticsManager;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected GroupMemberStore groupMemberStore;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected StateMachineService stateMachineService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected IgmpLeadershipService igmpLeadershipService;
+
private IgmpPacketProcessor processor = new IgmpPacketProcessor();
private Logger log = LoggerFactory.getLogger(getClass());
private ApplicationId coreAppId;
@@ -217,15 +229,15 @@
protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
- private List<Byte> validMembershipModes = Arrays.asList(MODE_IS_INCLUDE, MODE_IS_EXCLUDE, CHANGE_TO_INCLUDE_MODE,
- CHANGE_TO_EXCLUDE_MODE, ALLOW_NEW_SOURCES, BLOCK_OLD_SOURCES);
+ private List<Byte> validMembershipModes = Arrays.asList(MODE_IS_INCLUDE, MODE_IS_EXCLUDE, CHANGE_TO_INCLUDE_MODE,
+ CHANGE_TO_EXCLUDE_MODE, ALLOW_NEW_SOURCES, BLOCK_OLD_SOURCES);
@Activate
protected void activate() {
appId = coreService.registerApplication(APP_NAME);
coreAppId = coreService.registerApplication(CoreService.CORE_APP_NAME);
packetService.addProcessor(processor, PacketProcessor.director(4));
- IgmpSender.init(packetService, mastershipService, igmpStatisticsManager);
+ IgmpSender.init(packetService, igmpLeadershipService, igmpStatisticsManager);
networkConfig.registerConfigFactory(igmpproxySsmConfigFactory);
networkConfig.registerConfigFactory(igmpproxyConfigFactory);
@@ -249,7 +261,7 @@
deviceService.addListener(deviceListener);
scheduledExecutorService.scheduleAtFixedRate(new IgmpProxyTimerTask(), 1000, 1000, TimeUnit.MILLISECONDS);
eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("cord/igmpproxy",
- "events-igmp-%d", log));
+ "events-igmp-%d", log));
log.info("Started");
}
@@ -285,10 +297,10 @@
Ip4Address gAddr = igmpQuery.getGaddr().getIp4Address();
maxResp = calculateMaxResp(maxResp);
if (gAddr != null && !gAddr.isZero()) {
- StateMachine.specialQuery(deviceId, gAddr, maxResp);
+ stateMachineService.specialQuery(deviceId, gAddr, maxResp);
igmpStatisticsManager.getIgmpStats().increaseIgmpGrpSpecificMembershipQuery();
} else {
- StateMachine.generalQuery(deviceId, maxResp);
+ stateMachineService.generalQuery(deviceId, maxResp);
igmpStatisticsManager.getIgmpStats().increaseIgmpGeneralMembershipQuery();
}
}
@@ -303,7 +315,7 @@
deviceService.getAvailableDevices().forEach(device -> {
Optional<SubscriberAndDeviceInformation> accessDevice = getSubscriberAndDeviceInformation(device.id());
if (accessDevice.isPresent()) {
- StateMachine.specialQuery(device.id(), gAddr, maxResponseTime);
+ stateMachineService.specialQuery(device.id(), gAddr, maxResponseTime);
igmpStatisticsManager.getIgmpStats().increaseIgmpGrpAndSrcSpecificMembershipQuery();
}
});
@@ -311,7 +323,7 @@
} else {
//Don't know which group is targeted by the query
//So query all the members(in all the OLTs) and proxy their reports
- StateMachine.generalQuery(maxResponseTime);
+ stateMachineService.generalQuery(maxResponseTime);
igmpStatisticsManager.getIgmpStats().increaseIgmpGeneralMembershipQuery();
}
}
@@ -367,7 +379,7 @@
if (pimSSmInterworking) {
src = ssmTranslateRoute(groupIp);
if (src == null) {
- log.info("no ssm translate for group " + groupIp.toString());
+ log.info("no ssm translate for group {}", groupIp);
return;
}
} else {
@@ -384,8 +396,8 @@
join = false;
}
}
- String groupMemberKey = GroupMember.getkey(groupIp, deviceId, portNumber);
- GroupMember groupMember = groupMemberMap.get(groupMemberKey);
+ GroupMemberId groupMemberKey = GroupMemberId.of(groupIp, deviceId, portNumber);
+ GroupMember groupMember = groupMemberStore.getGroupMember(groupMemberKey);
if (join) {
igmpStatisticsManager.getIgmpStats().increaseIgmpJoinReq();
@@ -394,7 +406,7 @@
if (!sourceConfigured.isPresent()) {
igmpStatisticsManager.getIgmpStats().increaseIgmpFailJoinReq();
log.warn("Unable to process IGMP Join from {} since no source " +
- "configuration is found.", deviceId);
+ "configuration is found.", deviceId);
igmpStatisticsManager.getIgmpStats().increaseFailJoinReqInsuffPermissionAccessCounter();
return;
}
@@ -402,7 +414,7 @@
Optional<PortNumber> deviceUplink = getDeviceUplink(deviceId);
if (deviceUplink.isEmpty()) {
log.warn("Unable to process IGMP Join since uplink port " +
- "of the device {} is not found.", deviceId);
+ "of the device {} is not found.", deviceId);
return;
}
@@ -414,14 +426,15 @@
HashSet<ConnectPoint> sourceConnectPoints = Sets.newHashSet(sourceConfigured.get());
- boolean isJoined = StateMachine.join(deviceId, groupIp, srcIp, deviceUplink.get());
+ boolean isJoined = stateMachineService.join(deviceId, groupIp, srcIp, deviceUplink.get());
if (isJoined) {
igmpStatisticsManager.getIgmpStats().increaseIgmpSuccessJoinRejoinReq();
igmpStatisticsManager.getIgmpStats().increaseIgmpChannelJoinCounter();
} else {
igmpStatisticsManager.getIgmpStats().increaseIgmpFailJoinReq();
}
- groupMemberMap.put(groupMemberKey, groupMember);
+ groupMemberStore.putGroupMember(groupMember);
+ log.debug("Group member created with id: {}", groupMember.getGroupMemberId());
groupMember.updateList(recordType, sourceList);
groupMember.getSourceList().forEach(source -> {
McastRoute route = new McastRoute(source, groupIp, McastRoute.Type.IGMP);
@@ -438,11 +451,13 @@
groupMember.resetAllTimers();
groupMember.updateList(recordType, sourceList);
groupMember.setLeave(false);
+ //put updated member to the store
+ groupMemberStore.putGroupMember(groupMember);
} else {
igmpStatisticsManager.getIgmpStats().increaseIgmpLeaveReq();
if (groupMember == null) {
- log.info("receive leave but no instance, group " + groupIp.toString() +
- " device:" + deviceId.toString() + " port:" + portNumber.toString());
+ log.info("receive leave but no instance, group {} device: {} port:{}",
+ groupIp, deviceId, portNumber);
igmpStatisticsManager.getIgmpStats().increaseUnconfiguredGroupCounter();
return;
} else {
@@ -451,6 +466,8 @@
leaveAction(groupMember);
} else {
sendQuery(groupMember);
+ //put modified group member object to the store
+ groupMemberStore.updateGroupMember(groupMember);
}
}
}
@@ -459,11 +476,11 @@
private void leaveAction(GroupMember groupMember) {
igmpStatisticsManager.getIgmpStats().increaseIgmpDisconnect();
ConnectPoint cp = new ConnectPoint(groupMember.getDeviceId(), groupMember.getPortNumber());
- StateMachine.leave(groupMember.getDeviceId(), groupMember.getGroupIp());
+ stateMachineService.leave(groupMember.getDeviceId(), groupMember.getGroupIp());
groupMember.getSourceList().forEach(source -> multicastService.removeSinks(
new McastRoute(source, groupMember.getGroupIp(),
- McastRoute.Type.IGMP), Sets.newHashSet(cp)));
- groupMemberMap.remove(groupMember.getId());
+ McastRoute.Type.IGMP), Sets.newHashSet(cp)));
+ groupMemberStore.removeGroupMember(groupMember.getGroupMemberId());
}
private void sendQuery(GroupMember groupMember) {
@@ -517,7 +534,7 @@
if (!isConnectPoint(deviceId, pkt.receivedFrom().port()) &&
!getSubscriberAndDeviceInformation(deviceId).isPresent()) {
- log.error("Device not registered in netcfg :" + deviceId.toString());
+ log.error("Device not registered in netcfg : {}", deviceId);
igmpStatisticsManager.getIgmpStats().increaseFailJoinReqInsuffPermissionAccessCounter();
return;
}
@@ -525,7 +542,7 @@
IGMP igmp = (IGMP) ipv4Pkt.getPayload();
Optional<PortNumber> deviceUpLinkOpt = getDeviceUplink(deviceId);
- PortNumber upLinkPort = deviceUpLinkOpt.isPresent() ? deviceUpLinkOpt.get() : null;
+ PortNumber upLinkPort = deviceUpLinkOpt.isPresent() ? deviceUpLinkOpt.get() : null;
switch (igmp.getIgmpType()) {
case IGMP.TYPE_IGMPV3_MEMBERSHIP_QUERY:
igmpStatisticsManager.getIgmpStats().increaseIgmpv3MembershipQuery();
@@ -535,8 +552,8 @@
log.info("IGMP Picked up query from connectPoint");
//OK to process packet
processIgmpConnectPointQuery((IGMPQuery) igmp.getGroups().get(0),
- pkt.receivedFrom(),
- 0xff & igmp.getMaxRespField());
+ pkt.receivedFrom(),
+ 0xff & igmp.getMaxRespField());
break;
} else {
//Not OK to process packet
@@ -546,7 +563,7 @@
}
processIgmpQuery((IGMPQuery) igmp.getGroups().get(0), pkt.receivedFrom(),
- 0xff & igmp.getMaxRespField());
+ 0xff & igmp.getMaxRespField());
break;
case IGMP.TYPE_IGMPV1_MEMBERSHIP_REPORT:
igmpStatisticsManager.getIgmpStats().increaseIgmpv1MembershipReport();
@@ -566,14 +583,14 @@
break;
default:
- log.warn("Unknown IGMP message type:" + igmp.getIgmpType());
+ log.warn("Unknown IGMP message type: {}", igmp.getIgmpType());
igmpStatisticsManager.getIgmpStats().increaseInvalidIgmpMsgReceived();
igmpStatisticsManager.getIgmpStats().increaseUnknownIgmpTypePacketsRxCounter();
break;
}
} catch (Exception ex) {
- log.error("igmp process error : {} ", ex);
+ log.error("igmp process error : ", ex);
}
});
}
@@ -592,14 +609,14 @@
IGMPGroup group = itr.next();
if (group instanceof IGMPMembership) {
processIgmpReport((IGMPMembership) group, VlanId.vlanId(vlan),
- pkt.receivedFrom(), igmp.getIgmpType());
+ pkt.receivedFrom(), igmp.getIgmpType());
} else {
IGMPMembership mgroup = new IGMPMembership(group.getGaddr().getIp4Address());
mgroup.setRecordType(igmp.getIgmpType() == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT ?
- IGMPMembership.MODE_IS_EXCLUDE :
- IGMPMembership.MODE_IS_INCLUDE);
+ IGMPMembership.MODE_IS_EXCLUDE :
+ IGMPMembership.MODE_IS_INCLUDE);
processIgmpReport(mgroup, VlanId.vlanId(vlan),
- pkt.receivedFrom(), igmp.getIgmpType());
+ pkt.receivedFrom(), igmp.getIgmpType());
}
}
@@ -608,7 +625,7 @@
private class IgmpProxyTimerTask extends TimerTask {
public void run() {
try {
- IgmpTimer.timeOut1s();
+ stateMachineService.timeOut1s();
queryMembers();
} catch (Exception ex) {
log.warn("Igmp timer task error : {}", ex.getMessage());
@@ -617,13 +634,14 @@
private void queryMembers() {
GroupMember groupMember;
- Set groupMemberSet = groupMemberMap.entrySet();
- Iterator itr = groupMemberSet.iterator();
- while (itr.hasNext()) {
- Map.Entry entry = (Map.Entry) itr.next();
- groupMember = (GroupMember) entry.getValue();
+ Set<GroupMemberId> keySet = groupMemberStore.getAllGroupMemberIds();
+ for (GroupMemberId key : keySet) {
+ groupMember = groupMemberStore.getGroupMember(key);
+ if (groupMember == null) {
+ continue;
+ }
DeviceId did = groupMember.getDeviceId();
- if (mastershipService.isLocalMaster(did)) {
+ if (igmpLeadershipService.isLocalLeader(did)) {
if (groupMember.isLeave()) {
lastQuery(groupMember);
} else if (periodicQuery) {
@@ -636,10 +654,14 @@
private void lastQuery(GroupMember groupMember) {
if (groupMember.getLastQueryInterval() < lastQueryInterval) {
groupMember.lastQueryInterval(true); // count times
+ //put modified group member object to the store
+ groupMemberStore.updateGroupMember(groupMember);
} else if (groupMember.getLastQueryCount() < lastQueryCount - 1) {
sendQuery(groupMember);
groupMember.lastQueryInterval(false); // reset count number
groupMember.lastQueryCount(true); //count times
+ //put modified group member object to the store
+ groupMemberStore.updateGroupMember(groupMember);
} else if (groupMember.getLastQueryCount() == lastQueryCount - 1) {
leaveAction(groupMember);
}
@@ -648,10 +670,14 @@
private void periodicQuery(GroupMember groupMember) {
if (groupMember.getKeepAliveQueryInterval() < keepAliveInterval) {
groupMember.keepAliveInterval(true);
+ //put modified group member object to the store
+ groupMemberStore.updateGroupMember(groupMember);
} else if (groupMember.getKeepAliveQueryCount() < keepAliveCount) {
sendQuery(groupMember);
groupMember.keepAliveInterval(false);
groupMember.keepAliveQueryCount(true);
+ //put modified group member object to the store
+ groupMemberStore.updateGroupMember(groupMember);
} else if (groupMember.getKeepAliveQueryCount() == keepAliveCount) {
leaveAction(groupMember);
}
@@ -670,12 +696,11 @@
}
PortNumber portNumber = PortNumber.portNumber(olt.get().uplinkPort());
return validateUpLinkPort(device.id(), portNumber) ?
- Optional.of(portNumber) : Optional.empty();
+ Optional.of(portNumber) : Optional.empty();
}
/**
- *
- * @param deviceId device id
+ * @param deviceId device id
* @param portNumber port number
* @return true if the port name starts with NNI_PREFIX; false otherwise.
*/
@@ -688,8 +713,8 @@
boolean isValid = port.annotations().value(AnnotationKeys.PORT_NAME) != null &&
port.annotations().value(AnnotationKeys.PORT_NAME).startsWith(NNI_PREFIX);
if (!isValid) {
- log.warn("Port cannot be validated; it is not configured as an NNI port." +
- "Device/port: {}/{}", deviceId, portNumber);
+ log.warn("Port cannot be validated; it is not configured as an NNI port. Device/port: {}/{}",
+ deviceId, portNumber);
}
return isValid;
}
@@ -719,14 +744,14 @@
@Override
public void onSuccess(Objective objective) {
log.info("Igmp filter for {} on {} {}.",
- devId, port, (remove) ? REMOVED : INSTALLED);
+ devId, port, (remove) ? REMOVED : INSTALLED);
}
@Override
public void onError(Objective objective, ObjectiveError error) {
log.info("Igmp filter {} for device {} on port {} failed because of {}",
- (remove) ? INSTALLATION : REMOVAL, devId, port,
- error);
+ (remove) ? INSTALLATION : REMOVAL, devId, port,
+ error);
}
});
@@ -809,7 +834,7 @@
case PORT_ADDED:
port = p.number();
if (getSubscriberAndDeviceInformation(devId).isPresent() &&
- !isUplink(devId, port) && !isConnectPoint(devId, port)) {
+ !isUplink(devId, port) && !isConnectPoint(devId, port)) {
processFilterObjective(devId, port, false);
} else if (isUplink(devId, port)) {
provisionUplinkFlows();
@@ -820,7 +845,7 @@
case PORT_UPDATED:
port = p.number();
if (getSubscriberAndDeviceInformation(devId).isPresent() &&
- !isUplink(devId, port) && !isConnectPoint(devId, port)) {
+ !isUplink(devId, port) && !isConnectPoint(devId, port)) {
if (event.port().isEnabled()) {
processFilterObjective(devId, port, false);
} else {
@@ -950,7 +975,7 @@
if (config != null && mvlan != config.egressVlan().toShort()) {
mvlan = config.egressVlan().toShort();
IgmpSender.getInstance().setMvlan(mvlan);
- groupMemberMap.values().forEach(m -> leaveAction(m));
+ groupMemberStore.getAllGroupMembers().forEach(m -> leaveAction(m));
}
}
@@ -1009,6 +1034,7 @@
processFilterObjective(connectPoint.deviceId(), connectPoint.port(), false);
}
+
private void unprovisionConnectPointFlows() {
if (connectPoint == null) {
return;
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpSender.java b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpSender.java
index 39d6046..adf0e9e 100644
--- a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpSender.java
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpSender.java
@@ -24,7 +24,6 @@
import org.onlab.packet.IPv4;
import org.onlab.packet.Ip4Address;
import org.onlab.packet.MacAddress;
-import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
import org.onosproject.net.PortNumber;
import org.onosproject.net.flow.DefaultTrafficTreatment;
@@ -32,6 +31,7 @@
import org.onosproject.net.packet.DefaultOutboundPacket;
import org.onosproject.net.packet.OutboundPacket;
import org.onosproject.net.packet.PacketService;
+import org.opencord.igmpproxy.IgmpLeadershipService;
import org.opencord.igmpproxy.IgmpStatisticsService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,7 +52,7 @@
private static IgmpSender instance = null;
private PacketService packetService;
- private MastershipService mastershipService;
+ private IgmpLeadershipService igmpLeadershipService;
private IgmpStatisticsService igmpStatisticsService;
private boolean withRAUplink = true;
private boolean withRADownlink = false;
@@ -61,16 +61,16 @@
private int maxResp = DEFAULT_MEX_RESP;
private Logger log = LoggerFactory.getLogger(getClass());
- private IgmpSender(PacketService packetService, MastershipService mastershipService,
+ private IgmpSender(PacketService packetService, IgmpLeadershipService igmpLeadershipService,
IgmpStatisticsService igmpStatisticsService) {
this.packetService = packetService;
- this.mastershipService = mastershipService;
+ this.igmpLeadershipService = igmpLeadershipService;
this.igmpStatisticsService = igmpStatisticsService;
}
- public static void init(PacketService packetService, MastershipService mastershipService,
+ public static void init(PacketService packetService, IgmpLeadershipService igmpLeadershipService,
IgmpStatisticsService igmpStatisticsService) {
- instance = new IgmpSender(packetService, mastershipService, igmpStatisticsService);
+ instance = new IgmpSender(packetService, igmpLeadershipService, igmpStatisticsService);
}
public static IgmpSender getInstance() {
@@ -170,6 +170,7 @@
case IGMP.TYPE_IGMPV3_MEMBERSHIP_REPORT:
if (igmpMembership == null) {
+ log.debug("Igmp membership is not found. igmp-type {} ", type);
return null;
}
igmpPacket.addGroup(igmpMembership);
@@ -183,6 +184,7 @@
case IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT:
case IGMP.TYPE_IGMPV2_LEAVE_GROUP:
if (igmpMembership == null) {
+ log.debug("Igmp membership is not found. igmp-type {} ", type);
return null;
}
igmpPacket.addGroup(igmpMembership);
@@ -192,6 +194,7 @@
ip4Packet.setDestinationAddress(dst);
break;
default:
+ log.debug("Unknown igmp type: {} ", type);
igmpStatisticsService.getIgmpStats().increaseUnknownIgmpTypePacketsRxCounter();
return null;
}
@@ -225,7 +228,7 @@
}
public void sendIgmpPacketUplink(Ethernet ethPkt, DeviceId deviceId, PortNumber upLinkPort) {
- if (!mastershipService.isLocalMaster(deviceId)) {
+ if (!igmpLeadershipService.isLocalLeader(deviceId)) {
return;
}
@@ -241,7 +244,7 @@
}
public void sendIgmpPacket(Ethernet ethPkt, DeviceId deviceId, PortNumber portNumber) {
- if (!mastershipService.isLocalMaster(deviceId)) {
+ if (!igmpLeadershipService.isLocalLeader(deviceId)) {
return;
}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpTimer.java b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpTimer.java
deleted file mode 100644
index c4a17c9..0000000
--- a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpTimer.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.opencord.igmpproxy.impl;
-
-import com.google.common.collect.Maps;
-
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Implement the timer for igmp state machine.
- */
-public final class IgmpTimer {
-
- public static final int INVALID_TIMER_ID = 0;
- public static int timerId = INVALID_TIMER_ID + 1;
- private static Map<Integer, SingleTimer> igmpTimerMap = Maps.newConcurrentMap();
-
- private IgmpTimer(){
-
- }
- private static int getId() {
- return timerId++;
- }
-
- public static int start(SingleStateMachine machine, int timeOut) {
- int id = getId();
- igmpTimerMap.put(id, new SingleTimer(machine, timeOut));
- return id;
- }
-
- public static int reset(int oldId, SingleStateMachine machine, int timeOut) {
- igmpTimerMap.remove(new Integer(oldId));
- int id = getId();
- igmpTimerMap.put(new Integer(id), new SingleTimer(machine, timeOut));
- return id;
- }
-
- public static void cancel(int id) {
- igmpTimerMap.remove(new Integer(id));
- }
-
-
- static void timeOut1s() {
- Set mapSet = igmpTimerMap.entrySet();
- Iterator itr = mapSet.iterator();
- while (itr.hasNext()) {
- Map.Entry entry = (Map.Entry) itr.next();
- SingleTimer single = (SingleTimer) entry.getValue();
- if (single.timeOut > 0) {
- single.timeOut--;
- } else {
- single.machine.timeOut();
- itr.remove();
- }
- }
- }
-
- static class SingleTimer {
-
- public int timeOut; // unit is 1 second
- public SingleStateMachine machine;
-
- public SingleTimer(SingleStateMachine machine, int timeOut) {
- this.machine = machine;
- this.timeOut = timeOut;
- }
-
- }
-}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/OsgiPropertyConstants.java b/app/src/main/java/org/opencord/igmpproxy/impl/OsgiPropertyConstants.java
index 94300b5..c99a3b7 100644
--- a/app/src/main/java/org/opencord/igmpproxy/impl/OsgiPropertyConstants.java
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/OsgiPropertyConstants.java
@@ -26,5 +26,4 @@
public static final String STATISTICS_GENERATION_PERIOD = "statisticsGenerationPeriodInSeconds";
public static final int STATISTICS_GENERATION_PERIOD_DEFAULT = 20;
-
}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/SingleStateMachine.java b/app/src/main/java/org/opencord/igmpproxy/impl/SingleStateMachine.java
index ca68bf6..83f2ef2 100644
--- a/app/src/main/java/org/opencord/igmpproxy/impl/SingleStateMachine.java
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/SingleStateMachine.java
@@ -15,182 +15,218 @@
*/
package org.opencord.igmpproxy.impl;
-import org.onlab.packet.Ethernet;
import org.onlab.packet.Ip4Address;
import org.onosproject.net.DeviceId;
import org.onosproject.net.PortNumber;
+import org.opencord.igmpproxy.impl.state.DelayMember;
+import org.opencord.igmpproxy.impl.state.IdleMember;
+import org.opencord.igmpproxy.impl.state.NonMember;
+import org.opencord.igmpproxy.statemachine.StateMachine;
+import org.opencord.igmpproxy.statemachine.State;
+import org.opencord.igmpproxy.statemachine.StateMachineId;
-import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
-/**
- * State machine for single IGMP group member. The state machine is implemented on
- * RFC 2236 "6. Host State Diagram".
- */
-public class SingleStateMachine {
- // Only for tests purposes
- static boolean sendQuery = true;
- static final int STATE_NON = 0;
- static final int STATE_DELAY = 1;
- static final int STATE_IDLE = 2;
- static final int TRANSITION_JOIN = 0;
- static final int TRANSITION_LEAVE = 1;
- static final int TRANSITION_QUERY = 2;
- static final int TRANSITION_TIMEOUT = 3;
- static final int DEFAULT_MAX_RESP = 0xfffffff;
- static final int DEFAULT_COUNT = 1;
- private DeviceId devId;
- private Ip4Address groupIp;
+/**
+ * State machine implementation.
+ */
+public final class SingleStateMachine implements StateMachine {
+ public static final int DEFAULT_MAX_RESP = 0xfffffff;
+
+ private StateMachineId stateMachineId;
+ private int currentState;
+
private Ip4Address srcIp;
private PortNumber upLinkPort;
- private AtomicInteger count = new AtomicInteger(DEFAULT_COUNT);
- private int timerId = IgmpTimer.INVALID_TIMER_ID;
- private int timeOut = DEFAULT_MAX_RESP;
- private State[] states =
- {
- new NonMember(), new DelayMember(), new IdleMember()
- };
+ private AtomicInteger timeOut; // unit is 1 second
+ private State[] states;
private int[] nonTransition =
- {STATE_DELAY, STATE_NON, STATE_NON, STATE_NON};
+ {State.STATE_DELAY, State.STATE_NON,
+ State.STATE_NON, State.STATE_NON};
private int[] delayTransition =
- {STATE_DELAY, STATE_NON, STATE_DELAY, STATE_IDLE};
+ {State.STATE_DELAY, State.STATE_NON,
+ State.STATE_DELAY, State.STATE_IDLE};
private int[] idleTransition =
- {STATE_IDLE, STATE_NON, STATE_DELAY, STATE_IDLE};
+ {State.STATE_IDLE, State.STATE_NON,
+ State.STATE_DELAY, State.STATE_IDLE};
//THE TRANSITION TABLE
private int[][] transition =
{nonTransition, delayTransition, idleTransition};
- private int currentState = STATE_NON;
- public SingleStateMachine(DeviceId devId, Ip4Address groupIp, Ip4Address src, PortNumber upLinkPort) {
- this.devId = devId;
- this.groupIp = groupIp;
- this.srcIp = src;
+ /**
+ * Constructor for serializer.
+ */
+ private SingleStateMachine() {
+ this.stateMachineId = null;
+ this.srcIp = null;
+ this.upLinkPort = null;
+ this.timeOut = null;
+ this.states = null;
+ }
+
+ /**
+ * Constructor of single state machine.
+ *
+ * @param deviceId device id of state-machine
+ * @param groupIp group id of state-machine
+ * @param srcIp source ip of state-machine
+ * @param upLinkPort uplink port of state-machine
+ */
+ public SingleStateMachine(DeviceId deviceId,
+ Ip4Address groupIp,
+ Ip4Address srcIp,
+ PortNumber upLinkPort) {
+ this.stateMachineId = StateMachineId.of(deviceId, groupIp);
+ this.srcIp = srcIp;
this.upLinkPort = upLinkPort;
+ this.currentState = State.STATE_NON;
+ this.timeOut = null;
+ this.states = new State[]{new NonMember(this), new DelayMember(this), new IdleMember(this)};
}
- public Ip4Address getGroupIp() {
- return groupIp;
- }
-
- public DeviceId getDeviceId() {
- return devId;
- }
- public boolean increaseCounter() {
- count.incrementAndGet();
- return true;
- }
-
- public boolean decreaseCounter() {
- if (count.get() > 0) {
- count.decrementAndGet();
- return true;
+ /**
+ * Constructor of single state machine.
+ *
+ * @param machineId id of state-machine
+ * @param srcIp source ip of state-machine
+ * @param upLinkPort uplink port of state-machine
+ * @param currentState current state of state-machine
+ * @param timeout timeout value of state-machine
+ */
+ public SingleStateMachine(StateMachineId machineId,
+ Ip4Address srcIp,
+ PortNumber upLinkPort,
+ int currentState,
+ Integer timeout) {
+ this.stateMachineId = machineId;
+ this.srcIp = srcIp;
+ this.upLinkPort = upLinkPort;
+ this.currentState = currentState;
+ if (timeout != null) {
+ createTimeOut(timeout);
} else {
- return false;
+ this.timeOut = null;
}
+ this.states = new State[]{new NonMember(this), new DelayMember(this), new IdleMember(this)};
}
- public int getCounter() {
- return count.get();
+ @Override
+ public StateMachineId getStateMachineId() {
+ return this.stateMachineId;
}
+
+ @Override
+ public Ip4Address getGroupIp() {
+ return this.stateMachineId.getGroupIp();
+ }
+
+ @Override
+ public DeviceId getDeviceId() {
+ return this.stateMachineId.getDeviceId();
+ }
+
+
+ @Override
+ public Ip4Address getSrcIp() {
+ return srcIp;
+ }
+
+ public PortNumber getUpLinkPort() {
+ return upLinkPort;
+ }
+
+ @Override
public int currentState() {
- return currentState;
+ return this.currentState;
+ }
+
+ public Integer getTimeOut() {
+ return timeOut == null ? null : timeOut.get();
+ }
+
+ @Override
+ public void increaseTimeOut() {
+ this.timeOut.getAndIncrement();
+ }
+
+ @Override
+ public void decreaseTimeOut() {
+ this.timeOut.getAndDecrement();
+ }
+
+ @Override
+ public void setMaxTimeout() {
+ this.timeOut.set(DEFAULT_MAX_RESP);
+ }
+
+ @Override
+ public void startTimer(int timeout) {
+ createTimeOut(timeout);
+ }
+
+ @Override
+ public void resetTimer(int timeout) {
+ setTimeOut(timeout);
+ }
+
+ @Override
+ public void destroyTimer() {
+ setTimeOut(null);
+ }
+
+ @Override
+ public void join(boolean messageOutAllowed) {
+ if (messageOutAllowed) {
+ State state = states[currentState];
+ state.join();
+ }
+ next(State.TRANSITION_JOIN);
+ }
+
+ @Override
+ public void leave(boolean messageOutAllowed) {
+ if (messageOutAllowed) {
+ State state = states[currentState];
+ state.leave();
+ }
+ next(State.TRANSITION_LEAVE);
+ }
+
+ @Override
+ public void query(int maxResp) {
+ State state = states[currentState];
+ state.query(maxResp);
+ next(State.TRANSITION_QUERY);
+ }
+
+ @Override
+ public void timeOut(boolean sendQuery) {
+ if (sendQuery) {
+ State state = states[currentState];
+ state.timeOut();
+ }
+ next(State.TRANSITION_TIMEOUT);
}
private void next(int msg) {
- currentState = transition[currentState][msg];
+ this.currentState = transition[currentState][msg];
}
- public void join(boolean messageOutAllowed) {
- states[currentState].join(messageOutAllowed);
- next(TRANSITION_JOIN);
+ private void setTimeOut(int timeout) {
+ timeOut.set(timeout);
}
- public void leave(boolean messageOutAllowed) {
- states[currentState].leave(messageOutAllowed);
- next(TRANSITION_LEAVE);
- }
-
- public void query(int maxResp) {
- states[currentState].query(maxResp);
- next(TRANSITION_QUERY);
- }
-
- public void timeOut() {
- states[currentState].timeOut();
- next(TRANSITION_TIMEOUT);
- }
-
- int getTimeOut(int maxTimeOut) {
- Random random = new Random();
- return Math.abs(random.nextInt()) % maxTimeOut;
- }
-
- protected void cancelTimer() {
- if (IgmpTimer.INVALID_TIMER_ID != timerId) {
- IgmpTimer.cancel(timerId);
+ private void setTimeOut(Integer timeout) {
+ if (timeout == null) {
+ timeOut = null;
+ } else {
+ timeOut.set(timeout);
}
}
- class State {
- public void join(boolean messageOutAllowed) {
- }
-
- public void leave(boolean messageOutAllowed) {
- if (messageOutAllowed) {
- Ethernet eth = IgmpManager.outgoingIgmpWithV3() ?
- IgmpSender.getInstance().buildIgmpV3Leave(groupIp, srcIp) :
- IgmpSender.getInstance().buildIgmpV2Leave(groupIp, srcIp);
- IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId, upLinkPort);
- }
- }
-
- public void query(int maxResp) {
- }
-
- public void timeOut() {
- }
-
- }
-
- class NonMember extends State {
- public void join(boolean messageOutAllowed) {
- if (messageOutAllowed) {
- Ethernet eth = IgmpManager.outgoingIgmpWithV3() ?
- IgmpSender.getInstance().buildIgmpV3Join(groupIp, srcIp) :
- IgmpSender.getInstance().buildIgmpV2Join(groupIp, srcIp);
- IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId, upLinkPort);
- timeOut = getTimeOut(IgmpManager.getUnsolicitedTimeout());
- timerId = IgmpTimer.start(SingleStateMachine.this, timeOut);
- }
- }
- }
-
- class DelayMember extends State {
- public void query(int maxResp) {
- if (maxResp < timeOut) {
- timeOut = getTimeOut(maxResp);
- timerId = IgmpTimer.reset(timerId, SingleStateMachine.this, timeOut);
- }
- }
-
- public void timeOut() {
- if (sendQuery) {
- Ethernet eth = IgmpManager.outgoingIgmpWithV3() ?
- IgmpSender.getInstance().buildIgmpV3ResponseQuery(groupIp, srcIp) :
- IgmpSender.getInstance().buildIgmpV2ResponseQuery(groupIp, srcIp);
- IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId, upLinkPort);
- timeOut = DEFAULT_MAX_RESP;
- }
- }
-
- }
-
- class IdleMember extends State {
- public void query(int maxResp) {
- timeOut = getTimeOut(maxResp);
- timerId = IgmpTimer.start(SingleStateMachine.this, timeOut);
- }
+ private void createTimeOut(Integer timeout) {
+ timeOut = new AtomicInteger(timeout);
}
}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/SingleStateMachineSerializer.java b/app/src/main/java/org/opencord/igmpproxy/impl/SingleStateMachineSerializer.java
new file mode 100644
index 0000000..5cc255d
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/SingleStateMachineSerializer.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.impl;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.onlab.packet.Ip4Address;
+import org.onosproject.net.PortNumber;
+import org.opencord.igmpproxy.statemachine.StateMachineId;
+
+/**
+ * Custom serializer for {@link SingleStateMachine}.
+ */
+public class SingleStateMachineSerializer extends Serializer<SingleStateMachine> {
+ /**
+ * Creates serializer instance.
+ */
+ public SingleStateMachineSerializer() {
+ // non-null, immutable
+ super(false, true);
+ }
+
+ @Override
+ public void write(Kryo kryo, Output output, SingleStateMachine singleStateMachine) {
+ kryo.writeClassAndObject(output, singleStateMachine.getStateMachineId());
+ kryo.writeClassAndObject(output, singleStateMachine.getSrcIp());
+ kryo.writeClassAndObject(output, singleStateMachine.getUpLinkPort());
+ kryo.writeClassAndObject(output, singleStateMachine.getTimeOut());
+ kryo.writeObject(output, singleStateMachine.currentState());
+ }
+
+ @Override
+ public SingleStateMachine read(Kryo kryo, Input input, Class<SingleStateMachine> aClass) {
+ StateMachineId stateMachineId = (StateMachineId) kryo.readClassAndObject(input);
+ Ip4Address srcIp = (Ip4Address) kryo.readClassAndObject(input);
+ PortNumber uplinkPort = (PortNumber) kryo.readClassAndObject(input);
+ Integer timeout = (Integer) kryo.readClassAndObject(input);
+ int currentState = kryo.readObject(input, int.class);
+ return new SingleStateMachine(stateMachineId, srcIp, uplinkPort, currentState, timeout);
+ }
+
+}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/StateMachine.java b/app/src/main/java/org/opencord/igmpproxy/impl/StateMachine.java
deleted file mode 100644
index e87756d..0000000
--- a/app/src/main/java/org/opencord/igmpproxy/impl/StateMachine.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.opencord.igmpproxy.impl;
-
-import com.google.common.collect.Maps;
-import org.onlab.packet.Ip4Address;
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.PortNumber;
-
-import java.util.Map;
-import java.util.Set;
-
-/**
- * State machine for whole IGMP process. The state machine is implemented on
- * RFC 2236 "6. Host State Diagram".
- */
-public final class StateMachine {
-
- private static final String GROUP = "Group";
-
- private StateMachine() {
-
- }
- private static Map<String, SingleStateMachine> map = Maps.newConcurrentMap();
-
- private static String getId(DeviceId devId, Ip4Address groupIp) {
- return devId.toString() + GROUP + groupIp.toString();
- }
-
- private static SingleStateMachine get(DeviceId devId, Ip4Address groupIp) {
- String id = getId(devId, groupIp);
- return map.get(id);
- }
-
- public static void destroySingle(DeviceId devId, Ip4Address groupIp) {
- SingleStateMachine machine = get(devId, groupIp);
- if (null == machine) {
- return;
- }
- machine.cancelTimer();
- map.remove(getId(devId, groupIp));
- }
-
- public static boolean join(DeviceId devId, Ip4Address groupIp, Ip4Address srcIP, PortNumber upLinkPort) {
- SingleStateMachine machine = get(devId, groupIp);
-
- if (null == machine) {
- machine = new SingleStateMachine(devId, groupIp, srcIP, upLinkPort);
- map.put(getId(devId, groupIp), machine);
-
- boolean shouldSendJoin = true;
- if (IgmpManager.isIgmpOnPodBasis() &&
- groupListenedByOtherDevices(devId, groupIp)) {
- // unset the flag if igmp messages are evaluated on POD basis
- // and there are already active members of this group
- // across the entire POD
- shouldSendJoin = false;
- }
- machine.join(shouldSendJoin);
- return true;
- }
- machine.increaseCounter();
- return false;
- }
-
- public static boolean leave(DeviceId devId, Ip4Address groupIp) {
- SingleStateMachine machine = get(devId, groupIp);
- if (null == machine) {
- return false;
- }
-
- machine.decreaseCounter();
- // make sure machine instance still exists.
- // it may be removed by the preceding thread
- if (machine.getCounter() == 0) {
- boolean shouldSendLeave = true;
- if (IgmpManager.isIgmpOnPodBasis() &&
- groupListenedByOtherDevices(devId, groupIp)) {
- // unset the flag if igmp messages are evaluated on POD basis
- // and there are still active members of this group
- // across the entire POD
- shouldSendLeave = false;
- }
- machine.leave(shouldSendLeave);
- destroySingle(devId, groupIp);
- return true;
- }
- return false;
- }
-
- static void specialQuery(DeviceId devId, Ip4Address groupIp, int maxResp) {
- SingleStateMachine machine = get(devId, groupIp);
- if (null == machine) {
- return;
- }
- machine.query(maxResp);
- }
-
- static void generalQuery(DeviceId devId, int maxResp) {
- for (Map.Entry<String, SingleStateMachine> entry : map.entrySet()) {
- SingleStateMachine machine = entry.getValue();
- if (devId.equals(machine.getDeviceId())) {
- machine.query(maxResp);
- }
- }
- }
-
- static void generalQuery(int maxResp) {
- for (Map.Entry<String, SingleStateMachine> entry : map.entrySet()) {
- SingleStateMachine machine = entry.getValue();
- machine.query(maxResp);
- }
- }
-
- public static Set<Map.Entry<String, SingleStateMachine>> entrySet() {
- return map.entrySet();
- }
-
- public static void timeOut(DeviceId devId, Ip4Address groupIp) {
- SingleStateMachine machine = get(devId, groupIp);
- if (null == machine) {
- return;
- }
- machine.timeOut();
- }
-
- public static void clearMap() {
- map.clear();
- }
-
- /**
- * @param devId id of the device being excluded
- * @param groupIp group IP address
- * @return true if this group has at least one listener connected to
- * any device in the map except for the device specified; false otherwise.
- */
- private static boolean groupListenedByOtherDevices(DeviceId devId, Ip4Address groupIp) {
- for (SingleStateMachine machine : map.values()) {
- if (machine.getDeviceId().equals(devId)) {
- continue;
- }
- if (machine.getGroupIp().equals(groupIp)) {
- //means group is being listened by other peers in the domain
- return true;
- }
- }
- return false;
- }
-
-}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/StateMachineManager.java b/app/src/main/java/org/opencord/igmpproxy/impl/StateMachineManager.java
new file mode 100644
index 0000000..8c29a2f
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/StateMachineManager.java
@@ -0,0 +1,220 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.impl;
+
+import org.onlab.packet.Ip4Address;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+import org.opencord.igmpproxy.IgmpLeadershipService;
+import org.opencord.igmpproxy.statemachine.StateMachine;
+import org.opencord.igmpproxy.statemachine.StateMachineId;
+import org.opencord.igmpproxy.statemachine.StateMachineService;
+import org.opencord.igmpproxy.impl.store.machine.StateMachineStore;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Deactivate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+
+
+/**
+ * State machine for whole IGMP process. The state machine is implemented on
+ * RFC 2236 "6. Host State Diagram".
+ */
+@Component(immediate = true, service = StateMachineService.class)
+public class StateMachineManager implements StateMachineService {
+ // Only for tests purposes
+ public static boolean sendQuery = true;
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected StateMachineStore stateMachineStore;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected IgmpLeadershipService igmpLeadershipService;
+
+ @Activate
+ public void activate(ComponentContext context) {
+ log.info("Started.");
+ }
+
+ @Deactivate
+ public void deactivate(ComponentContext context) {
+ log.info("Stopped.");
+ }
+
+ private StateMachine get(DeviceId devId, Ip4Address groupIp) {
+ StateMachineId id = StateMachineId.of(devId, groupIp);
+ return stateMachineStore.getStateMachine(id);
+ }
+
+ public void destroySingle(DeviceId devId, Ip4Address groupIp) {
+ StateMachine machine = get(devId, groupIp);
+ if (machine == null) {
+ log.debug("Machine has already been destroyed. deviceId: {} and groupIp: {} ", devId, groupIp);
+ return;
+ }
+ stateMachineStore.removeStateMachine(machine.getStateMachineId());
+ stateMachineStore.removeCounter(machine.getStateMachineId());
+ }
+
+ @Override
+ public boolean join(DeviceId devId, Ip4Address groupIp, Ip4Address srcIP, PortNumber upLinkPort) {
+ StateMachine machineInstance = get(devId, groupIp);
+
+ if (machineInstance == null) {
+ machineInstance = new SingleStateMachine(devId, groupIp, srcIP, upLinkPort);
+ stateMachineStore.putStateMachine(machineInstance);
+ stateMachineStore.increaseAndGetCounter(machineInstance.getStateMachineId());
+ log.debug("Instance of machine created with id: {}", machineInstance.getStateMachineId());
+ boolean shouldSendJoin = true;
+ if (IgmpManager.isIgmpOnPodBasis() &&
+ groupListenedByOtherDevices(devId, groupIp)) {
+ // unset the flag if igmp messages are evaluated on POD basis
+ // and there are already active members of this group
+ // across the entire POD
+ shouldSendJoin = false;
+ }
+ machineInstance.join(shouldSendJoin);
+ stateMachineStore.updateStateMachine(machineInstance);
+ return true;
+ }
+ log.debug("Instance of machine has already been created. deviceId: {} and groupIp: {}",
+ devId, groupIp);
+ stateMachineStore.increaseAndGetCounter(machineInstance.getStateMachineId());
+ return false;
+ }
+
+ @Override
+ public boolean leave(DeviceId devId, Ip4Address groupIp) {
+ StateMachine machine = get(devId, groupIp);
+ if (machine == null) {
+ log.debug("Machine has already been left and destroyed. deviceId: {} and groupIp: {} ", devId, groupIp);
+ return false;
+ }
+
+ long count = stateMachineStore.decreaseAndGetCounter(machine.getStateMachineId());
+ // make sure machine instance still exists.
+ // it may be removed by the preceding thread
+ if (count == 0) {
+ boolean shouldSendLeave = true;
+ if (IgmpManager.isIgmpOnPodBasis() &&
+ groupListenedByOtherDevices(devId, groupIp)) {
+ // unset the flag if igmp messages are evaluated on POD basis
+ // and there are still active members of this group
+ // across the entire POD
+ shouldSendLeave = false;
+ }
+ machine.leave(shouldSendLeave);
+ destroySingle(devId, groupIp);
+ log.debug("This machine left and destroyed. deviceId: {} and groupIp: {}", devId, groupIp);
+ return true;
+ }
+ log.debug("This machine still has member/members. number of member/members: {}, deviceId: {}, groupIp: {} ",
+ count, devId, groupIp);
+ return false;
+ }
+
+ @Override
+ public void specialQuery(DeviceId devId, Ip4Address groupIp, int maxResp) {
+ StateMachine machine = get(devId, groupIp);
+ if (machine == null) {
+ log.debug("Machine is not found. deviceId: {} and groupIp: {} ", devId, groupIp);
+ return;
+ }
+ machine.query(maxResp);
+ stateMachineStore.updateStateMachine(machine);
+ }
+
+ @Override
+ public void generalQuery(DeviceId devId, int maxResp) {
+ for (StateMachine machine : stateMachineStore.getAllStateMachines()) {
+ if (devId.equals(machine.getDeviceId())) {
+ machine.query(maxResp);
+ stateMachineStore.updateStateMachine(machine);
+ }
+ }
+ }
+
+ @Override
+ public void generalQuery(int maxResp) {
+ for (StateMachine machine : stateMachineStore.getAllStateMachines()) {
+ machine.query(maxResp);
+ stateMachineStore.updateStateMachine(machine);
+ }
+ }
+
+ @Override
+ public void timeOut(DeviceId devId, Ip4Address groupIp) {
+ StateMachine machine = get(devId, groupIp);
+ if (machine == null) {
+ log.debug("Machine is not found. deviceId: {} and groupIp: {}", devId, groupIp);
+ return;
+ }
+ machine.timeOut(sendQuery);
+ stateMachineStore.updateStateMachine(machine);
+ }
+
+ @Override
+ public void timeOut1s() {
+ Collection<StateMachine> mapSet = stateMachineStore.getAllStateMachines();
+ for (StateMachine machineInfo : mapSet) {
+ if (machineInfo.getTimeOut() != null) {
+ StateMachineId machineId = machineInfo.getStateMachineId();
+ if (igmpLeadershipService.isLocalLeader(machineId.getDeviceId())) {
+ if (machineInfo.getTimeOut() > 0) {
+ stateMachineStore.decreaseTimeout(machineId);
+ } else {
+ StateMachine machine = stateMachineStore.getStateMachine(machineId);
+ machine.timeOut(sendQuery);
+ machine.destroyTimer();
+ stateMachineStore.updateStateMachine(machine);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void clearAllMaps() {
+ stateMachineStore.clearAllStateMachines();
+ }
+
+ /**
+ * @param devId id of the device being excluded
+ * @param groupIp group IP address
+ * @return true if this group has at least one listener connected to
+ * any device in the map except for the device specified; false otherwise.
+ */
+ private boolean groupListenedByOtherDevices(DeviceId devId, Ip4Address groupIp) {
+ for (StateMachine machine : stateMachineStore.getAllStateMachines()) {
+ if (machine.getStateMachineId().getDeviceId().equals(devId)) {
+ continue;
+ }
+ if (machine.getStateMachineId().getGroupIp().equals(groupIp)) {
+ //means group is being listened by other peers in the domain
+ return true;
+ }
+ }
+ return false;
+ }
+}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/state/AbstractState.java b/app/src/main/java/org/opencord/igmpproxy/impl/state/AbstractState.java
new file mode 100644
index 0000000..40fe4e5
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/state/AbstractState.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.impl.state;
+
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.Ip4Address;
+import org.onosproject.net.DeviceId;
+import org.opencord.igmpproxy.impl.IgmpManager;
+import org.opencord.igmpproxy.impl.IgmpSender;
+import org.opencord.igmpproxy.statemachine.State;
+import org.opencord.igmpproxy.statemachine.StateMachine;
+
+import java.util.Random;
+
+/**
+ * Abstract implementation of state.
+ */
+public abstract class AbstractState implements State {
+
+ protected StateMachine machine;
+
+ protected AbstractState(StateMachine machine) {
+ this.machine = machine;
+ }
+
+ public void join() {
+ }
+
+ public void leave() {
+ DeviceId devId = machine.getStateMachineId().getDeviceId();
+ Ip4Address groupIp = machine.getStateMachineId().getGroupIp();
+
+ Ethernet eth = IgmpManager.outgoingIgmpWithV3() ?
+ IgmpSender.getInstance().buildIgmpV3Leave(groupIp, machine.getSrcIp()) :
+ IgmpSender.getInstance().buildIgmpV2Leave(groupIp, machine.getSrcIp());
+ IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId, machine.getUpLinkPort());
+ }
+
+ public void query(int maxResp) {
+ }
+
+ public void timeOut() {
+ }
+
+ protected int getTimeOut(int maxTimeOut) {
+ Random random = new Random();
+ return Math.abs(random.nextInt()) % maxTimeOut;
+ }
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/state/DelayMember.java b/app/src/main/java/org/opencord/igmpproxy/impl/state/DelayMember.java
new file mode 100644
index 0000000..6d6b7e0
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/state/DelayMember.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.impl.state;
+
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.Ip4Address;
+import org.onosproject.net.DeviceId;
+import org.opencord.igmpproxy.impl.IgmpManager;
+import org.opencord.igmpproxy.impl.IgmpSender;
+import org.opencord.igmpproxy.statemachine.StateMachine;
+
+/**
+ * Implementation of delay-member state.
+ */
+public class DelayMember extends AbstractState {
+ public DelayMember(StateMachine machine) {
+ super(machine);
+ }
+
+ @Override
+ public void query(int maxResp) {
+ if (maxResp < machine.getTimeOut()) {
+ int timeout = getTimeOut(maxResp);
+ machine.resetTimer(timeout);
+ }
+ }
+
+ @Override
+ public void timeOut() {
+ DeviceId devId = machine.getStateMachineId().getDeviceId();
+ Ip4Address groupIp = machine.getStateMachineId().getGroupIp();
+
+ Ethernet eth = IgmpManager.outgoingIgmpWithV3() ?
+ IgmpSender.getInstance().buildIgmpV3ResponseQuery(groupIp, machine.getSrcIp()) :
+ IgmpSender.getInstance().buildIgmpV2ResponseQuery(groupIp, machine.getSrcIp());
+ IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId, machine.getUpLinkPort());
+ machine.setMaxTimeout();
+ }
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/state/IdleMember.java b/app/src/main/java/org/opencord/igmpproxy/impl/state/IdleMember.java
new file mode 100644
index 0000000..63b030c
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/state/IdleMember.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.impl.state;
+
+import org.opencord.igmpproxy.statemachine.StateMachine;
+
+/**
+ * Implementation of idle-member state.
+ */
+public class IdleMember extends AbstractState {
+ public IdleMember(StateMachine machine) {
+ super(machine);
+ }
+
+ @Override
+ public void query(int maxResp) {
+ int timeout = getTimeOut(maxResp);
+ machine.startTimer(timeout);
+ }
+}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/state/NonMember.java b/app/src/main/java/org/opencord/igmpproxy/impl/state/NonMember.java
new file mode 100644
index 0000000..1437a6f
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/state/NonMember.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.impl.state;
+
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.Ip4Address;
+import org.onosproject.net.DeviceId;
+import org.opencord.igmpproxy.impl.IgmpManager;
+import org.opencord.igmpproxy.impl.IgmpSender;
+import org.opencord.igmpproxy.statemachine.StateMachine;
+
+/**
+ * Implementation of non-member state.
+ */
+public class NonMember extends AbstractState {
+ public NonMember(StateMachine machine) {
+ super(machine);
+ }
+
+ @Override
+ public void join() {
+ DeviceId devId = machine.getStateMachineId().getDeviceId();
+ Ip4Address groupIp = machine.getStateMachineId().getGroupIp();
+
+ Ethernet eth = IgmpManager.outgoingIgmpWithV3() ?
+ IgmpSender.getInstance().buildIgmpV3Join(groupIp, machine.getSrcIp()) :
+ IgmpSender.getInstance().buildIgmpV2Join(groupIp, machine.getSrcIp());
+ IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId, machine.getUpLinkPort());
+ int timeout = getTimeOut(IgmpManager.getUnsolicitedTimeout());
+ machine.startTimer(timeout);
+ }
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/state/package-info.java b/app/src/main/java/org/opencord/igmpproxy/impl/state/package-info.java
new file mode 100644
index 0000000..706a222
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/state/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation of IGMPProxy application.
+ */
+package org.opencord.igmpproxy.impl.state;
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/store/groupmember/AbstractGroupMemberStore.java b/app/src/main/java/org/opencord/igmpproxy/impl/store/groupmember/AbstractGroupMemberStore.java
new file mode 100644
index 0000000..9a3af33
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/store/groupmember/AbstractGroupMemberStore.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.impl.store.groupmember;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.onosproject.store.AbstractStore;
+import org.opencord.igmpproxy.GroupMemberId;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Abstract implementation of group-member store.
+ */
+public abstract class AbstractGroupMemberStore extends AbstractStore<GroupMemberEvent, GroupMemberStoreDelegate>
+ implements GroupMemberStore {
+
+ protected Map<GroupMemberId, GroupMember> groupMemberMap;
+
+ protected AbstractGroupMemberStore() {
+
+ }
+
+ protected AbstractGroupMemberStore(Map<GroupMemberId, GroupMember> groupMemberMap) {
+ this.groupMemberMap = groupMemberMap;
+ }
+
+ @Override
+ public GroupMember putGroupMember(GroupMember groupMember) {
+ return groupMemberMap.put(groupMember.getGroupMemberId(), groupMember);
+ }
+
+ @Override
+ public GroupMember updateGroupMember(GroupMember groupMember) {
+ return groupMemberMap.replace(groupMember.getGroupMemberId(), groupMember);
+ }
+
+ @Override
+ public GroupMember removeGroupMember(GroupMemberId groupMemberId) {
+ return groupMemberMap.remove(groupMemberId);
+ }
+
+ @Override
+ public GroupMember getGroupMember(GroupMemberId groupMemberId) {
+ return groupMemberMap.getOrDefault(groupMemberId, null);
+ }
+
+ @Override
+ public Set<GroupMemberId> getAllGroupMemberIds() {
+ return ImmutableSet.copyOf(groupMemberMap.keySet());
+ }
+
+ @Override
+ public Collection<GroupMember> getAllGroupMembers() {
+ return ImmutableList.copyOf(groupMemberMap.values());
+ }
+}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/store/groupmember/DistributedGroupMemberStore.java b/app/src/main/java/org/opencord/igmpproxy/impl/store/groupmember/DistributedGroupMemberStore.java
new file mode 100644
index 0000000..01f9af9
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/store/groupmember/DistributedGroupMemberStore.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.impl.store.groupmember;
+
+import org.onlab.util.KryoNamespace;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.opencord.igmpproxy.GroupMemberId;
+import org.opencord.igmpproxy.GroupMemberIdSerializer;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Deactivate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Group member store based on distributed storage.
+ */
+@Component(service = GroupMemberStore.class)
+public class DistributedGroupMemberStore extends AbstractGroupMemberStore {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private static final String GROUP_MEMBER_MAP_NAME = "onos-igmpproxy-groupmember-table";
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected StorageService storageService;
+
+ private ConsistentMap<GroupMemberId, GroupMember> consistentMap;
+
+ public DistributedGroupMemberStore() {
+ super();
+ }
+
+ @Activate
+ public void activate() {
+ KryoNamespace groupMemberSerializer = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(new GroupMemberIdSerializer(), GroupMemberId.class)
+ .register(GroupMember.class)
+ .build();
+ consistentMap = storageService.<GroupMemberId, GroupMember>consistentMapBuilder()
+ .withName(GROUP_MEMBER_MAP_NAME)
+ .withSerializer(Serializer.using(groupMemberSerializer))
+ .build();
+ groupMemberMap = consistentMap.asJavaMap();
+ log.info("Started.");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ groupMemberMap.clear();
+ groupMemberMap = null;
+ consistentMap.destroy();
+ log.info("Stopped.");
+ }
+}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/GroupMember.java b/app/src/main/java/org/opencord/igmpproxy/impl/store/groupmember/GroupMember.java
similarity index 63%
rename from app/src/main/java/org/opencord/igmpproxy/impl/GroupMember.java
rename to app/src/main/java/org/opencord/igmpproxy/impl/store/groupmember/GroupMember.java
index a05b962..0add46c 100644
--- a/app/src/main/java/org/opencord/igmpproxy/impl/GroupMember.java
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/store/groupmember/GroupMember.java
@@ -13,13 +13,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.opencord.igmpproxy.impl;
+package org.opencord.igmpproxy.impl.store.groupmember;
import org.onlab.packet.IGMPMembership;
import org.onlab.packet.Ip4Address;
import org.onlab.packet.VlanId;
import org.onosproject.net.DeviceId;
import org.onosproject.net.PortNumber;
+import org.opencord.igmpproxy.GroupMemberId;
import java.util.ArrayList;
import java.util.Iterator;
@@ -27,12 +28,10 @@
/**
* Date struct to keep Igmp member infomations.
*/
-public final class GroupMember {
+public final class GroupMember {
private final VlanId vlan;
- private final DeviceId deviceId;
- private final PortNumber portNumber;
- private final Ip4Address groupIp;
+ private final GroupMemberId groupMemberId;
private final boolean v2;
private byte recordType = IGMPMembership.MODE_IS_INCLUDE;
private ArrayList<Ip4Address> sourceList = new ArrayList<>();
@@ -43,23 +42,17 @@
private boolean leave = false;
public GroupMember(Ip4Address groupIp, VlanId vlan, DeviceId deviceId, PortNumber portNum, boolean isV2) {
- this.groupIp = groupIp;
this.vlan = vlan;
- this.deviceId = deviceId;
- this.portNumber = portNum;
+ this.groupMemberId = GroupMemberId.of(groupIp, deviceId, portNum);
v2 = isV2;
}
- static String getkey(Ip4Address groupIp, DeviceId deviceId, PortNumber portNum) {
- return groupIp.toString() + deviceId.toString() + portNum.toString();
- }
-
public String getkey() {
- return GroupMember.getkey(groupIp, deviceId, portNumber);
+ return groupMemberId.toString();
}
- public String getId() {
- return getkey();
+ public GroupMemberId getGroupMemberId() {
+ return groupMemberId;
}
public VlanId getvlan() {
@@ -67,15 +60,15 @@
}
public DeviceId getDeviceId() {
- return deviceId;
+ return groupMemberId.getDeviceId();
}
public PortNumber getPortNumber() {
- return portNumber;
+ return groupMemberId.getPortNum();
}
public Ip4Address getGroupIp() {
- return groupIp;
+ return groupMemberId.getGroupIp();
}
public byte getRecordType() {
@@ -95,63 +88,8 @@
this.recordType = recordType;
this.sourceList.clear();
this.sourceList.addAll(newSourceList);
-
- /*TODO : support SSM
- if (this.recordType == IGMPMembership.MODE_IS_INCLUDE) {
- switch (recordType) {
- case IGMPMembership.MODE_IS_INCLUDE:
- case IGMPMembership.CHANGE_TO_INCLUDE_MODE:
- //however , set to include<B> anyway
- this.sourceList = sourceList;
- this.recordType = IGMPMembership.MODE_IS_INCLUDE;
- break;
- case IGMPMembership.MODE_IS_EXCLUDE:
- case IGMPMembership.CHANGE_TO_EXCLUDE_MODE:
- //set to exclude<B>
- this.sourceList = sourceList;
- this.recordType = IGMPMembership.MODE_IS_EXCLUDE;
- break;
- case IGMPMembership.ALLOW_NEW_SOURCES:
- //set to include <A+B>
- join(this.sourceList, sourceList);
- break;
- case IGMPMembership.BLOCK_OLD_SOURCES:
- //set to include <A-B>
- exclude(this.sourceList, sourceList);
- break;
- default:
- break;
- }
- } else if (this.recordType == IGMPMembership.MODE_IS_EXCLUDE) {
- switch (recordType) {
- case IGMPMembership.MODE_IS_INCLUDE:
- case IGMPMembership.CHANGE_TO_INCLUDE_MODE:
- //set to include<B>
- this.recordType = IGMPMembership.MODE_IS_INCLUDE;
- this.sourceList = sourceList;
- break;
- case IGMPMembership.MODE_IS_EXCLUDE:
- case IGMPMembership.CHANGE_TO_EXCLUDE_MODE:
- this.sourceList = sourceList;
- this.recordType = IGMPMembership.MODE_IS_EXCLUDE;
- break;
- case IGMPMembership.ALLOW_NEW_SOURCES:
- //set to exclude <A-B>
- exclude(this.sourceList, sourceList);
- break;
- case IGMPMembership.BLOCK_OLD_SOURCES:
- //set to exclude <A+B>
- join(this.sourceList, sourceList);
- break;
- default:
- break;
- }
- }*/
-
- return;
}
-
/*join B to A (A+B)*/
private void join(ArrayList<Integer> listA, ArrayList<Integer> listB) {
Iterator<Integer> iterA = null;
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/store/groupmember/GroupMemberEvent.java b/app/src/main/java/org/opencord/igmpproxy/impl/store/groupmember/GroupMemberEvent.java
new file mode 100644
index 0000000..494b89b
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/store/groupmember/GroupMemberEvent.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.impl.store.groupmember;
+
+import org.onosproject.event.AbstractEvent;
+
+/**
+ * Group-member event.
+ */
+public class GroupMemberEvent extends
+ AbstractEvent<GroupMemberEvent.Type, GroupMember> {
+
+ /**
+ * Type of group-member event.
+ */
+ public enum Type {
+ /**
+ * Signifies that group-member added to store.
+ */
+ GROUP_MEMBER_ADDED,
+ /**
+ * Signifies that group-member updated in store.
+ */
+ GROUP_MEMBER_UPDATED,
+ /**
+ * Signifies that group-member has been removed from store.
+ */
+ GROUP_MEMBER_REMOVED
+ }
+
+ /**
+ *
+ * @param type group-member event type.
+ * @param subject group-member.
+ */
+ public GroupMemberEvent(GroupMemberEvent.Type type, GroupMember subject) {
+ super(type, subject);
+ }
+
+ protected GroupMemberEvent(GroupMemberEvent.Type type, GroupMember subject, long time) {
+ super(type, subject, time);
+ }
+}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/store/groupmember/GroupMemberStore.java b/app/src/main/java/org/opencord/igmpproxy/impl/store/groupmember/GroupMemberStore.java
new file mode 100644
index 0000000..74f4ca7
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/store/groupmember/GroupMemberStore.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.igmpproxy.impl.store.groupmember;
+
+import org.opencord.igmpproxy.GroupMemberId;
+
+import java.util.Collection;
+import java.util.Set;
+
+/**
+ * Group Member Store Service.
+ */
+public interface GroupMemberStore {
+
+ /**
+ * Add new group-member to store.
+ *
+ * @param groupMember given group-member.
+ * @return added group-member.
+ */
+ GroupMember putGroupMember(GroupMember groupMember);
+
+ /**
+ * Update group-member in store.
+ *
+ * @param groupMember given group-member.
+ * @return updated group-member.
+ */
+ GroupMember updateGroupMember(GroupMember groupMember);
+
+ /**
+ * removed group-member from store.
+ *
+ * @param groupMemberId given group-member id
+ * @return removed group-member
+ */
+ GroupMember removeGroupMember(GroupMemberId groupMemberId);
+
+ /**
+ * get group-member from store.
+ *
+ * @param groupMemberId given group-member identification
+ * @return group-member or null if not found
+ */
+ GroupMember getGroupMember(GroupMemberId groupMemberId);
+
+ /**
+ * Returns all group member ids.
+ *
+ * @return set of group member ids
+ */
+ Set<GroupMemberId> getAllGroupMemberIds();
+
+ /**
+ * Returns all group members.
+ *
+ * @return all group members
+ */
+ Collection<GroupMember> getAllGroupMembers();
+}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/store/groupmember/GroupMemberStoreDelegate.java b/app/src/main/java/org/opencord/igmpproxy/impl/store/groupmember/GroupMemberStoreDelegate.java
new file mode 100644
index 0000000..e9f5509
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/store/groupmember/GroupMemberStoreDelegate.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.impl.store.groupmember;
+
+import org.onosproject.store.StoreDelegate;
+
+/**
+ * Group member store delegate abstraction.
+ */
+public interface GroupMemberStoreDelegate extends StoreDelegate<GroupMemberEvent> {
+}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/store/groupmember/package-info.java b/app/src/main/java/org/opencord/igmpproxy/impl/store/groupmember/package-info.java
new file mode 100644
index 0000000..7c025a6
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/store/groupmember/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation of group-member store.
+ */
+package org.opencord.igmpproxy.impl.store.groupmember;
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/store/machine/AbstractStateMachineStore.java b/app/src/main/java/org/opencord/igmpproxy/impl/store/machine/AbstractStateMachineStore.java
new file mode 100644
index 0000000..0ecbb14
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/store/machine/AbstractStateMachineStore.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.impl.store.machine;
+
+import com.google.common.collect.ImmutableList;
+import org.onosproject.store.AbstractStore;
+import org.opencord.igmpproxy.statemachine.StateMachine;
+import org.opencord.igmpproxy.statemachine.StateMachineId;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Abstract implementation of state-machine store.
+ */
+public abstract class AbstractStateMachineStore
+ extends AbstractStore<StateMachineEvent, StateMachineStoreDelegate>
+ implements StateMachineStore {
+
+ protected Map<StateMachineId, StateMachine> stateMachineMap;
+
+ protected AbstractStateMachineStore() {
+ }
+
+ protected AbstractStateMachineStore(Map<StateMachineId, StateMachine> stateMachineMap) {
+ this.stateMachineMap = stateMachineMap;
+ }
+
+ @Override
+ public StateMachine putStateMachine(StateMachine stateMachine) {
+ return stateMachineMap.put(stateMachine.getStateMachineId(), stateMachine);
+ }
+
+ @Override
+ public StateMachine updateStateMachine(StateMachine machine) {
+ return stateMachineMap.replace(machine.getStateMachineId(), machine);
+ }
+
+ @Override
+ public StateMachine removeStateMachine(StateMachineId id) {
+ return stateMachineMap.remove(id);
+ }
+
+ @Override
+ public StateMachine getStateMachine(StateMachineId id) {
+ return stateMachineMap.get(id);
+ }
+
+ @Override
+ public Collection<StateMachine> getAllStateMachines() {
+ return ImmutableList.copyOf(stateMachineMap.values());
+ }
+
+ @Override
+ public void clearAllStateMachines() {
+ stateMachineMap.clear();
+ stateMachineMap = null;
+ }
+
+ @Override
+ public void decreaseTimeout(StateMachineId machineId) {
+ StateMachine machine = stateMachineMap.get(machineId);
+ machine.decreaseTimeOut();
+ updateStateMachine(machine);
+ }
+
+ @Override
+ public void increaseTimeout(StateMachineId machineId) {
+ StateMachine machine = stateMachineMap.get(machineId);
+ machine.increaseTimeOut();
+ updateStateMachine(machine);
+ }
+
+}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/store/machine/DistributedStateMachineStore.java b/app/src/main/java/org/opencord/igmpproxy/impl/store/machine/DistributedStateMachineStore.java
new file mode 100644
index 0000000..f337b09
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/store/machine/DistributedStateMachineStore.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.impl.store.machine;
+
+import org.onlab.util.KryoNamespace;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AtomicCounterMap;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.opencord.igmpproxy.impl.SingleStateMachine;
+import org.opencord.igmpproxy.impl.SingleStateMachineSerializer;
+import org.opencord.igmpproxy.statemachine.StateMachine;
+import org.opencord.igmpproxy.statemachine.StateMachineId;
+import org.opencord.igmpproxy.statemachine.StateMachineIdSerializer;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Deactivate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * State machine store based on distributed storage.
+ */
+@Component(service = StateMachineStore.class)
+public class DistributedStateMachineStore extends AbstractStateMachineStore {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private static final String STATE_MACHINE_COUNTER_STORE = "onos-state-machine-counter-store";
+ private static final String STATE_MACHINE_MAP_NAME = "onos-state-machine-store";
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected StorageService storageService;
+
+ private AtomicCounterMap<StateMachineId> stateMachineCounters;
+
+ private ConsistentMap<StateMachineId, StateMachine> consistentMap;
+
+ public DistributedStateMachineStore() {
+ super();
+ }
+
+ @Activate
+ public void activate() {
+ KryoNamespace.Builder stateMachineKryoBuilder = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(new StateMachineIdSerializer(), StateMachineId.class);
+
+ stateMachineCounters = storageService.<StateMachineId>atomicCounterMapBuilder()
+ .withName(STATE_MACHINE_COUNTER_STORE)
+ .withSerializer(Serializer.using(stateMachineKryoBuilder.build())).build();
+
+ stateMachineKryoBuilder
+ .register(new SingleStateMachineSerializer(), SingleStateMachine.class);
+ this.consistentMap = storageService.<StateMachineId, StateMachine>consistentMapBuilder()
+ .withName(STATE_MACHINE_MAP_NAME)
+ .withSerializer(Serializer.using(stateMachineKryoBuilder.build()))
+ .build();
+ super.stateMachineMap = consistentMap.asJavaMap();
+
+
+ log.info("Started.");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ stateMachineMap.clear();
+ stateMachineMap = null;
+ consistentMap.destroy();
+ stateMachineCounters.clear();
+ stateMachineCounters.destroy();
+ log.info("Stopped.");
+ }
+
+ @Override
+ public long increaseAndGetCounter(StateMachineId stateMachineId) {
+ return stateMachineCounters.incrementAndGet(stateMachineId);
+ }
+
+ @Override
+ public long decreaseAndGetCounter(StateMachineId stateMachineId) {
+ if (stateMachineCounters.get(stateMachineId) > 0) {
+ return stateMachineCounters.decrementAndGet(stateMachineId);
+ } else {
+ return stateMachineCounters.get(stateMachineId);
+ }
+ }
+
+ @Override
+ public long getCounter(StateMachineId stateMachineId) {
+ return stateMachineCounters.get(stateMachineId);
+ }
+
+ @Override
+ public boolean removeCounter(StateMachineId stateMachineId) {
+ stateMachineCounters.remove(stateMachineId);
+ return true;
+ }
+
+ @Override
+ public void clearAllStateMachines() {
+ super.clearAllStateMachines();
+ stateMachineCounters.clear();
+ }
+}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/store/machine/StateMachineEvent.java b/app/src/main/java/org/opencord/igmpproxy/impl/store/machine/StateMachineEvent.java
new file mode 100644
index 0000000..ab9df23
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/store/machine/StateMachineEvent.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.igmpproxy.impl.store.machine;
+
+import org.onosproject.event.AbstractEvent;
+import org.opencord.igmpproxy.statemachine.StateMachine;
+
+/**
+ * State machine event.
+ */
+public class StateMachineEvent extends
+ AbstractEvent<StateMachineEvent.Type, StateMachine> {
+ /**
+ * Internal state-machine event type.
+ */
+ public enum Type {
+ /**
+ * Signifies that state-machine added to store.
+ */
+ STATE_MACHINE_ADDED,
+ /**
+ * Signifies that state-machine updated in store.
+ */
+ STATE_MACHINE_UPDATED,
+ /**
+ * Signifies that state-machine removed from store.
+ */
+ STATE_MACHINE_REMOVED
+ }
+
+ /**
+ * Creates new state machine event.
+ *
+ * @param type state-machine event type.
+ * @param subject state machine.
+ */
+ public StateMachineEvent(Type type, StateMachine subject) {
+ super(type, subject);
+ }
+
+ protected StateMachineEvent(Type type, StateMachine subject, long time) {
+ super(type, subject, time);
+ }
+}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/store/machine/StateMachineStore.java b/app/src/main/java/org/opencord/igmpproxy/impl/store/machine/StateMachineStore.java
new file mode 100644
index 0000000..ddb566a
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/store/machine/StateMachineStore.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.impl.store.machine;
+
+import org.opencord.igmpproxy.statemachine.StateMachine;
+import org.opencord.igmpproxy.statemachine.StateMachineId;
+
+import java.util.Collection;
+
+/**
+ * State Machine Store.
+ */
+public interface StateMachineStore {
+ /**
+ * Increases counter state-machine.
+ *
+ * @param stateMachineId identification of machine
+ * @return counter
+ */
+ long increaseAndGetCounter(StateMachineId stateMachineId);
+
+ /**
+ * Decreases counter of state-machine.
+ *
+ * @param stateMachineId identification of machine
+ * @return if counter greater than zero, decreases counter and return counter
+ */
+ long decreaseAndGetCounter(StateMachineId stateMachineId);
+
+ /**
+ * Removes counter from counter-map.
+ *
+ * @param stateMachineId identification of machine
+ * @return true
+ */
+ boolean removeCounter(StateMachineId stateMachineId);
+
+ /**
+ * Get counter using state-machine id.
+ *
+ * @param stateMachineId identification of machine
+ * @return counter of member that use this id
+ */
+ long getCounter(StateMachineId stateMachineId);
+
+ /**
+ * Add new machine to store.
+ *
+ * @param stateMachine given machine
+ * @return added informations
+ */
+ StateMachine putStateMachine(StateMachine stateMachine);
+
+ /**
+ * Update information in store.
+ *
+ * @param stateMachine given machine
+ * @return updated info
+ */
+ StateMachine updateStateMachine(StateMachine stateMachine);
+
+ /**
+ * Get information from store.
+ *
+ * @param id given identification of machine
+ * @return information of machine
+ */
+ StateMachine getStateMachine(StateMachineId id);
+
+ /**
+ * Remove machine from store.
+ *
+ * @param id given identification of machine
+ * @return removed info
+ */
+ StateMachine removeStateMachine(StateMachineId id);
+
+ /**
+ * Get informations of all machine.
+ *
+ * @return machine informations
+ */
+ Collection<StateMachine> getAllStateMachines();
+
+ /**
+ * clear information map.
+ */
+ void clearAllStateMachines();
+
+ /**
+ * Decreases timeout of timer.
+ *
+ * @param machineId given machine id
+ */
+ void decreaseTimeout(StateMachineId machineId);
+
+ /**
+ * Increases timeout of timer.
+ *
+ * @param machineId given machine id
+ */
+ void increaseTimeout(StateMachineId machineId);
+}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/store/machine/StateMachineStoreDelegate.java b/app/src/main/java/org/opencord/igmpproxy/impl/store/machine/StateMachineStoreDelegate.java
new file mode 100644
index 0000000..f8db089
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/store/machine/StateMachineStoreDelegate.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.igmpproxy.impl.store.machine;
+
+import org.onosproject.store.StoreDelegate;
+
+/**
+ * State machine store delegate abstraction.
+ */
+public interface StateMachineStoreDelegate extends StoreDelegate<StateMachineEvent> {
+}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/store/machine/package-info.java b/app/src/main/java/org/opencord/igmpproxy/impl/store/machine/package-info.java
new file mode 100644
index 0000000..2b9bc9e
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/store/machine/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation of state-machine store.
+ */
+package org.opencord.igmpproxy.impl.store.machine;
\ No newline at end of file
diff --git a/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerBase.java b/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerBase.java
index b301f94..6283c08 100644
--- a/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerBase.java
+++ b/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerBase.java
@@ -16,6 +16,7 @@
package org.opencord.igmpproxy.impl;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
import org.onlab.packet.Ethernet;
import org.onlab.packet.IGMP;
import org.onlab.packet.IGMPMembership;
@@ -59,6 +60,11 @@
import org.onosproject.net.packet.PacketContext;
import org.onosproject.net.packet.PacketProcessor;
import org.onosproject.net.packet.PacketServiceAdapter;
+import org.opencord.igmpproxy.IgmpLeadershipService;
+import org.opencord.igmpproxy.impl.store.groupmember.AbstractGroupMemberStore;
+import org.opencord.igmpproxy.impl.store.machine.AbstractStateMachineStore;
+import org.opencord.igmpproxy.statemachine.StateMachine;
+import org.opencord.igmpproxy.statemachine.StateMachineId;
import org.opencord.sadis.BandwidthProfileInformation;
import org.opencord.sadis.BaseInformationService;
import org.opencord.sadis.SadisService;
@@ -78,6 +84,8 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
public class IgmpManagerBase {
@@ -96,7 +104,7 @@
// Common connect point of aggregation switch used by all devices.
protected static final ConnectPoint COMMON_CONNECT_POINT =
- ConnectPoint.deviceConnectPoint("of:00000000000000003/3");
+ ConnectPoint.deviceConnectPoint("of:00000000000000003/3");
// Uplink ports for two olts A and B
protected static final PortNumber PORT_A = PortNumber.portNumber(1);
protected static final PortNumber PORT_B = PortNumber.portNumber(2);
@@ -135,23 +143,24 @@
@Override
public Device getDevice(DeviceId deviceId) {
- if (flagForDevice) {
- DefaultAnnotations.Builder annotationsBuilder = DefaultAnnotations.builder()
- .set(AnnotationKeys.MANAGEMENT_ADDRESS, SOURCE_IP_OF_A.toString());
- SparseAnnotations annotations = annotationsBuilder.build();
- Annotations[] da = {annotations };
- Device deviceA = new DefaultDevice(null, DEVICE_ID_OF_A, Device.Type.OTHER, "", "", "", "", null, da);
- flagForDevice = false;
- return deviceA;
+ if (flagForDevice) {
+ DefaultAnnotations.Builder annotationsBuilder = DefaultAnnotations.builder()
+ .set(AnnotationKeys.MANAGEMENT_ADDRESS, SOURCE_IP_OF_A.toString());
+ SparseAnnotations annotations = annotationsBuilder.build();
+ Annotations[] da = {annotations};
+ Device deviceA = new DefaultDevice(null, DEVICE_ID_OF_A, Device.Type.OTHER, "", "", "", "", null, da);
+ flagForDevice = false;
+ return deviceA;
} else {
- DefaultAnnotations.Builder annotationsBuilder = DefaultAnnotations.builder()
- .set(AnnotationKeys.MANAGEMENT_ADDRESS, SOURCE_IP_OF_B.toString());
- SparseAnnotations annotations = annotationsBuilder.build();
- Annotations[] da = {annotations };
- Device deviceB = new DefaultDevice(null, DEVICE_ID_OF_B, Device.Type.OTHER, "", "", "", "", null, da);
- return deviceB;
- }
+ DefaultAnnotations.Builder annotationsBuilder = DefaultAnnotations.builder()
+ .set(AnnotationKeys.MANAGEMENT_ADDRESS, SOURCE_IP_OF_B.toString());
+ SparseAnnotations annotations = annotationsBuilder.build();
+ Annotations[] da = {annotations};
+ Device deviceB = new DefaultDevice(null, DEVICE_ID_OF_B, Device.Type.OTHER, "", "", "", "", null, da);
+ return deviceB;
+ }
}
+
@Override
public List<Port> getPorts(DeviceId deviceId) {
return lsPorts;
@@ -162,7 +171,7 @@
DefaultAnnotations.Builder annotationsBuilder = DefaultAnnotations.builder()
.set(AnnotationKeys.MANAGEMENT_ADDRESS, SOURCE_IP_OF_A.toString());
SparseAnnotations annotations = annotationsBuilder.build();
- Annotations[] da = {annotations };
+ Annotations[] da = {annotations};
Device deviceA = new DefaultDevice(null, DEVICE_ID_OF_C, Device.Type.OTHER, "", "", "", "", null, da);
lsDevices.add(deviceA);
return lsDevices;
@@ -184,31 +193,32 @@
static final Class<IgmpproxySsmTranslateConfig> IGMPPROXY_SSM_CONFIG_CLASS = IgmpproxySsmTranslateConfig.class;
static final Class<McastConfig> MCAST_CONFIG_CLASS = McastConfig.class;
ConfigFactory<ApplicationId, IgmpproxyConfig> igmpproxyConfigFactory =
- new ConfigFactory<ApplicationId, IgmpproxyConfig>(
- SubjectFactories.APP_SUBJECT_FACTORY, IGMPPROXY_CONFIG_CLASS, "igmpproxy") {
- @Override
- public IgmpproxyConfig createConfig() {
- return new IgmpproxyConfig();
- }
- };
+ new ConfigFactory<ApplicationId, IgmpproxyConfig>(
+ SubjectFactories.APP_SUBJECT_FACTORY, IGMPPROXY_CONFIG_CLASS, "igmpproxy") {
+ @Override
+ public IgmpproxyConfig createConfig() {
+ return new IgmpproxyConfig();
+ }
+ };
ConfigFactory<ApplicationId, IgmpproxySsmTranslateConfig> igmpproxySsmConfigFactory =
- new ConfigFactory<ApplicationId, IgmpproxySsmTranslateConfig>(
- SubjectFactories.APP_SUBJECT_FACTORY, IGMPPROXY_SSM_CONFIG_CLASS, "ssmTranslate", true) {
+ new ConfigFactory<ApplicationId, IgmpproxySsmTranslateConfig>(
+ SubjectFactories.APP_SUBJECT_FACTORY, IGMPPROXY_SSM_CONFIG_CLASS, "ssmTranslate", true) {
- @Override
- public IgmpproxySsmTranslateConfig createConfig() {
- return new IgmpproxySsmTranslateConfig();
- }
- };
+ @Override
+ public IgmpproxySsmTranslateConfig createConfig() {
+ return new IgmpproxySsmTranslateConfig();
+ }
+ };
class MockIgmpProxyConfig extends IgmpproxyConfig {
boolean igmpOnPodBasis = true;
MockIgmpProxyConfig(boolean igmpFlagValue) {
- igmpOnPodBasis = igmpFlagValue;
- }
+ igmpOnPodBasis = igmpFlagValue;
+ }
+
@Override
public boolean igmpOnPodBasis() {
return igmpOnPodBasis;
@@ -224,17 +234,18 @@
@Override
public ConnectPoint connectPoint() {
- return COMMON_CONNECT_POINT;
+ return COMMON_CONNECT_POINT;
}
}
- class TestNetworkConfigRegistry extends NetworkConfigRegistryAdapter {
- Boolean igmpOnPodFlag = false;
+ class TestNetworkConfigRegistry extends NetworkConfigRegistryAdapter {
+ Boolean igmpOnPodFlag = false;
- TestNetworkConfigRegistry(Boolean igmpFlag) {
- igmpOnPodFlag = igmpFlag;
- }
+ TestNetworkConfigRegistry(Boolean igmpFlag) {
+ igmpOnPodFlag = igmpFlag;
+ }
+
@SuppressWarnings("unchecked")
@Override
public <S> Set<S> getSubjects(Class<S> subjectClass) {
@@ -242,19 +253,19 @@
return (Set<S>) ImmutableSet.of(DEVICE_ID_OF_A, DEVICE_ID_OF_B);
}
return null;
- }
+ }
- @SuppressWarnings("unchecked")
- @Override
- public <S, C extends Config<S>> C getConfig(S subject, Class<C> configClass) {
- if (configClass.getName().equalsIgnoreCase("org.opencord.igmpproxy.impl.IgmpproxyConfig")) {
- IgmpproxyConfig igmpproxyConfig = new MockIgmpProxyConfig(igmpOnPodFlag);
- return (C) igmpproxyConfig;
- } else {
- super.getConfig(subject, configClass);
- }
- return null;
- }
+ @SuppressWarnings("unchecked")
+ @Override
+ public <S, C extends Config<S>> C getConfig(S subject, Class<C> configClass) {
+ if (configClass.getName().equalsIgnoreCase("org.opencord.igmpproxy.impl.IgmpproxyConfig")) {
+ IgmpproxyConfig igmpproxyConfig = new MockIgmpProxyConfig(igmpOnPodFlag);
+ return (C) igmpproxyConfig;
+ } else {
+ super.getConfig(subject, configClass);
+ }
+ return null;
+ }
}
@@ -276,12 +287,18 @@
@Override
public void emit(OutboundPacket packet) {
synchronized (savedPackets) {
- savedPackets.add(packet);
- savedPackets.notify();
+ savedPackets.add(packet);
+ savedPackets.notify();
}
}
- }
+ }
+ class TestIgmpLeaderShipService implements IgmpLeadershipService {
+ @Override
+ public boolean isLocalLeader(DeviceId deviceId) {
+ return true;
+ }
+ }
class MockMastershipService extends MastershipServiceAdapter {
@Override
@@ -407,9 +424,9 @@
* Mocks the DefaultPacketContext.
*/
final class TestPacketContext extends DefaultPacketContext {
- TestPacketContext(long time, InboundPacket inPkt, OutboundPacket outPkt, boolean block) {
- super(time, inPkt, outPkt, block);
- }
+ TestPacketContext(long time, InboundPacket inPkt, OutboundPacket outPkt, boolean block) {
+ super(time, inPkt, outPkt, block);
+ }
@Override
public void send() {
@@ -423,31 +440,31 @@
* @param reply Ethernet packet
* @throws InterruptedException
*/
- void sendPacket(Ethernet reply) {
+ void sendPacket(Ethernet reply) {
- if (reply != null) {
- final ByteBuffer byteBuffer = ByteBuffer.wrap(reply.serialize());
+ if (reply != null) {
+ final ByteBuffer byteBuffer = ByteBuffer.wrap(reply.serialize());
- if (flagForQueryPacket) {
- InboundPacket inBoundPacket = new DefaultInboundPacket(CONNECT_POINT_C, reply, byteBuffer);
- context = new TestPacketContext(127L, inBoundPacket, null, false);
- packetProcessor.process(context);
- } else {
- if (flagForPacket) {
- InboundPacket inPacket = new DefaultInboundPacket(CONNECT_POINT_A, reply, byteBuffer);
- context = new TestPacketContext(127L, inPacket, null, false);
- flagForPacket = false;
+ if (flagForQueryPacket) {
+ InboundPacket inBoundPacket = new DefaultInboundPacket(CONNECT_POINT_C, reply, byteBuffer);
+ context = new TestPacketContext(127L, inBoundPacket, null, false);
+ packetProcessor.process(context);
+ } else {
+ if (flagForPacket) {
+ InboundPacket inPacket = new DefaultInboundPacket(CONNECT_POINT_A, reply, byteBuffer);
+ context = new TestPacketContext(127L, inPacket, null, false);
+ flagForPacket = false;
- packetProcessor.process(context);
- } else {
- InboundPacket inBoundPacket = new DefaultInboundPacket(CONNECT_POINT_B, reply, byteBuffer);
- context = new TestPacketContext(127L, inBoundPacket, null, false);
- flagForPacket = true;
+ packetProcessor.process(context);
+ } else {
+ InboundPacket inBoundPacket = new DefaultInboundPacket(CONNECT_POINT_B, reply, byteBuffer);
+ context = new TestPacketContext(127L, inBoundPacket, null, false);
+ flagForPacket = true;
- packetProcessor.process(context);
- }
- }
- }
+ packetProcessor.process(context);
+ }
+ }
+ }
}
protected class MockSadisService implements SadisService {
@@ -498,7 +515,7 @@
private class MockSubService implements BaseInformationService<SubscriberAndDeviceInformation> {
MockSubscriberAndDeviceInformation sub =
new MockSubscriberAndDeviceInformation(CLIENT_NAS_PORT_ID,
- CLIENT_NAS_PORT_ID, CLIENT_CIRCUIT_ID, null, null);
+ CLIENT_NAS_PORT_ID, CLIENT_CIRCUIT_ID, null, null);
@Override
public SubscriberAndDeviceInformation get(String id) {
@@ -653,15 +670,15 @@
}
@Override
- public void disableComponent(String name) {
- // TODO Auto-generated method stub
- }
+ public void disableComponent(String name) {
+ // TODO Auto-generated method stub
+ }
- @Override
- public ServiceReference getServiceReference() {
- // TODO Auto-generated method stub
- return null;
- }
+ @Override
+ public ServiceReference getServiceReference() {
+ // TODO Auto-generated method stub
+ return null;
+ }
}
Ethernet buildWrongIgmpPacket(Ip4Address groupIp, Ip4Address sourceIp) {
@@ -669,7 +686,7 @@
igmpMembership.setRecordType((byte) 0x33);
return IgmpSender.getInstance().buildIgmpPacket(IGMP.TYPE_IGMPV3_MEMBERSHIP_REPORT, groupIp,
- igmpMembership, sourceIp, false);
+ igmpMembership, sourceIp, false);
}
Ethernet buildUnknownIgmpPacket(Ip4Address groupIp, Ip4Address sourceIp) {
@@ -679,4 +696,53 @@
return IgmpSender.getInstance().buildIgmpPacket((byte) 0x44, groupIp, igmpMembership, sourceIp, false);
}
+ class TestStateMachineStoreService extends AbstractStateMachineStore {
+ private static final int DEFAULT_COUNT = 0;
+ private Map<StateMachineId, AtomicLong> countsMap;
+
+ public TestStateMachineStoreService(Map<StateMachineId, StateMachine> map) {
+ super();
+ stateMachineMap = Maps.newConcurrentMap();
+ countsMap = Maps.newConcurrentMap();
+ }
+
+ @Override
+ public long increaseAndGetCounter(StateMachineId stateMachineId) {
+ AtomicLong count = countsMap.get(stateMachineId);
+ if (count == null) {
+ count = new AtomicLong(DEFAULT_COUNT);
+ countsMap.put(stateMachineId, count);
+ }
+ return count.incrementAndGet();
+ }
+
+ @Override
+ public long decreaseAndGetCounter(StateMachineId stateMachineId) {
+ AtomicLong count = countsMap.get(stateMachineId);
+ if (count.get() > 0) {
+ return count.decrementAndGet();
+ } else {
+ return count.get();
+ }
+ }
+
+ @Override
+ public boolean removeCounter(StateMachineId stateMachineId) {
+ countsMap.remove(stateMachineId);
+ return true;
+ }
+
+ @Override
+ public long getCounter(StateMachineId stateMachineId) {
+ return countsMap.get(stateMachineId).get();
+ }
+
+ }
+
+
+ class TestGroupMemberStoreService extends AbstractGroupMemberStore {
+ public TestGroupMemberStoreService() {
+ super(Maps.newConcurrentMap());
+ }
+ }
}
diff --git a/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerTest.java b/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerTest.java
index dc220f8..e0daa14 100644
--- a/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerTest.java
+++ b/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerTest.java
@@ -15,6 +15,7 @@
*/
package org.opencord.igmpproxy.impl;
+import com.google.common.collect.Maps;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -23,7 +24,6 @@
import org.onosproject.core.CoreServiceAdapter;
import org.onosproject.net.flow.FlowRuleServiceAdapter;
import org.onosproject.net.flowobjective.FlowObjectiveServiceAdapter;
-import org.opencord.igmpproxy.impl.IgmpManagerBase.MockComponentContext;
import static org.junit.Assert.*;
@@ -42,6 +42,7 @@
@Before
public void setUp() {
igmpManager = new IgmpManager();
+ igmpManager.igmpLeadershipService = new TestIgmpLeaderShipService();
igmpManager.coreService = new CoreServiceAdapter();
igmpManager.mastershipService = new MockMastershipService();
igmpManager.flowObjectiveService = new FlowObjectiveServiceAdapter();
@@ -55,16 +56,22 @@
TestUtils.setField(igmpStatisticsManager, "eventDispatcher", new TestEventDispatcher());
igmpStatisticsManager.activate(new MockComponentContext());
igmpManager.igmpStatisticsManager = this.igmpStatisticsManager;
+
+ igmpManager.groupMemberStore = new TestGroupMemberStoreService();
+ StateMachineManager stateMachineManager = new StateMachineManager();
+ stateMachineManager.stateMachineStore = new TestStateMachineStoreService(Maps.newConcurrentMap());
+ stateMachineManager.activate(new MockComponentContext());
+ igmpManager.stateMachineService = stateMachineManager;
+
// By default - we send query messages
- SingleStateMachine.sendQuery = true;
+ StateMachineManager.sendQuery = true;
}
// Tear Down the IGMP application.
@After
public void tearDown() {
igmpManager.deactivate();
- IgmpManager.groupMemberMap.clear();
- StateMachine.clearMap();
+ igmpManager.stateMachineService.clearAllMaps();
}
// Checking the Default value of IGMP_ON_POD_BASIS.
@@ -88,7 +95,7 @@
@Test
public void testIgmpOnPodBasisDefaultValue() throws InterruptedException {
// We need to count join messages sent on the upstream
- SingleStateMachine.sendQuery = false;
+ StateMachineManager.sendQuery = false;
igmpManager.networkConfig = new TestNetworkConfigRegistry(false);
igmpManager.activate();
@@ -118,7 +125,7 @@
@Test
public void testIgmpOnPodBasisTrueValue() throws InterruptedException {
// We need to count join messages
- SingleStateMachine.sendQuery = false;
+ StateMachineManager.sendQuery = false;
igmpManager.networkConfig = new TestNetworkConfigRegistry(true);
igmpManager.activate();
@@ -129,7 +136,7 @@
sendPacket(firstPacket);
// Emitted packet is stored in list savedPackets
synchronized (savedPackets) {
- savedPackets.wait(WAIT_TIMEOUT);
+ savedPackets.wait(WAIT_TIMEOUT);
}
assertNotNull(savedPackets);
assertEquals(1, savedPackets.size());
diff --git a/app/src/test/java/org/opencord/igmpproxy/impl/IgmpStatisticsTest.java b/app/src/test/java/org/opencord/igmpproxy/impl/IgmpStatisticsTest.java
index 2677511..b2e8271 100644
--- a/app/src/test/java/org/opencord/igmpproxy/impl/IgmpStatisticsTest.java
+++ b/app/src/test/java/org/opencord/igmpproxy/impl/IgmpStatisticsTest.java
@@ -21,6 +21,7 @@
import java.util.List;
+import com.google.common.collect.Maps;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -53,6 +54,7 @@
@Before
public void setUp() {
igmpManager = new IgmpManager();
+ igmpManager.igmpLeadershipService = new TestIgmpLeaderShipService();
igmpManager.coreService = new CoreServiceAdapter();
igmpManager.mastershipService = new MockMastershipService();
igmpManager.flowObjectiveService = new FlowObjectiveServiceAdapter();
@@ -61,6 +63,11 @@
igmpManager.flowRuleService = new FlowRuleServiceAdapter();
igmpManager.multicastService = new TestMulticastRouteService();
igmpManager.sadisService = new MockSadisService();
+ igmpManager.groupMemberStore = new TestGroupMemberStoreService();
+ StateMachineManager stateMachineService = new StateMachineManager();
+ stateMachineService.stateMachineStore = new TestStateMachineStoreService(Maps.newConcurrentMap());
+ stateMachineService.activate(new MockComponentContext());
+ igmpManager.stateMachineService = stateMachineService;
igmpStatisticsManager = new IgmpStatisticsManager();
igmpStatisticsManager.cfgService = new MockCfgService();
igmpStatisticsManager.addListener(mockListener);
@@ -68,7 +75,7 @@
igmpStatisticsManager.activate(new MockComponentContext());
igmpManager.igmpStatisticsManager = this.igmpStatisticsManager;
// By default - we send query messages
- SingleStateMachine.sendQuery = true;
+ StateMachineManager.sendQuery = true;
}
// Tear Down the IGMP application.
@@ -76,14 +83,13 @@
public void tearDown() {
igmpStatisticsManager.removeListener(mockListener);
igmpStatisticsManager.deactivate();
- IgmpManager.groupMemberMap.clear();
- StateMachine.clearMap();
+ igmpManager.stateMachineService.clearAllMaps();
}
//Test Igmp Statistics.
@Test
public void testIgmpStatistics() throws InterruptedException {
- SingleStateMachine.sendQuery = false;
+ StateMachineManager.sendQuery = false;
igmpManager.networkConfig = new TestNetworkConfigRegistry(false);
igmpManager.activate();
@@ -103,7 +109,7 @@
}
assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
- assertEquals((long) 2, igmpStatisticsManager.getIgmpStats().getTotalMsgReceived().longValue()));
+ assertEquals((long) 2, igmpStatisticsManager.getIgmpStats().getTotalMsgReceived().longValue()));
assertEquals((long) 1, igmpStatisticsManager.getIgmpStats().getIgmpJoinReq().longValue());
assertEquals((long) 2, igmpStatisticsManager.getIgmpStats().getIgmpv3MembershipReport().longValue());
assertEquals((long) 1, igmpStatisticsManager.getIgmpStats().getIgmpSuccessJoinRejoinReq().longValue());
@@ -119,18 +125,18 @@
//Test packet with Unknown Multicast IpAddress
@Test
public void testIgmpUnknownMulticastIpAddress() throws InterruptedException {
- SingleStateMachine.sendQuery = false;
+ StateMachineManager.sendQuery = false;
igmpManager.networkConfig = new TestNetworkConfigRegistry(false);
igmpManager.activate();
Ethernet firstPacket =
- IgmpSender.getInstance().buildIgmpV3Join(UNKNOWN_GRP_IP, SOURCE_IP_OF_A);
+ IgmpSender.getInstance().buildIgmpV3Join(UNKNOWN_GRP_IP, SOURCE_IP_OF_A);
// Sending first packet
sendPacket(firstPacket);
assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
- assertEquals((long) 1,
- igmpStatisticsManager.getIgmpStats().getFailJoinReqUnknownMulticastIpCounter().longValue()));
+ assertEquals((long) 1,
+ igmpStatisticsManager.getIgmpStats().getFailJoinReqUnknownMulticastIpCounter().longValue()));
}
//Test Igmp Query Statistics.
@@ -146,15 +152,15 @@
//IGMPV3 General Membership Query packet
Ethernet igmpv3MembershipQueryPkt1 =
- IgmpSender.getInstance().buildIgmpV3Query(Ip4Address.valueOf(0), SOURCE_IP_OF_A);
+ IgmpSender.getInstance().buildIgmpV3Query(Ip4Address.valueOf(0), SOURCE_IP_OF_A);
sendPacket(igmpv3MembershipQueryPkt1);
assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
+ assertEquals(igmpStatisticsManager.getIgmpStats()
+ .getIgmpGrpAndSrcSpecificMembershipQuery().longValue(), 1));
assertEquals(igmpStatisticsManager.getIgmpStats()
- .getIgmpGrpAndSrcSpecificMembershipQuery().longValue(), 1));
+ .getIgmpGeneralMembershipQuery().longValue(), 1);
assertEquals(igmpStatisticsManager.getIgmpStats()
- .getIgmpGeneralMembershipQuery().longValue(), 1);
- assertEquals(igmpStatisticsManager.getIgmpStats()
- .getCurrentGrpNumCounter().longValue(), 1);
+ .getCurrentGrpNumCounter().longValue(), 1);
}
//Test Events
@@ -163,10 +169,10 @@
final int waitEventGeneration = igmpStatisticsManager.statisticsGenerationPeriodInSeconds * 1000;
//assert that event listened as the app activates
assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
- assertEquals(mockListener.events.size(), 1));
+ assertEquals(mockListener.events.size(), 1));
assertAfter(waitEventGeneration / 2, waitEventGeneration, () ->
- assertEquals(mockListener.events.size(), 2));
+ assertEquals(mockListener.events.size(), 2));
for (IgmpStatisticsEvent event : mockListener.events) {
assertEquals(event.type(), IgmpStatisticsEvent.Type.STATS_UPDATE);
@@ -176,7 +182,7 @@
//Test packet with Unknown Wrong Membership mode
@Test
public void testWrongIgmpPacket() throws InterruptedException {
- SingleStateMachine.sendQuery = false;
+ StateMachineManager.sendQuery = false;
igmpManager.networkConfig = new TestNetworkConfigRegistry(false);
igmpManager.activate();
@@ -185,14 +191,14 @@
// Sending first packet
sendPacket(firstPacket);
assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
- assertEquals((long) 1,
- igmpStatisticsManager.getIgmpStats().getReportsRxWithWrongModeCounter().longValue()));
+ assertEquals((long) 1,
+ igmpStatisticsManager.getIgmpStats().getReportsRxWithWrongModeCounter().longValue()));
}
//Test packet with Unknown IGMP type.
@Test
public void testUnknownIgmpPacket() throws InterruptedException {
- SingleStateMachine.sendQuery = false;
+ StateMachineManager.sendQuery = false;
igmpManager.networkConfig = new TestNetworkConfigRegistry(false);
igmpManager.activate();
@@ -201,14 +207,14 @@
// Sending first packet
sendPacket(firstPacket);
assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
- assertEquals((long) 1,
- igmpStatisticsManager.getIgmpStats().getUnknownIgmpTypePacketsRxCounter().longValue()));
+ assertEquals((long) 1,
+ igmpStatisticsManager.getIgmpStats().getUnknownIgmpTypePacketsRxCounter().longValue()));
}
//Test packet with Insufficient Permission.
@Test
public void testSufficientPermission() throws InterruptedException {
- SingleStateMachine.sendQuery = false;
+ StateMachineManager.sendQuery = false;
flagForPermission = true;
igmpManager.networkConfig = new TestNetworkConfigRegistry(false);
@@ -218,8 +224,9 @@
// Sending first packet
sendPacket(firstPacket);
assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
- assertEquals((long) 1,
- igmpStatisticsManager.getIgmpStats().getFailJoinReqInsuffPermissionAccessCounter().longValue()));
+ assertEquals((long) 1,
+ igmpStatisticsManager.getIgmpStats()
+ .getFailJoinReqInsuffPermissionAccessCounter().longValue()));
}
public class MockIgmpStatisticsEventListener implements IgmpStatisticsEventListener {