Aggregate stats from all cluster nodes and publish
Change-Id: Ic41cdcc8fc17845dabffc42b817b8bb7439a0b52
diff --git a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java
index 2d17579..c34d817 100755
--- a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java
+++ b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java
@@ -30,7 +30,6 @@
import org.onlab.packet.VlanId;
import org.onlab.packet.dhcp.DhcpOption;
import org.onlab.util.KryoNamespace;
-import org.onlab.util.SafeRecurringTask;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
@@ -76,6 +75,7 @@
import org.opencord.dhcpl2relay.DhcpL2RelayEvent;
import org.opencord.dhcpl2relay.DhcpL2RelayListener;
import org.opencord.dhcpl2relay.DhcpL2RelayService;
+import org.opencord.dhcpl2relay.DhcpL2RelayStoreDelegate;
import org.opencord.dhcpl2relay.impl.packet.DhcpOption82;
import org.opencord.sadis.BaseInformationService;
import org.opencord.sadis.SadisService;
@@ -94,7 +94,6 @@
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.time.Instant;
-import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Dictionary;
import java.util.List;
@@ -105,25 +104,17 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static org.onlab.packet.DHCP.DHCPOptionCode.OptionCode_MessageType;
import static org.onlab.packet.MacAddress.valueOf;
-import static org.onlab.util.Tools.get;
-import static org.onlab.util.Tools.getIntegerProperty;
import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
-import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.DHCP_COUNTERS_TOPIC;
-import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.DHCP_COUNTERS_TOPIC_DEFAULT;
import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.ENABLE_DHCP_BROADCAST_REPLIES;
import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.ENABLE_DHCP_BROADCAST_REPLIES_DEFAULT;
import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.OPTION_82;
import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.OPTION_82_DEFAULT;
-import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.PUBLISH_COUNTERS_RATE;
-import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.PUBLISH_COUNTERS_RATE_DEFAULT;
/**
* DHCP Relay Agent Application Component.
@@ -132,8 +123,6 @@
property = {
OPTION_82 + ":Boolean=" + OPTION_82_DEFAULT,
ENABLE_DHCP_BROADCAST_REPLIES + ":Boolean=" + ENABLE_DHCP_BROADCAST_REPLIES_DEFAULT,
- PUBLISH_COUNTERS_RATE + ":Integer=" + PUBLISH_COUNTERS_RATE_DEFAULT,
- DHCP_COUNTERS_TOPIC + ":String=" + DHCP_COUNTERS_TOPIC_DEFAULT
})
public class DhcpL2Relay
extends AbstractListenerManager<DhcpL2RelayEvent, DhcpL2RelayListener>
@@ -203,12 +192,6 @@
/** Ask the DHCP Server to send back replies as L2 broadcast. */
protected boolean enableDhcpBroadcastReplies = ENABLE_DHCP_BROADCAST_REPLIES_DEFAULT;
- protected int publishCountersRate = PUBLISH_COUNTERS_RATE_DEFAULT;
- private String dhcpCountersTopic = DHCP_COUNTERS_TOPIC_DEFAULT;
-
-
- protected PublishCountersToKafka publishCountersToKafka;
-
ScheduledFuture<?> refreshTask;
ScheduledExecutorService refreshService = Executors.newSingleThreadScheduledExecutor();
@@ -231,6 +214,8 @@
private BaseInformationService<SubscriberAndDeviceInformation> subsService;
+ private DhcpL2RelayStoreDelegate delegate = new InnerDhcpL2RelayStoreDelegate();
+
@Activate
protected void activate(ComponentContext context) {
//start the dhcp relay agent
@@ -256,6 +241,8 @@
leadershipService.runForLeadership(LEADER_TOPIC);
+ dhcpL2RelayCounters.setDelegate(delegate);
+
cfgService.addListener(cfgListener);
mastershipService.addListener(changeListener);
deviceService.addListener(deviceListener);
@@ -272,9 +259,6 @@
modified(context);
}
- publishCountersToKafka = new PublishCountersToKafka();
- restartPublishCountersTask();
-
log.info("DHCP-L2-RELAY Started");
}
@@ -286,6 +270,7 @@
if (refreshService != null) {
refreshService.shutdownNow();
}
+ dhcpL2RelayCounters.unsetDelegate(delegate);
cfgService.removeListener(cfgListener);
factories.forEach(cfgService::unregisterConfigFactory);
packetService.removeProcessor(dhcpRelayPacketProcessor);
@@ -313,41 +298,6 @@
if (o != null) {
enableDhcpBroadcastReplies = o;
}
-
- Integer newPublishCountersRate = getIntegerProperty(properties, "publishCountersRate");
- if (newPublishCountersRate != null) {
- if (newPublishCountersRate != publishCountersRate && newPublishCountersRate >= 0) {
- log.info("publishCountersRate modified from {} to {}", publishCountersRate, newPublishCountersRate);
- publishCountersRate = newPublishCountersRate;
- } else if (newPublishCountersRate < 0) {
- log.error("Invalid newPublishCountersRate : {}, defaulting to 0", newPublishCountersRate);
- publishCountersRate = 0;
- }
- restartPublishCountersTask();
- }
-
- String newDhcpCountersTopic = get(properties, "dhcpCountersTopic");
- if (newDhcpCountersTopic != null && !newDhcpCountersTopic.equals(dhcpCountersTopic)) {
- log.info("Property dhcpCountersTopic modified from {} to {}", dhcpCountersTopic, newDhcpCountersTopic);
- dhcpCountersTopic = newDhcpCountersTopic;
- }
- }
-
- /**
- * Starts a thread to publish the counters to kafka at a certain rate time.
- */
- private void restartPublishCountersTask() {
- if (refreshTask != null) {
- refreshTask.cancel(true);
- }
- if (publishCountersRate > 0) {
- log.info("Refresh Rate set to {}, publishCountersToKafka will be called every {} seconds",
- publishCountersRate, publishCountersRate);
- refreshTask = refreshService.scheduleWithFixedDelay(SafeRecurringTask.wrap(publishCountersToKafka),
- publishCountersRate, publishCountersRate, TimeUnit.SECONDS);
- } else {
- log.info("Refresh Rate set to 0, disabling calls to publishCountersToKafka");
- }
}
@Override
@@ -356,27 +306,6 @@
}
/**
- * Publish the counters to kafka.
- */
- private class PublishCountersToKafka implements Runnable {
- public void run() {
- dhcpL2RelayCounters.getCountersMap().forEach((counterKey, counterValue) -> {
- // Publish the global counters
- if (counterKey.counterClassKey.equals(DhcpL2RelayEvent.GLOBAL_COUNTER)) {
- post(new DhcpL2RelayEvent(DhcpL2RelayEvent.Type.STATS_UPDATE, null, null,
- new AbstractMap.SimpleEntry<String, AtomicLong>(counterKey.counterTypeKey.toString(),
- counterValue), dhcpCountersTopic, null));
- } else { // Publish the counters per subscriber
- DhcpAllocationInfo info = Versioned.valueOrNull(allocations.get(counterKey.counterClassKey));
- post(new DhcpL2RelayEvent(DhcpL2RelayEvent.Type.STATS_UPDATE, info, null,
- new AbstractMap.SimpleEntry<String, AtomicLong>(counterKey.counterTypeKey.toString(),
- counterValue), dhcpCountersTopic, counterKey.counterClassKey));
- }
- });
- }
- }
-
- /**
* Checks if this app has been configured.
*
* @return true if all information we need have been initialized
@@ -707,7 +636,7 @@
packetService.emit(o);
SubscriberAndDeviceInformation entry = getSubscriberInfoFromClient(context);
- updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("PACKETS_TO_SERVER"));
+ updateDhcpRelayCountersStore(entry, DhcpL2RelayCounterNames.valueOf("PACKETS_TO_SERVER"));
} else {
log.error("No connect point to send msg to DHCP Server");
}
@@ -726,7 +655,7 @@
}
private void updateDhcpRelayCountersStore(SubscriberAndDeviceInformation entry,
- DhcpL2RelayCounters counterType) {
+ DhcpL2RelayCounterNames counterType) {
// Update global counter stats
dhcpL2RelayCounters.incrementCounter(DhcpL2RelayEvent.GLOBAL_COUNTER, counterType);
if (entry == null) {
@@ -796,7 +725,7 @@
forwardPacket(ethernetPacketDiscover, context);
}
entry = getSubscriberInfoFromClient(context);
- updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPDISCOVER"));
+ updateDhcpRelayCountersStore(entry, DhcpL2RelayCounterNames.valueOf("DHCPDISCOVER"));
break;
case DHCPOFFER:
//reply to dhcp client.
@@ -806,7 +735,7 @@
sendReply(ethernetPacketOffer, dhcpPayload, context);
}
entry = getSubscriberInfoFromServer(dhcpPayload, context);
- updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPOFFER"));
+ updateDhcpRelayCountersStore(entry, DhcpL2RelayCounterNames.valueOf("DHCPOFFER"));
break;
case DHCPREQUEST:
Ethernet ethernetPacketRequest =
@@ -815,7 +744,7 @@
forwardPacket(ethernetPacketRequest, context);
}
entry = getSubscriberInfoFromClient(context);
- updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPREQUEST"));
+ updateDhcpRelayCountersStore(entry, DhcpL2RelayCounterNames.valueOf("DHCPREQUEST"));
break;
case DHCPACK:
//reply to dhcp client.
@@ -825,7 +754,7 @@
sendReply(ethernetPacketAck, dhcpPayload, context);
}
entry = getSubscriberInfoFromServer(dhcpPayload, context);
- updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPACK"));
+ updateDhcpRelayCountersStore(entry, DhcpL2RelayCounterNames.valueOf("DHCPACK"));
break;
case DHCPDECLINE:
Ethernet ethernetPacketDecline =
@@ -834,7 +763,7 @@
forwardPacket(ethernetPacketDecline, context);
}
entry = getSubscriberInfoFromClient(context);
- updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPDECLINE"));
+ updateDhcpRelayCountersStore(entry, DhcpL2RelayCounterNames.valueOf("DHCPDECLINE"));
break;
case DHCPNAK:
//reply to dhcp client.
@@ -844,7 +773,7 @@
sendReply(ethernetPacketNak, dhcpPayload, context);
}
entry = getSubscriberInfoFromServer(dhcpPayload, context);
- updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPNACK"));
+ updateDhcpRelayCountersStore(entry, DhcpL2RelayCounterNames.valueOf("DHCPNACK"));
break;
case DHCPRELEASE:
Ethernet ethernetPacketRelease =
@@ -853,7 +782,7 @@
forwardPacket(ethernetPacketRelease, context);
}
entry = getSubscriberInfoFromClient(context);
- updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPRELEASE"));
+ updateDhcpRelayCountersStore(entry, DhcpL2RelayCounterNames.valueOf("DHCPRELEASE"));
break;
default:
break;
@@ -975,7 +904,7 @@
return null;
}
- updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("PACKETS_FROM_SERVER"));
+ updateDhcpRelayCountersStore(entry, DhcpL2RelayCounterNames.valueOf("PACKETS_FROM_SERVER"));
// we leave the srcMac from the original packet
etherReply.setQinQVID(VlanId.NO_VID);
@@ -1216,4 +1145,21 @@
}
}
}
+
+    /** Posts STATS_UPDATE events from the counters store, adding allocation info when a subscriber id is set. */
+    private class InnerDhcpL2RelayStoreDelegate implements DhcpL2RelayStoreDelegate {
+        @Override
+        public void notify(DhcpL2RelayEvent event) {
+            if (!event.type().equals(DhcpL2RelayEvent.Type.STATS_UPDATE)) {
+                return; // every other event type is ignored by this delegate
+            }
+            if (event.getSubscriberId() == null) {
+                post(event); // no subscriber id: forward the event untouched
+                return;
+            }
+            DhcpAllocationInfo info = Versioned.valueOrNull(allocations.get(event.getSubscriberId()));
+            post(new DhcpL2RelayEvent(event.type(), info, event.connectPoint(),
+                    event.getCountersEntry(), event.getSubscriberId()));
+        }
+    }
}