SEBA-640 IgmpProxy should use distributed storage infrastructure of ONOS
Change-Id: I4b1c4d326a5501e9c0e046e3ee8d973ca5f73d70
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpManager.java b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpManager.java
index b985b14..97e97cc 100644
--- a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpManager.java
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpManager.java
@@ -15,10 +15,14 @@
*/
package org.opencord.igmpproxy.impl;
-import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.onosproject.net.Device;
+import org.opencord.igmpproxy.IgmpLeadershipService;
import org.opencord.igmpproxy.IgmpStatisticsService;
+import org.opencord.igmpproxy.impl.store.groupmember.GroupMember;
+import org.opencord.igmpproxy.GroupMemberId;
+import org.opencord.igmpproxy.impl.store.groupmember.GroupMemberStore;
+import org.opencord.igmpproxy.statemachine.StateMachineService;
import org.opencord.sadis.BaseInformationService;
import org.opencord.sadis.SadisService;
import org.opencord.sadis.SubscriberAndDeviceInformation;
@@ -114,7 +118,6 @@
private static final Class<McastConfig> MCAST_CONFIG_CLASS =
McastConfig.class;
- public static Map<String, GroupMember> groupMemberMap = Maps.newConcurrentMap();
private static ApplicationId appId;
private static int unSolicitedTimeout = 3; // unit is 1 sec
@@ -176,6 +179,15 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected IgmpStatisticsService igmpStatisticsManager;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected GroupMemberStore groupMemberStore;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected StateMachineService stateMachineService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected IgmpLeadershipService igmpLeadershipService;
+
private IgmpPacketProcessor processor = new IgmpPacketProcessor();
private Logger log = LoggerFactory.getLogger(getClass());
private ApplicationId coreAppId;
@@ -217,15 +229,15 @@
protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
- private List<Byte> validMembershipModes = Arrays.asList(MODE_IS_INCLUDE, MODE_IS_EXCLUDE, CHANGE_TO_INCLUDE_MODE,
- CHANGE_TO_EXCLUDE_MODE, ALLOW_NEW_SOURCES, BLOCK_OLD_SOURCES);
+ private List<Byte> validMembershipModes = Arrays.asList(MODE_IS_INCLUDE, MODE_IS_EXCLUDE, CHANGE_TO_INCLUDE_MODE,
+ CHANGE_TO_EXCLUDE_MODE, ALLOW_NEW_SOURCES, BLOCK_OLD_SOURCES);
@Activate
protected void activate() {
appId = coreService.registerApplication(APP_NAME);
coreAppId = coreService.registerApplication(CoreService.CORE_APP_NAME);
packetService.addProcessor(processor, PacketProcessor.director(4));
- IgmpSender.init(packetService, mastershipService, igmpStatisticsManager);
+ IgmpSender.init(packetService, igmpLeadershipService, igmpStatisticsManager);
networkConfig.registerConfigFactory(igmpproxySsmConfigFactory);
networkConfig.registerConfigFactory(igmpproxyConfigFactory);
@@ -249,7 +261,7 @@
deviceService.addListener(deviceListener);
scheduledExecutorService.scheduleAtFixedRate(new IgmpProxyTimerTask(), 1000, 1000, TimeUnit.MILLISECONDS);
eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("cord/igmpproxy",
- "events-igmp-%d", log));
+ "events-igmp-%d", log));
log.info("Started");
}
@@ -285,10 +297,10 @@
Ip4Address gAddr = igmpQuery.getGaddr().getIp4Address();
maxResp = calculateMaxResp(maxResp);
if (gAddr != null && !gAddr.isZero()) {
- StateMachine.specialQuery(deviceId, gAddr, maxResp);
+ stateMachineService.specialQuery(deviceId, gAddr, maxResp);
igmpStatisticsManager.getIgmpStats().increaseIgmpGrpSpecificMembershipQuery();
} else {
- StateMachine.generalQuery(deviceId, maxResp);
+ stateMachineService.generalQuery(deviceId, maxResp);
igmpStatisticsManager.getIgmpStats().increaseIgmpGeneralMembershipQuery();
}
}
@@ -303,7 +315,7 @@
deviceService.getAvailableDevices().forEach(device -> {
Optional<SubscriberAndDeviceInformation> accessDevice = getSubscriberAndDeviceInformation(device.id());
if (accessDevice.isPresent()) {
- StateMachine.specialQuery(device.id(), gAddr, maxResponseTime);
+ stateMachineService.specialQuery(device.id(), gAddr, maxResponseTime);
igmpStatisticsManager.getIgmpStats().increaseIgmpGrpAndSrcSpecificMembershipQuery();
}
});
@@ -311,7 +323,7 @@
} else {
//Don't know which group is targeted by the query
//So query all the members(in all the OLTs) and proxy their reports
- StateMachine.generalQuery(maxResponseTime);
+ stateMachineService.generalQuery(maxResponseTime);
igmpStatisticsManager.getIgmpStats().increaseIgmpGeneralMembershipQuery();
}
}
@@ -367,7 +379,7 @@
if (pimSSmInterworking) {
src = ssmTranslateRoute(groupIp);
if (src == null) {
- log.info("no ssm translate for group " + groupIp.toString());
+ log.info("no ssm translate for group {}", groupIp);
return;
}
} else {
@@ -384,8 +396,8 @@
join = false;
}
}
- String groupMemberKey = GroupMember.getkey(groupIp, deviceId, portNumber);
- GroupMember groupMember = groupMemberMap.get(groupMemberKey);
+ GroupMemberId groupMemberKey = GroupMemberId.of(groupIp, deviceId, portNumber);
+ GroupMember groupMember = groupMemberStore.getGroupMember(groupMemberKey);
if (join) {
igmpStatisticsManager.getIgmpStats().increaseIgmpJoinReq();
@@ -394,7 +406,7 @@
if (!sourceConfigured.isPresent()) {
igmpStatisticsManager.getIgmpStats().increaseIgmpFailJoinReq();
log.warn("Unable to process IGMP Join from {} since no source " +
- "configuration is found.", deviceId);
+ "configuration is found.", deviceId);
igmpStatisticsManager.getIgmpStats().increaseFailJoinReqInsuffPermissionAccessCounter();
return;
}
@@ -402,7 +414,7 @@
Optional<PortNumber> deviceUplink = getDeviceUplink(deviceId);
if (deviceUplink.isEmpty()) {
log.warn("Unable to process IGMP Join since uplink port " +
- "of the device {} is not found.", deviceId);
+ "of the device {} is not found.", deviceId);
return;
}
@@ -414,14 +426,15 @@
HashSet<ConnectPoint> sourceConnectPoints = Sets.newHashSet(sourceConfigured.get());
- boolean isJoined = StateMachine.join(deviceId, groupIp, srcIp, deviceUplink.get());
+ boolean isJoined = stateMachineService.join(deviceId, groupIp, srcIp, deviceUplink.get());
if (isJoined) {
igmpStatisticsManager.getIgmpStats().increaseIgmpSuccessJoinRejoinReq();
igmpStatisticsManager.getIgmpStats().increaseIgmpChannelJoinCounter();
} else {
igmpStatisticsManager.getIgmpStats().increaseIgmpFailJoinReq();
}
- groupMemberMap.put(groupMemberKey, groupMember);
+ groupMemberStore.putGroupMember(groupMember);
+ log.debug("Group member created with id: {}", groupMember.getGroupMemberId());
groupMember.updateList(recordType, sourceList);
groupMember.getSourceList().forEach(source -> {
McastRoute route = new McastRoute(source, groupIp, McastRoute.Type.IGMP);
@@ -438,11 +451,13 @@
groupMember.resetAllTimers();
groupMember.updateList(recordType, sourceList);
groupMember.setLeave(false);
+ //put updated member to the store
+ groupMemberStore.putGroupMember(groupMember);
} else {
igmpStatisticsManager.getIgmpStats().increaseIgmpLeaveReq();
if (groupMember == null) {
- log.info("receive leave but no instance, group " + groupIp.toString() +
- " device:" + deviceId.toString() + " port:" + portNumber.toString());
+ log.info("receive leave but no instance, group {} device: {} port:{}",
+ groupIp, deviceId, portNumber);
igmpStatisticsManager.getIgmpStats().increaseUnconfiguredGroupCounter();
return;
} else {
@@ -451,6 +466,8 @@
leaveAction(groupMember);
} else {
sendQuery(groupMember);
+ //put modified group member object to the store
+ groupMemberStore.updateGroupMember(groupMember);
}
}
}
@@ -459,11 +476,11 @@
private void leaveAction(GroupMember groupMember) {
igmpStatisticsManager.getIgmpStats().increaseIgmpDisconnect();
ConnectPoint cp = new ConnectPoint(groupMember.getDeviceId(), groupMember.getPortNumber());
- StateMachine.leave(groupMember.getDeviceId(), groupMember.getGroupIp());
+ stateMachineService.leave(groupMember.getDeviceId(), groupMember.getGroupIp());
groupMember.getSourceList().forEach(source -> multicastService.removeSinks(
new McastRoute(source, groupMember.getGroupIp(),
- McastRoute.Type.IGMP), Sets.newHashSet(cp)));
- groupMemberMap.remove(groupMember.getId());
+ McastRoute.Type.IGMP), Sets.newHashSet(cp)));
+ groupMemberStore.removeGroupMember(groupMember.getGroupMemberId());
}
private void sendQuery(GroupMember groupMember) {
@@ -517,7 +534,7 @@
if (!isConnectPoint(deviceId, pkt.receivedFrom().port()) &&
!getSubscriberAndDeviceInformation(deviceId).isPresent()) {
- log.error("Device not registered in netcfg :" + deviceId.toString());
+ log.error("Device not registered in netcfg : {}", deviceId);
igmpStatisticsManager.getIgmpStats().increaseFailJoinReqInsuffPermissionAccessCounter();
return;
}
@@ -525,7 +542,7 @@
IGMP igmp = (IGMP) ipv4Pkt.getPayload();
Optional<PortNumber> deviceUpLinkOpt = getDeviceUplink(deviceId);
- PortNumber upLinkPort = deviceUpLinkOpt.isPresent() ? deviceUpLinkOpt.get() : null;
+ PortNumber upLinkPort = deviceUpLinkOpt.isPresent() ? deviceUpLinkOpt.get() : null;
switch (igmp.getIgmpType()) {
case IGMP.TYPE_IGMPV3_MEMBERSHIP_QUERY:
igmpStatisticsManager.getIgmpStats().increaseIgmpv3MembershipQuery();
@@ -535,8 +552,8 @@
log.info("IGMP Picked up query from connectPoint");
//OK to process packet
processIgmpConnectPointQuery((IGMPQuery) igmp.getGroups().get(0),
- pkt.receivedFrom(),
- 0xff & igmp.getMaxRespField());
+ pkt.receivedFrom(),
+ 0xff & igmp.getMaxRespField());
break;
} else {
//Not OK to process packet
@@ -546,7 +563,7 @@
}
processIgmpQuery((IGMPQuery) igmp.getGroups().get(0), pkt.receivedFrom(),
- 0xff & igmp.getMaxRespField());
+ 0xff & igmp.getMaxRespField());
break;
case IGMP.TYPE_IGMPV1_MEMBERSHIP_REPORT:
igmpStatisticsManager.getIgmpStats().increaseIgmpv1MembershipReport();
@@ -566,14 +583,14 @@
break;
default:
- log.warn("Unknown IGMP message type:" + igmp.getIgmpType());
+ log.warn("Unknown IGMP message type: {}", igmp.getIgmpType());
igmpStatisticsManager.getIgmpStats().increaseInvalidIgmpMsgReceived();
igmpStatisticsManager.getIgmpStats().increaseUnknownIgmpTypePacketsRxCounter();
break;
}
} catch (Exception ex) {
- log.error("igmp process error : {} ", ex);
+ log.error("igmp process error : ", ex);
}
});
}
@@ -592,14 +609,14 @@
IGMPGroup group = itr.next();
if (group instanceof IGMPMembership) {
processIgmpReport((IGMPMembership) group, VlanId.vlanId(vlan),
- pkt.receivedFrom(), igmp.getIgmpType());
+ pkt.receivedFrom(), igmp.getIgmpType());
} else {
IGMPMembership mgroup = new IGMPMembership(group.getGaddr().getIp4Address());
mgroup.setRecordType(igmp.getIgmpType() == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT ?
- IGMPMembership.MODE_IS_EXCLUDE :
- IGMPMembership.MODE_IS_INCLUDE);
+ IGMPMembership.MODE_IS_EXCLUDE :
+ IGMPMembership.MODE_IS_INCLUDE);
processIgmpReport(mgroup, VlanId.vlanId(vlan),
- pkt.receivedFrom(), igmp.getIgmpType());
+ pkt.receivedFrom(), igmp.getIgmpType());
}
}
@@ -608,7 +625,7 @@
private class IgmpProxyTimerTask extends TimerTask {
public void run() {
try {
- IgmpTimer.timeOut1s();
+ stateMachineService.timeOut1s();
queryMembers();
} catch (Exception ex) {
log.warn("Igmp timer task error : {}", ex.getMessage());
@@ -617,13 +634,14 @@
private void queryMembers() {
GroupMember groupMember;
- Set groupMemberSet = groupMemberMap.entrySet();
- Iterator itr = groupMemberSet.iterator();
- while (itr.hasNext()) {
- Map.Entry entry = (Map.Entry) itr.next();
- groupMember = (GroupMember) entry.getValue();
+ Set<GroupMemberId> keySet = groupMemberStore.getAllGroupMemberIds();
+ for (GroupMemberId key : keySet) {
+ groupMember = groupMemberStore.getGroupMember(key);
+ if (groupMember == null) {
+ continue;
+ }
DeviceId did = groupMember.getDeviceId();
- if (mastershipService.isLocalMaster(did)) {
+ if (igmpLeadershipService.isLocalLeader(did)) {
if (groupMember.isLeave()) {
lastQuery(groupMember);
} else if (periodicQuery) {
@@ -636,10 +654,14 @@
private void lastQuery(GroupMember groupMember) {
if (groupMember.getLastQueryInterval() < lastQueryInterval) {
groupMember.lastQueryInterval(true); // count times
+ //put modified group member object to the store
+ groupMemberStore.updateGroupMember(groupMember);
} else if (groupMember.getLastQueryCount() < lastQueryCount - 1) {
sendQuery(groupMember);
groupMember.lastQueryInterval(false); // reset count number
groupMember.lastQueryCount(true); //count times
+ //put modified group member object to the store
+ groupMemberStore.updateGroupMember(groupMember);
} else if (groupMember.getLastQueryCount() == lastQueryCount - 1) {
leaveAction(groupMember);
}
@@ -648,10 +670,14 @@
private void periodicQuery(GroupMember groupMember) {
if (groupMember.getKeepAliveQueryInterval() < keepAliveInterval) {
groupMember.keepAliveInterval(true);
+ //put modified group member object to the store
+ groupMemberStore.updateGroupMember(groupMember);
} else if (groupMember.getKeepAliveQueryCount() < keepAliveCount) {
sendQuery(groupMember);
groupMember.keepAliveInterval(false);
groupMember.keepAliveQueryCount(true);
+ //put modified group member object to the store
+ groupMemberStore.updateGroupMember(groupMember);
} else if (groupMember.getKeepAliveQueryCount() == keepAliveCount) {
leaveAction(groupMember);
}
@@ -670,12 +696,11 @@
}
PortNumber portNumber = PortNumber.portNumber(olt.get().uplinkPort());
return validateUpLinkPort(device.id(), portNumber) ?
- Optional.of(portNumber) : Optional.empty();
+ Optional.of(portNumber) : Optional.empty();
}
/**
- *
- * @param deviceId device id
+ * @param deviceId device id
* @param portNumber port number
* @return true if the port name starts with NNI_PREFIX; false otherwise.
*/
@@ -688,8 +713,8 @@
boolean isValid = port.annotations().value(AnnotationKeys.PORT_NAME) != null &&
port.annotations().value(AnnotationKeys.PORT_NAME).startsWith(NNI_PREFIX);
if (!isValid) {
- log.warn("Port cannot be validated; it is not configured as an NNI port." +
- "Device/port: {}/{}", deviceId, portNumber);
+ log.warn("Port cannot be validated; it is not configured as an NNI port. Device/port: {}/{}",
+ deviceId, portNumber);
}
return isValid;
}
@@ -719,14 +744,14 @@
@Override
public void onSuccess(Objective objective) {
log.info("Igmp filter for {} on {} {}.",
- devId, port, (remove) ? REMOVED : INSTALLED);
+ devId, port, (remove) ? REMOVED : INSTALLED);
}
@Override
public void onError(Objective objective, ObjectiveError error) {
log.info("Igmp filter {} for device {} on port {} failed because of {}",
- (remove) ? INSTALLATION : REMOVAL, devId, port,
- error);
+ (remove) ? REMOVAL : INSTALLATION, devId, port, // name the operation that actually failed, mirroring onSuccess
+ error);
}
});
@@ -809,7 +834,7 @@
case PORT_ADDED:
port = p.number();
if (getSubscriberAndDeviceInformation(devId).isPresent() &&
- !isUplink(devId, port) && !isConnectPoint(devId, port)) {
+ !isUplink(devId, port) && !isConnectPoint(devId, port)) {
processFilterObjective(devId, port, false);
} else if (isUplink(devId, port)) {
provisionUplinkFlows();
@@ -820,7 +845,7 @@
case PORT_UPDATED:
port = p.number();
if (getSubscriberAndDeviceInformation(devId).isPresent() &&
- !isUplink(devId, port) && !isConnectPoint(devId, port)) {
+ !isUplink(devId, port) && !isConnectPoint(devId, port)) {
if (event.port().isEnabled()) {
processFilterObjective(devId, port, false);
} else {
@@ -950,7 +975,7 @@
if (config != null && mvlan != config.egressVlan().toShort()) {
mvlan = config.egressVlan().toShort();
IgmpSender.getInstance().setMvlan(mvlan);
- groupMemberMap.values().forEach(m -> leaveAction(m));
+ groupMemberStore.getAllGroupMembers().forEach(m -> leaveAction(m));
}
}
@@ -1009,6 +1034,7 @@
processFilterObjective(connectPoint.deviceId(), connectPoint.port(), false);
}
+
private void unprovisionConnectPointFlows() {
if (connectPoint == null) {
return;