VOL-2710: Add unit test cases for kafka-onos
Change-Id: Ieff0a32b3552eb8843fa933e57fa4148cb54853b
diff --git a/pom.xml b/pom.xml
index aab74c2..9c129d7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -120,6 +120,28 @@
<version>${igmp.api.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter</artifactId>
+ <version>RELEASE</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onlab-junit</artifactId>
+ <version>${onos.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-api</artifactId>
+ <version>${onos.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
diff --git a/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java b/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java
index 6e34ceb..896a514 100644
--- a/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java
@@ -59,7 +59,7 @@
private static final Class<KafkaConfig>
KAFKA_CONFIG_CLASS = KafkaConfig.class;
- private static final String APP_NAME = "org.opencord.kafka";
+ protected static final String APP_NAME = "org.opencord.kafka";
private ApplicationId appId;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
diff --git a/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java b/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
index 4d3a443..34699cb 100644
--- a/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
@@ -94,9 +94,9 @@
new InternalAaaMachineStatisticsListener();
// topics
- private static final String TOPIC = "authentication.events";
- private static final String AUTHENTICATION_STATISTICS_TOPIC = "onos.aaa.stats.kpis";
- private static final String RADIUS_OPERATION_STATUS_TOPIC = "radiusOperationalStatus.events";
+ protected static final String TOPIC = "authentication.events";
+ protected static final String AUTHENTICATION_STATISTICS_TOPIC = "onos.aaa.stats.kpis";
+ protected static final String RADIUS_OPERATION_STATUS_TOPIC = "radiusOperationalStatus.events";
// auth event params
private static final String TIMESTAMP = "timestamp";
private static final String DEVICE_ID = "deviceId";
diff --git a/src/main/java/org/opencord/kafka/integrations/AccessDeviceKafkaIntegration.java b/src/main/java/org/opencord/kafka/integrations/AccessDeviceKafkaIntegration.java
index 9090f66..8ee6f22 100644
--- a/src/main/java/org/opencord/kafka/integrations/AccessDeviceKafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/integrations/AccessDeviceKafkaIntegration.java
@@ -22,8 +22,8 @@
import org.onosproject.net.AnnotationKeys;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
-import org.onosproject.net.device.DeviceService;
import org.onosproject.net.Port;
+import org.onosproject.net.device.DeviceService;
import org.opencord.kafka.EventBusService;
import org.opencord.olt.AccessDeviceEvent;
import org.opencord.olt.AccessDeviceListener;
@@ -63,7 +63,7 @@
private final AccessDeviceListener listener = new InternalAccessDeviceListener();
- private static final String TOPIC = "onu.events";
+ protected static final String TOPIC = "onu.events";
// event fields
private static final String STATUS = "status";
diff --git a/src/main/java/org/opencord/kafka/integrations/BngPppoeKafkaIntegration.java b/src/main/java/org/opencord/kafka/integrations/BngPppoeKafkaIntegration.java
index 17b7088..0cf49af 100644
--- a/src/main/java/org/opencord/kafka/integrations/BngPppoeKafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/integrations/BngPppoeKafkaIntegration.java
@@ -50,7 +50,7 @@
protected volatile PppoeBngControlHandler ignore;
private final AtomicReference<PppoeBngControlHandler> pppoeBngControlRef = new AtomicReference<>();
- private static final String TOPIC_PPPOE = "bng.pppoe";
+ protected static final String TOPIC_PPPOE = "bng.pppoe";
private static final String TIMESTAMP = "timestamp";
private static final String EVENT_TYPE = "eventType";
private static final String OLT_DEVICE_ID = "deviceId";
diff --git a/src/main/java/org/opencord/kafka/integrations/BngStatsKafkaIntegration.java b/src/main/java/org/opencord/kafka/integrations/BngStatsKafkaIntegration.java
index e20ecbb..32f901b 100644
--- a/src/main/java/org/opencord/kafka/integrations/BngStatsKafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/integrations/BngStatsKafkaIntegration.java
@@ -56,7 +56,7 @@
protected volatile BngStatsService ignore;
private final AtomicReference<BngStatsService> bngStatsServiceRef = new AtomicReference<>();
- private static final String TOPIC_STATS = "bng.stats";
+ protected static final String TOPIC_STATS = "bng.stats";
private static final String SUBSCRIBER_S_TAG = "sTag";
private static final String SUBSCRIBER_C_TAG = "cTag";
diff --git a/src/main/java/org/opencord/kafka/integrations/DeviceKafkaIntegration.java b/src/main/java/org/opencord/kafka/integrations/DeviceKafkaIntegration.java
index 68450d1..9b97ba5 100644
--- a/src/main/java/org/opencord/kafka/integrations/DeviceKafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/integrations/DeviceKafkaIntegration.java
@@ -54,10 +54,10 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected DeviceService deviceService;
- private final DeviceListener listener = new InternalDeviceListener();
+ protected final DeviceListener listener = new InternalDeviceListener();
- private static final String TOPIC = "onos.kpis";
- private static final String PORT_EVENT_TOPIC = "onos.events.port";
+ protected static final String TOPIC = "onos.kpis";
+ protected static final String PORT_EVENT_TOPIC = "onos.events.port";
// event fields
private static final String TIMESTAMP = "timestamp";
diff --git a/src/main/java/org/opencord/kafka/integrations/DhcpL2RelayKafkaIntegration.java b/src/main/java/org/opencord/kafka/integrations/DhcpL2RelayKafkaIntegration.java
index 494635f..43153c3 100644
--- a/src/main/java/org/opencord/kafka/integrations/DhcpL2RelayKafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/integrations/DhcpL2RelayKafkaIntegration.java
@@ -62,8 +62,8 @@
private final DhcpL2RelayListener listener = new InternalDhcpL2RelayListener();
// topics
- private static final String TOPIC = "dhcp.events";
- private static final String DHCP_STATS_TOPIC = "onos.dhcp.stats.kpis";
+ protected static final String TOPIC = "dhcp.events";
+ protected static final String DHCP_STATS_TOPIC = "onos.dhcp.stats.kpis";
private static final String TIMESTAMP = "timestamp";
private static final String DEVICE_ID = "deviceId";
diff --git a/src/main/java/org/opencord/kafka/integrations/IgmpKafkaIntegration.java b/src/main/java/org/opencord/kafka/integrations/IgmpKafkaIntegration.java
index 4da7e43..c424d67 100644
--- a/src/main/java/org/opencord/kafka/integrations/IgmpKafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/integrations/IgmpKafkaIntegration.java
@@ -58,7 +58,7 @@
new InternalIgmpStatisticsListener();
//TOPIC
- private static final String IGMP_STATISTICS_TOPIC = "onos.igmp.stats.kpis";
+ protected static final String IGMP_STATISTICS_TOPIC = "onos.igmp.stats.kpis";
// IGMP stats event params
private static final String IGMP_JOIN_REQ = "igmpJoinReq";
diff --git a/src/main/java/org/opencord/kafka/integrations/McastKafkaIntegration.java b/src/main/java/org/opencord/kafka/integrations/McastKafkaIntegration.java
index 8712333..1574c7f 100644
--- a/src/main/java/org/opencord/kafka/integrations/McastKafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/integrations/McastKafkaIntegration.java
@@ -55,7 +55,7 @@
private final CordMcastStatisticsEventListener cordMcastStatisticsEventListener =
new InternalCorcMcastStatisticsListener();
- private static final String MCAST_OPERATIONAL_STATUS_TOPIC = "mcastOperationalStatus.events";
+ protected static final String MCAST_OPERATIONAL_STATUS_TOPIC = "mcastOperationalStatus.events";
//cord mcast stats event params
private static final String TIMESTAMP = "timestamp";
diff --git a/src/test/java/org/opencord/kafka/impl/KafkaIntegrationTest.java b/src/test/java/org/opencord/kafka/impl/KafkaIntegrationTest.java
new file mode 100644
index 0000000..fc859c5
--- /dev/null
+++ b/src/test/java/org/opencord/kafka/impl/KafkaIntegrationTest.java
@@ -0,0 +1,207 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.kafka.impl;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.onosproject.cluster.ClusterServiceAdapter;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreServiceAdapter;
+import org.onosproject.net.config.Config;
+import org.onosproject.net.config.ConfigApplyDelegate;
+import org.onosproject.net.config.NetworkConfigEvent;
+import org.onosproject.net.config.NetworkConfigListener;
+import org.onosproject.net.config.NetworkConfigRegistryAdapter;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * set of unit test to verify KafkaIntegration.
+ */
+class KafkaIntegrationTest {
+ private static final String ASSERT_MESSAGE = "Config not updated";
+ private static final String BOOTSTRAP_SERVERS = "localhost:9092";
+ protected NetworkConfigListener configListener;
+ protected ConfigApplyDelegate delegate;
+ protected ObjectMapper mapper;
+ protected ApplicationId subject;
+ protected NetworkConfigEvent event;
+ private KafkaConfig config = new KafkaConfig();
+ private KafkaIntegration kafkaIntegration;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ kafkaIntegration = new KafkaIntegration();
+ kafkaIntegration.coreService = new MockCoreService();
+ kafkaIntegration.clusterService = new ClusterServiceAdapter();
+ setupConfig("/localKafkaConfig.json");
+ kafkaIntegration.activate();
+ }
+
+ @AfterEach
+ public void tearDown() {
+ kafkaIntegration.deactivate();
+ kafkaIntegration = null;
+ }
+
+ /**
+ * Local configuration verification using Resources file.
+ */
+ @Test
+ void testConfigAdded() {
+ event = new NetworkConfigEvent(NetworkConfigEvent.Type.CONFIG_ADDED, subject,
+ config, null, KafkaConfig.class);
+ configListener.event(event);
+ KafkaConfig expectedConfig = kafkaIntegration.configRegistry
+ .getConfig(kafkaIntegration.coreService.getAppId(KafkaIntegration.APP_NAME),
+ KafkaConfig.class);
+ assertEquals(ASSERT_MESSAGE, BOOTSTRAP_SERVERS, expectedConfig.getBootstrapServers());
+ }
+
+ public void setupConfig(String localConfig) throws Exception {
+ delegate = new MockConfigDelegate();
+ mapper = new ObjectMapper();
+ subject = kafkaIntegration.coreService.registerApplication(KafkaIntegration.APP_NAME);
+ config.init(subject, "kafka-local-mode-test", node(localConfig), mapper, delegate);
+ kafkaIntegration.configRegistry = new MockNetworkConfigRegistry(subject, config);
+ }
+
+ protected JsonNode node(String jsonFile) throws Exception {
+ final InputStream jsonStream = KafkaConfig.class.getResourceAsStream(jsonFile);
+ return mapper.readTree(jsonStream);
+ }
+
+ /**
+ * Mocks an ONOS configuration delegate to allow JSON based configuration to
+ * be tested.
+ */
+ private static final class MockConfigDelegate implements ConfigApplyDelegate {
+ @Override
+ public void onApply(@SuppressWarnings("rawtypes") Config config) {
+ config.apply();
+ }
+ }
+
+ /**
+ * Mocks Core service adapter.
+ */
+ private static class MockCoreService extends CoreServiceAdapter {
+
+ private List<ApplicationId> idList = new ArrayList<>();
+ private Map<String, ApplicationId> idMap = new HashMap<>();
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.onosproject.core.CoreServiceAdapter#getAppId(java.lang.Short)
+ */
+ @Override
+ public ApplicationId getAppId(Short id) {
+ if (id >= idList.size()) {
+ return null;
+ }
+ return idList.get(id);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.onosproject.core.CoreServiceAdapter#getAppId(java.lang.String)
+ */
+ @Override
+ public ApplicationId getAppId(String name) {
+ return idMap.get(name);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.onosproject.core.CoreServiceAdapter#registerApplication(java.lang
+ * .String)
+ */
+ @Override
+ public ApplicationId registerApplication(String name) {
+ ApplicationId appId = idMap.get(name);
+ if (appId == null) {
+ appId = new MockApplicationId((short) idList.size(), name);
+ idList.add(appId);
+ idMap.put(name, appId);
+ }
+ return appId;
+ }
+ }
+
+ /*
+ Mocks application id.
+ */
+ private static class MockApplicationId implements ApplicationId {
+
+ private final short id;
+ private final String name;
+
+ public MockApplicationId(short id, String name) {
+ this.id = id;
+ this.name = name;
+ }
+
+ @Override
+ public short id() {
+ return id;
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+ }
+
+
+ /**
+ * Mocks the ONOS network configuration registry so that the application
+ * under test can access a JSON defined configuration.
+ */
+ private class MockNetworkConfigRegistry<S> extends NetworkConfigRegistryAdapter {
+ private final KafkaConfig config;
+
+ public MockNetworkConfigRegistry(final S subject, final KafkaConfig config) {
+ this.config = config;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <S, C extends Config<S>> C getConfig(final S subject, final Class<C> configClass) {
+ return (C) config;
+ }
+
+ @Override
+ public void addListener(NetworkConfigListener listener) {
+ configListener = listener;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/org/opencord/kafka/integrations/AaaKafkaIntegrationTest.java b/src/test/java/org/opencord/kafka/integrations/AaaKafkaIntegrationTest.java
new file mode 100644
index 0000000..f6fabde
--- /dev/null
+++ b/src/test/java/org/opencord/kafka/integrations/AaaKafkaIntegrationTest.java
@@ -0,0 +1,331 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.kafka.integrations;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.RADIUS;
+import org.opencord.aaa.AaaMachineStatisticsDelegate;
+import org.opencord.aaa.AaaMachineStatisticsEventListener;
+import org.opencord.aaa.AaaMachineStatisticsService;
+import org.opencord.aaa.AaaStatistics;
+import org.opencord.aaa.AaaStatisticsSnapshot;
+import org.opencord.aaa.AaaSupplicantMachineStats;
+import org.opencord.aaa.AuthenticationEventListener;
+import org.opencord.aaa.AuthenticationRecord;
+import org.opencord.aaa.AuthenticationService;
+import org.opencord.aaa.AuthenticationStatisticsEventListener;
+import org.opencord.aaa.AuthenticationStatisticsService;
+import org.opencord.aaa.RadiusCommunicator;
+import org.opencord.aaa.RadiusOperationalStatusEventDelegate;
+import org.opencord.aaa.RadiusOperationalStatusEventListener;
+import org.opencord.aaa.RadiusOperationalStatusService;
+import org.opencord.kafka.EventBusService;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * set of unit test cases for AaaKafkaIntegration.
+ */
+class AaaKafkaIntegrationTest extends KafkaIntegrationTestBase {
+
+
+ private AaaKafkaIntegration aaaKafkaInt;
+ private AuthenticationEventListener authEventListener;
+ private AuthenticationStatisticsEventListener authStatisticsEventListener;
+ private RadiusOperationalStatusEventListener radiusOperStatusEventListener;
+ private AaaMachineStatisticsEventListener aaaMachineStatisticsEventListener;
+
+ @BeforeEach
+ void setUp() {
+ aaaKafkaInt = new AaaKafkaIntegration();
+
+ aaaKafkaInt.deviceService = new MockDeviceService();
+ aaaKafkaInt.eventBusService = new MockEventBusService();
+ aaaKafkaInt.ignore = new MockAuthenticationService();
+ aaaKafkaInt.ignore2 = new MockAuthenticationStatisticsService();
+ aaaKafkaInt.ignore3 = new MockRadiusOperationalStatusService();
+ aaaKafkaInt.ignore4 = new MockAaaMachineStatisticsService();
+ aaaKafkaInt.bindAuthenticationService(aaaKafkaInt.ignore);
+ aaaKafkaInt.bindAuthenticationStatService(aaaKafkaInt.ignore2);
+ aaaKafkaInt.bindRadiusOperationalStatusService(aaaKafkaInt.ignore3);
+ aaaKafkaInt.bindAaaMachineStatisticsService(aaaKafkaInt.ignore4);
+ aaaKafkaInt.activate();
+ }
+
+ @AfterEach
+ void tearDown() {
+ aaaKafkaInt.deactivate();
+ aaaKafkaInt.unbindRadiusOperationalStatusService(aaaKafkaInt.ignore3);
+ aaaKafkaInt.unbindAaaMachineStatisticsService(aaaKafkaInt.ignore4);
+ aaaKafkaInt = null;
+ }
+
+ /**
+ * testAuthenticationEvent to perform unit test for
+ * the AuthenticationEvent event.
+ */
+ @Test
+ void testAuthenticationEvent() {
+ authEventListener.event(getAuthenticationEvent());
+ assertEquals(MockEventBusService.authCounter, 1);
+ assertEquals(MockEventBusService.otherCounter, 0);
+ }
+
+ /**
+ * testAuthenticationStatisticsEvent to perform unit test for
+ * the AuthenticationStatisticsEvent event.
+ */
+ @Test
+ void testAuthenticationStatisticsEvent() {
+ authStatisticsEventListener.event(getAuthenticationStatisticsEvent());
+ assertEquals(MockEventBusService.authStatsCounter, 1);
+ assertEquals(MockEventBusService.otherCounter, 0);
+ }
+
+ /**
+ * testRadiusOperationalStatusEvent to perform unit test for
+ * the RadiusOperationalStatusEvent event.
+ */
+ @Test
+ void testRadiusOperationalStatusEvent() {
+ radiusOperStatusEventListener.event(getRadiusOperationalStatusEvent());
+ assertEquals(MockEventBusService.radiusOperstate, 1);
+ assertEquals(MockEventBusService.otherCounter, 0);
+ }
+
+ /**
+ * testAaaMachineStatisticsEvent to perform unit test for
+ * the AaaMachineStatisticsEvent event.
+ */
+ @Test
+ void testAaaMachineStatisticsEvent() {
+ aaaMachineStatisticsEventListener.event(getAaaMachineStatisticsEvent());
+ assertEquals(MockEventBusService.authStatsCounter, 1);
+ assertEquals(MockEventBusService.otherCounter, 0);
+ }
+
+ /**
+ * EventBusService mocker class.
+ */
+ private static class MockEventBusService implements EventBusService {
+ static int authCounter;
+ static int authStatsCounter;
+ static int radiusOperstate;
+ static int otherCounter;
+
+ MockEventBusService() {
+ authCounter = 0;
+ authStatsCounter = 0;
+ radiusOperstate = 0;
+ otherCounter = 0;
+ }
+
+ @Override
+ public void send(String topic, JsonNode data) {
+ switch (topic) {
+ case AaaKafkaIntegration.TOPIC:
+ authCounter++;
+ break;
+ case AaaKafkaIntegration.AUTHENTICATION_STATISTICS_TOPIC:
+ authStatsCounter++;
+ break;
+ case AaaKafkaIntegration.RADIUS_OPERATION_STATUS_TOPIC:
+ radiusOperstate++;
+ break;
+ default:
+ otherCounter++;
+ break;
+ }
+ }
+ }
+
+ /**
+ * AuthenticationService mocker class.
+ */
+ private class MockAuthenticationService implements AuthenticationService {
+ @Override
+ public Iterable<AuthenticationRecord> getAuthenticationRecords() {
+ return null;
+ }
+
+ @Override
+ public boolean removeAuthenticationStateByMac(MacAddress macAddress) {
+ return false;
+ }
+
+ @Override
+ public AaaSupplicantMachineStats getSupplicantMachineStats(String s) {
+ return null;
+ }
+
+ @Override
+ public void addListener(AuthenticationEventListener listener) {
+ authEventListener = listener;
+ }
+
+ @Override
+ public void removeListener(AuthenticationEventListener listener) {
+ authEventListener = null;
+ }
+ }
+
+ /**
+ * AuthenticationStatisticsService mocker class.
+ */
+ private class MockAuthenticationStatisticsService implements AuthenticationStatisticsService {
+ @Override
+ public AaaStatistics getAaaStats() {
+ return null;
+ }
+
+ @Override
+ public AaaStatisticsSnapshot getClusterStatistics() {
+ return null;
+ }
+
+ @Override
+ public void handleRoundtripTime(byte b) {
+
+ }
+
+ @Override
+ public void calculatePacketRoundtripTime() {
+
+ }
+
+ @Override
+ public void putOutgoingIdentifierToMap(byte b) {
+
+ }
+
+ @Override
+ public void resetAllCounters() {
+
+ }
+
+ @Override
+ public void addListener(AuthenticationStatisticsEventListener listener) {
+ authStatisticsEventListener = listener;
+ }
+
+ @Override
+ public void removeListener(AuthenticationStatisticsEventListener listener) {
+ authStatisticsEventListener = null;
+ }
+ }
+
+ /**
+ * RadiusOperationalStatusService mocker class.
+ */
+ private class MockRadiusOperationalStatusService implements RadiusOperationalStatusService {
+ @Override
+ public RadiusOperationalStatusEventDelegate getRadiusOprStDelegate() {
+ return null;
+ }
+
+ @Override
+ public String getRadiusServerOperationalStatus() {
+ return null;
+ }
+
+ @Override
+ public void setStatusServerReqSent(boolean b) {
+
+ }
+
+ @Override
+ public void setRadiusOperationalStatusEvaluationMode(
+ RadiusOperationalStatusService.RadiusOperationalStatusEvaluationMode
+ radiusOperationalStatusEvaluationMode) {
+
+ }
+
+ @Override
+ public void setOperationalStatusServerTimeoutInMillis(long l) {
+
+ }
+
+ @Override
+ public void checkServerOperationalStatus() {
+
+ }
+
+ @Override
+ public boolean isRadiusResponseForOperationalStatus(byte b) {
+ return false;
+ }
+
+ @Override
+ public void handleRadiusPacketForOperationalStatus(RADIUS radius) {
+
+ }
+
+ @Override
+ public void initialize(byte[] bytes, String s, RadiusCommunicator radiusCommunicator) {
+
+ }
+
+ @Override
+ public void setOutTimeInMillis(byte b) {
+
+ }
+
+ @Override
+ public void addListener(RadiusOperationalStatusEventListener listener) {
+ radiusOperStatusEventListener = listener;
+ }
+
+ @Override
+ public void removeListener(RadiusOperationalStatusEventListener listener) {
+ radiusOperStatusEventListener = null;
+ }
+ }
+
+ /**
+ * AaaMachineStatisticsService mocker class.
+ */
+ private class MockAaaMachineStatisticsService implements AaaMachineStatisticsService {
+ @Override
+ public AaaSupplicantMachineStats getSupplicantStats(Object o) {
+ return null;
+ }
+
+ @Override
+ public AaaMachineStatisticsDelegate getMachineStatsDelegate() {
+ return null;
+ }
+
+ @Override
+ public void logAaaSupplicantMachineStats(AaaSupplicantMachineStats aaaSupplicantMachineStats) {
+
+ }
+
+ @Override
+ public void addListener(AaaMachineStatisticsEventListener listener) {
+ aaaMachineStatisticsEventListener = listener;
+ }
+
+ @Override
+ public void removeListener(AaaMachineStatisticsEventListener listener) {
+ aaaMachineStatisticsEventListener = null;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/org/opencord/kafka/integrations/AccessDeviceKafkaIntegrationTest.java b/src/test/java/org/opencord/kafka/integrations/AccessDeviceKafkaIntegrationTest.java
new file mode 100644
index 0000000..e158828
--- /dev/null
+++ b/src/test/java/org/opencord/kafka/integrations/AccessDeviceKafkaIntegrationTest.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.kafka.integrations;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.ImmutableMap;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.onlab.packet.VlanId;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DeviceId;
+import org.opencord.kafka.EventBusService;
+import org.opencord.olt.AccessDeviceListener;
+import org.opencord.olt.AccessDeviceService;
+import org.opencord.olt.AccessSubscriberId;
+import org.opencord.sadis.UniTagInformation;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Set of unit test cases for AccessDeviceKafkaIntegration.
+ */
+class AccessDeviceKafkaIntegrationTest extends KafkaIntegrationTestBase {
+
+ private AccessDeviceKafkaIntegration accessDeviceKakfa;
+ private AccessDeviceListener accessDeviceListener;
+
+ @BeforeEach
+ void setUp() {
+ accessDeviceKakfa = new AccessDeviceKafkaIntegration();
+
+ accessDeviceKakfa.deviceService = new MockDeviceService();
+ accessDeviceKakfa.eventBusService = new MockEventBusService();
+ accessDeviceKakfa.accessDeviceService = new MockAccessDeviceService();
+ accessDeviceKakfa.bindAccessDeviceService(accessDeviceKakfa.accessDeviceService);
+ accessDeviceKakfa.activate();
+ }
+
+ @AfterEach
+ void tearDown() {
+ accessDeviceKakfa.deactivate();
+ accessDeviceKakfa = null;
+ }
+
+ /**
+ * testcase to perform UNI_ADDED AccessDeviceEvent.
+ */
+ @Test
+ void testUniAdded() {
+ accessDeviceListener.event(getUniAdded());
+ assertEquals(MockEventBusService.kafkaEvents, 1);
+ assertEquals(MockEventBusService.otherCounter, 0);
+ }
+
+ /**
+ * testcase to perform UNI_REMOVED AccessDeviceEvent.
+ */
+ @Test
+ void testUniRemoved() {
+ accessDeviceListener.event(getUniRemoved());
+ assertEquals(MockEventBusService.kafkaEvents, 1);
+ assertEquals(MockEventBusService.otherCounter, 0);
+ }
+
+ private static class MockEventBusService implements EventBusService {
+ static int kafkaEvents;
+ static int otherCounter;
+
+ MockEventBusService() {
+ kafkaEvents = 0;
+ otherCounter = 0;
+ }
+
+ @Override
+ public void send(String topic, JsonNode data) {
+ if (topic.equals(AccessDeviceKafkaIntegration.TOPIC)) {
+ kafkaEvents++;
+ } else {
+ otherCounter++;
+ }
+ }
+ }
+
+ private class MockAccessDeviceService implements AccessDeviceService {
+ @Override
+ public boolean provisionSubscriber(ConnectPoint connectPoint) {
+ return false;
+ }
+
+ @Override
+ public boolean removeSubscriber(ConnectPoint connectPoint) {
+ return false;
+ }
+
+ @Override
+ public boolean provisionSubscriber(AccessSubscriberId accessSubscriberId,
+ Optional<VlanId> optional, Optional<VlanId> optional1,
+ Optional<Integer> optional2) {
+ return false;
+ }
+
+ @Override
+ public boolean removeSubscriber(AccessSubscriberId accessSubscriberId,
+ Optional<VlanId> optional, Optional<VlanId> optional1,
+ Optional<Integer> optional2) {
+ return false;
+ }
+
+ @Override
+ public List<DeviceId> fetchOlts() {
+ return null;
+ }
+
+ @Override
+ public ImmutableMap<ConnectPoint, Set<UniTagInformation>> getProgSubs() {
+ return null;
+ }
+
+ @Override
+ public void addListener(AccessDeviceListener listener) {
+ accessDeviceListener = listener;
+ }
+
+ @Override
+ public void removeListener(AccessDeviceListener listener) {
+ accessDeviceListener = null;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/org/opencord/kafka/integrations/BngPppoeKafkaIntegrationTest.java b/src/test/java/org/opencord/kafka/integrations/BngPppoeKafkaIntegrationTest.java
new file mode 100644
index 0000000..d0bf59d
--- /dev/null
+++ b/src/test/java/org/opencord/kafka/integrations/BngPppoeKafkaIntegrationTest.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.kafka.integrations;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.opencord.bng.PppoeBngControlHandler;
+import org.opencord.bng.PppoeEventListener;
+import org.opencord.kafka.EventBusService;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * set of unit test cases for BngPppoeKafkaIntegration.
+ */
+class BngPppoeKafkaIntegrationTest extends KafkaIntegrationTestBase {
+
+ private BngPppoeKafkaIntegration bngPppoeKafka;
+ private PppoeEventListener pppoeEventListener;
+
+ @BeforeEach
+ void setUp() {
+ bngPppoeKafka = new BngPppoeKafkaIntegration();
+ bngPppoeKafka.eventBusService = new MockEventBusService();
+ bngPppoeKafka.ignore = new MockPppoeBngControlHandler();
+ bngPppoeKafka.bindPppoeBngControl(bngPppoeKafka.ignore);
+ bngPppoeKafka.activate();
+ }
+
+ @AfterEach
+ void tearDown() {
+ bngPppoeKafka.deactivate();
+ }
+
+ /**
+ * testcase to perform PppoeEvent.
+ */
+ @Test
+ void testbngPpoeEvent() {
+ pppoeEventListener.event(getPppoeEvent());
+ assertEquals(MockEventBusService.kafkaEvents, 1);
+ assertEquals(MockEventBusService.otherCounter, 0);
+ }
+
+ private static class MockEventBusService implements EventBusService {
+ static int kafkaEvents;
+ static int otherCounter;
+
+ MockEventBusService() {
+ kafkaEvents = 0;
+ otherCounter = 0;
+ }
+
+ @Override
+ public void send(String topic, JsonNode data) {
+ if (topic.equals(BngPppoeKafkaIntegration.TOPIC_PPPOE)) {
+ kafkaEvents++;
+ } else {
+ otherCounter++;
+ }
+ }
+ }
+
+ private class MockPppoeBngControlHandler implements PppoeBngControlHandler {
+ @Override
+ public void addListener(PppoeEventListener listener) {
+ pppoeEventListener = listener;
+ }
+
+ @Override
+ public void removeListener(PppoeEventListener listener) {
+ pppoeEventListener = null;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/org/opencord/kafka/integrations/BngStatsKafkaIntegrationTest.java b/src/test/java/org/opencord/kafka/integrations/BngStatsKafkaIntegrationTest.java
new file mode 100644
index 0000000..e05139c
--- /dev/null
+++ b/src/test/java/org/opencord/kafka/integrations/BngStatsKafkaIntegrationTest.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.kafka.integrations;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.onosproject.net.behaviour.BngProgrammable;
+import org.onosproject.net.pi.runtime.PiCounterCellData;
+import org.opencord.bng.BngStatsEventListener;
+import org.opencord.bng.BngStatsService;
+import org.opencord.kafka.EventBusService;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * set of unit test cases to verify BngStatsKafkaIntegration.
+ */
+class BngStatsKafkaIntegrationTest extends KafkaIntegrationTestBase {
+
+
+ private BngStatsKafkaIntegration bngStatsKafkaIntegration;
+ private BngStatsEventListener bngStatsEventListener;
+
+ @BeforeEach
+ void setUp() {
+ bngStatsKafkaIntegration = new BngStatsKafkaIntegration();
+ bngStatsKafkaIntegration.eventBusService = new MockEventBusService();
+ bngStatsKafkaIntegration.ignore = new MockBngStatsService();
+ bngStatsKafkaIntegration.bindBngStatsService(bngStatsKafkaIntegration.ignore);
+ bngStatsKafkaIntegration.activate();
+ }
+
+ @AfterEach
+ void tearDown() {
+ bngStatsKafkaIntegration.deactivate();
+ }
+
+ @Test
+ void testBngStatsEvent() {
+ bngStatsEventListener.event(getBngStatsEvent());
+ assertEquals(MockEventBusService.kafkaEvents, 1);
+ assertEquals(MockEventBusService.otherCounter, 0);
+ }
+
+ private static class MockEventBusService implements EventBusService {
+ static int kafkaEvents;
+ static int otherCounter;
+
+ MockEventBusService() {
+ kafkaEvents = 0;
+ otherCounter = 0;
+ }
+
+ @Override
+ public void send(String topic, JsonNode data) {
+ if (topic.equals(BngStatsKafkaIntegration.TOPIC_STATS)) {
+ kafkaEvents++;
+ } else {
+ otherCounter++;
+ }
+ }
+ }
+
+ private class MockBngStatsService implements BngStatsService {
+ @Override
+ public Map<BngProgrammable.BngCounterType, PiCounterCellData> getStats(String s) {
+ return null;
+ }
+
+ @Override
+ public PiCounterCellData getControlStats() {
+ return null;
+ }
+
+ @Override
+ public void addListener(BngStatsEventListener listener) {
+ bngStatsEventListener = listener;
+ }
+
+ @Override
+ public void removeListener(BngStatsEventListener listener) {
+ bngStatsEventListener = null;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/org/opencord/kafka/integrations/DeviceKafkaIntegrationTest.java b/src/test/java/org/opencord/kafka/integrations/DeviceKafkaIntegrationTest.java
new file mode 100644
index 0000000..e7c8269
--- /dev/null
+++ b/src/test/java/org/opencord/kafka/integrations/DeviceKafkaIntegrationTest.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.kafka.integrations;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.onosproject.net.device.DeviceEvent;
+import org.opencord.kafka.EventBusService;
+
+import static org.junit.Assert.assertEquals;
+import static org.opencord.kafka.integrations.MockDeviceService.DEVICE_ID_1;
+
+/**
+ * set of unit test cases for DeviceKafkaIntegration.
+ */
+class DeviceKafkaIntegrationTest extends KafkaIntegrationTestBase {
+ private DeviceKafkaIntegration deviceKafkaIntegration;
+
+ @BeforeEach
+ void setUp() {
+ deviceKafkaIntegration = new DeviceKafkaIntegration();
+ deviceKafkaIntegration.eventBusService = new MockEventBusService();
+ deviceKafkaIntegration.deviceService = new MockDeviceService();
+ deviceKafkaIntegration.activate();
+ }
+
+ @AfterEach
+ void tearDown() {
+ deviceKafkaIntegration.deactivate();
+ }
+
+ /**
+ * testcase to verify Port updated event.
+ */
+ @Test
+ void testPortStateUpdate() {
+ DeviceEvent event = new DeviceEvent(DeviceEvent.Type.PORT_UPDATED,
+ deviceKafkaIntegration.deviceService.getDevice(DEVICE_ID_1),
+ new MockDeviceService.MockPort());
+ deviceKafkaIntegration.listener.event(event);
+ assertEquals(MockEventBusService.events, 1);
+ assertEquals(MockEventBusService.otherCounter, 0);
+ }
+
+ /**
+ * testcase to verify Port stats update event.
+ */
+ @Test
+ void testPortStatsUpdate() {
+ DeviceEvent event = new DeviceEvent(DeviceEvent.Type.PORT_STATS_UPDATED,
+ deviceKafkaIntegration.deviceService.getDevice(DEVICE_ID_1),
+ new MockDeviceService.MockPort());
+ deviceKafkaIntegration.listener.event(event);
+ assertEquals(MockEventBusService.kpis, 1);
+ assertEquals(MockEventBusService.otherCounter, 0);
+ }
+
+ private static class MockEventBusService implements EventBusService {
+ static int kpis;
+ static int events;
+ static int otherCounter;
+
+ MockEventBusService() {
+ kpis = 0;
+ events = 0;
+ otherCounter = 0;
+ }
+
+ @Override
+ public void send(String topic, JsonNode data) {
+ switch (topic) {
+ case DeviceKafkaIntegration.TOPIC:
+ kpis++;
+ break;
+ case DeviceKafkaIntegration.PORT_EVENT_TOPIC:
+ events++;
+ break;
+ default:
+ otherCounter++;
+ break;
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/org/opencord/kafka/integrations/DhcpL2RelayKafkaIntegrationTest.java b/src/test/java/org/opencord/kafka/integrations/DhcpL2RelayKafkaIntegrationTest.java
new file mode 100644
index 0000000..f726b22
--- /dev/null
+++ b/src/test/java/org/opencord/kafka/integrations/DhcpL2RelayKafkaIntegrationTest.java
@@ -0,0 +1,153 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.kafka.integrations;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.Maps;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.onlab.packet.DHCP;
+import org.onosproject.cluster.ClusterServiceAdapter;
+import org.onosproject.net.ConnectPoint;
+import org.opencord.dhcpl2relay.DhcpAllocationInfo;
+import org.opencord.dhcpl2relay.DhcpL2RelayEvent;
+import org.opencord.dhcpl2relay.DhcpL2RelayListener;
+import org.opencord.dhcpl2relay.DhcpL2RelayService;
+import org.opencord.kafka.EventBusService;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
+import static org.opencord.kafka.integrations.MockDeviceService.DEVICE_ID_1;
+
+/**
+ * set of unit test cases for DhcpL2RelayKafkaIntegration.
+ */
+class DhcpL2RelayKafkaIntegrationTest extends KafkaIntegrationTestBase {
+
+ private static final String DHCP_COUNTER_TOPIC = "DHCPREQUEST";
+ private DhcpL2RelayKafkaIntegration dhcpL2RelayKafkaIntegration;
+ private DhcpL2RelayListener dhcpL2RelayListener;
+
+ @BeforeEach
+ void setUp() {
+ dhcpL2RelayKafkaIntegration = new DhcpL2RelayKafkaIntegration();
+ dhcpL2RelayKafkaIntegration.clusterService = new ClusterServiceAdapter();
+ dhcpL2RelayKafkaIntegration.deviceService = new MockDeviceService();
+ dhcpL2RelayKafkaIntegration.eventBusService = new MockEventBusService();
+ dhcpL2RelayKafkaIntegration.ignore = new MockDhcpL2RelayService();
+ dhcpL2RelayKafkaIntegration.bindDhcpL2RelayService(dhcpL2RelayKafkaIntegration.ignore);
+ dhcpL2RelayKafkaIntegration.activate();
+ }
+
+ @AfterEach
+ void tearDown() {
+ dhcpL2RelayKafkaIntegration.deactivate();
+ }
+
+ @Test
+ void testDhcpL2RelayStatsUpdate() {
+ Map.Entry<String, AtomicLong> entryCounter = Maps.immutableEntry(DHCP_COUNTER_TOPIC,
+ new AtomicLong(1));
+ DhcpAllocationInfo allocationInfo = new DhcpAllocationInfo(
+ OLT_CONNECT_POINT,
+ DHCP.MsgType.DHCPREQUEST, ONU_SERIAL, OLT_MAC,
+ LOCAL_IP);
+ DhcpL2RelayEvent event = new DhcpL2RelayEvent(DhcpL2RelayEvent.Type.STATS_UPDATE,
+ allocationInfo, OLT_CONNECT_POINT, entryCounter, DHCP_COUNTER_TOPIC, null);
+ dhcpL2RelayListener.event(event);
+ assertEquals(MockEventBusService.dhcpStats, 1);
+ assertEquals(MockEventBusService.otherCounter, 0);
+ }
+
+ /**
+ * test to verify the DhcpL2RelayEvent.STATS_UPDATE event.
+ */
+ @Test
+ void testDhcpL2RelayStatsSubscriberUpdate() {
+ Map.Entry<String, AtomicLong> entryCounter = Maps.immutableEntry(DHCP_COUNTER_TOPIC, new AtomicLong(1));
+
+ DhcpAllocationInfo allocationInfo = new DhcpAllocationInfo(
+ OLT_CONNECT_POINT,
+ DHCP.MsgType.DHCPREQUEST, ONU_SERIAL, OLT_MAC,
+ LOCAL_IP);
+ DhcpL2RelayEvent event = new DhcpL2RelayEvent(DhcpL2RelayEvent.Type.STATS_UPDATE,
+ allocationInfo, OLT_CONNECT_POINT, entryCounter, DHCP_COUNTER_TOPIC, ONU_SERIAL);
+ dhcpL2RelayListener.event(event);
+ assertEquals(MockEventBusService.dhcpStats, 1);
+ assertEquals(MockEventBusService.otherCounter, 0);
+ }
+
+ /**
+ * test to verify the DhcpL2RelayEvent.UPDATE event.
+ */
+ @Test
+ void testDhcpL2RelayUpdate() {
+ ConnectPoint cp = new ConnectPoint(DEVICE_ID_1, PORT.number());
+ Map.Entry<String, AtomicLong> entryCounter = Maps.immutableEntry(DHCP_COUNTER_TOPIC, new AtomicLong(1));
+ DhcpAllocationInfo allocationInfo = new DhcpAllocationInfo(
+ new ConnectPoint(DEVICE_ID_1, PORT.number()),
+ DHCP.MsgType.DHCPREQUEST, ONU_SERIAL, OLT_MAC,
+ LOCAL_IP);
+ DhcpL2RelayEvent event = new DhcpL2RelayEvent(DhcpL2RelayEvent.Type.UPDATED,
+ allocationInfo, cp, entryCounter, DHCP_COUNTER_TOPIC, null);
+ dhcpL2RelayListener.event(event);
+ assertEquals(MockEventBusService.dhcpEvents, 1);
+ assertEquals(MockEventBusService.otherCounter, 0);
+ }
+
+ private static class MockEventBusService implements EventBusService {
+ static int dhcpStats;
+ static int dhcpEvents;
+ static int otherCounter;
+
+ MockEventBusService() {
+ dhcpStats = 0;
+ dhcpEvents = 0;
+ otherCounter = 0;
+ }
+
+ @Override
+ public void send(String topic, JsonNode data) {
+ switch (topic) {
+ case DhcpL2RelayKafkaIntegration.DHCP_STATS_TOPIC:
+ dhcpStats++;
+ break;
+ case DhcpL2RelayKafkaIntegration.TOPIC:
+ dhcpEvents++;
+ break;
+ default:
+ otherCounter++;
+ break;
+ }
+ }
+ }
+
+ private class MockDhcpL2RelayService implements DhcpL2RelayService {
+ @Override
+ public void addListener(DhcpL2RelayListener listener) {
+ dhcpL2RelayListener = listener;
+ }
+
+ @Override
+ public void removeListener(DhcpL2RelayListener listener) {
+ dhcpL2RelayListener = null;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/org/opencord/kafka/integrations/IgmpKafkaIntegrationTest.java b/src/test/java/org/opencord/kafka/integrations/IgmpKafkaIntegrationTest.java
new file mode 100644
index 0000000..b97fc89
--- /dev/null
+++ b/src/test/java/org/opencord/kafka/integrations/IgmpKafkaIntegrationTest.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.kafka.integrations;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.opencord.igmpproxy.IgmpStatistics;
+import org.opencord.igmpproxy.IgmpStatisticsEventListener;
+import org.opencord.igmpproxy.IgmpStatisticsService;
+import org.opencord.kafka.EventBusService;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * set of unit test cases for IgmpKafkaIntegration.
+ */
+public class IgmpKafkaIntegrationTest extends KafkaIntegrationTestBase {
+ private IgmpKafkaIntegration igmpKafkaIntegration;
+ private IgmpStatisticsEventListener igmpStatisticsEventListener;
+
+ @BeforeEach
+ void setup() {
+ igmpKafkaIntegration = new IgmpKafkaIntegration();
+ igmpKafkaIntegration.eventBusService = new MockEventBusService();
+ igmpKafkaIntegration.ignore = new MockIgmpStatisticsService();
+ igmpKafkaIntegration.bindIgmpStatService(igmpKafkaIntegration.ignore);
+ igmpKafkaIntegration.activate();
+ }
+
+ @AfterEach
+ void tearDown() {
+ igmpKafkaIntegration.unbindIgmpStatService(igmpKafkaIntegration.ignore);
+ igmpKafkaIntegration.deactivate();
+ }
+
+ /**
+ * test to verify IgmpStatisticsEvent event.
+ */
+ @Test
+ void testIgmpStatisticsEvent() {
+ igmpStatisticsEventListener.event(getIgmpStatisticsEvent());
+ assertEquals(MockEventBusService.igmpstats, 1);
+ assertEquals(MockEventBusService.otherCounter, 0);
+ }
+
+ private static class MockEventBusService implements EventBusService {
+ static int igmpstats;
+ static int otherCounter;
+
+ MockEventBusService() {
+ igmpstats = 0;
+ otherCounter = 0;
+ }
+
+ @Override
+ public void send(String topic, JsonNode data) {
+ if (topic.equals(IgmpKafkaIntegration.IGMP_STATISTICS_TOPIC)) {
+ igmpstats++;
+ } else {
+ otherCounter++;
+ }
+ }
+ }
+
+ private class MockIgmpStatisticsService implements IgmpStatisticsService {
+ @Override
+ public IgmpStatistics getIgmpStats() {
+ return new IgmpStatistics();
+ }
+
+ @Override
+ public void addListener(IgmpStatisticsEventListener listener) {
+ igmpStatisticsEventListener = listener;
+ }
+
+ @Override
+ public void removeListener(IgmpStatisticsEventListener listener) {
+ igmpStatisticsEventListener = null;
+ }
+ }
+}
diff --git a/src/test/java/org/opencord/kafka/integrations/KafkaIntegrationTestBase.java b/src/test/java/org/opencord/kafka/integrations/KafkaIntegrationTestBase.java
new file mode 100644
index 0000000..e66e8e5
--- /dev/null
+++ b/src/test/java/org/opencord/kafka/integrations/KafkaIntegrationTestBase.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.kafka.integrations;
+
+import com.google.common.collect.Lists;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.VlanId;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.Port;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.behaviour.BngProgrammable;
+import org.onosproject.net.pi.runtime.PiCounterCellData;
+import org.opencord.aaa.AaaMachineStatisticsEvent;
+import org.opencord.aaa.AaaStatistics;
+import org.opencord.aaa.AaaSupplicantMachineStats;
+import org.opencord.aaa.AuthenticationEvent;
+import org.opencord.aaa.AuthenticationStatisticsEvent;
+import org.opencord.aaa.RadiusOperationalStatusEvent;
+import org.opencord.bng.BngStatsEvent;
+import org.opencord.bng.BngStatsEventSubject;
+import org.opencord.bng.PppoeBngAttachment;
+import org.opencord.bng.PppoeEvent;
+import org.opencord.bng.PppoeEventSubject;
+import org.opencord.cordmcast.CordMcastStatistics;
+import org.opencord.cordmcast.CordMcastStatisticsEvent;
+import org.opencord.igmpproxy.IgmpStatistics;
+import org.opencord.igmpproxy.IgmpStatisticsEvent;
+import org.opencord.olt.AccessDeviceEvent;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.opencord.kafka.integrations.MockDeviceService.DEVICE_ID_1;
+
+/**
+ * Base class for the KafkaIntegration tests classes.
+ */
+public class KafkaIntegrationTestBase {
+ protected static final PortNumber PORT_NUMBER = PortNumber.portNumber(1);
+ protected static final Port PORT = new MockDeviceService.MockPort();
+ protected static final VlanId CLIENT_C_TAG = VlanId.vlanId((short) 999);
+ protected static final VlanId CLIENT_S_TAG = VlanId.vlanId((short) 111);
+ protected static final Short TPID = 8;
+ protected static final VlanId C_TAG = VlanId.vlanId((short) 999);
+ protected static final VlanId S_TAG = VlanId.vlanId((short) 111);
+ protected static final String ONU_SERIAL = "TWSH00008084";
+ protected static final IpAddress LOCAL_IP = IpAddress.valueOf("127.0.0.1");
+ protected static final MacAddress OLT_MAC = MacAddress.valueOf("c6:b1:cd:40:dc:93");
+ protected static final Short SESSION_ID = 2;
+ protected static final ConnectPoint OLT_CONNECT_POINT = new ConnectPoint(MockDeviceService.DEVICE_ID_1,
+ PORT_NUMBER);
+
+ protected AuthenticationEvent getAuthenticationEvent() {
+ return new AuthenticationEvent(AuthenticationEvent.Type.APPROVED,
+ OLT_CONNECT_POINT);
+ }
+
+ protected AuthenticationStatisticsEvent getAuthenticationStatisticsEvent() {
+ return new AuthenticationStatisticsEvent(
+ AuthenticationStatisticsEvent.Type.STATS_UPDATE,
+ new AaaStatistics());
+ }
+
+ protected RadiusOperationalStatusEvent getRadiusOperationalStatusEvent() {
+ return new RadiusOperationalStatusEvent(
+ RadiusOperationalStatusEvent.Type.RADIUS_OPERATIONAL_STATUS,
+ "AUTHENTICATED");
+ }
+
+ protected AaaMachineStatisticsEvent getAaaMachineStatisticsEvent() {
+ return new AaaMachineStatisticsEvent(AaaMachineStatisticsEvent.Type.STATS_UPDATE,
+ new AaaSupplicantMachineStats());
+ }
+
+ protected AccessDeviceEvent getUniAdded() {
+ return new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_ADDED,
+ DEVICE_ID_1, PORT, CLIENT_S_TAG, CLIENT_C_TAG, (int) TPID);
+ }
+
+ protected AccessDeviceEvent getUniRemoved() {
+ return new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_REMOVED,
+ DEVICE_ID_1, PORT, CLIENT_S_TAG, CLIENT_C_TAG, (int) TPID);
+ }
+
+ protected PppoeEvent getPppoeEvent() {
+ return new PppoeEvent(PppoeEvent.EventType.AUTH_SUCCESS,
+ new PppoeEventSubject(OLT_CONNECT_POINT,
+ LOCAL_IP, OLT_MAC,
+ ONU_SERIAL, SESSION_ID, S_TAG, C_TAG));
+ }
+
+ protected BngStatsEvent getBngStatsEvent() {
+ PppoeBngAttachment pppoeBngAttachment = (PppoeBngAttachment) PppoeBngAttachment.builder()
+ .withPppoeSessionId(SESSION_ID)
+ .withCTag(C_TAG)
+ .withIpAddress(LOCAL_IP)
+ .withMacAddress(OLT_MAC)
+ .withOltConnectPoint(OLT_CONNECT_POINT)
+ .withOnuSerial(ONU_SERIAL)
+ .withQinqTpid(TPID)
+ .withSTag(S_TAG)
+ .build();
+ Map<BngProgrammable.BngCounterType, PiCounterCellData> attachmentStats = new HashMap<>();
+ attachmentStats.put(BngProgrammable.BngCounterType.CONTROL_PLANE,
+ new PiCounterCellData(1024, 1024));
+ BngStatsEventSubject subject = new BngStatsEventSubject("PppoeKey",
+ pppoeBngAttachment, attachmentStats);
+ return new BngStatsEvent(BngStatsEvent.EventType.STATS_UPDATED, subject);
+ }
+
+ protected IgmpStatisticsEvent getIgmpStatisticsEvent() {
+ return new IgmpStatisticsEvent(
+ IgmpStatisticsEvent.Type.STATS_UPDATE, new IgmpStatistics());
+ }
+
+ protected CordMcastStatisticsEvent getCordMcastStatisticsEvent() {
+ List<CordMcastStatistics> statsList = Lists.newArrayList(
+ new CordMcastStatistics(IpAddress.valueOf("172.16.34.34"),
+ "192.168.0.21", VlanId.vlanId("100")),
+ new CordMcastStatistics(IpAddress.valueOf("172.16.35.35"),
+ "192.168.0.22", VlanId.vlanId("101"))
+ );
+ return new CordMcastStatisticsEvent(
+ CordMcastStatisticsEvent.Type.STATUS_UPDATE, statsList);
+ }
+}
diff --git a/src/test/java/org/opencord/kafka/integrations/McastKafkaIntegrationTest.java b/src/test/java/org/opencord/kafka/integrations/McastKafkaIntegrationTest.java
new file mode 100644
index 0000000..135239a
--- /dev/null
+++ b/src/test/java/org/opencord/kafka/integrations/McastKafkaIntegrationTest.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.kafka.integrations;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.onlab.packet.VlanId;
+import org.opencord.cordmcast.CordMcastStatisticsEventListener;
+import org.opencord.cordmcast.CordMcastStatisticsService;
+import org.opencord.kafka.EventBusService;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * set of unit test cases for McastKafkaIntegration.
+ */
+public class McastKafkaIntegrationTest extends KafkaIntegrationTestBase {
+ private McastKafkaIntegration mcastKafkaIntegration;
+ private CordMcastStatisticsEventListener mcastStatsListerner;
+
+ @BeforeEach
+ void setup() {
+ mcastKafkaIntegration = new McastKafkaIntegration();
+ mcastKafkaIntegration.eventBusService = new MockEventBusService();
+ mcastKafkaIntegration.cordMcastStatisticsService = new MockCordMcastStatisticsService();
+ mcastKafkaIntegration.bindMcastStatisticsService(mcastKafkaIntegration.cordMcastStatisticsService);
+ mcastKafkaIntegration.activate();
+ }
+
+ @AfterEach
+ void tearDown() {
+ mcastKafkaIntegration.unbindMcastStatisticsService(mcastKafkaIntegration.cordMcastStatisticsService);
+ mcastKafkaIntegration.deactivate();
+ }
+
+ /**
+ * test to verify CordMcastStatisticsEvent event.
+ */
+ @Test
+ void testHandleEvent() {
+ mcastStatsListerner.event(getCordMcastStatisticsEvent());
+ assertEquals(MockEventBusService.mcastStats, 1);
+ assertEquals(MockEventBusService.otherCounter, 0);
+
+ }
+
+ private static class MockEventBusService implements EventBusService {
+ static int mcastStats;
+ static int otherCounter;
+
+ MockEventBusService() {
+ mcastStats = 0;
+ otherCounter = 0;
+ }
+
+ @Override
+ public void send(String topic, JsonNode data) {
+ if (topic.equals(McastKafkaIntegration.MCAST_OPERATIONAL_STATUS_TOPIC)) {
+ mcastStats++;
+ } else {
+ otherCounter++;
+ }
+ }
+ }
+
+ private class MockCordMcastStatisticsService implements CordMcastStatisticsService {
+ @Override
+ public void setVlanValue(VlanId vlanId) {
+ }
+
+ @Override
+ public void addListener(CordMcastStatisticsEventListener listener) {
+ mcastStatsListerner = listener;
+ }
+
+ @Override
+ public void removeListener(CordMcastStatisticsEventListener listener) {
+ mcastStatsListerner = null;
+ }
+ }
+}
diff --git a/src/test/java/org/opencord/kafka/integrations/MockDeviceService.java b/src/test/java/org/opencord/kafka/integrations/MockDeviceService.java
new file mode 100644
index 0000000..5d83cc8
--- /dev/null
+++ b/src/test/java/org/opencord/kafka/integrations/MockDeviceService.java
@@ -0,0 +1,224 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.kafka.integrations;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.onlab.packet.ChassisId;
+import org.onosproject.net.AnnotationKeys;
+import org.onosproject.net.Annotations;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DefaultAnnotations;
+import org.onosproject.net.DefaultDevice;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Element;
+import org.onosproject.net.ElementId;
+import org.onosproject.net.Port;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.device.DeviceServiceAdapter;
+import org.onosproject.net.device.PortStatistics;
+import org.onosproject.net.driver.Behaviour;
+import org.onosproject.net.provider.ProviderId;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * DeviceServiceAdapter mocker class.
+ */
+public class MockDeviceService extends DeviceServiceAdapter {
+
+ public static final String OLT_DEV_ID = "of:0000c6b1cd40dc93";
+ public static final DeviceId DEVICE_ID_1 = DeviceId.deviceId(OLT_DEV_ID);
+ private static final String SCHEME_NAME = "kafka-onos";
+ private static final DefaultAnnotations DEVICE_ANNOTATIONS = DefaultAnnotations.builder()
+ .set(AnnotationKeys.PROTOCOL, SCHEME_NAME.toUpperCase()).build();
+ private final ProviderId providerId = new ProviderId("of", "foo");
+ private final Device device1 = new MockDevice(providerId, DEVICE_ID_1, Device.Type.SWITCH,
+ "foo.inc", "0", "0", OLT_DEV_ID, new ChassisId(),
+ DEVICE_ANNOTATIONS);
+
+
+ @Override
+ public Device getDevice(DeviceId devId) {
+ return device1;
+ }
+
+ @Override
+ public Iterable<Device> getDevices() {
+ List<Device> devices = new ArrayList<>();
+ devices.add(device1);
+ return devices;
+ }
+
+ @Override
+ public Port getPort(ConnectPoint cp) {
+ return new MockPort();
+ }
+
+ @Override
+ public List<PortStatistics> getPortStatistics(DeviceId deviceId) {
+ PortStatistics ps = new PortStatistics() {
+ @Override
+ public PortNumber portNumber() {
+ return PortNumber.portNumber(1);
+ }
+
+ @Override
+ public long packetsReceived() {
+ return 100;
+ }
+
+ @Override
+ public long packetsSent() {
+ return 10;
+ }
+
+ @Override
+ public long bytesReceived() {
+ return 100;
+ }
+
+ @Override
+ public long bytesSent() {
+ return 10;
+ }
+
+ @Override
+ public long packetsRxDropped() {
+ return 0;
+ }
+
+ @Override
+ public long packetsTxDropped() {
+ return 1;
+ }
+
+ @Override
+ public long packetsRxErrors() {
+ return 1;
+ }
+
+ @Override
+ public long packetsTxErrors() {
+ return 1;
+ }
+
+ @Override
+ public long durationSec() {
+ return 100;
+ }
+
+ @Override
+ public long durationNano() {
+ return 100 * 1000;
+ }
+
+ @Override
+ public boolean isZero() {
+ return false;
+ }
+ };
+ return Lists.newArrayList(ps);
+ }
+
+ /**
+ * Port object mock.
+ */
+ public static class MockPort implements Port {
+
+ @Override
+ public boolean isEnabled() {
+ return true;
+ }
+
+ public long portSpeed() {
+ return 1000;
+ }
+
+ public Element element() {
+ return new Element() {
+ @Override
+ public ElementId id() {
+ return DEVICE_ID_1;
+ }
+
+ @Override
+ public Annotations annotations() {
+ return null;
+ }
+
+ @Override
+ public ProviderId providerId() {
+ return null;
+ }
+
+ @Override
+ public <B extends Behaviour> B as(Class<B> projectionClass) {
+ return null;
+ }
+
+ @Override
+ public <B extends Behaviour> boolean is(Class<B> projectionClass) {
+ return false;
+ }
+ };
+ }
+
+ public PortNumber number() {
+ return PortNumber.portNumber(1);
+ }
+
+ public Annotations annotations() {
+ return new MockAnnotations();
+ }
+
+ public Type type() {
+ return Port.Type.FIBER;
+ }
+
+ private static class MockAnnotations implements Annotations {
+
+ @Override
+ public String value(String val) {
+ return "nni-";
+ }
+
+ public Set<String> keys() {
+ return Sets.newHashSet("portName");
+ }
+ }
+ }
+
+ /**
+ * Device mock.
+ */
+ protected static class MockDevice extends DefaultDevice {
+
+ /*
+ Mocks OLT device.
+ */
+ public MockDevice(ProviderId providerId, DeviceId id, Type type,
+ String manufacturer, String hwVersion, String swVersion,
+ String serialNumber, ChassisId chassisId, Annotations... annotations) {
+ super(providerId, id, type, manufacturer, hwVersion, swVersion, serialNumber,
+ chassisId, annotations);
+ }
+ }
+}
diff --git a/src/test/resources/localKafkaConfig.json b/src/test/resources/localKafkaConfig.json
new file mode 100644
index 0000000..d27dea5
--- /dev/null
+++ b/src/test/resources/localKafkaConfig.json
@@ -0,0 +1,4 @@
+{
+ "bootstrapServers": "localhost:9092"
+}
+