SEBA-640 IgmpProxy should use distributed storage infrastructure of ONOS
Change-Id: I4b1c4d326a5501e9c0e046e3ee8d973ca5f73d70
diff --git a/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerBase.java b/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerBase.java
index b301f94..6283c08 100644
--- a/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerBase.java
+++ b/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerBase.java
@@ -16,6 +16,7 @@
package org.opencord.igmpproxy.impl;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
import org.onlab.packet.Ethernet;
import org.onlab.packet.IGMP;
import org.onlab.packet.IGMPMembership;
@@ -59,6 +60,11 @@
import org.onosproject.net.packet.PacketContext;
import org.onosproject.net.packet.PacketProcessor;
import org.onosproject.net.packet.PacketServiceAdapter;
+import org.opencord.igmpproxy.IgmpLeadershipService;
+import org.opencord.igmpproxy.impl.store.groupmember.AbstractGroupMemberStore;
+import org.opencord.igmpproxy.impl.store.machine.AbstractStateMachineStore;
+import org.opencord.igmpproxy.statemachine.StateMachine;
+import org.opencord.igmpproxy.statemachine.StateMachineId;
import org.opencord.sadis.BandwidthProfileInformation;
import org.opencord.sadis.BaseInformationService;
import org.opencord.sadis.SadisService;
@@ -78,6 +84,8 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
public class IgmpManagerBase {
@@ -96,7 +104,7 @@
// Common connect point of aggregation switch used by all devices.
protected static final ConnectPoint COMMON_CONNECT_POINT =
- ConnectPoint.deviceConnectPoint("of:00000000000000003/3");
+ ConnectPoint.deviceConnectPoint("of:00000000000000003/3");
// Uplink ports for two olts A and B
protected static final PortNumber PORT_A = PortNumber.portNumber(1);
protected static final PortNumber PORT_B = PortNumber.portNumber(2);
@@ -135,23 +143,24 @@
@Override
public Device getDevice(DeviceId deviceId) {
- if (flagForDevice) {
- DefaultAnnotations.Builder annotationsBuilder = DefaultAnnotations.builder()
- .set(AnnotationKeys.MANAGEMENT_ADDRESS, SOURCE_IP_OF_A.toString());
- SparseAnnotations annotations = annotationsBuilder.build();
- Annotations[] da = {annotations };
- Device deviceA = new DefaultDevice(null, DEVICE_ID_OF_A, Device.Type.OTHER, "", "", "", "", null, da);
- flagForDevice = false;
- return deviceA;
+ if (flagForDevice) {
+ DefaultAnnotations.Builder annotationsBuilder = DefaultAnnotations.builder()
+ .set(AnnotationKeys.MANAGEMENT_ADDRESS, SOURCE_IP_OF_A.toString());
+ SparseAnnotations annotations = annotationsBuilder.build();
+ Annotations[] da = {annotations};
+ Device deviceA = new DefaultDevice(null, DEVICE_ID_OF_A, Device.Type.OTHER, "", "", "", "", null, da);
+ flagForDevice = false;
+ return deviceA;
} else {
- DefaultAnnotations.Builder annotationsBuilder = DefaultAnnotations.builder()
- .set(AnnotationKeys.MANAGEMENT_ADDRESS, SOURCE_IP_OF_B.toString());
- SparseAnnotations annotations = annotationsBuilder.build();
- Annotations[] da = {annotations };
- Device deviceB = new DefaultDevice(null, DEVICE_ID_OF_B, Device.Type.OTHER, "", "", "", "", null, da);
- return deviceB;
- }
+ DefaultAnnotations.Builder annotationsBuilder = DefaultAnnotations.builder()
+ .set(AnnotationKeys.MANAGEMENT_ADDRESS, SOURCE_IP_OF_B.toString());
+ SparseAnnotations annotations = annotationsBuilder.build();
+ Annotations[] da = {annotations};
+ Device deviceB = new DefaultDevice(null, DEVICE_ID_OF_B, Device.Type.OTHER, "", "", "", "", null, da);
+ return deviceB;
+ }
}
+
@Override
public List<Port> getPorts(DeviceId deviceId) {
return lsPorts;
@@ -162,7 +171,7 @@
DefaultAnnotations.Builder annotationsBuilder = DefaultAnnotations.builder()
.set(AnnotationKeys.MANAGEMENT_ADDRESS, SOURCE_IP_OF_A.toString());
SparseAnnotations annotations = annotationsBuilder.build();
- Annotations[] da = {annotations };
+ Annotations[] da = {annotations};
Device deviceA = new DefaultDevice(null, DEVICE_ID_OF_C, Device.Type.OTHER, "", "", "", "", null, da);
lsDevices.add(deviceA);
return lsDevices;
@@ -184,31 +193,32 @@
static final Class<IgmpproxySsmTranslateConfig> IGMPPROXY_SSM_CONFIG_CLASS = IgmpproxySsmTranslateConfig.class;
static final Class<McastConfig> MCAST_CONFIG_CLASS = McastConfig.class;
ConfigFactory<ApplicationId, IgmpproxyConfig> igmpproxyConfigFactory =
- new ConfigFactory<ApplicationId, IgmpproxyConfig>(
- SubjectFactories.APP_SUBJECT_FACTORY, IGMPPROXY_CONFIG_CLASS, "igmpproxy") {
- @Override
- public IgmpproxyConfig createConfig() {
- return new IgmpproxyConfig();
- }
- };
+ new ConfigFactory<ApplicationId, IgmpproxyConfig>(
+ SubjectFactories.APP_SUBJECT_FACTORY, IGMPPROXY_CONFIG_CLASS, "igmpproxy") {
+ @Override
+ public IgmpproxyConfig createConfig() {
+ return new IgmpproxyConfig();
+ }
+ };
ConfigFactory<ApplicationId, IgmpproxySsmTranslateConfig> igmpproxySsmConfigFactory =
- new ConfigFactory<ApplicationId, IgmpproxySsmTranslateConfig>(
- SubjectFactories.APP_SUBJECT_FACTORY, IGMPPROXY_SSM_CONFIG_CLASS, "ssmTranslate", true) {
+ new ConfigFactory<ApplicationId, IgmpproxySsmTranslateConfig>(
+ SubjectFactories.APP_SUBJECT_FACTORY, IGMPPROXY_SSM_CONFIG_CLASS, "ssmTranslate", true) {
- @Override
- public IgmpproxySsmTranslateConfig createConfig() {
- return new IgmpproxySsmTranslateConfig();
- }
- };
+ @Override
+ public IgmpproxySsmTranslateConfig createConfig() {
+ return new IgmpproxySsmTranslateConfig();
+ }
+ };
class MockIgmpProxyConfig extends IgmpproxyConfig {
boolean igmpOnPodBasis = true;
MockIgmpProxyConfig(boolean igmpFlagValue) {
- igmpOnPodBasis = igmpFlagValue;
- }
+ igmpOnPodBasis = igmpFlagValue;
+ }
+
@Override
public boolean igmpOnPodBasis() {
return igmpOnPodBasis;
@@ -224,17 +234,18 @@
@Override
public ConnectPoint connectPoint() {
- return COMMON_CONNECT_POINT;
+ return COMMON_CONNECT_POINT;
}
}
- class TestNetworkConfigRegistry extends NetworkConfigRegistryAdapter {
- Boolean igmpOnPodFlag = false;
+ class TestNetworkConfigRegistry extends NetworkConfigRegistryAdapter {
+ Boolean igmpOnPodFlag = false;
- TestNetworkConfigRegistry(Boolean igmpFlag) {
- igmpOnPodFlag = igmpFlag;
- }
+ TestNetworkConfigRegistry(Boolean igmpFlag) {
+ igmpOnPodFlag = igmpFlag;
+ }
+
@SuppressWarnings("unchecked")
@Override
public <S> Set<S> getSubjects(Class<S> subjectClass) {
@@ -242,19 +253,19 @@
return (Set<S>) ImmutableSet.of(DEVICE_ID_OF_A, DEVICE_ID_OF_B);
}
return null;
- }
+ }
- @SuppressWarnings("unchecked")
- @Override
- public <S, C extends Config<S>> C getConfig(S subject, Class<C> configClass) {
- if (configClass.getName().equalsIgnoreCase("org.opencord.igmpproxy.impl.IgmpproxyConfig")) {
- IgmpproxyConfig igmpproxyConfig = new MockIgmpProxyConfig(igmpOnPodFlag);
- return (C) igmpproxyConfig;
- } else {
- super.getConfig(subject, configClass);
- }
- return null;
- }
+ @SuppressWarnings("unchecked")
+ @Override
+ public <S, C extends Config<S>> C getConfig(S subject, Class<C> configClass) {
+ if (configClass.getName().equalsIgnoreCase("org.opencord.igmpproxy.impl.IgmpproxyConfig")) {
+ IgmpproxyConfig igmpproxyConfig = new MockIgmpProxyConfig(igmpOnPodFlag);
+ return (C) igmpproxyConfig;
+ } else {
+ super.getConfig(subject, configClass);
+ }
+ return null;
+ }
}
@@ -276,12 +287,18 @@
@Override
public void emit(OutboundPacket packet) {
synchronized (savedPackets) {
- savedPackets.add(packet);
- savedPackets.notify();
+ savedPackets.add(packet);
+ savedPackets.notify();
}
}
- }
+ }
+ class TestIgmpLeaderShipService implements IgmpLeadershipService {
+ @Override
+ public boolean isLocalLeader(DeviceId deviceId) {
+ return true;
+ }
+ }
class MockMastershipService extends MastershipServiceAdapter {
@Override
@@ -407,9 +424,9 @@
* Mocks the DefaultPacketContext.
*/
final class TestPacketContext extends DefaultPacketContext {
- TestPacketContext(long time, InboundPacket inPkt, OutboundPacket outPkt, boolean block) {
- super(time, inPkt, outPkt, block);
- }
+ TestPacketContext(long time, InboundPacket inPkt, OutboundPacket outPkt, boolean block) {
+ super(time, inPkt, outPkt, block);
+ }
@Override
public void send() {
@@ -423,31 +440,31 @@
* @param reply Ethernet packet
* @throws InterruptedException
*/
- void sendPacket(Ethernet reply) {
+ void sendPacket(Ethernet reply) {
- if (reply != null) {
- final ByteBuffer byteBuffer = ByteBuffer.wrap(reply.serialize());
+ if (reply != null) {
+ final ByteBuffer byteBuffer = ByteBuffer.wrap(reply.serialize());
- if (flagForQueryPacket) {
- InboundPacket inBoundPacket = new DefaultInboundPacket(CONNECT_POINT_C, reply, byteBuffer);
- context = new TestPacketContext(127L, inBoundPacket, null, false);
- packetProcessor.process(context);
- } else {
- if (flagForPacket) {
- InboundPacket inPacket = new DefaultInboundPacket(CONNECT_POINT_A, reply, byteBuffer);
- context = new TestPacketContext(127L, inPacket, null, false);
- flagForPacket = false;
+ if (flagForQueryPacket) {
+ InboundPacket inBoundPacket = new DefaultInboundPacket(CONNECT_POINT_C, reply, byteBuffer);
+ context = new TestPacketContext(127L, inBoundPacket, null, false);
+ packetProcessor.process(context);
+ } else {
+ if (flagForPacket) {
+ InboundPacket inPacket = new DefaultInboundPacket(CONNECT_POINT_A, reply, byteBuffer);
+ context = new TestPacketContext(127L, inPacket, null, false);
+ flagForPacket = false;
- packetProcessor.process(context);
- } else {
- InboundPacket inBoundPacket = new DefaultInboundPacket(CONNECT_POINT_B, reply, byteBuffer);
- context = new TestPacketContext(127L, inBoundPacket, null, false);
- flagForPacket = true;
+ packetProcessor.process(context);
+ } else {
+ InboundPacket inBoundPacket = new DefaultInboundPacket(CONNECT_POINT_B, reply, byteBuffer);
+ context = new TestPacketContext(127L, inBoundPacket, null, false);
+ flagForPacket = true;
- packetProcessor.process(context);
- }
- }
- }
+ packetProcessor.process(context);
+ }
+ }
+ }
}
protected class MockSadisService implements SadisService {
@@ -498,7 +515,7 @@
private class MockSubService implements BaseInformationService<SubscriberAndDeviceInformation> {
MockSubscriberAndDeviceInformation sub =
new MockSubscriberAndDeviceInformation(CLIENT_NAS_PORT_ID,
- CLIENT_NAS_PORT_ID, CLIENT_CIRCUIT_ID, null, null);
+ CLIENT_NAS_PORT_ID, CLIENT_CIRCUIT_ID, null, null);
@Override
public SubscriberAndDeviceInformation get(String id) {
@@ -653,15 +670,15 @@
}
@Override
- public void disableComponent(String name) {
- // TODO Auto-generated method stub
- }
+ public void disableComponent(String name) {
+ // TODO Auto-generated method stub
+ }
- @Override
- public ServiceReference getServiceReference() {
- // TODO Auto-generated method stub
- return null;
- }
+ @Override
+ public ServiceReference getServiceReference() {
+ // TODO Auto-generated method stub
+ return null;
+ }
}
Ethernet buildWrongIgmpPacket(Ip4Address groupIp, Ip4Address sourceIp) {
@@ -669,7 +686,7 @@
igmpMembership.setRecordType((byte) 0x33);
return IgmpSender.getInstance().buildIgmpPacket(IGMP.TYPE_IGMPV3_MEMBERSHIP_REPORT, groupIp,
- igmpMembership, sourceIp, false);
+ igmpMembership, sourceIp, false);
}
Ethernet buildUnknownIgmpPacket(Ip4Address groupIp, Ip4Address sourceIp) {
@@ -679,4 +696,53 @@
return IgmpSender.getInstance().buildIgmpPacket((byte) 0x44, groupIp, igmpMembership, sourceIp, false);
}
+ class TestStateMachineStoreService extends AbstractStateMachineStore {
+ private static final int DEFAULT_COUNT = 0;
+ private Map<StateMachineId, AtomicLong> countsMap;
+
+ public TestStateMachineStoreService(Map<StateMachineId, StateMachine> map) {
+ super();
+ stateMachineMap = Maps.newConcurrentMap();
+ countsMap = Maps.newConcurrentMap();
+ }
+
+ @Override
+ public long increaseAndGetCounter(StateMachineId stateMachineId) {
+ AtomicLong count = countsMap.get(stateMachineId);
+ if (count == null) {
+ count = new AtomicLong(DEFAULT_COUNT);
+ countsMap.put(stateMachineId, count);
+ }
+ return count.incrementAndGet();
+ }
+
+ @Override
+ public long decreaseAndGetCounter(StateMachineId stateMachineId) {
+ AtomicLong count = countsMap.get(stateMachineId);
+ if (count.get() > 0) {
+ return count.decrementAndGet();
+ } else {
+ return count.get();
+ }
+ }
+
+ @Override
+ public boolean removeCounter(StateMachineId stateMachineId) {
+ countsMap.remove(stateMachineId);
+ return true;
+ }
+
+ @Override
+ public long getCounter(StateMachineId stateMachineId) {
+ return countsMap.get(stateMachineId).get();
+ }
+
+ }
+
+
+ class TestGroupMemberStoreService extends AbstractGroupMemberStore {
+ public TestGroupMemberStoreService() {
+ super(Maps.newConcurrentMap());
+ }
+ }
}