SEBA-640 IgmpProxy should use the distributed storage infrastructure of ONOS

Change-Id: I4b1c4d326a5501e9c0e046e3ee8d973ca5f73d70
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpManager.java b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpManager.java
index b985b14..97e97cc 100644
--- a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpManager.java
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpManager.java
@@ -15,10 +15,14 @@
  */
 package org.opencord.igmpproxy.impl;
 
-import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.onosproject.net.Device;
+import org.opencord.igmpproxy.IgmpLeadershipService;
 import org.opencord.igmpproxy.IgmpStatisticsService;
+import org.opencord.igmpproxy.impl.store.groupmember.GroupMember;
+import org.opencord.igmpproxy.GroupMemberId;
+import org.opencord.igmpproxy.impl.store.groupmember.GroupMemberStore;
+import org.opencord.igmpproxy.statemachine.StateMachineService;
 import org.opencord.sadis.BaseInformationService;
 import org.opencord.sadis.SadisService;
 import org.opencord.sadis.SubscriberAndDeviceInformation;
@@ -114,7 +118,6 @@
     private static final Class<McastConfig> MCAST_CONFIG_CLASS =
             McastConfig.class;
 
-    public static Map<String, GroupMember> groupMemberMap = Maps.newConcurrentMap();
     private static ApplicationId appId;
 
     private static int unSolicitedTimeout = 3; // unit is 1 sec
@@ -176,6 +179,15 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected IgmpStatisticsService igmpStatisticsManager;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected GroupMemberStore groupMemberStore;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected StateMachineService stateMachineService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected IgmpLeadershipService igmpLeadershipService;
+
     private IgmpPacketProcessor processor = new IgmpPacketProcessor();
     private Logger log = LoggerFactory.getLogger(getClass());
     private ApplicationId coreAppId;
@@ -217,15 +229,15 @@
 
     protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
 
-    private List<Byte> validMembershipModes = Arrays.asList(MODE_IS_INCLUDE,  MODE_IS_EXCLUDE, CHANGE_TO_INCLUDE_MODE,
-                              CHANGE_TO_EXCLUDE_MODE, ALLOW_NEW_SOURCES, BLOCK_OLD_SOURCES);
+    private List<Byte> validMembershipModes = Arrays.asList(MODE_IS_INCLUDE, MODE_IS_EXCLUDE, CHANGE_TO_INCLUDE_MODE,
+            CHANGE_TO_EXCLUDE_MODE, ALLOW_NEW_SOURCES, BLOCK_OLD_SOURCES);
 
     @Activate
     protected void activate() {
         appId = coreService.registerApplication(APP_NAME);
         coreAppId = coreService.registerApplication(CoreService.CORE_APP_NAME);
         packetService.addProcessor(processor, PacketProcessor.director(4));
-        IgmpSender.init(packetService, mastershipService, igmpStatisticsManager);
+        IgmpSender.init(packetService, igmpLeadershipService, igmpStatisticsManager);
 
         networkConfig.registerConfigFactory(igmpproxySsmConfigFactory);
         networkConfig.registerConfigFactory(igmpproxyConfigFactory);
@@ -249,7 +261,7 @@
         deviceService.addListener(deviceListener);
         scheduledExecutorService.scheduleAtFixedRate(new IgmpProxyTimerTask(), 1000, 1000, TimeUnit.MILLISECONDS);
         eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("cord/igmpproxy",
-                                                                        "events-igmp-%d", log));
+                "events-igmp-%d", log));
         log.info("Started");
     }
 
@@ -285,10 +297,10 @@
         Ip4Address gAddr = igmpQuery.getGaddr().getIp4Address();
         maxResp = calculateMaxResp(maxResp);
         if (gAddr != null && !gAddr.isZero()) {
-            StateMachine.specialQuery(deviceId, gAddr, maxResp);
+            stateMachineService.specialQuery(deviceId, gAddr, maxResp);
             igmpStatisticsManager.getIgmpStats().increaseIgmpGrpSpecificMembershipQuery();
         } else {
-            StateMachine.generalQuery(deviceId, maxResp);
+            stateMachineService.generalQuery(deviceId, maxResp);
             igmpStatisticsManager.getIgmpStats().increaseIgmpGeneralMembershipQuery();
         }
     }
