SEBA-640 IgmpProxy should use distributed storage infrastructure of ONOS

Change-Id: I4b1c4d326a5501e9c0e046e3ee8d973ca5f73d70
diff --git a/api/src/main/java/org/opencord/igmpproxy/GroupMemberId.java b/api/src/main/java/org/opencord/igmpproxy/GroupMemberId.java
new file mode 100644
index 0000000..137a259
--- /dev/null
+++ b/api/src/main/java/org/opencord/igmpproxy/GroupMemberId.java
@@ -0,0 +1,115 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.igmpproxy;
+
+import com.google.common.base.MoreObjects;
+import org.onlab.packet.Ip4Address;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+
+import java.util.Objects;
+
+/**
+ * Identification of group member.
+ */
+public final class GroupMemberId {
+    private Ip4Address groupIp;
+    private DeviceId deviceId;
+    private PortNumber portNum;
+
+    /**
+     * Constructor for serializer.
+     */
+    private GroupMemberId() {
+        this.groupIp = null;
+        this.deviceId = null;
+        this.portNum = null;
+    }
+
+    private GroupMemberId(Ip4Address groupIp, DeviceId deviceId, PortNumber portNum) {
+        this.groupIp = groupIp;
+        this.deviceId = deviceId;
+        this.portNum = portNum;
+    }
+
+    /**
+     * Creates new group member identification.
+     *
+     * @param groupIp  group ip of member
+     * @param deviceId device id
+     * @param portNum  port number of connect point
+     * @return new identification of group-member
+     */
+    public static GroupMemberId of(Ip4Address groupIp, DeviceId deviceId, PortNumber portNum) {
+        return new GroupMemberId(groupIp, deviceId, portNum);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+                .add("groupIp", groupIp)
+                .add("deviceId", deviceId)
+                .add("portNumber", portNum).toString();
+    }
+
+    /**
+     * Get group ip of group.
+     *
+     * @return group ip
+     */
+    public Ip4Address getGroupIp() {
+        return groupIp;
+    }
+
+    /**
+     * Get device id of connect point.
+     *
+     * @return device id
+     */
+    public DeviceId getDeviceId() {
+        return deviceId;
+    }
+
+    /**
+     * Get port number of connect point.
+     *
+     * @return port number
+     */
+    public PortNumber getPortNum() {
+        return portNum;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        GroupMemberId that = (GroupMemberId) o;
+        return Objects.equals(groupIp, that.groupIp) &&
+                Objects.equals(deviceId, that.deviceId) &&
+                Objects.equals(portNum, that.portNum);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(groupIp, deviceId, portNum);
+    }
+}
+
diff --git a/api/src/main/java/org/opencord/igmpproxy/GroupMemberIdSerializer.java b/api/src/main/java/org/opencord/igmpproxy/GroupMemberIdSerializer.java
new file mode 100644
index 0000000..370148a
--- /dev/null
+++ b/api/src/main/java/org/opencord/igmpproxy/GroupMemberIdSerializer.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.onlab.packet.Ip4Address;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+
+/**
+ * Custom serializer for {@link GroupMemberId}.
+ */
+public class GroupMemberIdSerializer extends Serializer<GroupMemberId> {
+    /**
+     * Creates serializer instance.
+     */
+    public GroupMemberIdSerializer() {
+        // non-null, immutable
+        super(false, true);
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, GroupMemberId groupMemberId) {
+        output.writeString(groupMemberId.getDeviceId().toString());
+        kryo.writeClassAndObject(output, groupMemberId.getGroupIp());
+        kryo.writeClassAndObject(output, groupMemberId.getPortNum());
+    }
+
+    @Override
+    public GroupMemberId read(Kryo kryo, Input input, Class<GroupMemberId> aClass) {
+        DeviceId deviceId = DeviceId.deviceId(input.readString());
+        Ip4Address groupIp = (Ip4Address) kryo.readClassAndObject(input);
+        PortNumber portNum = (PortNumber) kryo.readClassAndObject(input);
+        return GroupMemberId.of(groupIp, deviceId, portNum);
+    }
+}
diff --git a/api/src/main/java/org/opencord/igmpproxy/IgmpLeadershipService.java b/api/src/main/java/org/opencord/igmpproxy/IgmpLeadershipService.java
new file mode 100644
index 0000000..1b8d0fe
--- /dev/null
+++ b/api/src/main/java/org/opencord/igmpproxy/IgmpLeadershipService.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy;
+
+import org.onosproject.net.DeviceId;
+
+/**
+ * Leadership control service.
+ */
+public interface IgmpLeadershipService {
+    /**
+     * Makes leadership control.
+     *
+     * @param deviceId received deviceId
+     * @return if it is leadership of this device, return true
+     */
+    boolean isLocalLeader(DeviceId deviceId);
+}
diff --git a/api/src/main/java/org/opencord/igmpproxy/package-info.java b/api/src/main/java/org/opencord/igmpproxy/package-info.java
index a2f9362..f261bd7 100644
--- a/api/src/main/java/org/opencord/igmpproxy/package-info.java
+++ b/api/src/main/java/org/opencord/igmpproxy/package-info.java
@@ -15,6 +15,6 @@
  */
 
 /**
- * Created by onos on 17-3-9.
+ * Package of igmpproxy.
  */
 package org.opencord.igmpproxy;
diff --git a/api/src/main/java/org/opencord/igmpproxy/statemachine/State.java b/api/src/main/java/org/opencord/igmpproxy/statemachine/State.java
new file mode 100644
index 0000000..022d509
--- /dev/null
+++ b/api/src/main/java/org/opencord/igmpproxy/statemachine/State.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.statemachine;
+
+/**
+ * State of machine.
+ */
+public interface State {
+    /**
+     * Default state.
+     */
+    int STATE_NON = 0;
+    /**
+     * Delay state.
+     */
+    int STATE_DELAY = 1;
+    /**
+     * Idle state.
+     */
+    int STATE_IDLE = 2;
+
+    /**
+     * Transition to join.
+     */
+    int TRANSITION_JOIN = 0;
+    /**
+     * Transition to leave.
+     */
+    int TRANSITION_LEAVE = 1;
+    /**
+     * Transition to query.
+     */
+    int TRANSITION_QUERY = 2;
+    /**
+     * Transition to timeout.
+     */
+    int TRANSITION_TIMEOUT = 3;
+
+    /**
+     * Makes the requirements of join request.
+     */
+    void join();
+
+    /**
+     * Makes the requirements of leave request.
+     */
+    void leave();
+
+    /**
+     * Makes the requirements of query request.
+     *
+     * @param maxResp maximum resp
+     */
+    void query(int maxResp);
+
+    /**
+     * Makes the requirements of timeout .
+     */
+    void timeOut();
+}
diff --git a/api/src/main/java/org/opencord/igmpproxy/statemachine/StateMachine.java b/api/src/main/java/org/opencord/igmpproxy/statemachine/StateMachine.java
new file mode 100644
index 0000000..ae79931
--- /dev/null
+++ b/api/src/main/java/org/opencord/igmpproxy/statemachine/StateMachine.java
@@ -0,0 +1,136 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.statemachine;
+
+import org.onlab.packet.Ip4Address;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+
+/**
+ * State machine object.
+ */
+public interface StateMachine {
+    /**
+     * Returns identification of state-machine.
+     *
+     * @return identification
+     */
+    StateMachineId getStateMachineId();
+
+    /**
+     * Returns group ip of state-machine.
+     *
+     * @return ip of group
+     */
+    Ip4Address getGroupIp();
+
+    /**
+     * Returns device id of state-machine.
+     *
+     * @return device id
+     */
+    DeviceId getDeviceId();
+
+    /**
+     * Returns source ip of state-machine.
+     *
+     * @return source ip
+     */
+    Ip4Address getSrcIp();
+
+    /**
+     * Returns up-link port of state-machine.
+     *
+     * @return up-link port
+     */
+    PortNumber getUpLinkPort();
+
+    /**
+     * Returns timeout. it is nullable.
+     *
+     * @return timeout
+     */
+    Integer getTimeOut();
+
+    /**
+     * Set timer to max timeout.
+     */
+    void setMaxTimeout();
+
+    /**
+     * Start timer.
+     *
+     * @param timeout timeout
+     */
+    void startTimer(int timeout);
+
+    /**
+     * Resets timer.
+     *
+     * @param timeout timeout of timer
+     */
+    void resetTimer(int timeout);
+
+    /**
+     * Destroy timeout.
+     */
+    void destroyTimer();
+
+    /**
+     * Increases timeout of timer.
+     */
+    void increaseTimeOut();
+
+    /**
+     * Decreases timeout of timer.
+     */
+    void decreaseTimeOut();
+
+    /**
+     * Returns current state.
+     *
+     * @return current state
+     */
+    int currentState();
+
+    /**
+     * Makes the requirements of join request and transition to next station.
+     *
+     * @param messageOutAllowed message out allowed
+     */
+    void join(boolean messageOutAllowed);
+
+    /**
+     * Makes the requirements of leave request and transition to next station.
+     *
+     * @param messageOutAllowed message out allowed
+     */
+    void leave(boolean messageOutAllowed);
+
+    /**
+     * Makes the requirements of query request and transition to next station.
+     *
+     * @param maxResp maximum resp
+     */
+    void query(int maxResp);
+
+    /**
+     * Makes the requirements of timeout request and transition to next station.
+     *
+     * @param sendQuery send query for test
+     */
+    void timeOut(boolean sendQuery);
+}
diff --git a/api/src/main/java/org/opencord/igmpproxy/statemachine/StateMachineId.java b/api/src/main/java/org/opencord/igmpproxy/statemachine/StateMachineId.java
new file mode 100644
index 0000000..38d2bf1
--- /dev/null
+++ b/api/src/main/java/org/opencord/igmpproxy/statemachine/StateMachineId.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.statemachine;
+
+import com.google.common.base.MoreObjects;
+import org.onlab.packet.Ip4Address;
+import org.onosproject.net.DeviceId;
+
+import java.util.Objects;
+
+/**
+ * Identification of state machine.
+ */
+public final class StateMachineId {
+    private DeviceId deviceId;
+    private Ip4Address groupIp;
+
+    /**
+     * Constructor for serializer.
+     */
+    private StateMachineId() {
+        this.deviceId = null;
+        this.groupIp = null;
+    }
+
+    private StateMachineId(DeviceId deviceId, Ip4Address groupIp) {
+        this.deviceId = deviceId;
+        this.groupIp = groupIp;
+    }
+
+    /**
+     * Creates new state-machine identification using device-id and group-ip.
+     *
+     * @param deviceId device id of state-machie
+     * @param groupIp  group ip of state-machine
+     * @return created identification
+     */
+    public static StateMachineId of(DeviceId deviceId, Ip4Address groupIp) {
+        return new StateMachineId(deviceId, groupIp);
+    }
+
+    /**
+     * Returns device id of identification.
+     *
+     * @return device id
+     */
+    public DeviceId getDeviceId() {
+        return deviceId;
+    }
+
+    /**
+     * Returns group ip of identification.
+     *
+     * @return group ip
+     */
+    public Ip4Address getGroupIp() {
+        return groupIp;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+                .add("deviceId", deviceId)
+                .add("groupIp", groupIp)
+                .toString();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        StateMachineId that = (StateMachineId) o;
+        return Objects.equals(deviceId, that.deviceId) &&
+                Objects.equals(groupIp, that.groupIp);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(deviceId, groupIp);
+    }
+}
diff --git a/api/src/main/java/org/opencord/igmpproxy/statemachine/StateMachineIdSerializer.java b/api/src/main/java/org/opencord/igmpproxy/statemachine/StateMachineIdSerializer.java
new file mode 100644
index 0000000..f7c747e
--- /dev/null
+++ b/api/src/main/java/org/opencord/igmpproxy/statemachine/StateMachineIdSerializer.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.statemachine;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.onlab.packet.Ip4Address;
+import org.onosproject.net.DeviceId;
+
+/**
+ * Custom serializer for {@link StateMachineId}.
+ */
+public class StateMachineIdSerializer extends Serializer<StateMachineId> {
+    /**
+     * Creates serializer instance.
+     */
+    public StateMachineIdSerializer() {
+        // non-null, immutable
+        super(false, true);
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, StateMachineId stateMachineId) {
+        output.writeString(stateMachineId.getDeviceId().toString());
+        kryo.writeClassAndObject(output, stateMachineId.getGroupIp());
+    }
+
+    @Override
+    public StateMachineId read(Kryo kryo, Input input, Class<StateMachineId> aClass) {
+        DeviceId deviceId = DeviceId.deviceId(input.readString());
+        Ip4Address groupIp = (Ip4Address) kryo.readClassAndObject(input);
+        return StateMachineId.of(deviceId, groupIp);
+    }
+}
\ No newline at end of file
diff --git a/api/src/main/java/org/opencord/igmpproxy/statemachine/StateMachineService.java b/api/src/main/java/org/opencord/igmpproxy/statemachine/StateMachineService.java
new file mode 100644
index 0000000..c8ed49e
--- /dev/null
+++ b/api/src/main/java/org/opencord/igmpproxy/statemachine/StateMachineService.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.statemachine;
+
+import org.onlab.packet.Ip4Address;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+
+/**
+ * Manage State Machine and Igmp Timer.
+ */
+public interface StateMachineService {
+
+    /**
+     * Makes the requirements of igmp-join request.
+     *
+     * @param devId      device id of connect point
+     * @param groupIp    group ip of igmp group
+     * @param srcIP      source ip of device
+     * @param upLinkPort uplink port of device
+     * @return if this is first join from the device and group-ip return true
+     */
+    boolean join(DeviceId devId, Ip4Address groupIp, Ip4Address srcIP, PortNumber upLinkPort);
+
+    /**
+     * Makes the requirements of igmp-leave request.
+     *
+     * @param devId   device id of connect point
+     * @param groupIp group ip of group-member
+     * @return If there are no more members of that device and group returns true
+     */
+    boolean leave(DeviceId devId, Ip4Address groupIp);
+
+    /**
+     * clear map of state-machine.
+     */
+    void clearAllMaps();
+
+    /**
+     * Makes the requirements of special query.
+     *
+     * @param devId   device id
+     * @param groupIp group ip of igmp group
+     * @param maxResp maximum resp of igmp
+     */
+    void specialQuery(DeviceId devId, Ip4Address groupIp, int maxResp);
+
+    /**
+     * Makes the requirements of igmp-query request.
+     *
+     * @param devId   device id
+     * @param maxResp maximum resp of igmp
+     */
+    void generalQuery(DeviceId devId, int maxResp);
+
+    /**
+     * Makes the requirements of igmp-query request.
+     *
+     * @param maxResp maximum resp of igmp
+     */
+    void generalQuery(int maxResp);
+
+    /**
+     * Makes the requirements of timeout.
+     *
+     * @param devId   device id
+     * @param groupIp group ip of igmp group
+     */
+    void timeOut(DeviceId devId, Ip4Address groupIp);
+
+    /**
+     * Checks all state-machines periodically.
+     */
+    void timeOut1s();
+}
diff --git a/api/src/main/java/org/opencord/igmpproxy/statemachine/package-info.java b/api/src/main/java/org/opencord/igmpproxy/statemachine/package-info.java
new file mode 100644
index 0000000..3270750
--- /dev/null
+++ b/api/src/main/java/org/opencord/igmpproxy/statemachine/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package of state machine.
+ */
+package org.opencord.igmpproxy.statemachine;
\ No newline at end of file
diff --git a/app/pom.xml b/app/pom.xml
index 8848b78..f9b38ea 100644
--- a/app/pom.xml
+++ b/app/pom.xml
@@ -61,6 +61,19 @@
 
         <dependency>
             <groupId>org.onosproject</groupId>
