Refactor to separate api from implementation

Change-Id: Ia3ff3eab719cd380f502d359367e65835693ce12
diff --git a/app/app.xml b/app/app.xml
new file mode 100644
index 0000000..ec9ce34
--- /dev/null
+++ b/app/app.xml
@@ -0,0 +1,24 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright 2020-present Open Networking Foundation
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<app name="org.opencord.mcast" origin="ONF" version="${project.version}"
+     category="Traffic Steering" url="http://opencord.org" title="CORD Mcast app"
+     featuresRepo="mvn:${project.groupId}/${project.artifactId}/${project.version}/xml/features"
+     features="${project.artifactId}" apps="org.onosproject.mcast,org.opencord.sadis">
+    <description>${project.description}</description>
+    <artifact>mvn:${project.groupId}/mcast-api/${project.version}</artifact>
+    <artifact>mvn:${project.groupId}/${project.artifactId}/${project.version}</artifact>
+</app>
diff --git a/app/features.xml b/app/features.xml
new file mode 100644
index 0000000..27289ef
--- /dev/null
+++ b/app/features.xml
@@ -0,0 +1,24 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+  ~ Copyright 2020-present Open Networking Foundation
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<features xmlns="http://karaf.apache.org/xmlns/features/v1.2.0" name="${project.artifactId}-${project.version}">
+    <feature name="${project.artifactId}" version="${project.version}"
+             description="${project.description}">
+        <feature>onos-api</feature>
+        <bundle>mvn:${project.groupId}/mcast-api/${project.version}</bundle>
+        <bundle>mvn:${project.groupId}/${project.artifactId}/${project.version}</bundle>
+    </feature>
+</features>
diff --git a/app/pom.xml b/app/pom.xml
new file mode 100644
index 0000000..674e530
--- /dev/null
+++ b/app/pom.xml
@@ -0,0 +1,98 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright 2020-present Open Networking Foundation
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.opencord</groupId>
+        <artifactId>mcast</artifactId>
+        <version>2.0.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>mcast-app</artifactId>
+
+    <packaging>bundle</packaging>
+    <description>CORD Multicast application</description>
+
+    <properties>
+        <onos.app.name>org.opencord.mcast</onos.app.name>
+        <onos.app.category>Traffic Steering</onos.app.category>
+        <onos.app.title>CORD Multicast App</onos.app.title>
+        <onos.app.url>http://opencord.org</onos.app.url>
+        <onos.app.requires>
+            org.onosproject.mcast,
+            org.opencord.sadis
+        </onos.app.requires>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onlab-osgi</artifactId>
+            <version>${onos.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-api</artifactId>
+            <version>${onos.version}</version>
+            <scope>provided</scope>
+            <classifier>tests</classifier>
+        </dependency>
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-core-serializers</artifactId>
+            <version>${onos.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.opencord</groupId>
+            <artifactId>sadis-api</artifactId>
+            <version>${sadis.api.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-apps-mcast-api</artifactId>
+            <version>${onos.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.opencord</groupId>
+            <artifactId>mcast-api</artifactId>
+            <version>${project.version}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onlab-junit</artifactId>
+            <version>${onos.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.onosproject</groupId>
+                <artifactId>onos-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/app/src/main/java/org/opencord/cordmcast/impl/CordMcast.java b/app/src/main/java/org/opencord/cordmcast/impl/CordMcast.java
new file mode 100644
index 0000000..e014b79
--- /dev/null
+++ b/app/src/main/java/org/opencord/cordmcast/impl/CordMcast.java
@@ -0,0 +1,725 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.cordmcast.impl;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.VlanId;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.mcast.api.McastEvent;
+import org.onosproject.mcast.api.McastListener;
+import org.onosproject.mcast.api.McastRoute;
+import org.onosproject.mcast.api.MulticastRouteService;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.HostId;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.config.ConfigFactory;
+import org.onosproject.net.config.NetworkConfigEvent;
+import org.onosproject.net.config.NetworkConfigListener;
+import org.onosproject.net.config.NetworkConfigRegistry;
+import org.onosproject.net.config.basics.McastConfig;
+import org.onosproject.net.config.basics.SubjectFactories;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.flow.DefaultTrafficSelector;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flowobjective.DefaultForwardingObjective;
+import org.onosproject.net.flowobjective.DefaultNextObjective;
+import org.onosproject.net.flowobjective.DefaultObjectiveContext;
+import org.onosproject.net.flowobjective.FlowObjectiveService;
+import org.onosproject.net.flowobjective.ForwardingObjective;
+import org.onosproject.net.flowobjective.NextObjective;
+import org.onosproject.net.flowobjective.Objective;
+import org.onosproject.net.flowobjective.ObjectiveContext;
+import org.onosproject.net.flowobjective.ObjectiveError;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
+import org.opencord.cordmcast.CordMcastService;
+import org.opencord.cordmcast.CordMcastStatisticsService;
+import org.opencord.sadis.SadisService;
+import org.opencord.sadis.SubscriberAndDeviceInformation;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Modified;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.util.Dictionary;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.get;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.opencord.cordmcast.impl.OsgiPropertyConstants.*;
+import static org.slf4j.LoggerFactory.getLogger;
+
+
+/**
+ * CORD multicast provisioning application. Operates by listening to
+ * events on the multicast rib and provisioning groups to program multicast
+ * flows on the dataplane.
+ */
+@Component(immediate = true,
+        property = {
+        VLAN_ENABLED + ":Boolean=" + DEFAULT_VLAN_ENABLED,
+        PRIORITY + ":Integer=" + DEFAULT_PRIORITY,
+})
+public class CordMcast implements CordMcastService {
+    private static final String APP_NAME = "org.opencord.mcast";
+
+    private final Logger log = getLogger(getClass());
+
+    private static final int DEFAULT_PRIORITY = 500;
+    private static final short DEFAULT_MCAST_VLAN = 4000;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected MulticastRouteService mcastService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected FlowObjectiveService flowObjectiveService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected CoreService coreService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ComponentConfigService componentConfigService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected NetworkConfigRegistry networkConfig;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected StorageService storageService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected MastershipService mastershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    public DeviceService deviceService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    private ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    private LeadershipService leadershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected SadisService sadisService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected CordMcastStatisticsService cordMcastStatisticsService;
+
+    protected McastListener listener = new InternalMulticastListener();
+
+    private InternalNetworkConfigListener configListener =
+            new InternalNetworkConfigListener();
+
+    private ConsistentMap<NextKey, NextContent> groups;
+
+    private ApplicationId appId;
+    private ApplicationId coreAppId;
+    private short mcastVlan = DEFAULT_MCAST_VLAN;
+
+    /**
+     * Whether to use VLAN for multicast traffic.
+     **/
+    private boolean vlanEnabled = DEFAULT_VLAN_ENABLED;
+
+    /**
+     * Priority for multicast rules.
+     **/
+    private int priority = DEFAULT_PRIORITY;
+
+    private static final Class<McastConfig> CORD_MCAST_CONFIG_CLASS =
+            McastConfig.class;
+
+    private ConfigFactory<ApplicationId, McastConfig> cordMcastConfigFactory =
+            new ConfigFactory<ApplicationId, McastConfig>(
+                    SubjectFactories.APP_SUBJECT_FACTORY, CORD_MCAST_CONFIG_CLASS, "multicast") {
+                @Override
+                public McastConfig createConfig() {
+                    return new McastConfig();
+                }
+            };
+
+    // Lock serializing the sink add/remove and group-clearing operations of this instance
+    private final Lock mcastLock = new ReentrantLock();
+    private void mcastLock() { // acquire; callers release through mcastUnlock() in a finally block
+        mcastLock.lock();
+    }
+    private void mcastUnlock() { // release the lock taken by mcastLock()
+        mcastLock.unlock();
+    }
+    private ExecutorService eventExecutor;
+
+    /**
+     * Registers the component, creates the distributed group store, hooks the
+     * listeners and replays the sinks of the multicast routes that already exist.
+     *
+     * @param context component context carrying the configured properties
+     */
+    @Activate
+    public void activate(ComponentContext context) {
+        componentConfigService.registerProperties(getClass());
+        modified(context);
+
+        appId = coreService.registerApplication(APP_NAME);
+        coreAppId = coreService.registerApplication(CoreService.CORE_APP_NAME);
+
+        eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("cord/mcast", "events-mcast-%d", log));
+
+        KryoNamespace.Builder groupsKryo = new KryoNamespace.Builder()
+                .register(KryoNamespaces.API).register(NextKey.class).register(NextContent.class);
+        groups = storageService.<NextKey, NextContent>consistentMapBuilder()
+                .withName("cord-mcast-groups-store")
+                .withSerializer(Serializer.using(groupsKryo.build("CordMcast-Groups")))
+                .build();
+
+        networkConfig.registerConfigFactory(cordMcastConfigFactory);
+        // Load the configured egress VLAN first: the addSink() replay below builds objectives from mcastVlan.
+        updateConfig(networkConfig.getConfig(coreAppId, CORD_MCAST_CONFIG_CLASS));
+        networkConfig.addListener(configListener);
+        mcastService.addListener(listener);
+
+        mcastService.getRoutes().stream()
+                .map(r -> new ImmutablePair<>(r, mcastService.sinks(r)))
+                .filter(pair -> pair.getRight() != null && !pair.getRight().isEmpty())
+                .forEach(pair -> pair.getRight().forEach(sink -> addSink(pair.getLeft(), sink)));
+        log.info("Started");
+    }
+
+    /** Re-reads the VLAN-enabled and priority properties and publishes the resulting VLAN to the statistics. */
+    @Modified
+    public void modified(ComponentContext context) {
+        Dictionary<?, ?> props = context == null ? new Properties() : context.getProperties();
+        String vlanValue = get(props, VLAN_ENABLED);
+        vlanEnabled = isNullOrEmpty(vlanValue) ? DEFAULT_VLAN_ENABLED : Boolean.parseBoolean(vlanValue.trim());
+
+        try {
+            String priorityValue = get(props, PRIORITY);
+            priority = isNullOrEmpty(priorityValue) ? DEFAULT_PRIORITY : Integer.parseInt(priorityValue.trim());
+        } catch (NumberFormatException ne) {
+            log.error("Unable to parse configuration parameter for priority", ne);
+            priority = DEFAULT_PRIORITY;
+        }
+        cordMcastStatisticsService.setVlanValue(assignedVlan());
+    }
+
+    @Deactivate
+    public void deactivate() { // undoes the registrations made in activate()
+        componentConfigService.unregisterProperties(getClass(), false);
+        mcastService.removeListener(listener);
+        networkConfig.removeListener(configListener);
+        networkConfig.unregisterConfigFactory(cordMcastConfigFactory);
+        eventExecutor.shutdown(); // stopped only after both listeners have been detached
+        clearGroups(); // removes the objectives of the groups whose device this node leads
+        groups.destroy();
+        log.info("Stopped");
+    }
+
+    /**
+     * Removes the forwarding rule and all group ports of every stored group whose device
+     * this node leads, then empties the whole group store.
+     */
+    public void clearGroups() {
+        mcastLock();
+        try {
+            groups.keySet().forEach(groupInfo -> {
+                if (!isLocalLeader(groupInfo.getDevice())) {
+                    return;
+                }
+                // The entry may be gone by the time it is read back; get() then yields null.
+                Versioned<NextContent> entry = groups.get(groupInfo);
+                NextContent next = entry == null ? null : entry.value();
+                if (next == null) {
+                    return;
+                }
+                ObjectiveContext context = new DefaultObjectiveContext(
+                        (objective) -> log.debug("Successfully remove {}", groupInfo.group),
+                        (objective, error) -> log.warn("Failed to remove {}: {}", groupInfo.group, error));
+                // remove the flow rule
+                flowObjectiveService.forward(groupInfo.getDevice(),
+                                             fwdObject(next.getNextId(), groupInfo.group).remove(context));
+                // remove all ports from the group
+                next.getOutPorts().forEach(portNumber -> flowObjectiveService.next(
+                        groupInfo.getDevice(),
+                        nextObject(next.getNextId(), portNumber, NextType.RemoveFromExisting, groupInfo.group)));
+            });
+            groups.clear();
+        } finally {
+            mcastUnlock();
+        }
+    }
+
+    private VlanId multicastVlan() { // the current mcastVlan value wrapped as a VlanId
+        return VlanId.vlanId(mcastVlan);
+    }
+
+    protected VlanId assignedVlan() { // the multicast VLAN, or VlanId.NONE when VLAN usage is disabled
+        return vlanEnabled ? multicastVlan() : VlanId.NONE;
+    }
+
+    private class InternalMulticastListener implements McastListener {
+        @Override
+        public void event(McastEvent event) {
+            eventExecutor.execute(() -> { // events are processed on the event executor, not the caller's thread
+                switch (event.type()) {
+                    case SINKS_ADDED:
+                        addSinks(event);
+                        break;
+                    case SINKS_REMOVED:
+                        removeSinks(event);
+                        break;
+                    case ROUTE_ADDED:
+                    case ROUTE_REMOVED:
+                    case SOURCES_ADDED:
+                        break; // only sink changes are acted upon
+                    default: // NOTE(review): a SOURCES_REMOVED type, if the enum has one, lands here - confirm
+                        log.warn("Unknown mcast event {}", event.type());
+                }
+            });
+        }
+    }
+
+    /**
+     * Finds the single-homed sinks to be removed: those in the previous sinks but not in the new ones.
+     * @param prevSinks the previous sinks to be evaluated
+     * @param newSinks the new sinks to be evaluated
+     * @return the set of the sinks to be removed
+     */
+    private Set<ConnectPoint> getSinksToBeRemoved(Map<HostId, Set<ConnectPoint>> prevSinks,
+                                                  Map<HostId, Set<ConnectPoint>> newSinks) {
+        return getSinksToBeProcessed(prevSinks, newSinks);
+    }
+
+
+    /**
+     * Finds the single-homed sinks to be added: those in the new sinks but not in the previous ones.
+     * @param newSinks the new sinks to be processed
+     * @param allPrevSinks all previous sinks
+     * @return the set of the sinks to be added
+     */
+    private Set<ConnectPoint> getSinksToBeAdded(Map<HostId, Set<ConnectPoint>> newSinks,
+                                                Map<HostId, Set<ConnectPoint>> allPrevSinks) {
+        return getSinksToBeProcessed(newSinks, allPrevSinks);
+    }
+
+    /**
+     * Gets single-homed sinks that are in set1 but not in set2.
+     * Only the connect points registered under {@link HostId#NONE} are considered,
+     * as those are assumed to be single-homed sinks.
+     *
+     * @param sinkSet1 the first sink map
+     * @param sinkSet2 the second sink map
+     * @return a view of the single-homed sinks found in set1 but not in set2
+     */
+    private Set<ConnectPoint> getSinksToBeProcessed(Map<HostId, Set<ConnectPoint>> sinkSet1,
+                                                    Map<HostId, Set<ConnectPoint>> sinkSet2) {
+        // Copy set1's sinks first: Sets.difference() only returns a view over its arguments.
+        Set<ConnectPoint> candidates = Sets.newHashSet(singleHomedSinks(sinkSet1));
+        return Sets.difference(candidates, singleHomedSinks(sinkSet2));
+    }
+
+    /** Returns the sinks stored under {@link HostId#NONE}, or an empty set when there are none. */
+    private Set<ConnectPoint> singleHomedSinks(Map<HostId, Set<ConnectPoint>> sinks) {
+        Set<ConnectPoint> singleHomed = sinks.get(HostId.NONE);
+        return singleHomed == null ? ImmutableSet.<ConnectPoint>of() : singleHomed;
+    }
+
+
+    /** Removes, while holding the local lock, every sink that the event dropped from the route. */
+    private void removeSinks(McastEvent event) {
+        mcastLock();
+        try {
+            getSinksToBeRemoved(event.prevSubject().sinks(), event.subject().sinks())
+                    .forEach(sink -> removeSink(event.subject().route().group(), sink));
+        } finally {
+            mcastUnlock();
+        }
+    }
+
+    /**
+     * Removes a sink port from the group of its device and, when it was the last
+     * port of that group, removes the forwarding rule and the stored entry as well.
+     * Nothing is done when this node does not lead the device or SADIS does not know it.
+     *
+     * @param group multicast group address
+     * @param sink connect point to be removed from the group
+     */
+    private void removeSink(IpAddress group, ConnectPoint sink) {
+        if (!isLocalLeader(sink.deviceId())) {
+            log.debug("Not the leader of {}. Skip sink_removed event for the sink {} and group {}",
+                      sink.deviceId(), sink, group);
+            return;
+        }
+
+        if (!getSubscriberAndDeviceInformation(sink.deviceId()).isPresent()) {
+            log.warn("Unknown OLT device : {}", sink.deviceId());
+            return;
+        }
+
+        log.debug("Removing sink {} from the group {}", sink, group);
+
+        groups.computeIfPresent(new NextKey(sink.deviceId(), group), (key, content) -> {
+            flowObjectiveService.next(sink.deviceId(), nextObject(content.getNextId(), sink.port(),
+                                                                  NextType.RemoveFromExisting, group));
+
+            Set<PortNumber> remainingPorts = Sets.newHashSet(content.getOutPorts());
+            remainingPorts.remove(sink.port());
+            if (!remainingPorts.isEmpty()) {
+                return new NextContent(content.getNextId(), ImmutableSet.copyOf(remainingPorts));
+            }
+            // this was the last sink: remove the flow rule; returning null drops the stored entry
+            ObjectiveContext context = new DefaultObjectiveContext(
+                    (objective) -> log.debug("Successfully remove {} on {}", group, sink),
+                    (objective, error) -> log.warn("Failed to remove {} on {}: {}", group, sink, error));
+            flowObjectiveService.forward(sink.deviceId(), fwdObject(content.getNextId(), group).remove(context));
+            return null;
+        });
+    }
+
+    /** Adds, while holding the local lock, every sink that the event introduced to the route. */
+    private void addSinks(McastEvent event) {
+        mcastLock();
+        try {
+            getSinksToBeAdded(event.subject().sinks(), event.prevSubject().sinks())
+                    .forEach(sink -> addSink(event.subject().route(), sink));
+        } finally {
+            mcastUnlock();
+        }
+    }
+
+    /**
+     * Adds a sink port to the group of its device, creating the group and its
+     * forwarding rule when this is the first sink of the group on that device.
+     * Nothing is done when this node does not lead the device, when SADIS does
+     * not know the device or when the port already belongs to the group.
+     *
+     * @param route multicast route the sink belongs to
+     * @param sink connect point to be added to the group
+     */
+    private void addSink(McastRoute route, ConnectPoint sink) {
+        if (!isLocalLeader(sink.deviceId())) {
+            log.debug("Not the leader of {}. Skip sink_added event for the sink {} and group {}",
+                      sink.deviceId(), sink, route.group());
+            return;
+        }
+
+        if (!getSubscriberAndDeviceInformation(sink.deviceId()).isPresent()) {
+            log.warn("Unknown OLT device : {}", sink.deviceId());
+            return;
+        }
+
+        log.debug("Adding sink {} to the group {}", sink, route.group());
+
+        NextKey key = new NextKey(sink.deviceId(), route.group());
+        boolean firstSinkOfGroup = !groups.containsKey(key);
+        NextObjective nextObjective;
+        if (firstSinkOfGroup) {
+            // First time someone requests this mcast group via this device
+            Integer nextId = flowObjectiveService.allocateNextId();
+            nextObjective = nextObject(nextId, sink.port(), NextType.AddNew, route.group());
+            // Store the new port
+            groups.put(key, new NextContent(nextId, ImmutableSet.of(sink.port())));
+        } else {
+            // This device already serves some subscribers of this mcast group
+            NextContent current = groups.get(key).value();
+            if (current.getOutPorts().contains(sink.port())) {
+                log.info("Group {} already serves the sink connected to {}", route.group(), sink);
+                return;
+            }
+            nextObjective = nextObject(current.getNextId(), sink.port(),
+                                       NextType.AddToExisting, route.group());
+            // add the new port to the stored group
+            Set<PortNumber> outPorts = Sets.newHashSet(current.getOutPorts());
+            outPorts.add(sink.port());
+            groups.put(key, new NextContent(nextObjective.id(), ImmutableSet.copyOf(outPorts)));
+        }
+
+        flowObjectiveService.next(sink.deviceId(), nextObjective);
+        if (!firstSinkOfGroup) {
+            return;
+        }
+
+        // create the necessary flow rule, as this is the first sink request for the group on this device
+        ObjectiveContext context = new DefaultObjectiveContext(
+                (objective) -> log.debug("Successfully add {} on {}/{}, vlan {}",
+                                         route.group(), sink.deviceId(), sink.port().toLong(), assignedVlan()),
+                (objective, error) -> log.warn("Failed to add {} on {}/{}, vlan {}: {}",
+                                               route.group(), sink.deviceId(), sink.port().toLong(),
+                                               assignedVlan(), error));
+        flowObjectiveService.forward(sink.deviceId(),
+                                     fwdObject(nextObjective.id(), route.group()).add(context));
+    }
+
+    /**
+     * Fetches device information associated with the device serial number from SADIS.
+     *
+     * @param serialNumber serial number of a device
+     * @return device information; an empty Optional otherwise.
+     */
+    private Optional<SubscriberAndDeviceInformation> getSubscriberAndDeviceInformation(String serialNumber) {
+        long start = System.currentTimeMillis();
+        try {
+            return Optional.ofNullable(sadisService.getSubscriberInfoService().get(serialNumber));
+        } finally {
+            if (log.isDebugEnabled()) {
+                // SADIS may call remote systems to fetch device data and this calls can take a long time.
+                // This measurement is just for monitoring these kinds of situations.
+                log.debug("Device fetched from SADIS. Elapsed {} msec", System.currentTimeMillis() - start);
+            }
+
+        }
+    }
+
+    /**
+     * Fetches from SADIS the device information of the given device, using the
+     * serial number that the device service reports for it.
+     *
+     * @param deviceId device id
+     * @return device information; an empty Optional when the device or its
+     *         serial number is unknown, or when SADIS has no entry for it.
+     */
+    private Optional<SubscriberAndDeviceInformation> getSubscriberAndDeviceInformation(DeviceId deviceId) {
+        Device device = deviceService.getDevice(deviceId);
+        String serialNumber = device == null ? null : device.serialNumber();
+        return serialNumber == null ? Optional.empty() : getSubscriberAndDeviceInformation(serialNumber);
+    }
+
+    /** Reacts to added or updated multicast configuration by refreshing the egress VLAN. */
+    private class InternalNetworkConfigListener implements NetworkConfigListener {
+        @Override
+        public void event(NetworkConfigEvent event) {
+            eventExecutor.execute(() -> {
+                switch (event.type()) {
+                    case CONFIG_ADDED:
+                    case CONFIG_UPDATED:
+                        if (!event.configClass().equals(CORD_MCAST_CONFIG_CLASS)) {
+                            break;
+                        }
+                        McastConfig config = networkConfig.getConfig(coreAppId, CORD_MCAST_CONFIG_CLASS);
+                        if (config == null) {
+                            break;
+                        }
+                        //TODO: Simply remove flows/groups, hosts will respond to the periodic query
+                        // and re-send IGMP reports, so the flows can be rebuilt.
+                        // However, better to remove and re-add mcast flow rules here
+                        VlanId egressVlan = config.egressVlan();
+                        // updateConfig() treats egressVlan() as nullable, so guard it before toShort() as well
+                        if (vlanEnabled && egressVlan != null && mcastVlan != egressVlan.toShort()) {
+                            clearGroups();
+                        }
+                        updateConfig(config);
+                        break;
+                    default:
+                        break;
+                }
+            });
+        }
+    }
+
+    /** Takes over the egress VLAN of the given multicast config; a null config or VLAN changes nothing. */
+    private void updateConfig(McastConfig config) {
+        if (config != null) {
+            log.debug("multicast config received: {}", config);
+            VlanId egressVlan = config.egressVlan();
+            if (egressVlan != null) {
+                mcastVlan = egressVlan.toShort();
+            }
+        }
+    }
+
+    /**
+     * Group store key: a multicast group on a device. Static, so keys carry no reference to the component.
+     */
+    private static class NextKey {
+        private final DeviceId device;
+        private final IpAddress group;
+
+        public NextKey(DeviceId deviceId, IpAddress groupAddress) {
+            device = deviceId;
+            group = groupAddress;
+        }
+
+        public DeviceId getDevice() {
+            return device;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(this.device, this.group);
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (!(obj instanceof NextKey)) {
+                return false;
+            }
+            NextKey that = (NextKey) obj;
+            return Objects.equals(this.device, that.device) && Objects.equals(this.group, that.group);
+        }
+    }
+
+    // Value holder pairing a next id with a set of output ports, compared by value.
+    private class NextContent {
+        private Integer nextId;
+        private Set<PortNumber> outPorts;
+
+        public NextContent(Integer nextId, Set<PortNumber> outPorts) {
+            this.nextId = nextId;
+            this.outPorts = outPorts;
+        }
+
+        public Integer getNextId() {
+            return this.nextId;
+        }
+
+        // Hands out an immutable copy so callers cannot alter the stored set.
+        public Set<PortNumber> getOutPorts() {
+            return ImmutableSet.copyOf(this.outPorts);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(nextId, outPorts);
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            return obj instanceof NextContent && getClass() == obj.getClass()
+                    && Objects.equals(nextId, ((NextContent) obj).nextId)
+                    && Objects.equals(outPorts, ((NextContent) obj).outPorts);
+        }
+    }
+
+    private enum NextType { AddNew, AddToExisting, Remove, RemoveFromExisting }
+
+    // Creates a BROADCAST next objective with a single output treatment towards port;
+    // nextType selects which of the four builder operations finalizes the objective.
+    private NextObjective nextObject(Integer nextId, PortNumber port,
+                                     NextType nextType, IpAddress mcastIp) {
+        // The callbacks only trace the outcome of the objective at debug level.
+        ObjectiveContext outcomeLogger = new ObjectiveContext() {
+            @Override
+            public void onSuccess(Objective objective) {
+                log.debug("Next Objective {} installed", objective.id());
+            }
+
+            @Override
+            public void onError(Objective objective, ObjectiveError error) {
+                log.debug("Next Objective {} failed, because {}", objective.id(), error);
+            }
+        };
+
+        // Meta selector: destination prefix of the group, plus the VLAN when enabled.
+        TrafficSelector.Builder metaSelector = DefaultTrafficSelector.builder();
+        metaSelector.matchIPDst(mcastIp.toIpPrefix());
+        if (vlanEnabled) {
+            metaSelector.matchVlanId(multicastVlan());
+        }
+
+        DefaultNextObjective.Builder nextBuilder = DefaultNextObjective.builder()
+                .fromApp(appId)
+                .addTreatment(DefaultTrafficTreatment.builder().setOutput(port).build())
+                .withType(NextObjective.Type.BROADCAST)
+                .withId(nextId)
+                .withMeta(metaSelector.build());
+
+        switch (nextType) {
+            case Remove:
+                return nextBuilder.remove(outcomeLogger);
+            case RemoveFromExisting:
+                return nextBuilder.removeFromExisting(outcomeLogger);
+            case AddToExisting:
+                return nextBuilder.addToExisting(outcomeLogger);
+            case AddNew:
+                return nextBuilder.add(outcomeLogger);
+            default:
+                // No other NextType constant exists today.
+                return null;
+        }
+    }
+
+    // Prepares a forwarding objective builder that matches IPv4 traffic destined to
+    // mcastIp and points at next objective nextId; the caller chooses the operation.
+    private ForwardingObjective.Builder fwdObject(int nextId, IpAddress mcastIp) {
+        TrafficSelector.Builder groupSelector = DefaultTrafficSelector.builder();
+        groupSelector.matchEthType(Ethernet.TYPE_IPV4);
+        groupSelector.matchIPDst(mcastIp.toIpPrefix());
+
+        // The meta selector stays empty unless VLAN matching is enabled.
+        TrafficSelector.Builder vlanMeta = DefaultTrafficSelector.builder();
+        if (vlanEnabled) {
+            vlanMeta.matchVlanId(multicastVlan());
+        }
+
+        return DefaultForwardingObjective.builder()
+                .fromApp(appId)
+                .nextStep(nextId)
+                .makePermanent()
+                .withFlag(ForwardingObjective.Flag.SPECIFIC)
+                .withPriority(priority)
+                .withSelector(groupSelector.build())
+                .withMeta(vlanMeta.build());
+    }
+
+    // Custom-built check: mastership decides while the device is available; otherwise
+    // fall back to a leadership election that uses the device id as topic.
+    private boolean isLocalLeader(DeviceId deviceId) {
+        if (mastershipService.isLocalMaster(deviceId)) {
+            return true;
+        }
+        if (deviceService.isAvailable(deviceId)) {
+            // The device is available and this node is not its master.
+            return false;
+        }
+        String topic = deviceId.toString();
+        NodeId electedLeader = leadershipService.runForLeadership(topic).leaderNodeId();
+        // This node acts as leader only when it is the elected one.
+        return clusterService.getLocalNode().id().equals(electedLeader);
+    }
+
+}
+
+
diff --git a/app/src/main/java/org/opencord/cordmcast/impl/CordMcastStatisticsManager.java b/app/src/main/java/org/opencord/cordmcast/impl/CordMcastStatisticsManager.java
new file mode 100644
index 0000000..a5715f4
--- /dev/null
+++ b/app/src/main/java/org/opencord/cordmcast/impl/CordMcastStatisticsManager.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.cordmcast.impl;
+
+
+import org.onlab.packet.VlanId;
+import org.onlab.util.SafeRecurringTask;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.event.AbstractListenerManager;
+import org.onosproject.mcast.api.McastRoute;
+import org.onosproject.mcast.api.MulticastRouteService;
+import org.opencord.cordmcast.CordMcastStatistics;
+import org.opencord.cordmcast.CordMcastStatisticsEvent;
+import org.opencord.cordmcast.CordMcastStatisticsEventListener;
+import org.opencord.cordmcast.CordMcastStatisticsService;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Modified;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.util.Dictionary;
+import java.util.Set;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Properties;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.onlab.util.Tools.get;
+import static org.opencord.cordmcast.impl.OsgiPropertyConstants.EVENT_GENERATION_PERIOD;
+import static org.opencord.cordmcast.impl.OsgiPropertyConstants.EVENT_GENERATION_PERIOD_DEFAULT;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * For managing CordMcastStatisticsEvent.
+ */
+@Component(immediate = true, property = { EVENT_GENERATION_PERIOD + ":Integer=" + EVENT_GENERATION_PERIOD_DEFAULT, })
+public class CordMcastStatisticsManager
+        extends AbstractListenerManager<CordMcastStatisticsEvent, CordMcastStatisticsEventListener>
+        implements CordMcastStatisticsService {
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected MulticastRouteService mcastService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ComponentConfigService componentConfigService;
+
+    CordMcastStatisticsEventListener listener;
+
+    /** Multicast statistics generation time interval, in seconds. */
+    private int eventGenerationPeriodInSeconds = EVENT_GENERATION_PERIOD_DEFAULT;
+    private final Logger log = getLogger(getClass());
+    private ScheduledFuture<?> scheduledFuture = null;
+    private ScheduledExecutorService executor;
+
+    private volatile VlanId vlanId; // set via setVlanValue, read by the publishing task
+
+    @Activate
+    public void activate(ComponentContext context) {
+        eventDispatcher.addSink(CordMcastStatisticsEvent.class, listenerRegistry);
+        executor = Executors.newScheduledThreadPool(1);
+        componentConfigService.registerProperties(getClass());
+        modified(context);
+        log.info("CordMcastStatisticsManager activated.");
+    }
+
+    /** Re-reads the generation period and (re)starts the periodic publishing task. */
+    @Modified
+    public void modified(ComponentContext context) {
+        Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
+        int period = EVENT_GENERATION_PERIOD_DEFAULT;
+        try {
+            String s = get(properties, EVENT_GENERATION_PERIOD);
+            period = isNullOrEmpty(s) ? EVENT_GENERATION_PERIOD_DEFAULT : Integer.parseInt(s.trim());
+        } catch (NumberFormatException ne) {
+            log.error("Unable to parse configuration parameter for eventGenerationPeriodInSeconds", ne);
+        }
+        if (period <= 0) {
+            // scheduleAtFixedRate throws IllegalArgumentException for a non-positive period.
+            log.warn("Ignoring non-positive eventGenerationPeriodInSeconds {}, using default", period);
+            period = EVENT_GENERATION_PERIOD_DEFAULT;
+        }
+        eventGenerationPeriodInSeconds = period;
+        if (scheduledFuture != null) {
+            scheduledFuture.cancel(true);
+        }
+        scheduledFuture = executor.scheduleAtFixedRate(SafeRecurringTask.wrap(this::publishEvent),
+                0, eventGenerationPeriodInSeconds, TimeUnit.SECONDS);
+    }
+
+    @Deactivate
+    public void deactivate() {
+        componentConfigService.unregisterProperties(getClass(), false); // undo activate's registration
+        eventDispatcher.removeSink(CordMcastStatisticsEvent.class);
+        if (scheduledFuture != null) {
+            scheduledFuture.cancel(true);
+        }
+        executor.shutdown();
+    }
+
+    /** Returns one statistics entry per known multicast route ("*" when the route has no source). */
+    public List<CordMcastStatistics> getMcastDetails() {
+        List<CordMcastStatistics> mcastData = new ArrayList<>();
+        Set<McastRoute> routes = mcastService.getRoutes();
+        VlanId vlan = vlanId; // single volatile read so every entry reports the same VLAN
+        for (McastRoute route : routes) {
+            String source = route.source().isEmpty() ? "*" : route.source().get().toString();
+            mcastData.add(new CordMcastStatistics(route.group(), source, vlan));
+        }
+        return mcastData;
+    }
+
+    @Override
+    public void setVlanValue(VlanId vlanValue) {
+        vlanId = vlanValue;
+    }
+
+    /** Posts the current multicast statistics as a STATUS_UPDATE event. */
+    protected void publishEvent() {
+        log.debug("pushing cord mcast event to kafka");
+        List<CordMcastStatistics> routeList = getMcastDetails();
+        routeList.forEach(mcastStats -> log.debug("Group: {} | Source: {} | Vlan: {}",
+                mcastStats.getGroupAddress(), mcastStats.getSourceAddress(), mcastStats.getVlanId()));
+        post(new CordMcastStatisticsEvent(CordMcastStatisticsEvent.Type.STATUS_UPDATE, routeList));
+    }
+}
diff --git a/app/src/main/java/org/opencord/cordmcast/impl/OsgiPropertyConstants.java b/app/src/main/java/org/opencord/cordmcast/impl/OsgiPropertyConstants.java
new file mode 100644
index 0000000..dcd5077
--- /dev/null
+++ b/app/src/main/java/org/opencord/cordmcast/impl/OsgiPropertyConstants.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.cordmcast.impl;
+
+/**
+ * Names and default values of the configurable component properties.
+ */
+public final class OsgiPropertyConstants {
+
+    public static final String VLAN_ENABLED = "vlanEnabled";
+    public static final boolean DEFAULT_VLAN_ENABLED = true;
+
+    public static final String PRIORITY = "priority";
+    public static final int DEFAULT_PRIORITY = 500;
+
+    public static final String EVENT_GENERATION_PERIOD = "eventGenerationPeriodInSeconds";
+    public static final int EVENT_GENERATION_PERIOD_DEFAULT = 30;
+
+    private OsgiPropertyConstants() {
+    }
+}
diff --git a/app/src/main/java/org/opencord/cordmcast/impl/package-info.java b/app/src/main/java/org/opencord/cordmcast/impl/package-info.java
new file mode 100644
index 0000000..841d95c
--- /dev/null
+++ b/app/src/main/java/org/opencord/cordmcast/impl/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation of the application for provisioning multicast streams in the context of CORD.
+ */
+package org.opencord.cordmcast.impl;
\ No newline at end of file
diff --git a/app/src/test/java/org/opencord/cordmcast/impl/McastTest.java b/app/src/test/java/org/opencord/cordmcast/impl/McastTest.java
new file mode 100644
index 0000000..a622e6d
--- /dev/null
+++ b/app/src/test/java/org/opencord/cordmcast/impl/McastTest.java
@@ -0,0 +1,395 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.cordmcast.impl;
+
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onlab.junit.TestUtils;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.VlanId;
+import org.onosproject.cfg.ComponentConfigAdapter;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.mcast.api.McastEvent;
+import org.onosproject.mcast.api.McastRoute;
+import org.onosproject.mcast.api.McastRouteUpdate;
+import org.onosproject.mcast.api.MulticastRouteService;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.HostId;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.flow.criteria.IPCriterion;
+import org.onosproject.net.flow.instructions.Instructions.OutputInstruction;
+import org.onosproject.net.flowobjective.Objective;
+import org.onosproject.store.service.StorageServiceAdapter;
+import org.onosproject.store.service.TestConsistentMap;
+import org.opencord.cordmcast.CordMcastStatistics;
+import org.opencord.cordmcast.CordMcastStatisticsEvent;
+import org.osgi.service.component.ComponentContext;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+
+import java.util.Dictionary;
+import java.util.HashSet;
+import java.util.Hashtable;
+import java.util.Set;
+import java.util.Map;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.*;
+import static org.onlab.junit.TestTools.assertAfter;
+
+public class McastTest extends McastTestBase {
+
+  private CordMcast cordMcast;
+  private CordMcastStatisticsManager cordMcastStatisticsManager;
+
+  private MockCordMcastStatisticsEventListener mockListener = new MockCordMcastStatisticsEventListener();
+
+  private static final int WAIT_TIMEOUT = 1000;
+  private static final int WAIT = 250;
+  McastRouteUpdate previousSubject, currentSubject;
+
+  @Before
+  public void setUp() {
+      cordMcast = new CordMcast();
+      cordMcastStatisticsManager = new CordMcastStatisticsManager();
+
+      cordMcast.coreService = new MockCoreService();
+      cordMcast.networkConfig = new TestNetworkConfigRegistry();
+      cordMcast.flowObjectiveService = new MockFlowObjectiveService();
+      cordMcast.mastershipService = new TestMastershipService();
+      cordMcast.deviceService = new MockDeviceService();
+      cordMcastStatisticsManager.componentConfigService = new ComponentConfigAdapter();
+      cordMcastStatisticsManager.addListener(mockListener);
+      cordMcast.sadisService = new MockSadisService();
+      cordMcast.cordMcastStatisticsService = cordMcastStatisticsManager;
+
+      cordMcast.storageService = EasyMock.createMock(StorageServiceAdapter.class);
+      expect(cordMcast.storageService.consistentMapBuilder()).andReturn(new TestConsistentMap.Builder<>());
+      replay(cordMcast.storageService);
+
+      Dictionary<String, Object> cfgDict = new Hashtable<String, Object>();
+      cfgDict.put("vlanEnabled", false);
+      cfgDict.put("eventGenerationPeriodInSeconds", EVENT_GENERATION_PERIOD);
+
+      cordMcast.componentConfigService = EasyMock.createNiceMock(ComponentConfigService.class);
+      replay(cordMcast.componentConfigService);
+
+      Set<McastRoute> route1Set = new HashSet<McastRoute>();
+      route1Set.add(route1);
+
+      cordMcast.mcastService = EasyMock.createNiceMock(MulticastRouteService.class);
+      expect(cordMcast.mcastService.getRoutes()).andReturn(Sets.newHashSet());
+      replay(cordMcast.mcastService);
+
+      cordMcastStatisticsManager.mcastService = EasyMock.createNiceMock(MulticastRouteService.class);
+      expect(cordMcastStatisticsManager.mcastService.getRoutes()).andReturn(route1Set).times(2);
+      replay(cordMcastStatisticsManager.mcastService);
+
+      TestUtils.setField(cordMcastStatisticsManager, "eventDispatcher", new TestEventDispatcher());
+
+      ComponentContext componentContext = EasyMock.createMock(ComponentContext.class);
+      expect(componentContext.getProperties()).andReturn(cfgDict).times(2);
+      replay(componentContext);
+      cordMcastStatisticsManager.activate(componentContext);
+
+      cordMcast.activate(componentContext);
+   }
+
+    @After
+    public void tearDown() {
+      cordMcast.deactivate();
+      // Stop the statistics scheduler started in setUp so its thread does not outlive the test
+      cordMcastStatisticsManager.deactivate();
+      forwardMap.clear();
+      nextMap.clear();
+    }
+
+    @Test
+    public void testAddingSinkEvent() throws InterruptedException {
+
+      Set<ConnectPoint> sinks2Cp = new HashSet<ConnectPoint>(Arrays.asList(CONNECT_POINT_B));
+      Map<HostId, Set<ConnectPoint>> sinks2 = ImmutableMap.of(HOST_ID_NONE, sinks2Cp);
+
+      // Route update for route1 whose sinks change from the fixture sinks to {CP_B}
+      previousSubject = McastRouteUpdate.mcastRouteUpdate(route1, sources, sinks);
+      currentSubject = McastRouteUpdate.mcastRouteUpdate(route1, sources, sinks2);
+      // Deliver it to the listener as a SINKS_ADDED event
+      McastEvent event = new McastEvent(McastEvent.Type.SINKS_ADDED, previousSubject, currentSubject);
+      cordMcast.listener.event(event);
+      synchronized (forwardMap) {
+        forwardMap.wait(WAIT_TIMEOUT);
+      }
+
+      // The forwarding objective recorded for device A must be an ADD
+      assertNotNull(forwardMap.get(DEVICE_ID_OF_A));
+      assertEquals(Objective.Operation.ADD, forwardMap.get(DEVICE_ID_OF_A).op());
+
+      // The next objective must carry exactly one treatment, outputting to PORT_B
+      Collection<TrafficTreatment> treatments =
+           nextMap.get(DEVICE_ID_OF_A).next();
+      assertEquals(1, treatments.size());
+      OutputInstruction output = null;
+      for (TrafficTreatment trafficTreatment : treatments) {
+         output = outputPort(trafficTreatment);
+      }
+      assertNotNull(output);
+      // Compare ports by value (equals) rather than by object identity (==)
+      assertEquals(PORT_B, output.port());
+      // Checking the group ip address
+      TrafficSelector trafficSelector = forwardMap.get(DEVICE_ID_OF_A).selector();
+      IPCriterion ipCriterion = ipAddress(trafficSelector);
+      assertNotNull(ipCriterion);
+      assertEquals(MULTICAST_IP, ipCriterion.ip().address());
+    }
+
+    @Test
+    public void testAddToExistingSinkEvent() throws InterruptedException {
+
+       // Adding first sink (none --> CP_B)
+       testAddingSinkEvent();
+
+       Set<ConnectPoint> previousSinkCps = new HashSet<>(Arrays.asList(CONNECT_POINT_B));
+       Map<HostId, Set<ConnectPoint>> previousSinks = ImmutableMap.of(HOST_ID_NONE, previousSinkCps);
+       previousSubject = McastRouteUpdate.mcastRouteUpdate(route1, sources, previousSinks);
+       Set<ConnectPoint> currentSinkCps = new HashSet<>(Arrays.asList(CONNECT_POINT_B, CONNECT_POINT_C));
+       Map<HostId, Set<ConnectPoint>> currentSinks = ImmutableMap.of(HOST_ID_NONE, currentSinkCps);
+       currentSubject = McastRouteUpdate.mcastRouteUpdate(route1, sources, currentSinks);
+       // Second SINKS_ADDED event: the sink set grows from {CP_B} to {CP_B, CP_C}
+       McastEvent event = new McastEvent(McastEvent.Type.SINKS_ADDED, previousSubject, currentSubject);
+       cordMcast.listener.event(event);
+
+       // The next objective recorded for device A must now be an ADD_TO_EXISTING
+       assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
+           assertEquals(Objective.Operation.ADD_TO_EXISTING, nextMap.get(DEVICE_ID_OF_A).op()));
+       // Its single treatment must output to PORT_C (compared by value, not identity)
+       Collection<TrafficTreatment> treatments = nextMap.get(DEVICE_ID_OF_A).next();
+       assertEquals(1, treatments.size());
+       OutputInstruction output = null;
+       for (TrafficTreatment trafficTreatment : treatments) {
+          output = outputPort(trafficTreatment);
+       }
+       assertNotNull(output);
+       assertEquals(PORT_C, output.port());
+    }
+
+    @Test
+    public void testRemoveSinkEvent() throws InterruptedException {
+
+       testAddToExistingSinkEvent();
+       // SINKS_REMOVED event: the sink set shrinks from {CP_B, CP_C} to {CP_C}
+       Set<ConnectPoint> previousSinkCps = new HashSet<>(Arrays.asList(CONNECT_POINT_B, CONNECT_POINT_C));
+       Map<HostId, Set<ConnectPoint>> previousSinks = ImmutableMap.of(HOST_ID_NONE, previousSinkCps);
+       previousSubject = McastRouteUpdate.mcastRouteUpdate(route1, sources, previousSinks);
+       Set<ConnectPoint> currentSinkCps = new HashSet<>(Arrays.asList(CONNECT_POINT_C));
+       Map<HostId, Set<ConnectPoint>> currentSinks = ImmutableMap.of(HOST_ID_NONE, currentSinkCps);
+       currentSubject = McastRouteUpdate.mcastRouteUpdate(route1, sources, currentSinks);
+       McastEvent event = new McastEvent(McastEvent.Type.SINKS_REMOVED, previousSubject, currentSubject);
+       cordMcast.listener.event(event);
+       // The next objective recorded for device A must be a REMOVE_FROM_EXISTING
+       assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
+           assertEquals(Objective.Operation.REMOVE_FROM_EXISTING, nextMap.get(DEVICE_ID_OF_A).op()));
+
+       // The recorded objective is expected to name PORT_B, the port taken out of the group;
+       // ports are compared by value (equals) rather than by identity (==).
+       Collection<TrafficTreatment> treatments =
+            nextMap.get(DEVICE_ID_OF_A).next();
+       assertEquals(1, treatments.size());
+       OutputInstruction output = null;
+       for (TrafficTreatment trafficTreatment : treatments) {
+          output = outputPort(trafficTreatment);
+       }
+       assertNotNull(output);
+       assertEquals(PORT_B, output.port());
+
+    }
+
+    @Test
+    public void testRemoveLastSinkEvent() throws InterruptedException {
+
+       testRemoveSinkEvent();
+       // SINKS_REMOVED event: the sink set shrinks from {CP_C} to the empty set
+       Set<ConnectPoint> previousSinkCps = new HashSet<>(Arrays.asList(CONNECT_POINT_C));
+       Map<HostId, Set<ConnectPoint>> previousSinks = ImmutableMap.of(HOST_ID_NONE, previousSinkCps);
+       previousSubject = McastRouteUpdate.mcastRouteUpdate(route1, sources, previousSinks);
+       Set<ConnectPoint> currentSinkCps = new HashSet<>();
+       Map<HostId, Set<ConnectPoint>> currentSinks = ImmutableMap.of(HOST_ID_NONE, currentSinkCps);
+       currentSubject = McastRouteUpdate.mcastRouteUpdate(route1, sources, currentSinks);
+       McastEvent event = new McastEvent(McastEvent.Type.SINKS_REMOVED, previousSubject, currentSubject);
+       cordMcast.listener.event(event);
+
+       // The next objective recorded for device A must again be a REMOVE_FROM_EXISTING
+       assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
+           assertEquals(Objective.Operation.REMOVE_FROM_EXISTING, nextMap.get(DEVICE_ID_OF_A).op()));
+
+       // Its single treatment must output to PORT_C (compared by value, not identity)
+       Collection<TrafficTreatment> treatments = nextMap.get(DEVICE_ID_OF_A).next();
+       assertEquals(1, treatments.size());
+       OutputInstruction output = null;
+       for (TrafficTreatment trafficTreatment : treatments) {
+          output = outputPort(trafficTreatment);
+       }
+       assertNotNull(output);
+       assertEquals(PORT_C, output.port());
+  }
+
+  @Test
+  public void testUnkownOltDevice() throws InterruptedException {
+
+       // Build a sink-added update located on device "of:1" (the unknown OLT of this test)
+       final DeviceId unknownDeviceId = DeviceId.deviceId("of:1");
+
+       ConnectPoint sourcePoint = new ConnectPoint(unknownDeviceId, PORT_A);
+       ConnectPoint sinkPoint = new ConnectPoint(unknownDeviceId, PORT_B);
+       Set<ConnectPoint> sourceCps = new HashSet<>(Arrays.asList(sourcePoint));
+       Set<ConnectPoint> noSinkCps = new HashSet<>();
+       Set<ConnectPoint> oneSinkCps = new HashSet<>(Arrays.asList(sinkPoint));
+       Map<HostId, Set<ConnectPoint>> routeSources = ImmutableMap.of(HOST_ID_NONE, sourceCps);
+
+       Map<HostId, Set<ConnectPoint>> sinksBefore = ImmutableMap.of(HOST_ID_NONE, noSinkCps);
+       Map<HostId, Set<ConnectPoint>> sinksAfter = ImmutableMap.of(HOST_ID_NONE, oneSinkCps);
+       // Local route updates; the previousSubject/currentSubject fields stay untouched
+       McastRouteUpdate before = McastRouteUpdate.mcastRouteUpdate(route1, routeSources, sinksBefore);
+       McastRouteUpdate after = McastRouteUpdate.mcastRouteUpdate(route1, routeSources, sinksAfter);
+       // Deliver them to the listener as a SINKS_ADDED event
+       McastEvent event = new McastEvent(McastEvent.Type.SINKS_ADDED, before, after);
+       cordMcast.listener.event(event);
+       // knownOltFlag must be set for the unknown OLT device, and nothing may be recorded
+       assertAfter(WAIT, WAIT * 2, () -> assertTrue(knownOltFlag));
+       assertEquals(0, forwardMap.size());
+       assertEquals(0, nextMap.size());
+
+  }
+
+  @Test
+  public void testRouteAddedEvent() throws InterruptedException {
+
+      // Previous and current subject are built from the same arguments
+      previousSubject = McastRouteUpdate.mcastRouteUpdate(route1, emptySource, sinks);
+      currentSubject = McastRouteUpdate.mcastRouteUpdate(route1, emptySource, sinks);
+      // Deliver a ROUTE_ADDED event straight to the listener
+      cordMcast.listener.event(
+              new McastEvent(McastEvent.Type.ROUTE_ADDED, previousSubject, currentSubject));
+      // Neither a forwarding nor a next objective may have been recorded
+      assertAfter(WAIT, WAIT * 2, () -> assertEquals(0, forwardMap.size()));
+      assertEquals(0, nextMap.size());
+
+   }
+
+
+  @Test
+  public void testRouteRemovedEvent() throws InterruptedException {
+
+      testRouteAddedEvent();
+
+      // Previous and current subject are built from the same arguments
+      previousSubject = McastRouteUpdate.mcastRouteUpdate(route1, emptySource, sinks);
+      currentSubject = McastRouteUpdate.mcastRouteUpdate(route1, emptySource, sinks);
+      // Deliver a ROUTE_REMOVED event straight to the listener
+      cordMcast.listener.event(
+              new McastEvent(McastEvent.Type.ROUTE_REMOVED, previousSubject, currentSubject));
+      // Neither a forwarding nor a next objective may have been recorded
+      assertAfter(WAIT, WAIT * 2, () -> assertEquals(0, forwardMap.size()));
+      assertEquals(0, nextMap.size());
+
+   }
+
+  @Test
+  public void testSourceAddedEvent() throws InterruptedException {
+
+      // Replay the route-added scenario before adding the source
+      testRouteAddedEvent();
+
+      // The update goes from emptySource to sources; the sinks stay the same
+      previousSubject = McastRouteUpdate.mcastRouteUpdate(route1, emptySource, sinks);
+      currentSubject = McastRouteUpdate.mcastRouteUpdate(route1, sources, sinks);
+      // Deliver a SOURCES_ADDED event straight to the listener
+      cordMcast.listener.event(
+              new McastEvent(McastEvent.Type.SOURCES_ADDED, previousSubject, currentSubject));
+      // Neither a forwarding nor a next objective may have been recorded
+      assertAfter(WAIT, WAIT * 2, () -> assertEquals(0, forwardMap.size()));
+      assertEquals(0, nextMap.size());
+
+   }
+
+  @Test
+  public void testSourcesRemovedEvent() throws InterruptedException {
+
+      testSourceAddedEvent();
+
+      // The update goes from sources back to emptySource; the sinks stay the same
+      previousSubject = McastRouteUpdate.mcastRouteUpdate(route1, sources, sinks);
+      currentSubject = McastRouteUpdate.mcastRouteUpdate(route1, emptySource, sinks);
+      // Deliver a SOURCES_REMOVED event straight to the listener.
+      // Original note: a warning message about an unknown event will be displayed.
+      cordMcast.listener.event(
+              new McastEvent(McastEvent.Type.SOURCES_REMOVED, previousSubject, currentSubject));
+      assertAfter(WAIT, WAIT * 2, () -> assertEquals(0, forwardMap.size()));
+      assertEquals(0, nextMap.size());
+   }
+
+    @Test
+    public void mcastTestEventGeneration() throws InterruptedException {
+      // Values the published statistics entry for route1 is compared against
+      IpAddress testGroup = route1.group();
+      String testSource = route1.source().isEmpty() ? "*" : route1.source().get().toString();
+      VlanId testVlan = cordMcast.assignedVlan();
+
+      // Thread is scheduled without any delay
+      assertAfter(WAIT, WAIT * 2, () ->
+              assertEquals(1, mockListener.mcastEventList.size()));
+
+      for (CordMcastStatisticsEvent event: mockListener.mcastEventList) {
+           assertEquals(CordMcastStatisticsEvent.Type.STATUS_UPDATE, event.type());
+      }
+
+      CordMcastStatistics cordMcastStatistics = mockListener.mcastEventList.get(0).subject().get(0);
+      assertEquals(VlanId.NONE, cordMcastStatistics.getVlanId());
+      assertEquals(testVlan, cordMcastStatistics.getVlanId());
+      assertEquals(testSource, cordMcastStatistics.getSourceAddress());
+      assertEquals(testGroup, cordMcastStatistics.getGroupAddress());
+
+      // Reconfigure with vlanEnabled = true and check the next published event
+      Dictionary<String, Object> cfgDict = new Hashtable<>();
+      cfgDict.put("vlanEnabled", true);
+
+      ComponentContext componentContext = EasyMock.createMock(ComponentContext.class);
+      expect(componentContext.getProperties()).andReturn(cfgDict);
+      replay(componentContext);
+      cordMcast.modified(componentContext);
+      testVlan = cordMcast.assignedVlan();
+
+      assertAfter(EVENT_GENERATION_PERIOD, EVENT_GENERATION_PERIOD * 1000, () ->
+              assertEquals(2, mockListener.mcastEventList.size()));
+
+      for (CordMcastStatisticsEvent event: mockListener.mcastEventList) {
+          assertEquals(CordMcastStatisticsEvent.Type.STATUS_UPDATE, event.type());
+      }
+
+      cordMcastStatistics = mockListener.mcastEventList.get(1).subject().get(0);
+      assertNotEquals(VlanId.NONE, cordMcastStatistics.getVlanId());
+      assertEquals(testVlan, cordMcastStatistics.getVlanId());
+      assertEquals(testSource, cordMcastStatistics.getSourceAddress());
+      assertEquals(testGroup, cordMcastStatistics.getGroupAddress());
+    }
+}
diff --git a/app/src/test/java/org/opencord/cordmcast/impl/McastTestBase.java b/app/src/test/java/org/opencord/cordmcast/impl/McastTestBase.java
new file mode 100644
index 0000000..17ad11b
--- /dev/null
+++ b/app/src/test/java/org/opencord/cordmcast/impl/McastTestBase.java
@@ -0,0 +1,291 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.cordmcast.impl;
+
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.onlab.packet.Ip4Address;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.VlanId;
+import org.onosproject.TestApplicationId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreServiceAdapter;
+import org.onosproject.event.DefaultEventSinkRegistry;
+import org.onosproject.event.Event;
+import org.onosproject.event.EventDeliveryService;
+import org.onosproject.event.EventSink;
+import org.onosproject.mastership.MastershipServiceAdapter;
+import org.onosproject.mcast.api.McastRoute;
+import org.onosproject.net.AnnotationKeys;
+import org.onosproject.net.Annotations;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DefaultAnnotations;
+import org.onosproject.net.DefaultDevice;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.HostId;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.SparseAnnotations;
+import org.onosproject.net.config.Config;
+import org.onosproject.net.config.NetworkConfigRegistryAdapter;
+import org.onosproject.net.config.basics.McastConfig;
+import org.onosproject.net.device.DeviceServiceAdapter;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.flow.criteria.Criterion;
+import org.onosproject.net.flow.criteria.IPCriterion;
+import org.onosproject.net.flow.instructions.Instruction;
+import org.onosproject.net.flow.instructions.Instructions.OutputInstruction;
+import org.onosproject.net.flowobjective.FlowObjectiveServiceAdapter;
+import org.onosproject.net.flowobjective.ForwardingObjective;
+import org.onosproject.net.flowobjective.NextObjective;
+
+import com.google.common.collect.ImmutableMap;
+import org.opencord.cordmcast.CordMcastStatisticsEvent;
+import org.opencord.cordmcast.CordMcastStatisticsEventListener;
+import org.opencord.sadis.BandwidthProfileInformation;
+import org.opencord.sadis.BaseInformationService;
+import org.opencord.sadis.SadisService;
+import org.opencord.sadis.SubscriberAndDeviceInformation;
+
+import static com.google.common.base.Preconditions.checkState;
+
+public class McastTestBase {
+
+     // Latest ForwardingObjective handed to MockFlowObjectiveService.forward(), per device
+     Map<DeviceId, ForwardingObjective> forwardMap = new HashMap<>();
+     // Latest NextObjective handed to MockFlowObjectiveService.next(), per device
+     Map<DeviceId, NextObjective> nextMap = new HashMap<>();
+     // The only device id that MockDeviceService resolves to a device
+     protected static final DeviceId DEVICE_ID_OF_A = DeviceId.deviceId("of:00000a0a0a0a0a00");
+     // Ports of device A; PORT_A is also the uplink port of the mocked SADIS entry
+     protected static final PortNumber PORT_A = PortNumber.portNumber(1048576);
+     protected static final PortNumber PORT_B = PortNumber.portNumber(16);
+     protected static final PortNumber PORT_C = PortNumber.portNumber(24);
+
+     // Connect points on device A, used to build sources and sinks
+     protected static final ConnectPoint CONNECT_POINT_A = new ConnectPoint(DEVICE_ID_OF_A, PORT_A);
+     protected static final ConnectPoint CONNECT_POINT_B = new ConnectPoint(DEVICE_ID_OF_A, PORT_B);
+     protected static final ConnectPoint CONNECT_POINT_C = new ConnectPoint(DEVICE_ID_OF_A, PORT_C);
+
+     // Serial number of device A; it is also the id known to the mocked SADIS service
+     protected static final String SERIAL_NUMBER_OF_DEVICE_A = "serialNumberOfDevA";
+     // Management IP address of device A
+     protected static final Ip4Address MANAGEMENT_IP_OF_A = Ip4Address.valueOf("10.177.125.4");
+     // Host id used as the key of the source and sink maps below
+     protected static final HostId HOST_ID_NONE = HostId.NONE;
+     // Source connect points: CONNECT_POINT_A only
+     protected static final Set<ConnectPoint> SOURCES_CP = new HashSet<>(Arrays.asList(CONNECT_POINT_A));
+     Map<HostId, Set<ConnectPoint>> sources = ImmutableMap.of(HOST_ID_NONE, SOURCES_CP);
+
+     protected static final IpAddress MULTICAST_IP = IpAddress.valueOf("224.0.0.22");
+     protected static final IpAddress SOURCE_IP = IpAddress.valueOf("192.168.1.1");
+     // Dummy route of type IGMP built from SOURCE_IP and MULTICAST_IP
+     McastRoute route1 = new McastRoute(SOURCE_IP, MULTICAST_IP, McastRoute.Type.IGMP);
+
+     // Empty sink set, used in prevRoute
+     Set<ConnectPoint> sinksCp = new HashSet<>();
+     Map<HostId, Set<ConnectPoint>> sinks = ImmutableMap.of(HOST_ID_NONE, sinksCp);
+
+     // Empty source set
+     Set<ConnectPoint> sourceCp = new HashSet<>();
+     Map<HostId, Set<ConnectPoint>> emptySource = ImmutableMap.of(HOST_ID_NONE, sourceCp);
+
+     // Set to true by MockDeviceService.getDevice() when a device other than DEVICE_ID_OF_A is requested
+     boolean knownOltFlag = false;
+
+     // Event generation period, reduced to 1s for the tests
+     protected static final int EVENT_GENERATION_PERIOD = 1;
+
+     /** Core service stub: registerApplication() ignores its argument and returns a fixed test id. */
+     class MockCoreService extends CoreServiceAdapter {
+          @Override
+          public ApplicationId registerApplication(String name) {
+               return TestApplicationId.create("org.opencord.cordmcast");
+          }
+     }
+
+     class MockFlowObjectiveService extends FlowObjectiveServiceAdapter { // stub that records objectives
+          @Override
+          public void forward(DeviceId deviceId, ForwardingObjective forwardingObjective) {
+              synchronized (forwardMap) {
+                forwardMap.put(deviceId, forwardingObjective); // keep the latest objective per device
+                forwardMap.notify(); // wake one thread waiting on forwardMap's monitor, if any
+              }
+          }
+
+          @Override
+          public void next(DeviceId deviceId, NextObjective nextObjective) {
+             nextMap.put(deviceId, nextObjective); // recorded without locking or notification, unlike forward()
+          }
+     }
+
+     class TestMastershipService extends MastershipServiceAdapter {
+          @Override
+          public boolean isLocalMaster(DeviceId deviceId) {
+               return true; // every device is reported as mastered by the local node
+          }
+     }
+
+    /** SADIS stub: serves subscriber info from a new MockSubService per call; has no bandwidth profile service. */
+    protected class MockSadisService implements SadisService {
+        @Override
+        public BaseInformationService<SubscriberAndDeviceInformation> getSubscriberInfoService() {
+            return new MockSubService();
+        }
+
+        @Override
+        public BaseInformationService<BandwidthProfileInformation> getBandwidthProfileService() {
+            return null;
+        }
+    }
+
+    /**
+     * McastConfig stub whose egressVlan() always returns VLAN 4000.
+     */
+    static class MockMcastConfig extends McastConfig {
+        @Override
+        public VlanId egressVlan() {
+            return VlanId.vlanId("4000");
+        }
+    }
+
+    /**
+     * Mocks the network config registry: every lookup returns a new MockMcastConfig,
+     * whatever subject and config class are requested.
+     */
+    static final class TestNetworkConfigRegistry extends NetworkConfigRegistryAdapter {
+        @Override
+        @SuppressWarnings("unchecked") // scoped to the single unchecked cast below, not the whole class
+        public <S, C extends Config<S>> C getConfig(S subject, Class<C> configClass) {
+            McastConfig mcastConfig = new MockMcastConfig();
+            return (C) mcastConfig;
+        }
+    }
+
+    /** Delivers each posted event, within the post() call, to the sink registered for the event's class. */
+    public static class TestEventDispatcher extends DefaultEventSinkRegistry implements EventDeliveryService {
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public synchronized void post(Event event) {
+            EventSink target = getSink(event.getClass());
+            checkState(target != null, "No sink for event %s", event);
+            target.process(event);
+        }
+
+        @Override
+        public void setDispatchTimeLimit(long millis) {
+            // intentionally empty: the value is not stored
+        }
+
+        @Override
+        public long getDispatchTimeLimit() {
+            return 0;
+        }
+    }
+
+    public static class MockCordMcastStatisticsEventListener implements CordMcastStatisticsEventListener {
+        protected List<CordMcastStatisticsEvent> mcastEventList = new ArrayList<>();
+        // Appends every received event. NOTE(review): plain ArrayList - confirm single-threaded access.
+        @Override
+        public void event(CordMcastStatisticsEvent event) {
+            mcastEventList.add(event);
+        }
+    }
+
+    private class MockSubService implements BaseInformationService<SubscriberAndDeviceInformation> {
+        private final MockSubscriberAndDeviceInformation knownDevice =
+                new MockSubscriberAndDeviceInformation(SERIAL_NUMBER_OF_DEVICE_A, MANAGEMENT_IP_OF_A);
+        // Only SERIAL_NUMBER_OF_DEVICE_A resolves to an entry; any other id yields null.
+        @Override
+        public SubscriberAndDeviceInformation get(String id) {
+            return SERIAL_NUMBER_OF_DEVICE_A.equals(id) ? knownDevice : null;
+        }
+
+        @Override
+        public void invalidateAll() {
+        }
+
+        @Override
+        public void invalidateId(String id) {
+        }
+
+        @Override
+        public SubscriberAndDeviceInformation getfromCache(String id) {
+            return null;
+        }
+    }
+
+    private class MockSubscriberAndDeviceInformation extends SubscriberAndDeviceInformation {
+        // Sets the given id and IP address; the uplink port is always PORT_A.
+        MockSubscriberAndDeviceInformation(String id, Ip4Address ipAddress) {
+            setId(id);
+            setIPAddress(ipAddress);
+            setUplinkPort((int) PORT_A.toLong());
+        }
+    }
+
+    class MockDeviceService extends DeviceServiceAdapter {
+        // Resolves only DEVICE_ID_OF_A; any other id sets knownOltFlag and yields null.
+        @Override
+        public Device getDevice(DeviceId deviceId) {
+            if (!DEVICE_ID_OF_A.equals(deviceId)) {
+                // Despite its name, the flag records that an unknown device was requested.
+                knownOltFlag = true;
+                return null;
+            }
+
+            // Device A carries its management address as an annotation.
+            SparseAnnotations annotations = DefaultAnnotations.builder()
+                    .set(AnnotationKeys.MANAGEMENT_ADDRESS, MANAGEMENT_IP_OF_A.toString())
+                    .build();
+            Annotations[] da = {annotations};
+            return new DefaultDevice(null, DEVICE_ID_OF_A, Device.Type.OTHER, "", "",
+                    "", SERIAL_NUMBER_OF_DEVICE_A, null, da);
+        }
+    }
+
+     /** Returns the treatment's last OutputInstruction (null if none), skipping other instruction types. */
+     public OutputInstruction outputPort(TrafficTreatment trafficTreatment) {
+         Instruction lastOutput = trafficTreatment.allInstructions().stream()
+                 .filter(OutputInstruction.class::isInstance) // a blind cast here used to throw on mixed treatments
+                 .reduce((earlier, later) -> later) // keep the last match, like the loop it replaces
+                 .orElse(null);
+         return (OutputInstruction) lastOutput;
+     }
+
+     /** Returns the last IPV4_DST criterion iterated in the selector, as an IPCriterion, or null if none. */
+     public IPCriterion ipAddress(TrafficSelector trafficSelector) {
+         IPCriterion match = null;
+         Iterator<Criterion> cursor = trafficSelector.criteria().iterator();
+         while (cursor.hasNext()) {
+             Criterion candidate = cursor.next();
+             if (candidate.type() == Criterion.Type.IPV4_DST) {
+                 match = (IPCriterion) candidate;
+             }
+         }
+         return match;
+     }
+
+}