@@ -303,7 +315,7 @@
             deviceService.getAvailableDevices().forEach(device -> {
                 Optional<SubscriberAndDeviceInformation> accessDevice = getSubscriberAndDeviceInformation(device.id());
                 if (accessDevice.isPresent()) {
-                    StateMachine.specialQuery(device.id(), gAddr, maxResponseTime);
+                    stateMachineService.specialQuery(device.id(), gAddr, maxResponseTime);
                     igmpStatisticsManager.getIgmpStats().increaseIgmpGrpAndSrcSpecificMembershipQuery();
                 }
             });
@@ -311,7 +323,7 @@
         } else {
             //Don't know which group is targeted by the query
             //So query all the members(in all the OLTs) and proxy their reports
-            StateMachine.generalQuery(maxResponseTime);
+            stateMachineService.generalQuery(maxResponseTime);
             igmpStatisticsManager.getIgmpStats().increaseIgmpGeneralMembershipQuery();
         }
     }
@@ -367,7 +379,7 @@
             if (pimSSmInterworking) {
                 src = ssmTranslateRoute(groupIp);
                 if (src == null) {
-                    log.info("no ssm translate for group " + groupIp.toString());
+                    log.info("no ssm translate for group {}", groupIp);
                     return;
                 }
             } else {
@@ -384,8 +396,8 @@
                 join = false;
             }
         }
-        String groupMemberKey = GroupMember.getkey(groupIp, deviceId, portNumber);
-        GroupMember groupMember = groupMemberMap.get(groupMemberKey);
+        GroupMemberId groupMemberKey = GroupMemberId.of(groupIp, deviceId, portNumber);
+        GroupMember groupMember = groupMemberStore.getGroupMember(groupMemberKey);
 
         if (join) {
             igmpStatisticsManager.getIgmpStats().increaseIgmpJoinReq();
@@ -394,7 +406,7 @@
                 if (!sourceConfigured.isPresent()) {
                     igmpStatisticsManager.getIgmpStats().increaseIgmpFailJoinReq();
                     log.warn("Unable to process IGMP Join from {} since no source " +
-                                     "configuration is found.", deviceId);
+                            "configuration is found.", deviceId);
                     igmpStatisticsManager.getIgmpStats().increaseFailJoinReqInsuffPermissionAccessCounter();
                     return;
                 }
@@ -402,7 +414,7 @@
                 Optional<PortNumber> deviceUplink = getDeviceUplink(deviceId);
                 if (deviceUplink.isEmpty()) {
                     log.warn("Unable to process IGMP Join since uplink port " +
-                     "of the device {} is not found.", deviceId);
+                            "of the device {} is not found.", deviceId);
                     return;
                 }
 
@@ -414,14 +426,15 @@
 
                 HashSet<ConnectPoint> sourceConnectPoints = Sets.newHashSet(sourceConfigured.get());
 
-                boolean isJoined = StateMachine.join(deviceId, groupIp, srcIp, deviceUplink.get());
+                boolean isJoined = stateMachineService.join(deviceId, groupIp, srcIp, deviceUplink.get());
                 if (isJoined) {
                     igmpStatisticsManager.getIgmpStats().increaseIgmpSuccessJoinRejoinReq();
                     igmpStatisticsManager.getIgmpStats().increaseIgmpChannelJoinCounter();
                 } else {
                     igmpStatisticsManager.getIgmpStats().increaseIgmpFailJoinReq();
                 }
