Aggregate stats from all cluster nodes and publish
Change-Id: Ic41cdcc8fc17845dabffc42b817b8bb7439a0b52
diff --git a/api/src/main/java/org/opencord/dhcpl2relay/DhcpL2RelayEvent.java b/api/src/main/java/org/opencord/dhcpl2relay/DhcpL2RelayEvent.java
index 596fb6c..a143681 100644
--- a/api/src/main/java/org/opencord/dhcpl2relay/DhcpL2RelayEvent.java
+++ b/api/src/main/java/org/opencord/dhcpl2relay/DhcpL2RelayEvent.java
@@ -33,8 +33,6 @@
private final Map.Entry<String, AtomicLong> countersEntry;
- private final String dhcpCountersTopic;
-
private final String subscriberId;
/**
@@ -64,16 +62,13 @@
* @param allocationInfo DHCP allocation info
* @param connectPoint connect point the client is on
* @param countersEntry an entry that represents the counters (used for STATS events)
- * @param dhcpCountersTopic Kafka topic where the dhcp counters are published
* @param subscriberId the subscriber identifier information
*/
public DhcpL2RelayEvent(Type type, DhcpAllocationInfo allocationInfo, ConnectPoint connectPoint,
- Map.Entry<String, AtomicLong> countersEntry,
- String dhcpCountersTopic, String subscriberId) {
+ Map.Entry<String, AtomicLong> countersEntry, String subscriberId) {
super(type, allocationInfo);
this.connectPoint = connectPoint;
this.countersEntry = countersEntry;
- this.dhcpCountersTopic = dhcpCountersTopic;
this.subscriberId = subscriberId;
}
@@ -88,7 +83,6 @@
super(type, allocationInfo);
this.connectPoint = connectPoint;
this.countersEntry = null;
- this.dhcpCountersTopic = null;
this.subscriberId = null;
}
@@ -111,15 +105,6 @@
}
/**
- * Gets the Kafka topic where the dhcp counters are published.
- *
- * @return the dhcp kafka topic
- */
- public String getDhcpCountersTopic() {
- return dhcpCountersTopic;
- }
-
- /**
* Gets the subscriber identifier information.
*
* @return the Id from subscriber
diff --git a/api/src/main/java/org/opencord/dhcpl2relay/DhcpL2RelayStoreDelegate.java b/api/src/main/java/org/opencord/dhcpl2relay/DhcpL2RelayStoreDelegate.java
new file mode 100644
index 0000000..3ff6a49
--- /dev/null
+++ b/api/src/main/java/org/opencord/dhcpl2relay/DhcpL2RelayStoreDelegate.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ */
+
+package org.opencord.dhcpl2relay;
+
+import org.onosproject.store.StoreDelegate;
+
+/**
+ * Store delegate for DHCP L2 Relay store.
+ */
+public interface DhcpL2RelayStoreDelegate extends StoreDelegate<DhcpL2RelayEvent> {
+}
diff --git a/app/src/main/java/org/opencord/dhcpl2relay/cli/DhcpL2RelayStatsCommand.java b/app/src/main/java/org/opencord/dhcpl2relay/cli/DhcpL2RelayStatsCommand.java
index 0f4901f..0335dff 100644
--- a/app/src/main/java/org/opencord/dhcpl2relay/cli/DhcpL2RelayStatsCommand.java
+++ b/app/src/main/java/org/opencord/dhcpl2relay/cli/DhcpL2RelayStatsCommand.java
@@ -19,17 +19,16 @@
import org.apache.karaf.shell.api.action.Argument;
import org.apache.karaf.shell.api.action.Command;
-import org.apache.karaf.shell.api.action.lifecycle.Service;
import org.apache.karaf.shell.api.action.Option;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
import org.onosproject.cli.AbstractShellCommand;
import org.opencord.dhcpl2relay.DhcpL2RelayEvent;
-import org.opencord.dhcpl2relay.impl.DhcpL2RelayCounters;
+import org.opencord.dhcpl2relay.impl.DhcpL2RelayCounterNames;
import org.opencord.dhcpl2relay.impl.DhcpL2RelayCountersIdentifier;
import org.opencord.dhcpl2relay.impl.DhcpL2RelayCountersStore;
import java.util.Collections;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
/**
* Display/Reset the DHCP L2 relay application statistics.
@@ -57,7 +56,7 @@
description = "The counter to display (or reset). In case not specified, all counters\nwill be " +
"displayed (or reset in case the -r option is specified).",
required = false, multiValued = false)
- DhcpL2RelayCounters counter = null;
+ DhcpL2RelayCounterNames counter = null;
@Override
protected void doExecute() {
@@ -69,7 +68,7 @@
subscriberId = DhcpL2RelayEvent.GLOBAL_COUNTER;
}
- if (reset) {
+ if (reset) {
if (please == null || !please.equals(CONFIRM_PHRASE)) {
print("WARNING: Be aware that you are going to reset the counters. " +
"Enter confirmation phrase to continue.");
@@ -83,58 +82,57 @@
dhcpCounters.setCounter(subscriberId, counter, (long) 0);
}
} else {
- Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap = dhcpCounters.getCountersMap();
- if (countersMap.size() > 0) {
- if (counter == null) {
- String jsonString = "";
- if (outputJson()) {
- jsonString = String.format("{\"%s\":{", dhcpCounters.NAME);
- } else {
- print("%s [%s] :", dhcpCounters.NAME, subscriberId);
- }
- DhcpL2RelayCounters[] counters = DhcpL2RelayCounters.values();
- for (int i = 0; i < counters.length; i++) {
- DhcpL2RelayCounters counterType = counters[i];
- AtomicLong v = countersMap.get(new DhcpL2RelayCountersIdentifier(subscriberId, counterType));
- if (v == null) {
- v = new AtomicLong(0);
- }
- if (outputJson()) {
- jsonString += String.format("\"%s\":%d", counterType, v.longValue());
- if (i < counters.length - 1) {
- jsonString += ",";
- }
- } else {
- printCounter(counterType, v);
- }
- }
- if (outputJson()) {
- jsonString += "}}";
- print("%s", jsonString);
- }
- } else {
- // Show only the specified counter
- AtomicLong v = countersMap.get(new DhcpL2RelayCountersIdentifier(subscriberId, counter));
- if (v == null) {
- v = new AtomicLong(0);
- }
- if (outputJson()) {
- print("{\"%s\":%d}", counter, v.longValue());
- } else {
- printCounter(counter, v);
- }
- }
- } else {
- print("No DHCP L2 Relay Counters were created yet for counter class [%s]",
- DhcpL2RelayEvent.GLOBAL_COUNTER);
- }
- }
+ Map<DhcpL2RelayCountersIdentifier, Long> countersMap = dhcpCounters.getCounters().counters();
+ if (countersMap.size() > 0) {
+ if (counter == null) {
+ String jsonString = "";
+ if (outputJson()) {
+ jsonString = String.format("{\"%s\":{", dhcpCounters.NAME);
+ } else {
+ print("%s [%s] :", dhcpCounters.NAME, subscriberId);
+ }
+ DhcpL2RelayCounterNames[] counters = DhcpL2RelayCounterNames.values();
+ for (int i = 0; i < counters.length; i++) {
+ DhcpL2RelayCounterNames counterType = counters[i];
+ Long value = countersMap.get(new DhcpL2RelayCountersIdentifier(subscriberId, counterType));
+ if (value == null) {
+ value = 0L;
+ }
+ if (outputJson()) {
+ jsonString += String.format("\"%s\":%d", counterType, value);
+ if (i < counters.length - 1) {
+ jsonString += ",";
+ }
+ } else {
+ printCounter(counterType, value);
+ }
+ }
+ if (outputJson()) {
+ jsonString += "}}";
+ print("%s", jsonString);
+ }
+ } else {
+ // Show only the specified counter
+ Long value = countersMap.get(new DhcpL2RelayCountersIdentifier(subscriberId, counter));
+ if (value == null) {
+ value = 0L;
+ }
+ if (outputJson()) {
+ print("{\"%s\":%d}", counter, value);
+ } else {
+ printCounter(counter, value);
+ }
+ }
+ } else {
+ print("No DHCP L2 Relay Counters were created yet for counter class [%s]",
+ DhcpL2RelayEvent.GLOBAL_COUNTER);
+ }
+ }
}
- void printCounter(DhcpL2RelayCounters c, AtomicLong a) {
+ private void printCounter(DhcpL2RelayCounterNames counterNames, long value) {
// print in non-JSON format
- print(" %s %s %-4d", c,
- String.join("", Collections.nCopies(50 - c.toString().length(), ".")),
- a.longValue());
+ print(" %s %s %-4d", counterNames,
+ String.join("", Collections.nCopies(50 - counterNames.toString().length(), ".")), value);
}
}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java
index 2d17579..c34d817 100755
--- a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java
+++ b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java
@@ -30,7 +30,6 @@
import org.onlab.packet.VlanId;
import org.onlab.packet.dhcp.DhcpOption;
import org.onlab.util.KryoNamespace;
-import org.onlab.util.SafeRecurringTask;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
@@ -76,6 +75,7 @@
import org.opencord.dhcpl2relay.DhcpL2RelayEvent;
import org.opencord.dhcpl2relay.DhcpL2RelayListener;
import org.opencord.dhcpl2relay.DhcpL2RelayService;
+import org.opencord.dhcpl2relay.DhcpL2RelayStoreDelegate;
import org.opencord.dhcpl2relay.impl.packet.DhcpOption82;
import org.opencord.sadis.BaseInformationService;
import org.opencord.sadis.SadisService;
@@ -94,7 +94,6 @@
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.time.Instant;
-import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Dictionary;
import java.util.List;
@@ -105,25 +104,17 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static org.onlab.packet.DHCP.DHCPOptionCode.OptionCode_MessageType;
import static org.onlab.packet.MacAddress.valueOf;
-import static org.onlab.util.Tools.get;
-import static org.onlab.util.Tools.getIntegerProperty;
import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
-import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.DHCP_COUNTERS_TOPIC;
-import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.DHCP_COUNTERS_TOPIC_DEFAULT;
import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.ENABLE_DHCP_BROADCAST_REPLIES;
import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.ENABLE_DHCP_BROADCAST_REPLIES_DEFAULT;
import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.OPTION_82;
import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.OPTION_82_DEFAULT;
-import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.PUBLISH_COUNTERS_RATE;
-import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.PUBLISH_COUNTERS_RATE_DEFAULT;
/**
* DHCP Relay Agent Application Component.
@@ -132,8 +123,6 @@
property = {
OPTION_82 + ":Boolean=" + OPTION_82_DEFAULT,
ENABLE_DHCP_BROADCAST_REPLIES + ":Boolean=" + ENABLE_DHCP_BROADCAST_REPLIES_DEFAULT,
- PUBLISH_COUNTERS_RATE + ":Integer=" + PUBLISH_COUNTERS_RATE_DEFAULT,
- DHCP_COUNTERS_TOPIC + ":String=" + DHCP_COUNTERS_TOPIC_DEFAULT
})
public class DhcpL2Relay
extends AbstractListenerManager<DhcpL2RelayEvent, DhcpL2RelayListener>
@@ -203,12 +192,6 @@
/** Ask the DHCP Server to send back replies as L2 broadcast. */
protected boolean enableDhcpBroadcastReplies = ENABLE_DHCP_BROADCAST_REPLIES_DEFAULT;
- protected int publishCountersRate = PUBLISH_COUNTERS_RATE_DEFAULT;
- private String dhcpCountersTopic = DHCP_COUNTERS_TOPIC_DEFAULT;
-
-
- protected PublishCountersToKafka publishCountersToKafka;
-
ScheduledFuture<?> refreshTask;
ScheduledExecutorService refreshService = Executors.newSingleThreadScheduledExecutor();
@@ -231,6 +214,8 @@
private BaseInformationService<SubscriberAndDeviceInformation> subsService;
+ private DhcpL2RelayStoreDelegate delegate = new InnerDhcpL2RelayStoreDelegate();
+
@Activate
protected void activate(ComponentContext context) {
//start the dhcp relay agent
@@ -256,6 +241,8 @@
leadershipService.runForLeadership(LEADER_TOPIC);
+ dhcpL2RelayCounters.setDelegate(delegate);
+
cfgService.addListener(cfgListener);
mastershipService.addListener(changeListener);
deviceService.addListener(deviceListener);
@@ -272,9 +259,6 @@
modified(context);
}
- publishCountersToKafka = new PublishCountersToKafka();
- restartPublishCountersTask();
-
log.info("DHCP-L2-RELAY Started");
}
@@ -286,6 +270,7 @@
if (refreshService != null) {
refreshService.shutdownNow();
}
+ dhcpL2RelayCounters.unsetDelegate(delegate);
cfgService.removeListener(cfgListener);
factories.forEach(cfgService::unregisterConfigFactory);
packetService.removeProcessor(dhcpRelayPacketProcessor);
@@ -313,41 +298,6 @@
if (o != null) {
enableDhcpBroadcastReplies = o;
}
-
- Integer newPublishCountersRate = getIntegerProperty(properties, "publishCountersRate");
- if (newPublishCountersRate != null) {
- if (newPublishCountersRate != publishCountersRate && newPublishCountersRate >= 0) {
- log.info("publishCountersRate modified from {} to {}", publishCountersRate, newPublishCountersRate);
- publishCountersRate = newPublishCountersRate;
- } else if (newPublishCountersRate < 0) {
- log.error("Invalid newPublishCountersRate : {}, defaulting to 0", newPublishCountersRate);
- publishCountersRate = 0;
- }
- restartPublishCountersTask();
- }
-
- String newDhcpCountersTopic = get(properties, "dhcpCountersTopic");
- if (newDhcpCountersTopic != null && !newDhcpCountersTopic.equals(dhcpCountersTopic)) {
- log.info("Property dhcpCountersTopic modified from {} to {}", dhcpCountersTopic, newDhcpCountersTopic);
- dhcpCountersTopic = newDhcpCountersTopic;
- }
- }
-
- /**
- * Starts a thread to publish the counters to kafka at a certain rate time.
- */
- private void restartPublishCountersTask() {
- if (refreshTask != null) {
- refreshTask.cancel(true);
- }
- if (publishCountersRate > 0) {
- log.info("Refresh Rate set to {}, publishCountersToKafka will be called every {} seconds",
- publishCountersRate, publishCountersRate);
- refreshTask = refreshService.scheduleWithFixedDelay(SafeRecurringTask.wrap(publishCountersToKafka),
- publishCountersRate, publishCountersRate, TimeUnit.SECONDS);
- } else {
- log.info("Refresh Rate set to 0, disabling calls to publishCountersToKafka");
- }
}
@Override
@@ -356,27 +306,6 @@
}
/**
- * Publish the counters to kafka.
- */
- private class PublishCountersToKafka implements Runnable {
- public void run() {
- dhcpL2RelayCounters.getCountersMap().forEach((counterKey, counterValue) -> {
- // Publish the global counters
- if (counterKey.counterClassKey.equals(DhcpL2RelayEvent.GLOBAL_COUNTER)) {
- post(new DhcpL2RelayEvent(DhcpL2RelayEvent.Type.STATS_UPDATE, null, null,
- new AbstractMap.SimpleEntry<String, AtomicLong>(counterKey.counterTypeKey.toString(),
- counterValue), dhcpCountersTopic, null));
- } else { // Publish the counters per subscriber
- DhcpAllocationInfo info = Versioned.valueOrNull(allocations.get(counterKey.counterClassKey));
- post(new DhcpL2RelayEvent(DhcpL2RelayEvent.Type.STATS_UPDATE, info, null,
- new AbstractMap.SimpleEntry<String, AtomicLong>(counterKey.counterTypeKey.toString(),
- counterValue), dhcpCountersTopic, counterKey.counterClassKey));
- }
- });
- }
- }
-
- /**
* Checks if this app has been configured.
*
* @return true if all information we need have been initialized
@@ -707,7 +636,7 @@
packetService.emit(o);
SubscriberAndDeviceInformation entry = getSubscriberInfoFromClient(context);
- updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("PACKETS_TO_SERVER"));
+ updateDhcpRelayCountersStore(entry, DhcpL2RelayCounterNames.valueOf("PACKETS_TO_SERVER"));
} else {
log.error("No connect point to send msg to DHCP Server");
}
@@ -726,7 +655,7 @@
}
private void updateDhcpRelayCountersStore(SubscriberAndDeviceInformation entry,
- DhcpL2RelayCounters counterType) {
+ DhcpL2RelayCounterNames counterType) {
// Update global counter stats
dhcpL2RelayCounters.incrementCounter(DhcpL2RelayEvent.GLOBAL_COUNTER, counterType);
if (entry == null) {
@@ -796,7 +725,7 @@
forwardPacket(ethernetPacketDiscover, context);
}
entry = getSubscriberInfoFromClient(context);
- updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPDISCOVER"));
+ updateDhcpRelayCountersStore(entry, DhcpL2RelayCounterNames.valueOf("DHCPDISCOVER"));
break;
case DHCPOFFER:
//reply to dhcp client.
@@ -806,7 +735,7 @@
sendReply(ethernetPacketOffer, dhcpPayload, context);
}
entry = getSubscriberInfoFromServer(dhcpPayload, context);
- updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPOFFER"));
+ updateDhcpRelayCountersStore(entry, DhcpL2RelayCounterNames.valueOf("DHCPOFFER"));
break;
case DHCPREQUEST:
Ethernet ethernetPacketRequest =
@@ -815,7 +744,7 @@
forwardPacket(ethernetPacketRequest, context);
}
entry = getSubscriberInfoFromClient(context);
- updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPREQUEST"));
+ updateDhcpRelayCountersStore(entry, DhcpL2RelayCounterNames.valueOf("DHCPREQUEST"));
break;
case DHCPACK:
//reply to dhcp client.
@@ -825,7 +754,7 @@
sendReply(ethernetPacketAck, dhcpPayload, context);
}
entry = getSubscriberInfoFromServer(dhcpPayload, context);
- updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPACK"));
+ updateDhcpRelayCountersStore(entry, DhcpL2RelayCounterNames.valueOf("DHCPACK"));
break;
case DHCPDECLINE:
Ethernet ethernetPacketDecline =
@@ -834,7 +763,7 @@
forwardPacket(ethernetPacketDecline, context);
}
entry = getSubscriberInfoFromClient(context);
- updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPDECLINE"));
+ updateDhcpRelayCountersStore(entry, DhcpL2RelayCounterNames.valueOf("DHCPDECLINE"));
break;
case DHCPNAK:
//reply to dhcp client.
@@ -844,7 +773,7 @@
sendReply(ethernetPacketNak, dhcpPayload, context);
}
entry = getSubscriberInfoFromServer(dhcpPayload, context);
- updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPNACK"));
+ updateDhcpRelayCountersStore(entry, DhcpL2RelayCounterNames.valueOf("DHCPNACK"));
break;
case DHCPRELEASE:
Ethernet ethernetPacketRelease =
@@ -853,7 +782,7 @@
forwardPacket(ethernetPacketRelease, context);
}
entry = getSubscriberInfoFromClient(context);
- updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPRELEASE"));
+ updateDhcpRelayCountersStore(entry, DhcpL2RelayCounterNames.valueOf("DHCPRELEASE"));
break;
default:
break;
@@ -975,7 +904,7 @@
return null;
}
- updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("PACKETS_FROM_SERVER"));
+ updateDhcpRelayCountersStore(entry, DhcpL2RelayCounterNames.valueOf("PACKETS_FROM_SERVER"));
// we leave the srcMac from the original packet
etherReply.setQinQVID(VlanId.NO_VID);
@@ -1216,4 +1145,21 @@
}
}
}
+
+ private class InnerDhcpL2RelayStoreDelegate implements DhcpL2RelayStoreDelegate {
+ @Override
+ public void notify(DhcpL2RelayEvent event) {
+ if (event.type().equals(DhcpL2RelayEvent.Type.STATS_UPDATE)) {
+ DhcpL2RelayEvent toPost = event;
+ if (event.getSubscriberId() != null) {
+ // infuse the event with the allocation info before posting
+ DhcpAllocationInfo info = Versioned.valueOrNull(allocations.get(event.getSubscriberId()));
+ toPost = new DhcpL2RelayEvent(event.type(), info, event.connectPoint(),
+ event.getCountersEntry(), event.getSubscriberId());
+ }
+ post(toPost);
+ }
+
+ }
+ }
}
diff --git a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCounters.java b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCounterNames.java
similarity index 74%
rename from app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCounters.java
rename to app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCounterNames.java
index 71f7adc..8c2b694 100644
--- a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCounters.java
+++ b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCounterNames.java
@@ -23,7 +23,7 @@
/**
* Represents DHCP relay counters type.
*/
-public enum DhcpL2RelayCounters {
+public enum DhcpL2RelayCounterNames {
/**
* DHCP relay counter of type Discover.
*/
@@ -64,10 +64,10 @@
/**
* Supported types of DHCP relay counters.
*/
- static final Set<DhcpL2RelayCounters> SUPPORTED_COUNTERS = ImmutableSet.of(DhcpL2RelayCounters.DHCPDISCOVER,
- DhcpL2RelayCounters.DHCPRELEASE, DhcpL2RelayCounters.DHCPDECLINE,
- DhcpL2RelayCounters.DHCPREQUEST, DhcpL2RelayCounters.DHCPOFFER,
- DhcpL2RelayCounters.DHCPACK, DhcpL2RelayCounters.DHCPNACK,
- DhcpL2RelayCounters.PACKETS_TO_SERVER,
- DhcpL2RelayCounters.PACKETS_FROM_SERVER);
+ static final Set<DhcpL2RelayCounterNames> SUPPORTED_COUNTERS = ImmutableSet.of(DhcpL2RelayCounterNames.DHCPDISCOVER,
+ DhcpL2RelayCounterNames.DHCPRELEASE, DhcpL2RelayCounterNames.DHCPDECLINE,
+ DhcpL2RelayCounterNames.DHCPREQUEST, DhcpL2RelayCounterNames.DHCPOFFER,
+ DhcpL2RelayCounterNames.DHCPACK, DhcpL2RelayCounterNames.DHCPNACK,
+ DhcpL2RelayCounterNames.PACKETS_TO_SERVER,
+ DhcpL2RelayCounterNames.PACKETS_FROM_SERVER);
}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersIdentifier.java b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersIdentifier.java
index 63587e1..d6c7ef9 100644
--- a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersIdentifier.java
+++ b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersIdentifier.java
@@ -24,14 +24,14 @@
*/
public final class DhcpL2RelayCountersIdentifier {
final String counterClassKey;
- final Enum<DhcpL2RelayCounters> counterTypeKey;
+ final Enum<DhcpL2RelayCounterNames> counterTypeKey;
/**
* Creates a default global counter identifier for a given counterType.
*
* @param counterTypeKey Identifies the supported type of DHCP relay counters
*/
- public DhcpL2RelayCountersIdentifier(DhcpL2RelayCounters counterTypeKey) {
+ public DhcpL2RelayCountersIdentifier(DhcpL2RelayCounterNames counterTypeKey) {
this.counterClassKey = DhcpL2RelayEvent.GLOBAL_COUNTER;
this.counterTypeKey = counterTypeKey;
}
@@ -43,7 +43,7 @@
* @param counterClassKey Identifies which class the counter is assigned (global or per subscriber)
* @param counterTypeKey Identifies the supported type of DHCP relay counters
*/
- public DhcpL2RelayCountersIdentifier(String counterClassKey, DhcpL2RelayCounters counterTypeKey) {
+ public DhcpL2RelayCountersIdentifier(String counterClassKey, DhcpL2RelayCounterNames counterTypeKey) {
this.counterClassKey = counterClassKey;
this.counterTypeKey = counterTypeKey;
}
@@ -66,4 +66,8 @@
public int hashCode() {
return Objects.hash(counterClassKey, counterTypeKey);
}
+
+ public String toString() {
+ return this.counterClassKey + "/" + this.counterTypeKey;
+ }
}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStore.java b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStore.java
index b79006b..a8d20ce 100644
--- a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStore.java
+++ b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStore.java
@@ -16,31 +16,25 @@
package org.opencord.dhcpl2relay.impl;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
+import org.onosproject.store.Store;
+import org.opencord.dhcpl2relay.DhcpL2RelayEvent;
+import org.opencord.dhcpl2relay.DhcpL2RelayStoreDelegate;
/**
* Represents a stored DHCP Relay Counters. A counter entry is defined by the pair <counterClass, counterType>,
* where counterClass can be maybe global or subscriber ID and counterType is the DHCP message type.
*/
-public interface DhcpL2RelayCountersStore {
+public interface DhcpL2RelayCountersStore extends Store<DhcpL2RelayEvent, DhcpL2RelayStoreDelegate> {
String NAME = "DHCP_L2_Relay_stats";
/**
- * Init counter values for a given counter class.
- *
- * @param counterClass class of counters (global, per subscriber).
- */
- void initCounters(String counterClass);
-
- /**
* Creates or updates DHCP L2 Relay counter.
*
* @param counterClass class of counters (global, per subscriber).
* @param counterType name of counter
*/
- void incrementCounter(String counterClass, DhcpL2RelayCounters counterType);
+ void incrementCounter(String counterClass, DhcpL2RelayCounterNames counterType);
/**
* Sets the value of a DHCP L2 Relay counter.
@@ -49,14 +43,14 @@
* @param counterType name of counter
* @param value The value of the counter
*/
- void setCounter(String counterClass, DhcpL2RelayCounters counterType, Long value);
+ void setCounter(String counterClass, DhcpL2RelayCounterNames counterType, Long value);
/**
- * Gets the DHCP L2 Relay counters map.
+ * Gets the current DHCP L2 relay counter values.
*
- * @return the DHCP counter map
+ * @return DHCP L2 relay counter values
*/
- public Map<DhcpL2RelayCountersIdentifier, AtomicLong> getCountersMap();
+ DhcpL2RelayStatistics getCounters();
/**
* Resets counter values for a given counter class.
diff --git a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayStatistics.java b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayStatistics.java
new file mode 100644
index 0000000..c719253
--- /dev/null
+++ b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayStatistics.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ */
+
+package org.opencord.dhcpl2relay.impl;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Snapshot of DHCP L2 Relay statistics.
+ */
+public class DhcpL2RelayStatistics {
+
+ private final ImmutableMap<DhcpL2RelayCountersIdentifier, Long> counters;
+
+ private DhcpL2RelayStatistics(ImmutableMap<DhcpL2RelayCountersIdentifier, Long> counters) {
+ this.counters = counters;
+ }
+
+ /**
+ * Creates a new empty statistics instance.
+ */
+ public DhcpL2RelayStatistics() {
+ counters = ImmutableMap.of();
+ }
+
+ /**
+ * Gets the value of the counter with the given ID. Defaults to 0 if counter is not present.
+ *
+ * @param id counter ID
+ * @return counter value
+ */
+ public long get(DhcpL2RelayCountersIdentifier id) {
+ return counters.getOrDefault(id, 0L);
+ }
+
+ /**
+ * Gets the map of counters.
+ *
+ * @return map of counters
+ */
+ public Map<DhcpL2RelayCountersIdentifier, Long> counters() {
+ return counters;
+ }
+
+ /**
+ * Creates a new statistics instance with the given counter values.
+ *
+ * @param counters counters
+ * @return statistics
+ */
+ public static DhcpL2RelayStatistics withCounters(Map<DhcpL2RelayCountersIdentifier, Long> counters) {
+ ImmutableMap.Builder<DhcpL2RelayCountersIdentifier, Long> builder = ImmutableMap.builder();
+
+ counters.forEach(builder::put);
+
+ return new DhcpL2RelayStatistics(builder.build());
+ }
+
+ /**
+ * Adds the given statistics instance to this one (sums the common counters) and returns
+ * a new instance containing the result.
+ *
+ * @param other other instance
+ * @return result
+ */
+ public DhcpL2RelayStatistics add(DhcpL2RelayStatistics other) {
+ ImmutableMap.Builder<DhcpL2RelayCountersIdentifier, Long> builder = ImmutableMap.builder();
+
+ Set<DhcpL2RelayCountersIdentifier> keys = Sets.newHashSet(other.counters.keySet());
+
+ counters.forEach((id, value) -> {
+ builder.put(id, value + other.counters.getOrDefault(id, 0L));
+ keys.remove(id);
+ });
+
+ keys.forEach(i -> builder.put(i, other.counters.get(i)));
+
+ return new DhcpL2RelayStatistics(builder.build());
+ }
+
+ @Override
+ public String toString() {
+ MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this.getClass());
+ counters.forEach((id, v) -> helper.add(id.toString(), v));
+ return helper.toString();
+ }
+}
diff --git a/app/src/main/java/org/opencord/dhcpl2relay/impl/OsgiPropertyConstants.java b/app/src/main/java/org/opencord/dhcpl2relay/impl/OsgiPropertyConstants.java
index af89aa3..3c47467 100644
--- a/app/src/main/java/org/opencord/dhcpl2relay/impl/OsgiPropertyConstants.java
+++ b/app/src/main/java/org/opencord/dhcpl2relay/impl/OsgiPropertyConstants.java
@@ -33,6 +33,6 @@
public static final String PUBLISH_COUNTERS_RATE = "publishCountersRate";
public static final int PUBLISH_COUNTERS_RATE_DEFAULT = 10;
- public static final String DHCP_COUNTERS_TOPIC = "dhcpCountersTopic";
- public static final String DHCP_COUNTERS_TOPIC_DEFAULT = "onos_traffic.stats";
+ public static final String SYNC_COUNTERS_RATE = "syncCountersRate";
+ public static final int SYNC_COUNTERS_RATE_DEFAULT = 5;
}
diff --git a/app/src/main/java/org/opencord/dhcpl2relay/impl/SimpleDhcpL2RelayCountersStore.java b/app/src/main/java/org/opencord/dhcpl2relay/impl/SimpleDhcpL2RelayCountersStore.java
index eedbace..f95867c 100644
--- a/app/src/main/java/org/opencord/dhcpl2relay/impl/SimpleDhcpL2RelayCountersStore.java
+++ b/app/src/main/java/org/opencord/dhcpl2relay/impl/SimpleDhcpL2RelayCountersStore.java
@@ -15,57 +15,215 @@
*/
package org.opencord.dhcpl2relay.impl;
+import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
-import org.onosproject.core.ApplicationId;
-import org.onosproject.core.CoreService;
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.SafeRecurringTask;
+import org.onlab.util.Tools;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.WallClockTimestamp;
import org.opencord.dhcpl2relay.DhcpL2RelayEvent;
+import org.opencord.dhcpl2relay.DhcpL2RelayStoreDelegate;
+import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Modified;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger;
-import java.util.Iterator;
-import java.util.Map;
+import java.nio.charset.StandardCharsets;
+import java.util.AbstractMap;
+import java.util.Dictionary;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import static org.slf4j.LoggerFactory.getLogger;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.PUBLISH_COUNTERS_RATE;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.PUBLISH_COUNTERS_RATE_DEFAULT;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.SYNC_COUNTERS_RATE;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.SYNC_COUNTERS_RATE_DEFAULT;
+import static org.slf4j.LoggerFactory.getLogger;
/**
* DHCP Relay Agent Counters Manager Component.
*/
-@Component(immediate = true)
-public class SimpleDhcpL2RelayCountersStore implements DhcpL2RelayCountersStore {
- private ApplicationId appId;
+@Component(immediate = true,
+property = {
+ PUBLISH_COUNTERS_RATE + ":Integer=" + PUBLISH_COUNTERS_RATE_DEFAULT,
+ SYNC_COUNTERS_RATE + ":Integer=" + SYNC_COUNTERS_RATE_DEFAULT,
+}
+)
+public class SimpleDhcpL2RelayCountersStore extends AbstractStore<DhcpL2RelayEvent, DhcpL2RelayStoreDelegate>
+ implements DhcpL2RelayCountersStore {
+
+ private static final String DHCP_STATISTICS_LEADERSHIP = "dhcpl2relay-statistics";
+ private static final MessageSubject RESET_SUBJECT = new MessageSubject("dhcpl2relay-statistics-reset");
+
private final Logger log = getLogger(getClass());
- private Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap;
+ private ConcurrentMap<DhcpL2RelayCountersIdentifier, Long> countersMap;
+
+ private EventuallyConsistentMap<NodeId, DhcpL2RelayStatistics> statistics;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected CoreService coreService;
+ protected StorageService storageService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected LeadershipService leadershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ComponentConfigService componentConfigService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ClusterCommunicationService clusterCommunicationService;
+
+ protected int publishCountersRate = PUBLISH_COUNTERS_RATE_DEFAULT;
+ protected int syncCountersRate = SYNC_COUNTERS_RATE_DEFAULT;
+
+ KryoNamespace serializer = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(DhcpL2RelayStatistics.class)
+ .register(DhcpL2RelayCountersIdentifier.class)
+ .register(DhcpL2RelayCounterNames.class)
+ .register(ClusterMessage.class)
+ .register(MessageSubject.class)
+ .build();
+
+ private ScheduledExecutorService executor;
+
+ private ScheduledFuture<?> publisherTask;
+ private ScheduledFuture<?> syncTask;
+
+ private AtomicBoolean dirty = new AtomicBoolean(true);
@Activate
- public void activate() {
+ public void activate(ComponentContext context) {
log.info("Activate Dhcp L2 Counters Manager");
- //appId = coreService.getAppId(DhcpL2Relay.DHCP_L2RELAY_APP);
- countersMap = new ConcurrentHashMap();
+ countersMap = new ConcurrentHashMap<>();
+ componentConfigService.registerProperties(getClass());
+
+ modified(context);
+
+ statistics = storageService.<NodeId, DhcpL2RelayStatistics>eventuallyConsistentMapBuilder()
+ .withName("dhcpl2relay-statistics")
+ .withSerializer(serializer)
+ .withTimestampProvider((k, v) -> new WallClockTimestamp())
+ .build();
+
// Initialize counter values for the global counters
- initCounters(DhcpL2RelayEvent.GLOBAL_COUNTER);
+ initCounters(DhcpL2RelayEvent.GLOBAL_COUNTER, statistics.get(clusterService.getLocalNode().id()));
+ syncStats();
+
+ leadershipService.runForLeadership(DHCP_STATISTICS_LEADERSHIP);
+
+ executor = Executors.newScheduledThreadPool(1);
+
+ clusterCommunicationService.addSubscriber(RESET_SUBJECT, Serializer.using(serializer)::decode,
+ this::resetLocal, executor);
+
+ startSyncTask();
+ startPublishTask();
}
- public ImmutableMap<DhcpL2RelayCountersIdentifier, AtomicLong> getCountersMap() {
+ @Deactivate
+ public void deactivate() {
+ clusterCommunicationService.removeSubscriber(RESET_SUBJECT);
+ leadershipService.withdraw(DHCP_STATISTICS_LEADERSHIP);
+
+ stopPublishTask();
+ stopSyncTask();
+ executor.shutdownNow();
+ componentConfigService.unregisterProperties(getClass(), false);
+ }
+
+ @Modified
+ public void modified(ComponentContext context) {
+ Dictionary<String, Object> properties = context.getProperties();
+
+ String s = Tools.get(properties, PUBLISH_COUNTERS_RATE);
+ int oldPublishCountersRate = publishCountersRate;
+ publishCountersRate = Strings.isNullOrEmpty(s) ? PUBLISH_COUNTERS_RATE_DEFAULT
+ : Integer.parseInt(s.trim());
+ if (oldPublishCountersRate != publishCountersRate) {
+ stopPublishTask();
+ startPublishTask();
+ }
+
+ s = Tools.get(properties, SYNC_COUNTERS_RATE);
+ int oldSyncCountersRate = syncCountersRate;
+ syncCountersRate = Strings.isNullOrEmpty(s) ? SYNC_COUNTERS_RATE_DEFAULT
+ : Integer.parseInt(s.trim());
+ if (oldSyncCountersRate != syncCountersRate) {
+ stopSyncTask();
+ startSyncTask();
+ }
+ }
+
+ private ScheduledFuture<?> startTask(Runnable r, int rate) {
+ return executor.scheduleAtFixedRate(SafeRecurringTask.wrap(r),
+ 0, rate, TimeUnit.SECONDS);
+ }
+
+ private void stopTask(ScheduledFuture<?> task) {
+ task.cancel(true);
+ }
+
+ private void startSyncTask() {
+ syncTask = startTask(this::syncStats, syncCountersRate);
+ }
+
+ private void stopSyncTask() {
+ stopTask(syncTask);
+ }
+
+ private void startPublishTask() {
+ publisherTask = startTask(this::publishStats, publishCountersRate);
+ }
+
+ private void stopPublishTask() {
+ stopTask(publisherTask);
+ }
+
+ ImmutableMap<DhcpL2RelayCountersIdentifier, Long> getCountersMap() {
return ImmutableMap.copyOf(countersMap);
}
+ public DhcpL2RelayStatistics getCounters() {
+ return aggregate();
+ }
+
/**
* Initialize the supported counters map for the given counter class.
* @param counterClass class of counters (global, per subscriber)
+ * @param existingStats existing values to intialise the counters to
*/
- public void initCounters(String counterClass) {
+ public void initCounters(String counterClass, DhcpL2RelayStatistics existingStats) {
checkNotNull(counterClass, "counter class can't be null");
- for (DhcpL2RelayCounters counterType : DhcpL2RelayCounters.SUPPORTED_COUNTERS) {
- countersMap.put(new DhcpL2RelayCountersIdentifier(counterClass, counterType), new AtomicLong(0));
+ for (DhcpL2RelayCounterNames counterType : DhcpL2RelayCounterNames.SUPPORTED_COUNTERS) {
+ DhcpL2RelayCountersIdentifier id = new DhcpL2RelayCountersIdentifier(counterClass, counterType);
+ countersMap.put(id, existingStats == null ? 0L : existingStats.get(id));
}
}
@@ -74,17 +232,17 @@
* @param counterClass class of counters (global, per subscriber)
* @param counterType name of counter
*/
- public void incrementCounter(String counterClass, DhcpL2RelayCounters counterType) {
+ public void incrementCounter(String counterClass, DhcpL2RelayCounterNames counterType) {
checkNotNull(counterClass, "counter class can't be null");
- if (DhcpL2RelayCounters.SUPPORTED_COUNTERS.contains(counterType)) {
+ if (DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.contains(counterType)) {
DhcpL2RelayCountersIdentifier counterIdentifier =
new DhcpL2RelayCountersIdentifier(counterClass, counterType);
countersMap.compute(counterIdentifier, (key, counterValue) ->
- (counterValue != null) ? new AtomicLong(counterValue.incrementAndGet()) : new AtomicLong(1)
- );
+ (counterValue != null) ? counterValue + 1 : 1L);
} else {
log.error("Failed to increment counter class {} of type {}", counterClass, counterType);
}
+ dirty.set(true);
}
/**
@@ -92,31 +250,84 @@
* @param counterClass class of counters (global, per subscriber)
*/
public void resetCounters(String counterClass) {
+ byte[] payload = counterClass.getBytes(StandardCharsets.UTF_8);
+ ClusterMessage reset = new ClusterMessage(clusterService.getLocalNode().id(), RESET_SUBJECT, payload);
+ clusterCommunicationService.broadcastIncludeSelf(reset, RESET_SUBJECT, Serializer.using(serializer)::encode);
+ }
+
+ private void resetLocal(ClusterMessage m) {
+ String counterClass = new String(m.payload(), StandardCharsets.UTF_8);
+
checkNotNull(counterClass, "counter class can't be null");
- for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator(); it.hasNext();) {
- DhcpL2RelayCounters counterType = it.next();
+ for (DhcpL2RelayCounterNames counterType : DhcpL2RelayCounterNames.SUPPORTED_COUNTERS) {
DhcpL2RelayCountersIdentifier counterIdentifier =
new DhcpL2RelayCountersIdentifier(counterClass, counterType);
- countersMap.computeIfPresent(counterIdentifier, (key, counterValue) ->
- new AtomicLong(0)
- );
+ countersMap.computeIfPresent(counterIdentifier, (key, counterValue) -> 0L);
}
+ dirty.set(true);
+ syncStats();
}
/**
* Inserts the counter entry if it is not already in the set otherwise update the existing counter entry.
* @param counterClass class of counters (global, per subscriber).
* @param counterType name of counter
- * @param value conter value
+ * @param value counter value
*/
- public void setCounter(String counterClass, DhcpL2RelayCounters counterType, Long value) {
+ public void setCounter(String counterClass, DhcpL2RelayCounterNames counterType, Long value) {
checkNotNull(counterClass, "counter class can't be null");
- if (DhcpL2RelayCounters.SUPPORTED_COUNTERS.contains(counterType)) {
+ if (DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.contains(counterType)) {
DhcpL2RelayCountersIdentifier counterIdentifier =
new DhcpL2RelayCountersIdentifier(counterClass, counterType);
- countersMap.put(counterIdentifier, new AtomicLong(value));
+ countersMap.put(counterIdentifier, value);
} else {
log.error("Failed to increment counter class {} of type {}", counterClass, counterType);
}
+ dirty.set(true);
+ syncStats();
+ }
+
+ private DhcpL2RelayStatistics aggregate() {
+ return statistics.values().stream()
+ .reduce(new DhcpL2RelayStatistics(), DhcpL2RelayStatistics::add);
+ }
+
+ /**
+ * Creates a snapshot of the current in-memory statistics.
+ *
+ * @return snapshot of statistics
+ */
+ private DhcpL2RelayStatistics snapshot() {
+ return DhcpL2RelayStatistics.withCounters(countersMap);
+ }
+
+ /**
+ * Syncs in-memory stats to the eventually consistent map.
+ */
+ private void syncStats() {
+ if (dirty.get()) {
+ statistics.put(clusterService.getLocalNode().id(), snapshot());
+ dirty.set(false);
+ }
+ }
+
+ private void publishStats() {
+ // Only publish events if we are the cluster leader for DHCP L2 relay stats
+ if (!Objects.equals(leadershipService.getLeader(DHCP_STATISTICS_LEADERSHIP),
+ clusterService.getLocalNode().id())) {
+ return;
+ }
+
+ aggregate().counters().forEach((counterKey, counterValue) -> {
+ // Subscriber-specific counters have the subscriber ID set
+ String subscriberId = null;
+ if (!counterKey.counterClassKey.equals(DhcpL2RelayEvent.GLOBAL_COUNTER)) {
+ subscriberId = counterKey.counterClassKey;
+ }
+
+ delegate.notify(new DhcpL2RelayEvent(DhcpL2RelayEvent.Type.STATS_UPDATE, null, null,
+ new AbstractMap.SimpleEntry<>(counterKey.counterTypeKey.toString(),
+ new AtomicLong(counterValue)), subscriberId));
+ });
}
}
diff --git a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayConfigTest.java b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayConfigTest.java
index d4cb8bd..e218088 100644
--- a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayConfigTest.java
+++ b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayConfigTest.java
@@ -66,6 +66,9 @@
dhcpL2Relay.mastershipService = new MockMastershipService();
dhcpL2Relay.storageService = new TestStorageService();
dhcpL2Relay.leadershipService = new LeadershipServiceAdapter();
+ SimpleDhcpL2RelayCountersStore store = new SimpleDhcpL2RelayCountersStore();
+ store.componentConfigService = mockConfigService;
+ dhcpL2Relay.dhcpL2RelayCounters = store;
TestUtils.setField(dhcpL2Relay, "eventDispatcher", new TestEventDispatcher());
dhcpL2Relay.activate(new ComponentContextAdapter());
}
diff --git a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStoreTest.java b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStoreTest.java
index 30f651f..179d6b6 100644
--- a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStoreTest.java
+++ b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStoreTest.java
@@ -22,14 +22,25 @@
import org.onlab.junit.TestUtils;
import org.onlab.osgi.ComponentContextAdapter;
import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterServiceAdapter;
import org.onosproject.cluster.LeadershipServiceAdapter;
+import org.onosproject.cluster.NodeId;
import org.onosproject.net.flowobjective.FlowObjectiveServiceAdapter;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.service.TestStorageService;
import org.opencord.dhcpl2relay.DhcpL2RelayEvent;
+import java.time.Duration;
import java.util.Iterator;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
+import java.util.function.Function;
import static org.junit.Assert.assertEquals;
@@ -59,11 +70,16 @@
dhcpL2Relay.storageService = new TestStorageService();
dhcpL2Relay.leadershipService = new LeadershipServiceAdapter();
TestUtils.setField(dhcpL2Relay, "eventDispatcher", new TestEventDispatcher());
- dhcpL2Relay.activate(new ComponentContextAdapter());
store = new SimpleDhcpL2RelayCountersStore();
+ store.storageService = new TestStorageService();
+ store.clusterService = new ClusterServiceAdapter();
+ store.leadershipService = new LeadershipServiceAdapter();
+ store.clusterCommunicationService = new TestClusterCommunicationService<>();
+ store.componentConfigService = mockConfigService;
TestUtils.setField(store, "eventDispatcher", new TestEventDispatcher());
- store.activate();
+ store.activate(new MockComponentContext());
dhcpL2Relay.dhcpL2RelayCounters = this.store;
+ dhcpL2Relay.activate(new ComponentContextAdapter());
}
/**
@@ -80,20 +96,20 @@
@Test
public void testInitCounter() {
// Init the supported global counter
- dhcpL2Relay.dhcpL2RelayCounters.initCounters(DhcpL2RelayEvent.GLOBAL_COUNTER);
+ store.initCounters(DhcpL2RelayEvent.GLOBAL_COUNTER, new DhcpL2RelayStatistics());
// Init the supported counter for a specific subscriber
- dhcpL2Relay.dhcpL2RelayCounters.initCounters(CLIENT_ID_1);
+ store.initCounters(CLIENT_ID_1, new DhcpL2RelayStatistics());
- Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
- for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator();
+ Map<DhcpL2RelayCountersIdentifier, Long> countersMap = store.getCountersMap();
+ for (Iterator<DhcpL2RelayCounterNames> it = DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.iterator();
it.hasNext();) {
- DhcpL2RelayCounters counterType = it.next();
+ DhcpL2RelayCounterNames counterType = it.next();
long globalCounterValue = countersMap.get(new DhcpL2RelayCountersIdentifier(
- DhcpL2RelayEvent.GLOBAL_COUNTER, counterType)).longValue();
+ DhcpL2RelayEvent.GLOBAL_COUNTER, counterType));
long perSubscriberValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
- counterType)).longValue();
- assertEquals((long) 0, globalCounterValue);
- assertEquals((long) 0, perSubscriberValue);
+ counterType));
+ assertEquals(0, globalCounterValue);
+ assertEquals(0, perSubscriberValue);
}
}
@@ -104,27 +120,27 @@
@Test
public void testIncrementCounter() {
// Init the supported global counter
- dhcpL2Relay.dhcpL2RelayCounters.initCounters(DhcpL2RelayEvent.GLOBAL_COUNTER);
+ store.initCounters(DhcpL2RelayEvent.GLOBAL_COUNTER, new DhcpL2RelayStatistics());
- for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator();
+ for (Iterator<DhcpL2RelayCounterNames> it = DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.iterator();
it.hasNext();) {
- DhcpL2RelayCounters counterType = it.next();
+ DhcpL2RelayCounterNames counterType = it.next();
// Increment of existing supported global counter
dhcpL2Relay.dhcpL2RelayCounters.incrementCounter(DhcpL2RelayEvent.GLOBAL_COUNTER, counterType);
// Add of a Subscriber entry that is not already in the set
dhcpL2Relay.dhcpL2RelayCounters.incrementCounter(CLIENT_ID_1, counterType);
}
- Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
- for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator();
+ Map<DhcpL2RelayCountersIdentifier, Long> countersMap = store.getCountersMap();
+ for (Iterator<DhcpL2RelayCounterNames> it = DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.iterator();
it.hasNext();) {
- DhcpL2RelayCounters counterType = it.next();
+ DhcpL2RelayCounterNames counterType = it.next();
long globalCounterValue = countersMap.get(new DhcpL2RelayCountersIdentifier(
- DhcpL2RelayEvent.GLOBAL_COUNTER, counterType)).longValue();
+ DhcpL2RelayEvent.GLOBAL_COUNTER, counterType));
long perSubscriberValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
- counterType)).longValue();
- assertEquals((long) 1, globalCounterValue);
- assertEquals((long) 1, perSubscriberValue);
+ counterType));
+ assertEquals(1, globalCounterValue);
+ assertEquals(1, perSubscriberValue);
}
}
@@ -133,12 +149,12 @@
*/
@Test
public void testIncrementAndResetCounter() {
- DhcpL2RelayCounters counterType;
+ DhcpL2RelayCounterNames counterType;
long subscriberValue;
- Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap;
+ Map<DhcpL2RelayCountersIdentifier, Long> countersMap;
// First start incrementing the counter of a specific subscriber
- for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator();
+ for (Iterator<DhcpL2RelayCounterNames> it = DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.iterator();
it.hasNext();) {
counterType = it.next();
// Insert of a Subscriber entry that is not already in the set
@@ -146,24 +162,24 @@
}
// Make sure that the counter is incremented
- countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
- for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator();
+ countersMap = store.getCountersMap();
+ for (Iterator<DhcpL2RelayCounterNames> it = DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.iterator();
it.hasNext();) {
counterType = it.next();
subscriberValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
- counterType)).longValue();
- assertEquals((long) 1, subscriberValue);
+ counterType));
+ assertEquals(1, subscriberValue);
}
// Reset the counter
dhcpL2Relay.dhcpL2RelayCounters.resetCounters(CLIENT_ID_1);
- countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
- for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator();
+ countersMap = store.getCountersMap();
+ for (Iterator<DhcpL2RelayCounterNames> it = DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.iterator();
it.hasNext();) {
counterType = it.next();
subscriberValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
- counterType)).longValue();
- assertEquals((long) 0, subscriberValue);
+ counterType));
+ assertEquals(0, subscriberValue);
}
}
@@ -173,13 +189,84 @@
*/
@Test
public void testInsertOrUpdateCounter() {
- dhcpL2Relay.dhcpL2RelayCounters.setCounter(CLIENT_ID_1, DhcpL2RelayCounters.valueOf("DHCPDISCOVER"), (long) 50);
+ dhcpL2Relay.dhcpL2RelayCounters.setCounter(CLIENT_ID_1,
+ DhcpL2RelayCounterNames.valueOf("DHCPDISCOVER"), (long) 50);
- Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
+ Map<DhcpL2RelayCountersIdentifier, Long> countersMap = store.getCountersMap();
long subscriberValue = countersMap.get(new DhcpL2RelayCountersIdentifier(
- CLIENT_ID_1, DhcpL2RelayCounters.valueOf("DHCPDISCOVER"))).longValue();
+ CLIENT_ID_1, DhcpL2RelayCounterNames.valueOf("DHCPDISCOVER")));
- assertEquals((long) 50, subscriberValue);
+ assertEquals(50, subscriberValue);
+ }
+
+ public class TestClusterCommunicationService<M> implements ClusterCommunicationService {
+
+ private Consumer handler;
+
+ @Override
+ public void addSubscriber(MessageSubject subject,
+ ClusterMessageHandler subscriber, ExecutorService executor) {
+
+ }
+
+ @Override
+ public <M> void broadcast(M message, MessageSubject subject, Function<M, byte[]> encoder) {
+
+ }
+
+ @Override
+ public <M> void broadcastIncludeSelf(M message, MessageSubject subject, Function<M, byte[]> encoder) {
+ handler.accept(message);
+ }
+
+ @Override
+ public <M> CompletableFuture<Void> unicast(M message, MessageSubject subject,
+ Function<M, byte[]> encoder, NodeId toNodeId) {
+ return null;
+ }
+
+ @Override
+ public <M> void multicast(M message, MessageSubject subject,
+ Function<M, byte[]> encoder, Set<NodeId> nodeIds) {
+
+ }
+
+ @Override
+ public <M, R> CompletableFuture<R> sendAndReceive(M message, MessageSubject subject,
+ Function<M, byte[]> encoder,
+ Function<byte[], R> decoder, NodeId toNodeId) {
+ return null;
+ }
+
+ @Override
+ public <M, R> CompletableFuture<R> sendAndReceive(M message, MessageSubject subject,
+ Function<M, byte[]> encoder, Function<byte[], R> decoder,
+ NodeId toNodeId, Duration timeout) {
+ return null;
+ }
+
+ @Override
+ public <M, R> void addSubscriber(MessageSubject subject, Function<byte[], M> decoder,
+ Function<M, R> handler, Function<R, byte[]> encoder, Executor executor) {
+
+ }
+
+ @Override
+ public <M, R> void addSubscriber(MessageSubject subject, Function<byte[], M> decoder,
+ Function<M, CompletableFuture<R>> handler, Function<R, byte[]> encoder) {
+
+ }
+
+ @Override
+ public <M> void addSubscriber(MessageSubject subject, Function<byte[], M> decoder,
+ Consumer<M> handler, Executor executor) {
+ this.handler = handler;
+ }
+
+ @Override
+ public void removeSubscriber(MessageSubject subject) {
+
+ }
}
}
\ No newline at end of file
diff --git a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayTest.java b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayTest.java
index 42664cc..4106ef1 100755
--- a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayTest.java
+++ b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayTest.java
@@ -16,7 +16,6 @@
package org.opencord.dhcpl2relay.impl;
import com.google.common.collect.Lists;
-import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -27,20 +26,21 @@
import org.onlab.packet.UDP;
import org.onlab.packet.dhcp.DhcpOption;
import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterServiceAdapter;
import org.onosproject.cluster.LeadershipServiceAdapter;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.flowobjective.FlowObjectiveServiceAdapter;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
import org.onosproject.store.service.TestStorageService;
import org.opencord.dhcpl2relay.DhcpL2RelayEvent;
+import org.opencord.dhcpl2relay.DhcpL2RelayStoreDelegate;
import org.opencord.dhcpl2relay.impl.packet.DhcpOption82;
import java.nio.ByteBuffer;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+import static org.easymock.EasyMock.createMock;
import static org.junit.Assert.assertEquals;
public class DhcpL2RelayTest extends DhcpL2RelayTestBase {
@@ -49,7 +49,7 @@
private SimpleDhcpL2RelayCountersStore store;
ComponentConfigService mockConfigService =
- EasyMock.createMock(ComponentConfigService.class);
+ createMock(ComponentConfigService.class);
/**
* Sets up the services required by the dhcpl2relay app.
@@ -73,8 +73,13 @@
dhcpL2Relay.refreshService = new MockExecutor(dhcpL2Relay.refreshService);
dhcpL2Relay.activate(new DhcpL2RelayTestBase.MockComponentContext());
store = new SimpleDhcpL2RelayCountersStore();
+ store.storageService = new TestStorageService();
+ store.leadershipService = new LeadershipServiceAdapter();
+ store.clusterService = new ClusterServiceAdapter();
+ store.clusterCommunicationService = new ClusterCommunicationServiceAdapter();
+ store.componentConfigService = mockConfigService;
TestUtils.setField(store, "eventDispatcher", new TestEventDispatcher());
- store.activate();
+ store.activate(new MockComponentContext());
dhcpL2Relay.dhcpL2RelayCounters = this.store;
}
@@ -207,20 +212,20 @@
sendPacket(requestPacket, ConnectPoint.deviceConnectPoint(OLT_DEV_ID + "/" + 1));
sendPacket(ackPacket, ConnectPoint.deviceConnectPoint(OLT_DEV_ID + "/" + 1));
- Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
+ Map<DhcpL2RelayCountersIdentifier, Long> countersMap = store.getCountersMap();
discoveryValue = countersMap.get(new DhcpL2RelayCountersIdentifier(DhcpL2RelayEvent.GLOBAL_COUNTER,
- DhcpL2RelayCounters.valueOf("DHCPDISCOVER"))).longValue();
+ DhcpL2RelayCounterNames.valueOf("DHCPDISCOVER")));
offerValue = countersMap.get(new DhcpL2RelayCountersIdentifier(DhcpL2RelayEvent.GLOBAL_COUNTER,
- DhcpL2RelayCounters.valueOf("DHCPOFFER"))).longValue();
+ DhcpL2RelayCounterNames.valueOf("DHCPOFFER")));
requestValue = countersMap.get(new DhcpL2RelayCountersIdentifier(DhcpL2RelayEvent.GLOBAL_COUNTER,
- DhcpL2RelayCounters.valueOf("DHCPREQUEST"))).longValue();
+ DhcpL2RelayCounterNames.valueOf("DHCPREQUEST")));
ackValue = countersMap.get(new DhcpL2RelayCountersIdentifier(DhcpL2RelayEvent.GLOBAL_COUNTER,
- DhcpL2RelayCounters.valueOf("DHCPACK"))).longValue();
+ DhcpL2RelayCounterNames.valueOf("DHCPACK")));
- assertEquals((long) 1, discoveryValue);
- assertEquals((long) 1, offerValue);
- assertEquals((long) 1, requestValue);
- assertEquals((long) 1, ackValue);
+ assertEquals(1, discoveryValue);
+ assertEquals(1, offerValue);
+ assertEquals(1, requestValue);
+ assertEquals(1, ackValue);
}
/**
@@ -246,32 +251,20 @@
sendPacket(requestPacket, ConnectPoint.deviceConnectPoint(OLT_DEV_ID + "/" + 1));
sendPacket(ackPacket, ConnectPoint.deviceConnectPoint(OLT_DEV_ID + "/" + 1));
- Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
+ Map<DhcpL2RelayCountersIdentifier, Long> countersMap = store.getCountersMap();
discoveryValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
- DhcpL2RelayCounters.valueOf("DHCPDISCOVER"))).longValue();
+ DhcpL2RelayCounterNames.valueOf("DHCPDISCOVER")));
offerValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
- DhcpL2RelayCounters.valueOf("DHCPOFFER"))).longValue();
+ DhcpL2RelayCounterNames.valueOf("DHCPOFFER")));
requestValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
- DhcpL2RelayCounters.valueOf("DHCPREQUEST"))).longValue();
+ DhcpL2RelayCounterNames.valueOf("DHCPREQUEST")));
ackValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
- DhcpL2RelayCounters.valueOf("DHCPACK"))).longValue();
+ DhcpL2RelayCounterNames.valueOf("DHCPACK")));
- assertEquals((long) 1, discoveryValue);
- assertEquals((long) 1, offerValue);
- assertEquals((long) 1, requestValue);
- assertEquals((long) 1, ackValue);
- }
-
- /**
- * Tests the schedule function to publish the counters to kafka.
- *
- */
- @Test
- public void testSchedulePublishCountersToKafka() {
- MockExecutor executor = new MockExecutor(dhcpL2Relay.refreshService);
- dhcpL2Relay.refreshTask = executor.scheduleWithFixedDelay(
- dhcpL2Relay.publishCountersToKafka, 0, 10, TimeUnit.SECONDS);
- executor.assertLastMethodCalled("scheduleWithFixedDelay", 0, 10, TimeUnit.SECONDS);
+ assertEquals(1, discoveryValue);
+ assertEquals(1, offerValue);
+ assertEquals(1, requestValue);
+ assertEquals(1, ackValue);
}
public void compareClientPackets(Ethernet sent, Ethernet relayed) {
@@ -317,29 +310,38 @@
}
private class MockDhcpL2RelayCountersStore implements DhcpL2RelayCountersStore {
+
@Override
- public void initCounters(String counterClass) {
+ public void incrementCounter(String counterClass, DhcpL2RelayCounterNames counterType) {
}
@Override
- public void incrementCounter(String counterClass, DhcpL2RelayCounters counterType) {
+ public void setCounter(String counterClass, DhcpL2RelayCounterNames counterType, Long value) {
}
@Override
- public void setCounter(String counterClass, DhcpL2RelayCounters counterType, Long value) {
-
- }
-
- @Override
- public Map<DhcpL2RelayCountersIdentifier, AtomicLong> getCountersMap() {
- return new HashMap<>();
+ public DhcpL2RelayStatistics getCounters() {
+ return new DhcpL2RelayStatistics();
}
@Override
public void resetCounters(String counterClass) {
}
+
+ @Override
+ public void setDelegate(DhcpL2RelayStoreDelegate delegate) {
+ }
+
+ @Override
+ public void unsetDelegate(DhcpL2RelayStoreDelegate delegate) {
+ }
+
+ @Override
+ public boolean hasDelegate() {
+ return false;
+ }
}
}
\ No newline at end of file