Aggregate stats from all cluster nodes and publish

Change-Id: Ic41cdcc8fc17845dabffc42b817b8bb7439a0b52
diff --git a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java
index 2d17579..c34d817 100755
--- a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java
+++ b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java
@@ -30,7 +30,6 @@
 import org.onlab.packet.VlanId;
 import org.onlab.packet.dhcp.DhcpOption;
 import org.onlab.util.KryoNamespace;
-import org.onlab.util.SafeRecurringTask;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.cluster.ClusterService;
@@ -76,6 +75,7 @@
 import org.opencord.dhcpl2relay.DhcpL2RelayEvent;
 import org.opencord.dhcpl2relay.DhcpL2RelayListener;
 import org.opencord.dhcpl2relay.DhcpL2RelayService;
+import org.opencord.dhcpl2relay.DhcpL2RelayStoreDelegate;
 import org.opencord.dhcpl2relay.impl.packet.DhcpOption82;
 import org.opencord.sadis.BaseInformationService;
 import org.opencord.sadis.SadisService;
@@ -94,7 +94,6 @@
 import java.io.ByteArrayOutputStream;
 import java.nio.ByteBuffer;
 import java.time.Instant;
-import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Dictionary;
 import java.util.List;
@@ -105,25 +104,17 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import static org.onlab.packet.DHCP.DHCPOptionCode.OptionCode_MessageType;
 import static org.onlab.packet.MacAddress.valueOf;
-import static org.onlab.util.Tools.get;
-import static org.onlab.util.Tools.getIntegerProperty;
 import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
-import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.DHCP_COUNTERS_TOPIC;
-import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.DHCP_COUNTERS_TOPIC_DEFAULT;
 import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.ENABLE_DHCP_BROADCAST_REPLIES;
 import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.ENABLE_DHCP_BROADCAST_REPLIES_DEFAULT;
 import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.OPTION_82;
 import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.OPTION_82_DEFAULT;
-import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.PUBLISH_COUNTERS_RATE;
-import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.PUBLISH_COUNTERS_RATE_DEFAULT;
 
 /**
  * DHCP Relay Agent Application Component.
@@ -132,8 +123,6 @@
 property = {
         OPTION_82 + ":Boolean=" + OPTION_82_DEFAULT,
         ENABLE_DHCP_BROADCAST_REPLIES + ":Boolean=" + ENABLE_DHCP_BROADCAST_REPLIES_DEFAULT,
-        PUBLISH_COUNTERS_RATE + ":Integer=" + PUBLISH_COUNTERS_RATE_DEFAULT,
-        DHCP_COUNTERS_TOPIC + ":String=" + DHCP_COUNTERS_TOPIC_DEFAULT
 })
 public class DhcpL2Relay
         extends AbstractListenerManager<DhcpL2RelayEvent, DhcpL2RelayListener>
@@ -203,12 +192,6 @@
     /** Ask the DHCP Server to send back replies as L2 broadcast. */
     protected boolean enableDhcpBroadcastReplies = ENABLE_DHCP_BROADCAST_REPLIES_DEFAULT;
 
-    protected int publishCountersRate = PUBLISH_COUNTERS_RATE_DEFAULT;
-    private String dhcpCountersTopic = DHCP_COUNTERS_TOPIC_DEFAULT;
-
-
-    protected PublishCountersToKafka publishCountersToKafka;
-
     ScheduledFuture<?> refreshTask;
     ScheduledExecutorService refreshService = Executors.newSingleThreadScheduledExecutor();
 
@@ -231,6 +214,8 @@
 
     private BaseInformationService<SubscriberAndDeviceInformation> subsService;
 
+    private DhcpL2RelayStoreDelegate delegate = new InnerDhcpL2RelayStoreDelegate();
+
     @Activate
     protected void activate(ComponentContext context) {
         //start the dhcp relay agent
@@ -256,6 +241,8 @@
 
         leadershipService.runForLeadership(LEADER_TOPIC);
 
+        dhcpL2RelayCounters.setDelegate(delegate);
+
         cfgService.addListener(cfgListener);
         mastershipService.addListener(changeListener);
         deviceService.addListener(deviceListener);
@@ -272,9 +259,6 @@
             modified(context);
         }
 