-                groupMemberMap.put(groupMemberKey, groupMember);
+                groupMemberStore.putGroupMember(groupMember);
+                log.debug("Group member created with id: {}", groupMember.getGroupMemberId());
                 groupMember.updateList(recordType, sourceList);
                 groupMember.getSourceList().forEach(source -> {
                     McastRoute route = new McastRoute(source, groupIp, McastRoute.Type.IGMP);
@@ -438,11 +451,13 @@
             groupMember.resetAllTimers();
             groupMember.updateList(recordType, sourceList);
             groupMember.setLeave(false);
+            //put updated member to the store
+            groupMemberStore.putGroupMember(groupMember);
         } else {
             igmpStatisticsManager.getIgmpStats().increaseIgmpLeaveReq();
             if (groupMember == null) {
-                log.info("receive leave but no instance, group " + groupIp.toString() +
-                        " device:" + deviceId.toString() + " port:" + portNumber.toString());
+                log.info("receive leave but no instance, group {} device: {} port:{}",
+                        groupIp, deviceId, portNumber);
                 igmpStatisticsManager.getIgmpStats().increaseUnconfiguredGroupCounter();
                 return;
             } else {
@@ -451,6 +466,8 @@
                     leaveAction(groupMember);
                 } else {
                     sendQuery(groupMember);
+                    //put modified group member object to the store
+                    groupMemberStore.updateGroupMember(groupMember);
                 }
             }
         }
@@ -459,11 +476,11 @@
     private void leaveAction(GroupMember groupMember) {
         igmpStatisticsManager.getIgmpStats().increaseIgmpDisconnect();
         ConnectPoint cp = new ConnectPoint(groupMember.getDeviceId(), groupMember.getPortNumber());
-        StateMachine.leave(groupMember.getDeviceId(), groupMember.getGroupIp());
+        stateMachineService.leave(groupMember.getDeviceId(), groupMember.getGroupIp());
         groupMember.getSourceList().forEach(source -> multicastService.removeSinks(
                 new McastRoute(source, groupMember.getGroupIp(),
-                               McastRoute.Type.IGMP), Sets.newHashSet(cp)));
-        groupMemberMap.remove(groupMember.getId());
+                        McastRoute.Type.IGMP), Sets.newHashSet(cp)));
+        groupMemberStore.removeGroupMember(groupMember.getGroupMemberId());
     }
 
     private void sendQuery(GroupMember groupMember) {
@@ -517,7 +534,7 @@
 
                     if (!isConnectPoint(deviceId, pkt.receivedFrom().port()) &&
                             !getSubscriberAndDeviceInformation(deviceId).isPresent()) {
-                        log.error("Device not registered in netcfg :" + deviceId.toString());
+                        log.error("Device not registered in netcfg : {}", deviceId);
                         igmpStatisticsManager.getIgmpStats().increaseFailJoinReqInsuffPermissionAccessCounter();
                         return;
                     }
@@ -525,7 +542,7 @@
                     IGMP igmp = (IGMP) ipv4Pkt.getPayload();
 
                     Optional<PortNumber> deviceUpLinkOpt = getDeviceUplink(deviceId);
-                    PortNumber upLinkPort =  deviceUpLinkOpt.isPresent() ? deviceUpLinkOpt.get() : null;
+                    PortNumber upLinkPort = deviceUpLinkOpt.isPresent() ? deviceUpLinkOpt.get() : null;
                     switch (igmp.getIgmpType()) {
                         case IGMP.TYPE_IGMPV3_MEMBERSHIP_QUERY:
                             igmpStatisticsManager.getIgmpStats().increaseIgmpv3MembershipQuery();
@@ -535,8 +552,8 @@
                                     log.info("IGMP Picked up query from connectPoint");
                                     //OK to process packet
                                     processIgmpConnectPointQuery((IGMPQuery) igmp.getGroups().get(0),
-                                                                 pkt.receivedFrom(),
-                                                                 0xff & igmp.getMaxRespField());
+                                            pkt.receivedFrom(),
+                                            0xff & igmp.getMaxRespField());
                                     break;
                                 } else {
                                     //Not OK to process packet
@@ -546,7 +563,7 @@
                             }
 
                             processIgmpQuery((IGMPQuery) igmp.getGroups().get(0), pkt.receivedFrom(),
-                                             0xff & igmp.getMaxRespField());
+                                    0xff & igmp.getMaxRespField());
                             break;
                         case IGMP.TYPE_IGMPV1_MEMBERSHIP_REPORT:
                             igmpStatisticsManager.getIgmpStats().increaseIgmpv1MembershipReport();