+            <artifactId>onos-core-common</artifactId>
+            <version>${onos.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-core-serializers</artifactId>
+            <version>${onos.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
             <artifactId>onlab-osgi</artifactId>
             <version>${onos.version}</version>
             <scope>provided</scope>
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpLeadershipManager.java b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpLeadershipManager.java
new file mode 100644
index 0000000..4f2e7bb
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpLeadershipManager.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.impl;
+
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceService;
+import org.opencord.igmpproxy.IgmpLeadershipService;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Deactivate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Leadership service implementation.
+ */
+@Component(immediate = true, service = IgmpLeadershipService.class)
+public class IgmpLeadershipManager implements IgmpLeadershipService {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected DeviceService deviceService;
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected LeadershipService leadershipService;
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterService clusterService;
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected MastershipService mastershipService;
+
+    @Activate
+    public void activate(ComponentContext context) {
+        log.info("Started.");
+    }
+
+    @Deactivate
+    public void deactivate(ComponentContext context) {
+        log.info("Stopped.");
+    }
+
+    @Override
+    public boolean isLocalLeader(DeviceId deviceId) {
+        if (!mastershipService.isLocalMaster(deviceId)) {
+            if (deviceService.isAvailable(deviceId)) {
+                return false;
+            }
+            NodeId leader = leadershipService.runForLeadership(
+                    deviceId.toString()).leaderNodeId();
+            return clusterService.getLocalNode().id().equals(leader);
+        }
+        return true;
+    }
+}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpManager.java b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpManager.java
index b985b14..97e97cc 100644
--- a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpManager.java
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpManager.java
@@ -15,10 +15,14 @@
  */
 package org.opencord.igmpproxy.impl;
 
-import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.onosproject.net.Device;
+import org.opencord.igmpproxy.IgmpLeadershipService;
 import org.opencord.igmpproxy.IgmpStatisticsService;
+import org.opencord.igmpproxy.impl.store.groupmember.GroupMember;
+import org.opencord.igmpproxy.GroupMemberId;
+import org.opencord.igmpproxy.impl.store.groupmember.GroupMemberStore;
+import org.opencord.igmpproxy.statemachine.StateMachineService;
 import org.opencord.sadis.BaseInformationService;
 import org.opencord.sadis.SadisService;
 import org.opencord.sadis.SubscriberAndDeviceInformation;
@@ -114,7 +118,6 @@
     private static final Class<McastConfig> MCAST_CONFIG_CLASS =
             McastConfig.class;
 
-    public static Map<String, GroupMember> groupMemberMap = Maps.newConcurrentMap();
     private static ApplicationId appId;
 
     private static int unSolicitedTimeout = 3; // unit is 1 sec
@@ -176,6 +179,15 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected IgmpStatisticsService igmpStatisticsManager;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected GroupMemberStore groupMemberStore;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected StateMachineService stateMachineService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected IgmpLeadershipService igmpLeadershipService;
+
     private IgmpPacketProcessor processor = new IgmpPacketProcessor();
     private Logger log = LoggerFactory.getLogger(getClass());
     private ApplicationId coreAppId;
@@ -217,15 +229,15 @@
 
     protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
 
-    private List<Byte> validMembershipModes = Arrays.asList(MODE_IS_INCLUDE,  MODE_IS_EXCLUDE, CHANGE_TO_INCLUDE_MODE,
-                              CHANGE_TO_EXCLUDE_MODE, ALLOW_NEW_SOURCES, BLOCK_OLD_SOURCES);
+    private List<Byte> validMembershipModes = Arrays.asList(MODE_IS_INCLUDE, MODE_IS_EXCLUDE, CHANGE_TO_INCLUDE_MODE,
+            CHANGE_TO_EXCLUDE_MODE, ALLOW_NEW_SOURCES, BLOCK_OLD_SOURCES);
 
     @Activate
     protected void activate() {
         appId = coreService.registerApplication(APP_NAME);
         coreAppId = coreService.registerApplication(CoreService.CORE_APP_NAME);
         packetService.addProcessor(processor, PacketProcessor.director(4));
-        IgmpSender.init(packetService, mastershipService, igmpStatisticsManager);
+        IgmpSender.init(packetService, igmpLeadershipService, igmpStatisticsManager);
 
         networkConfig.registerConfigFactory(igmpproxySsmConfigFactory);
         networkConfig.registerConfigFactory(igmpproxyConfigFactory);
@@ -249,7 +261,7 @@
         deviceService.addListener(deviceListener);
         scheduledExecutorService.scheduleAtFixedRate(new IgmpProxyTimerTask(), 1000, 1000, TimeUnit.MILLISECONDS);
         eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("cord/igmpproxy",
-                                                                        "events-igmp-%d", log));
+                "events-igmp-%d", log));
         log.info("Started");
     }
 
@@ -285,10 +297,10 @@
         Ip4Address gAddr = igmpQuery.getGaddr().getIp4Address();
         maxResp = calculateMaxResp(maxResp);
         if (gAddr != null && !gAddr.isZero()) {
-            StateMachine.specialQuery(deviceId, gAddr, maxResp);
+            stateMachineService.specialQuery(deviceId, gAddr, maxResp);
             igmpStatisticsManager.getIgmpStats().increaseIgmpGrpSpecificMembershipQuery();
         } else {
-            StateMachine.generalQuery(deviceId, maxResp);
+            stateMachineService.generalQuery(deviceId, maxResp);
             igmpStatisticsManager.getIgmpStats().increaseIgmpGeneralMembershipQuery();
         }
     }
@@ -303,7 +315,7 @@
             deviceService.getAvailableDevices().forEach(device -> {
                 Optional<SubscriberAndDeviceInformation> accessDevice = getSubscriberAndDeviceInformation(device.id());
                 if (accessDevice.isPresent()) {
-                    StateMachine.specialQuery(device.id(), gAddr, maxResponseTime);
+                    stateMachineService.specialQuery(device.id(), gAddr, maxResponseTime);
                     igmpStatisticsManager.getIgmpStats().increaseIgmpGrpAndSrcSpecificMembershipQuery();
                 }
             });
@@ -311,7 +323,7 @@
         } else {
             //Don't know which group is targeted by the query
             //So query all the members(in all the OLTs) and proxy their reports
-            StateMachine.generalQuery(maxResponseTime);
+            stateMachineService.generalQuery(maxResponseTime);
             igmpStatisticsManager.getIgmpStats().increaseIgmpGeneralMembershipQuery();
         }
     }
@@ -367,7 +379,7 @@
             if (pimSSmInterworking) {
                 src = ssmTranslateRoute(groupIp);
                 if (src == null) {
-                    log.info("no ssm translate for group " + groupIp.toString());
+                    log.info("no ssm translate for group {}", groupIp);
                     return;
                 }
             } else {
@@ -384,8 +396,8 @@
                 join = false;
             }
         }
-        String groupMemberKey = GroupMember.getkey(groupIp, deviceId, portNumber);
-        GroupMember groupMember = groupMemberMap.get(groupMemberKey);
+        GroupMemberId groupMemberKey = GroupMemberId.of(groupIp, deviceId, portNumber);
+        GroupMember groupMember = groupMemberStore.getGroupMember(groupMemberKey);
 
         if (join) {
             igmpStatisticsManager.getIgmpStats().increaseIgmpJoinReq();
@@ -394,7 +406,7 @@
                 if (!sourceConfigured.isPresent()) {
                     igmpStatisticsManager.getIgmpStats().increaseIgmpFailJoinReq();
                     log.warn("Unable to process IGMP Join from {} since no source " +
-                                     "configuration is found.", deviceId);
+                            "configuration is found.", deviceId);
                     igmpStatisticsManager.getIgmpStats().increaseFailJoinReqInsuffPermissionAccessCounter();
                     return;
                 }
@@ -402,7 +414,7 @@
                 Optional<PortNumber> deviceUplink = getDeviceUplink(deviceId);
                 if (deviceUplink.isEmpty()) {
                     log.warn("Unable to process IGMP Join since uplink port " +
-                     "of the device {} is not found.", deviceId);
+                            "of the device {} is not found.", deviceId);
                     return;
                 }
 
@@ -414,14 +426,15 @@
 
                 HashSet<ConnectPoint> sourceConnectPoints = Sets.newHashSet(sourceConfigured.get());
 
-                boolean isJoined = StateMachine.join(deviceId, groupIp, srcIp, deviceUplink.get());
+                boolean isJoined = stateMachineService.join(deviceId, groupIp, srcIp, deviceUplink.get());
                 if (isJoined) {
                     igmpStatisticsManager.getIgmpStats().increaseIgmpSuccessJoinRejoinReq();
                     igmpStatisticsManager.getIgmpStats().increaseIgmpChannelJoinCounter();
                 } else {
                     igmpStatisticsManager.getIgmpStats().increaseIgmpFailJoinReq();
                 }
-                groupMemberMap.put(groupMemberKey, groupMember);
+                groupMemberStore.putGroupMember(groupMember);
+                log.debug("Group member created with id: {}", groupMember.getGroupMemberId());
                 groupMember.updateList(recordType, sourceList);
                 groupMember.getSourceList().forEach(source -> {
                     McastRoute route = new McastRoute(source, groupIp, McastRoute.Type.IGMP);
@@ -438,11 +451,13 @@
             groupMember.resetAllTimers();
             groupMember.updateList(recordType, sourceList);
             groupMember.setLeave(false);
+            //put updated member to the store
+            groupMemberStore.putGroupMember(groupMember);
         } else {
             igmpStatisticsManager.getIgmpStats().increaseIgmpLeaveReq();
             if (groupMember == null) {
-                log.info("receive leave but no instance, group " + groupIp.toString() +
-                        " device:" + deviceId.toString() + " port:" + portNumber.toString());
+                log.info("receive leave but no instance, group {} device: {} port:{}",
+                        groupIp, deviceId, portNumber);
                 igmpStatisticsManager.getIgmpStats().increaseUnconfiguredGroupCounter();
                 return;
             } else {
@@ -451,6 +466,8 @@
                     leaveAction(groupMember);
                 } else {
                     sendQuery(groupMember);
+                    //put modified group member object to the store
+                    groupMemberStore.updateGroupMember(groupMember);
                 }
             }
         }
@@ -459,11 +476,11 @@
     private void leaveAction(GroupMember groupMember) {
         igmpStatisticsManager.getIgmpStats().increaseIgmpDisconnect();
         ConnectPoint cp = new ConnectPoint(groupMember.getDeviceId(), groupMember.getPortNumber());
-        StateMachine.leave(groupMember.getDeviceId(), groupMember.getGroupIp());
+        stateMachineService.leave(groupMember.getDeviceId(), groupMember.getGroupIp());
         groupMember.getSourceList().forEach(source -> multicastService.removeSinks(
                 new McastRoute(source, groupMember.getGroupIp(),
-                               McastRoute.Type.IGMP), Sets.newHashSet(cp)));
-        groupMemberMap.remove(groupMember.getId());
+                        McastRoute.Type.IGMP), Sets.newHashSet(cp)));
+        groupMemberStore.removeGroupMember(groupMember.getGroupMemberId());
     }
 
     private void sendQuery(GroupMember groupMember) {
@@ -517,7 +534,7 @@
 
                     if (!isConnectPoint(deviceId, pkt.receivedFrom().port()) &&
                             !getSubscriberAndDeviceInformation(deviceId).isPresent()) {
-                        log.error("Device not registered in netcfg :" + deviceId.toString());
+                        log.error("Device not registered in netcfg : {}", deviceId);
                         igmpStatisticsManager.getIgmpStats().increaseFailJoinReqInsuffPermissionAccessCounter();
                         return;
                     }
@@ -525,7 +542,7 @@
                     IGMP igmp = (IGMP) ipv4Pkt.getPayload();
 
                     Optional<PortNumber> deviceUpLinkOpt = getDeviceUplink(deviceId);
-                    PortNumber upLinkPort =  deviceUpLinkOpt.isPresent() ? deviceUpLinkOpt.get() : null;
+                    PortNumber upLinkPort = deviceUpLinkOpt.isPresent() ? deviceUpLinkOpt.get() : null;
                     switch (igmp.getIgmpType()) {
                         case IGMP.TYPE_IGMPV3_MEMBERSHIP_QUERY:
                             igmpStatisticsManager.getIgmpStats().increaseIgmpv3MembershipQuery();
@@ -535,8 +552,8 @@
                                     log.info("IGMP Picked up query from connectPoint");
                                     //OK to process packet
                                     processIgmpConnectPointQuery((IGMPQuery) igmp.getGroups().get(0),
-                                                                 pkt.receivedFrom(),
-                                                                 0xff & igmp.getMaxRespField());
+                                            pkt.receivedFrom(),
+                                            0xff & igmp.getMaxRespField());
                                     break;
                                 } else {
                                     //Not OK to process packet
@@ -546,7 +563,7 @@
                             }
 
                             processIgmpQuery((IGMPQuery) igmp.getGroups().get(0), pkt.receivedFrom(),
-                                             0xff & igmp.getMaxRespField());
+                                    0xff & igmp.getMaxRespField());
                             break;
                         case IGMP.TYPE_IGMPV1_MEMBERSHIP_REPORT:
                             igmpStatisticsManager.getIgmpStats().increaseIgmpv1MembershipReport();
@@ -566,14 +583,14 @@
                             break;
 
                         default:
-                            log.warn("Unknown IGMP message type:" + igmp.getIgmpType());
+                            log.warn("Unknown IGMP message type: {}", igmp.getIgmpType());
                             igmpStatisticsManager.getIgmpStats().increaseInvalidIgmpMsgReceived();
                             igmpStatisticsManager.getIgmpStats().increaseUnknownIgmpTypePacketsRxCounter();
                             break;
                     }
 
                 } catch (Exception ex) {
-                    log.error("igmp process error : {} ", ex);
+                    log.error("igmp process error : ", ex);
                 }
             });
         }
@@ -592,14 +609,14 @@
             IGMPGroup group = itr.next();
             if (group instanceof IGMPMembership) {
                 processIgmpReport((IGMPMembership) group, VlanId.vlanId(vlan),
-                                  pkt.receivedFrom(), igmp.getIgmpType());
+                        pkt.receivedFrom(), igmp.getIgmpType());
             } else {
                 IGMPMembership mgroup = new IGMPMembership(group.getGaddr().getIp4Address());
                 mgroup.setRecordType(igmp.getIgmpType() == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT ?
-                                             IGMPMembership.MODE_IS_EXCLUDE :
-                                             IGMPMembership.MODE_IS_INCLUDE);
+                        IGMPMembership.MODE_IS_EXCLUDE :
+                        IGMPMembership.MODE_IS_INCLUDE);
                 processIgmpReport(mgroup, VlanId.vlanId(vlan),
-                                  pkt.receivedFrom(), igmp.getIgmpType());
+                        pkt.receivedFrom(), igmp.getIgmpType());
             }
         }
 