-        publishCountersToKafka = new PublishCountersToKafka();
-        restartPublishCountersTask();
-
         log.info("DHCP-L2-RELAY Started");
     }
 
@@ -286,6 +270,7 @@
         if (refreshService != null) {
             refreshService.shutdownNow();
         }
+        dhcpL2RelayCounters.unsetDelegate(delegate);
         cfgService.removeListener(cfgListener);
         factories.forEach(cfgService::unregisterConfigFactory);
         packetService.removeProcessor(dhcpRelayPacketProcessor);
@@ -313,41 +298,6 @@
         if (o != null) {
             enableDhcpBroadcastReplies = o;
         }
-
-        Integer newPublishCountersRate = getIntegerProperty(properties, "publishCountersRate");
-        if (newPublishCountersRate != null) {
-            if (newPublishCountersRate != publishCountersRate && newPublishCountersRate >= 0) {
-                log.info("publishCountersRate modified from {} to {}", publishCountersRate, newPublishCountersRate);
-                publishCountersRate = newPublishCountersRate;
-            } else if (newPublishCountersRate < 0) {
-                log.error("Invalid newPublishCountersRate : {}, defaulting to 0", newPublishCountersRate);
-                publishCountersRate = 0;
-            }
-            restartPublishCountersTask();
-        }
-
-        String newDhcpCountersTopic = get(properties, "dhcpCountersTopic");
-        if (newDhcpCountersTopic != null && !newDhcpCountersTopic.equals(dhcpCountersTopic)) {
-            log.info("Property dhcpCountersTopic modified from {} to {}", dhcpCountersTopic, newDhcpCountersTopic);
-            dhcpCountersTopic = newDhcpCountersTopic;
-        }
-    }
-
-    /**
-     * Starts a thread to publish the counters to kafka at a certain rate time.
-     */
-    private void restartPublishCountersTask() {
-        if (refreshTask != null) {
-            refreshTask.cancel(true);
-        }
-        if (publishCountersRate > 0) {
-            log.info("Refresh Rate set to {}, publishCountersToKafka will be called every {} seconds",
-                    publishCountersRate, publishCountersRate);
-            refreshTask = refreshService.scheduleWithFixedDelay(SafeRecurringTask.wrap(publishCountersToKafka),
-                    publishCountersRate, publishCountersRate, TimeUnit.SECONDS);
-        } else {
-            log.info("Refresh Rate set to 0, disabling calls to publishCountersToKafka");
-        }
     }
 
     @Override