@@ -566,14 +583,14 @@
                             break;
 
                         default:
-                            log.warn("Unknown IGMP message type:" + igmp.getIgmpType());
+                            log.warn("Unknown IGMP message type: {}", igmp.getIgmpType());
                             igmpStatisticsManager.getIgmpStats().increaseInvalidIgmpMsgReceived();
                             igmpStatisticsManager.getIgmpStats().increaseUnknownIgmpTypePacketsRxCounter();
                             break;
                     }
 
                 } catch (Exception ex) {
-                    log.error("igmp process error : {} ", ex);
+                    log.error("igmp process error : ", ex);
                 }
             });
         }
@@ -592,14 +609,14 @@
             IGMPGroup group = itr.next();
             if (group instanceof IGMPMembership) {
                 processIgmpReport((IGMPMembership) group, VlanId.vlanId(vlan),
-                                  pkt.receivedFrom(), igmp.getIgmpType());
+                        pkt.receivedFrom(), igmp.getIgmpType());
             } else {
                 IGMPMembership mgroup = new IGMPMembership(group.getGaddr().getIp4Address());
                 mgroup.setRecordType(igmp.getIgmpType() == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT ?
-                                             IGMPMembership.MODE_IS_EXCLUDE :
-                                             IGMPMembership.MODE_IS_INCLUDE);
+                        IGMPMembership.MODE_IS_EXCLUDE :
+                        IGMPMembership.MODE_IS_INCLUDE);
                 processIgmpReport(mgroup, VlanId.vlanId(vlan),
-                                  pkt.receivedFrom(), igmp.getIgmpType());
+                        pkt.receivedFrom(), igmp.getIgmpType());
             }
         }
 
