Refactors IGMPProxy application in API and APP bundles

Change-Id: I465d6c0f49079804ae8e0a1f464581c25c6d2300
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/GroupMember.java b/app/src/main/java/org/opencord/igmpproxy/impl/GroupMember.java
new file mode 100644
index 0000000..a05b962
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/GroupMember.java
@@ -0,0 +1,277 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.impl;
+
+import org.onlab.packet.IGMPMembership;
+import org.onlab.packet.Ip4Address;
+import org.onlab.packet.VlanId;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+
+/**
+ * Data structure that keeps IGMP member information.
+ */
+public final class GroupMember {
+
+    private final VlanId vlan;
+    private final DeviceId deviceId;
+    private final PortNumber portNumber;
+    private final Ip4Address groupIp;
+    private final boolean v2;
+    private byte recordType = IGMPMembership.MODE_IS_INCLUDE;
+    private ArrayList<Ip4Address> sourceList = new ArrayList<>();
+    private int keepAliveQueryInterval = 0;
+    private int keepAliveQueryCount = 0;
+    private int lastQueryInterval = 0;
+    private int lastQueryCount = 0;
+    private boolean leave = false;
+
+    /**
+     * Creates a member with an empty source list and all counters at zero.
+     *
+     * @param groupIp  multicast group address
+     * @param vlan     VLAN associated with the member
+     * @param deviceId device the member is attached to
+     * @param portNum  device port the member is attached to
+     * @param isV2     true if the member is an IGMPv2 member
+     */
+    public GroupMember(Ip4Address groupIp, VlanId vlan, DeviceId deviceId, PortNumber portNum, boolean isV2) {
+        this.groupIp = groupIp;
+        this.vlan = vlan;
+        this.deviceId = deviceId;
+        this.portNumber = portNum;
+        v2 = isV2;
+    }
+
+    /** Builds the member key by concatenating group IP, device id and port number. */
+    static String getkey(Ip4Address groupIp, DeviceId deviceId, PortNumber portNum) {
+        return groupIp.toString() + deviceId.toString() + portNum.toString();
+    }
+
+    public String getkey() {
+        return GroupMember.getkey(groupIp, deviceId, portNumber);
+    }
+
+    public String getId() {
+        return getkey();
+    }
+
+    public VlanId getvlan() {
+        return vlan;
+    }
+
+    public DeviceId getDeviceId() {
+        return deviceId;
+    }
+
+    public PortNumber getPortNumber() {
+        return portNumber;
+    }
+
+    public Ip4Address getGroupIp() {
+        return groupIp;
+    }
+
+    public byte getRecordType() {
+        return recordType;
+    }
+
+    public boolean getv2() {
+        return v2;
+    }
+
+    /** Returns the live internal source list; changes made to it affect this member. */
+    public ArrayList<Ip4Address> getSourceList() {
+        return sourceList;
+    }
+
+    /**
+     * Replaces the record type and the source list of this member.
+     *
+     * @param recordType    group record type to store
+     * @param newSourceList sources replacing the current ones; copied, so it may be {@link #getSourceList()}
+     */
+    public void updateList(byte recordType, ArrayList<Ip4Address> newSourceList) {
+        this.recordType = recordType;
+        // Snapshot first: if newSourceList is this member's own list, clear() would empty it too.
+        ArrayList<Ip4Address> snapshot = new ArrayList<>(newSourceList);
+        this.sourceList.clear();
+        this.sourceList.addAll(snapshot);
+        /*TODO : support SSM
+        if (this.recordType == IGMPMembership.MODE_IS_INCLUDE) {
+            switch (recordType) {
+                case IGMPMembership.MODE_IS_INCLUDE:
+                case IGMPMembership.CHANGE_TO_INCLUDE_MODE:
+                    //however , set to include<B> anyway
+                    this.sourceList = sourceList;
+                    this.recordType = IGMPMembership.MODE_IS_INCLUDE;
+                    break;
+                case IGMPMembership.MODE_IS_EXCLUDE:
+                case IGMPMembership.CHANGE_TO_EXCLUDE_MODE:
+                    //set to exclude<B>
+                    this.sourceList = sourceList;
+                    this.recordType = IGMPMembership.MODE_IS_EXCLUDE;
+                    break;
+                case IGMPMembership.ALLOW_NEW_SOURCES:
+                    //set to include <A+B>
+                    join(this.sourceList, sourceList);
+                    break;
+                case IGMPMembership.BLOCK_OLD_SOURCES:
+                    //set to include <A-B>
+                    exclude(this.sourceList, sourceList);
+                    break;
+                default:
+                    break;
+            }
+        } else if (this.recordType == IGMPMembership.MODE_IS_EXCLUDE) {
+            switch (recordType) {
+                case IGMPMembership.MODE_IS_INCLUDE:
+                case IGMPMembership.CHANGE_TO_INCLUDE_MODE:
+                    //set to include<B>
+                    this.recordType = IGMPMembership.MODE_IS_INCLUDE;
+                    this.sourceList = sourceList;
+                    break;
+                case IGMPMembership.MODE_IS_EXCLUDE:
+                case IGMPMembership.CHANGE_TO_EXCLUDE_MODE:
+                    this.sourceList = sourceList;
+                    this.recordType = IGMPMembership.MODE_IS_EXCLUDE;
+                    break;
+                case IGMPMembership.ALLOW_NEW_SOURCES:
+                    //set to exclude <A-B>
+                    exclude(this.sourceList, sourceList);
+                    break;
+                case IGMPMembership.BLOCK_OLD_SOURCES:
+                    //set to exclude <A+B>
+                    join(this.sourceList, sourceList);
+                    break;
+                default:
+                    break;
+            }
+        }*/
+    }
+
+    /** Appends to listA every address of listB that listA does not already contain (A+B). */
+    private void join(ArrayList<Ip4Address> listA, ArrayList<Ip4Address> listB) {
+        for (Ip4Address ipToAdd : listB) {
+            if (!listA.contains(ipToAdd)) {
+                listA.add(ipToAdd);
+            }
+        }
+    }
+
+    /** Keeps in listA only the addresses that are also present in listB (A*B). */
+    private void intersection(ArrayList<Ip4Address> listA, ArrayList<Ip4Address> listB) {
+        Iterator<Ip4Address> iterA = listA.iterator();
+        while (iterA.hasNext()) {
+            if (!listB.contains(iterA.next())) {
+                iterA.remove();
+            }
+        }
+    }
+
+    /** Removes from listA the first occurrence of each address of listB (A-B). */
+    private void exclude(ArrayList<Ip4Address> listA, ArrayList<Ip4Address> listB) {
+        for (Ip4Address ipToDel : listB) {
+            listA.remove(ipToDel);
+        }
+    }
+
+    public void setLeave(boolean l) {
+        leave = l;
+    }
+
+    public boolean isLeave() {
+        return leave;
+    }
+
+    public int getKeepAliveQueryInterval() {
+        return keepAliveQueryInterval;
+    }
+
+    public int getKeepAliveQueryCount() {
+        return keepAliveQueryCount;
+    }
+
+    public int getLastQueryInterval() {
+        return lastQueryInterval;
+    }
+
+    public int getLastQueryCount() {
+        return lastQueryCount;
+    }
+
+    /**
+     * Increments the keep-alive query count by one, or resets it to zero.
+     *
+     * @param add true to increment, false to reset
+     */
+    public void keepAliveQueryCount(boolean add) {
+        if (add) {
+            keepAliveQueryCount++;
+        } else {
+            keepAliveQueryCount = 0;
+        }
+    }
+
+    /**
+     * Increments the last-query count by one, or resets it to zero.
+     *
+     * @param add true to increment, false to reset
+     */
+    public void lastQueryCount(boolean add) {
+        if (add) {
+            lastQueryCount++;
+        } else {
+            lastQueryCount = 0;
+        }
+    }
+
+    /**
+     * Increments the keep-alive query interval by one, or resets it to zero.
+     *
+     * @param add true to increment, false to reset
+     */
+    public void keepAliveInterval(boolean add) {
+        if (add) {
+            keepAliveQueryInterval++;
+        } else {
+            keepAliveQueryInterval = 0;
+        }
+    }
+
+    /**
+     * Increments the last-query interval by one, or resets it to zero.
+     *
+     * @param add true to increment, false to reset
+     */
+    public void lastQueryInterval(boolean add) {
+        if (add) {
+            lastQueryInterval++;
+        } else {
+            lastQueryInterval = 0;
+        }
+    }
+
+    public void resetAllTimers() {
+        keepAliveQueryInterval = 0;
+        keepAliveQueryCount = 0;
+        lastQueryInterval = 0;
+        lastQueryCount = 0;
+    }
+}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpManager.java b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpManager.java
new file mode 100644
index 0000000..11e28e5
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpManager.java
@@ -0,0 +1,1010 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.impl;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.onosproject.net.Device;
+import org.opencord.igmpproxy.IgmpStatisticsService;
+import org.opencord.sadis.BaseInformationService;
+import org.opencord.sadis.SadisService;
+import org.opencord.sadis.SubscriberAndDeviceInformation;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.onlab.packet.EthType;
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.IGMP;
+import org.onlab.packet.IGMPGroup;
+import org.onlab.packet.IGMPMembership;
+import org.onlab.packet.IGMPQuery;
+import org.onlab.packet.IPv4;
+import org.onlab.packet.Ip4Address;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.VlanId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.AnnotationKeys;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Port;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.config.ConfigFactory;
+import org.onosproject.net.config.NetworkConfigEvent;
+import org.onosproject.net.config.NetworkConfigListener;
+import org.onosproject.net.config.NetworkConfigRegistry;
+import org.onosproject.net.config.basics.McastConfig;
+import org.onosproject.net.config.basics.SubjectFactories;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.FlowRuleService;
+import org.onosproject.net.flow.criteria.Criteria;
+import org.onosproject.net.flowobjective.DefaultFilteringObjective;
+import org.onosproject.net.flowobjective.FilteringObjective;
+import org.onosproject.net.flowobjective.FlowObjectiveService;
+import org.onosproject.net.flowobjective.Objective;
+import org.onosproject.net.flowobjective.ObjectiveContext;
+import org.onosproject.net.flowobjective.ObjectiveError;
+import org.onosproject.mcast.api.McastRoute;
+import org.onosproject.mcast.api.MulticastRouteService;
+import org.onosproject.net.packet.InboundPacket;
+import org.onosproject.net.packet.PacketContext;
+import org.onosproject.net.packet.PacketProcessor;
+import org.onosproject.net.packet.PacketService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.onlab.packet.IGMPMembership.MODE_IS_INCLUDE;
+import static org.onlab.packet.IGMPMembership.MODE_IS_EXCLUDE;
+import static org.onlab.packet.IGMPMembership.CHANGE_TO_INCLUDE_MODE;
+import static org.onlab.packet.IGMPMembership.CHANGE_TO_EXCLUDE_MODE;
+import static org.onlab.packet.IGMPMembership.ALLOW_NEW_SOURCES;
+import static org.onlab.packet.IGMPMembership.BLOCK_OLD_SOURCES;
+
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+
+/**
+ * IGMP processing application working in proxy mode. It supports first join/last leave, fast leave,
+ * periodic query and keep-alive, and sends IGMP messages out of the uplink port.
+ */
+@Component(immediate = true)
+public class IgmpManager {
+
+    private static final String APP_NAME = "org.opencord.igmpproxy";
+
+    private static final Class<IgmpproxyConfig> IGMPPROXY_CONFIG_CLASS =
+            IgmpproxyConfig.class;
+    private static final Class<IgmpproxySsmTranslateConfig> IGMPPROXY_SSM_CONFIG_CLASS =
+            IgmpproxySsmTranslateConfig.class;
+    private static final Class<McastConfig> MCAST_CONFIG_CLASS =
+            McastConfig.class;
+
+    public static Map<String, GroupMember> groupMemberMap = Maps.newConcurrentMap();
+    private static ApplicationId appId;
+
+    private static int unSolicitedTimeout = 3; // unit is 1 sec
+    private static int keepAliveCount = 3;
+    private static int lastQueryInterval = 2;  //unit is 1 sec
+    private static int lastQueryCount = 2;
+    private static boolean fastLeave = true;
+    private static boolean withRAUplink = true;
+    private static boolean withRADownlink = false;
+    private static boolean periodicQuery = true;
+    private static short mvlan = 4000;
+    private static byte igmpCos = 7;
+    public static boolean connectPointMode = true;
+    public static ConnectPoint connectPoint = null;
+    private static ConnectPoint sourceDeviceAndPort = null;
+    private static boolean enableIgmpProvisioning = false;
+    private static boolean igmpOnPodBasis = false;
+
+    private static final Integer MAX_PRIORITY = 10000;
+    private static final String INSTALLED = "installed";
+    private static final String REMOVED = "removed";
+    private static final String INSTALLATION = "installation";
+    private static final String REMOVAL = "removal";
+    private static final String NNI_PREFIX = "nni";
+
+    private static boolean pimSSmInterworking = false;
+    private static final String DEFAULT_PIMSSM_HOST = "127.0.0.1";
+    private final ScheduledExecutorService scheduledExecutorService =
+            Executors.newScheduledThreadPool(1);
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected CoreService coreService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected PacketService packetService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected MastershipService mastershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected FlowRuleService flowRuleService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected DeviceService deviceService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected FlowObjectiveService flowObjectiveService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected NetworkConfigRegistry networkConfig;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected MulticastRouteService multicastService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected SadisService sadisService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected IgmpStatisticsService igmpStatisticsManager;
+
+    private IgmpPacketProcessor processor = new IgmpPacketProcessor();
+    private Logger log = LoggerFactory.getLogger(getClass());
+    private ApplicationId coreAppId;
+    private Map<Ip4Address, Ip4Address> ssmTranslateTable = new ConcurrentHashMap<>();
+
+    private InternalNetworkConfigListener configListener =
+            new InternalNetworkConfigListener();
+    private DeviceListener deviceListener = new InternalDeviceListener();
+
+    private ConfigFactory<ApplicationId, IgmpproxyConfig> igmpproxyConfigFactory =
+            new ConfigFactory<ApplicationId, IgmpproxyConfig>(
+                    SubjectFactories.APP_SUBJECT_FACTORY, IGMPPROXY_CONFIG_CLASS, "igmpproxy") {
+                @Override
+                public IgmpproxyConfig createConfig() {
+                    return new IgmpproxyConfig();
+                }
+            };
+    private ConfigFactory<ApplicationId, IgmpproxySsmTranslateConfig> igmpproxySsmConfigFactory =
+            new ConfigFactory<ApplicationId, IgmpproxySsmTranslateConfig>(
+                    SubjectFactories.APP_SUBJECT_FACTORY, IGMPPROXY_SSM_CONFIG_CLASS, "ssmTranslate", true) {
+                @Override
+                public IgmpproxySsmTranslateConfig createConfig() {
+                    return new IgmpproxySsmTranslateConfig();
+                }
+            };
+
+    private int maxResp = 10; //unit is 1 sec
+    private int keepAliveInterval = 120; //unit is 1 sec
+
+    private ExecutorService eventExecutor;
+
+    public static int getUnsolicitedTimeout() {
+        return unSolicitedTimeout; // expressed in seconds
+    }
+
+    protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
+
+    private List<Byte> validMembershipModes = Arrays.asList(MODE_IS_INCLUDE,  MODE_IS_EXCLUDE, CHANGE_TO_INCLUDE_MODE,
+                              CHANGE_TO_EXCLUDE_MODE, ALLOW_NEW_SOURCES, BLOCK_OLD_SOURCES);
+
+    /** Registers the app, its config factories and listeners, provisions flows and schedules the timer task. */
+    @Activate
+    protected void activate() {
+        // Created first: the packet processor registered below submits every packet to this executor.
+        eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("cord/igmpproxy", "events-igmp-%d", log));
+        appId = coreService.registerApplication(APP_NAME);
+        coreAppId = coreService.registerApplication(CoreService.CORE_APP_NAME);
+        packetService.addProcessor(processor, PacketProcessor.director(4));
+        IgmpSender.init(packetService, mastershipService, igmpStatisticsManager);
+
+        networkConfig.registerConfigFactory(igmpproxySsmConfigFactory);
+        networkConfig.registerConfigFactory(igmpproxyConfigFactory);
+        networkConfig.addListener(configListener);
+
+        configListener.reconfigureNetwork(networkConfig.getConfig(appId, IGMPPROXY_CONFIG_CLASS));
+        configListener.reconfigureSsmTable(networkConfig.getConfig(appId, IGMPPROXY_SSM_CONFIG_CLASS));
+        subsService = sadisService.getSubscriberInfoService();
+        if (connectPointMode) {
+            provisionConnectPointFlows();
+        } else {
+            provisionUplinkFlows();
+        }
+
+        McastConfig config = networkConfig.getConfig(coreAppId, MCAST_CONFIG_CLASS);
+        if (config != null) {
+            mvlan = config.egressVlan().toShort();
+            IgmpSender.getInstance().setMvlan(mvlan);
+        }
+        deviceService.addListener(deviceListener);
+        scheduledExecutorService.scheduleAtFixedRate(new IgmpProxyTimerTask(), 1000, 1000, TimeUnit.MILLISECONDS);
+        log.info("Started");
+    }
+
+    /** Unregisters listeners, config factories and the processor, stops the executors, removes the app flows. */
+    @Deactivate
+    protected void deactivate() {
+        // Detach event sources first: eventExecutor rejects tasks submitted after shutdown().
+        networkConfig.removeListener(configListener);
+        networkConfig.unregisterConfigFactory(igmpproxyConfigFactory);
+        networkConfig.unregisterConfigFactory(igmpproxySsmConfigFactory);
+        deviceService.removeListener(deviceListener);
+        packetService.removeProcessor(processor);
+        scheduledExecutorService.shutdown();
+        eventExecutor.shutdown();
+        flowRuleService.removeFlowRulesById(appId);
+        log.info("Stopped");
+    }
+
+    /** Parses the device's management-address annotation up to the first ':' as IPv4; null on any failure. */
+    protected Ip4Address getDeviceIp(DeviceId ofDeviceId) {
+        try {
+            Device device = deviceService.getDevice(ofDeviceId);
+            return Ip4Address.valueOf(device.annotations().value(AnnotationKeys.MANAGEMENT_ADDRESS).split(":")[0]);
+        } catch (Exception ex) {
+            log.info("No valid Ipaddress for " + ofDeviceId.toString());
+            return null;
+        }
+    }
+
+    /** Handles a query for one device: group-specific when a non-zero group is given, general otherwise. */
+    private void processIgmpQuery(IGMPQuery igmpQuery, ConnectPoint cp, int maxResp) {
+        DeviceId targetDevice = cp.deviceId();
+        Ip4Address queriedGroup = igmpQuery.getGaddr().getIp4Address();
+        int respTime = calculateMaxResp(maxResp);
+        if (queriedGroup == null || queriedGroup.isZero()) {
+            StateMachine.generalQuery(targetDevice, respTime);
+            igmpStatisticsManager.getIgmpStats().increaseIgmpGeneralMembershipQuery();
+            return;
+        }
+        StateMachine.specialQuery(targetDevice, queriedGroup, respTime);
+        igmpStatisticsManager.getIgmpStats().increaseIgmpGrpSpecificMembershipQuery();
+    }
+
+    /** Handles a query received on the connect point by passing it on through StateMachine. */
+    private void processIgmpConnectPointQuery(IGMPQuery igmpQuery, ConnectPoint cp, int maxResp) {
+        Ip4Address queriedGroup = igmpQuery.getGaddr().getIp4Address();
+        int respTime = calculateMaxResp(maxResp);
+        if (queriedGroup == null || queriedGroup.isZero()) {
+            // No specific group is targeted by the query, so query all the members
+            // (in all the OLTs) and proxy their reports.
+            StateMachine.generalQuery(respTime);
+            igmpStatisticsManager.getIgmpStats().increaseIgmpGeneralMembershipQuery();
+            return;
+        }
+        // The query arrived on the connect point: pass the group-specific query on to every
+        // available device for which subscriber/device information is found.
+        for (Device device : deviceService.getAvailableDevices()) {
+            DeviceId id = device.id();
+            if (getSubscriberAndDeviceInformation(id).isPresent()) {
+                StateMachine.specialQuery(id, queriedGroup, respTime);
+                igmpStatisticsManager.getIgmpStats().increaseIgmpGrpAndSrcSpecificMembershipQuery();
+            }
+        }
+        igmpStatisticsManager.getIgmpStats().increaseCurrentGrpNumCounter();
+    }
+
+
+    /** Decodes an IGMP Max Resp Code (exponential form when >= 128) and divides it by 10, rounding to nearest. */
+    private int calculateMaxResp(int maxResp) {
+        if (maxResp < 128) {
+            return (maxResp + 5) / 10;
+        }
+        int mantissa = (maxResp & 0xf) | 0x10;
+        int exponent = ((maxResp >> 4) & 0x7) + 3;
+        return ((mantissa << exponent) + 5) / 10;
+    }
+
+    private Ip4Address ssmTranslateRoute(IpAddress group) {
+        return ssmTranslateTable.get(group); // null when the table holds no entry for the group
+    }
+
+    /**
+     * Processes one group record of a membership report as a join or a leave of the
+     * member identified by group address, device and port.
+     */
+    private void processIgmpReport(IGMPMembership igmpGroup, VlanId vlan, ConnectPoint cp, byte igmpType) {
+        DeviceId deviceId = cp.deviceId();
+        PortNumber portNumber = cp.port();
+
+        Ip4Address groupIp = igmpGroup.getGaddr().getIp4Address();
+        if (!groupIp.isMulticast()) {
+            log.info(groupIp.toString() + " is not a valid group address");
+            igmpStatisticsManager.getIgmpStats().increaseFailJoinReqUnknownMulticastIpCounter();
+            return;
+        }
+        Ip4Address srcIp = getDeviceIp(deviceId);
+        byte recordType = igmpGroup.getRecordType();
+        boolean join = false;
+        ArrayList<Ip4Address> sourceList = new ArrayList<>();
+        // NOTE(review): unknown record types are only counted; join stays false, so they are handled as a leave.
+        if (!validMembershipModes.contains(recordType)) {
+            igmpStatisticsManager.getIgmpStats().increaseReportsRxWithWrongModeCounter();
+        }
+        if (igmpGroup.getSources().size() > 0) { // explicit sources: INCLUDE-type joins, EXCLUDE-type leaves
+            igmpGroup.getSources().forEach(source -> sourceList.add(source.getIp4Address()));
+            if (recordType == IGMPMembership.CHANGE_TO_EXCLUDE_MODE ||
+                    recordType == IGMPMembership.MODE_IS_EXCLUDE ||
+                    recordType == IGMPMembership.BLOCK_OLD_SOURCES) {
+                join = false;
+            } else if (recordType == IGMPMembership.CHANGE_TO_INCLUDE_MODE ||
+                    recordType == IGMPMembership.MODE_IS_INCLUDE ||
+                    recordType == IGMPMembership.ALLOW_NEW_SOURCES) {
+                join = true;
+            }
+        } else { // no sources: translated or default source; EXCLUDE-type joins, INCLUDE-type leaves
+            IpAddress src = null;
+            if (pimSSmInterworking) {
+                src = ssmTranslateRoute(groupIp);
+                if (src == null) {
+                    log.info("no ssm translate for group " + groupIp.toString());
+                    return;
+                }
+            } else {
+                src = IpAddress.valueOf(DEFAULT_PIMSSM_HOST);
+            }
+            sourceList.add(src.getIp4Address());
+            if (recordType == IGMPMembership.CHANGE_TO_EXCLUDE_MODE ||
+                    recordType == IGMPMembership.MODE_IS_EXCLUDE ||
+                    recordType == IGMPMembership.BLOCK_OLD_SOURCES) {
+                join = true;
+            } else if (recordType == IGMPMembership.CHANGE_TO_INCLUDE_MODE ||
+                    recordType == IGMPMembership.MODE_IS_INCLUDE ||
+                    recordType == IGMPMembership.ALLOW_NEW_SOURCES) {
+                join = false;
+            }
+        }
+        String groupMemberKey = GroupMember.getkey(groupIp, deviceId, portNumber);
+        GroupMember groupMember = groupMemberMap.get(groupMemberKey);
+
+        if (join) {
+            igmpStatisticsManager.getIgmpStats().increaseIgmpJoinReq();
+            if (groupMember == null) { // first report seen for this group/device/port key
+                Optional<ConnectPoint> sourceConfigured = getSource();
+                if (!sourceConfigured.isPresent()) {
+                    igmpStatisticsManager.getIgmpStats().increaseIgmpFailJoinReq();
+                    log.warn("Unable to process IGMP Join from {} since no source " +
+                                     "configuration is found.", deviceId);
+                    igmpStatisticsManager.getIgmpStats().increaseFailJoinReqInsuffPermissionAccessCounter();
+                    return;
+                }
+                Optional<PortNumber> deviceUplink = getDeviceUplink(deviceId);
+                if (deviceUplink.isEmpty()) {
+                    log.warn("Unable to process IGMP Join since uplink port " +
+                     "of the device {} is not found.", deviceId);
+                    return;
+                }
+
+                if (igmpType == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT) {
+                    groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, true);
+                } else {
+                    groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, false);
+                }
+
+                HashSet<ConnectPoint> sourceConnectPoints = Sets.newHashSet(sourceConfigured.get());
+
+                boolean isJoined = StateMachine.join(deviceId, groupIp, srcIp, deviceUplink.get());
+                if (isJoined) {
+                    igmpStatisticsManager.getIgmpStats().increaseIgmpSuccessJoinRejoinReq();
+                    igmpStatisticsManager.getIgmpStats().increaseIgmpChannelJoinCounter();
+                } else {
+                    igmpStatisticsManager.getIgmpStats().increaseIgmpFailJoinReq();
+                }
+                groupMemberMap.put(groupMemberKey, groupMember);
+                groupMember.updateList(recordType, sourceList); // the routes below read getSourceList()
+                groupMember.getSourceList().forEach(source -> {
+                    McastRoute route = new McastRoute(source, groupIp, McastRoute.Type.IGMP);
+                    //add route
+                    multicastService.add(route);
+                    //add source to the route
+                    multicastService.addSources(route, Sets.newHashSet(sourceConnectPoints));
+                    //add sink to the route
+                    multicastService.addSinks(route, Sets.newHashSet(cp));
+                });
+                igmpStatisticsManager.getIgmpStats().increaseUnconfiguredGroupCounter();
+            }
+            groupMember.resetAllTimers();
+            groupMember.updateList(recordType, sourceList); // refreshes known members (repeated for a new one)
+            groupMember.setLeave(false);
+        } else {
+            igmpStatisticsManager.getIgmpStats().increaseIgmpLeaveReq();
+            if (groupMember == null) {
+                log.info("receive leave but no instance, group " + groupIp.toString() +
+                        " device:" + deviceId.toString() + " port:" + portNumber.toString());
+                igmpStatisticsManager.getIgmpStats().increaseUnconfiguredGroupCounter();
+                return;
+            } else {
+                groupMember.setLeave(true);
+                if (fastLeave) {
+                    leaveAction(groupMember);
+                } else {
+                    sendQuery(groupMember);
+                }
+            }
+        }
+    }
+
+    /** Signals the leave to StateMachine, removes the member's port from its route sinks and drops the member. */
+    private void leaveAction(GroupMember groupMember) {
+        igmpStatisticsManager.getIgmpStats().increaseIgmpDisconnect();
+        ConnectPoint sink = new ConnectPoint(groupMember.getDeviceId(), groupMember.getPortNumber());
+        StateMachine.leave(groupMember.getDeviceId(), groupMember.getGroupIp());
+        groupMember.getSourceList().forEach(source -> multicastService.removeSinks(
+                new McastRoute(source, groupMember.getGroupIp(), McastRoute.Type.IGMP), Sets.newHashSet(sink)));
+        groupMemberMap.remove(groupMember.getId());
+    }
+
+    /** Builds a query for the member's group and sends it out of the member's device port. */
+    private void sendQuery(GroupMember groupMember) {
+        Ip4Address querierIp = getDeviceIp(groupMember.getDeviceId());
+        Ip4Address group = groupMember.getGroupIp();
+        // The query uses the IGMP version recorded for the member (v2 flag set, otherwise v3).
+        Ethernet query = groupMember.getv2() ?
+                IgmpSender.getInstance().buildIgmpV2Query(group, querierIp) :
+                IgmpSender.getInstance().buildIgmpV3Query(group, querierIp);
+        IgmpSender.getInstance().sendIgmpPacket(query, groupMember.getDeviceId(), groupMember.getPortNumber());
+    }
+
+    /**
+     * Gets the connect point of the configured source.
+     * @return connect point of the source if configured; and empty Optional otherwise.
+     */
+    public static Optional<ConnectPoint> getSource() {
+        return Optional.ofNullable(sourceDeviceAndPort);
+    }
+
+    /**
+     * Packet processor responsible for forwarding packets along their paths.
+     */
+    private class IgmpPacketProcessor implements PacketProcessor {
+        @Override
+        public void process(PacketContext context) {
+
+            eventExecutor.execute(() -> {
+                try {
+                    InboundPacket pkt = context.inPacket();
+                    Ethernet ethPkt = pkt.parsed();
+                    if (ethPkt == null) {
+                        return;
+                    }
+                    igmpStatisticsManager.getIgmpStats().increaseTotalMsgReceived();
+
+                    if (ethPkt.getEtherType() != Ethernet.TYPE_IPV4) {
+                        return;
+                    }
+
+                    IPv4 ipv4Pkt = (IPv4) ethPkt.getPayload();
+
+                    if (ipv4Pkt.getProtocol() != IPv4.PROTOCOL_IGMP) {
+                        return;
+                    }
+
+                    igmpStatisticsManager.getIgmpStats().increaseIgmpValidChecksumCounter();
+                    short vlan = ethPkt.getVlanID();
+                    DeviceId deviceId = pkt.receivedFrom().deviceId();
+
+                    if (!isConnectPoint(deviceId, pkt.receivedFrom().port()) &&
+                            !getSubscriberAndDeviceInformation(deviceId).isPresent()) {
+                        log.error("Device not registered in netcfg :" + deviceId.toString());
+                        igmpStatisticsManager.getIgmpStats().increaseFailJoinReqInsuffPermissionAccessCounter();
+                        return;
+                    }
+
+                    IGMP igmp = (IGMP) ipv4Pkt.getPayload();
+
+                    Optional<PortNumber> deviceUpLinkOpt = getDeviceUplink(deviceId);
+                    PortNumber upLinkPort =  deviceUpLinkOpt.isPresent() ? deviceUpLinkOpt.get() : null;
+                    switch (igmp.getIgmpType()) {
+                        case IGMP.TYPE_IGMPV3_MEMBERSHIP_QUERY:
+                            igmpStatisticsManager.getIgmpStats().increaseIgmpv3MembershipQuery();
+                            //Discard Query from OLT’s non-uplink port’s
+                            if (!pkt.receivedFrom().port().equals(upLinkPort)) {
+                                if (isConnectPoint(deviceId, pkt.receivedFrom().port())) {
+                                    log.info("IGMP Picked up query from connectPoint");
+                                    //OK to process packet
+                                    processIgmpConnectPointQuery((IGMPQuery) igmp.getGroups().get(0),
+                                                                 pkt.receivedFrom(),
+                                                                 0xff & igmp.getMaxRespField());
+                                    break;
+                                } else {
+                                    //Not OK to process packet
+                                    log.warn("IGMP Picked up query from non-uplink port {}", upLinkPort);
+                                    return;
+                                }
+                            }
+
+                            processIgmpQuery((IGMPQuery) igmp.getGroups().get(0), pkt.receivedFrom(),
+                                             0xff & igmp.getMaxRespField());
+                            break;
+                        case IGMP.TYPE_IGMPV1_MEMBERSHIP_REPORT:
+                            igmpStatisticsManager.getIgmpStats().increaseIgmpv1MembershipReport();
+                            log.debug("IGMP version 1  message types are not currently supported.");
+                            break;
+                        case IGMP.TYPE_IGMPV3_MEMBERSHIP_REPORT:
+                            igmpStatisticsManager.getIgmpStats().increaseIgmpv3MembershipReport();
+                            processIgmpMessage(pkt, igmp, upLinkPort, vlan);
+                            break;
+                        case IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT:
+                            igmpStatisticsManager.getIgmpStats().increaseIgmpv2MembershipReport();
+                            processIgmpMessage(pkt, igmp, upLinkPort, vlan);
+                            break;
+                        case IGMP.TYPE_IGMPV2_LEAVE_GROUP:
+                            igmpStatisticsManager.getIgmpStats().increaseIgmpv2LeaveGroup();
+                            processIgmpMessage(pkt, igmp, upLinkPort, vlan);
+                            break;
+
+                        default:
+                            log.warn("wrong IGMP v3 type:" + igmp.getIgmpType());
+                            igmpStatisticsManager.getIgmpStats().increaseInvalidIgmpMsgReceived();
+                            igmpStatisticsManager.getIgmpStats().increaseUnknownIgmpTypePacketsRxCounter();
+                            break;
+                    }
+
+                } catch (Exception ex) {
+                    log.error("igmp process error : {} ", ex);
+                }
+            });
+        }
+    }
+
+    /**
+     * Hands every group carried by a join/leave message to {@code processIgmpReport}.
+     * Messages received on the uplink port or on the connect point are discarded.
+     * Groups that are not already {@link IGMPMembership} records are wrapped in one:
+     * MODE_IS_EXCLUDE for an IGMPv2 membership report, MODE_IS_INCLUDE otherwise.
+     *
+     * @param pkt        inbound packet the message was parsed from
+     * @param igmp       parsed IGMP message
+     * @param upLinkPort uplink port of the receiving device; may be null
+     * @param vlan       VLAN id of the inbound frame
+     */
+    private void processIgmpMessage(InboundPacket pkt, IGMP igmp, PortNumber upLinkPort, short vlan) {
+        ConnectPoint ingress = pkt.receivedFrom();
+        boolean fromUplink = ingress.port().equals(upLinkPort);
+        //Discard join/leave from the OLT's uplink ports
+        if (fromUplink || isConnectPoint(ingress.deviceId(), ingress.port())) {
+            log.info("IGMP Picked up join/leave from uplink/connectPoint port");
+            return;
+        }
+
+        for (Iterator<IGMPGroup> groups = igmp.getGroups().iterator(); groups.hasNext();) {
+            IGMPGroup current = groups.next();
+            IGMPMembership membership;
+            if (current instanceof IGMPMembership) {
+                membership = (IGMPMembership) current;
+            } else {
+                membership = new IGMPMembership(current.getGaddr().getIp4Address());
+                membership.setRecordType(igmp.getIgmpType() == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT ?
+                                                 IGMPMembership.MODE_IS_EXCLUDE :
+                                                 IGMPMembership.MODE_IS_INCLUDE);
+            }
+            processIgmpReport(membership, VlanId.vlanId(vlan), pkt.receivedFrom(), igmp.getIgmpType());
+        }
+
+    }
+
+    /**
+     * Timer task that advances the IGMP timers and then walks all known group
+     * members, sending last-member or keep-alive queries for the devices this
+     * instance is master of.
+     */
+    private class IgmpProxyTimerTask extends TimerTask {
+        @Override
+        public void run() {
+            try {
+                IgmpTimer.timeOut1s();
+                queryMembers();
+            } catch (Exception ex) {
+                log.warn("Igmp timer task error : {}", ex.getMessage());
+            }
+        }
+
+        /**
+         * Visits every group member whose device is mastered locally: members
+         * flagged as leaving go through the last-query cycle, all others through
+         * the keep-alive cycle when periodic queries are enabled.
+         */
+        private void queryMembers() {
+            // NOTE(review): leaveAction() removes entries from groupMemberMap while this loop
+            // runs; presumably the map is a concurrent map - confirm at its declaration.
+            for (GroupMember groupMember : groupMemberMap.values()) {
+                if (!mastershipService.isLocalMaster(groupMember.getDeviceId())) {
+                    continue;
+                }
+                if (groupMember.isLeave()) {
+                    lastQuery(groupMember);
+                } else if (periodicQuery) {
+                    periodicQuery(groupMember);
+                }
+            }
+        }
+
+        /**
+         * One tick of the last-query cycle for a leaving member.
+         *
+         * @param groupMember member that has asked to leave
+         */
+        private void lastQuery(GroupMember groupMember) {
+            if (groupMember.getLastQueryInterval() < lastQueryInterval) {
+                groupMember.lastQueryInterval(true); // count times
+            } else if (groupMember.getLastQueryCount() < lastQueryCount - 1) {
+                sendQuery(groupMember);
+                groupMember.lastQueryInterval(false); // reset count number
+                groupMember.lastQueryCount(true); //count times
+            } else {
+                // Query budget used up. A plain else (instead of "== lastQueryCount - 1") also
+                // covers a member whose counter is already past a limit lowered by reconfiguration;
+                // such a member would otherwise never be removed.
+                leaveAction(groupMember);
+            }
+        }
+
+        /**
+         * One tick of the keep-alive cycle for an active member.
+         *
+         * @param groupMember member to keep alive or expire
+         */
+        private void periodicQuery(GroupMember groupMember) {
+            if (groupMember.getKeepAliveQueryInterval() < keepAliveInterval) {
+                groupMember.keepAliveInterval(true);
+            } else if (groupMember.getKeepAliveQueryCount() < keepAliveCount) {
+                sendQuery(groupMember);
+                groupMember.keepAliveInterval(false);
+                groupMember.keepAliveQueryCount(true);
+            } else {
+                // Same reasoning as in lastQuery(): expire on ">= keepAliveCount", not only "==".
+                leaveAction(groupMember);
+            }
+        }
+
+    }
+
+    /**
+     * Looks up the uplink port recorded in SADIS for the given device.
+     *
+     * @param devId device id
+     * @return the uplink port when the device and its serial number are known, a SADIS
+     * entry exists for that serial number and the port passes
+     * {@link #validateUpLinkPort(DeviceId, PortNumber)}; an empty Optional otherwise.
+     */
+    public Optional<PortNumber> getDeviceUplink(DeviceId devId) {
+        Device device = deviceService.getDevice(devId);
+        if (device == null || device.serialNumber() == null) {
+            return Optional.empty();
+        }
+        return getSubscriberAndDeviceInformation(device.serialNumber())
+                .map(olt -> PortNumber.portNumber(olt.uplinkPort()))
+                .filter(uplink -> validateUpLinkPort(device.id(), uplink));
+    }
+
+    /**
+     * Checks, by its port-name annotation, whether a port is an NNI port.
+     *
+     * @param deviceId device id
+     * @param portNumber port number
+     * @return true if the port name starts with NNI_PREFIX; false otherwise
+     * (including when the port is unknown to the device service).
+     */
+    public boolean validateUpLinkPort(DeviceId deviceId, PortNumber portNumber) {
+        Port port = deviceService.getPort(deviceId, portNumber);
+        if (port == null) {
+            //port is not discovered by ONOS; so cannot validate it.
+            return false;
+        }
+        String portName = port.annotations().value(AnnotationKeys.PORT_NAME);
+        if (portName != null && portName.startsWith(NNI_PREFIX)) {
+            return true;
+        }
+        log.warn("Port cannot be validated; it is not configured as an NNI port." +
+                "Device/port: {}/{}", deviceId, portNumber);
+        return false;
+    }
+
+    /**
+     * Returns the current value of the {@code igmpOnPodBasis} setting.
+     *
+     * @return value of the igmpOnPodBasis flag
+     */
+    public static boolean isIgmpOnPodBasis() {
+        return igmpOnPodBasis;
+    }
+
+    /**
+     * Installs or removes the filtering objective that matches IGMP-over-IPv4
+     * packets entering the given port and outputs them to the controller.
+     * Does nothing when {@code enableIgmpProvisioning} is not set.
+     *
+     * @param devId  device the objective is sent to
+     * @param port   ingress port used as the objective key
+     * @param remove true to build a deny (removal) objective, false for a permit (installation) one
+     */
+    private void processFilterObjective(DeviceId devId, PortNumber port, boolean remove) {
+        if (!enableIgmpProvisioning) {
+            log.debug("IGMP trap rules won't be installed since enableIgmpProvisioning flag is not set");
+            return;
+        }
+        //TODO migrate to packet requests when packet service uses filtering objectives
+        DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
+
+        builder = remove ? builder.deny() : builder.permit();
+
+        FilteringObjective igmp = builder
+                .withKey(Criteria.matchInPort(port))
+                .addCondition(Criteria.matchEthType(EthType.EtherType.IPV4.ethType()))
+                .addCondition(Criteria.matchIPProtocol(IPv4.PROTOCOL_IGMP))
+                .withMeta(DefaultTrafficTreatment.builder().setOutput(PortNumber.CONTROLLER).build())
+                .fromApp(appId)
+                .withPriority(MAX_PRIORITY)
+                .add(new ObjectiveContext() {
+                    @Override
+                    public void onSuccess(Objective objective) {
+                        log.info("Igmp filter for {} on {} {}.",
+                                 devId, port, (remove) ? REMOVED : INSTALLED);
+                    }
+
+                    @Override
+                    public void onError(Objective objective, ObjectiveError error) {
+                        // Name the operation that actually failed: removal when remove is set,
+                        // installation otherwise (mirrors the REMOVED/INSTALLED choice in onSuccess).
+                        log.info("Igmp filter {} for device {} on port {} failed because of {}",
+                                 (remove) ? REMOVAL : INSTALLATION, devId, port,
+                                 error);
+                    }
+                });
+
+        flowObjectiveService.filter(devId, igmp);
+
+    }
+
+    /**
+     * Tells whether the given device/port is the configured connect point.
+     * Always false when connect point mode is off or no connect point is configured.
+     *
+     * @param device device id
+     * @param port   port number
+     * @return true if connect point mode is on and device/port match the connect point
+     */
+    private boolean isConnectPoint(DeviceId device, PortNumber port) {
+        if (connectPoint == null) {
+            log.info("connectPoint not configured for device {}", device);
+            return false;
+        }
+        if (!connectPointMode) {
+            return false;
+        }
+        return connectPoint.deviceId().equals(device) && connectPoint.port().equals(port);
+    }
+
+    /**
+     * Tells whether the given port is the uplink of the given device.
+     * Always false in connect point mode.
+     *
+     * @param device device id
+     * @param port   port number
+     * @return true if the device has an uplink port and it equals {@code port}
+     */
+    private boolean isUplink(DeviceId device, PortNumber port) {
+        return !connectPointMode
+                && getDeviceUplink(device).filter(uplink -> uplink.equals(port)).isPresent();
+    }
+
+    /**
+     * Fetches device information associated with the device serial number from SADIS.
+     *
+     * @param serialNumber serial number of a device
+     * @return device information; an empty Optional otherwise.
+     */
+    private Optional<SubscriberAndDeviceInformation> getSubscriberAndDeviceInformation(String serialNumber) {
+        long start = System.currentTimeMillis();
+        try {
+            return Optional.ofNullable(sadisService.getSubscriberInfoService().get(serialNumber));
+        } finally {
+            if (log.isDebugEnabled()) {
+                // SADIS can call remote systems to fetch device data and this calls can take a long time.
+                // This measurement is just for monitoring these kinds of situations.
+                log.debug("Device fetched from SADIS. Elapsed {} msec", System.currentTimeMillis() - start);
+            }
+
+        }
+    }
+
+    /**
+     * Looks up the SADIS entry of a device identified by its device id. The device is
+     * resolved through the device service and its serial number is used as the SADIS key.
+     *
+     * @param deviceId device id
+     * @return device information; an empty Optional if the device or its serial number
+     * is unknown, or SADIS has no entry.
+     */
+    private Optional<SubscriberAndDeviceInformation> getSubscriberAndDeviceInformation(DeviceId deviceId) {
+        Device device = deviceService.getDevice(deviceId);
+        boolean hasSerial = device != null && device.serialNumber() != null;
+        if (!hasSerial) {
+            return Optional.empty();
+        }
+        String serialNumber = device.serialNumber();
+        return getSubscriberAndDeviceInformation(serialNumber);
+    }
+
+    private class InternalDeviceListener implements DeviceListener {
+        @Override
+        public void event(DeviceEvent event) {
+            DeviceId devId = event.subject().id();
+            Port p = event.port();
+            if (getSubscriberAndDeviceInformation(devId).isEmpty() &&
+                    !(p != null && isConnectPoint(devId, p.number()))) {
+                return;
+            }
+            PortNumber port;
+
+            switch (event.type()) {
+
+                case DEVICE_ADDED:
+                case DEVICE_UPDATED:
+                case DEVICE_REMOVED:
+                case DEVICE_SUSPENDED:
+                case DEVICE_AVAILABILITY_CHANGED:
+                case PORT_STATS_UPDATED:
+                    break;
+                case PORT_ADDED:
+                    port = p.number();
+                    if (getSubscriberAndDeviceInformation(devId).isPresent() &&
+                        !isUplink(devId, port) && !isConnectPoint(devId, port)) {
+                        processFilterObjective(devId, port, false);
+                    } else if (isUplink(devId, port)) {
+                        provisionUplinkFlows();
+                    } else if (isConnectPoint(devId, port)) {
+                        provisionConnectPointFlows();
+                    }
+                    break;
+                case PORT_UPDATED:
+                    port = p.number();
+                    if (getSubscriberAndDeviceInformation(devId).isPresent() &&
+                        !isUplink(devId, port) && !isConnectPoint(devId, port)) {
+                        if (event.port().isEnabled()) {
+                            processFilterObjective(devId, port, false);
+                        } else {
+                            processFilterObjective(devId, port, true);
+                        }
+                    } else if (isUplink(devId, port)) {
+                        if (event.port().isEnabled()) {
+                            provisionUplinkFlows(devId);
+                        } else {
+                            processFilterObjective(devId, port, true);
+                        }
+                    } else if (isConnectPoint(devId, port)) {
+                        if (event.port().isEnabled()) {
+                            provisionConnectPointFlows();
+                        } else {
+                            unprovisionConnectPointFlows();
+                        }
+                    }
+                    break;
+                case PORT_REMOVED:
+                    port = p.number();
+                    processFilterObjective(devId, port, true);
+                    break;
+                default:
+                    log.info("Unknown device event {}", event.type());
+                    break;
+            }
+        }
+
+        @Override
+        public boolean isRelevant(DeviceEvent event) {
+            return true;
+        }
+    }
+
+    private class InternalNetworkConfigListener implements NetworkConfigListener {
+
+        private void reconfigureNetwork(IgmpproxyConfig cfg) {
+            IgmpproxyConfig newCfg = cfg == null ? new IgmpproxyConfig() : cfg;
+
+            unSolicitedTimeout = newCfg.unsolicitedTimeOut();
+            maxResp = newCfg.maxResp();
+            keepAliveInterval = newCfg.keepAliveInterval();
+            keepAliveCount = newCfg.keepAliveCount();
+            lastQueryInterval = newCfg.lastQueryInterval();
+            lastQueryCount = newCfg.lastQueryCount();
+            withRAUplink = newCfg.withRAUplink();
+            withRADownlink = newCfg.withRADownlink();
+            igmpCos = newCfg.igmpCos();
+            periodicQuery = newCfg.periodicQuery();
+            fastLeave = newCfg.fastLeave();
+            pimSSmInterworking = newCfg.pimSsmInterworking();
+            enableIgmpProvisioning = newCfg.enableIgmpProvisioning();
+            igmpOnPodBasis = newCfg.igmpOnPodBasis();
+
+            if (connectPointMode != newCfg.connectPointMode() ||
+                    connectPoint != newCfg.connectPoint()) {
+                connectPointMode = newCfg.connectPointMode();
+                connectPoint = newCfg.connectPoint();
+                if (connectPointMode) {
+                    unprovisionUplinkFlows();
+                    provisionConnectPointFlows();
+                } else {
+                    unprovisionConnectPointFlows();
+                    provisionUplinkFlows();
+                }
+            }
+            if (connectPoint != null) {
+                log.info("connect point : {}", connectPoint);
+            }
+            log.info("mode: {}", connectPointMode);
+
+            getSourceConnectPoint(newCfg);
+
+            IgmpSender.getInstance().setIgmpCos(igmpCos);
+            IgmpSender.getInstance().setMaxResp(maxResp);
+            IgmpSender.getInstance().setMvlan(mvlan);
+            IgmpSender.getInstance().setWithRADownlink(withRADownlink);
+            IgmpSender.getInstance().setWithRAUplink(withRAUplink);
+        }
+
+        void getSourceConnectPoint(IgmpproxyConfig cfg) {
+            sourceDeviceAndPort = cfg.getSourceDeviceAndPort();
+            if (sourceDeviceAndPort != null) {
+                log.debug("source parameter configured to {}", sourceDeviceAndPort);
+            }
+        }
+
+        public void reconfigureSsmTable(IgmpproxySsmTranslateConfig cfg) {
+            if (cfg == null) {
+                return;
+            }
+            Collection<McastRoute> translations = cfg.getSsmTranslations();
+            for (McastRoute route : translations) {
+                ssmTranslateTable.put(route.group().getIp4Address(), route.source().get().getIp4Address());
+            }
+        }
+
+        @Override
+        public void event(NetworkConfigEvent event) {
+            switch (event.type()) {
+                case CONFIG_ADDED:
+                case CONFIG_UPDATED:
+                    // NOTE how to know if something has changed in sadis?
+
+                    if (event.configClass().equals(IGMPPROXY_CONFIG_CLASS)) {
+                        IgmpproxyConfig config = networkConfig.getConfig(appId, IGMPPROXY_CONFIG_CLASS);
+                        if (config != null) {
+                            log.info("igmpproxy config received. {}", config);
+                            reconfigureNetwork(config);
+                        }
+                    }
+
+                    if (event.configClass().equals(IGMPPROXY_SSM_CONFIG_CLASS)) {
+                        IgmpproxySsmTranslateConfig config = networkConfig.getConfig(appId, IGMPPROXY_SSM_CONFIG_CLASS);
+                        if (config != null) {
+                            reconfigureSsmTable(config);
+                        }
+                    }
+
+                    if (event.configClass().equals(MCAST_CONFIG_CLASS)) {
+                        McastConfig config = networkConfig.getConfig(coreAppId, MCAST_CONFIG_CLASS);
+                        if (config != null && mvlan != config.egressVlan().toShort()) {
+                            mvlan = config.egressVlan().toShort();
+                            IgmpSender.getInstance().setMvlan(mvlan);
+                            groupMemberMap.values().forEach(m -> leaveAction(m));
+                        }
+                    }
+
+                    log.info("Reconfigured");
+                    break;
+                case CONFIG_REGISTERED:
+                case CONFIG_UNREGISTERED:
+                    break;
+                case CONFIG_REMOVED:
+                    // NOTE how to know if something has changed in sadis?
+                default:
+                    break;
+            }
+        }
+    }
+
+    /**
+     * Installs the IGMP trap filter on the uplink port of one device.
+     * No-op in connect point mode or when the device has no uplink port.
+     *
+     * @param deviceId device whose uplink is provisioned
+     */
+    private void provisionUplinkFlows(DeviceId deviceId) {
+        if (!connectPointMode) {
+            getDeviceUplink(deviceId)
+                    .ifPresent(uplink -> processFilterObjective(deviceId, uplink, false));
+        }
+    }
+
+    /**
+     * Installs the IGMP trap filter on the uplink port of every available
+     * device that has a SADIS entry. No-op in connect point mode.
+     */
+    private void provisionUplinkFlows() {
+        if (connectPointMode) {
+            return;
+        }
+        for (Device device : deviceService.getAvailableDevices()) {
+            if (getSubscriberAndDeviceInformation(device.id()).isPresent()) {
+                provisionUplinkFlows(device.id());
+            }
+        }
+    }
+
+    /**
+     * Removes the IGMP trap filter from the uplink port of every available
+     * device that has a SADIS entry and an uplink port.
+     */
+    private void unprovisionUplinkFlows() {
+        for (Device device : deviceService.getAvailableDevices()) {
+            DeviceId id = device.id();
+            if (getSubscriberAndDeviceInformation(id).isEmpty()) {
+                continue;
+            }
+            getDeviceUplink(id).ifPresent(uplink -> processFilterObjective(id, uplink, true));
+        }
+    }
+
+    /**
+     * Installs the IGMP trap filter on the connect point. No-op unless connect
+     * point mode is on and a connect point is configured.
+     */
+    private void provisionConnectPointFlows() {
+        if (connectPointMode && connectPoint != null) {
+            processFilterObjective(connectPoint.deviceId(), connectPoint.port(), false);
+        }
+    }
+    /**
+     * Removes the IGMP trap filter from the connect point, if one is configured.
+     */
+    private void unprovisionConnectPointFlows() {
+        if (connectPoint != null) {
+            processFilterObjective(connectPoint.deviceId(), connectPoint.port(), true);
+        }
+    }
+
+}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpSender.java b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpSender.java
new file mode 100644
index 0000000..a4cbaa5
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpSender.java
@@ -0,0 +1,247 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.impl;
+
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.IGMP;
+import org.onlab.packet.IGMP.IGMPv2;
+import org.onlab.packet.IGMP.IGMPv3;
+import org.onlab.packet.IGMPMembership;
+import org.onlab.packet.IGMPQuery;
+import org.onlab.packet.IPv4;
+import org.onlab.packet.Ip4Address;
+import org.onlab.packet.MacAddress;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.packet.DefaultOutboundPacket;
+import org.onosproject.net.packet.OutboundPacket;
+import org.onosproject.net.packet.PacketService;
+import org.opencord.igmpproxy.IgmpStatisticsService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Message encoder and sender for igmpproxy.
+ * <p>
+ * A single instance is created through {@link #init} and obtained through
+ * {@link #getInstance()}, which returns null until {@code init} has been called.
+ */
+public final class IgmpSender {
+    static final String V3_REPORT_ADDRESS = "224.0.0.22";
+    static final String MAC_ADDRESS = "DE:AD:BE:EF:BA:11";
+    static final short DEFAULT_MVLAN = 4000;
+    static final byte DEFAULT_COS = 7;
+    static final int DEFAULT_MEX_RESP = 10;
+    // IPv4 Router Alert option (RFC 2113): type 0x94, length 4, value 0.
+    static final byte[] RA_BYTES = {(byte) 0x94, (byte) 0x04, (byte) 0x00, (byte) 0x00};
+
+    private static IgmpSender instance = null;
+    private final PacketService packetService;
+    private final MastershipService mastershipService;
+    private final IgmpStatisticsService igmpStatisticsService;
+    private boolean withRAUplink = true;
+    private boolean withRADownlink = false;
+    private short mvlan = DEFAULT_MVLAN;
+    private byte igmpCos = DEFAULT_COS;
+    private int maxResp = DEFAULT_MEX_RESP;
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private IgmpSender(PacketService packetService, MastershipService mastershipService,
+             IgmpStatisticsService igmpStatisticsService) {
+        this.packetService = packetService;
+        this.mastershipService = mastershipService;
+        this.igmpStatisticsService = igmpStatisticsService;
+    }
+
+    /**
+     * Creates the shared sender instance, replacing any previous one.
+     *
+     * @param packetService         service used to emit packets
+     * @param mastershipService     service used to check device mastership
+     * @param igmpStatisticsService service holding the IGMP counters
+     */
+    public static void init(PacketService packetService, MastershipService mastershipService,
+            IgmpStatisticsService igmpStatisticsService) {
+        instance = new IgmpSender(packetService, mastershipService, igmpStatisticsService);
+    }
+
+    /**
+     * Returns the shared sender instance.
+     *
+     * @return the instance created by {@link #init}, or null if init was never called
+     */
+    public static IgmpSender getInstance() {
+        return instance;
+    }
+
+    /**
+     * Sets whether reports get the router alert option.
+     *
+     * @param withRaUplink true to add the option to membership reports
+     */
+    public void setWithRAUplink(boolean withRaUplink) {
+        this.withRAUplink = withRaUplink;
+    }
+
+    /**
+     * Sets whether queries get the router alert option.
+     *
+     * @param withRADownlink true to add the option to membership queries
+     */
+    public void setWithRADownlink(boolean withRADownlink) {
+        this.withRADownlink = withRADownlink;
+    }
+
+    /**
+     * Sets the VLAN id written into built frames.
+     *
+     * @param mvlan VLAN id
+     */
+    public void setMvlan(short mvlan) {
+        this.mvlan = mvlan;
+    }
+
+    /**
+     * Sets the priority code written into built frames.
+     *
+     * @param igmpCos priority code
+     */
+    public void setIgmpCos(byte igmpCos) {
+        this.igmpCos = igmpCos;
+    }
+
+    /**
+     * Sets the max response value; queries carry this value multiplied by ten.
+     *
+     * @param maxResp max response value
+     */
+    public void setMaxResp(int maxResp) {
+        this.maxResp = maxResp;
+    }
+
+    /**
+     * Builds a v3 membership report with a CHANGE_TO_EXCLUDE_MODE record for the group.
+     *
+     * @param groupIp  group address
+     * @param sourceIp IP source address of the packet
+     * @return the built frame
+     */
+    public Ethernet buildIgmpV3Join(Ip4Address groupIp, Ip4Address sourceIp) {
+        IGMPMembership igmpMembership = new IGMPMembership(groupIp);
+        igmpMembership.setRecordType(IGMPMembership.CHANGE_TO_EXCLUDE_MODE);
+
+        return buildIgmpPacket(IGMP.TYPE_IGMPV3_MEMBERSHIP_REPORT, groupIp, igmpMembership, sourceIp, false);
+    }
+
+    /**
+     * Builds a v3 membership report with a MODE_IS_EXCLUDE record for the group.
+     *
+     * @param groupIp  group address
+     * @param sourceIp IP source address of the packet
+     * @return the built frame
+     */
+    public Ethernet buildIgmpV3ResponseQuery(Ip4Address groupIp, Ip4Address sourceIp) {
+        IGMPMembership igmpMembership = new IGMPMembership(groupIp);
+        igmpMembership.setRecordType(IGMPMembership.MODE_IS_EXCLUDE);
+
+        return buildIgmpPacket(IGMP.TYPE_IGMPV3_MEMBERSHIP_REPORT, groupIp, igmpMembership, sourceIp, false);
+    }
+
+    /**
+     * Builds a v3 membership report with a CHANGE_TO_INCLUDE_MODE record for the group.
+     *
+     * @param groupIp  group address
+     * @param sourceIp IP source address of the packet
+     * @return the built frame
+     */
+    public Ethernet buildIgmpV3Leave(Ip4Address groupIp, Ip4Address sourceIp) {
+        IGMPMembership igmpMembership = new IGMPMembership(groupIp);
+        igmpMembership.setRecordType(IGMPMembership.CHANGE_TO_INCLUDE_MODE);
+
+        return buildIgmpPacket(IGMP.TYPE_IGMPV3_MEMBERSHIP_REPORT, groupIp, igmpMembership, sourceIp, false);
+    }
+
+    /**
+     * Builds a membership query for the group carried in an IGMPv2 packet object.
+     *
+     * @param groupIp  group address, also used as IP destination
+     * @param sourceIp IP source address of the packet
+     * @return the built frame
+     */
+    public Ethernet buildIgmpV2Query(Ip4Address groupIp, Ip4Address sourceIp) {
+        return buildIgmpPacket(IGMP.TYPE_IGMPV3_MEMBERSHIP_QUERY, groupIp, null, sourceIp, true);
+    }
+
+    /**
+     * Builds a membership query for the group carried in an IGMPv3 packet object.
+     *
+     * @param groupIp  group address, also used as IP destination
+     * @param sourceIp IP source address of the packet
+     * @return the built frame
+     */
+    public Ethernet buildIgmpV3Query(Ip4Address groupIp, Ip4Address sourceIp) {
+        return buildIgmpPacket(IGMP.TYPE_IGMPV3_MEMBERSHIP_QUERY, groupIp, null, sourceIp, false);
+    }
+
+    /**
+     * Builds an Ethernet/IPv4/IGMP frame of the given IGMP type.
+     * Only membership queries and v3 membership reports are built.
+     *
+     * @param type           IGMP message type
+     * @param groupIp        group address; IP destination of queries
+     * @param igmpMembership membership record carried by a report; unused for queries
+     * @param sourceIp       IP source address of the packet
+     * @param isV2Query      true to use an IGMPv2 packet object, false for IGMPv3
+     * @return the built frame, or null for a report without membership record,
+     * for v2 report/leave types and for unknown types
+     */
+    protected Ethernet buildIgmpPacket(byte type, Ip4Address groupIp, IGMPMembership igmpMembership,
+                                     Ip4Address sourceIp, boolean isV2Query) {
+
+        IGMP igmpPacket;
+        if (isV2Query) {
+            igmpPacket = new IGMP.IGMPv2();
+        } else {
+            igmpPacket = new IGMP.IGMPv3();
+        }
+
+        IPv4 ip4Packet = new IPv4();
+        Ethernet ethPkt = new Ethernet();
+
+        igmpPacket.setIgmpType(type);
+
+        switch (type) {
+            case IGMP.TYPE_IGMPV3_MEMBERSHIP_QUERY:
+                // NOTE(review): presumably maxResp is in seconds and the field in tenths of a
+                // second; values above 25 no longer fit the unsigned byte - confirm.
+                igmpPacket.setMaxRespCode((byte) (maxResp * 10));
+                IGMPQuery igmpQuery = new IGMPQuery(groupIp, 0);
+
+                igmpPacket.addGroup(igmpQuery);
+                ip4Packet.setDestinationAddress(groupIp.toInt());
+                if (withRADownlink) {
+                    ip4Packet.setOptions(RA_BYTES);
+                }
+                break;
+
+            case IGMP.TYPE_IGMPV3_MEMBERSHIP_REPORT:
+                if (igmpMembership == null) {
+                    return null;
+                }
+                igmpPacket.addGroup(igmpMembership);
+                // Within this case the type is always a v3 report, so the destination is
+                // always the v3 report address.
+                ip4Packet.setDestinationAddress(Ip4Address.valueOf(V3_REPORT_ADDRESS).toInt());
+                if (withRAUplink) {
+                    ip4Packet.setOptions(RA_BYTES);
+                }
+                break;
+
+            case IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT:
+            case IGMP.TYPE_IGMPV2_LEAVE_GROUP:
+                return null;
+            default:
+                igmpStatisticsService.getIgmpStats().increaseUnknownIgmpTypePacketsRxCounter();
+                return null;
+        }
+
+        igmpPacket.setParent(ip4Packet);
+        ip4Packet.setSourceAddress(sourceIp.toInt());
+        ip4Packet.setProtocol(IPv4.PROTOCOL_IGMP);
+        ip4Packet.setPayload(igmpPacket);
+        ip4Packet.setParent(ethPkt);
+        ip4Packet.setTtl((byte) 0x78);
+
+        ethPkt.setDestinationMACAddress(multiaddToMac(ip4Packet.getDestinationAddress()));
+        ethPkt.setSourceMACAddress(MAC_ADDRESS);
+        ethPkt.setEtherType(Ethernet.TYPE_IPV4);
+        ethPkt.setPayload(ip4Packet);
+        ethPkt.setVlanID(mvlan);
+        ethPkt.setPriorityCode(igmpCos);
+
+        return ethPkt;
+    }
+
+    /**
+     * Maps an IPv4 multicast address to its Ethernet multicast MAC address:
+     * prefix 01:00:5e followed by the low 23 bits of the IP address.
+     *
+     * @param multiaddress IPv4 address as int
+     * @return corresponding MAC address
+     */
+    private MacAddress multiaddToMac(int multiaddress) {
+        byte low = (byte) (multiaddress & 0xff);
+        byte mid = (byte) (multiaddress >> 8 & 0xff);
+        byte high = (byte) (multiaddress >> 16 & 0x7f);
+        byte[] macByte = {0x01, 0x00, 0x5e, high, mid, low};
+
+        return MacAddress.valueOf(macByte);
+    }
+
+    /**
+     * Sends a packet towards the upstream side: out of the connect point when
+     * connect point mode is on, otherwise out of the given uplink port.
+     * Nothing is sent unless this instance is master of {@code deviceId}.
+     *
+     * @param ethPkt     frame to send
+     * @param deviceId   device the request originates from
+     * @param upLinkPort uplink port of that device
+     */
+    public void sendIgmpPacketUplink(Ethernet ethPkt, DeviceId deviceId, PortNumber upLinkPort) {
+        if (!mastershipService.isLocalMaster(deviceId)) {
+            return;
+        }
+
+        if (IgmpManager.connectPointMode) {
+            if (IgmpManager.connectPoint == null) {
+                log.warn("cannot find a connectPoint to send the packet uplink");
+                return;
+            }
+            sendIgmpPacket(ethPkt, IgmpManager.connectPoint.deviceId(), IgmpManager.connectPoint.port());
+        } else {
+            sendIgmpPacket(ethPkt, deviceId, upLinkPort);
+        }
+    }
+
+    /**
+     * Emits a frame out of the given port. Nothing is sent unless this
+     * instance is master of {@code deviceId}, or when the frame is null.
+     *
+     * @param ethPkt     frame to send; the builders may return null
+     * @param deviceId   device to emit on
+     * @param portNumber output port
+     */
+    public void sendIgmpPacket(Ethernet ethPkt, DeviceId deviceId, PortNumber portNumber) {
+        if (!mastershipService.isLocalMaster(deviceId)) {
+            return;
+        }
+        if (ethPkt == null) {
+            // The builders return null for messages they cannot build; avoid a
+            // NullPointerException on such a result.
+            log.warn("No IGMP packet to send on {}/{}", deviceId, portNumber);
+            return;
+        }
+
+        IPv4 ipv4Pkt = (IPv4) ethPkt.getPayload();
+        IGMP igmp = (IGMP) ipv4Pkt.getPayload();
+        // We are checking the length of packets. Right now the counter value will be 0 because of internal translation
+        // As packet length will always be valid
+        // This counter will be useful in future if we change the procedure to generate the packets.
+        if ((igmp.getIgmpType() == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT
+             || igmp.getIgmpType() == IGMP.TYPE_IGMPV2_LEAVE_GROUP) && igmp.serialize().length < IGMPv2.HEADER_LENGTH) {
+                 igmpStatisticsService.getIgmpStats().increaseInvalidIgmpLength();
+        } else if (igmp.getIgmpType() == IGMP.TYPE_IGMPV3_MEMBERSHIP_REPORT
+            && igmp.serialize().length < IGMPv3.MINIMUM_HEADER_LEN) {
+                 igmpStatisticsService.getIgmpStats().increaseInvalidIgmpLength();
+        }
+        TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+                .setOutput(portNumber).build();
+        OutboundPacket packet = new DefaultOutboundPacket(deviceId,
+                treatment, ByteBuffer.wrap(ethPkt.serialize()));
+        igmpStatisticsService.getIgmpStats().increaseValidIgmpPacketCounter();
+        packetService.emit(packet);
+
+    }
+}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpStatisticsManager.java b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpStatisticsManager.java
new file mode 100644
index 0000000..21d0011
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpStatisticsManager.java
@@ -0,0 +1,158 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.igmpproxy.impl;
+
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
+import org.onlab.util.SafeRecurringTask;
+import org.onlab.util.Tools;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.event.AbstractListenerManager;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Modified;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+
+import org.opencord.igmpproxy.IgmpStatistics;
+import org.opencord.igmpproxy.IgmpStatisticsService;
+import org.opencord.igmpproxy.IgmpStatisticsEvent;
+import org.opencord.igmpproxy.IgmpStatisticsEventListener;
+
+import static org.opencord.igmpproxy.impl.OsgiPropertyConstants.STATISTICS_GENERATION_PERIOD;
+import static org.opencord.igmpproxy.impl.OsgiPropertyConstants.STATISTICS_GENERATION_PERIOD_DEFAULT;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.Dictionary;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+
+import com.google.common.base.Strings;
+
+
+
+/**
+ * Processes the statistics collected by the IGMP proxy application and publishes
+ * them periodically as {@link IgmpStatisticsEvent}s (the original note names the
+ * ONOS Kafka integration as the intended consumer).
+ */
+@Component(immediate = true, property = {
+        STATISTICS_GENERATION_PERIOD + ":Integer=" + STATISTICS_GENERATION_PERIOD_DEFAULT,
+})
+public class IgmpStatisticsManager extends
+                 AbstractListenerManager<IgmpStatisticsEvent, IgmpStatisticsEventListener>
+                         implements IgmpStatisticsService {
+    private final Logger log = getLogger(getClass());
+    private IgmpStatistics igmpStats;
+
+    ScheduledExecutorService executorForIgmp;
+    private ScheduledFuture<?> publisherTask;
+
+    protected int statisticsGenerationPeriodInSeconds = STATISTICS_GENERATION_PERIOD_DEFAULT;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ComponentConfigService cfgService;
+
+    /**
+     * Returns the statistics holder shared with the rest of the application.
+     *
+     * @return current statistics object; {@code null} before activation and after deactivation
+     */
+    @Override
+    public IgmpStatistics getIgmpStats() {
+        return igmpStats;
+    }
+
+    /**
+     * Creates the statistics holder and the publishing executor, registers the
+     * component properties and schedules the first publishing task.
+     *
+     * @param context component context carrying the configured properties
+     */
+    @Activate
+    public void activate(ComponentContext context) {
+        igmpStats = new IgmpStatistics();
+
+        eventDispatcher.addSink(IgmpStatisticsEvent.class, listenerRegistry);
+        executorForIgmp = Executors.newScheduledThreadPool(1);
+        cfgService.registerProperties(getClass());
+        modified(context);
+        log.info("IgmpStatisticsManager Activated");
+    }
+
+    /**
+     * Re-reads the publishing period and reschedules the publishing task.
+     * A missing, empty, unparsable or non-positive value falls back to
+     * {@code STATISTICS_GENERATION_PERIOD_DEFAULT}.
+     *
+     * @param context component context carrying the configured properties
+     */
+    @Modified
+    public void modified(ComponentContext context) {
+        Dictionary<String, Object> properties = context.getProperties();
+
+        try {
+            String s = Tools.get(properties, STATISTICS_GENERATION_PERIOD);
+            // An unset property selects the default value; the property *name* must
+            // never be handed to parseInt.
+            statisticsGenerationPeriodInSeconds = Strings.isNullOrEmpty(s) ?
+                    STATISTICS_GENERATION_PERIOD_DEFAULT : Integer.parseInt(s.trim());
+        } catch (NumberFormatException ne) {
+            log.error("Unable to parse configuration parameter for {}", STATISTICS_GENERATION_PERIOD, ne);
+            statisticsGenerationPeriodInSeconds = STATISTICS_GENERATION_PERIOD_DEFAULT;
+        }
+        if (statisticsGenerationPeriodInSeconds <= 0) {
+            // scheduleAtFixedRate rejects a non-positive period with IllegalArgumentException,
+            // which would leave the component without any publishing task.
+            log.warn("{} must be positive but was {}; using default {}",
+                     STATISTICS_GENERATION_PERIOD, statisticsGenerationPeriodInSeconds,
+                     STATISTICS_GENERATION_PERIOD_DEFAULT);
+            statisticsGenerationPeriodInSeconds = STATISTICS_GENERATION_PERIOD_DEFAULT;
+        }
+        if (publisherTask != null) {
+            publisherTask.cancel(true);
+        }
+        publisherTask = executorForIgmp.scheduleAtFixedRate(SafeRecurringTask.wrap(this::publishStats),
+                0, statisticsGenerationPeriodInSeconds, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Stops publishing, shuts the executor down and releases the statistics holder.
+     */
+    @Deactivate
+    public void deactivate() {
+        eventDispatcher.removeSink(IgmpStatisticsEvent.class);
+        if (publisherTask != null) {
+            publisherTask.cancel(true);
+        }
+        executorForIgmp.shutdown();
+        cfgService.unregisterProperties(getClass(), false);
+        igmpStats = null;
+        log.info("IgmpStatisticsManager Deactivated");
+    }
+
+    /**
+     * Publishes stats: dumps every counter at debug level, then posts a
+     * {@code STATS_UPDATE} event carrying the statistics object.
+     */
+    private void publishStats() {
+
+        if (log.isDebugEnabled()) {
+            log.debug("Notifying stats: {}", igmpStats);
+            log.debug("--IgmpDisconnect--{}", igmpStats.getIgmpDisconnect());
+            log.debug("--IgmpFailJoinReq--{}", igmpStats.getIgmpFailJoinReq());
+            log.debug("--IgmpJoinReq--{}", igmpStats.getIgmpJoinReq());
+            log.debug("--IgmpLeaveReq--{}", igmpStats.getIgmpLeaveReq());
+            log.debug("--IgmpMsgReceived--{}", igmpStats.getIgmpMsgReceived());
+            log.debug("--IgmpSuccessJoinRejoinReq--{}", igmpStats.getIgmpSuccessJoinRejoinReq());
+            log.debug("--Igmpv1MemershipReport--{}", igmpStats.getIgmpv1MemershipReport());
+            log.debug("--Igmpv2LeaveGroup--{}", igmpStats.getIgmpv2LeaveGroup());
+            log.debug("--Igmpv2MembershipReport--{}", igmpStats.getIgmpv2MembershipReport());
+            log.debug("--Igmpv3MembershipQuery--{}", igmpStats.getIgmpv3MembershipQuery());
+            log.debug("--Igmpv3MembershipReport--{}", igmpStats.getIgmpv3MembershipReport());
+            log.debug("--InvalidIgmpMsgReceived--{}", igmpStats.getInvalidIgmpMsgReceived());
+            log.debug("--TotalMsgReceived--  {}", igmpStats.getTotalMsgReceived());
+            log.debug("--UnknownIgmpTypePacketsRx--{}", igmpStats.getUnknownIgmpTypePacketsRxCounter());
+            log.debug("--ReportsRxWithWrongMode--{}", igmpStats.getReportsRxWithWrongModeCounter());
+            log.debug("--FailJoinReqInsuffPermission--{}", igmpStats.getFailJoinReqInsuffPermissionAccessCounter());
+            log.debug("--FailJoinReqUnknownMulticastIp--{}", igmpStats.getFailJoinReqUnknownMulticastIpCounter());
+            log.debug("--UnconfiguredGroupCounter--{}", igmpStats.getUnconfiguredGroupCounter());
+            log.debug("--ValidIgmpPacketCounter--{}", igmpStats.getValidIgmpPacketCounter());
+            log.debug("--IgmpChannelJoinCounter--{}", igmpStats.getIgmpChannelJoinCounter());
+            log.debug("--CurrentGrpNumCounter--{}", igmpStats.getCurrentGrpNumCounter());
+            log.debug("--IgmpValidChecksumCounter--{}", igmpStats.getIgmpValidChecksumCounter());
+            log.debug("--InvalidIgmpLength--{}", igmpStats.getInvalidIgmpLength());
+            log.debug("--IgmpGeneralMembershipQuery--{}", igmpStats.getIgmpGeneralMembershipQuery());
+            log.debug("--IgmpGrpSpecificMembershipQuery--{}", igmpStats.getIgmpGrpSpecificMembershipQuery());
+            log.debug("--IgmpGrpAndSrcSpecificMembershipQuery--{}",
+                      igmpStats.getIgmpGrpAndSrcSpecificMembershipQuery());
+        }
+
+        post(new IgmpStatisticsEvent(IgmpStatisticsEvent.Type.STATS_UPDATE, igmpStats));
+    }
+
+}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpTimer.java b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpTimer.java
new file mode 100644
index 0000000..c4a17c9
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpTimer.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.impl;
+
+import com.google.common.collect.Maps;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Implements the timers used by the IGMP state machines.
+ *
+ * Timers live in a static table keyed by an integer id. {@link #timeOut1s()} is the
+ * tick: every call decrements each pending timer by one and fires the timers that
+ * have already reached zero.
+ */
+public final class IgmpTimer {
+
+    public static final int INVALID_TIMER_ID = 0;
+    public static int timerId = INVALID_TIMER_ID + 1;
+    private static Map<Integer, SingleTimer> igmpTimerMap = Maps.newConcurrentMap();
+
+    private IgmpTimer() {
+
+    }
+
+    /**
+     * Hands out the next timer id.
+     * Synchronized because {@code timerId++} is not atomic and the timer table is a
+     * concurrent map, i.e. timers are expected to be started from several threads.
+     */
+    private static synchronized int getId() {
+        return timerId++;
+    }
+
+    /**
+     * Registers a new timer for the given state machine.
+     *
+     * @param machine state machine whose {@code timeOut()} is invoked on expiry
+     * @param timeOut number of ticks (unit is 1 second) before the timer fires
+     * @return id of the new timer
+     */
+    public static int start(SingleStateMachine machine, int timeOut) {
+        int id = getId();
+        igmpTimerMap.put(id, new SingleTimer(machine, timeOut));
+        return id;
+    }
+
+    /**
+     * Replaces the timer {@code oldId} with a freshly started one.
+     *
+     * @param oldId   id of the timer to drop; unknown ids are ignored
+     * @param machine state machine whose {@code timeOut()} is invoked on expiry
+     * @param timeOut number of ticks (unit is 1 second) before the timer fires
+     * @return id of the new timer
+     */
+    public static int reset(int oldId, SingleStateMachine machine, int timeOut) {
+        igmpTimerMap.remove(oldId);
+        int id = getId();
+        igmpTimerMap.put(id, new SingleTimer(machine, timeOut));
+        return id;
+    }
+
+    /**
+     * Removes a timer so that it never fires.
+     *
+     * @param id id of the timer to drop; unknown ids are ignored
+     */
+    public static void cancel(int id) {
+        igmpTimerMap.remove(id);
+    }
+
+
+    /**
+     * Advances every registered timer by one tick. A timer whose remaining time is
+     * still positive is decremented; a timer at zero notifies its state machine and
+     * is removed from the table.
+     */
+    static void timeOut1s() {
+        Set<Map.Entry<Integer, SingleTimer>> entries = igmpTimerMap.entrySet();
+        Iterator<Map.Entry<Integer, SingleTimer>> itr = entries.iterator();
+        while (itr.hasNext()) {
+            SingleTimer single = itr.next().getValue();
+            if (single.timeOut > 0) {
+                single.timeOut--;
+            } else {
+                single.machine.timeOut();
+                itr.remove();
+            }
+        }
+    }
+
+    /**
+     * A pending timer: remaining ticks plus the state machine to notify.
+     */
+    static class SingleTimer {
+
+        public int timeOut;  // unit is 1 second
+        public SingleStateMachine machine;
+
+        public SingleTimer(SingleStateMachine machine, int timeOut) {
+            this.machine = machine;
+            this.timeOut = timeOut;
+        }
+
+    }
+}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpproxyConfig.java b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpproxyConfig.java
new file mode 100644
index 0000000..e8297c1
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpproxyConfig.java
@@ -0,0 +1,217 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.impl;
+
+import org.onosproject.core.ApplicationId;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.config.Config;
+import org.onosproject.net.config.basics.BasicElementConfig;
+
+/**
+ * Net configuration class for igmpproxy.
+ *
+ * Every getter reads one JSON property and falls back to a built-in default when
+ * no configuration object is present or the property is not set.
+ *
+ * NOTE(review): the fluent setters cast the result of {@code setOrClear} to
+ * {@code BasicElementConfig} although this class extends {@code Config} directly;
+ * confirm that this cast can succeed at runtime.
+ */
+public class IgmpproxyConfig extends Config<ApplicationId> {
+    // Built-in defaults.
+    protected static final String DEFAULT_UNSOLICITED_TIMEOUT = "2";
+    protected static final String DEFAULT_MAX_RESP = "10";
+    protected static final String DEFAULT_KEEP_ALIVE_INTERVAL = "120";
+    protected static final String DEFAULT_KEEP_ALIVE_COUNT = "3";
+    protected static final String DEFAULT_LAST_QUERY_INTERVAL = "2";
+    protected static final String DEFAULT_LAST_QUERY_COUNT = "2";
+    protected static final String DEFAULT_IGMP_COS = "7";
+    protected static final Boolean DEFAULT_FAST_LEAVE = false;
+    protected static final Boolean DEFAULT_PERIODIC_QUERY = true;
+    protected static final String DEFAULT_WITH_RA_UPLINK = "true";
+    protected static final String DEFAULT_WITH_RA_DOWNLINK = "true";
+    private static final Boolean DEFAULT_CONNECT_POINT_MODE = true;
+    private static final Boolean DEFAULT_PIMSSM_INTERWORKING = false;
+    private static final Boolean DEFAULT_IGMP_PROVISIONING_SUPPORT = Boolean.FALSE;
+    private static final Boolean DEFAULT_IGMP_ON_POD_BASIS = Boolean.FALSE;
+
+    // JSON property names.
+    protected static final String CONNECT_POINT_MODE = "globalConnectPointMode";
+    protected static final String CONNECT_POINT = "globalConnectPoint";
+    private static final String UNSOLICITED_TIMEOUT = "UnsolicitedTimeOut";
+    private static final String MAX_RESP = "MaxResp";
+    private static final String KEEP_ALIVE_INTERVAL = "KeepAliveInterval";
+    private static final String KEEP_ALIVE_COUNT = "KeepAliveCount";
+    private static final String LAST_QUERY_INTERVAL = "LastQueryInterval";
+    private static final String LAST_QUERY_COUNT = "LastQueryCount";
+    private static final String FAST_LEAVE = "FastLeave";
+    private static final String PERIODIC_QUERY = "PeriodicQuery";
+    private static final String IGMP_COS = "IgmpCos";
+    private static final String WITH_RA_UPLINK = "withRAUpLink";
+    private static final String WITH_RA_DOWN_LINK = "withRADownLink";
+    private static final String PIMSSM_INTERWORKING = "pimSSmInterworking";
+    private static final String SOURCE_DEV_PORT = "sourceDeviceAndPort";
+    private static final String ENABLE_IGMP_PROVISIONING = "enableIgmpProvisioning";
+    private static final String IGMP_ON_POD_BASIS = "igmpOnPodBasis";
+
+    /**
+     * Gets the value of a string property, protecting against an empty
+     * JSON object.
+     *
+     * @param name name of the property
+     * @param defaultValue default value if none has been specified
+     * @return String value if one is found, default value otherwise
+     */
+    private String getStringProperty(String name, String defaultValue) {
+        return object == null ? defaultValue : get(name, defaultValue);
+    }
+
+    /**
+     * Tells whether a property has to be treated as absent.
+     * NOTE(review): the {@code path(name) == null} half presumably never holds if
+     * {@code path} returns a "missing" node instead of {@code null} - confirm.
+     *
+     * @param name name of the property
+     * @return true if there is no JSON object or the lookup yields {@code null}
+     */
+    private boolean lacksField(String name) {
+        return object == null || object.path(name) == null;
+    }
+
+    /**
+     * Reads a property and parses it as an int.
+     *
+     * @param name name of the property
+     * @param defaultValue textual default used when the property is not set
+     * @return parsed value
+     */
+    private int readInt(String name, String defaultValue) {
+        return Integer.parseInt(getStringProperty(name, defaultValue));
+    }
+
+    /**
+     * Reads a property and parses it as a boolean.
+     *
+     * @param name name of the property
+     * @param defaultValue textual default used when the property is not set
+     * @return parsed value
+     */
+    private boolean readBoolean(String name, String defaultValue) {
+        return Boolean.parseBoolean(getStringProperty(name, defaultValue));
+    }
+
+    /**
+     * Reads a property and parses it as a device connect point.
+     *
+     * @param name name of the property
+     * @return connect point, or null when the property is absent or cannot be parsed
+     */
+    private ConnectPoint readConnectPoint(String name) {
+        if (lacksField(name)) {
+            return null;
+        }
+        try {
+            return ConnectPoint.deviceConnectPoint(getStringProperty(name, ""));
+        } catch (Exception ex) {
+            // Any parsing problem is reported as "not configured".
+            return null;
+        }
+    }
+
+    public int unsolicitedTimeOut() {
+        return readInt(UNSOLICITED_TIMEOUT, DEFAULT_UNSOLICITED_TIMEOUT);
+    }
+
+    public BasicElementConfig unsolicitedTimeOut(int timeout) {
+        return (BasicElementConfig) setOrClear(UNSOLICITED_TIMEOUT, timeout);
+    }
+
+    public int maxResp() {
+        return readInt(MAX_RESP, DEFAULT_MAX_RESP);
+    }
+
+    public BasicElementConfig maxResp(int maxResp) {
+        return (BasicElementConfig) setOrClear(MAX_RESP, maxResp);
+    }
+
+    public int keepAliveInterval() {
+        return readInt(KEEP_ALIVE_INTERVAL, DEFAULT_KEEP_ALIVE_INTERVAL);
+    }
+
+    public BasicElementConfig keepAliveInterval(int interval) {
+        return (BasicElementConfig) setOrClear(KEEP_ALIVE_INTERVAL, interval);
+    }
+
+    public int keepAliveCount() {
+        return readInt(KEEP_ALIVE_COUNT, DEFAULT_KEEP_ALIVE_COUNT);
+    }
+
+    public BasicElementConfig keepAliveCount(int count) {
+        return (BasicElementConfig) setOrClear(KEEP_ALIVE_COUNT, count);
+    }
+
+    public int lastQueryInterval() {
+        return readInt(LAST_QUERY_INTERVAL, DEFAULT_LAST_QUERY_INTERVAL);
+    }
+
+    public BasicElementConfig lastQueryInterval(int interval) {
+        return (BasicElementConfig) setOrClear(LAST_QUERY_INTERVAL, interval);
+    }
+
+    public int lastQueryCount() {
+        return readInt(LAST_QUERY_COUNT, DEFAULT_LAST_QUERY_COUNT);
+    }
+
+    public BasicElementConfig lastQueryCount(int count) {
+        return (BasicElementConfig) setOrClear(LAST_QUERY_COUNT, count);
+    }
+
+    public boolean fastLeave() {
+        return lacksField(FAST_LEAVE)
+                ? DEFAULT_FAST_LEAVE
+                : readBoolean(FAST_LEAVE, DEFAULT_FAST_LEAVE.toString());
+    }
+
+    public BasicElementConfig fastLeave(boolean fastLeave) {
+        return (BasicElementConfig) setOrClear(FAST_LEAVE, fastLeave);
+    }
+
+    public boolean periodicQuery() {
+        return lacksField(PERIODIC_QUERY)
+                ? DEFAULT_PERIODIC_QUERY
+                : readBoolean(PERIODIC_QUERY, DEFAULT_PERIODIC_QUERY.toString());
+    }
+
+    public BasicElementConfig periodicQuery(boolean periodicQuery) {
+        return (BasicElementConfig) setOrClear(PERIODIC_QUERY, periodicQuery);
+    }
+
+    public byte igmpCos() {
+        return Byte.parseByte(getStringProperty(IGMP_COS, DEFAULT_IGMP_COS));
+    }
+
+    public boolean withRAUplink() {
+        return lacksField(WITH_RA_UPLINK) || readBoolean(WITH_RA_UPLINK, DEFAULT_WITH_RA_UPLINK);
+    }
+
+    public boolean withRADownlink() {
+        // NOTE(review): the absent case yields false while DEFAULT_WITH_RA_DOWNLINK is
+        // "true"; kept as is - confirm which default is intended.
+        return !lacksField(WITH_RA_DOWN_LINK) && readBoolean(WITH_RA_DOWN_LINK, DEFAULT_WITH_RA_DOWNLINK);
+    }
+
+    public boolean connectPointMode() {
+        return lacksField(CONNECT_POINT_MODE)
+                ? DEFAULT_CONNECT_POINT_MODE
+                : readBoolean(CONNECT_POINT_MODE, DEFAULT_CONNECT_POINT_MODE.toString());
+    }
+
+    public ConnectPoint connectPoint() {
+        return readConnectPoint(CONNECT_POINT);
+    }
+
+    public boolean pimSsmInterworking() {
+        return lacksField(PIMSSM_INTERWORKING)
+                ? DEFAULT_PIMSSM_INTERWORKING
+                : readBoolean(PIMSSM_INTERWORKING, DEFAULT_PIMSSM_INTERWORKING.toString());
+    }
+
+    public ConnectPoint getSourceDeviceAndPort() {
+        return readConnectPoint(SOURCE_DEV_PORT);
+    }
+
+    public boolean enableIgmpProvisioning() {
+        return lacksField(ENABLE_IGMP_PROVISIONING)
+                ? DEFAULT_IGMP_PROVISIONING_SUPPORT
+                : readBoolean(ENABLE_IGMP_PROVISIONING, DEFAULT_IGMP_PROVISIONING_SUPPORT.toString());
+    }
+
+    public boolean igmpOnPodBasis() {
+        return lacksField(IGMP_ON_POD_BASIS)
+                ? DEFAULT_IGMP_ON_POD_BASIS
+                : readBoolean(IGMP_ON_POD_BASIS, DEFAULT_IGMP_ON_POD_BASIS.toString());
+    }
+}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpproxySsmTranslateConfig.java b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpproxySsmTranslateConfig.java
new file mode 100644
index 0000000..4c75d47
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpproxySsmTranslateConfig.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.impl;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import org.onlab.packet.IpAddress;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.mcast.api.McastRoute;
+import org.onosproject.net.config.Config;
+
+import java.util.List;
+
+/**
+ * IGMP proxy SSM translate configuration.
+ *
+ * The configuration is a JSON array whose elements are objects holding exactly
+ * the two fields {@code source} and {@code group}, both IP addresses.
+ */
+public class IgmpproxySsmTranslateConfig extends Config<ApplicationId> {
+
+    private static final String SOURCE = "source";
+    private static final String GROUP = "group";
+
+    /**
+     * Checks that every array element is an object with only the mandatory
+     * {@code source} and {@code group} IP address fields.
+     *
+     * @return true if the configuration is well-formed, false otherwise
+     */
+    @Override
+    public boolean isValid() {
+        // `array` is inherited; presumably it is null when the configuration node is
+        // not a JSON array. Report that as invalid instead of failing with an NPE.
+        if (array == null) {
+            return false;
+        }
+        for (JsonNode node : array) {
+            // A non-object element cannot carry the two fields; rejecting it here
+            // avoids a ClassCastException on the cast below.
+            if (!node.isObject()) {
+                return false;
+            }
+            ObjectNode entry = (ObjectNode) node;
+            if (!hasOnlyFields(entry, SOURCE, GROUP)
+                    || !isIpAddress(entry, SOURCE, FieldPresence.MANDATORY)
+                    || !isIpAddress(entry, GROUP, FieldPresence.MANDATORY)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Gets the list of SSM translations.
+     *
+     * @return SSM translations, one static multicast route per array element
+     */
+    public List<McastRoute> getSsmTranslations() {
+        List<McastRoute> translations = Lists.newArrayList();
+        for (JsonNode node : array) {
+            IpAddress source = IpAddress.valueOf(node.path(SOURCE).asText().trim());
+            IpAddress group = IpAddress.valueOf(node.path(GROUP).asText().trim());
+            translations.add(new McastRoute(source, group, McastRoute.Type.STATIC));
+        }
+
+        return translations;
+    }
+}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/OsgiPropertyConstants.java b/app/src/main/java/org/opencord/igmpproxy/impl/OsgiPropertyConstants.java
new file mode 100644
index 0000000..94300b5
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/OsgiPropertyConstants.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.igmpproxy.impl;
+
+/**
+ * Names and default values of the configurable OSGi component properties.
+ */
+public final class OsgiPropertyConstants {
+
+    /** Name of the property holding the statistics publishing period. */
+    public static final String STATISTICS_GENERATION_PERIOD = "statisticsGenerationPeriodInSeconds";
+
+    /** Default value of {@link #STATISTICS_GENERATION_PERIOD}. */
+    public static final int STATISTICS_GENERATION_PERIOD_DEFAULT = 20;
+
+    // Constants holder; the private constructor prevents instantiation.
+    private OsgiPropertyConstants() {
+    }
+
+}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/SingleStateMachine.java b/app/src/main/java/org/opencord/igmpproxy/impl/SingleStateMachine.java
new file mode 100644
index 0000000..c4d73a8
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/SingleStateMachine.java
@@ -0,0 +1,190 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.impl;
+
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.Ip4Address;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * State machine for single IGMP group member. The state machine is implemented on
+ * RFC 2236 "6. Host State Diagram".
+ *
+ * The machine has three states (non-member, delaying member, idle member) and
+ * reacts to four events (join, leave, query, timeout). Each event first runs the
+ * action of the current state and then moves to the state given by the
+ * transition table.
+ */
+public class SingleStateMachine {
+    // Only for tests purposes
+    static boolean sendQuery = true;
+
+    static final int STATE_NON = 0;
+    static final int STATE_DELAY = 1;
+    static final int STATE_IDLE = 2;
+    static final int TRANSITION_JOIN = 0;
+    static final int TRANSITION_LEAVE = 1;
+    static final int TRANSITION_QUERY = 2;
+    static final int TRANSITION_TIMEOUT = 3;
+    static final int DEFAULT_MAX_RESP = 0xfffffff;
+    static final int DEFAULT_COUNT = 1;
+
+    // Shared generator for the randomized response delays.
+    private static final Random RANDOM = new Random();
+
+    private final DeviceId devId;
+    private final Ip4Address groupIp;
+    private final Ip4Address srcIp;
+    private final PortNumber upLinkPort;
+
+    private final AtomicInteger count = new AtomicInteger(DEFAULT_COUNT);
+    private int timerId = IgmpTimer.INVALID_TIMER_ID;
+    private int timeOut = DEFAULT_MAX_RESP;
+    // Indexed by STATE_*.
+    private final State[] states =
+            {
+                    new NonMember(), new DelayMember(), new IdleMember()
+            };
+    // Each row is indexed by TRANSITION_* and holds the next STATE_*.
+    private final int[] nonTransition =
+            {STATE_DELAY, STATE_NON, STATE_NON, STATE_NON};
+    private final int[] delayTransition =
+            {STATE_DELAY, STATE_NON, STATE_DELAY, STATE_IDLE};
+    private final int[] idleTransition =
+            {STATE_IDLE, STATE_NON, STATE_DELAY, STATE_IDLE};
+    //THE TRANSITION TABLE
+    private final int[][] transition =
+            {nonTransition, delayTransition, idleTransition};
+    private int currentState = STATE_NON;
+
+    /**
+     * Creates a state machine in the non-member state.
+     *
+     * @param devId      device the group membership belongs to
+     * @param groupIp    multicast group address
+     * @param src        source address used when building IGMP messages
+     * @param upLinkPort uplink port the IGMP messages are sent through
+     */
+    public SingleStateMachine(DeviceId devId, Ip4Address groupIp, Ip4Address src, PortNumber upLinkPort) {
+        this.devId = devId;
+        this.groupIp = groupIp;
+        this.srcIp = src;
+        this.upLinkPort = upLinkPort;
+    }
+
+    public Ip4Address getGroupIp() {
+        return groupIp;
+    }
+
+    public DeviceId getDeviceId() {
+        return devId;
+    }
+
+    /**
+     * Increments the counter of this machine.
+     *
+     * @return always true
+     */
+    public boolean increaseCounter() {
+        count.incrementAndGet();
+        return true;
+    }
+
+    /**
+     * Decrements the counter unless it is already zero (or below).
+     * The check and the decrement happen in one atomic update, so concurrent
+     * callers can never drive the counter below zero.
+     *
+     * @return true if the counter was decremented, false if it was left untouched
+     */
+    public boolean decreaseCounter() {
+        return count.getAndUpdate(current -> current > 0 ? current - 1 : current) > 0;
+    }
+
+    public int getCounter() {
+        return count.get();
+    }
+
+    public int currentState() {
+        return currentState;
+    }
+
+    // Moves to the state the transition table prescribes for the given event.
+    private void next(int msg) {
+        currentState = transition[currentState][msg];
+    }
+
+    /**
+     * Handles a join event.
+     *
+     * @param messageOutAllowed whether an IGMP message may be sent out
+     */
+    public void join(boolean messageOutAllowed) {
+        states[currentState].join(messageOutAllowed);
+        next(TRANSITION_JOIN);
+    }
+
+    /**
+     * Handles a leave event.
+     *
+     * @param messageOutAllowed whether an IGMP message may be sent out
+     */
+    public void leave(boolean messageOutAllowed) {
+        states[currentState].leave(messageOutAllowed);
+        next(TRANSITION_LEAVE);
+    }
+
+    /**
+     * Handles a received query.
+     *
+     * @param maxResp maximum response time carried by the query
+     */
+    public void query(int maxResp) {
+        states[currentState].query(maxResp);
+        next(TRANSITION_QUERY);
+    }
+
+    /**
+     * Handles the expiry of the timer of this machine.
+     */
+    public void timeOut() {
+        states[currentState].timeOut();
+        next(TRANSITION_TIMEOUT);
+    }
+
+    /**
+     * Picks a random delay in the range {@code [0, maxTimeOut)}.
+     *
+     * @param maxTimeOut exclusive upper bound of the delay
+     * @return random delay, or 0 when {@code maxTimeOut} is not positive (a query may
+     *         carry a maximum response time of 0, which must not fail with a
+     *         division by zero)
+     */
+    int getTimeOut(int maxTimeOut) {
+        if (maxTimeOut <= 0) {
+            return 0;
+        }
+        // nextInt(bound) never yields a negative value, unlike abs(nextInt()) % bound.
+        return RANDOM.nextInt(maxTimeOut);
+    }
+
+    /**
+     * Cancels the timer of this machine, if one was ever started.
+     */
+    protected void cancelTimer() {
+        if (IgmpTimer.INVALID_TIMER_ID != timerId) {
+            IgmpTimer.cancel(timerId);
+        }
+    }
+
+    /**
+     * Base state: every event is a no-op except leave, which sends an IGMPv3 leave
+     * uplink when sending is allowed.
+     */
+    class State {
+        public void join(boolean messageOutAllowed) {
+        }
+
+        public void leave(boolean messageOutAllowed) {
+            if (messageOutAllowed) {
+                Ethernet eth = IgmpSender.getInstance().buildIgmpV3Leave(groupIp, srcIp);
+                IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId, upLinkPort);
+            }
+        }
+
+        public void query(int maxResp) {
+        }
+
+        public void timeOut() {
+        }
+
+    }
+
+    /**
+     * Non-member state: a join sends an IGMPv3 join uplink and starts a timer with a
+     * random delay bounded by the unsolicited timeout, when sending is allowed.
+     */
+    class NonMember extends State {
+        @Override
+        public void join(boolean messageOutAllowed) {
+            if (messageOutAllowed) {
+                Ethernet eth = IgmpSender.getInstance().buildIgmpV3Join(groupIp, srcIp);
+                IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId, upLinkPort);
+                timeOut = getTimeOut(IgmpManager.getUnsolicitedTimeout());
+                timerId = IgmpTimer.start(SingleStateMachine.this, timeOut);
+            }
+        }
+    }
+
+    /**
+     * Delaying-member state: a query with a shorter maximum response time restarts
+     * the timer; the timer expiry sends the response uplink.
+     */
+    class DelayMember extends State {
+        @Override
+        public void query(int maxResp) {
+            if (maxResp < timeOut) {
+                timeOut = getTimeOut(maxResp);
+                timerId = IgmpTimer.reset(timerId, SingleStateMachine.this, timeOut);
+            }
+        }
+
+        @Override
+        public void timeOut() {
+            if (sendQuery) {
+                Ethernet eth = IgmpSender.getInstance().buildIgmpV3ResponseQuery(groupIp, srcIp);
+                IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId, upLinkPort);
+                timeOut = DEFAULT_MAX_RESP;
+            }
+        }
+
+    }
+
+    /**
+     * Idle-member state: a query starts a timer with a random delay bounded by the
+     * maximum response time of the query.
+     */
+    class IdleMember extends State {
+        @Override
+        public void query(int maxResp) {
+            timeOut = getTimeOut(maxResp);
+            timerId = IgmpTimer.start(SingleStateMachine.this, timeOut);
+        }
+    }
+}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/StateMachine.java b/app/src/main/java/org/opencord/igmpproxy/impl/StateMachine.java
new file mode 100644
index 0000000..e87756d
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/StateMachine.java
@@ -0,0 +1,163 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy.impl;
+
+import com.google.common.collect.Maps;
+import org.onlab.packet.Ip4Address;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * State machine for whole IGMP process. The state machine is implemented on
+ * RFC 2236 "6. Host State Diagram".
+ *
+ * <p>This class is a static registry: it keeps one {@link SingleStateMachine}
+ * per (device, group) pair and forwards join, leave, query and timeout events
+ * to the matching machine.
+ */
+public final class StateMachine {
+
+    private static final String GROUP = "Group";
+
+    private StateMachine() {
+        // static members only; never instantiated
+    }
+
+    // Keyed by getId(devId, groupIp). Created through Maps.newConcurrentMap(),
+    // so single-key operations such as putIfAbsent are atomic.
+    private static Map<String, SingleStateMachine> map = Maps.newConcurrentMap();
+
+    /**
+     * Builds the map key for a (device, group) pair.
+     *
+     * @param devId   device id
+     * @param groupIp group IP address
+     * @return device id string, the literal "Group" and the group address string, concatenated
+     */
+    private static String getId(DeviceId devId, Ip4Address groupIp) {
+        return devId.toString() + GROUP + groupIp.toString();
+    }
+
+    /**
+     * Looks up the machine registered for a (device, group) pair.
+     *
+     * @param devId   device id
+     * @param groupIp group IP address
+     * @return the registered machine, or null if there is none
+     */
+    private static SingleStateMachine get(DeviceId devId, Ip4Address groupIp) {
+        return map.get(getId(devId, groupIp));
+    }
+
+    /**
+     * Cancels the timer of the machine registered for the given pair and
+     * removes the machine from the registry. Does nothing if no machine exists.
+     *
+     * @param devId   device id
+     * @param groupIp group IP address
+     */
+    public static void destroySingle(DeviceId devId, Ip4Address groupIp) {
+        String id = getId(devId, groupIp);
+        SingleStateMachine machine = map.get(id);
+        if (machine == null) {
+            return;
+        }
+        machine.cancelTimer();
+        map.remove(id);
+    }
+
+    /**
+     * Registers a join for the given group on the given device.
+     *
+     * <p>If no machine exists yet, one is created, registered and asked to
+     * join. If a machine already exists, only its counter is increased.
+     *
+     * @param devId      device id
+     * @param groupIp    group IP address
+     * @param srcIP      source IP address handed to the new machine
+     * @param upLinkPort uplink port handed to the new machine
+     * @return true if a new machine was created by this call; false if an
+     * existing machine's counter was increased
+     */
+    public static boolean join(DeviceId devId, Ip4Address groupIp, Ip4Address srcIP, PortNumber upLinkPort) {
+        String id = getId(devId, groupIp);
+        SingleStateMachine machine = map.get(id);
+
+        if (machine == null) {
+            SingleStateMachine created = new SingleStateMachine(devId, groupIp, srcIP, upLinkPort);
+            // Atomic insert: if another thread registered the same pair between
+            // the lookup above and this call, keep its machine and count this
+            // call as one more member instead of overwriting the entry.
+            machine = map.putIfAbsent(id, created);
+            if (machine == null) {
+                // Do not send the join if igmp messages are evaluated on POD
+                // basis and another device already has a machine for this group.
+                boolean shouldSendJoin = !(IgmpManager.isIgmpOnPodBasis() &&
+                        groupListenedByOtherDevices(devId, groupIp));
+                created.join(shouldSendJoin);
+                return true;
+            }
+        }
+        machine.increaseCounter();
+        return false;
+    }
+
+    /**
+     * Registers a leave for the given group on the given device.
+     *
+     * <p>The machine's counter is decreased; when it reads zero afterwards the
+     * machine is asked to leave and is then destroyed.
+     *
+     * @param devId   device id
+     * @param groupIp group IP address
+     * @return true if the counter reached zero and the machine was destroyed;
+     * false if no machine exists or the counter is still non-zero
+     */
+    public static boolean leave(DeviceId devId, Ip4Address groupIp) {
+        SingleStateMachine machine = get(devId, groupIp);
+        if (machine == null) {
+            return false;
+        }
+
+        machine.decreaseCounter();
+        // NOTE(review): the decrement and the read below are two separate
+        // calls; whether concurrent leaves can both observe zero depends on
+        // SingleStateMachine's counter implementation - confirm.
+        if (machine.getCounter() != 0) {
+            return false;
+        }
+
+        // Do not send the leave if igmp messages are evaluated on POD basis
+        // and another device still has a machine for this group.
+        boolean shouldSendLeave = !(IgmpManager.isIgmpOnPodBasis() &&
+                groupListenedByOtherDevices(devId, groupIp));
+        machine.leave(shouldSendLeave);
+        destroySingle(devId, groupIp);
+        return true;
+    }
+
+    /**
+     * Passes a query to the machine registered for the given pair, if any.
+     *
+     * @param devId   device id
+     * @param groupIp group IP address
+     * @param maxResp maximum response value forwarded to the machine
+     */
+    static void specialQuery(DeviceId devId, Ip4Address groupIp, int maxResp) {
+        SingleStateMachine machine = get(devId, groupIp);
+        if (machine != null) {
+            machine.query(maxResp);
+        }
+    }
+
+    /**
+     * Passes a query to every registered machine that belongs to the given device.
+     *
+     * @param devId   device id
+     * @param maxResp maximum response value forwarded to each machine
+     */
+    static void generalQuery(DeviceId devId, int maxResp) {
+        for (SingleStateMachine machine : map.values()) {
+            if (devId.equals(machine.getDeviceId())) {
+                machine.query(maxResp);
+            }
+        }
+    }
+
+    /**
+     * Passes a query to every registered machine, regardless of device.
+     *
+     * @param maxResp maximum response value forwarded to each machine
+     */
+    static void generalQuery(int maxResp) {
+        for (SingleStateMachine machine : map.values()) {
+            machine.query(maxResp);
+        }
+    }
+
+    /**
+     * Returns the entry set of the underlying registry map. This is the map's
+     * own view, not a copy.
+     *
+     * @return entries mapping machine id to machine
+     */
+    public static Set<Map.Entry<String, SingleStateMachine>> entrySet() {
+        return map.entrySet();
+    }
+
+    /**
+     * Invokes the timeout handler of the machine registered for the given
+     * pair, if any.
+     *
+     * @param devId   device id
+     * @param groupIp group IP address
+     */
+    public static void timeOut(DeviceId devId, Ip4Address groupIp) {
+        SingleStateMachine machine = get(devId, groupIp);
+        if (machine != null) {
+            machine.timeOut();
+        }
+    }
+
+    /**
+     * Removes all machines from the registry.
+     *
+     * <p>NOTE(review): unlike {@link #destroySingle}, this does not cancel the
+     * timers of the removed machines - confirm callers handle that separately.
+     */
+    public static void clearMap() {
+        map.clear();
+    }
+
+    /**
+     * Checks whether any device other than the given one has a machine for the group.
+     *
+     * @param devId   id of the device being excluded
+     * @param groupIp group IP address
+     * @return true if this group has at least one listener connected to
+     * any device in the map except for the device specified; false otherwise.
+     */
+    private static boolean groupListenedByOtherDevices(DeviceId devId, Ip4Address groupIp) {
+        for (SingleStateMachine machine : map.values()) {
+            boolean otherDevice = !machine.getDeviceId().equals(devId);
+            if (otherDevice && machine.getGroupIp().equals(groupIp)) {
+                //means group is being listened by other peers in the domain
+                return true;
+            }
+        }
+        return false;
+    }
+
+}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/package-info.java b/app/src/main/java/org/opencord/igmpproxy/impl/package-info.java
new file mode 100644
index 0000000..92e5a4c
--- /dev/null
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation of IGMPProxy application.
+ */
+package org.opencord.igmpproxy.impl;