@@ -356,27 +306,6 @@
     }
 
     /**
-     * Publish the counters to kafka.
-     */
-    private class PublishCountersToKafka implements Runnable {
-        public void run() {
-            dhcpL2RelayCounters.getCountersMap().forEach((counterKey, counterValue) -> {
-                // Publish the global counters
-                if (counterKey.counterClassKey.equals(DhcpL2RelayEvent.GLOBAL_COUNTER)) {
-                    post(new DhcpL2RelayEvent(DhcpL2RelayEvent.Type.STATS_UPDATE, null, null,
-                            new AbstractMap.SimpleEntry<String, AtomicLong>(counterKey.counterTypeKey.toString(),
-                                    counterValue), dhcpCountersTopic, null));
-                } else { // Publish the counters per subscriber
-                    DhcpAllocationInfo info = Versioned.valueOrNull(allocations.get(counterKey.counterClassKey));
-                    post(new DhcpL2RelayEvent(DhcpL2RelayEvent.Type.STATS_UPDATE, info, null,
-                            new AbstractMap.SimpleEntry<String, AtomicLong>(counterKey.counterTypeKey.toString(),
-                                    counterValue), dhcpCountersTopic, counterKey.counterClassKey));
-                }
-            });
-        }
-    }
-
-    /**
      * Checks if this app has been configured.
      *
      * @return true if all information we need have been initialized
@@ -707,7 +636,7 @@
                 packetService.emit(o);
 
                 SubscriberAndDeviceInformation entry = getSubscriberInfoFromClient(context);
-                updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("PACKETS_TO_SERVER"));
+                updateDhcpRelayCountersStore(entry, DhcpL2RelayCounterNames.valueOf("PACKETS_TO_SERVER"));
             } else {
                 log.error("No connect point to send msg to DHCP Server");
             }
@@ -726,7 +655,7 @@
         }
 
         private void  updateDhcpRelayCountersStore(SubscriberAndDeviceInformation entry,
-                                                   DhcpL2RelayCounters counterType) {
+                                                   DhcpL2RelayCounterNames counterType) {
             // Update global counter stats
             dhcpL2RelayCounters.incrementCounter(DhcpL2RelayEvent.GLOBAL_COUNTER, counterType);
             if (entry == null) {
@@ -796,7 +725,7 @@
                         forwardPacket(ethernetPacketDiscover, context);
                     }
                     entry = getSubscriberInfoFromClient(context);
-                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPDISCOVER"));
+                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounterNames.valueOf("DHCPDISCOVER"));
                     break;
                 case DHCPOFFER:
                     //reply to dhcp client.
@@ -806,7 +735,7 @@
                         sendReply(ethernetPacketOffer, dhcpPayload, context);
                     }
                     entry = getSubscriberInfoFromServer(dhcpPayload, context);
-                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPOFFER"));
+                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounterNames.valueOf("DHCPOFFER"));
                     break;
                 case DHCPREQUEST:
                     Ethernet ethernetPacketRequest =
@@ -815,7 +744,7 @@
                         forwardPacket(ethernetPacketRequest, context);
                     }
                     entry = getSubscriberInfoFromClient(context);
-                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPREQUEST"));
+                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounterNames.valueOf("DHCPREQUEST"));
                     break;
                 case DHCPACK:
                     //reply to dhcp client.
@@ -825,7 +754,7 @@
                         sendReply(ethernetPacketAck, dhcpPayload, context);
                     }
                     entry = getSubscriberInfoFromServer(dhcpPayload, context);
-                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPACK"));
+                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounterNames.valueOf("DHCPACK"));
                     break;
                 case DHCPDECLINE:
                     Ethernet ethernetPacketDecline =
@@ -834,7 +763,7 @@
                         forwardPacket(ethernetPacketDecline, context);
                     }
                     entry = getSubscriberInfoFromClient(context);
-                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPDECLINE"));
+                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounterNames.valueOf("DHCPDECLINE"));
                     break;
                 case DHCPNAK:
                     //reply to dhcp client.
@@ -844,7 +773,7 @@
                         sendReply(ethernetPacketNak, dhcpPayload, context);
                     }
                     entry = getSubscriberInfoFromServer(dhcpPayload, context);
-                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPNACK"));
+                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounterNames.valueOf("DHCPNACK"));
                     break;
                 case DHCPRELEASE:
                     Ethernet ethernetPacketRelease =
@@ -853,7 +782,7 @@
                         forwardPacket(ethernetPacketRelease, context);
                     }
                     entry = getSubscriberInfoFromClient(context);
-                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPRELEASE"));
+                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounterNames.valueOf("DHCPRELEASE"));
                     break;
                 default:
                     break;
@@ -975,7 +904,7 @@
                 return null;
             }
 
-            updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("PACKETS_FROM_SERVER"));
+            updateDhcpRelayCountersStore(entry, DhcpL2RelayCounterNames.valueOf("PACKETS_FROM_SERVER"));
 
             // we leave the srcMac from the original packet
             etherReply.setQinQVID(VlanId.NO_VID);
@@ -1216,4 +1145,21 @@
             }
         }
     }
+
+    private class InnerDhcpL2RelayStoreDelegate implements DhcpL2RelayStoreDelegate {
+        /** Posts STATS_UPDATE events handed over by the store to the listeners of this component. */
+        @Override
+        public void notify(DhcpL2RelayEvent event) {
+            if (!event.type().equals(DhcpL2RelayEvent.Type.STATS_UPDATE)) {
+                return; // events of any other type are not posted
+            } else if (event.getSubscriberId() == null) {
+                post(event); // no subscriber id: forward the event exactly as received
+                return;
+            }
+            // subscriber-scoped event: rebuild it with the allocation info looked up for that subscriber
+            DhcpAllocationInfo info = Versioned.valueOrNull(allocations.get(event.getSubscriberId()));
+            post(new DhcpL2RelayEvent(event.type(), info, event.connectPoint(),
+                    event.getCountersEntry(), event.getSubscriberId()));
+        }
+    }
 }