SEBA-986: CordMcastStatisticsService should use the distributed storage infrastructure of ONOS
Change-Id: I3e22f3bde896f5a14b85fad88e5f84e58d3840a0
diff --git a/app/src/main/java/org/opencord/cordmcast/impl/CordMcastStatisticsManager.java b/app/src/main/java/org/opencord/cordmcast/impl/CordMcastStatisticsManager.java
index 1044adc..e2f4779 100644
--- a/app/src/main/java/org/opencord/cordmcast/impl/CordMcastStatisticsManager.java
+++ b/app/src/main/java/org/opencord/cordmcast/impl/CordMcastStatisticsManager.java
@@ -19,6 +19,8 @@
import org.onlab.packet.VlanId;
import org.onlab.util.SafeRecurringTask;
import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
import org.onosproject.event.AbstractListenerManager;
import org.onosproject.mcast.api.McastRoute;
import org.onosproject.mcast.api.MulticastRouteService;
@@ -40,6 +42,7 @@
import java.util.List;
import java.util.ArrayList;
import java.util.Properties;
+import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -61,13 +64,20 @@
extends AbstractListenerManager<CordMcastStatisticsEvent, CordMcastStatisticsEventListener>
implements CordMcastStatisticsService {
+ private static final String CORD_MCAST_STATISTICS_LEADERSHIP = "cord-mcast-statistics-leadership";
+
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected MulticastRouteService mcastService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected ComponentConfigService componentConfigService;
- CordMcastStatisticsEventListener listener;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected LeadershipService leadershipService;
+
/**
* Multicast Statistics generation time interval.
@@ -82,6 +92,7 @@
@Activate
public void activate(ComponentContext context) {
+ leadershipService.runForLeadership(CORD_MCAST_STATISTICS_LEADERSHIP);
eventDispatcher.addSink(CordMcastStatisticsEvent.class, listenerRegistry);
executor = Executors.newScheduledThreadPool(1);
componentConfigService.registerProperties(getClass());
@@ -112,6 +123,8 @@
eventDispatcher.removeSink(CordMcastStatisticsEvent.class);
scheduledFuture.cancel(true);
executor.shutdown();
+ leadershipService.withdraw(CORD_MCAST_STATISTICS_LEADERSHIP);
+ log.info("CordMcastStatisticsManager deactivated.");
}
public List<CordMcastStatistics> getMcastDetails() {
@@ -139,6 +152,12 @@
* pushing mcast stat data as event.
*/
protected void publishEvent() {
+ // Only publish events if this node is the cluster leader for the cord-mcast statistics topic
+ if (!Objects.equals(leadershipService.getLeader(CORD_MCAST_STATISTICS_LEADERSHIP),
+ clusterService.getLocalNode().id())) {
+ log.debug("This is not leader of : {}", CORD_MCAST_STATISTICS_LEADERSHIP);
+ return;
+ }
log.debug("pushing cord mcast event to kafka");
List<CordMcastStatistics> routeList = getMcastDetails();
routeList.forEach(mcastStats -> {