@@ -608,7 +625,7 @@
     private class IgmpProxyTimerTask extends TimerTask {
         public void run() {
             try {
-                IgmpTimer.timeOut1s();
+                stateMachineService.timeOut1s();
                 queryMembers();
             } catch (Exception ex) {
                 log.warn("Igmp timer task error : {}", ex.getMessage());
@@ -617,13 +634,14 @@
 
         private void queryMembers() {
             GroupMember groupMember;
-            Set groupMemberSet = groupMemberMap.entrySet();
-            Iterator itr = groupMemberSet.iterator();
-            while (itr.hasNext()) {
-                Map.Entry entry = (Map.Entry) itr.next();
-                groupMember = (GroupMember) entry.getValue();
+            Set<GroupMemberId> keySet = groupMemberStore.getAllGroupMemberIds();
+            for (GroupMemberId key : keySet) {
+                groupMember = groupMemberStore.getGroupMember(key);
+                if (groupMember == null) {
+                    continue;
+                }
                 DeviceId did = groupMember.getDeviceId();
-                if (mastershipService.isLocalMaster(did)) {
+                if (igmpLeadershipService.isLocalLeader(did)) {
                     if (groupMember.isLeave()) {
                         lastQuery(groupMember);
                     } else if (periodicQuery) {
@@ -636,10 +654,14 @@
         private void lastQuery(GroupMember groupMember) {
             if (groupMember.getLastQueryInterval() < lastQueryInterval) {
                 groupMember.lastQueryInterval(true); // count times
+                //put modified group member object to the store
+                groupMemberStore.updateGroupMember(groupMember);
             } else if (groupMember.getLastQueryCount() < lastQueryCount - 1) {
                 sendQuery(groupMember);
                 groupMember.lastQueryInterval(false); // reset count number
                 groupMember.lastQueryCount(true); //count times
+                //put modified group member object to the store
+                groupMemberStore.updateGroupMember(groupMember);
             } else if (groupMember.getLastQueryCount() == lastQueryCount - 1) {
                 leaveAction(groupMember);
             }
@@ -648,10 +670,14 @@
         private void periodicQuery(GroupMember groupMember) {
             if (groupMember.getKeepAliveQueryInterval() < keepAliveInterval) {
                 groupMember.keepAliveInterval(true);
+                //put modified group member object to the store
+                groupMemberStore.updateGroupMember(groupMember);
             } else if (groupMember.getKeepAliveQueryCount() < keepAliveCount) {
                 sendQuery(groupMember);
                 groupMember.keepAliveInterval(false);
                 groupMember.keepAliveQueryCount(true);
+                //put modified group member object to the store
+                groupMemberStore.updateGroupMember(groupMember);
             } else if (groupMember.getKeepAliveQueryCount() == keepAliveCount) {
                 leaveAction(groupMember);
             }
@@ -670,12 +696,11 @@
         }
         PortNumber portNumber = PortNumber.portNumber(olt.get().uplinkPort());
         return validateUpLinkPort(device.id(), portNumber) ?
-                    Optional.of(portNumber) : Optional.empty();
+                Optional.of(portNumber) : Optional.empty();
     }
 
     /**
-     *
-     * @param deviceId device id
+     * @param deviceId   device id
      * @param portNumber port number
      * @return true if the port name starts with NNI_PREFIX; false otherwise.
      */
@@ -688,8 +713,8 @@
         boolean isValid = port.annotations().value(AnnotationKeys.PORT_NAME) != null &&
                 port.annotations().value(AnnotationKeys.PORT_NAME).startsWith(NNI_PREFIX);
         if (!isValid) {
-            log.warn("Port cannot be validated; it is not configured as an NNI port." +
-                    "Device/port: {}/{}", deviceId, portNumber);
+            log.warn("Port cannot be validated; it is not configured as an NNI port. Device/port: {}/{}",
+                    deviceId, portNumber);
         }
         return isValid;
     }
@@ -719,14 +744,14 @@
                     @Override
                     public void onSuccess(Objective objective) {
                         log.info("Igmp filter for {} on {} {}.",
-                                 devId, port, (remove) ? REMOVED : INSTALLED);
+                                devId, port, (remove) ? REMOVED : INSTALLED);
                     }
 
                     @Override
                     public void onError(Objective objective, ObjectiveError error) {
                         log.info("Igmp filter {} for device {} on port {} failed because of {}",
-                                 (remove) ? INSTALLATION : REMOVAL, devId, port,
-                                 error);
+                                (remove) ? INSTALLATION : REMOVAL, devId, port,
+                                error);
                     }
                 });
 
@@ -809,7 +834,7 @@
                 case PORT_ADDED:
                     port = p.number();
                     if (getSubscriberAndDeviceInformation(devId).isPresent() &&
-                        !isUplink(devId, port) && !isConnectPoint(devId, port)) {
+                            !isUplink(devId, port) && !isConnectPoint(devId, port)) {
                         processFilterObjective(devId, port, false);
                     } else if (isUplink(devId, port)) {
                         provisionUplinkFlows();
@@ -820,7 +845,7 @@
                 case PORT_UPDATED:
                     port = p.number();
                     if (getSubscriberAndDeviceInformation(devId).isPresent() &&
-                        !isUplink(devId, port) && !isConnectPoint(devId, port)) {
+                            !isUplink(devId, port) && !isConnectPoint(devId, port)) {
                         if (event.port().isEnabled()) {
                             processFilterObjective(devId, port, false);
                         } else {
@@ -950,7 +975,7 @@
                         if (config != null && mvlan != config.egressVlan().toShort()) {
                             mvlan = config.egressVlan().toShort();
                             IgmpSender.getInstance().setMvlan(mvlan);
-                            groupMemberMap.values().forEach(m -> leaveAction(m));
+                            groupMemberStore.getAllGroupMembers().forEach(m -> leaveAction(m));
                         }
                     }
 
@@ -1009,6 +1034,7 @@
 
         processFilterObjective(connectPoint.deviceId(), connectPoint.port(), false);
     }
+
     private void unprovisionConnectPointFlows() {
         if (connectPoint == null) {
             return;