SEBA-986-CordMcastStatisticsService should use distributed storage infrastructure of ONOS
Change-Id: I3e22f3bde896f5a14b85fad88e5f84e58d3840a0
diff --git a/app/src/main/java/org/opencord/cordmcast/impl/CordMcastStatisticsManager.java b/app/src/main/java/org/opencord/cordmcast/impl/CordMcastStatisticsManager.java
index 1044adc..e2f4779 100644
--- a/app/src/main/java/org/opencord/cordmcast/impl/CordMcastStatisticsManager.java
+++ b/app/src/main/java/org/opencord/cordmcast/impl/CordMcastStatisticsManager.java
@@ -19,6 +19,8 @@
import org.onlab.packet.VlanId;
import org.onlab.util.SafeRecurringTask;
import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
import org.onosproject.event.AbstractListenerManager;
import org.onosproject.mcast.api.McastRoute;
import org.onosproject.mcast.api.MulticastRouteService;
@@ -40,6 +42,7 @@
import java.util.List;
import java.util.ArrayList;
import java.util.Properties;
+import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -61,13 +64,20 @@
extends AbstractListenerManager<CordMcastStatisticsEvent, CordMcastStatisticsEventListener>
implements CordMcastStatisticsService {
+ private static final String CORD_MCAST_STATISTICS_LEADERSHIP = "cord-mcast-statistics-leadership";
+
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected MulticastRouteService mcastService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected ComponentConfigService componentConfigService;
- CordMcastStatisticsEventListener listener;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected LeadershipService leadershipService;
+
/**
* Multicast Statistics generation time interval.
@@ -82,6 +92,7 @@
@Activate
public void activate(ComponentContext context) {
+ leadershipService.runForLeadership(CORD_MCAST_STATISTICS_LEADERSHIP);
eventDispatcher.addSink(CordMcastStatisticsEvent.class, listenerRegistry);
executor = Executors.newScheduledThreadPool(1);
componentConfigService.registerProperties(getClass());
@@ -112,6 +123,8 @@
eventDispatcher.removeSink(CordMcastStatisticsEvent.class);
scheduledFuture.cancel(true);
executor.shutdown();
+ leadershipService.withdraw(CORD_MCAST_STATISTICS_LEADERSHIP);
+ log.info("CordMcastStatisticsManager deactivated.");
}
public List<CordMcastStatistics> getMcastDetails() {
@@ -139,6 +152,12 @@
* pushing mcast stat data as event.
*/
protected void publishEvent() {
+ // Only publish events if we are the cluster leader for Igmp-stats
+ if (!Objects.equals(leadershipService.getLeader(CORD_MCAST_STATISTICS_LEADERSHIP),
+ clusterService.getLocalNode().id())) {
+ log.debug("This is not leader of : {}", CORD_MCAST_STATISTICS_LEADERSHIP);
+ return;
+ }
log.debug("pushing cord mcast event to kafka");
List<CordMcastStatistics> routeList = getMcastDetails();
routeList.forEach(mcastStats -> {
diff --git a/app/src/test/java/org/opencord/cordmcast/impl/McastTest.java b/app/src/test/java/org/opencord/cordmcast/impl/McastTest.java
index 84eb986..acd85f3 100644
--- a/app/src/test/java/org/opencord/cordmcast/impl/McastTest.java
+++ b/app/src/test/java/org/opencord/cordmcast/impl/McastTest.java
@@ -27,6 +27,7 @@
import org.onlab.packet.VlanId;
import org.onosproject.cfg.ComponentConfigAdapter;
import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterServiceAdapter;
import org.onosproject.mcast.api.McastEvent;
import org.onosproject.mcast.api.McastRoute;
import org.onosproject.mcast.api.McastRouteUpdate;
@@ -86,6 +87,8 @@
private void init(boolean vlanEnabled, VlanId egressVlan, VlanId egressInnerVlan) {
cordMcast = new CordMcast();
cordMcastStatisticsManager = new CordMcastStatisticsManager();
+ cordMcastStatisticsManager.clusterService = new ClusterServiceAdapter();
+ cordMcastStatisticsManager.leadershipService = new LeadershipServiceMcastAdapter();
cordMcast.coreService = new MockCoreService();
TestNetworkConfigRegistry testNetworkConfigRegistry = new TestNetworkConfigRegistry();
@@ -154,23 +157,23 @@
// ForwardMap will contain the operation "Add" in the flowObjective. None -> CP_B
assertNotNull(forwardMap.get(DEVICE_ID_OF_A));
- assertTrue(forwardMap.get(DEVICE_ID_OF_A).op() == Objective.Operation.ADD);
+ assertEquals(forwardMap.get(DEVICE_ID_OF_A).op(), Objective.Operation.ADD);
// Output port number will be PORT_B i.e. 16
Collection<TrafficTreatment> traffictreatMentCollection =
nextMap.get(DEVICE_ID_OF_A).next();
- assertTrue(1 == traffictreatMentCollection.size());
+ assertEquals(1, traffictreatMentCollection.size());
OutputInstruction output = null;
for (TrafficTreatment trafficTreatment : traffictreatMentCollection) {
output = outputPort(trafficTreatment);
}
assertNotNull(output);
- assertTrue(PORT_B == output.port());
+ assertEquals(PORT_B, output.port());
// Checking the group ip address
TrafficSelector trafficSelector = forwardMap.get(DEVICE_ID_OF_A).selector();
IPCriterion ipCriterion = ipAddress(trafficSelector);
assertNotNull(ipCriterion);
- assertTrue(MULTICAST_IP.equals(ipCriterion.ip().address()));
+ assertEquals(MULTICAST_IP, ipCriterion.ip().address());
//checking the vlan criterion
TrafficSelector meta = forwardMap.get(DEVICE_ID_OF_A).meta();
VlanIdCriterion vlanId = vlanId(meta, Criterion.Type.VLAN_VID);
@@ -198,23 +201,23 @@
}
// ForwardMap will contain the operation "Add" in the flowObjective. None -> CP_B
assertNotNull(forwardMap.get(DEVICE_ID_OF_A));
- assertTrue(forwardMap.get(DEVICE_ID_OF_A).op() == Objective.Operation.ADD);
+ assertEquals(forwardMap.get(DEVICE_ID_OF_A).op(), Objective.Operation.ADD);
// Output port number will be PORT_B i.e. 16
Collection<TrafficTreatment> traffictreatMentCollection =
nextMap.get(DEVICE_ID_OF_A).next();
- assertTrue(1 == traffictreatMentCollection.size());
+ assertEquals(1, traffictreatMentCollection.size());
OutputInstruction output = null;
for (TrafficTreatment trafficTreatment : traffictreatMentCollection) {
output = outputPort(trafficTreatment);
}
assertNotNull(output);
- assertTrue(PORT_B == output.port());
+ assertEquals(PORT_B, output.port());
// Checking the group ip address
TrafficSelector trafficSelector = forwardMap.get(DEVICE_ID_OF_A).selector();
IPCriterion ipCriterion = ipAddress(trafficSelector);
assertNotNull(ipCriterion);
- assertTrue(MULTICAST_IP.equals(ipCriterion.ip().address()));
+ assertEquals(MULTICAST_IP, ipCriterion.ip().address());
//checking the vlan criteria
TrafficSelector meta = forwardMap.get(DEVICE_ID_OF_A).meta();
VlanIdCriterion vlanIdCriterion = vlanId(meta, Criterion.Type.VLAN_VID);
@@ -244,23 +247,23 @@
// ForwardMap will contain the operation "Add" in the flowObjective. None -> CP_B
assertNotNull(forwardMap.get(DEVICE_ID_OF_A));
- assertTrue(forwardMap.get(DEVICE_ID_OF_A).op() == Objective.Operation.ADD);
+ assertEquals(forwardMap.get(DEVICE_ID_OF_A).op(), Objective.Operation.ADD);
// Output port number will be PORT_B i.e. 16
Collection<TrafficTreatment> traffictreatMentCollection =
nextMap.get(DEVICE_ID_OF_A).next();
- assertTrue(1 == traffictreatMentCollection.size());
+ assertEquals(1, traffictreatMentCollection.size());
OutputInstruction output = null;
for (TrafficTreatment trafficTreatment : traffictreatMentCollection) {
output = outputPort(trafficTreatment);
}
assertNotNull(output);
- assertTrue(PORT_B == output.port());
+ assertEquals(PORT_B, output.port());
// Checking the group ip address
TrafficSelector trafficSelector = forwardMap.get(DEVICE_ID_OF_A).selector();
IPCriterion ipCriterion = ipAddress(trafficSelector);
assertNotNull(ipCriterion);
- assertTrue(MULTICAST_IP.equals(ipCriterion.ip().address()));
+ assertEquals(MULTICAST_IP, ipCriterion.ip().address());
//checking the vlan criteria
TrafficSelector meta = forwardMap.get(DEVICE_ID_OF_A).meta();
VlanIdCriterion vlanIdCriterion = vlanId(meta, Criterion.Type.VLAN_VID);
@@ -289,16 +292,16 @@
// NextMap will contain the operation "ADD_TO_EXISTING" in the DefaultNextObjective.
assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
- assertTrue(nextMap.get(DEVICE_ID_OF_A).op() == Objective.Operation.ADD_TO_EXISTING));
+ assertEquals(nextMap.get(DEVICE_ID_OF_A).op(), Objective.Operation.ADD_TO_EXISTING));
// Output port number will be changed to 24 i.e. PORT_C
Collection<TrafficTreatment> traffictreatMentCollection = nextMap.get(DEVICE_ID_OF_A).next();
- assertTrue(1 == traffictreatMentCollection.size());
+ assertEquals(1, traffictreatMentCollection.size());
OutputInstruction output = null;
for (TrafficTreatment trafficTreatment : traffictreatMentCollection) {
output = outputPort(trafficTreatment);
}
assertNotNull(output);
- assertTrue(PORT_C == output.port());
+ assertEquals(PORT_C, output.port());
}
@Test
@@ -316,19 +319,19 @@
cordMcast.listener.event(event);
// Operation will be REMOVE_FROM_EXISTING and nextMap will be updated. ( None --> CP_C)
assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
- assertTrue(nextMap.get(DEVICE_ID_OF_A).op() == Objective.Operation.REMOVE_FROM_EXISTING));
+ assertEquals(nextMap.get(DEVICE_ID_OF_A).op(), Objective.Operation.REMOVE_FROM_EXISTING));
// Output port number will be PORT_B i.e. 16
// Port_B is removed from the group.
Collection<TrafficTreatment> traffictreatMentCollection =
nextMap.get(DEVICE_ID_OF_A).next();
- assertTrue(1 == traffictreatMentCollection.size());
+ assertEquals(1, traffictreatMentCollection.size());
OutputInstruction output = null;
for (TrafficTreatment trafficTreatment : traffictreatMentCollection) {
output = outputPort(trafficTreatment);
}
assertNotNull(output);
- assertTrue(PORT_B == output.port());
+ assertEquals(PORT_B, output.port());
}
@@ -348,7 +351,7 @@
// Operation will be REMOVE and nextMap will be updated. None --> { }
assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
- assertTrue(nextMap.get(DEVICE_ID_OF_A).op() == Objective.Operation.REMOVE));
+ assertEquals(nextMap.get(DEVICE_ID_OF_A).op(), Objective.Operation.REMOVE));
}
@Test
@@ -374,8 +377,8 @@
cordMcast.listener.event(event);
// OltInfo flag is set to true when olt device is unkown
assertAfter(WAIT, WAIT * 2, () -> assertTrue(knownOltFlag));
- assertTrue(0 == forwardMap.size());
- assertTrue(0 == nextMap.size());
+ assertEquals(0, forwardMap.size());
+ assertEquals(0, nextMap.size());
}
@@ -389,8 +392,8 @@
McastEvent event = new McastEvent(McastEvent.Type.ROUTE_ADDED, previousSubject, currentSubject);
cordMcast.listener.event(event);
// There will be no forwarding objective
- assertAfter(WAIT, WAIT * 2, () -> assertTrue(0 == forwardMap.size()));
- assertTrue(0 == nextMap.size());
+ assertAfter(WAIT, WAIT * 2, () -> assertEquals(0, forwardMap.size()));
+ assertEquals(0, nextMap.size());
}
@@ -407,8 +410,8 @@
McastEvent event = new McastEvent(McastEvent.Type.ROUTE_REMOVED, previousSubject, currentSubject);
cordMcast.listener.event(event);
// There will be no forwarding objective
- assertAfter(WAIT, WAIT * 2, () -> assertTrue(0 == forwardMap.size()));
- assertTrue(0 == nextMap.size());
+ assertAfter(WAIT, WAIT * 2, () -> assertEquals(0, forwardMap.size()));
+ assertEquals(0, nextMap.size());
}
@@ -425,8 +428,8 @@
McastEvent event = new McastEvent(McastEvent.Type.SOURCES_ADDED, previousSubject, currentSubject);
cordMcast.listener.event(event);
// There will be no forwarding objective
- assertAfter(WAIT, WAIT * 2, () -> assertTrue(0 == forwardMap.size()));
- assertTrue(0 == nextMap.size());
+ assertAfter(WAIT, WAIT * 2, () -> assertEquals(0, forwardMap.size()));
+ assertEquals(0, nextMap.size());
}
@@ -442,8 +445,8 @@
// Warning message of unknown event will be displayed.
McastEvent event = new McastEvent(McastEvent.Type.SOURCES_REMOVED, previousSubject, currentSubject);
cordMcast.listener.event(event);
- assertAfter(WAIT, WAIT * 2, () -> assertTrue(0 == forwardMap.size()));
- assertTrue(0 == nextMap.size());
+ assertAfter(WAIT, WAIT * 2, () -> assertEquals(0, forwardMap.size()));
+ assertEquals(0, nextMap.size());
}
@Test
diff --git a/app/src/test/java/org/opencord/cordmcast/impl/McastTestBase.java b/app/src/test/java/org/opencord/cordmcast/impl/McastTestBase.java
index 9271976..758be58 100644
--- a/app/src/test/java/org/opencord/cordmcast/impl/McastTestBase.java
+++ b/app/src/test/java/org/opencord/cordmcast/impl/McastTestBase.java
@@ -28,6 +28,8 @@
import org.onlab.packet.IpAddress;
import org.onlab.packet.VlanId;
import org.onosproject.TestApplicationId;
+import org.onosproject.cluster.LeadershipServiceAdapter;
+import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreServiceAdapter;
import org.onosproject.event.DefaultEventSinkRegistry;
@@ -151,6 +153,13 @@
}
}
+ class LeadershipServiceMcastAdapter extends LeadershipServiceAdapter {
+ @Override
+ public NodeId getLeader(String path) {
+ return NodeId.nodeId("local");
+ }
+ }
+
protected class MockSadisService implements SadisService {
@Override