@@ -608,7 +625,7 @@
     private class IgmpProxyTimerTask extends TimerTask {
         public void run() {
             try {
-                IgmpTimer.timeOut1s();
+                stateMachineService.timeOut1s();
                 queryMembers();
             } catch (Exception ex) {
                 log.warn("Igmp timer task error : {}", ex.getMessage());
@@ -617,13 +634,14 @@
 
         private void queryMembers() {
             GroupMember groupMember;
-            Set groupMemberSet = groupMemberMap.entrySet();
-            Iterator itr = groupMemberSet.iterator();
-            while (itr.hasNext()) {
-                Map.Entry entry = (Map.Entry) itr.next();
-                groupMember = (GroupMember) entry.getValue();
+            Set<GroupMemberId> keySet = groupMemberStore.getAllGroupMemberIds();
+            for (GroupMemberId key : keySet) {
+                groupMember = groupMemberStore.getGroupMember(key);
+                if (groupMember == null) {
+                    continue;
+                }
                 DeviceId did = groupMember.getDeviceId();
-                if (mastershipService.isLocalMaster(did)) {
+                if (igmpLeadershipService.isLocalLeader(did)) {
                     if (groupMember.isLeave()) {
                         lastQuery(groupMember);
                     } else if (periodicQuery) {
@@ -636,10 +654,14 @@
         private void lastQuery(GroupMember groupMember) {
             if (groupMember.getLastQueryInterval() < lastQueryInterval) {
                 groupMember.lastQueryInterval(true); // count times
+                //put modified group member object to the store
+                groupMemberStore.updateGroupMember(groupMember);
             } else if (groupMember.getLastQueryCount() < lastQueryCount - 1) {
                 sendQuery(groupMember);
                 groupMember.lastQueryInterval(false); // reset count number
                 groupMember.lastQueryCount(true); //count times
+                //put modified group member object to the store
+                groupMemberStore.updateGroupMember(groupMember);
             } else if (groupMember.getLastQueryCount() == lastQueryCount - 1) {
                 leaveAction(groupMember);
             }
@@ -648,10 +670,14 @@
         private void periodicQuery(GroupMember groupMember) {
             if (groupMember.getKeepAliveQueryInterval() < keepAliveInterval) {
                 groupMember.keepAliveInterval(true);
+                //put modified group member object to the store
+                groupMemberStore.updateGroupMember(groupMember);
             } else if (groupMember.getKeepAliveQueryCount() < keepAliveCount) {
                 sendQuery(groupMember);
                 groupMember.keepAliveInterval(false);
                 groupMember.keepAliveQueryCount(true);
+                //put modified group member object to the store
+                groupMemberStore.updateGroupMember(groupMember);
             } else if (groupMember.getKeepAliveQueryCount() == keepAliveCount) {
                 leaveAction(groupMember);
             }
@@ -670,12 +696,11 @@
         }
         PortNumber portNumber = PortNumber.portNumber(olt.get().uplinkPort());
         return validateUpLinkPort(device.id(), portNumber) ?
-                    Optional.of(portNumber) : Optional.empty();
+                Optional.of(portNumber) : Optional.empty();
     }
 
     /**
-     *
-     * @param deviceId device id
+     * @param deviceId   device id
      * @param portNumber port number
      * @return true if the port name starts with NNI_PREFIX; false otherwise.
      */
@@ -688,8 +713,8 @@
         boolean isValid = port.annotations().value(AnnotationKeys.PORT_NAME) != null &&
                 port.annotations().value(AnnotationKeys.PORT_NAME).startsWith(NNI_PREFIX);
         if (!isValid) {
-            log.warn("Port cannot be validated; it is not configured as an NNI port." +
-                    "Device/port: {}/{}", deviceId, portNumber);
+            log.warn("Port cannot be validated; it is not configured as an NNI port. Device/port: {}/{}",
+                    deviceId, portNumber);
         }
         return isValid;
     }
@@ -719,14 +744,14 @@
                     @Override
                     public void onSuccess(Objective objective) {
                         log.info("Igmp filter for {} on {} {}.",
-                                 devId, port, (remove) ? REMOVED : INSTALLED);
+                                devId, port, (remove) ? REMOVED : INSTALLED);
                     }
 
                     @Override
                     public void onError(Objective objective, ObjectiveError error) {
                         log.info("Igmp filter {} for device {} on port {} failed because of {}",
-                                 (remove) ? INSTALLATION : REMOVAL, devId, port,
-                                 error);
+                                (remove) ? INSTALLATION : REMOVAL, devId, port,
+                                error);
                     }
                 });
 
@@ -809,7 +834,7 @@
                 case PORT_ADDED:
                     port = p.number();
                     if (getSubscriberAndDeviceInformation(devId).isPresent() &&
-                        !isUplink(devId, port) && !isConnectPoint(devId, port)) {
+                            !isUplink(devId, port) && !isConnectPoint(devId, port)) {
                         processFilterObjective(devId, port, false);
                     } else if (isUplink(devId, port)) {
                         provisionUplinkFlows();
@@ -820,7 +845,7 @@
                 case PORT_UPDATED:
                     port = p.number();
                     if (getSubscriberAndDeviceInformation(devId).isPresent() &&
-                        !isUplink(devId, port) && !isConnectPoint(devId, port)) {
+                            !isUplink(devId, port) && !isConnectPoint(devId, port)) {
                         if (event.port().isEnabled()) {
                             processFilterObjective(devId, port, false);
                         } else {
@@ -950,7 +975,7 @@
                         if (config != null && mvlan != config.egressVlan().toShort()) {
                             mvlan = config.egressVlan().toShort();
                             IgmpSender.getInstance().setMvlan(mvlan);
-                            groupMemberMap.values().forEach(m -> leaveAction(m));
+                            groupMemberStore.getAllGroupMembers().forEach(m -> leaveAction(m));
                         }
                     }
 
@@ -1009,6 +1034,7 @@
 
         processFilterObjective(connectPoint.deviceId(), connectPoint.port(), false);
     }
+
     private void unprovisionConnectPointFlows() {
         if (connectPoint == null) {
             return;
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpSender.java b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpSender.java
index 39d6046..adf0e9e 100644
--- a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpSender.java
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpSender.java
@@ -24,7 +24,6 @@
 import org.onlab.packet.IPv4;
 import org.onlab.packet.Ip4Address;
 import org.onlab.packet.MacAddress;
-import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.PortNumber;
 import org.onosproject.net.flow.DefaultTrafficTreatment;
@@ -32,6 +31,7 @@
 import org.onosproject.net.packet.DefaultOutboundPacket;
 import org.onosproject.net.packet.OutboundPacket;
 import org.onosproject.net.packet.PacketService;
+import org.opencord.igmpproxy.IgmpLeadershipService;
 import org.opencord.igmpproxy.IgmpStatisticsService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,7 +52,7 @@
 
     private static IgmpSender instance = null;
     private PacketService packetService;
-    private MastershipService mastershipService;
+    private IgmpLeadershipService igmpLeadershipService;
     private IgmpStatisticsService igmpStatisticsService;
     private boolean withRAUplink = true;
     private boolean withRADownlink = false;
@@ -61,16 +61,16 @@
     private int maxResp = DEFAULT_MEX_RESP;
     private Logger log = LoggerFactory.getLogger(getClass());
 
-    private IgmpSender(PacketService packetService, MastershipService mastershipService,
+    private IgmpSender(PacketService packetService, IgmpLeadershipService igmpLeadershipService,
              IgmpStatisticsService igmpStatisticsService) {
         this.packetService = packetService;
-        this.mastershipService = mastershipService;
+        this.igmpLeadershipService = igmpLeadershipService;
         this.igmpStatisticsService = igmpStatisticsService;
     }
 
-    public static void init(PacketService packetService, MastershipService mastershipService,
+    public static void init(PacketService packetService, IgmpLeadershipService igmpLeadershipService,
             IgmpStatisticsService igmpStatisticsService) {
-        instance = new IgmpSender(packetService, mastershipService, igmpStatisticsService);
+        instance = new IgmpSender(packetService, igmpLeadershipService, igmpStatisticsService);
     }
 
     public static IgmpSender getInstance() {
@@ -170,6 +170,7 @@
 
             case IGMP.TYPE_IGMPV3_MEMBERSHIP_REPORT:
                 if (igmpMembership == null) {
+                    log.debug("Igmp membership is not found. igmp-type {} ", type);
                     return null;
                 }
                 igmpPacket.addGroup(igmpMembership);
@@ -183,6 +184,7 @@
             case IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT:
             case IGMP.TYPE_IGMPV2_LEAVE_GROUP:
                 if (igmpMembership == null) {
+                    log.debug("Igmp membership is not found. igmp-type {} ", type);
                     return null;
                 }
                 igmpPacket.addGroup(igmpMembership);
@@ -192,6 +194,7 @@
                 ip4Packet.setDestinationAddress(dst);
                 break;
             default:
+                log.debug("Unknown igmp type: {} ", type);
                 igmpStatisticsService.getIgmpStats().increaseUnknownIgmpTypePacketsRxCounter();
                 return null;
         }
@@ -225,7 +228,7 @@
     }
 
     public void sendIgmpPacketUplink(Ethernet ethPkt, DeviceId deviceId, PortNumber upLinkPort) {
-        if (!mastershipService.isLocalMaster(deviceId)) {
+        if (!igmpLeadershipService.isLocalLeader(deviceId)) {
             return;
         }
 
@@ -241,7 +244,7 @@
     }
 
     public void sendIgmpPacket(Ethernet ethPkt, DeviceId deviceId, PortNumber portNumber) {
-        if (!mastershipService.isLocalMaster(deviceId)) {
+        if (!igmpLeadershipService.isLocalLeader(deviceId)) {
             return;
         }
 
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpTimer.java b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpTimer.java
deleted file mode 100644
index c4a17c9..0000000
--- a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpTimer.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.opencord.igmpproxy.impl;
-
-import com.google.common.collect.Maps;
-
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Implement the timer for igmp state machine.
- */
-public final class IgmpTimer {
-
-    public static final int INVALID_TIMER_ID = 0;
-    public static int timerId = INVALID_TIMER_ID + 1;
-    private static Map<Integer, SingleTimer> igmpTimerMap = Maps.newConcurrentMap();
-
-    private IgmpTimer(){
-
-    }
-    private static int getId() {
-        return timerId++;
-    }
-
-    public static int start(SingleStateMachine machine, int timeOut) {
-        int id = getId();
-        igmpTimerMap.put(id, new SingleTimer(machine, timeOut));
-        return id;
-    }
-
-    public static int reset(int oldId, SingleStateMachine machine, int timeOut) {
-        igmpTimerMap.remove(new Integer(oldId));
-        int id = getId();
-        igmpTimerMap.put(new Integer(id), new SingleTimer(machine, timeOut));
-        return id;
-    }
-
-    public static void cancel(int id) {
-        igmpTimerMap.remove(new Integer(id));
-    }
-
-
-    static void timeOut1s() {
-        Set mapSet = igmpTimerMap.entrySet();
-        Iterator itr = mapSet.iterator();
-        while (itr.hasNext()) {
-            Map.Entry entry = (Map.Entry) itr.next();
-            SingleTimer single = (SingleTimer) entry.getValue();
-            if (single.timeOut > 0) {
-                single.timeOut--;
-            } else {
-                single.machine.timeOut();
-                itr.remove();
-            }
-        }
-    }
-
-    static class SingleTimer {
-
-        public int timeOut;  // unit is 1 second
-        public SingleStateMachine machine;
-
-        public SingleTimer(SingleStateMachine machine, int timeOut) {
-            this.machine = machine;
-            this.timeOut = timeOut;
-        }
-
-    }
-}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/OsgiPropertyConstants.java b/app/src/main/java/org/opencord/igmpproxy/impl/OsgiPropertyConstants.java
index 94300b5..c99a3b7 100644
--- a/app/src/main/java/org/opencord/igmpproxy/impl/OsgiPropertyConstants.java
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/OsgiPropertyConstants.java
@@ -26,5 +26,4 @@
 
     public static final String STATISTICS_GENERATION_PERIOD = "statisticsGenerationPeriodInSeconds";
     public static final int STATISTICS_GENERATION_PERIOD_DEFAULT = 20;
-
 }
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/SingleStateMachine.java b/app/src/main/java/org/opencord/igmpproxy/impl/SingleStateMachine.java
index ca68bf6..83f2ef2 100644
--- a/app/src/main/java/org/opencord/igmpproxy/impl/SingleStateMachine.java
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/SingleStateMachine.java
@@ -15,182 +15,218 @@
  */
 package org.opencord.igmpproxy.impl;
 
-import org.onlab.packet.Ethernet;
 import org.onlab.packet.Ip4Address;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.PortNumber;
+import org.opencord.igmpproxy.impl.state.DelayMember;
+import org.opencord.igmpproxy.impl.state.IdleMember;
+import org.opencord.igmpproxy.impl.state.NonMember;
+import org.opencord.igmpproxy.statemachine.StateMachine;
+import org.opencord.igmpproxy.statemachine.State;
+import org.opencord.igmpproxy.statemachine.StateMachineId;
 
-import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 
-/**
- * State machine for single IGMP group member. The state machine is implemented on
- * RFC 2236 "6. Host State Diagram".
- */
-public class SingleStateMachine {
-    // Only for tests purposes
-    static boolean sendQuery = true;
 
-    static final int STATE_NON = 0;
-    static final int STATE_DELAY = 1;
-    static final int STATE_IDLE = 2;
-    static final int TRANSITION_JOIN = 0;
-    static final int TRANSITION_LEAVE = 1;
-    static final int TRANSITION_QUERY = 2;
-    static final int TRANSITION_TIMEOUT = 3;
-    static final int DEFAULT_MAX_RESP = 0xfffffff;
-    static final int DEFAULT_COUNT = 1;
-    private DeviceId devId;
-    private Ip4Address groupIp;
+/**
+ * State machine implementation.
+ */
+public final class SingleStateMachine implements StateMachine {
+    public static final int DEFAULT_MAX_RESP = 0xfffffff;
+
+    private StateMachineId stateMachineId;
+    private int currentState;
+
     private Ip4Address srcIp;
     private PortNumber upLinkPort;
 
-    private AtomicInteger count = new AtomicInteger(DEFAULT_COUNT);
-    private int timerId = IgmpTimer.INVALID_TIMER_ID;
-    private int timeOut = DEFAULT_MAX_RESP;
-    private State[] states =
-            {
-                    new NonMember(), new DelayMember(), new IdleMember()
-            };
+    private AtomicInteger timeOut;  // unit is 1 second
+    private State[] states;
     private int[] nonTransition =
-            {STATE_DELAY, STATE_NON, STATE_NON, STATE_NON};
+            {State.STATE_DELAY, State.STATE_NON,
+                    State.STATE_NON, State.STATE_NON};
     private int[] delayTransition =
-            {STATE_DELAY, STATE_NON, STATE_DELAY, STATE_IDLE};
+            {State.STATE_DELAY, State.STATE_NON,
+                    State.STATE_DELAY, State.STATE_IDLE};
     private int[] idleTransition =
-            {STATE_IDLE, STATE_NON, STATE_DELAY, STATE_IDLE};
+            {State.STATE_IDLE, State.STATE_NON,
+                    State.STATE_DELAY, State.STATE_IDLE};
     //THE TRANSITION TABLE
     private int[][] transition =
             {nonTransition, delayTransition, idleTransition};
-    private int currentState = STATE_NON;
 
-    public SingleStateMachine(DeviceId devId, Ip4Address groupIp, Ip4Address src, PortNumber upLinkPort) {
-        this.devId = devId;
-        this.groupIp = groupIp;
-        this.srcIp = src;
+    /**
+     * Constructor for serializer.
+     */
+    private SingleStateMachine() {
+        this.stateMachineId = null;
+        this.srcIp = null;
+        this.upLinkPort = null;
+        this.timeOut = null;
+        this.states = null;
+    }
+
+    /**
+     * Constructor of single state machine.
+     *
+     * @param deviceId   device id of state-machine
+     * @param groupIp    group id of state-machine
+     * @param srcIp      source ip of state-machine
+     * @param upLinkPort uplink port of state-machine
+     */
+    public SingleStateMachine(DeviceId deviceId,
+                              Ip4Address groupIp,
+                              Ip4Address srcIp,
+                              PortNumber upLinkPort) {
+        this.stateMachineId = StateMachineId.of(deviceId, groupIp);
+        this.srcIp = srcIp;
         this.upLinkPort = upLinkPort;
+        this.currentState = State.STATE_NON;
+        this.timeOut = null;
+        this.states = new State[]{new NonMember(this), new DelayMember(this), new IdleMember(this)};
     }
 
-    public Ip4Address getGroupIp() {
-        return groupIp;
-    }
-
-    public DeviceId getDeviceId() {
-        return devId;
-    }
-    public boolean increaseCounter() {
-        count.incrementAndGet();
-        return true;
-    }
-
-    public boolean decreaseCounter() {
-        if (count.get() > 0) {
-            count.decrementAndGet();
-            return true;
+    /**
+     * Constructor of single state machine.
+     *
+     * @param machineId id of state-machine
+     * @param srcIp source ip of state-machine
+     * @param upLinkPort uplink port of state-machine
+     * @param currentState current state of state-machine
+     * @param timeout timeout value of state-machine
+     */
+    public SingleStateMachine(StateMachineId machineId,
+                              Ip4Address srcIp,
+                              PortNumber upLinkPort,
+                              int currentState,
+                              Integer timeout) {
+        this.stateMachineId = machineId;
+        this.srcIp = srcIp;
+        this.upLinkPort = upLinkPort;
+        this.currentState = currentState;
+        if (timeout != null) {
+            createTimeOut(timeout);
         } else {
-            return false;
+            this.timeOut = null;
         }
+        this.states = new State[]{new NonMember(this), new DelayMember(this), new IdleMember(this)};
     }
 
-    public int getCounter() {
-        return count.get();
+    @Override
+    public StateMachineId getStateMachineId() {
+        return this.stateMachineId;
     }
+
+    @Override
+    public Ip4Address getGroupIp() {
+        return this.stateMachineId.getGroupIp();
+    }
+
+    @Override
+    public DeviceId getDeviceId() {
+        return this.stateMachineId.getDeviceId();
+    }
+
+
+    @Override
+    public Ip4Address getSrcIp() {
+        return srcIp;
+    }
+
+    public PortNumber getUpLinkPort() {
+        return upLinkPort;
+    }
+
+    @Override
     public int currentState() {
-        return currentState;
+        return this.currentState;
+    }
+
+    public Integer getTimeOut() {
+        return timeOut == null ? null : timeOut.get();
+    }
+
+    @Override
+    public void increaseTimeOut() {
+        this.timeOut.getAndIncrement();
+    }
+
+    @Override
+    public void decreaseTimeOut() {
+        this.timeOut.getAndDecrement();
+    }
+
+    @Override
+    public void setMaxTimeout() {
+        this.timeOut.set(DEFAULT_MAX_RESP);
+    }
+
+    @Override
+    public void startTimer(int timeout) {
+        createTimeOut(timeout);
+    }
+
+    @Override
+    public void resetTimer(int timeout) {
+        setTimeOut(timeout);
+    }
+
+    @Override
+    public void destroyTimer() {
+        setTimeOut(null);
+    }
+
+    @Override
+    public void join(boolean messageOutAllowed) {
+        if (messageOutAllowed) {
+            State state = states[currentState];
+            state.join();
+        }
+        next(State.TRANSITION_JOIN);
+    }
+
+    @Override
+    public void leave(boolean messageOutAllowed) {
+        if (messageOutAllowed) {
+            State state = states[currentState];
+            state.leave();
+        }
+        next(State.TRANSITION_LEAVE);
+    }
+
+    @Override
+    public void query(int maxResp) {
+        State state = states[currentState];
+        state.query(maxResp);
+        next(State.TRANSITION_QUERY);
+    }
+
+    @Override
+    public void timeOut(boolean sendQuery) {
+        if (sendQuery) {
+            State state = states[currentState];
+            state.timeOut();
+        }
+        next(State.TRANSITION_TIMEOUT);
     }
 
     private void next(int msg) {
-        currentState = transition[currentState][msg];
+        this.currentState = transition[currentState][msg];
     }
 
-    public void join(boolean messageOutAllowed) {
-        states[currentState].join(messageOutAllowed);
-        next(TRANSITION_JOIN);
+    private void setTimeOut(int timeout) {
+        timeOut.set(timeout);
     }
 
-    public void leave(boolean messageOutAllowed) {
-        states[currentState].leave(messageOutAllowed);
-        next(TRANSITION_LEAVE);
-    }
-
-    public void query(int maxResp) {
-        states[currentState].query(maxResp);
-        next(TRANSITION_QUERY);
-    }
-
-    public void timeOut() {
-        states[currentState].timeOut();
-        next(TRANSITION_TIMEOUT);
-    }
-
-    int getTimeOut(int maxTimeOut) {
-        Random random = new Random();
-        return Math.abs(random.nextInt()) % maxTimeOut;
-    }
-
-    protected void cancelTimer() {
-        if (IgmpTimer.INVALID_TIMER_ID != timerId) {
-            IgmpTimer.cancel(timerId);
+    private void setTimeOut(Integer timeout) {
+        if (timeout == null) {
+            timeOut = null;
+        } else {
+            timeOut.set(timeout);
         }
     }
 
-    class State {
-        public void join(boolean messageOutAllowed) {
-        }
-
-        public void leave(boolean messageOutAllowed) {
-            if (messageOutAllowed) {
-                Ethernet eth = IgmpManager.outgoingIgmpWithV3() ?
-                        IgmpSender.getInstance().buildIgmpV3Leave(groupIp, srcIp) :
-                        IgmpSender.getInstance().buildIgmpV2Leave(groupIp, srcIp);
-                IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId, upLinkPort);
-            }
-        }
-
-        public void query(int maxResp) {
-        }
-
-        public void timeOut() {
-        }
-
-    }
-
-    class NonMember extends State {
-        public void join(boolean messageOutAllowed) {
-            if (messageOutAllowed) {
-                Ethernet eth = IgmpManager.outgoingIgmpWithV3() ?
-                        IgmpSender.getInstance().buildIgmpV3Join(groupIp, srcIp) :
-                        IgmpSender.getInstance().buildIgmpV2Join(groupIp, srcIp);
-                IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId, upLinkPort);
-                timeOut = getTimeOut(IgmpManager.getUnsolicitedTimeout());
-                timerId = IgmpTimer.start(SingleStateMachine.this, timeOut);
-            }
-        }
-    }
-
-    class DelayMember extends State {
-        public void query(int maxResp) {
-            if (maxResp < timeOut) {
-                timeOut = getTimeOut(maxResp);
-                timerId = IgmpTimer.reset(timerId, SingleStateMachine.this, timeOut);
-            }
-        }
-
-        public void timeOut() {
-            if (sendQuery) {
-                Ethernet eth = IgmpManager.outgoingIgmpWithV3() ?
-                        IgmpSender.getInstance().buildIgmpV3ResponseQuery(groupIp, srcIp) :
-                        IgmpSender.getInstance().buildIgmpV2ResponseQuery(groupIp, srcIp);
-                IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId, upLinkPort);
-                timeOut = DEFAULT_MAX_RESP;
-            }
-        }
-
-    }
-
-    class IdleMember extends State {
-        public void query(int maxResp) {
-            timeOut = getTimeOut(maxResp);
-            timerId = IgmpTimer.start(SingleStateMachine.this, timeOut);
-        }
+    private void createTimeOut(Integer timeout) {
+        timeOut = new AtomicInteger(timeout);
     }
 }
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/SingleStateMachineSerializer.java b/app/src/main/java/org/opencord/igmpproxy/impl/SingleStateMachineSerializer.java
new file mode 100644
index 0000000..5cc255d
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/SingleStateMachineSerializer.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.impl;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.onlab.packet.Ip4Address;
+import org.onosproject.net.PortNumber;
+import org.opencord.igmpproxy.statemachine.StateMachineId;
+
+/**
+ * Custom serializer for {@link SingleStateMachine}.
+ */
+public class SingleStateMachineSerializer extends Serializer<SingleStateMachine> {
+    /**
+     * Creates serializer instance.
+     */
+    public SingleStateMachineSerializer() {
+        // non-null, immutable
+        super(false, true);
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, SingleStateMachine singleStateMachine) {
+        kryo.writeClassAndObject(output, singleStateMachine.getStateMachineId());
+        kryo.writeClassAndObject(output, singleStateMachine.getSrcIp());
+        kryo.writeClassAndObject(output, singleStateMachine.getUpLinkPort());
+        kryo.writeClassAndObject(output, singleStateMachine.getTimeOut());
+        kryo.writeObject(output, singleStateMachine.currentState());
+    }
+
+    @Override
+    public SingleStateMachine read(Kryo kryo, Input input, Class<SingleStateMachine> aClass) {
+        StateMachineId stateMachineId = (StateMachineId) kryo.readClassAndObject(input);
+        Ip4Address srcIp = (Ip4Address) kryo.readClassAndObject(input);
+        PortNumber uplinkPort = (PortNumber) kryo.readClassAndObject(input);
+        Integer timeout = (Integer) kryo.readClassAndObject(input);
+        int currentState = kryo.readObject(input, int.class);
+        return new SingleStateMachine(stateMachineId, srcIp, uplinkPort, currentState, timeout);
+    }
+
+}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/StateMachine.java b/app/src/main/java/org/opencord/igmpproxy/impl/StateMachine.java
deleted file mode 100644
index e87756d..0000000
--- a/app/src/main/java/org/opencord/igmpproxy/impl/StateMachine.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.opencord.igmpproxy.impl;
-
-import com.google.common.collect.Maps;
-import org.onlab.packet.Ip4Address;
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.PortNumber;
-
-import java.util.Map;
-import java.util.Set;
-
-/**
- * State machine for whole IGMP process. The state machine is implemented on
- * RFC 2236 "6. Host State Diagram".
- */
-public final class StateMachine {
-
-    private static final String GROUP = "Group";
-
-    private StateMachine() {
-
-    }
-    private static Map<String, SingleStateMachine> map = Maps.newConcurrentMap();
-
-    private static String getId(DeviceId devId, Ip4Address groupIp) {
-        return devId.toString() + GROUP + groupIp.toString();
-    }
-
-    private static SingleStateMachine get(DeviceId devId, Ip4Address groupIp) {
-        String id = getId(devId, groupIp);
-        return map.get(id);
-    }
-
-    public static void destroySingle(DeviceId devId, Ip4Address groupIp) {
-        SingleStateMachine machine = get(devId, groupIp);
-        if (null == machine) {
-            return;
-        }
-        machine.cancelTimer();
-        map.remove(getId(devId, groupIp));
-    }
-
-    public static boolean join(DeviceId devId, Ip4Address groupIp, Ip4Address srcIP, PortNumber upLinkPort) {
-        SingleStateMachine machine = get(devId, groupIp);
-
-        if (null == machine) {
-            machine = new SingleStateMachine(devId, groupIp, srcIP, upLinkPort);
-            map.put(getId(devId, groupIp), machine);
-
-            boolean shouldSendJoin = true;
-            if (IgmpManager.isIgmpOnPodBasis() &&
-                    groupListenedByOtherDevices(devId, groupIp)) {
-                // unset the flag if igmp messages are evaluated on POD basis
-                // and there are already active members of this group
-                // across the entire POD
-                shouldSendJoin = false;
-            }
-            machine.join(shouldSendJoin);
-            return true;
-        }
-        machine.increaseCounter();
-        return false;
-    }
-
-    public static boolean leave(DeviceId devId, Ip4Address groupIp) {
-        SingleStateMachine machine = get(devId, groupIp);
-        if (null == machine) {
-            return false;
-        }
-
-        machine.decreaseCounter();
-        // make sure machine instance still exists.
-        // it may be removed by the preceding thread
-        if (machine.getCounter() == 0) {
-            boolean shouldSendLeave = true;
-            if (IgmpManager.isIgmpOnPodBasis() &&
-                    groupListenedByOtherDevices(devId, groupIp)) {
-                // unset the flag if igmp messages are evaluated on POD basis
-                // and there are still active members of this group
-                // across the entire POD
-                shouldSendLeave = false;
-            }
-            machine.leave(shouldSendLeave);
-            destroySingle(devId, groupIp);
-            return true;
-        }
-        return false;
-    }
-
-    static void specialQuery(DeviceId devId, Ip4Address groupIp, int maxResp) {
-        SingleStateMachine machine = get(devId, groupIp);
-        if (null == machine) {
-            return;
-        }
-        machine.query(maxResp);
-    }
-
-    static void generalQuery(DeviceId devId, int maxResp) {
-        for (Map.Entry<String, SingleStateMachine> entry : map.entrySet()) {
-            SingleStateMachine machine = entry.getValue();
-            if (devId.equals(machine.getDeviceId())) {
-                machine.query(maxResp);
-            }
-        }
-    }
-
-    static void generalQuery(int maxResp) {
-        for (Map.Entry<String, SingleStateMachine> entry : map.entrySet()) {
-            SingleStateMachine machine = entry.getValue();
-            machine.query(maxResp);
-        }
-    }
-
-    public static Set<Map.Entry<String, SingleStateMachine>> entrySet() {
-        return map.entrySet();
-    }
-
-    public static void timeOut(DeviceId devId, Ip4Address groupIp) {
-        SingleStateMachine machine = get(devId, groupIp);
-        if (null == machine) {
-            return;
-        }
-        machine.timeOut();
-    }
-
-    public static void clearMap() {
-        map.clear();
-    }
-
-    /**
-     * @param devId   id of the device being excluded
-     * @param groupIp group IP address
-     * @return true if this group has at least one listener connected to
-     * any device in the map except for the device specified; false otherwise.
-     */
-    private static boolean groupListenedByOtherDevices(DeviceId devId, Ip4Address groupIp) {
-        for (SingleStateMachine machine : map.values()) {
-            if (machine.getDeviceId().equals(devId)) {
-                continue;
-            }
-            if (machine.getGroupIp().equals(groupIp)) {
-                //means group is being listened by other peers in the domain
-                return true;
-            }
-        }
-        return false;
-    }
-
-}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/StateMachineManager.java b/app/src/main/java/org/opencord/igmpproxy/impl/StateMachineManager.java
new file mode 100644
index 0000000..8c29a2f
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/StateMachineManager.java
@@ -0,0 +1,220 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.impl;
+
+import org.onlab.packet.Ip4Address;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+import org.opencord.igmpproxy.IgmpLeadershipService;
+import org.opencord.igmpproxy.statemachine.StateMachine;
+import org.opencord.igmpproxy.statemachine.StateMachineId;
+import org.opencord.igmpproxy.statemachine.StateMachineService;
+import org.opencord.igmpproxy.impl.store.machine.StateMachineStore;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Deactivate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+
+
+/**
+ * State machine for whole IGMP process. The state machine is implemented on
+ * RFC 2236 "6. Host State Diagram".
+ */
+@Component(immediate = true, service = StateMachineService.class)
+public class StateMachineManager implements StateMachineService {
+    // Only for tests purposes
+    public static boolean sendQuery = true;
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected StateMachineStore stateMachineStore;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected IgmpLeadershipService igmpLeadershipService;
+
+    @Activate
+    public void activate(ComponentContext context) {
+        log.info("Started.");
+    }
+
+    @Deactivate
+    public void deactivate(ComponentContext context) {
+        log.info("Stopped.");
+    }
+
+    private StateMachine get(DeviceId devId, Ip4Address groupIp) {
+        StateMachineId id = StateMachineId.of(devId, groupIp);
+        return stateMachineStore.getStateMachine(id);
+    }
+
+    public void destroySingle(DeviceId devId, Ip4Address groupIp) {
+        StateMachine machine = get(devId, groupIp);
+        if (machine == null) {
+            log.debug("Machine has already been destroyed. deviceId: {} and groupIp: {} ", devId, groupIp);
+            return;
+        }
+        stateMachineStore.removeStateMachine(machine.getStateMachineId());
+        stateMachineStore.removeCounter(machine.getStateMachineId());
+    }
+
+    @Override
+    public boolean join(DeviceId devId, Ip4Address groupIp, Ip4Address srcIP, PortNumber upLinkPort) {
+        StateMachine machineInstance = get(devId, groupIp);
+
+        if (machineInstance == null) {
+            machineInstance = new SingleStateMachine(devId, groupIp, srcIP, upLinkPort);
+            stateMachineStore.putStateMachine(machineInstance);
+            stateMachineStore.increaseAndGetCounter(machineInstance.getStateMachineId());
+            log.debug("Instance of machine created with id: {}", machineInstance.getStateMachineId());
+            boolean shouldSendJoin = true;
+            if (IgmpManager.isIgmpOnPodBasis() &&
+                    groupListenedByOtherDevices(devId, groupIp)) {
+                // unset the flag if igmp messages are evaluated on POD basis
+                // and there are already active members of this group
+                // across the entire POD
+                shouldSendJoin = false;
+            }
+            machineInstance.join(shouldSendJoin);
+            stateMachineStore.updateStateMachine(machineInstance);
+            return true;
+        }
+        log.debug("Instance of machine has already been created. deviceId: {} and groupIp: {}",
+                devId, groupIp);
+        stateMachineStore.increaseAndGetCounter(machineInstance.getStateMachineId());
+        return false;
+    }
+
+    @Override
+    public boolean leave(DeviceId devId, Ip4Address groupIp) {
+        StateMachine machine = get(devId, groupIp);
+        if (machine == null) {
+            log.debug("Machine has already been left and destroyed. deviceId: {} and groupIp: {} ", devId, groupIp);
+            return false;
+        }
+
+        long count = stateMachineStore.decreaseAndGetCounter(machine.getStateMachineId());
+        // make sure machine instance still exists.
+        // it may be removed by the preceding thread
+        if (count == 0) {
+            boolean shouldSendLeave = true;
+            if (IgmpManager.isIgmpOnPodBasis() &&
+                    groupListenedByOtherDevices(devId, groupIp)) {
+                // unset the flag if igmp messages are evaluated on POD basis
+                // and there are still active members of this group
+                // across the entire POD
+                shouldSendLeave = false;
+            }
+            machine.leave(shouldSendLeave);
+            destroySingle(devId, groupIp);
+            log.debug("This machine left and destroyed. deviceId: {} and groupIp: {}", devId, groupIp);
+            return true;
+        }
+        log.debug("This machine still has member/members. number of member/members: {}, deviceId: {}, groupIp: {} ",
+                count, devId, groupIp);
+        return false;
+    }
+
+    @Override
+    public void specialQuery(DeviceId devId, Ip4Address groupIp, int maxResp) {
+        StateMachine machine = get(devId, groupIp);
+        if (machine == null) {
+            log.debug("Machine is not found. deviceId: {} and groupIp: {} ", devId, groupIp);
+            return;
+        }
+        machine.query(maxResp);
+        stateMachineStore.updateStateMachine(machine);
+    }
+
+    @Override
+    public void generalQuery(DeviceId devId, int maxResp) {
+        for (StateMachine machine : stateMachineStore.getAllStateMachines()) {
+            if (devId.equals(machine.getDeviceId())) {
+                machine.query(maxResp);
+                stateMachineStore.updateStateMachine(machine);
+            }
+        }
+    }
+
+    @Override
+    public void generalQuery(int maxResp) {
+        for (StateMachine machine : stateMachineStore.getAllStateMachines()) {
+            machine.query(maxResp);
+            stateMachineStore.updateStateMachine(machine);
+        }
+    }
+
+    @Override
+    public void timeOut(DeviceId devId, Ip4Address groupIp) {
+        StateMachine machine = get(devId, groupIp);
+        if (machine == null) {
+            log.debug("Machine is not found. deviceId: {} and groupIp: {}", devId, groupIp);
+            return;
+        }
+        machine.timeOut(sendQuery);
+        stateMachineStore.updateStateMachine(machine);
+    }
+
+    @Override
+    public void timeOut1s() {
+        Collection<StateMachine> mapSet = stateMachineStore.getAllStateMachines();
+        for (StateMachine machineInfo : mapSet) {
+            if (machineInfo.getTimeOut() != null) {
+                StateMachineId machineId = machineInfo.getStateMachineId();
+                if (igmpLeadershipService.isLocalLeader(machineId.getDeviceId())) {
+                    if (machineInfo.getTimeOut() > 0) {
+                        stateMachineStore.decreaseTimeout(machineId);
+                    } else {
+                        StateMachine machine = stateMachineStore.getStateMachine(machineId);
+                        machine.timeOut(sendQuery);
+                        machine.destroyTimer();
+                        stateMachineStore.updateStateMachine(machine);
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    public void clearAllMaps() {
+        stateMachineStore.clearAllStateMachines();
+    }
+
+    /**
+     * @param devId   id of the device being excluded
+     * @param groupIp group IP address
+     * @return true if this group has at least one listener connected to
+     * any device in the map except for the device specified; false otherwise.
+     */
+    private boolean groupListenedByOtherDevices(DeviceId devId, Ip4Address groupIp) {
+        for (StateMachine machine : stateMachineStore.getAllStateMachines()) {
+            if (machine.getStateMachineId().getDeviceId().equals(devId)) {
+                continue;
+            }
+            if (machine.getStateMachineId().getGroupIp().equals(groupIp)) {
+                //means group is being listened by other peers in the domain
+                return true;
+            }
+        }
+        return false;
+    }
+}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/state/AbstractState.java b/app/src/main/java/org/opencord/igmpproxy/impl/state/AbstractState.java
new file mode 100644
index 0000000..40fe4e5
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/state/AbstractState.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.impl.state;
+
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.Ip4Address;
+import org.onosproject.net.DeviceId;
+import org.opencord.igmpproxy.impl.IgmpManager;
+import org.opencord.igmpproxy.impl.IgmpSender;
+import org.opencord.igmpproxy.statemachine.State;
+import org.opencord.igmpproxy.statemachine.StateMachine;
+
+import java.util.Random;
+
+/**
+ * Abstract implementation of state.
+ */
+public abstract class AbstractState implements State {
+
+    protected StateMachine machine;
+
+    protected AbstractState(StateMachine machine) {
+        this.machine = machine;
+    }
+
+    public void join() {
+    }
+
+    public void leave() {
+        DeviceId devId = machine.getStateMachineId().getDeviceId();
+        Ip4Address groupIp = machine.getStateMachineId().getGroupIp();
+
+        Ethernet eth = IgmpManager.outgoingIgmpWithV3() ?
+                IgmpSender.getInstance().buildIgmpV3Leave(groupIp, machine.getSrcIp()) :
+                IgmpSender.getInstance().buildIgmpV2Leave(groupIp, machine.getSrcIp());
+        IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId, machine.getUpLinkPort());
+    }
+
+    public void query(int maxResp) {
+    }
+
+    public void timeOut() {
+    }
+
+    protected int getTimeOut(int maxTimeOut) {
+        Random random = new Random();
+        return Math.abs(random.nextInt()) % maxTimeOut;
+    }
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/state/DelayMember.java b/app/src/main/java/org/opencord/igmpproxy/impl/state/DelayMember.java
new file mode 100644
index 0000000..6d6b7e0
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/state/DelayMember.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.impl.state;
+
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.Ip4Address;
+import org.onosproject.net.DeviceId;
+import org.opencord.igmpproxy.impl.IgmpManager;
+import org.opencord.igmpproxy.impl.IgmpSender;
+import org.opencord.igmpproxy.statemachine.StateMachine;
+
+/**
+ * Implementation of delay-member state.
+ */
+public class DelayMember extends AbstractState {
+    public DelayMember(StateMachine machine) {
+        super(machine);
+    }
+
+    @Override
+    public void query(int maxResp) {
+        if (maxResp < machine.getTimeOut()) {
+            int timeout = getTimeOut(maxResp);
+            machine.resetTimer(timeout);
+        }
+    }
+
+    @Override
+    public void timeOut() {
+        DeviceId devId = machine.getStateMachineId().getDeviceId();
+        Ip4Address groupIp = machine.getStateMachineId().getGroupIp();
+
+        Ethernet eth = IgmpManager.outgoingIgmpWithV3() ?
+                IgmpSender.getInstance().buildIgmpV3ResponseQuery(groupIp, machine.getSrcIp()) :
+                IgmpSender.getInstance().buildIgmpV2ResponseQuery(groupIp, machine.getSrcIp());
+        IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId, machine.getUpLinkPort());
+        machine.setMaxTimeout();
+    }
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/state/IdleMember.java b/app/src/main/java/org/opencord/igmpproxy/impl/state/IdleMember.java
new file mode 100644
index 0000000..63b030c
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/state/IdleMember.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.impl.state;
+
+import org.opencord.igmpproxy.statemachine.StateMachine;
+
+/**
+ * Implementation of idle-member state.
+ */
+public class IdleMember extends AbstractState {
+    public IdleMember(StateMachine machine) {
+        super(machine);
+    }
+
+    @Override
+    public void query(int maxResp) {
+        int timeout = getTimeOut(maxResp);
+        machine.startTimer(timeout);
+    }
+}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/state/NonMember.java b/app/src/main/java/org/opencord/igmpproxy/impl/state/NonMember.java
new file mode 100644
index 0000000..1437a6f
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/state/NonMember.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.impl.state;
+
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.Ip4Address;
+import org.onosproject.net.DeviceId;
+import org.opencord.igmpproxy.impl.IgmpManager;
+import org.opencord.igmpproxy.impl.IgmpSender;
+import org.opencord.igmpproxy.statemachine.StateMachine;
+
+/**
+ * Implementation of non-member state.
+ */
+public class NonMember extends AbstractState {
+    public NonMember(StateMachine machine) {
+        super(machine);
+    }
+
+    @Override
+    public void join() {
+        DeviceId devId = machine.getStateMachineId().getDeviceId();
+        Ip4Address groupIp = machine.getStateMachineId().getGroupIp();
+
+        Ethernet eth = IgmpManager.outgoingIgmpWithV3() ?
+                IgmpSender.getInstance().buildIgmpV3Join(groupIp, machine.getSrcIp()) :
+                IgmpSender.getInstance().buildIgmpV2Join(groupIp, machine.getSrcIp());
+        IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId, machine.getUpLinkPort());
+        int timeout = getTimeOut(IgmpManager.getUnsolicitedTimeout());
+        machine.startTimer(timeout);
+    }
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/state/package-info.java b/app/src/main/java/org/opencord/igmpproxy/impl/state/package-info.java
new file mode 100644
index 0000000..706a222
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/state/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation of IGMPProxy application.
+ */
+package org.opencord.igmpproxy.impl.state;
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/store/groupmember/AbstractGroupMemberStore.java b/app/src/main/java/org/opencord/igmpproxy/impl/store/groupmember/AbstractGroupMemberStore.java
new file mode 100644
index 0000000..9a3af33
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/store/groupmember/AbstractGroupMemberStore.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.impl.store.groupmember;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.onosproject.store.AbstractStore;
+import org.opencord.igmpproxy.GroupMemberId;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Abstract implementation of group-member store.
+ */
+public abstract class AbstractGroupMemberStore extends AbstractStore<GroupMemberEvent, GroupMemberStoreDelegate>
+        implements GroupMemberStore {
+
+    protected Map<GroupMemberId, GroupMember> groupMemberMap;
+
+    protected AbstractGroupMemberStore() {
+
+    }
+
+    protected AbstractGroupMemberStore(Map<GroupMemberId, GroupMember> groupMemberMap) {
+        this.groupMemberMap = groupMemberMap;
+    }
+
+    @Override
+    public GroupMember putGroupMember(GroupMember groupMember) {
+        return groupMemberMap.put(groupMember.getGroupMemberId(), groupMember);
+    }
+
+    @Override
+    public GroupMember updateGroupMember(GroupMember groupMember) {
+        return groupMemberMap.replace(groupMember.getGroupMemberId(), groupMember);
+    }
+
+    @Override
+    public GroupMember removeGroupMember(GroupMemberId groupMemberId) {
+        return groupMemberMap.remove(groupMemberId);
+    }
+
+    @Override
+    public GroupMember getGroupMember(GroupMemberId groupMemberId) {
+        return groupMemberMap.getOrDefault(groupMemberId, null);
+    }
+
+    @Override
+    public Set<GroupMemberId> getAllGroupMemberIds() {
+        return ImmutableSet.copyOf(groupMemberMap.keySet());
+    }
+
+    @Override
+    public Collection<GroupMember> getAllGroupMembers() {
+        return ImmutableList.copyOf(groupMemberMap.values());
+    }
+}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/store/groupmember/DistributedGroupMemberStore.java b/app/src/main/java/org/opencord/igmpproxy/impl/store/groupmember/DistributedGroupMemberStore.java
new file mode 100644
index 0000000..01f9af9
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/store/groupmember/DistributedGroupMemberStore.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.impl.store.groupmember;
+
+import org.onlab.util.KryoNamespace;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.opencord.igmpproxy.GroupMemberId;
+import org.opencord.igmpproxy.GroupMemberIdSerializer;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Deactivate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Group member store based on distributed storage.
+ */
+@Component(service = GroupMemberStore.class)
+public class DistributedGroupMemberStore extends AbstractGroupMemberStore {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private static final String GROUP_MEMBER_MAP_NAME = "onos-igmpproxy-groupmember-table";
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected StorageService storageService;
+
+    private ConsistentMap<GroupMemberId, GroupMember> consistentMap;
+
+    public DistributedGroupMemberStore() {
+        super();
+    }
+
+    @Activate
+    public void activate() {
+        KryoNamespace groupMemberSerializer = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.API)
+                .register(new GroupMemberIdSerializer(), GroupMemberId.class)
+                .register(GroupMember.class)
+                .build();
+        consistentMap = storageService.<GroupMemberId, GroupMember>consistentMapBuilder()
+                .withName(GROUP_MEMBER_MAP_NAME)
+                .withSerializer(Serializer.using(groupMemberSerializer))
+                .build();
+        groupMemberMap = consistentMap.asJavaMap();
+        log.info("Started.");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        groupMemberMap.clear();
+        groupMemberMap = null;
+        consistentMap.destroy();
+        log.info("Stopped.");
+    }
+}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/GroupMember.java b/app/src/main/java/org/opencord/igmpproxy/impl/store/groupmember/GroupMember.java
similarity index 63%
rename from app/src/main/java/org/opencord/igmpproxy/impl/GroupMember.java
rename to app/src/main/java/org/opencord/igmpproxy/impl/store/groupmember/GroupMember.java
index a05b962..0add46c 100644
--- a/app/src/main/java/org/opencord/igmpproxy/impl/GroupMember.java
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/store/groupmember/GroupMember.java
@@ -13,13 +13,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.opencord.igmpproxy.impl;
+package org.opencord.igmpproxy.impl.store.groupmember;
 
 import org.onlab.packet.IGMPMembership;
 import org.onlab.packet.Ip4Address;
 import org.onlab.packet.VlanId;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.PortNumber;
+import org.opencord.igmpproxy.GroupMemberId;
 
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -27,12 +28,10 @@
 /**
  * Date struct to keep Igmp member infomations.
  */
-public final class  GroupMember {
+public final class GroupMember {
 
     private final VlanId vlan;
-    private final DeviceId deviceId;
-    private final PortNumber portNumber;
-    private final Ip4Address groupIp;
+    private final GroupMemberId groupMemberId;
     private final boolean v2;
     private byte recordType = IGMPMembership.MODE_IS_INCLUDE;
     private ArrayList<Ip4Address> sourceList = new ArrayList<>();
@@ -43,23 +42,17 @@
     private boolean leave = false;
 
     public GroupMember(Ip4Address groupIp, VlanId vlan, DeviceId deviceId, PortNumber portNum, boolean isV2) {
-        this.groupIp = groupIp;
         this.vlan = vlan;
-        this.deviceId = deviceId;
-        this.portNumber = portNum;
+        this.groupMemberId = GroupMemberId.of(groupIp, deviceId, portNum);
         v2 = isV2;
     }
 
-    static String getkey(Ip4Address groupIp, DeviceId deviceId, PortNumber portNum) {
-        return groupIp.toString() + deviceId.toString() + portNum.toString();
-    }
-
     public String getkey() {
-        return GroupMember.getkey(groupIp, deviceId, portNumber);
+        return groupMemberId.toString();
     }
 
-    public String getId() {
-        return getkey();
+    public GroupMemberId getGroupMemberId() {
+        return groupMemberId;
     }
 
     public VlanId getvlan() {
@@ -67,15 +60,15 @@
     }
 
     public DeviceId getDeviceId() {
-        return deviceId;
+        return groupMemberId.getDeviceId();
     }
 
     public PortNumber getPortNumber() {
-        return portNumber;
+        return groupMemberId.getPortNum();
     }
 
     public Ip4Address getGroupIp() {
-        return groupIp;
+        return groupMemberId.getGroupIp();
     }
 
     public byte getRecordType() {
@@ -95,63 +88,8 @@
         this.recordType = recordType;
         this.sourceList.clear();
         this.sourceList.addAll(newSourceList);
-
-        /*TODO : support SSM
-        if (this.recordType == IGMPMembership.MODE_IS_INCLUDE) {
-            switch (recordType) {
-                case IGMPMembership.MODE_IS_INCLUDE:
-                case IGMPMembership.CHANGE_TO_INCLUDE_MODE:
-                    //however , set to include<B> anyway
-                    this.sourceList = sourceList;
-                    this.recordType = IGMPMembership.MODE_IS_INCLUDE;
-                    break;
-                case IGMPMembership.MODE_IS_EXCLUDE:
-                case IGMPMembership.CHANGE_TO_EXCLUDE_MODE:
-                    //set to exclude<B>
-                    this.sourceList = sourceList;
-                    this.recordType = IGMPMembership.MODE_IS_EXCLUDE;
-                    break;
-                case IGMPMembership.ALLOW_NEW_SOURCES:
-                    //set to include <A+B>
-                    join(this.sourceList, sourceList);
-                    break;
-                case IGMPMembership.BLOCK_OLD_SOURCES:
-                    //set to include <A-B>
-                    exclude(this.sourceList, sourceList);
-                    break;
-                default:
-                    break;
-            }
-        } else if (this.recordType == IGMPMembership.MODE_IS_EXCLUDE) {
-            switch (recordType) {
-                case IGMPMembership.MODE_IS_INCLUDE:
-                case IGMPMembership.CHANGE_TO_INCLUDE_MODE:
-                    //set to include<B>
-                    this.recordType = IGMPMembership.MODE_IS_INCLUDE;
-                    this.sourceList = sourceList;
-                    break;
-                case IGMPMembership.MODE_IS_EXCLUDE:
-                case IGMPMembership.CHANGE_TO_EXCLUDE_MODE:
-                    this.sourceList = sourceList;
-                    this.recordType = IGMPMembership.MODE_IS_EXCLUDE;
-                    break;
-                case IGMPMembership.ALLOW_NEW_SOURCES:
-                    //set to exclude <A-B>
-                    exclude(this.sourceList, sourceList);
-                    break;
-                case IGMPMembership.BLOCK_OLD_SOURCES:
-                    //set to exclude <A+B>
-                    join(this.sourceList, sourceList);
-                    break;
-                default:
-                    break;
-            }
-        }*/
-
-        return;
     }
 
-
     /*join B to A (A+B)*/
     private void join(ArrayList<Integer> listA, ArrayList<Integer> listB) {
         Iterator<Integer> iterA = null;
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/store/groupmember/GroupMemberEvent.java b/app/src/main/java/org/opencord/igmpproxy/impl/store/groupmember/GroupMemberEvent.java
new file mode 100644
index 0000000..494b89b
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/store/groupmember/GroupMemberEvent.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.impl.store.groupmember;
+
+import org.onosproject.event.AbstractEvent;
+
+/**
+ * Group-member event.
+ */
+public class GroupMemberEvent extends
+        AbstractEvent<GroupMemberEvent.Type, GroupMember> {
+
+    /**
+     * Type of group-member event.
+     */
+    public enum Type {
+        /**
+         * Signifies that group-member added to store.
+         */
+        GROUP_MEMBER_ADDED,
+        /**
+         * Signifies that group-member updated in store.
+         */
+        GROUP_MEMBER_UPDATED,
+        /**
+         * Signifies that group-member has been removed from store.
+         */
+        GROUP_MEMBER_REMOVED
+    }
+
+    /**
+     *
+     * @param type group-member event type.
+     * @param subject group-member.
+     */
+    public GroupMemberEvent(GroupMemberEvent.Type type, GroupMember subject) {
+        super(type, subject);
+    }
+
+    protected GroupMemberEvent(GroupMemberEvent.Type type, GroupMember subject, long time) {
+        super(type, subject, time);
+    }
+}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/store/groupmember/GroupMemberStore.java b/app/src/main/java/org/opencord/igmpproxy/impl/store/groupmember/GroupMemberStore.java
new file mode 100644
index 0000000..74f4ca7
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/store/groupmember/GroupMemberStore.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.igmpproxy.impl.store.groupmember;
+
+import org.opencord.igmpproxy.GroupMemberId;
+
+import java.util.Collection;
+import java.util.Set;
+
+/**
+ * Group Member Store Service.
+ */
+public interface GroupMemberStore {
+
+    /**
+     * Add new group-member to store.
+     *
+     * @param groupMember given group-member.
+     * @return added group-member.
+     */
+    GroupMember putGroupMember(GroupMember groupMember);
+
+    /**
+     * Update group-member in store.
+     *
+     * @param groupMember given group-member.
+     * @return updated group-member.
+     */
+    GroupMember updateGroupMember(GroupMember groupMember);
+
+    /**
+     * removed group-member from store.
+     *
+     * @param groupMemberId given group-member id
+     * @return removed group-member
+     */
+    GroupMember removeGroupMember(GroupMemberId groupMemberId);
+
+    /**
+     * get group-member from store.
+     *
+     * @param groupMemberId given group-member identification
+     * @return group-member or null if not found
+     */
+    GroupMember getGroupMember(GroupMemberId groupMemberId);
+
+    /**
+     * Returns all group member ids.
+     *
+     * @return set of group member ids
+     */
+    Set<GroupMemberId> getAllGroupMemberIds();
+
+    /**
+     * Returns all group members.
+     *
+     * @return all group members
+     */
+    Collection<GroupMember> getAllGroupMembers();
+}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/store/groupmember/GroupMemberStoreDelegate.java b/app/src/main/java/org/opencord/igmpproxy/impl/store/groupmember/GroupMemberStoreDelegate.java
new file mode 100644
index 0000000..e9f5509
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/store/groupmember/GroupMemberStoreDelegate.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.impl.store.groupmember;
+
+import org.onosproject.store.StoreDelegate;
+
+/**
+ * Group member store delegate abstraction.
+ */
+public interface GroupMemberStoreDelegate extends StoreDelegate<GroupMemberEvent> {
+}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/store/groupmember/package-info.java b/app/src/main/java/org/opencord/igmpproxy/impl/store/groupmember/package-info.java
new file mode 100644
index 0000000..7c025a6
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/store/groupmember/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation of group-member store.
+ */
+package org.opencord.igmpproxy.impl.store.groupmember;
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/store/machine/AbstractStateMachineStore.java b/app/src/main/java/org/opencord/igmpproxy/impl/store/machine/AbstractStateMachineStore.java
new file mode 100644
index 0000000..0ecbb14
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/store/machine/AbstractStateMachineStore.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.impl.store.machine;
+
+import com.google.common.collect.ImmutableList;
+import org.onosproject.store.AbstractStore;
+import org.opencord.igmpproxy.statemachine.StateMachine;
+import org.opencord.igmpproxy.statemachine.StateMachineId;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Abstract implementation of state-machine store.
+ */
+public abstract class AbstractStateMachineStore
+        extends AbstractStore<StateMachineEvent, StateMachineStoreDelegate>
+        implements StateMachineStore {
+
+    protected Map<StateMachineId, StateMachine> stateMachineMap;
+
+    protected AbstractStateMachineStore() {
+    }
+
+    protected AbstractStateMachineStore(Map<StateMachineId, StateMachine> stateMachineMap) {
+        this.stateMachineMap = stateMachineMap;
+    }
+
+    @Override
+    public StateMachine putStateMachine(StateMachine stateMachine) {
+        return stateMachineMap.put(stateMachine.getStateMachineId(), stateMachine);
+    }
+
+    @Override
+    public StateMachine updateStateMachine(StateMachine machine) {
+        return stateMachineMap.replace(machine.getStateMachineId(), machine);
+    }
+
+    @Override
+    public StateMachine removeStateMachine(StateMachineId id) {
+        return stateMachineMap.remove(id);
+    }
+
+    @Override
+    public StateMachine getStateMachine(StateMachineId id) {
+        return stateMachineMap.get(id);
+    }
+
+    @Override
+    public Collection<StateMachine> getAllStateMachines() {
+        return ImmutableList.copyOf(stateMachineMap.values());
+    }
+
+    @Override
+    public void clearAllStateMachines() {
+        stateMachineMap.clear();
+        stateMachineMap = null;
+    }
+
+    @Override
+    public void decreaseTimeout(StateMachineId machineId) {
+        StateMachine machine = stateMachineMap.get(machineId);
+        machine.decreaseTimeOut();
+        updateStateMachine(machine);
+    }
+
+    @Override
+    public void increaseTimeout(StateMachineId machineId) {
+        StateMachine machine = stateMachineMap.get(machineId);
+        machine.increaseTimeOut();
+        updateStateMachine(machine);
+    }
+
+}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/store/machine/DistributedStateMachineStore.java b/app/src/main/java/org/opencord/igmpproxy/impl/store/machine/DistributedStateMachineStore.java
new file mode 100644
index 0000000..f337b09
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/store/machine/DistributedStateMachineStore.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.impl.store.machine;
+
+import org.onlab.util.KryoNamespace;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AtomicCounterMap;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.opencord.igmpproxy.impl.SingleStateMachine;
+import org.opencord.igmpproxy.impl.SingleStateMachineSerializer;
+import org.opencord.igmpproxy.statemachine.StateMachine;
+import org.opencord.igmpproxy.statemachine.StateMachineId;
+import org.opencord.igmpproxy.statemachine.StateMachineIdSerializer;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Deactivate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * State machine store based on distributed storage.
+ */
+@Component(service = StateMachineStore.class)
+public class DistributedStateMachineStore extends AbstractStateMachineStore {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private static final String STATE_MACHINE_COUNTER_STORE = "onos-state-machine-counter-store";
+    private static final String STATE_MACHINE_MAP_NAME = "onos-state-machine-store";
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected StorageService storageService;
+
+    private AtomicCounterMap<StateMachineId> stateMachineCounters;
+
+    private ConsistentMap<StateMachineId, StateMachine> consistentMap;
+
+    public DistributedStateMachineStore() {
+        super();
+    }
+
+    @Activate
+    public void activate() {
+        KryoNamespace.Builder stateMachineKryoBuilder = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.API)
+                .register(new StateMachineIdSerializer(), StateMachineId.class);
+
+        stateMachineCounters = storageService.<StateMachineId>atomicCounterMapBuilder()
+                .withName(STATE_MACHINE_COUNTER_STORE)
+                .withSerializer(Serializer.using(stateMachineKryoBuilder.build())).build();
+
+        stateMachineKryoBuilder
+                .register(new SingleStateMachineSerializer(), SingleStateMachine.class);
+        this.consistentMap = storageService.<StateMachineId, StateMachine>consistentMapBuilder()
+                .withName(STATE_MACHINE_MAP_NAME)
+                .withSerializer(Serializer.using(stateMachineKryoBuilder.build()))
+                .build();
+        super.stateMachineMap = consistentMap.asJavaMap();
+
+
+        log.info("Started.");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        stateMachineMap.clear();
+        stateMachineMap = null;
+        consistentMap.destroy();
+        stateMachineCounters.clear();
+        stateMachineCounters.destroy();
+        log.info("Stopped.");
+    }
+
+    @Override
+    public long increaseAndGetCounter(StateMachineId stateMachineId) {
+        return stateMachineCounters.incrementAndGet(stateMachineId);
+    }
+
+    @Override
+    public long decreaseAndGetCounter(StateMachineId stateMachineId) {
+        if (stateMachineCounters.get(stateMachineId) > 0) {
+            return stateMachineCounters.decrementAndGet(stateMachineId);
+        } else {
+            return stateMachineCounters.get(stateMachineId);
+        }
+    }
+
+    @Override
+    public long getCounter(StateMachineId stateMachineId) {
+        return stateMachineCounters.get(stateMachineId);
+    }
+
+    @Override
+    public boolean removeCounter(StateMachineId stateMachineId) {
+        stateMachineCounters.remove(stateMachineId);
+        return true;
+    }
+
+    @Override
+    public void clearAllStateMachines() {
+        super.clearAllStateMachines();
+        stateMachineCounters.clear();
+    }
+}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/store/machine/StateMachineEvent.java b/app/src/main/java/org/opencord/igmpproxy/impl/store/machine/StateMachineEvent.java
new file mode 100644
index 0000000..ab9df23
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/store/machine/StateMachineEvent.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.igmpproxy.impl.store.machine;
+
+import org.onosproject.event.AbstractEvent;
+import org.opencord.igmpproxy.statemachine.StateMachine;
+
+/**
+ * State machine event.
+ */
+public class StateMachineEvent extends
+        AbstractEvent<StateMachineEvent.Type, StateMachine> {
+    /**
+     * Internal state-machine event type.
+     */
+    public enum Type {
+        /**
+         * Signifies that state-machine added to store.
+         */
+        STATE_MACHINE_ADDED,
+        /**
+         * Signifies that state-machine updated in store.
+         */
+        STATE_MACHINE_UPDATED,
+        /**
+         * Signifies that state-machine removed from store.
+         */
+        STATE_MACHINE_REMOVED
+    }
+
+    /**
+     * Creates new state machine event.
+     *
+     * @param type    state-machine event type.
+     * @param subject state machine.
+     */
+    public StateMachineEvent(Type type, StateMachine subject) {
+        super(type, subject);
+    }
+
+    protected StateMachineEvent(Type type, StateMachine subject, long time) {
+        super(type, subject, time);
+    }
+}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/store/machine/StateMachineStore.java b/app/src/main/java/org/opencord/igmpproxy/impl/store/machine/StateMachineStore.java
new file mode 100644
index 0000000..ddb566a
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/store/machine/StateMachineStore.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.impl.store.machine;
+
+import org.opencord.igmpproxy.statemachine.StateMachine;
+import org.opencord.igmpproxy.statemachine.StateMachineId;
+
+import java.util.Collection;
+
+/**
+ * State Machine Store.
+ */
+public interface StateMachineStore {
+    /**
+     * Increases counter state-machine.
+     *
+     * @param stateMachineId identification of machine
+     * @return counter
+     */
+    long increaseAndGetCounter(StateMachineId stateMachineId);
+
+    /**
+     * Decreases counter of state-machine.
+     *
+     * @param stateMachineId identification of machine
+     * @return if counter greater than zero, decreases counter and return counter
+     */
+    long decreaseAndGetCounter(StateMachineId stateMachineId);
+
+    /**
+     * Removes counter from counter-map.
+     *
+     * @param stateMachineId identification of machine
+     * @return true
+     */
+    boolean removeCounter(StateMachineId stateMachineId);
+
+    /**
+     * Get counter using state-machine id.
+     *
+     * @param stateMachineId identification of machine
+     * @return counter of member that use this id
+     */
+    long getCounter(StateMachineId stateMachineId);
+
+    /**
+     * Add new machine to store.
+     *
+     * @param stateMachine given machine
+     * @return added informations
+     */
+    StateMachine putStateMachine(StateMachine stateMachine);
+
+    /**
+     * Update information in store.
+     *
+     * @param stateMachine given machine
+     * @return updated info
+     */
+    StateMachine updateStateMachine(StateMachine stateMachine);
+
+    /**
+     * Get information from store.
+     *
+     * @param id given identification of machine
+     * @return information of machine
+     */
+    StateMachine getStateMachine(StateMachineId id);
+
+    /**
+     * Remove machine from store.
+     *
+     * @param id given identification of machine
+     * @return removed info
+     */
+    StateMachine removeStateMachine(StateMachineId id);
+
+    /**
+     * Get informations of all machine.
+     *
+     * @return machine informations
+     */
+    Collection<StateMachine> getAllStateMachines();
+
+    /**
+     * clear information map.
+     */
+    void clearAllStateMachines();
+
+    /**
+     * Decreases timeout of timer.
+     *
+     * @param machineId given machine id
+     */
+    void decreaseTimeout(StateMachineId machineId);
+
+    /**
+     * Increases timeout of timer.
+     *
+     * @param machineId given machine id
+     */
+    void increaseTimeout(StateMachineId machineId);
+}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/store/machine/StateMachineStoreDelegate.java b/app/src/main/java/org/opencord/igmpproxy/impl/store/machine/StateMachineStoreDelegate.java
new file mode 100644
index 0000000..f8db089
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/store/machine/StateMachineStoreDelegate.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.igmpproxy.impl.store.machine;
+
+import org.onosproject.store.StoreDelegate;
+
+/**
+ * State machine store delegate abstraction.
+ */
+public interface StateMachineStoreDelegate extends StoreDelegate<StateMachineEvent> {
+}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/store/machine/package-info.java b/app/src/main/java/org/opencord/igmpproxy/impl/store/machine/package-info.java
new file mode 100644
index 0000000..2b9bc9e
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/store/machine/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation of state-machine store.
+ */
+package org.opencord.igmpproxy.impl.store.machine;
\ No newline at end of file
diff --git a/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerBase.java b/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerBase.java
index b301f94..6283c08 100644
--- a/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerBase.java
+++ b/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerBase.java
@@ -16,6 +16,7 @@
 package org.opencord.igmpproxy.impl;
 
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
 import org.onlab.packet.Ethernet;
 import org.onlab.packet.IGMP;
 import org.onlab.packet.IGMPMembership;
@@ -59,6 +60,11 @@
 import org.onosproject.net.packet.PacketContext;
 import org.onosproject.net.packet.PacketProcessor;
 import org.onosproject.net.packet.PacketServiceAdapter;
+import org.opencord.igmpproxy.IgmpLeadershipService;
+import org.opencord.igmpproxy.impl.store.groupmember.AbstractGroupMemberStore;
+import org.opencord.igmpproxy.impl.store.machine.AbstractStateMachineStore;
+import org.opencord.igmpproxy.statemachine.StateMachine;
+import org.opencord.igmpproxy.statemachine.StateMachineId;
 import org.opencord.sadis.BandwidthProfileInformation;
 import org.opencord.sadis.BaseInformationService;
 import org.opencord.sadis.SadisService;
@@ -78,6 +84,8 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class IgmpManagerBase {
 
@@ -96,7 +104,7 @@
 
     // Common connect point of aggregation switch used by all devices.
     protected static final ConnectPoint COMMON_CONNECT_POINT =
-           ConnectPoint.deviceConnectPoint("of:00000000000000003/3");
+            ConnectPoint.deviceConnectPoint("of:00000000000000003/3");
     // Uplink ports for two olts A and B
     protected static final PortNumber PORT_A = PortNumber.portNumber(1);
     protected static final PortNumber PORT_B = PortNumber.portNumber(2);
@@ -135,23 +143,24 @@
 
         @Override
         public Device getDevice(DeviceId deviceId) {
-           if (flagForDevice) {
-               DefaultAnnotations.Builder annotationsBuilder = DefaultAnnotations.builder()
-                           .set(AnnotationKeys.MANAGEMENT_ADDRESS, SOURCE_IP_OF_A.toString());
-               SparseAnnotations annotations = annotationsBuilder.build();
-               Annotations[] da = {annotations };
-               Device deviceA = new DefaultDevice(null, DEVICE_ID_OF_A, Device.Type.OTHER, "", "", "", "", null, da);
-               flagForDevice = false;
-               return deviceA;
+            if (flagForDevice) {
+                DefaultAnnotations.Builder annotationsBuilder = DefaultAnnotations.builder()
+                        .set(AnnotationKeys.MANAGEMENT_ADDRESS, SOURCE_IP_OF_A.toString());
+                SparseAnnotations annotations = annotationsBuilder.build();
+                Annotations[] da = {annotations};
+                Device deviceA = new DefaultDevice(null, DEVICE_ID_OF_A, Device.Type.OTHER, "", "", "", "", null, da);
+                flagForDevice = false;
+                return deviceA;
             } else {
-               DefaultAnnotations.Builder annotationsBuilder = DefaultAnnotations.builder()
-                          .set(AnnotationKeys.MANAGEMENT_ADDRESS, SOURCE_IP_OF_B.toString());
-               SparseAnnotations annotations = annotationsBuilder.build();
-               Annotations[] da = {annotations };
-               Device deviceB = new DefaultDevice(null, DEVICE_ID_OF_B, Device.Type.OTHER, "", "", "", "", null, da);
-               return deviceB;
-           }
+                DefaultAnnotations.Builder annotationsBuilder = DefaultAnnotations.builder()
+                        .set(AnnotationKeys.MANAGEMENT_ADDRESS, SOURCE_IP_OF_B.toString());
+                SparseAnnotations annotations = annotationsBuilder.build();
+                Annotations[] da = {annotations};
+                Device deviceB = new DefaultDevice(null, DEVICE_ID_OF_B, Device.Type.OTHER, "", "", "", "", null, da);
+                return deviceB;
+            }
         }
+
         @Override
         public List<Port> getPorts(DeviceId deviceId) {
             return lsPorts;
@@ -162,7 +171,7 @@
             DefaultAnnotations.Builder annotationsBuilder = DefaultAnnotations.builder()
                     .set(AnnotationKeys.MANAGEMENT_ADDRESS, SOURCE_IP_OF_A.toString());
             SparseAnnotations annotations = annotationsBuilder.build();
-            Annotations[] da = {annotations };
+            Annotations[] da = {annotations};
             Device deviceA = new DefaultDevice(null, DEVICE_ID_OF_C, Device.Type.OTHER, "", "", "", "", null, da);
             lsDevices.add(deviceA);
             return lsDevices;
@@ -184,31 +193,32 @@
     static final Class<IgmpproxySsmTranslateConfig> IGMPPROXY_SSM_CONFIG_CLASS = IgmpproxySsmTranslateConfig.class;
     static final Class<McastConfig> MCAST_CONFIG_CLASS = McastConfig.class;
     ConfigFactory<ApplicationId, IgmpproxyConfig> igmpproxyConfigFactory =
-         new ConfigFactory<ApplicationId, IgmpproxyConfig>(
-        SubjectFactories.APP_SUBJECT_FACTORY, IGMPPROXY_CONFIG_CLASS, "igmpproxy") {
-    @Override
-    public IgmpproxyConfig createConfig() {
-          return new IgmpproxyConfig();
-        }
-    };
+            new ConfigFactory<ApplicationId, IgmpproxyConfig>(
+                    SubjectFactories.APP_SUBJECT_FACTORY, IGMPPROXY_CONFIG_CLASS, "igmpproxy") {
+                @Override
+                public IgmpproxyConfig createConfig() {
+                    return new IgmpproxyConfig();
+                }
+            };
 
     ConfigFactory<ApplicationId, IgmpproxySsmTranslateConfig> igmpproxySsmConfigFactory =
-        new ConfigFactory<ApplicationId, IgmpproxySsmTranslateConfig>(
-        SubjectFactories.APP_SUBJECT_FACTORY, IGMPPROXY_SSM_CONFIG_CLASS, "ssmTranslate", true) {
+            new ConfigFactory<ApplicationId, IgmpproxySsmTranslateConfig>(
+                    SubjectFactories.APP_SUBJECT_FACTORY, IGMPPROXY_SSM_CONFIG_CLASS, "ssmTranslate", true) {
 
-        @Override
-        public IgmpproxySsmTranslateConfig createConfig() {
-            return new IgmpproxySsmTranslateConfig();
-        }
-    };
+                @Override
+                public IgmpproxySsmTranslateConfig createConfig() {
+                    return new IgmpproxySsmTranslateConfig();
+                }
+            };
 
 
     class MockIgmpProxyConfig extends IgmpproxyConfig {
         boolean igmpOnPodBasis = true;
 
         MockIgmpProxyConfig(boolean igmpFlagValue) {
-           igmpOnPodBasis = igmpFlagValue;
-       }
+            igmpOnPodBasis = igmpFlagValue;
+        }
+
         @Override
         public boolean igmpOnPodBasis() {
             return igmpOnPodBasis;
@@ -224,17 +234,18 @@
 
         @Override
         public ConnectPoint connectPoint() {
-               return COMMON_CONNECT_POINT;
+            return COMMON_CONNECT_POINT;
         }
     }
 
 
-     class TestNetworkConfigRegistry extends NetworkConfigRegistryAdapter {
-         Boolean igmpOnPodFlag = false;
+    class TestNetworkConfigRegistry extends NetworkConfigRegistryAdapter {
+        Boolean igmpOnPodFlag = false;
 
-         TestNetworkConfigRegistry(Boolean igmpFlag) {
-             igmpOnPodFlag = igmpFlag;
-         }
+        TestNetworkConfigRegistry(Boolean igmpFlag) {
+            igmpOnPodFlag = igmpFlag;
+        }
+
         @SuppressWarnings("unchecked")
         @Override
         public <S> Set<S> getSubjects(Class<S> subjectClass) {
@@ -242,19 +253,19 @@
                 return (Set<S>) ImmutableSet.of(DEVICE_ID_OF_A, DEVICE_ID_OF_B);
             }
             return null;
-       }
+        }
 
-         @SuppressWarnings("unchecked")
-         @Override
-         public <S, C extends Config<S>> C getConfig(S subject, Class<C> configClass) {
-             if (configClass.getName().equalsIgnoreCase("org.opencord.igmpproxy.impl.IgmpproxyConfig")) {
-                 IgmpproxyConfig igmpproxyConfig = new MockIgmpProxyConfig(igmpOnPodFlag);
-                 return (C) igmpproxyConfig;
-             } else {
-                 super.getConfig(subject, configClass);
-             }
-             return null;
-         }
+        @SuppressWarnings("unchecked")
+        @Override
+        public <S, C extends Config<S>> C getConfig(S subject, Class<C> configClass) {
+            if (configClass.getName().equalsIgnoreCase("org.opencord.igmpproxy.impl.IgmpproxyConfig")) {
+                IgmpproxyConfig igmpproxyConfig = new MockIgmpProxyConfig(igmpOnPodFlag);
+                return (C) igmpproxyConfig;
+            } else {
+                super.getConfig(subject, configClass);
+            }
+            return null;
+        }
     }
 
 
@@ -276,12 +287,18 @@
         @Override
         public void emit(OutboundPacket packet) {
             synchronized (savedPackets) {
-               savedPackets.add(packet);
-               savedPackets.notify();
+                savedPackets.add(packet);
+                savedPackets.notify();
             }
         }
-     }
+    }
 
+    class TestIgmpLeaderShipService implements IgmpLeadershipService {
+        @Override
+        public boolean isLocalLeader(DeviceId deviceId) {
+            return true;
+        }
+    }
 
     class MockMastershipService extends MastershipServiceAdapter {
         @Override
@@ -407,9 +424,9 @@
      * Mocks the DefaultPacketContext.
      */
     final class TestPacketContext extends DefaultPacketContext {
-       TestPacketContext(long time, InboundPacket inPkt, OutboundPacket outPkt, boolean block) {
-         super(time, inPkt, outPkt, block);
-         }
+        TestPacketContext(long time, InboundPacket inPkt, OutboundPacket outPkt, boolean block) {
+            super(time, inPkt, outPkt, block);
+        }
 
         @Override
         public void send() {
@@ -423,31 +440,31 @@
      * @param reply Ethernet packet
      * @throws InterruptedException
      */
-     void sendPacket(Ethernet reply) {
+    void sendPacket(Ethernet reply) {
 
-         if (reply != null) {
-             final ByteBuffer byteBuffer = ByteBuffer.wrap(reply.serialize());
+        if (reply != null) {
+            final ByteBuffer byteBuffer = ByteBuffer.wrap(reply.serialize());
 
-             if (flagForQueryPacket) {
-                 InboundPacket inBoundPacket = new DefaultInboundPacket(CONNECT_POINT_C, reply, byteBuffer);
-                 context = new TestPacketContext(127L, inBoundPacket, null, false);
-                 packetProcessor.process(context);
-             } else {
-                 if (flagForPacket) {
-                     InboundPacket inPacket = new DefaultInboundPacket(CONNECT_POINT_A, reply, byteBuffer);
-                     context = new TestPacketContext(127L, inPacket, null, false);
-                     flagForPacket = false;
+            if (flagForQueryPacket) {
+                InboundPacket inBoundPacket = new DefaultInboundPacket(CONNECT_POINT_C, reply, byteBuffer);
+                context = new TestPacketContext(127L, inBoundPacket, null, false);
+                packetProcessor.process(context);
+            } else {
+                if (flagForPacket) {
+                    InboundPacket inPacket = new DefaultInboundPacket(CONNECT_POINT_A, reply, byteBuffer);
+                    context = new TestPacketContext(127L, inPacket, null, false);
+                    flagForPacket = false;
 
-                     packetProcessor.process(context);
-                 } else {
-                     InboundPacket inBoundPacket = new DefaultInboundPacket(CONNECT_POINT_B, reply, byteBuffer);
-                     context = new TestPacketContext(127L, inBoundPacket, null, false);
-                     flagForPacket = true;
+                    packetProcessor.process(context);
+                } else {
+                    InboundPacket inBoundPacket = new DefaultInboundPacket(CONNECT_POINT_B, reply, byteBuffer);
+                    context = new TestPacketContext(127L, inBoundPacket, null, false);
+                    flagForPacket = true;
 
-                     packetProcessor.process(context);
-                 }
-             }
-         }
+                    packetProcessor.process(context);
+                }
+            }
+        }
     }
 
     protected class MockSadisService implements SadisService {
@@ -498,7 +515,7 @@
     private class MockSubService implements BaseInformationService<SubscriberAndDeviceInformation> {
         MockSubscriberAndDeviceInformation sub =
                 new MockSubscriberAndDeviceInformation(CLIENT_NAS_PORT_ID,
-                                                       CLIENT_NAS_PORT_ID, CLIENT_CIRCUIT_ID, null, null);
+                        CLIENT_NAS_PORT_ID, CLIENT_CIRCUIT_ID, null, null);
 
         @Override
         public SubscriberAndDeviceInformation get(String id) {
@@ -653,15 +670,15 @@
         }
 
         @Override
-         public void disableComponent(String name) {
-             // TODO Auto-generated method stub
-         }
+        public void disableComponent(String name) {
+            // TODO Auto-generated method stub
+        }
 
-         @Override
-         public ServiceReference getServiceReference() {
-             // TODO Auto-generated method stub
-             return null;
-         }
+        @Override
+        public ServiceReference getServiceReference() {
+            // TODO Auto-generated method stub
+            return null;
+        }
     }
 
     Ethernet buildWrongIgmpPacket(Ip4Address groupIp, Ip4Address sourceIp) {
@@ -669,7 +686,7 @@
         igmpMembership.setRecordType((byte) 0x33);
 
         return IgmpSender.getInstance().buildIgmpPacket(IGMP.TYPE_IGMPV3_MEMBERSHIP_REPORT, groupIp,
-            igmpMembership, sourceIp, false);
+                igmpMembership, sourceIp, false);
     }
 
     Ethernet buildUnknownIgmpPacket(Ip4Address groupIp, Ip4Address sourceIp) {
@@ -679,4 +696,53 @@
         return IgmpSender.getInstance().buildIgmpPacket((byte) 0x44, groupIp, igmpMembership, sourceIp, false);
     }
 
+    class TestStateMachineStoreService extends AbstractStateMachineStore {
+        private static final int DEFAULT_COUNT = 0;
+        private Map<StateMachineId, AtomicLong> countsMap;
+
+        public TestStateMachineStoreService(Map<StateMachineId, StateMachine> map) {
+            super();
+            stateMachineMap = Maps.newConcurrentMap();
+            countsMap = Maps.newConcurrentMap();
+        }
+
+        @Override
+        public long increaseAndGetCounter(StateMachineId stateMachineId) {
+            AtomicLong count = countsMap.get(stateMachineId);
+            if (count == null) {
+                count = new AtomicLong(DEFAULT_COUNT);
+                countsMap.put(stateMachineId, count);
+            }
+            return count.incrementAndGet();
+        }
+
+        @Override
+        public long decreaseAndGetCounter(StateMachineId stateMachineId) {
+            AtomicLong count = countsMap.get(stateMachineId);
+            if (count.get() > 0) {
+                return count.decrementAndGet();
+            } else {
+                return count.get();
+            }
+        }
+
+        @Override
+        public boolean removeCounter(StateMachineId stateMachineId) {
+            countsMap.remove(stateMachineId);
+            return true;
+        }
+
+        @Override
+        public long getCounter(StateMachineId stateMachineId) {
+            return countsMap.get(stateMachineId).get();
+        }
+
+    }
+
+
+    class TestGroupMemberStoreService extends AbstractGroupMemberStore {
+        public TestGroupMemberStoreService() {
+            super(Maps.newConcurrentMap());
+        }
+    }
 }
diff --git a/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerTest.java b/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerTest.java
index dc220f8..e0daa14 100644
--- a/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerTest.java
+++ b/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerTest.java
@@ -15,6 +15,7 @@
  */
 package org.opencord.igmpproxy.impl;
 
+import com.google.common.collect.Maps;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -23,7 +24,6 @@
 import org.onosproject.core.CoreServiceAdapter;
 import org.onosproject.net.flow.FlowRuleServiceAdapter;
 import org.onosproject.net.flowobjective.FlowObjectiveServiceAdapter;
-import org.opencord.igmpproxy.impl.IgmpManagerBase.MockComponentContext;
 
 import static org.junit.Assert.*;
 
@@ -42,6 +42,7 @@
     @Before
     public void setUp() {
         igmpManager = new IgmpManager();
+        igmpManager.igmpLeadershipService = new TestIgmpLeaderShipService();
         igmpManager.coreService = new CoreServiceAdapter();
         igmpManager.mastershipService = new MockMastershipService();
         igmpManager.flowObjectiveService = new FlowObjectiveServiceAdapter();
@@ -55,16 +56,22 @@
         TestUtils.setField(igmpStatisticsManager, "eventDispatcher", new TestEventDispatcher());
         igmpStatisticsManager.activate(new MockComponentContext());
         igmpManager.igmpStatisticsManager = this.igmpStatisticsManager;
+
+        igmpManager.groupMemberStore = new TestGroupMemberStoreService();
+        StateMachineManager stateMachineManager = new StateMachineManager();
+        stateMachineManager.stateMachineStore = new TestStateMachineStoreService(Maps.newConcurrentMap());
+        stateMachineManager.activate(new MockComponentContext());
+        igmpManager.stateMachineService = stateMachineManager;
+
         // By default - we send query messages
-        SingleStateMachine.sendQuery = true;
+        StateMachineManager.sendQuery = true;
     }
 
     // Tear Down the IGMP application.
     @After
     public void tearDown() {
         igmpManager.deactivate();
-        IgmpManager.groupMemberMap.clear();
-        StateMachine.clearMap();
+        igmpManager.stateMachineService.clearAllMaps();
     }
 
     // Checking the Default value of IGMP_ON_POD_BASIS.
@@ -88,7 +95,7 @@
     @Test
     public void testIgmpOnPodBasisDefaultValue() throws InterruptedException {
         // We need to count join messages sent on the upstream
-        SingleStateMachine.sendQuery = false;
+        StateMachineManager.sendQuery = false;
 
         igmpManager.networkConfig = new TestNetworkConfigRegistry(false);
         igmpManager.activate();
@@ -118,7 +125,7 @@
     @Test
     public void testIgmpOnPodBasisTrueValue() throws InterruptedException {
         // We need to count join messages
-        SingleStateMachine.sendQuery = false;
+        StateMachineManager.sendQuery = false;
 
         igmpManager.networkConfig = new TestNetworkConfigRegistry(true);
         igmpManager.activate();
@@ -129,7 +136,7 @@
         sendPacket(firstPacket);
         // Emitted packet is stored in list savedPackets
         synchronized (savedPackets) {
-          savedPackets.wait(WAIT_TIMEOUT);
+            savedPackets.wait(WAIT_TIMEOUT);
         }
         assertNotNull(savedPackets);
         assertEquals(1, savedPackets.size());
diff --git a/app/src/test/java/org/opencord/igmpproxy/impl/IgmpStatisticsTest.java b/app/src/test/java/org/opencord/igmpproxy/impl/IgmpStatisticsTest.java
index 2677511..b2e8271 100644
--- a/app/src/test/java/org/opencord/igmpproxy/impl/IgmpStatisticsTest.java
+++ b/app/src/test/java/org/opencord/igmpproxy/impl/IgmpStatisticsTest.java
@@ -21,6 +21,7 @@
 
 import java.util.List;
 
+import com.google.common.collect.Maps;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -53,6 +54,7 @@
     @Before
     public void setUp() {
         igmpManager = new IgmpManager();
+        igmpManager.igmpLeadershipService = new TestIgmpLeaderShipService();
         igmpManager.coreService = new CoreServiceAdapter();
         igmpManager.mastershipService = new MockMastershipService();
         igmpManager.flowObjectiveService = new FlowObjectiveServiceAdapter();
@@ -61,6 +63,11 @@
         igmpManager.flowRuleService = new FlowRuleServiceAdapter();
         igmpManager.multicastService = new TestMulticastRouteService();
         igmpManager.sadisService = new MockSadisService();
+        igmpManager.groupMemberStore = new TestGroupMemberStoreService();
+        StateMachineManager stateMachineService = new StateMachineManager();
+        stateMachineService.stateMachineStore = new TestStateMachineStoreService(Maps.newConcurrentMap());
+        stateMachineService.activate(new MockComponentContext());
+        igmpManager.stateMachineService = stateMachineService;
         igmpStatisticsManager = new IgmpStatisticsManager();
         igmpStatisticsManager.cfgService = new MockCfgService();
         igmpStatisticsManager.addListener(mockListener);
@@ -68,7 +75,7 @@
         igmpStatisticsManager.activate(new MockComponentContext());
         igmpManager.igmpStatisticsManager = this.igmpStatisticsManager;
         // By default - we send query messages
-        SingleStateMachine.sendQuery = true;
+        StateMachineManager.sendQuery = true;
     }
 
     // Tear Down the IGMP application.
@@ -76,14 +83,13 @@
     public void tearDown() {
         igmpStatisticsManager.removeListener(mockListener);
         igmpStatisticsManager.deactivate();
-        IgmpManager.groupMemberMap.clear();
-        StateMachine.clearMap();
+        igmpManager.stateMachineService.clearAllMaps();
     }
 
     //Test Igmp Statistics.
     @Test
     public void testIgmpStatistics() throws InterruptedException {
-        SingleStateMachine.sendQuery = false;
+        StateMachineManager.sendQuery = false;
         igmpManager.networkConfig = new TestNetworkConfigRegistry(false);
         igmpManager.activate();
 
@@ -103,7 +109,7 @@
         }
 
         assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
-            assertEquals((long) 2, igmpStatisticsManager.getIgmpStats().getTotalMsgReceived().longValue()));
+                assertEquals((long) 2, igmpStatisticsManager.getIgmpStats().getTotalMsgReceived().longValue()));
         assertEquals((long) 1, igmpStatisticsManager.getIgmpStats().getIgmpJoinReq().longValue());
         assertEquals((long) 2, igmpStatisticsManager.getIgmpStats().getIgmpv3MembershipReport().longValue());
         assertEquals((long) 1, igmpStatisticsManager.getIgmpStats().getIgmpSuccessJoinRejoinReq().longValue());
@@ -119,18 +125,18 @@
     //Test packet with Unknown Multicast IpAddress
     @Test
     public void testIgmpUnknownMulticastIpAddress() throws InterruptedException {
-        SingleStateMachine.sendQuery = false;
+        StateMachineManager.sendQuery = false;
 
         igmpManager.networkConfig = new TestNetworkConfigRegistry(false);
         igmpManager.activate();
 
         Ethernet firstPacket =
-             IgmpSender.getInstance().buildIgmpV3Join(UNKNOWN_GRP_IP, SOURCE_IP_OF_A);
+                IgmpSender.getInstance().buildIgmpV3Join(UNKNOWN_GRP_IP, SOURCE_IP_OF_A);
         // Sending first packet
         sendPacket(firstPacket);
         assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
-        assertEquals((long) 1,
-             igmpStatisticsManager.getIgmpStats().getFailJoinReqUnknownMulticastIpCounter().longValue()));
+                assertEquals((long) 1,
+                        igmpStatisticsManager.getIgmpStats().getFailJoinReqUnknownMulticastIpCounter().longValue()));
     }
 
     //Test Igmp Query Statistics.
@@ -146,15 +152,15 @@
 
         //IGMPV3 General Membership Query packet
         Ethernet igmpv3MembershipQueryPkt1 =
-              IgmpSender.getInstance().buildIgmpV3Query(Ip4Address.valueOf(0), SOURCE_IP_OF_A);
+                IgmpSender.getInstance().buildIgmpV3Query(Ip4Address.valueOf(0), SOURCE_IP_OF_A);
         sendPacket(igmpv3MembershipQueryPkt1);
         assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
+                assertEquals(igmpStatisticsManager.getIgmpStats()
+                        .getIgmpGrpAndSrcSpecificMembershipQuery().longValue(), 1));
         assertEquals(igmpStatisticsManager.getIgmpStats()
-            .getIgmpGrpAndSrcSpecificMembershipQuery().longValue(), 1));
+                .getIgmpGeneralMembershipQuery().longValue(), 1);
         assertEquals(igmpStatisticsManager.getIgmpStats()
-            .getIgmpGeneralMembershipQuery().longValue(), 1);
-        assertEquals(igmpStatisticsManager.getIgmpStats()
-             .getCurrentGrpNumCounter().longValue(), 1);
+                .getCurrentGrpNumCounter().longValue(), 1);
     }
 
     //Test Events
@@ -163,10 +169,10 @@
         final int waitEventGeneration = igmpStatisticsManager.statisticsGenerationPeriodInSeconds * 1000;
         //assert that event listened as the app activates
         assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
-            assertEquals(mockListener.events.size(), 1));
+                assertEquals(mockListener.events.size(), 1));
 
         assertAfter(waitEventGeneration / 2, waitEventGeneration, () ->
-            assertEquals(mockListener.events.size(), 2));
+                assertEquals(mockListener.events.size(), 2));
 
         for (IgmpStatisticsEvent event : mockListener.events) {
             assertEquals(event.type(), IgmpStatisticsEvent.Type.STATS_UPDATE);
@@ -176,7 +182,7 @@
     //Test packet with Unknown Wrong Membership mode
     @Test
     public void testWrongIgmpPacket() throws InterruptedException {
-        SingleStateMachine.sendQuery = false;
+        StateMachineManager.sendQuery = false;
 
         igmpManager.networkConfig = new TestNetworkConfigRegistry(false);
         igmpManager.activate();
@@ -185,14 +191,14 @@
         // Sending first packet
         sendPacket(firstPacket);
         assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
-        assertEquals((long) 1,
-            igmpStatisticsManager.getIgmpStats().getReportsRxWithWrongModeCounter().longValue()));
+                assertEquals((long) 1,
+                        igmpStatisticsManager.getIgmpStats().getReportsRxWithWrongModeCounter().longValue()));
     }
 
     //Test packet with Unknown IGMP type.
     @Test
     public void testUnknownIgmpPacket() throws InterruptedException {
-        SingleStateMachine.sendQuery = false;
+        StateMachineManager.sendQuery = false;
 
         igmpManager.networkConfig = new TestNetworkConfigRegistry(false);
         igmpManager.activate();
@@ -201,14 +207,14 @@
         // Sending first packet
         sendPacket(firstPacket);
         assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
-        assertEquals((long) 1,
-            igmpStatisticsManager.getIgmpStats().getUnknownIgmpTypePacketsRxCounter().longValue()));
+                assertEquals((long) 1,
+                        igmpStatisticsManager.getIgmpStats().getUnknownIgmpTypePacketsRxCounter().longValue()));
     }
 
     //Test packet with Insufficient Permission.
     @Test
     public void testSufficientPermission() throws InterruptedException {
-        SingleStateMachine.sendQuery = false;
+        StateMachineManager.sendQuery = false;
 
         flagForPermission = true;
         igmpManager.networkConfig = new TestNetworkConfigRegistry(false);
@@ -218,8 +224,9 @@
         // Sending first packet
         sendPacket(firstPacket);
         assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
-        assertEquals((long) 1,
-            igmpStatisticsManager.getIgmpStats().getFailJoinReqInsuffPermissionAccessCounter().longValue()));
+                assertEquals((long) 1,
+                        igmpStatisticsManager.getIgmpStats()
+                                .getFailJoinReqInsuffPermissionAccessCounter().longValue()));
     }
 
     public class MockIgmpStatisticsEventListener implements IgmpStatisticsEventListener {