[VOL-3661] PPPoE IA application - initial commit
Change-Id: Idaf23f8736cba955fe8a3049b8fc9c85b3cd3ab9
Signed-off-by: Gustavo Silva <gsilva@furukawalatam.com>
diff --git a/app/src/main/java/org/opencord/pppoeagent/cli/PppoeAgentShowUsersCommand.java b/app/src/main/java/org/opencord/pppoeagent/cli/PppoeAgentShowUsersCommand.java
new file mode 100644
index 0000000..a67ffca
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/cli/PppoeAgentShowUsersCommand.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.pppoeagent.cli;
+
+import org.opencord.pppoeagent.PppoeAgentService;
+import org.opencord.pppoeagent.PppoeSessionInfo;
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.PPPoED;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.net.AnnotationKeys;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.Port;
+import org.onosproject.net.device.DeviceService;
+
+/**
+ * Shows the PPPoE sessions/users learned by the agent, optionally narrowed to a single client MAC.
+ */
+@Service
+@Command(scope = "pppoe", name = "pppoe-users", description = "Shows the PPPoE users")
+public class PppoeAgentShowUsersCommand extends AbstractShellCommand {
+
+    @Argument(index = 0, name = "mac", description = "MAC address related to PPPoE session.",
+            required = false, multiValued = false)
+    private String macStr = null;
+
+    /** Prints the session of the given MAC, or every known session when no MAC argument was supplied. */
+    @Override
+    protected void doExecute() {
+        MacAddress macAddress = null;
+        if (macStr != null && !macStr.isEmpty()) {
+            try {
+                macAddress = MacAddress.valueOf(macStr);
+            } catch (IllegalArgumentException e) {
+                // Tell the operator as well: a log-only error leaves the console without any output.
+                log.error(e.getMessage());
+                print("Invalid MAC address %s: %s", macStr, e.getMessage());
+                return;
+            }
+        }
+
+        DeviceService deviceService = AbstractShellCommand.get(DeviceService.class);
+        PppoeAgentService pppoeAgentService = AbstractShellCommand.get(PppoeAgentService.class);
+        if (macAddress == null) {
+            pppoeAgentService.getSessionsMap().forEach((mac, sessionInfo) ->
+                    printPppoeInfo(mac, sessionInfo, deviceService.getPort(sessionInfo.getClientCp())));
+            return;
+        }
+
+        PppoeSessionInfo singleInfo = pppoeAgentService.getSessionsMap().get(macAddress);
+        if (singleInfo == null) {
+            print("No session information found for provided MAC address %s", macAddress.toString());
+            return;
+        }
+        printPppoeInfo(macAddress, singleInfo, deviceService.getPort(singleInfo.getClientCp()));
+    }
+
+    /** Prints one session as a comma-separated key=value line; SubscriberId is "UNKNOWN" without a port. */
+    private void printPppoeInfo(MacAddress macAddr, PppoeSessionInfo sessionInfo, Port devicePort) {
+        PPPoED.Type lastReceivedPkt = PPPoED.Type.getTypeByValue(sessionInfo.getPacketCode());
+        ConnectPoint cp = sessionInfo.getClientCp();
+        String subscriberId = devicePort != null ? devicePort.annotations().value(AnnotationKeys.PORT_NAME) :
+                "UNKNOWN";
+
+        print("MacAddress=%s,SessionId=%s,CurrentState=%s,LastReceivedPacket=%s,DeviceId=%s,PortNumber=%s," +
+                        "SubscriberId=%s",
+                macAddr.toString(), String.valueOf(sessionInfo.getSessionId()),
+                sessionInfo.getCurrentState(), lastReceivedPkt.name(),
+                cp.deviceId().toString(), cp.port().toString(), subscriberId);
+    }
+}
diff --git a/app/src/main/java/org/opencord/pppoeagent/cli/PppoeAgentStatsCommand.java b/app/src/main/java/org/opencord/pppoeagent/cli/PppoeAgentStatsCommand.java
new file mode 100644
index 0000000..885bf36
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/cli/PppoeAgentStatsCommand.java
@@ -0,0 +1,137 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.pppoeagent.cli;
+
+import org.opencord.pppoeagent.PppoeAgentEvent;
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.Option;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onosproject.cli.AbstractShellCommand;
+import java.util.Collections;
+import java.util.Map;
+import org.opencord.pppoeagent.impl.PppoeAgentCountersIdentifier;
+import org.opencord.pppoeagent.impl.PppoeAgentCountersStore;
+import org.opencord.pppoeagent.impl.PppoeAgentCounterNames;
+
+/** Displays or resets the PPPoE Agent application statistics. */
+@Service
+@Command(scope = "pppoe", name = "pppoeagent-stats",
+        description = "Display or Reset the PPPoE Agent application statistics.")
+public class PppoeAgentStatsCommand extends AbstractShellCommand {
+    private static final String CONFIRM_PHRASE = "please";
+    private static final int NAME_COLUMN_WIDTH = 50;
+
+    @Option(name = "-r", aliases = "--reset", description = "Resets a specific counter.\n",
+            required = false, multiValued = false)
+    PppoeAgentCounterNames reset = null;
+
+    @Option(name = "-R", aliases = "--reset-all", description = "Resets all counters.\n",
+            required = false, multiValued = false)
+    boolean resetAll = false;
+
+    @Option(name = "-s", aliases = "--subscriberId", description = "Subscriber Id.\n",
+            required = false, multiValued = false)
+    String subscriberId = null;
+
+    @Option(name = "-p", aliases = "--please", description = "Confirmation phrase.",
+            required = false, multiValued = false)
+    String please = null;
+
+    @Argument(index = 0, name = "counter",
+            description = "The counter to display. In case not specified, all counters will be displayed.",
+            required = false, multiValued = false)
+    PppoeAgentCounterNames counter = null;
+
+    /** Resets counters (-r/-R plus the confirmation phrase) or displays them as plain text or JSON. */
+    @Override
+    protected void doExecute() {
+        PppoeAgentCountersStore pppoeCounters = AbstractShellCommand.get(PppoeAgentCountersStore.class);
+        // A missing subscriber id, or the literal "global", selects the global counter class.
+        if (subscriberId == null || subscriberId.equals("global")) {
+            subscriberId = PppoeAgentEvent.GLOBAL_COUNTER;
+        }
+        if (resetAll || reset != null) {
+            resetCounters(pppoeCounters);
+            return;
+        }
+
+        Map<PppoeAgentCountersIdentifier, Long> countersMap = pppoeCounters.getCounters().counters();
+        if (countersMap.isEmpty()) {
+            print("No PPPoE Agent Counters were created yet for counter class [%s]",
+                    PppoeAgentEvent.GLOBAL_COUNTER);
+        } else if (counter != null) {
+            // Show only the specified counter
+            long value = counterValue(countersMap, counter);
+            if (outputJson()) {
+                print("{\"%s\":%d}", counter, value);
+            } else {
+                printCounter(counter, value);
+            }
+        } else {
+            printAllCounters(pppoeCounters, countersMap);
+        }
+    }
+
+    /** Resets one counter, or all with -R, for the selected subscriber once the phrase is confirmed. */
+    private void resetCounters(PppoeAgentCountersStore pppoeCounters) {
+        if (!CONFIRM_PHRASE.equals(please)) {
+            print("WARNING: Be aware that you are going to reset the counters. " +
+                    "Enter confirmation phrase to continue.");
+            return;
+        }
+        if (resetAll) {
+            pppoeCounters.resetCounters(subscriberId);
+        } else {
+            pppoeCounters.setCounter(subscriberId, reset, (long) 0);
+        }
+    }
+
+    /** Prints every counter of the subscriber: one JSON object, or a header plus one line per counter. */
+    private void printAllCounters(PppoeAgentCountersStore pppoeCounters,
+                                  Map<PppoeAgentCountersIdentifier, Long> countersMap) {
+        if (!outputJson()) {
+            print("%s [%s] :", pppoeCounters.NAME, subscriberId);
+            for (PppoeAgentCounterNames counterType : PppoeAgentCounterNames.values()) {
+                printCounter(counterType, counterValue(countersMap, counterType));
+            }
+            return;
+        }
+        // Build the JSON in a StringBuilder instead of re-copying a growing String on every iteration.
+        StringBuilder json = new StringBuilder(String.format("{\"%s\":{", pppoeCounters.NAME));
+        String separator = "";
+        for (PppoeAgentCounterNames counterType : PppoeAgentCounterNames.values()) {
+            json.append(separator)
+                    .append(String.format("\"%s\":%d", counterType, counterValue(countersMap, counterType)));
+            separator = ",";
+        }
+        print("%s", json.append("}}").toString());
+    }
+
+    /** Returns the stored value of the counter for the selected subscriber, or 0 when it was never set. */
+    private long counterValue(Map<PppoeAgentCountersIdentifier, Long> countersMap, PppoeAgentCounterNames name) {
+        Long value = countersMap.get(new PppoeAgentCountersIdentifier(subscriberId, name));
+        return value == null ? 0L : value;
+    }
+
+    /** Prints one counter as its name, dot padding up to NAME_COLUMN_WIDTH, and the value. */
+    void printCounter(PppoeAgentCounterNames c, long value) {
+        // nCopies throws on a negative count, so clamp the padding for names longer than the column.
+        int dots = Math.max(0, NAME_COLUMN_WIDTH - c.toString().length());
+        print(" %s %s %-4d", c, String.join("", Collections.nCopies(dots, ".")), value);
+    }
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/pppoeagent/cli/package-info.java b/app/src/main/java/org/opencord/pppoeagent/cli/package-info.java
new file mode 100644
index 0000000..515908f
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/cli/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * CLI commands for the PPPoE agent.
+ */
+package org.opencord.pppoeagent.cli;
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/OsgiPropertyConstants.java b/app/src/main/java/org/opencord/pppoeagent/impl/OsgiPropertyConstants.java
new file mode 100644
index 0000000..65c8310
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/OsgiPropertyConstants.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.pppoeagent.impl;
+
+/**
+ * Names of the configurable component properties, each paired with its default value.
+ */
+public final class OsgiPropertyConstants {
+    // Switches circuit-id validation on or off; applied by PppoeAgent.modified().
+    public static final String ENABLE_CIRCUIT_ID_VALIDATION = "enableCircuitIdValidation";
+    public static final boolean ENABLE_CIRCUIT_ID_VALIDATION_DEFAULT = true;
+    // Topic for the counters; presumably where they are published - confirm in the counters store.
+    public static final String PPPOE_COUNTERS_TOPIC = "pppoeCountersTopic";
+    public static final String PPPOE_COUNTERS_TOPIC_DEFAULT = "onos.pppoe.stats.kpis";
+    // Counter publishing rate; the time unit is not defined here - confirm where it is consumed.
+    public static final String PUBLISH_COUNTERS_RATE = "publishCountersRate";
+    public static final int PUBLISH_COUNTERS_RATE_DEFAULT = 10;
+    // Upper bound PppoeAgent applies to the serialized size of PADI/PADR frames relayed to the server.
+    public static final String PPPOE_MAX_MTU = "pppoeMaxMtu";
+    public static final int PPPOE_MAX_MTU_DEFAULT = 1500;
+    // Size of the fixed thread pool PppoeAgent uses to process packets.
+    public static final String PACKET_PROCESSOR_THREADS = "packetProcessorThreads";
+    public static final int PACKET_PROCESSOR_THREADS_DEFAULT = 10;
+    // Counter synchronisation rate; the time unit is not defined here - confirm where it is consumed.
+    public static final String SYNC_COUNTERS_RATE = "syncCountersRate";
+    public static final int SYNC_COUNTERS_RATE_DEFAULT = 5;
+
+    private OsgiPropertyConstants() {
+    }
+}
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgent.java b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgent.java
new file mode 100644
index 0000000..67c71da
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgent.java
@@ -0,0 +1,929 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.pppoeagent.impl;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableSet;
+
+import com.google.common.collect.Sets;
+import org.opencord.pppoeagent.PppoeAgentEvent;
+import org.opencord.pppoeagent.PppoeAgentListener;
+import org.opencord.pppoeagent.PppoeAgentService;
+import org.opencord.pppoeagent.PppoeAgentStoreDelegate;
+import org.opencord.pppoeagent.PPPoEDVendorSpecificTag;
+import org.opencord.pppoeagent.PppoeSessionInfo;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Serializer;
+import com.google.common.collect.ImmutableMap;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+
+import org.onlab.util.KryoNamespace;
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.PPPoED;
+import org.onlab.packet.PPPoEDTag;
+import org.onlab.util.Tools;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.event.AbstractListenerManager;
+import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipListener;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.AnnotationKeys;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Port;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.config.ConfigFactory;
+import org.onosproject.net.config.NetworkConfigEvent;
+import org.onosproject.net.config.NetworkConfigListener;
+import org.onosproject.net.config.NetworkConfigRegistry;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.packet.DefaultOutboundPacket;
+import org.onosproject.net.packet.OutboundPacket;
+import org.onosproject.net.packet.PacketContext;
+import org.onosproject.net.packet.PacketProcessor;
+import org.onosproject.net.packet.PacketService;
+import org.opencord.sadis.BaseInformationService;
+import org.opencord.sadis.SadisService;
+import org.opencord.sadis.SubscriberAndDeviceInformation;
+import org.opencord.sadis.UniTagInformation;
+import org.opencord.pppoeagent.util.CircuitIdBuilder;
+import org.opencord.pppoeagent.util.CircuitIdFieldName;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Modified;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Dictionary;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import org.onosproject.store.service.Versioned;
+
+import static java.util.concurrent.Executors.newFixedThreadPool;
+
+import static org.onlab.packet.PPPoED.PPPOED_CODE_PADI;
+import static org.onlab.packet.PPPoED.PPPOED_CODE_PADO;
+import static org.onlab.packet.PPPoED.PPPOED_CODE_PADR;
+import static org.onlab.packet.PPPoED.PPPOED_CODE_PADS;
+import static org.onlab.packet.PPPoED.PPPOED_CODE_PADT;
+import static org.onlab.packet.PPPoEDTag.PPPOED_TAG_AC_SYSTEM_ERROR;
+import static org.onlab.packet.PPPoEDTag.PPPOED_TAG_GENERIC_ERROR;
+import static org.onlab.packet.PPPoEDTag.PPPOED_TAG_SERVICE_NAME_ERROR;
+import static org.onlab.packet.PPPoEDTag.PPPOED_TAG_VENDOR_SPECIFIC;
+
+import static org.onlab.util.Tools.getIntegerProperty;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
+
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.PPPOE_MAX_MTU;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.PPPOE_MAX_MTU_DEFAULT;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.ENABLE_CIRCUIT_ID_VALIDATION;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.ENABLE_CIRCUIT_ID_VALIDATION_DEFAULT;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.PACKET_PROCESSOR_THREADS;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.PACKET_PROCESSOR_THREADS_DEFAULT;
+
+/**
+ * PPPoE Intermediate Agent application.
+ */
+@Component(immediate = true,
+property = {
+ PPPOE_MAX_MTU + ":Integer=" + PPPOE_MAX_MTU_DEFAULT,
+ ENABLE_CIRCUIT_ID_VALIDATION + ":Boolean=" + ENABLE_CIRCUIT_ID_VALIDATION_DEFAULT,
+ PACKET_PROCESSOR_THREADS + ":Integer=" + PACKET_PROCESSOR_THREADS_DEFAULT,
+})
+public class PppoeAgent
+ extends AbstractListenerManager<PppoeAgentEvent, PppoeAgentListener>
+ implements PppoeAgentService {
+ private static final String APP_NAME = "org.opencord.pppoeagent";
+ private static final short QINQ_VID_NONE = (short) -1;
+ // Network-config plumbing: activate() registers the listener and the "pppoeagent" config factory.
+ private final InternalConfigListener cfgListener = new InternalConfigListener();
+ private final Set<ConfigFactory> factories = ImmutableSet.of(
+ new ConfigFactory<>(APP_SUBJECT_FACTORY, PppoeAgentConfig.class, "pppoeagent") {
+ @Override
+ public PppoeAgentConfig createConfig() {
+ return new PppoeAgentConfig();
+ }
+ }
+ );
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ // Services injected by OSGi; every reference is declared with MANDATORY cardinality.
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected NetworkConfigRegistry cfgService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected PacketService packetService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ComponentConfigService componentConfigService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected SadisService sadisService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected DeviceService deviceService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected MastershipService mastershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected StorageService storageService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected PppoeAgentCountersStore pppoeAgentCounters;
+
+ // OSGi properties; they start at their defaults and are updated in modified().
+ protected int pppoeMaxMtu = PPPOE_MAX_MTU_DEFAULT;
+ protected boolean enableCircuitIdValidation = ENABLE_CIRCUIT_ID_VALIDATION_DEFAULT;
+ protected int packetProcessorThreads = PACKET_PROCESSOR_THREADS_DEFAULT;
+ // Application id, listeners, packet processor and counters delegate wired up in activate().
+ private ApplicationId appId;
+ private InnerDeviceListener deviceListener = new InnerDeviceListener();
+ private InnerMastershipListener changeListener = new InnerMastershipListener();
+ private PppoeAgentPacketProcessor pppoeAgentPacketProcessor = new PppoeAgentPacketProcessor();
+ protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
+ private PppoeAgentStoreDelegate delegate = new InnerPppoeAgentStoreDelegate();
+ // Server connect points loaded by updateConfig(), and the one picked by selectServerConnectPoint().
+ Set<ConnectPoint> pppoeConnectPoints;
+ protected AtomicReference<ConnectPoint> pppoeServerConnectPoint = new AtomicReference<>();
+ protected boolean useOltUplink = false;
+ // Client MAC -> session store, built in activate(). NOTE(review): static yet reassigned on each activation.
+ static ConsistentMap<MacAddress, PppoeSessionInfo> sessionsMap;
+ /** Returns an immutable copy of the current content of the session map. */
+ @Override
+ public Map<MacAddress, PppoeSessionInfo> getSessionsMap() {
+ return ImmutableMap.copyOf(sessionsMap.asJavaMap());
+ }
+ /** Removes every entry from the session map. */
+ @Override
+ public void clearSessionsMap() {
+ sessionsMap.clear();
+ }
+ // Ordered circuit-id fields; presumably handed to CircuitIdBuilder - usage is outside this excerpt.
+ private final ArrayList<CircuitIdFieldName> circuitIdfields = new ArrayList<>(Arrays.asList(
+ CircuitIdFieldName.AcessNodeIdentifier,
+ CircuitIdFieldName.Slot,
+ CircuitIdFieldName.Port,
+ CircuitIdFieldName.OnuSerialNumber));
+ // Pool that runs packet processing; created in modified() with packetProcessorThreads threads.
+ protected ExecutorService packetProcessorExecutor;
+    /** Registers the app and its config, builds the session map, then attaches listeners and processor. */
+    @Activate
+    protected void activate(ComponentContext context) {
+        eventDispatcher.addSink(PppoeAgentEvent.class, listenerRegistry);
+        appId = coreService.registerApplication(APP_NAME);
+        cfgService.addListener(cfgListener);
+        componentConfigService.registerProperties(getClass());
+
+        KryoNamespace serializer = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.API)
+                .register(PppoeSessionInfo.class)
+                .register(MacAddress.class)
+                .register(SubscriberAndDeviceInformation.class)
+                .register(UniTagInformation.class)
+                .register(ConnectPoint.class)
+                .build();
+
+        sessionsMap = storageService.<MacAddress, PppoeSessionInfo>consistentMapBuilder()
+                .withName("pppoeagent-sessions")
+                .withSerializer(Serializer.using(serializer))
+                .withApplicationId(appId)
+                .build();
+
+        factories.forEach(cfgService::registerConfigFactory);
+        deviceService.addListener(deviceListener);
+        subsService = sadisService.getSubscriberInfoService();
+        mastershipService.addListener(changeListener);
+        pppoeAgentCounters.setDelegate(delegate);
+        updateConfig();
+        if (context != null) {
+            modified(context);
+        }
+        // Registered last: process() hands packets to the executor that modified() creates.
+        packetService.addProcessor(pppoeAgentPacketProcessor, PacketProcessor.director(0));
+        log.info("PPPoE Intermediate Agent Started");
+    }
+    /** Undoes activate(): detaches listeners, factories and the packet processor, then stops the pool. */
+    @Deactivate
+    protected void deactivate() {
+        cfgService.removeListener(cfgListener);
+        factories.forEach(cfgService::unregisterConfigFactory);
+        packetService.removeProcessor(pppoeAgentPacketProcessor);
+        packetProcessorExecutor.shutdown();
+        componentConfigService.unregisterProperties(getClass(), false);
+        deviceService.removeListener(deviceListener);
+        mastershipService.removeListener(changeListener); // added in activate(), so release it here
+        eventDispatcher.removeSink(PppoeAgentEvent.class);
+        pppoeAgentCounters.unsetDelegate(delegate);
+        log.info("PPPoE Intermediate Agent Stopped");
+    }
+    /** Reloads the server connect points and the OLT-uplink flag from the app's network config, if any. */
+    private void updateConfig() {
+        PppoeAgentConfig agentConfig = cfgService.getConfig(appId, PppoeAgentConfig.class);
+        if (agentConfig != null) {
+            synchronized (this) {
+                pppoeConnectPoints = Sets.newConcurrentHashSet(agentConfig.getPppoeServerConnectPoint());
+            }
+            useOltUplink = agentConfig.getUseOltUplinkForServerPktInOut();
+            return;
+        }
+        log.warn("PPPoE server info not available");
+    }
+
+    /**
+     * Returns whether the passed port is the uplink port of the olt device.
+     * Always false when this instance is not master of the device, or the device or its Sadis entry is unknown.
+     */
+    private boolean isUplinkPortOfOlt(DeviceId dId, Port p) {
+        log.debug("isUplinkPortOfOlt: DeviceId: {} Port: {}", dId, p);
+        if (!mastershipService.isLocalMaster(dId)) {
+            return false;
+        }
+
+        Device d = deviceService.getDevice(dId);
+        if (d == null) {
+            return false;
+        }
+        SubscriberAndDeviceInformation deviceInfo = subsService.get(d.serialNumber());
+        return deviceInfo != null && deviceInfo.uplinkPort() == p.number().toLong();
+    }
+
+    /**
+     * Returns the connectPoint which is the uplink port of the OLT, or null when the device, its
+     * Sadis entry or the uplink port itself is not known.
+     */
+    private ConnectPoint getUplinkConnectPointOfOlt(DeviceId dId) {
+        Device d = deviceService.getDevice(dId);
+        // getDevice() yields null for an unknown device; treat that like a missing Sadis entry.
+        SubscriberAndDeviceInformation deviceInfo = d == null ? null : subsService.get(d.serialNumber());
+        log.debug("getUplinkConnectPointOfOlt DeviceId: {} devInfo: {}", dId, deviceInfo);
+        if (deviceInfo == null) {
+            return null;
+        }
+        PortNumber pNum = PortNumber.portNumber(deviceInfo.uplinkPort());
+        Port port = deviceService.getPort(d.id(), pNum);
+        return port != null ? new ConnectPoint(d.id(), pNum) : null;
+    }
+    /** Applies the MTU, circuit-id validation and thread-count properties; bad counts never stop the pool. */
+    @Modified
+    protected void modified(ComponentContext context) {
+        Dictionary<?, ?> properties = context.getProperties();
+        Integer newPppoeMaxMtu = getIntegerProperty(properties, PPPOE_MAX_MTU);
+        if (newPppoeMaxMtu != null && newPppoeMaxMtu < 0) {
+            log.error("Invalid newPppoeMaxMtu : {}, defaulting to {}", newPppoeMaxMtu, PPPOE_MAX_MTU_DEFAULT);
+            pppoeMaxMtu = PPPOE_MAX_MTU_DEFAULT;
+        } else if (newPppoeMaxMtu != null && newPppoeMaxMtu != pppoeMaxMtu) {
+            log.info("PPPOE MTU modified from {} to {}", pppoeMaxMtu, newPppoeMaxMtu);
+            pppoeMaxMtu = newPppoeMaxMtu;
+        }
+        Boolean newEnableCircuitIdValidation = Tools.isPropertyEnabled(properties, ENABLE_CIRCUIT_ID_VALIDATION);
+        if (newEnableCircuitIdValidation != null && enableCircuitIdValidation != newEnableCircuitIdValidation) {
+            log.info("Property enableCircuitIdValidation modified from {} to {}",
+                    enableCircuitIdValidation, newEnableCircuitIdValidation);
+            enableCircuitIdValidation = newEnableCircuitIdValidation;
+        }
+        String s = Tools.get(properties, PACKET_PROCESSOR_THREADS);
+        int newThreads;
+        try {
+            newThreads = Strings.isNullOrEmpty(s) ? packetProcessorThreads : Integer.parseInt(s.trim());
+        } catch (NumberFormatException e) {
+            newThreads = 0; // reported below together with non-positive values
+        }
+        if (newThreads <= 0) {
+            log.error("Invalid {} : {}, keeping {}", PACKET_PROCESSOR_THREADS, s, packetProcessorThreads);
+            newThreads = packetProcessorThreads;
+        }
+        if (packetProcessorExecutor == null || newThreads != packetProcessorThreads) {
+            ExecutorService previous = packetProcessorExecutor;
+            // Publish the replacement before stopping the old pool so process() never hits a dead executor.
+            packetProcessorExecutor = newFixedThreadPool(newThreads,
+                    groupedThreads("onos/pppoe", "pppoe-packet-%d", log));
+            packetProcessorThreads = newThreads;
+            if (previous != null) {
+                previous.shutdown();
+            }
+        }
+    }
+    /**
+     * Selects a connect point through an available device for which it is the master.
+     * The current selection is cleared first; it stays null when no configured point qualifies.
+     */
+    private void selectServerConnectPoint() {
+        synchronized (this) {
+            pppoeServerConnectPoint.set(null);
+            if (pppoeConnectPoints != null) {
+                for (ConnectPoint cp : pppoeConnectPoints) {
+                    // Skip mastered-but-unavailable devices instead of stopping the search on them.
+                    if (mastershipService.isLocalMaster(cp.deviceId())
+                            && deviceService.isAvailable(cp.deviceId())) {
+                        pppoeServerConnectPoint.set(cp);
+                        log.info("PPPOE connectPoint selected is {}", cp);
+                        break;
+                    }
+                }
+            }
+            log.info("PPPOE Server connectPoint is {}", pppoeServerConnectPoint.get());
+            if (pppoeServerConnectPoint.get() == null) {
+                log.error("Master of none, can't relay PPPOE messages to server");
+            }
+        }
+    }
+    /** Returns the Sadis entry named by the port's PORT_NAME annotation, or null when the port is unknown. */
+    private SubscriberAndDeviceInformation getSubscriber(ConnectPoint cp) {
+        Port p = deviceService.getPort(cp);
+        // A port can vanish while its session lingers; callers already handle a null subscriber.
+        return p == null ? null : subsService.get(p.annotations().value(AnnotationKeys.PORT_NAME));
+    }
+    /** Returns the Sadis entry registered under the serial number of the device that received the packet. */
+    private SubscriberAndDeviceInformation getDevice(PacketContext context) {
+        DeviceId sourceDevice = context.inPacket().receivedFrom().deviceId();
+        String serialNo = deviceService.getDevice(sourceDevice).serialNumber();
+
+        return subsService.get(serialNo);
+    }
+    /** Returns the first uni tag of the subscriber whose PON C-tag equals the packet's VLAN id, else null. */
+    private UniTagInformation getUnitagInformationFromPacketContext(PacketContext context,
+                                                                   SubscriberAndDeviceInformation sub) {
+        for (UniTagInformation candidate : sub.uniTagList()) {
+            if (candidate.getPonCTag().toShort() == context.inPacket().parsed().getVlanID()) {
+                return candidate;
+            }
+        }
+        return null;
+    }
+    /** Removes all sessions whose client connect point is cp, skipping entries that vanish meanwhile. */
+    private boolean removeSessionsByConnectPoint(ConnectPoint cp) {
+        boolean removed = false;
+        for (MacAddress key : sessionsMap.keySet()) {
+            PppoeSessionInfo entry = sessionsMap.asJavaMap().get(key);
+            if (entry != null && entry.getClientCp().equals(cp)) {
+                sessionsMap.remove(key);
+                removed = true;
+            }
+        }
+        return removed;
+    }
+    /** Removes all sessions whose client sits on device devid, skipping entries that vanish meanwhile. */
+    private boolean removeSessionsByDevice(DeviceId devid) {
+        boolean removed = false;
+        for (MacAddress key : sessionsMap.keySet()) {
+            PppoeSessionInfo entry = sessionsMap.asJavaMap().get(key);
+            if (entry != null && entry.getClientCp().deviceId().equals(devid)) {
+                sessionsMap.remove(key);
+                removed = true;
+            }
+        }
+        return removed;
+    }
+ private class PppoeAgentPacketProcessor implements PacketProcessor {
+        /** Queues the packet for processing on the packet-processor pool. */
+        @Override
+        public void process(PacketContext context) {
+            // Work runs on packetProcessorExecutor, not on the thread that delivered the packet.
+            packetProcessorExecutor.execute(() -> processInternal(context));
+        }
+        /** Processes PPPoED frames on a clone of the parsed packet; any other ether type is ignored. */
+        private void processInternal(PacketContext context) {
+            // Clone first, presumably so that later edits do not alter the inbound packet.
+            Ethernet frame = (Ethernet) context.inPacket().parsed().clone();
+            if (Ethernet.TYPE_PPPOED == frame.getEtherType()) {
+                processPppoedPacket(context, frame);
+            }
+        }
+ /** Classifies a PPPoED frame as client or server traffic, tracks its session, counts it and relays it. */
+ private void processPppoedPacket(PacketContext context, Ethernet packet) {
+ PPPoED pppoed = (PPPoED) packet.getPayload();
+ if (pppoed == null) {
+ log.warn("PPPoED payload is null");
+ return;
+ }
+ // Code and session id come from the PPPoED header; packetCp is where this frame entered.
+ final byte pppoedCode = pppoed.getCode();
+ final short sessionId = pppoed.getSessionId();
+ final MacAddress clientMacAddress;
+ final ConnectPoint packetCp = context.inPacket().receivedFrom();
+ boolean serverMessage = false;
+
+ // Get the client MAC address
+ switch (pppoedCode) {
+ case PPPOED_CODE_PADT: {
+ if (sessionsMap.containsKey(packet.getDestinationMAC())) {
+ clientMacAddress = packet.getDestinationMAC();
+ serverMessage = true;
+ } else if (sessionsMap.containsKey(packet.getSourceMAC())) {
+ clientMacAddress = packet.getSourceMAC();
+ } else {
+ // In the unlikely case of receiving a PADT without an existing session
+ log.warn("PADT received for unknown session. Dropping packet.");
+ return;
+ }
+ break;
+ }
+ case PPPOED_CODE_PADI:
+ case PPPOED_CODE_PADR: {
+ clientMacAddress = packet.getSourceMAC();
+ break;
+ }
+ default: {
+ clientMacAddress = packet.getDestinationMAC();
+ serverMessage = true;
+ break;
+ }
+ }
+ // Server frames resolve the subscriber through the stored session; client frames through the ingress port.
+ SubscriberAndDeviceInformation subsInfo;
+ if (serverMessage) {
+ if (!sessionsMap.containsKey(clientMacAddress)) {
+ log.error("PPPoED message received from server without an existing session. Message not relayed.");
+ return;
+ } else {
+ PppoeSessionInfo sessInfo = sessionsMap.get(clientMacAddress).value();
+ subsInfo = getSubscriber(sessInfo.getClientCp());
+ }
+ } else {
+ subsInfo = getSubscriber(packetCp);
+ }
+
+ if (subsInfo == null) {
+ log.error("No Sadis info for subscriber on connect point {}. Message not relayed.", packetCp);
+ return;
+ }
+ // NOTE(review): getTypeByValue(...).toString() below is evaluated even when trace logging is off.
+ log.trace("{} received from {} at {} with client mac: {}",
+ PPPoED.Type.getTypeByValue(pppoedCode).toString(),
+ serverMessage ? "server" : "client", packetCp, clientMacAddress);
+
+ if (log.isTraceEnabled()) {
+ log.trace("PPPoED message received from {} at {} {}",
+ serverMessage ? "server" : "client", packetCp, packet);
+ }
+
+ // In case of PADI, force the removal of the previous session entry
+ if ((pppoedCode == PPPOED_CODE_PADI) && (sessionsMap.containsKey(clientMacAddress))) {
+ log.trace("PADI received from MAC: {} with existing session data. Removing the existing data.",
+ clientMacAddress.toString());
+ sessionsMap.remove(clientMacAddress);
+ }
+
+ // Fill the session map entry
+ PppoeSessionInfo sessionInfo;
+ if (!sessionsMap.containsKey(clientMacAddress)) {
+ if (!serverMessage) {
+ ConnectPoint serverCp = getServerConnectPoint(packetCp.deviceId());
+ SubscriberAndDeviceInformation subscriber = getSubscriber(packetCp);
+ sessionInfo = new PppoeSessionInfo(packetCp, serverCp, pppoedCode,
+ sessionId, subscriber, clientMacAddress);
+ sessionsMap.put(clientMacAddress, sessionInfo);
+ } else {
+ // This case was already covered.
+ return;
+ }
+ } else {
+ sessionInfo = sessionsMap.get(clientMacAddress).value();
+ }
+ // Dispatch on the PPPoED code: count the message, then relay it in the matching direction.
+ switch (pppoedCode) {
+ case PPPOED_CODE_PADI:
+ case PPPOED_CODE_PADR:
+ updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+ pppoedCode == PPPOED_CODE_PADI ? PppoeAgentCounterNames.PADI : PppoeAgentCounterNames.PADR);
+
+ Ethernet padir = processPacketFromClient(context, packet, pppoed, sessionInfo, clientMacAddress);
+ if (padir != null) {
+ if (padir.serialize().length <= pppoeMaxMtu) {
+ forwardPacketToServer(padir, sessionInfo);
+ } else {
+ log.debug("MTU message size: {} exceeded configured pppoeMaxMtu: {}. Dropping Packet.",
+ padir.serialize().length, pppoeMaxMtu);
+ forwardPacketToClient(errorToClient(packet, pppoed, "MTU message size exceeded"),
+ sessionInfo, clientMacAddress);
+ updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+ PppoeAgentCounterNames.MTU_EXCEEDED);
+ }
+ }
+ break;
+ case PPPOED_CODE_PADO:
+ case PPPOED_CODE_PADS:
+ updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+ pppoedCode == PPPOED_CODE_PADO ? PppoeAgentCounterNames.PADO : PppoeAgentCounterNames.PADS);
+ Ethernet pados = processPacketFromServer(packet, pppoed, sessionInfo, clientMacAddress);
+ if (pados != null) {
+ forwardPacketToClient(pados, sessionInfo, clientMacAddress);
+ }
+ break;
+ case PPPOED_CODE_PADT:
+ if (serverMessage) {
+ updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+ PppoeAgentCounterNames.PADT_FROM_SERVER);
+ forwardPacketToClient(packet, sessionInfo, clientMacAddress);
+ } else {
+ updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+ PppoeAgentCounterNames.PADT_FROM_CLIENT);
+ forwardPacketToServer(packet, sessionInfo);
+ }
+ // The terminate reason is taken from the PADT's generic-error tag, when one is present.
+ String reason = "";
+ PPPoEDTag genericErrorTag = pppoed.getTags()
+ .stream()
+ .filter(tag -> tag.getType() == PPPOED_TAG_GENERIC_ERROR)
+ .findFirst()
+ .orElse(null);
+
+ if (genericErrorTag != null) {
+ reason = new String(genericErrorTag.getValue(), StandardCharsets.UTF_8);
+ }
+ log.debug("PADT sessionId:{} client MAC:{} Terminate reason:{}.",
+ Integer.toHexString(sessionId & 0xFFFF), clientMacAddress, reason);
+ // Drop the session and post TERMINATE only when the PADT carries the stored session id.
+ boolean knownSessionId = sessionInfo.getSessionId() == sessionId;
+ if (knownSessionId) {
+ PppoeSessionInfo removedSessionInfo = Versioned
+ .valueOrNull(sessionsMap.remove(clientMacAddress));
+ if (removedSessionInfo != null) {
+ post(new PppoeAgentEvent(PppoeAgentEvent.Type.TERMINATE, removedSessionInfo,
+ packetCp, clientMacAddress, reason));
+ }
+ } else {
+ log.warn("PADT received for a known MAC address but for unknown session.");
+ }
+ break;
+ default:
+ break;
+ }
+ }
+
+ /**
+ * Prepares a PPPoED message received from a client (the visible callers pass PADI and PADR)
+ * for relaying to the server.
+ * <p>
+ * Records the packet code in the session entry and writes the entry back to the sessions map,
+ * counts a generic-error tag if one is present, posts a START event for PADI, replaces any
+ * vendor-specific tag with one built from the circuit-id and the subscriber's remote-id, and
+ * sets the QinQ VLAN ID from the PON S-tag of the matching service.
+ *
+ * @param context packet context the message arrived in
+ * @param ethernetPacket received Ethernet frame; it is modified in place and returned
+ * @param pppoed PPPoED payload of the frame; its tags are modified in place
+ * @param sessionInfo session entry associated with the client
+ * @param clientMacAddress MAC address of the client, used as key of the sessions map
+ * @return the frame to forward, or null when the circuit-id cannot be built, the circuit-id
+ * fails validation, or no service information is found for the packet
+ */
+ private Ethernet processPacketFromClient(PacketContext context, Ethernet ethernetPacket, PPPoED pppoed,
+ PppoeSessionInfo sessionInfo, MacAddress clientMacAddress) {
+ byte pppoedCode = pppoed.getCode();
+
+ // Persist the latest packet code before any of the checks below can drop the packet.
+ sessionInfo.setPacketCode(pppoedCode);
+ sessionsMap.put(clientMacAddress, sessionInfo);
+
+ // Update Counters
+ // NOTE(review): getTags() is iterated here without a null check, while the tag replacement
+ // further down guards against null - confirm whether getTags() can return null.
+ for (PPPoEDTag tag : pppoed.getTags()) {
+ if (tag.getType() == PPPOED_TAG_GENERIC_ERROR) {
+ updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+ PppoeAgentCounterNames.GENERIC_ERROR_FROM_CLIENT);
+ // Count at most once per message, however many generic-error tags it carries.
+ break;
+ }
+ }
+
+ // Same object as the received frame: every change below is made in place.
+ Ethernet ethFwd = ethernetPacket;
+
+ // If this is a PADI packet, there'll be a START event.
+ if (pppoedCode == PPPOED_CODE_PADI) {
+ post(new PppoeAgentEvent(PppoeAgentEvent.Type.START, sessionInfo, sessionInfo.getClientCp(),
+ clientMacAddress));
+ }
+
+ // Creates the vendor specific tag.
+ String circuitId = getCircuitId(sessionInfo.getClientCp());
+ if (circuitId == null) {
+ log.error("Failed to build circuid-id for client on connect point {}. Dropping packet.",
+ sessionInfo.getClientCp());
+ return null;
+ }
+
+ // Checks whether the circuit-id is valid, if it's not it drops the packet.
+ if (!isCircuitIdValid(circuitId, sessionInfo.getSubscriber())) {
+ log.warn("Invalid circuit ID, dropping packet.");
+ PppoeAgentEvent invalidCidEvent = new PppoeAgentEvent(PppoeAgentEvent.Type.INVALID_CID, sessionInfo,
+ context.inPacket().receivedFrom(), clientMacAddress);
+ post(invalidCidEvent);
+ return null;
+ }
+
+ String remoteId = sessionInfo.getSubscriber().remoteId();
+ byte[] vendorSpecificTag = new PPPoEDVendorSpecificTag(circuitId, remoteId).toByteArray();
+
+ // According to TR-101, R-149 (by Broadband Forum), agent must REPLACE vendor-specific tag that may come
+ // from client message with its own tag.
+ // The following block ensures that agent removes any previous vendor-specific tag.
+ List<PPPoEDTag> originalTags = pppoed.getTags();
+ if (originalTags != null) {
+ PPPoEDTag originalVendorSpecificTag = originalTags.stream()
+ .filter(tag -> tag.getType() == PPPOED_TAG_VENDOR_SPECIFIC)
+ .findFirst()
+ .orElse(null);
+
+ if (originalVendorSpecificTag != null) {
+ // NOTE(review): only the first vendor-specific tag's getLength() is subtracted although
+ // removeIf drops every such tag, and whether getLength() covers the tag header is not
+ // visible here - verify against the PPPoED/PPPoEDTag implementation.
+ int tagToRemoveLength = originalVendorSpecificTag.getLength();
+ originalTags.removeIf(tag -> tag.getType() == PPPOED_TAG_VENDOR_SPECIFIC);
+ pppoed.setPayloadLength((short) (pppoed.getPayloadLength() - tagToRemoveLength));
+ }
+ }
+
+ pppoed.setTag(PPPOED_TAG_VENDOR_SPECIFIC, vendorSpecificTag);
+
+ // The frame is updated before the service lookup; if the lookup fails the caller gets null.
+ ethFwd.setPayload(pppoed);
+ ethFwd.setQinQTPID(Ethernet.TYPE_VLAN);
+
+ UniTagInformation uniTagInformation = getUnitagInformationFromPacketContext(context,
+ sessionInfo.getSubscriber());
+ if (uniTagInformation == null) {
+ log.warn("Missing service information for connectPoint {} / cTag {}",
+ context.inPacket().receivedFrom(), context.inPacket().parsed().getVlanID());
+ return null;
+ }
+ ethFwd.setQinQVID(uniTagInformation.getPonSTag().toShort());
+ ethFwd.setPad(true);
+ return ethFwd;
+ }
+
+ private Ethernet processPacketFromServer(Ethernet ethernetPacket, PPPoED pppoed,
+ PppoeSessionInfo sessionInfo, MacAddress clientMacAddress) {
+ // Update counters
+ List<PPPoEDTag> tags = pppoed.getTags();
+ for (PPPoEDTag tag : tags) {
+ switch (tag.getType()) {
+ case PPPOED_TAG_GENERIC_ERROR:
+ updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+ PppoeAgentCounterNames.GENERIC_ERROR_FROM_SERVER);
+ break;
+ case PPPOED_TAG_SERVICE_NAME_ERROR:
+ updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+ PppoeAgentCounterNames.SERVICE_NAME_ERROR);
+ break;
+ case PPPOED_TAG_AC_SYSTEM_ERROR:
+ updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+ PppoeAgentCounterNames.AC_SYSTEM_ERROR);
+ break;
+ default:
+ break;
+ }
+ }
+
+ byte pppoedCode = pppoed.getCode();
+
+ if (pppoedCode == PPPOED_CODE_PADS) {
+ log.debug("PADS sessionId:{} client MAC:{}", Integer.toHexString(pppoed.getSessionId() & 0xFFFF),
+ clientMacAddress);
+ sessionInfo.setSessionId(pppoed.getSessionId());
+ }
+ sessionInfo.setPacketCode(pppoedCode);
+ sessionsMap.put(clientMacAddress, sessionInfo);
+
+ PppoeAgentEvent.Type eventType = pppoedCode == PPPOED_CODE_PADS ?
+ PppoeAgentEvent.Type.SESSION_ESTABLISHED :
+ PppoeAgentEvent.Type.NEGOTIATION;
+
+ post(new PppoeAgentEvent(eventType, sessionInfo, sessionInfo.getClientCp(), clientMacAddress));
+
+ ethernetPacket.setQinQVID(QINQ_VID_NONE);
+ ethernetPacket.setPad(true);
+ return ethernetPacket;
+ }
+
+ /**
+  * Increments the given counter globally and, when the subscriber is known, for that subscriber.
+  *
+  * @param sub subscriber the counter belongs to; may be null, in which case only the global
+  *            counter is incremented and a warning is logged
+  * @param counterType counter to increment
+  */
+ private void updatePppoeAgentCountersStore(SubscriberAndDeviceInformation sub,
+         PppoeAgentCounterNames counterType) {
+     // The global counter is always incremented, even when the subscriber is unknown.
+     pppoeAgentCounters.incrementCounter(PppoeAgentEvent.GLOBAL_COUNTER, counterType);
+
+     if (sub != null) {
+         pppoeAgentCounters.incrementCounter(sub.id(), counterType);
+         return;
+     }
+     log.warn("Counter not updated as subscriber info not found.");
+ }
+
+ /**
+  * Builds the circuit-id for a client connect point using the configured circuit-id fields,
+  * with a space after the access node identifier and a colon after the port field.
+  *
+  * @param cp connect point of the client
+  * @return the value produced by the circuit-id builder; the visible caller treats null as a failure
+  */
+ private String getCircuitId(ConnectPoint cp) {
+     CircuitIdBuilder circuitIdBuilder = new CircuitIdBuilder();
+     String circuitId = circuitIdBuilder
+             .setConnectPoint(cp)
+             .setDeviceService(deviceService)
+             .setSubsService(subsService)
+             .setCircuitIdConfig(circuitIdfields)
+             .addCustomSeparator(CircuitIdFieldName.AcessNodeIdentifier, " ")
+             .addCustomSeparator(CircuitIdFieldName.Port, ":")
+             .build();
+     return circuitId;
+ }
+
+ /**
+  * Selects the connect point used to reach the PPPoE server for a device.
+  *
+  * @param deviceId device the client message was received on
+  * @return the uplink connect point of the OLT when the OLT uplink is in use, otherwise the
+  *         currently selected server connect point
+  */
+ protected ConnectPoint getServerConnectPoint(DeviceId deviceId) {
+     if (useOltUplink) {
+         return getUplinkConnectPointOfOlt(deviceId);
+     }
+     return pppoeServerConnectPoint.get();
+ }
+
+ /**
+  * Checks the circuit-id built by the agent against the one configured in the SADIS entry.
+  *
+  * @param cId circuit-id built for the packet
+  * @param entry SADIS entry of the subscriber
+  * @return true when validation is disabled, when the entry has no circuit-id configured, or
+  *         when both circuit-ids are equal; false when the entry is null or the values differ
+  */
+ private boolean isCircuitIdValid(String cId, SubscriberAndDeviceInformation entry) {
+     if (!enableCircuitIdValidation) {
+         log.debug("Circuit ID validation is disabled.");
+         return true;
+     }
+
+     if (entry == null) {
+         log.error("SubscriberAndDeviceInformation cannot be null.");
+         return false;
+     }
+
+     String configuredCircuitId = entry.circuitId();
+     if (configuredCircuitId == null || configuredCircuitId.isEmpty()) {
+         log.debug("Circuit ID not configured in SADIS entry. No check is done.");
+         return true;
+     }
+
+     boolean matches = cId.equals(configuredCircuitId);
+     if (matches) {
+         log.info("Circuit ID in packet: {} matched the configured entry on SADIS.", cId);
+     } else {
+         log.warn("Circuit ID in packet: {} did not match the configured entry on SADIS: {}.",
+                 cId, configuredCircuitId);
+     }
+     return matches;
+ }
+
+ /**
+  * Emits a frame on the server connect point stored in the session and counts it as a
+  * packet sent to the server. Nothing is sent when the session has no server connect point.
+  *
+  * @param packet frame to send
+  * @param sessionInfo session entry providing the server connect point and the subscriber
+  */
+ private void forwardPacketToServer(Ethernet packet, PppoeSessionInfo sessionInfo) {
+     ConnectPoint serverCp = sessionInfo.getServerCp();
+     if (serverCp == null) {
+         log.error("No connect point to send msg to PPPOE Server");
+         return;
+     }
+
+     log.info("Sending PPPOE packet to server at {}", serverCp);
+     TrafficTreatment outputTreatment = DefaultTrafficTreatment.builder()
+             .setOutput(serverCp.port())
+             .build();
+     ByteBuffer frame = ByteBuffer.wrap(packet.serialize());
+     OutboundPacket outbound = new DefaultOutboundPacket(serverCp.deviceId(), outputTreatment, frame);
+     if (log.isTraceEnabled()) {
+         log.trace("Relaying packet to pppoe server at {} {}", serverCp, packet);
+     }
+     packetService.emit(outbound);
+
+     updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+             PppoeAgentCounterNames.PPPOED_PACKETS_TO_SERVER);
+ }
+
+ /**
+  * Emits a frame on the client connect point stored in the session and updates the
+  * PPPOED_PACKETS_FROM_SERVER counter. The frame is dropped when the session has no client
+  * connect point.
+  *
+  * @param packet frame to send
+  * @param sessionInfo session entry providing the client connect point and the subscriber
+  * @param clientMacAddress MAC address of the client, used for logging only
+  */
+ private void forwardPacketToClient(Ethernet packet, PppoeSessionInfo sessionInfo, MacAddress clientMacAddress) {
+     ConnectPoint clientCp = sessionInfo.getClientCp();
+     if (clientCp == null) {
+         log.error("Dropping PPPOE packet, can't find a connectpoint for MAC {}", clientMacAddress);
+         return;
+     }
+
+     log.info("Sending PPPOE packet to client at {}", clientCp);
+     TrafficTreatment outputTreatment = DefaultTrafficTreatment.builder().setOutput(clientCp.port()).build();
+     ByteBuffer frame = ByteBuffer.wrap(packet.serialize());
+     OutboundPacket outbound = new DefaultOutboundPacket(clientCp.deviceId(), outputTreatment, frame);
+     if (log.isTraceEnabled()) {
+         log.trace("Relaying packet to pppoe client at {} {}", clientCp, packet);
+     }
+     packetService.emit(outbound);
+
+     updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+             PppoeAgentCounterNames.PPPOED_PACKETS_FROM_SERVER);
+ }
+
+ /**
+  * Builds an Ethernet frame carrying a PPPoED message with a Generic-Error tag, addressed
+  * back to the sender of the given frame (source and destination MAC addresses are swapped).
+  * <p>
+  * The reply code is PADO for a PADI and PADS for a PADR; any other code is echoed unchanged.
+  *
+  * @param packet frame that triggered the error
+  * @param p PPPoED payload of that frame; version, type and session ID are copied from it
+  * @param errString text placed in the Generic-Error tag, encoded as UTF-8
+  * @return the error frame to send to the client
+  */
+ private Ethernet errorToClient(Ethernet packet, PPPoED p, String errString) {
+     PPPoED err = new PPPoED();
+     err.setVersion(p.getVersion());
+     err.setType(p.getType());
+     switch (p.getCode()) {
+         case PPPOED_CODE_PADI:
+             err.setCode(PPPOED_CODE_PADO);
+             break;
+         case PPPOED_CODE_PADR:
+             err.setCode(PPPOED_CODE_PADS);
+             break;
+         default:
+             // No dedicated reply code for this message: echo the original one. This used to be
+             // done unconditionally after the switch, which overwrote the PADO/PADS chosen above.
+             err.setCode(p.getCode());
+             break;
+     }
+     err.setSessionId(p.getSessionId());
+     err.setTag(PPPOED_TAG_GENERIC_ERROR, errString.getBytes(StandardCharsets.UTF_8));
+
+     Ethernet ethPacket = new Ethernet();
+     ethPacket.setPayload(err);
+     // Swap the addresses so the reply goes back to the sender of the original frame.
+     ethPacket.setSourceMACAddress(packet.getDestinationMACAddress());
+     ethPacket.setDestinationMACAddress(packet.getSourceMACAddress());
+     ethPacket.setQinQVID(QINQ_VID_NONE);
+     ethPacket.setPad(true);
+
+     return ethPacket;
+ }
+ }
+
+ private class InternalConfigListener implements NetworkConfigListener {
+ @Override
+ public void event(NetworkConfigEvent event) {
+
+ if ((event.type() == NetworkConfigEvent.Type.CONFIG_ADDED ||
+ event.type() == NetworkConfigEvent.Type.CONFIG_UPDATED) &&
+ event.configClass().equals(PppoeAgentConfig.class)) {
+ updateConfig();
+ log.info("Reconfigured");
+ }
+ }
+ }
+
+ /**
+ * Handles Mastership changes for the devices which connect to the PPPOE server.
+ */
+ private class InnerMastershipListener implements MastershipListener {
+ @Override
+ public void event(MastershipEvent event) {
+ if (!useOltUplink) {
+ if (pppoeServerConnectPoint.get() != null &&
+ pppoeServerConnectPoint.get().deviceId().equals(event.subject())) {
+ log.trace("Mastership Event recevived for {}", event.subject());
+ // mastership of the device for our connect point has changed, reselect
+ selectServerConnectPoint();
+ }
+ }
+ }
+ }
+
+ /**
+  * Cleans up the sessions map when ports are disabled or removed and when devices are removed.
+  */
+ private class InnerDeviceListener implements DeviceListener {
+     @Override
+     public void event(DeviceEvent event) {
+         DeviceId devId = event.subject().id();
+         DeviceEvent.Type eventType = event.type();
+
+         // Port statistics updates are excluded from the trace log.
+         if (log.isTraceEnabled() && !eventType.equals(DeviceEvent.Type.PORT_STATS_UPDATED)) {
+             log.trace("Device Event received for {} event {}", event.subject(), event.type());
+         }
+
+         switch (eventType) {
+             case PORT_UPDATED:
+                 // Only a port that went down invalidates its sessions.
+                 if (event.port().isEnabled()) {
+                     break;
+                 }
+                 removeSessionsByConnectPoint(new ConnectPoint(devId, event.port().number()));
+                 break;
+             case PORT_REMOVED:
+                 // Drop every session learned on the removed port.
+                 removeSessionsByConnectPoint(new ConnectPoint(devId, event.port().number()));
+                 break;
+             case DEVICE_REMOVED:
+                 // Drop every session learned on the removed device.
+                 removeSessionsByDevice(devId);
+                 break;
+             default:
+                 break;
+         }
+     }
+ }
+
+ /**
+  * Re-posts STATS_UPDATE events coming from the counters store; all other event types are ignored.
+  * <p>
+  * Events that carry a subscriber ID are enriched with the session of that subscriber before
+  * being posted, and are not posted at all when no such session exists.
+  */
+ private class InnerPppoeAgentStoreDelegate implements PppoeAgentStoreDelegate {
+     @Override
+     public void notify(PppoeAgentEvent event) {
+         if (!event.type().equals(PppoeAgentEvent.Type.STATS_UPDATE)) {
+             return;
+         }
+
+         if (event.getSubscriberId() == null) {
+             // No subscriber to look up: forward the event untouched.
+             post(event);
+             return;
+         }
+
+         // Find the first session belonging to the subscriber referenced by the event.
+         PppoeSessionInfo session = Versioned.valueOrNull(
+                 sessionsMap.stream()
+                         .filter(entry -> event.getSubscriberId()
+                                 .equals(entry.getValue().value().getSubscriber().id()))
+                         .map(Map.Entry::getValue)
+                         .findFirst()
+                         .orElse(null));
+         if (session == null) {
+             log.debug("Not handling STATS_UPDATE event for session that no longer exists. {}.", event);
+             return;
+         }
+
+         post(new PppoeAgentEvent(event.type(), session, event.getCounterName(), event.getCounterValue(),
+                 session.getClientMac(), event.getSubscriberId()));
+     }
+ }
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentConfig.java b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentConfig.java
new file mode 100644
index 0000000..5d14c76
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentConfig.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.pppoeagent.impl;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.google.common.collect.ImmutableSet;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.config.Config;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class PppoeAgentConfig extends Config<ApplicationId> {
+ private static final String PPPOE_CONNECT_POINTS = "pppoeServerConnectPoints";
+ private static final String USE_OLT_ULPORT_FOR_PKT_INOUT = "useOltUplinkForServerPktInOut";
+
+ protected static final Boolean DEFAULT_USE_OLT_ULPORT_FOR_PKT_INOUT = true;
+
+ @Override
+ public boolean isValid() {
+ return hasOnlyFields(PPPOE_CONNECT_POINTS, USE_OLT_ULPORT_FOR_PKT_INOUT);
+ }
+
+ /**
+ * Returns whether the app would use the uplink port of OLT for sending/receving
+ * messages to/from the server.
+ *
+ * @return true if OLT uplink port is to be used, false otherwise
+ */
+ public boolean getUseOltUplinkForServerPktInOut() {
+ if (object == null) {
+ return DEFAULT_USE_OLT_ULPORT_FOR_PKT_INOUT;
+ }
+ if (!object.has(USE_OLT_ULPORT_FOR_PKT_INOUT)) {
+ return DEFAULT_USE_OLT_ULPORT_FOR_PKT_INOUT;
+ }
+ return object.path(USE_OLT_ULPORT_FOR_PKT_INOUT).asBoolean();
+ }
+
+ /**
+ * Returns the pppoe server connect points.
+ *
+ * @return pppoe server connect points
+ */
+ public Set<ConnectPoint> getPppoeServerConnectPoint() {
+ if (object == null) {
+ return new HashSet<ConnectPoint>();
+ }
+
+ if (!object.has(PPPOE_CONNECT_POINTS)) {
+ return ImmutableSet.of();
+ }
+
+ ImmutableSet.Builder<ConnectPoint> builder = ImmutableSet.builder();
+ ArrayNode arrayNode = (ArrayNode) object.path(PPPOE_CONNECT_POINTS);
+ for (JsonNode jsonNode : arrayNode) {
+ String portName = jsonNode.asText(null);
+ if (portName == null) {
+ return null;
+ }
+ try {
+ builder.add(ConnectPoint.deviceConnectPoint(portName));
+ } catch (IllegalArgumentException e) {
+ return null;
+ }
+ }
+ return builder.build();
+ }
+
+
+}
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCounterNames.java b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCounterNames.java
new file mode 100644
index 0000000..0d58078
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCounterNames.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.opencord.pppoeagent.impl;
+
+import com.google.common.collect.ImmutableSet;
+
+import java.util.Set;
+
+/**
+ * Names of the counters maintained by the PPPoE agent.
+ */
+public enum PppoeAgentCounterNames {
+    /** PADI messages received from client. */
+    PADI,
+    /** PADO messages received from server. */
+    PADO,
+    /** PADR messages received from client. */
+    PADR,
+    /** PADS messages received from server. */
+    PADS,
+    /** PADT messages received from server. */
+    PADT_FROM_SERVER,
+    /** PADT messages received from client. */
+    PADT_FROM_CLIENT,
+    /** PPPoED messages sent to server. */
+    PPPOED_PACKETS_TO_SERVER,
+    /** PPPoED messages received from server. */
+    PPPOED_PACKETS_FROM_SERVER,
+    /** MTU Exceeded errors generated by the PPPoED agent. */
+    MTU_EXCEEDED,
+    /** Generic Errors received from server. */
+    GENERIC_ERROR_FROM_SERVER,
+    /** Generic Errors received from client. */
+    GENERIC_ERROR_FROM_CLIENT,
+    /** ServiceName Errors received from server. */
+    SERVICE_NAME_ERROR,
+    /** AC-System Errors received from server. */
+    AC_SYSTEM_ERROR;
+
+    /**
+     * Supported types of PPPoED agent counters, listed in declaration order.
+     */
+    public static final Set<PppoeAgentCounterNames> SUPPORTED_COUNTERS = ImmutableSet.of(
+            PADI, PADO, PADR, PADS,
+            PADT_FROM_SERVER, PADT_FROM_CLIENT,
+            PPPOED_PACKETS_TO_SERVER, PPPOED_PACKETS_FROM_SERVER,
+            MTU_EXCEEDED,
+            GENERIC_ERROR_FROM_SERVER, GENERIC_ERROR_FROM_CLIENT,
+            SERVICE_NAME_ERROR, AC_SYSTEM_ERROR);
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCountersIdentifier.java b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCountersIdentifier.java
new file mode 100644
index 0000000..cca163f
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCountersIdentifier.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.pppoeagent.impl;
+
+import org.opencord.pppoeagent.PppoeAgentEvent;
+
+import java.util.Objects;
+
+/**
+ * Represents PPPoED agent counters identifier.
+ */
+public final class PppoeAgentCountersIdentifier {
+ final String counterClassKey;
+ final Enum<PppoeAgentCounterNames> counterTypeKey;
+
+ /**
+ * Creates a default global counter identifier for a given counterType.
+ *
+ * @param counterTypeKey Identifies the supported type of pppoe agent counters
+ */
+ public PppoeAgentCountersIdentifier(PppoeAgentCounterNames counterTypeKey) {
+ this.counterClassKey = PppoeAgentEvent.GLOBAL_COUNTER;
+ this.counterTypeKey = counterTypeKey;
+ }
+
+ /**
+ * Creates a counter identifier. A counter is defined by the key pair [counterClass, counterType],
+ * where counterClass can be global or the subscriber ID and counterType is the supported pppoe counter.
+ *
+ * @param counterClassKey Identifies which class the counter is assigned (global or per subscriber)
+ * @param counterTypeKey Identifies the supported type of pppoed relay counters
+ */
+ public PppoeAgentCountersIdentifier(String counterClassKey, PppoeAgentCounterNames counterTypeKey) {
+ this.counterClassKey = counterClassKey;
+ this.counterTypeKey = counterTypeKey;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj instanceof PppoeAgentCountersIdentifier) {
+ final PppoeAgentCountersIdentifier other = (PppoeAgentCountersIdentifier) obj;
+ return Objects.equals(this.counterClassKey, other.counterClassKey)
+ && Objects.equals(this.counterTypeKey, other.counterTypeKey);
+ }
+
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(counterClassKey, counterTypeKey);
+ }
+
+ @Override
+ public String toString() {
+ return "PppoeAgentCountersIdentifier{" +
+ "counterClassKey='" + counterClassKey + '\'' +
+ ", counterTypeKey=" + counterTypeKey +
+ '}';
+ }
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCountersStore.java b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCountersStore.java
new file mode 100644
index 0000000..5b4ddfd
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCountersStore.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.pppoeagent.impl;
+
+import org.onosproject.store.Store;
+import org.opencord.pppoeagent.PppoeAgentEvent;
+import org.opencord.pppoeagent.PppoeAgentStoreDelegate;
+
+/**
+ * Represents a store of PPPoE agent counters. A counter entry is identified by the pair
+ * [counterClass, counterType], where counterClass is either the global class or a subscriber ID
+ * and counterType is the PPPoE counter name.
+ */
+public interface PppoeAgentCountersStore extends Store<PppoeAgentEvent, PppoeAgentStoreDelegate> {
+
+ // NOTE(review): presumably the name under which the counters are stored - confirm against the implementation.
+ String NAME = "PPPOE_Agent_stats";
+
+ /**
+ * Increments a PPPoE agent counter, creating it if it does not exist yet.
+ *
+ * @param counterClass class of counters (global, per subscriber).
+ * @param counterType name of counter
+ */
+ void incrementCounter(String counterClass, PppoeAgentCounterNames counterType);
+
+ /**
+ * Sets the value of a PPPoE agent counter.
+ *
+ * @param counterClass class of counters (global, per subscriber).
+ * @param counterType name of counter
+ * @param value the value of the counter
+ */
+ void setCounter(String counterClass, PppoeAgentCounterNames counterType, Long value);
+
+ /**
+ * Gets the current PPPoE agent counter values.
+ *
+ * @return PPPoE agent counter values
+ */
+ PppoeAgentStatistics getCounters();
+
+ /**
+ * Resets counter values for a given counter class.
+ *
+ * @param counterClass class of counters (global, per subscriber).
+ */
+ void resetCounters(String counterClass);
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentStatistics.java b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentStatistics.java
new file mode 100644
index 0000000..dce1ac7
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentStatistics.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.pppoeagent.impl;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Immutable snapshot of PPPoE agent counter values.
+ */
+public class PppoeAgentStatistics {
+    private final ImmutableMap<PppoeAgentCountersIdentifier, Long> counters;
+
+    private PppoeAgentStatistics(ImmutableMap<PppoeAgentCountersIdentifier, Long> counters) {
+        this.counters = counters;
+    }
+
+    /**
+     * Creates a new empty statistics instance.
+     */
+    public PppoeAgentStatistics() {
+        this(ImmutableMap.of());
+    }
+
+    /**
+     * Gets the value of the counter with the given ID.
+     *
+     * @param id counter ID
+     * @return counter value, or 0 when the counter is not present
+     */
+    public long get(PppoeAgentCountersIdentifier id) {
+        return counters.getOrDefault(id, 0L);
+    }
+
+    /**
+     * Gets the map of counters.
+     *
+     * @return map of counters
+     */
+    public Map<PppoeAgentCountersIdentifier, Long> counters() {
+        return counters;
+    }
+
+    /**
+     * Creates a new statistics instance holding a copy of the given counter values.
+     *
+     * @param counters counters
+     * @return statistics
+     */
+    public static PppoeAgentStatistics withCounters(Map<PppoeAgentCountersIdentifier, Long> counters) {
+        ImmutableMap.Builder<PppoeAgentCountersIdentifier, Long> copy = ImmutableMap.builder();
+        copy.putAll(counters);
+        return new PppoeAgentStatistics(copy.build());
+    }
+
+    /**
+     * Sums this instance with another one: counters present in both are added together,
+     * counters present in only one of them are carried over unchanged.
+     *
+     * @param other other instance
+     * @return a new instance containing the result
+     */
+    public PppoeAgentStatistics add(PppoeAgentStatistics other) {
+        ImmutableMap.Builder<PppoeAgentCountersIdentifier, Long> merged = ImmutableMap.builder();
+        // Starts with all keys of the other instance; keys also present here are removed below.
+        Set<PppoeAgentCountersIdentifier> onlyInOther = Sets.newHashSet(other.counters.keySet());
+
+        for (Map.Entry<PppoeAgentCountersIdentifier, Long> entry : counters.entrySet()) {
+            PppoeAgentCountersIdentifier id = entry.getKey();
+            merged.put(id, entry.getValue() + other.counters.getOrDefault(id, 0L));
+            onlyInOther.remove(id);
+        }
+        for (PppoeAgentCountersIdentifier id : onlyInOther) {
+            merged.put(id, other.counters.get(id));
+        }
+        return new PppoeAgentStatistics(merged.build());
+    }
+
+    @Override
+    public String toString() {
+        MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this.getClass());
+        for (Map.Entry<PppoeAgentCountersIdentifier, Long> entry : counters.entrySet()) {
+            helper.add(entry.getKey().toString(), entry.getValue());
+        }
+        return helper.toString();
+    }
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/SimplePppoeAgentCountersStore.java b/app/src/main/java/org/opencord/pppoeagent/impl/SimplePppoeAgentCountersStore.java
new file mode 100644
index 0000000..8be88b7
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/SimplePppoeAgentCountersStore.java
@@ -0,0 +1,305 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.pppoeagent.impl;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.SafeRecurringTask;
+import org.onlab.util.Tools;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.WallClockTimestamp;
+import org.opencord.pppoeagent.PppoeAgentEvent;
+import org.opencord.pppoeagent.PppoeAgentStoreDelegate;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Modified;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+
+import org.slf4j.Logger;
+import java.nio.charset.StandardCharsets;
+
+import java.util.Dictionary;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.PUBLISH_COUNTERS_RATE;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.PUBLISH_COUNTERS_RATE_DEFAULT;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.SYNC_COUNTERS_RATE;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.SYNC_COUNTERS_RATE_DEFAULT;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * PPPoE Agent Counters Manager Component.
+ *
+ * <p>Each node keeps its counters in an in-memory map. A recurring task copies that map into an
+ * eventually consistent map keyed by node ID whenever it changed, and a second recurring task
+ * publishes the values aggregated over all nodes as {@code STATS_UPDATE} events, but only on the
+ * node that leads the statistics topic.
+ */
+@Component(immediate = true,
+        property = {
+                PUBLISH_COUNTERS_RATE + ":Integer=" + PUBLISH_COUNTERS_RATE_DEFAULT,
+                SYNC_COUNTERS_RATE + ":Integer=" + SYNC_COUNTERS_RATE_DEFAULT,
+        }
+)
+public class SimplePppoeAgentCountersStore extends AbstractStore<PppoeAgentEvent, PppoeAgentStoreDelegate>
+        implements PppoeAgentCountersStore {
+    private static final String PPPOE_STATISTICS_LEADERSHIP = "pppoeagent-statistics";
+    private static final MessageSubject RESET_SUBJECT = new MessageSubject("pppoeagent-statistics-reset");
+
+    private final Logger log = getLogger(getClass());
+    // Counters of the local node; the source of every snapshot written to the shared map.
+    private ConcurrentMap<PppoeAgentCountersIdentifier, Long> countersMap;
+
+    // Latest snapshot published by each node.
+    private EventuallyConsistentMap<NodeId, PppoeAgentStatistics> statistics;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected StorageService storageService;
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterService clusterService;
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected LeadershipService leadershipService;
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ComponentConfigService componentConfigService;
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterCommunicationService clusterCommunicationService;
+    protected int publishCountersRate = PUBLISH_COUNTERS_RATE_DEFAULT;
+    protected int syncCountersRate = SYNC_COUNTERS_RATE_DEFAULT;
+    KryoNamespace serializer = KryoNamespace.newBuilder()
+            .register(KryoNamespaces.API)
+            .register(PppoeAgentStatistics.class)
+            .register(PppoeAgentCountersIdentifier.class)
+            .register(PppoeAgentCounterNames.class)
+            .register(ClusterMessage.class)
+            .register(MessageSubject.class)
+            .build();
+    private ScheduledExecutorService executor;
+    // Null while the corresponding task is not scheduled.
+    private ScheduledFuture<?> publisherTask;
+    private ScheduledFuture<?> syncTask;
+    // True when the local counters changed since the last snapshot was written.
+    private final AtomicBoolean dirty = new AtomicBoolean(true);
+
+    /**
+     * Activates the component: loads the configured rates, builds the shared statistics map,
+     * restores the global counters from the last snapshot of this node and starts the
+     * recurring sync and publish tasks.
+     *
+     * @param context component context carrying the configured properties
+     */
+    @Activate
+    public void activate(ComponentContext context) {
+        log.info("Activate PPPoE Agent Counters Manager");
+        countersMap = new ConcurrentHashMap<>();
+        componentConfigService.registerProperties(getClass());
+        // No task is scheduled at this point, so this only loads the configured rates.
+        modified(context);
+        statistics = storageService.<NodeId, PppoeAgentStatistics>eventuallyConsistentMapBuilder()
+                .withName("pppoeagent-statistics")
+                .withSerializer(serializer)
+                .withTimestampProvider((k, v) -> new WallClockTimestamp())
+                .build();
+        // Initialize counter values for the global counters
+        initCounters(PppoeAgentEvent.GLOBAL_COUNTER, statistics.get(clusterService.getLocalNode().id()));
+        syncStats();
+        leadershipService.runForLeadership(PPPOE_STATISTICS_LEADERSHIP);
+        executor = Executors.newScheduledThreadPool(1);
+        clusterCommunicationService.addSubscriber(RESET_SUBJECT, Serializer.using(serializer)::decode,
+                                                  this::resetLocal, executor);
+        startSyncTask();
+        startPublishTask();
+    }
+
+    /**
+     * Deactivates the component: unsubscribes from reset messages, withdraws from the
+     * leadership topic, stops both recurring tasks and shuts the executor down.
+     */
+    @Deactivate
+    public void deactivate() {
+        clusterCommunicationService.removeSubscriber(RESET_SUBJECT);
+        leadershipService.withdraw(PPPOE_STATISTICS_LEADERSHIP);
+        stopPublishTask();
+        stopSyncTask();
+        executor.shutdownNow();
+        componentConfigService.unregisterProperties(getClass(), false);
+    }
+
+    /**
+     * Applies the configured publish and sync rates. A task that is already scheduled is
+     * restarted when its rate changed; a task that is not scheduled yet simply picks the new
+     * rate up when it is started.
+     *
+     * @param context component context carrying the configured properties
+     */
+    @Modified
+    public void modified(ComponentContext context) {
+        Dictionary<String, Object> properties = context.getProperties();
+
+        int newPublishRate = readRate(properties, PUBLISH_COUNTERS_RATE,
+                                      PUBLISH_COUNTERS_RATE_DEFAULT, publishCountersRate);
+        if (newPublishRate != publishCountersRate) {
+            publishCountersRate = newPublishRate;
+            if (publisherTask != null) {
+                stopPublishTask();
+                startPublishTask();
+            }
+        }
+
+        int newSyncRate = readRate(properties, SYNC_COUNTERS_RATE,
+                                   SYNC_COUNTERS_RATE_DEFAULT, syncCountersRate);
+        if (newSyncRate != syncCountersRate) {
+            syncCountersRate = newSyncRate;
+            if (syncTask != null) {
+                stopSyncTask();
+                startSyncTask();
+            }
+        }
+    }
+
+    /**
+     * Reads a rate property, in seconds.
+     *
+     * @param properties  configured properties
+     * @param name        property name
+     * @param defaultRate value used when the property is absent or empty
+     * @param currentRate value kept when the property is not a positive integer
+     * @return rate to use
+     */
+    private int readRate(Dictionary<String, Object> properties, String name, int defaultRate, int currentRate) {
+        String raw = Tools.get(properties, name);
+        if (Strings.isNullOrEmpty(raw)) {
+            return defaultRate;
+        }
+        try {
+            int rate = Integer.parseInt(raw.trim());
+            // scheduleAtFixedRate rejects a period that is not strictly positive.
+            if (rate > 0) {
+                return rate;
+            }
+            log.warn("Ignoring non-positive value {} for property {}", rate, name);
+        } catch (NumberFormatException e) {
+            log.warn("Ignoring non-numeric value '{}' for property {}", raw, name);
+        }
+        return currentRate;
+    }
+
+    /**
+     * Schedules a recurring task that starts immediately.
+     *
+     * @param r    task body
+     * @param rate period in seconds
+     * @return handle of the scheduled task
+     */
+    private ScheduledFuture<?> startTask(Runnable r, int rate) {
+        return executor.scheduleAtFixedRate(SafeRecurringTask.wrap(r),
+                                            0, rate, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Cancels a recurring task; does nothing when the task was never scheduled.
+     *
+     * @param task handle of the task, may be null
+     */
+    private void stopTask(ScheduledFuture<?> task) {
+        if (task != null) {
+            task.cancel(true);
+        }
+    }
+
+    private void startSyncTask() {
+        syncTask = startTask(this::syncStats, syncCountersRate);
+    }
+
+    private void stopSyncTask() {
+        stopTask(syncTask);
+        syncTask = null;
+    }
+
+    private void startPublishTask() {
+        publisherTask = startTask(this::publishStats, publishCountersRate);
+    }
+
+    private void stopPublishTask() {
+        stopTask(publisherTask);
+        publisherTask = null;
+    }
+
+    /**
+     * Returns an immutable copy of the counters of the local node.
+     *
+     * @return copy of the local counters
+     */
+    ImmutableMap<PppoeAgentCountersIdentifier, Long> getCountersMap() {
+        return ImmutableMap.copyOf(countersMap);
+    }
+
+    /**
+     * Returns the counters aggregated over the snapshots of all nodes.
+     *
+     * @return aggregated statistics
+     */
+    public PppoeAgentStatistics getCounters() {
+        return aggregate();
+    }
+
+    /**
+     * Initialize the supported counters map for the given counter class.
+     * @param counterClass class of counters (global, per subscriber)
+     * @param existingStats existing values to initialise the counters to; null starts them at zero
+     */
+    public void initCounters(String counterClass, PppoeAgentStatistics existingStats) {
+        checkNotNull(counterClass, "counter class can't be null");
+        for (PppoeAgentCounterNames counterType : PppoeAgentCounterNames.SUPPORTED_COUNTERS) {
+            PppoeAgentCountersIdentifier id = new PppoeAgentCountersIdentifier(counterClass, counterType);
+            countersMap.put(id, existingStats == null ? 0L : existingStats.get(id));
+        }
+    }
+
+    /**
+     * Inserts the counter entry if it is not already in the set otherwise increment the existing counter entry.
+     * An unsupported counter type is logged and leaves the counters untouched.
+     * @param counterClass class of counters (global, per subscriber)
+     * @param counterType name of counter
+     */
+    public void incrementCounter(String counterClass, PppoeAgentCounterNames counterType) {
+        checkNotNull(counterClass, "counter class can't be null");
+        if (PppoeAgentCounterNames.SUPPORTED_COUNTERS.contains(counterType)) {
+            PppoeAgentCountersIdentifier counterIdentifier =
+                    new PppoeAgentCountersIdentifier(counterClass, counterType);
+            countersMap.compute(counterIdentifier, (key, counterValue) ->
+                    (counterValue != null) ? counterValue + 1 : 1L);
+            // Flag the change only after it has been applied.
+            dirty.set(true);
+        } else {
+            log.error("Failed to increment counter class {} of type {}", counterClass, counterType);
+        }
+    }
+
+    /**
+     * Reset the counters map for the given counter class.
+     * The request is broadcast to every node, including this one.
+     * @param counterClass class of counters (global, per subscriber)
+     */
+    public void resetCounters(String counterClass) {
+        checkNotNull(counterClass, "counter class can't be null");
+        byte[] payload = counterClass.getBytes(StandardCharsets.UTF_8);
+        ClusterMessage reset = new ClusterMessage(clusterService.getLocalNode().id(), RESET_SUBJECT, payload);
+        clusterCommunicationService.broadcastIncludeSelf(reset, RESET_SUBJECT, Serializer.using(serializer)::encode);
+    }
+
+    /**
+     * Handles a reset message: zeroes the existing local counters of the counter class named
+     * in the payload and writes the result to the shared map.
+     *
+     * @param m reset message whose payload is the UTF-8 encoded counter class
+     */
+    private void resetLocal(ClusterMessage m) {
+        String counterClass = new String(m.payload(), StandardCharsets.UTF_8);
+        for (PppoeAgentCounterNames counterType : PppoeAgentCounterNames.SUPPORTED_COUNTERS) {
+            PppoeAgentCountersIdentifier counterIdentifier =
+                    new PppoeAgentCountersIdentifier(counterClass, counterType);
+            countersMap.computeIfPresent(counterIdentifier, (key, counterValue) -> 0L);
+        }
+        dirty.set(true);
+        syncStats();
+    }
+
+    /**
+     * Inserts the counter entry if it is not already in the set otherwise update the existing counter entry.
+     * An unsupported counter type is logged and leaves the counters untouched.
+     * @param counterClass class of counters (global, per subscriber).
+     * @param counterType name of counter
+     * @param value counter value
+     */
+    public void setCounter(String counterClass, PppoeAgentCounterNames counterType, Long value) {
+        checkNotNull(counterClass, "counter class can't be null");
+        if (PppoeAgentCounterNames.SUPPORTED_COUNTERS.contains(counterType)) {
+            // The concurrent map rejects null values; fail with a readable message instead.
+            checkNotNull(value, "counter value can't be null");
+            PppoeAgentCountersIdentifier counterIdentifier =
+                    new PppoeAgentCountersIdentifier(counterClass, counterType);
+            countersMap.put(counterIdentifier, value);
+            dirty.set(true);
+            syncStats();
+        } else {
+            log.error("Failed to set counter class {} of type {}", counterClass, counterType);
+        }
+    }
+
+    /**
+     * Sums the snapshots of all nodes found in the shared map.
+     *
+     * @return aggregated statistics
+     */
+    private PppoeAgentStatistics aggregate() {
+        return statistics.values().stream()
+                .reduce(new PppoeAgentStatistics(), PppoeAgentStatistics::add);
+    }
+
+    /**
+     * Creates a snapshot of the current in-memory statistics.
+     *
+     * @return snapshot of statistics
+     */
+    private PppoeAgentStatistics snapshot() {
+        return PppoeAgentStatistics.withCounters(countersMap);
+    }
+
+    /**
+     * Syncs in-memory stats to the eventually consistent map.
+     */
+    private void syncStats() {
+        // Clear the flag before taking the snapshot: a change that lands after the snapshot
+        // sets the flag again and is picked up by the next sync instead of being lost.
+        if (dirty.compareAndSet(true, false)) {
+            try {
+                statistics.put(clusterService.getLocalNode().id(), snapshot());
+            } catch (RuntimeException e) {
+                // Nothing was written, so the counters still have to be synced.
+                dirty.set(true);
+                throw e;
+            }
+        }
+    }
+
+    /**
+     * Publishes one {@code STATS_UPDATE} event per aggregated counter to the store delegate.
+     */
+    private void publishStats() {
+        // Only publish events if we are the cluster leader for PPPoE Agent stats
+        if (!Objects.equals(leadershipService.getLeader(PPPOE_STATISTICS_LEADERSHIP),
+                            clusterService.getLocalNode().id())) {
+            return;
+        }
+        // Aggregate once so that the null check and the iteration see the same result.
+        PppoeAgentStatistics aggregated = aggregate();
+        if (aggregated.counters() == null) {
+            log.debug("Ignoring 'publishStats' request since counters are null.");
+            return;
+        }
+        aggregated.counters().forEach((counterKey, counterValue) -> {
+            // Subscriber-specific counters have the subscriber ID set
+            String subscriberId = null;
+            if (!counterKey.counterClassKey.equals(PppoeAgentEvent.GLOBAL_COUNTER)) {
+                subscriberId = counterKey.counterClassKey;
+            }
+            if (delegate != null) {
+                delegate.notify(new PppoeAgentEvent(PppoeAgentEvent.Type.STATS_UPDATE, null,
+                                                    counterKey.counterTypeKey.toString(), counterValue,
+                                                    null, subscriberId));
+            }
+        });
+    }
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/package-info.java b/app/src/main/java/org/opencord/pppoeagent/impl/package-info.java
new file mode 100644
index 0000000..84ceb21
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * PPPoE agent implementation.
+ */
+package org.opencord.pppoeagent.impl;
diff --git a/app/src/main/java/org/opencord/pppoeagent/rest/PppoeAgentWebResource.java b/app/src/main/java/org/opencord/pppoeagent/rest/PppoeAgentWebResource.java
new file mode 100644
index 0000000..56d5598
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/rest/PppoeAgentWebResource.java
@@ -0,0 +1,186 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.pppoeagent.rest;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.util.Map;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.PPPoED;
+import org.onlab.util.ItemNotFoundException;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.net.AnnotationKeys;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.Port;
+import org.onosproject.rest.AbstractWebResource;
+
+import org.opencord.pppoeagent.impl.PppoeAgentCounterNames;
+import org.opencord.pppoeagent.impl.PppoeAgentCountersStore;
+import org.opencord.pppoeagent.impl.PppoeAgentCountersIdentifier;
+import org.opencord.pppoeagent.impl.PppoeAgentStatistics;
+import org.opencord.pppoeagent.PppoeAgentEvent;
+import org.opencord.pppoeagent.PppoeAgentService;
+import org.opencord.pppoeagent.PppoeSessionInfo;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
+
+/**
+ * PppoeAgent web resource.
+ */
+@Path("pppoeagent-app")
+public class PppoeAgentWebResource extends AbstractWebResource {
+ private final ObjectNode root = mapper().createObjectNode();
+ private final ArrayNode node = root.putArray("entry");
+ private static final String SESSION_NOT_FOUND = "Session not found";
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ DeviceService deviceService = AbstractShellCommand.get(DeviceService.class);
+
+ /**
+ * Get session info object.
+ *
+ * @param mac Session MAC address
+ *
+ * @return 200 OK
+ */
+ @GET
+ @Path("/session/{mac}")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getSubscriber(@PathParam("mac") String mac) {
+ MacAddress macAddress = MacAddress.valueOf(mac);
+ PppoeAgentService pppoeAgent = get(PppoeAgentService.class);
+ PppoeSessionInfo entry = pppoeAgent.getSessionsMap().get(macAddress);
+ if (entry == null) {
+ throw new ItemNotFoundException(SESSION_NOT_FOUND);
+ }
+
+ try {
+ node.add(encodePppoeSessionInfo(entry, macAddress));
+ return ok(mapper().writeValueAsString(root)).build();
+ } catch (IllegalArgumentException e) {
+ log.error("Error while fetching PPPoE session info for MAC {} through REST API: {}", mac, e.getMessage());
+ return Response.status(INTERNAL_SERVER_ERROR).build();
+ } catch (JsonProcessingException e) {
+ log.error("Error assembling JSON response for PPPoE session info request for MAC {} " +
+ "through REST API: {}", mac, e.getMessage());
+ return Response.status(INTERNAL_SERVER_ERROR).build();
+ }
+ }
+
+ /**
+ * Get all session info objects.
+ *
+ * @return 200 OK
+ */
+ @GET
+ @Path("/session")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getSubscribers() {
+ try {
+ PppoeAgentService pppoeAgent = get(PppoeAgentService.class);
+ pppoeAgent.getSessionsMap().forEach((mac, entry) -> {
+ node.add(encodePppoeSessionInfo(entry, mac));
+ });
+
+ return ok(mapper().writeValueAsString(root)).build();
+ } catch (Exception e) {
+ log.error("Error while fetching PPPoE sessions information through REST API: {}", e.getMessage());
+ return Response.status(INTERNAL_SERVER_ERROR).build();
+ }
+ }
+
+ private ObjectNode encodePppoeSessionInfo(PppoeSessionInfo entry, MacAddress macAddress) {
+ ConnectPoint cp = entry.getClientCp();
+ Port devicePort = deviceService.getPort(cp);
+ String portLabel = "uni-" + ((cp.port().toLong() & 0xF) + 1);
+ String subscriberId = devicePort != null ? devicePort.annotations().value(AnnotationKeys.PORT_NAME) :
+ "UNKNOWN";
+
+ return mapper().createObjectNode()
+ .put("macAddress", macAddress.toString())
+ .put("sessionId", entry.getSessionId())
+ .put("currentState", entry.getCurrentState())
+ .put("lastReceivedPacket", PPPoED.Type.getTypeByValue(entry.getPacketCode()).name())
+ .put("deviceId", cp.deviceId().toString())
+ .put("portNumber", cp.port().toString())
+ .put("portLabel", portLabel)
+ .put("subscriberId", subscriberId);
+ }
+
+ /**
+ * Gets PPPoE Agent counters for global context.
+ *
+ * @return 200 OK
+ */
+ @GET
+ @Path("/stats")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getPppoeStats() {
+ return getStats(PppoeAgentEvent.GLOBAL_COUNTER);
+ }
+
+ /**
+ * Gets PPPoE Agent counters for specific subscriber.
+ *
+ * @param subscriberId Id of subscriber.
+ *
+ * @return 200 OK
+ */
+ @GET
+ @Path("/stats/{subscriberId}")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getPppoeSubscriberStats(@PathParam("subscriberId") String subscriberId) {
+ return getStats(subscriberId);
+ }
+ private Response getStats(String key) {
+ PppoeAgentCountersStore pppoeCounters = get(PppoeAgentCountersStore.class);
+ try {
+ PppoeAgentStatistics pppoeStatistics = pppoeCounters.getCounters();
+ JsonNode completeNode = buildPppoeCounterNodeObject(key, pppoeStatistics.counters());
+ return ok(mapper().writeValueAsString(completeNode)).build();
+ } catch (JsonProcessingException e) {
+ log.error("Error while fetching PPPoE agent counter stats through REST API: {}", e.getMessage());
+ return Response.status(INTERNAL_SERVER_ERROR).build();
+ }
+ }
+
+ private JsonNode buildPppoeCounterNodeObject(String key, Map<PppoeAgentCountersIdentifier, Long> countersMap) {
+ ObjectNode entryNode = mapper().createObjectNode();
+ for (PppoeAgentCounterNames counterType : PppoeAgentCounterNames.SUPPORTED_COUNTERS) {
+ Long value = countersMap.get(new PppoeAgentCountersIdentifier(key, counterType));
+ if (value == null) {
+ continue;
+ }
+ entryNode = entryNode.put(counterType.name(), String.valueOf(value));
+ }
+ return mapper().createObjectNode().set(key, entryNode);
+ }
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/pppoeagent/rest/package-info.java b/app/src/main/java/org/opencord/pppoeagent/rest/package-info.java
new file mode 100644
index 0000000..70a18d3
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/rest/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Rest interface for PppoeAgent.
+ */
+package org.opencord.pppoeagent.rest;
diff --git a/app/src/main/webapp/WEB-INF/web.xml b/app/src/main/webapp/WEB-INF/web.xml
new file mode 100644
index 0000000..9bb27a0
--- /dev/null
+++ b/app/src/main/webapp/WEB-INF/web.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright 2021-present Open Networking Foundation
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee"
+ xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
+ xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
+ id="ONOS" version="2.5">
+ <display-name>PPPoE Agent REST API v1.0</display-name>
+
+ <servlet>
+ <servlet-name>JAX-RS Service</servlet-name>
+ <servlet-class>org.glassfish.jersey.servlet.ServletContainer</servlet-class>
+ <init-param>
+ <param-name>jersey.config.server.provider.classnames</param-name>
+ <param-value>
+ org.opencord.pppoeagent.rest.PppoeAgentWebResource
+ </param-value>
+ </init-param>
+ <load-on-startup>1</load-on-startup>
+ </servlet>
+
+ <servlet-mapping>
+ <servlet-name>JAX-RS Service</servlet-name>
+ <url-pattern>/*</url-pattern>
+ </servlet-mapping>
+</web-app>
diff --git a/app/src/test/java/org/opencord/pppoeagent/impl/PppoeAgentTest.java b/app/src/test/java/org/opencord/pppoeagent/impl/PppoeAgentTest.java
new file mode 100644
index 0000000..6298acf
--- /dev/null
+++ b/app/src/test/java/org/opencord/pppoeagent/impl/PppoeAgentTest.java
@@ -0,0 +1,421 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.pppoeagent.impl;
+
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.onlab.junit.TestUtils;
+import org.onlab.osgi.ComponentContextAdapter;
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.PPPoED;
+import org.onlab.packet.PPPoEDTag;
+import org.onlab.packet.VlanId;
+
+import org.onosproject.net.Device;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterServiceAdapter;
+import org.onosproject.cluster.LeadershipServiceAdapter;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.device.DeviceListener;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.TestStorageService;
+
+import org.opencord.pppoeagent.PppoeAgentEvent;
+import org.opencord.pppoeagent.PPPoEDVendorSpecificTag;
+import org.opencord.pppoeagent.PppoeSessionInfo;
+import org.opencord.sadis.SubscriberAndDeviceInformation;
+
+import static org.easymock.EasyMock.createMock;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+public class PppoeAgentTest extends PppoeAgentTestBase {
+ // Agent under test; assigned in setUp().
+ private PppoeAgent pppoeAgent;
+ // Counters store handed to the agent; created and activated in setUp().
+ private SimplePppoeAgentCountersStore store;
+
+ // Mock configuration service given to both the agent and the counters store.
+ ComponentConfigService mockConfigService =
+ createMock(ComponentConfigService.class);
+
+    /**
+     * Sets up the services required by the PPPoE agent app.
+     *
+     * <p>The agent gets its mocked services first, then the counters store is created and
+     * activated, and only then is the store handed to the agent and the agent activated.
+     */
+    @Before
+    public void setUp() {
+        pppoeAgent = new PppoeAgent();
+        pppoeAgent.cfgService = new MockNetworkConfigRegistry();
+        pppoeAgent.coreService = new MockCoreServiceAdapter();
+        pppoeAgent.packetService = new MockPacketService();
+        pppoeAgent.componentConfigService = mockConfigService;
+        pppoeAgent.deviceService = new MockDeviceService();
+        pppoeAgent.sadisService = new MockSadisService();
+        pppoeAgent.subsService = pppoeAgent.sadisService.getSubscriberInfoService();
+        pppoeAgent.mastershipService = new MockMastershipService();
+        pppoeAgent.storageService = new TestStorageService();
+
+        store = new SimplePppoeAgentCountersStore();
+        store.storageService = new TestStorageService();
+        store.clusterService = new ClusterServiceAdapter();
+        store.leadershipService = new LeadershipServiceAdapter();
+        store.clusterCommunicationService = new MockClusterCommunicationService<>();
+        store.componentConfigService = mockConfigService;
+        TestUtils.setField(store, "eventDispatcher", new MockEventDispatcher());
+        store.activate(new MockComponentContext());
+
+        // The store is assigned only once it exists and has been activated.
+        pppoeAgent.pppoeAgentCounters = store;
+
+        TestUtils.setField(pppoeAgent, "eventDispatcher", new MockEventDispatcher());
+        TestUtils.setField(pppoeAgent, "packetProcessorExecutor", MoreExecutors.newDirectExecutorService());
+
+        pppoeAgent.activate(new ComponentContextAdapter());
+    }
+
+ /**
+ * Tears down the PPPoE agent app by deactivating the agent.
+ * NOTE(review): the counters store activated in setUp() is not deactivated here - confirm
+ * whether that is intended.
+ */
+ @After
+ public void tearDown() {
+ pppoeAgent.deactivate();
+ }
+
+ @Test
+ public void testPppoePadi() {
+ // Runs the shared upstream-packet check with the PADI code.
+ testPppoeUpstreamPacket(PPPoED.PPPOED_CODE_PADI);
+ }
+
+ @Test
+ public void testPppoePado() {
+ // Runs the shared downstream-packet check with the PADO code.
+ testPppoeDownstreamPacket(PPPoED.PPPOED_CODE_PADO);
+ }
+
+ @Test
+ public void testPppoePadr() {
+ // Runs the shared upstream-packet check with the PADR code.
+ testPppoeUpstreamPacket(PPPoED.PPPOED_CODE_PADR);
+ }
+
+ @Test
+ public void testPppoePads() {
+ // Runs the shared downstream-packet check with the PADS code.
+ testPppoeDownstreamPacket(PPPoED.PPPOED_CODE_PADS);
+ }
+
+ /**
+ * Checks PADT handling in four steps: a PADT from the client and a PADT from the server for
+ * a known session (the packet is forwarded unchanged, the session entry is removed and a
+ * TERMINATE event is raised), then the same two PADTs for an unknown session (no packet is
+ * forwarded).
+ */
+ @Test
+ public void testPppoePadt() {
+ // To simulate a successful PADT from subscriber a session entry is needed.
+ PppoeSessionInfo sessionInfo = new PppoeSessionInfo(DEFAULT_CONNECT_POINT, SERVER_CONNECT_POINT,
+ PPPoED.PPPOED_CODE_PADS, (short) 1,
+ pppoeAgent.subsService.get(CLIENT_NAS_PORT_ID),
+ CLIENT_MAC);
+ putInfoOnSessionMap(CLIENT_MAC, sessionInfo);
+ assertTrue(pppoeAgent.getSessionsMap().containsKey(CLIENT_MAC));
+
+ Ethernet padt = constructPppoedPacket(PPPoED.PPPOED_CODE_PADT, CLIENT_MAC, MacAddress.BROADCAST,
+ CLIENT_C_TAG, CLIENT_S_TAG, (short) 1);
+ sendPacket(padt, DEFAULT_CONNECT_POINT);
+ Ethernet processedPadt = (Ethernet) getPacket();
+ assertNotNull(processedPadt);
+ assertEquals(padt, processedPadt);
+ // The session entry must be gone once the PADT has been processed.
+ assertFalse(pppoeAgent.getSessionsMap().containsKey(CLIENT_MAC));
+ PppoeAgentEvent e = getEvent();
+ assertNotNull(e);
+ assertEquals(PppoeAgentEvent.Type.TERMINATE, e.type());
+ assertEquals(DEFAULT_CONNECT_POINT, e.getConnectPoint());
+ assertEquals(CLIENT_MAC, e.getSubscriberMacAddress());
+
+ // Simulating PADT from server.
+ putInfoOnSessionMap(CLIENT_MAC, sessionInfo);
+ assertTrue(pppoeAgent.getSessionsMap().containsKey(CLIENT_MAC));
+ padt = constructPppoedPacket(PPPoED.PPPOED_CODE_PADT, SERVER_MAC, CLIENT_MAC,
+ CLIENT_C_TAG, CLIENT_S_TAG, (short) 1);
+ sendPacket(padt, SERVER_CONNECT_POINT);
+ processedPadt = (Ethernet) getPacket();
+ assertNotNull(processedPadt);
+ assertEquals(padt, processedPadt);
+ assertFalse(pppoeAgent.getSessionsMap().containsKey(CLIENT_MAC));
+ e = getEvent();
+ assertNotNull(e);
+ assertEquals(PppoeAgentEvent.Type.TERMINATE, e.type());
+ assertEquals(SERVER_CONNECT_POINT, e.getConnectPoint());
+ assertEquals(CLIENT_MAC, e.getSubscriberMacAddress());
+
+ // Simulating PADT from client for unknown session (the packet must not be processed).
+ padt = constructPppoedPacket(PPPoED.PPPOED_CODE_PADT, CLIENT_MAC, MacAddress.BROADCAST,
+ CLIENT_C_TAG, CLIENT_S_TAG, (short) 1);
+ sendPacket(padt, DEFAULT_CONNECT_POINT);
+ processedPadt = (Ethernet) getPacket();
+ assertNull(processedPadt);
+
+ // Simulating PADT from server for unknown session (the packet must not be processed).
+ padt = constructPppoedPacket(PPPoED.PPPOED_CODE_PADT, SERVER_MAC, CLIENT_MAC,
+ CLIENT_C_TAG, CLIENT_S_TAG, (short) 1);
+ sendPacket(padt, SERVER_CONNECT_POINT);
+ processedPadt = (Ethernet) getPacket();
+ assertNull(processedPadt);
+ }
+
+ /**
+ * Checks circuit-id validation: a PADI received on an unexpected port is not forwarded and
+ * raises an INVALID_CID event, while the same PADI received on the default connect point is
+ * forwarded with a vendor-specific tag carrying the configured circuit-id.
+ */
+ @Test
+ public void testPppoeCircuitIdValidation() {
+ Ethernet packet = constructPppoedPacket(PPPoED.PPPOED_CODE_PADI, CLIENT_MAC, MacAddress.BROADCAST,
+ CLIENT_C_TAG, CLIENT_S_TAG, (short) 0);
+ // Send packet with a different port number, so there will be a circuit-id mismatch.
+ sendPacket(packet, new ConnectPoint(DEVICE_ID, PortNumber.portNumber(4096L)));
+ Ethernet processedPacket = (Ethernet) getPacket();
+ assertNull(processedPacket);
+ PppoeAgentEvent e = getEvent();
+ assertNotNull(e);
+ assertEquals(PppoeAgentEvent.Type.INVALID_CID, e.type());
+
+ // Now send it from the default connect point, which should generate the valid circuit-id.
+ sendPacket(packet, DEFAULT_CONNECT_POINT);
+ processedPacket = (Ethernet) getPacket();
+ assertNotNull(processedPacket);
+ // Walk down from the Ethernet frame to the vendor-specific tag of the PPPoED payload.
+ PPPoED pppoeLayer = (PPPoED) processedPacket.getPayload();
+ assertNotNull(pppoeLayer);
+ PPPoEDTag tag = pppoeLayer.getTag(PPPoEDTag.PPPOED_TAG_VENDOR_SPECIFIC);
+ assertNotNull(tag);
+ PPPoEDVendorSpecificTag vendorSpecificTag = PPPoEDVendorSpecificTag.fromByteArray(tag.getValue());
+ assertNotNull(vendorSpecificTag);
+
+ // Checks if the configured circuit-id matches with the built one.
+ assertEquals(CLIENT_CIRCUIT_ID, vendorSpecificTag.getCircuitId());
+ }
+
+    /**
+     * Builds one packet per discovery code and runs a counter tester for every supported counter.
+     */
+    @Test
+    public void testPppoeCounters() {
+        // PADI, PADO and PADR are built with session ID 0, PADS with session ID 1.
+        final short discoverySessionId = (short) 0;
+        final short establishedSessionId = (short) 1;
+
+        Ethernet padi = constructPppoedPacket(PPPoED.PPPOED_CODE_PADI, CLIENT_MAC, MacAddress.BROADCAST,
+                                              CLIENT_C_TAG, CLIENT_S_TAG, discoverySessionId);
+        Ethernet pado = constructPppoedPacket(PPPoED.PPPOED_CODE_PADO, SERVER_MAC, CLIENT_MAC,
+                                              CLIENT_C_TAG, CLIENT_S_TAG, discoverySessionId);
+        Ethernet padr = constructPppoedPacket(PPPoED.PPPOED_CODE_PADR, CLIENT_MAC, MacAddress.BROADCAST,
+                                              CLIENT_C_TAG, CLIENT_S_TAG, discoverySessionId);
+        Ethernet pads = constructPppoedPacket(PPPoED.PPPOED_CODE_PADS, SERVER_MAC, CLIENT_MAC,
+                                              CLIENT_C_TAG, CLIENT_S_TAG, establishedSessionId);
+
+        List<CounterTester> testers = List.of(
+                new CounterTester(PppoeAgentCounterNames.PADI, 6, padi, DEFAULT_CONNECT_POINT),
+                new CounterTester(PppoeAgentCounterNames.PADO, 2, pado, SERVER_CONNECT_POINT),
+                new CounterTester(PppoeAgentCounterNames.PADR, 5, padr, DEFAULT_CONNECT_POINT),
+                new CounterTester(PppoeAgentCounterNames.PADS, 3, pads, SERVER_CONNECT_POINT),
+                new CounterTester(PppoeAgentCounterNames.PPPOED_PACKETS_FROM_SERVER, 5, null, null),
+                new CounterTester(PppoeAgentCounterNames.PPPOED_PACKETS_TO_SERVER, 11, null, null),
+                new CounterTester(PppoeAgentCounterNames.AC_SYSTEM_ERROR, 0, null, null),
+                new CounterTester(PppoeAgentCounterNames.GENERIC_ERROR_FROM_CLIENT, 0, null, null),
+                new CounterTester(PppoeAgentCounterNames.GENERIC_ERROR_FROM_SERVER, 0, null, null),
+                new CounterTester(PppoeAgentCounterNames.MTU_EXCEEDED, 0, null, null),
+                new CounterTester(PppoeAgentCounterNames.SERVICE_NAME_ERROR, 0, null, null));
+        // The testers run in list order.
+        testers.forEach(CounterTester::test);
+    }
+
+ @Test
+ public void testSessionsMap() {
+     assertEquals(0, pppoeAgent.getSessionsMap().size());
+     Ethernet padi = constructPppoedPacket(PPPoED.PPPOED_CODE_PADI, CLIENT_MAC, MacAddress.BROADCAST,
+                                           CLIENT_C_TAG, CLIENT_S_TAG, (short) 0);
+     sendPacket(padi, DEFAULT_CONNECT_POINT);
+     assertEquals(1, pppoeAgent.getSessionsMap().size());
+     // Sessions learned from other (random) clients are added on top of the CLIENT_MAC one.
+     final int extraClients = 15;
+     sendMultiplePadi(extraClients);
+     assertEquals(extraClients + 1, pppoeAgent.getSessionsMap().size());
+     PppoeSessionInfo clientSession = pppoeAgent.getSessionsMap().get(CLIENT_MAC);
+     assertSessionInfo(clientSession, PPPoED.PPPOED_CODE_PADI, (short) 0);
+     // After the PADT from CLIENT_MAC only the random clients are expected to remain.
+     Ethernet padt = constructPppoedPacket(PPPoED.PPPOED_CODE_PADT, CLIENT_MAC, MacAddress.BROADCAST,
+                                           CLIENT_C_TAG, CLIENT_S_TAG, (short) 0);
+     sendPacket(padt, DEFAULT_CONNECT_POINT);
+
+     assertEquals(extraClients, pppoeAgent.getSessionsMap().size());
+ }
+
+ @Test
+ public void testDeviceEvents() {
+     // No session may exist before the test starts.
+     assertEquals(0, pppoeAgent.getSessionsMap().size());
+
+     // Populate sessionsMap with 10 PADI packets sent from random MAC addresses.
+     final int sessions = 10;
+     sendMultiplePadi(sessions);
+     assertEquals(sessions, pppoeAgent.getSessionsMap().size());
+
+     // Build a PORT_REMOVED event and hand it directly to the agent's device listener.
+     DeviceListener listener = TestUtils.getField(pppoeAgent, "deviceListener");
+     Device olt = pppoeAgent.deviceService.getDevice(DEVICE_ID);
+     DeviceEvent portRemoved = new DeviceEvent(DeviceEvent.Type.PORT_REMOVED,
+                                               olt,
+                                               new MockPort(PortNumber.portNumber(1L)));
+     listener.event(portRemoved);
+
+     // Every session is expected to be gone.
+     assertEquals(0, pppoeAgent.getSessionsMap().size());
+
+     // Repeat with a PORT_UPDATED event that reports the port as disabled.
+     sendMultiplePadi(sessions);
+     assertEquals(sessions, pppoeAgent.getSessionsMap().size());
+     DeviceEvent portDisabled = new DeviceEvent(DeviceEvent.Type.PORT_UPDATED,
+                                                olt,
+                                                new MockPort(PortNumber.portNumber(1L), false));
+     listener.event(portDisabled);
+     assertEquals(0, pppoeAgent.getSessionsMap().size());
+
+     // Repeat once more with DEVICE_REMOVED, which carries no port.
+     sendMultiplePadi(sessions);
+     assertEquals(sessions, pppoeAgent.getSessionsMap().size());
+     DeviceEvent deviceRemoved = new DeviceEvent(DeviceEvent.Type.DEVICE_REMOVED, olt, null);
+     listener.event(deviceRemoved);
+     assertEquals(0, pppoeAgent.getSessionsMap().size());
+ }
+
+ // Sends num PADI packets from the default connect point, each one with a fresh random source MAC.
+ private void sendMultiplePadi(int num) {
+     for (int sent = 0; sent < num; sent++) {
+         // Draw again in the (unlikely) case the address already has a session entry.
+         MacAddress clientMac = randomizeMacAddress();
+         while (pppoeAgent.getSessionsMap().containsKey(clientMac)) {
+             clientMac = randomizeMacAddress();
+         }
+         Ethernet padi = constructPppoedPacket(PPPoED.PPPOED_CODE_PADI, clientMac, MacAddress.BROADCAST,
+                                               CLIENT_C_TAG, CLIENT_S_TAG, (short) 0);
+         sendPacket(padi, DEFAULT_CONNECT_POINT);
+     }
+ }
+
+ private void testPppoeUpstreamPacket(byte packetCode) {
+     Ethernet sent = constructPppoedPacket(packetCode, CLIENT_MAC, MacAddress.BROADCAST,
+                                           CLIENT_C_TAG, CLIENT_S_TAG, (short) 0);
+     sendPacket(sent, DEFAULT_CONNECT_POINT);
+
+     Ethernet relayed = (Ethernet) getPacket();
+     assertNotNull(relayed);
+
+     PPPoED pppoed = (PPPoED) relayed.getPayload();
+     assertNotNull(pppoed);
+
+     // Exactly one tag is expected in the processed packet: the vendor-specific one.
+     List<PPPoEDTag> tags = pppoed.getTags();
+     assertEquals(1, tags.size());
+     PPPoEDTag onlyTag = tags.get(0);
+     assertEquals(PPPoEDTag.PPPOED_TAG_VENDOR_SPECIFIC, onlyTag.getType());
+
+     PPPoEDVendorSpecificTag vendorTag = PPPoEDVendorSpecificTag.fromByteArray(onlyTag.getValue());
+     assertEquals(CLIENT_CIRCUIT_ID, vendorTag.getCircuitId());
+     assertEquals(CLIENT_REMOTE_ID, vendorTag.getRemoteId());
+     assertEquals(Integer.valueOf(PPPoEDVendorSpecificTag.BBF_IANA_VENDOR_ID), vendorTag.getVendorId());
+
+     // Apart from the tag list the processed packet must match the sent one,
+     // so the tags are stripped before comparing both packets.
+     pppoed.setPayload(null);
+     pppoed.setPayloadLength((short) 0);
+     relayed.setPayload(pppoed);
+     assertEquals(sent, relayed);
+
+     PppoeSessionInfo session = pppoeAgent.getSessionsMap().get(CLIENT_MAC);
+     assertNotNull(session);
+     assertSessionInfo(session, packetCode, (short) 0);
+ }
+
+ private void testPppoeDownstreamPacket(byte packetCode) {
+     // Seed a session entry for the previous packet: PADI when testing a PADO, PADR otherwise.
+     byte previousPacketCode = packetCode != PPPoED.PPPOED_CODE_PADO ? PPPoED.PPPOED_CODE_PADR :
+                                                                    PPPoED.PPPOED_CODE_PADI;
+
+     SubscriberAndDeviceInformation subscriber = pppoeAgent.subsService.get(CLIENT_NAS_PORT_ID);
+     PppoeSessionInfo previousSession = new PppoeSessionInfo(DEFAULT_CONNECT_POINT, SERVER_CONNECT_POINT,
+             previousPacketCode, (short) 0, subscriber, CLIENT_MAC);
+     putInfoOnSessionMap(CLIENT_MAC, previousSession);
+
+     short sessionId = packetCode == PPPoED.PPPOED_CODE_PADS ? (short) 1 : (short) 0;
+     Ethernet sent = constructPppoedPacket(packetCode, SERVER_MAC, CLIENT_MAC,
+                                           CLIENT_C_TAG, CLIENT_S_TAG, sessionId);
+     sendPacket(sent, SERVER_CONNECT_POINT);
+
+     Ethernet relayed = (Ethernet) getPacket();
+     assertNotNull(relayed);
+
+     // The sTag is removed before the packet goes to the client, so drop it from the reference too.
+     sent.setQinQVID(VlanId.UNTAGGED);
+     assertEquals(sent, relayed);
+
+     PppoeSessionInfo session = pppoeAgent.getSessionsMap().get(CLIENT_MAC);
+     assertNotNull(session);
+     assertSessionInfo(session, packetCode, sessionId);
+ }
+
+ private void assertSessionInfo(PppoeSessionInfo sessionInfo, Byte packetCode, short sessionId) {
+ assertEquals(packetCode, sessionInfo.getPacketCode()); // packet code held by the session entry
+ assertEquals(sessionId, sessionInfo.getSessionId()); // session-id held by the session entry
+ assertEquals(DEFAULT_CONNECT_POINT, sessionInfo.getClientCp()); // client side is always the default cp
+ assertEquals(CLIENT_MAC, sessionInfo.getClientMac()); // and always the fixed client MAC
+ }
+
+ private void putInfoOnSessionMap(MacAddress key, PppoeSessionInfo sessionInfo) {
+     ConsistentMap<MacAddress, PppoeSessionInfo> agentSessions = TestUtils.getField(pppoeAgent, "sessionsMap");
+     agentSessions.put(key, sessionInfo); // writes straight into the agent's own "sessionsMap" field
+ }
+
+ class CounterTester {
+     String subscriber; // counter owner used to build the counter identifier
+     PppoeAgentCounterNames counter; // counter under test
+     long expectedValue; // expected counter value, also the number of model packets sent
+     Ethernet packetModel; // packet sent repeatedly; nothing is sent when null
+     ConnectPoint cp; // connect point the model is sent from; nothing is sent when null
+     // Variant for global counters: the subscriber is PppoeAgentEvent.GLOBAL_COUNTER.
+     CounterTester(PppoeAgentCounterNames counter, long expectedValue,
+                   Ethernet packetModel, ConnectPoint cp) {
+         this(PppoeAgentEvent.GLOBAL_COUNTER, counter, expectedValue, packetModel, cp);
+     }
+
+     CounterTester(String subscriber, PppoeAgentCounterNames counter,
+                   long expectedValue, Ethernet packetModel,
+                   ConnectPoint cp) {
+         this.cp = cp;
+         this.packetModel = packetModel;
+         this.expectedValue = expectedValue;
+         this.counter = counter;
+         this.subscriber = subscriber;
+     }
+     // Sends the model packet expectedValue times through cp.
+     void sendModel() {
+         for (int sent = 0; sent < expectedValue; sent++) {
+             sendPacket(packetModel, cp);
+         }
+     }
+     // Compares the stored counter value with the expected one.
+     void assertCounterValue() {
+         long actualValue = store.getCountersMap()
+                 .get(new PppoeAgentCountersIdentifier(subscriber, counter));
+         assertEquals(expectedValue, actualValue);
+     }
+     // Sends the model packets (only when both model and cp are given), then checks the counter.
+     void test() {
+         if (!(packetModel == null || cp == null)) {
+             sendModel();
+         }
+         assertCounterValue();
+     }
+ }
+}
diff --git a/app/src/test/java/org/opencord/pppoeagent/impl/PppoeAgentTestBase.java b/app/src/test/java/org/opencord/pppoeagent/impl/PppoeAgentTestBase.java
new file mode 100644
index 0000000..59dc216
--- /dev/null
+++ b/app/src/test/java/org/opencord/pppoeagent/impl/PppoeAgentTestBase.java
@@ -0,0 +1,751 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.pppoeagent.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.TimeUnit;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.time.Duration;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import org.onlab.packet.BasePacket;
+import org.onlab.packet.ChassisId;
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.Ip4Address;
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.PPPoED;
+import org.onlab.packet.VlanId;
+
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreServiceAdapter;
+import org.onosproject.core.DefaultApplicationId;
+import org.onosproject.event.DefaultEventSinkRegistry;
+import org.onosproject.event.Event;
+import org.onosproject.event.EventDeliveryService;
+import org.onosproject.event.EventSink;
+import org.onosproject.mastership.MastershipServiceAdapter;
+import org.onosproject.net.AnnotationKeys;
+import org.onosproject.net.Annotations;
+import org.onosproject.net.config.Config;
+import org.onosproject.net.config.NetworkConfigRegistryAdapter;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DefaultAnnotations;
+import org.onosproject.net.DefaultDevice;
+import org.onosproject.net.Device;
+import org.onosproject.net.device.DeviceServiceAdapter;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Element;
+import org.onosproject.net.packet.DefaultInboundPacket;
+import org.onosproject.net.packet.DefaultPacketContext;
+import org.onosproject.net.packet.InboundPacket;
+import org.onosproject.net.packet.OutboundPacket;
+import org.onosproject.net.packet.PacketContext;
+import org.onosproject.net.packet.PacketProcessor;
+import org.onosproject.net.packet.PacketServiceAdapter;
+import org.onosproject.net.provider.ProviderId;
+import org.onosproject.net.Port;
+import org.onosproject.net.PortNumber;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+
+import org.opencord.pppoeagent.PppoeAgentEvent;
+import org.opencord.sadis.BandwidthProfileInformation;
+import org.opencord.sadis.BaseInformationService;
+import org.opencord.sadis.SadisService;
+import org.opencord.sadis.SubscriberAndDeviceInformation;
+import org.opencord.sadis.UniTagInformation;
+
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.ComponentInstance;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Common methods for PppoeAgent app tests.
+ */
+public class PppoeAgentTestBase {
+ static final int UPLINK_PORT = 5; // uplink port handed to the mocked OLT Sadis entry
+ static final String OLT_DEV_ID = "of:00000000000000aa";
+ static final String OLT_SERIAL_NUMBER = "OLT123456789";
+ // Subscriber data used to fill the mocked Sadis subscriber entry and the test packets.
+ static final VlanId CLIENT_C_TAG = VlanId.vlanId((short) 999);
+ static final VlanId CLIENT_C_TAG_2 = VlanId.vlanId((short) 998);
+ static final VlanId CLIENT_S_TAG = VlanId.vlanId((short) 111);
+ static final String CLIENT_ID_1 = "SUBSCRIBER_ID_1";
+ static final short CLIENT_C_PBIT = 6;
+ static final String CLIENT_NAS_PORT_ID = "ONU123456789";
+ static final String CLIENT_REMOTE_ID = "remote0";
+ // MAC addresses of the fixed client, the server and the OLT.
+ static final MacAddress CLIENT_MAC = MacAddress.valueOf("B8:26:D4:09:E5:D1");
+ static final MacAddress SERVER_MAC = MacAddress.valueOf("74:86:7A:FB:07:86");
+ static final MacAddress OLT_MAC_ADDRESS = MacAddress.valueOf("01:02:03:04:05:06");
+ // Port 1 of the OLT; the circuit-id below evaluates to "OLT123456789 0/1:ONU123456789".
+ static final ConnectPoint DEFAULT_CONNECT_POINT = ConnectPoint.deviceConnectPoint(OLT_DEV_ID + "/" + 1);
+ static final String CLIENT_CIRCUIT_ID = String.format("%s 0/%s:%s", OLT_SERIAL_NUMBER,
+         (DEFAULT_CONNECT_POINT.port().toLong() >> 12) + 1,
+         CLIENT_NAS_PORT_ID);
+
+ static final DeviceId DEVICE_ID = DeviceId.deviceId(OLT_DEV_ID);
+ static final String SCHEME_NAME = "pppoeagent";
+ // Derived from OLT_DEV_ID and UPLINK_PORT (same value as before) so the three cannot drift apart.
+ static final ConnectPoint SERVER_CONNECT_POINT = ConnectPoint.deviceConnectPoint(OLT_DEV_ID + "/" + UPLINK_PORT);
+
+ static final DefaultAnnotations DEVICE_ANNOTATIONS = DefaultAnnotations.builder()
+         .set(AnnotationKeys.PROTOCOL, SCHEME_NAME.toUpperCase()).build();
+ // Packets emitted through MockPacketService and the processor registered on it.
+ List<BasePacket> savedPackets = new LinkedList<>();
+ PacketProcessor packetProcessor;
+
+ /**
+  * Appends the given packet to the end of the saved packets list.
+  *
+  * @param packet packet to keep for later inspection
+  */
+ void savePacket(BasePacket packet) {
+     this.savedPackets.add(packet);
+ }
+
+ /**
+  * Pops the oldest packet kept in the saved packets list.
+  *
+  * @return the removed head of the list, or null when no packet is saved.
+  */
+ BasePacket getPacket() {
+     return savedPackets.isEmpty() ? null : savedPackets.remove(0);
+ }
+
+ /**
+  * Reads (without removing) the most recent event recorded by MockEventDispatcher.
+  *
+  * @return the last recorded pppoe agent event, or null when none was recorded.
+  */
+ PppoeAgentEvent getEvent() {
+     List<Event> events = MockEventDispatcher.eventList;
+     return events.isEmpty() ? null : (PppoeAgentEvent) events.get(events.size() - 1);
+ }
+
+ /**
+  * Wraps an Ethernet packet into a packet context and feeds it to the registered Packet Processor.
+  *
+  * @param pkt Ethernet packet to process
+  * @param cp ConnectPoint the packet is received on
+  */
+ void sendPacket(Ethernet pkt, ConnectPoint cp) {
+     // The inbound packet carries both the parsed frame and its serialized form.
+     InboundPacket inbound = new DefaultInboundPacket(cp, pkt, ByteBuffer.wrap(pkt.serialize()));
+     // Context time is fixed at 127; there is no outbound packet and the block flag is false.
+     PacketContext ctx = new MockPacketContext(127L, inbound, null, false);
+     packetProcessor.process(ctx);
+ }
+
+ /**
+  * Core service mock that hands out an application id (always numbered 10) for any name.
+  */
+ class MockCoreServiceAdapter extends CoreServiceAdapter {
+     @Override
+     public ApplicationId registerApplication(String name) {
+         ApplicationId appId = new DefaultApplicationId(10, name);
+         return appId;
+     }
+ }
+
+ /**
+  * Device service mock that always answers with the same OLT device and with freshly built ports.
+  */
+ class MockDeviceService extends DeviceServiceAdapter {
+
+     private ProviderId ofProvider = new ProviderId("of", "foo");
+     private final Device olt = new MockDevice(ofProvider, DEVICE_ID, Device.Type.SWITCH,
+                                               "foo.inc", "0", "0", OLT_SERIAL_NUMBER, new ChassisId(),
+                                               DEVICE_ANNOTATIONS);
+
+     @Override
+     public Device getDevice(DeviceId devId) {
+         return olt; // the requested id is not checked
+     }
+
+     @Override
+     public Port getPort(ConnectPoint cp) {
+         return new MockPort(cp.port());
+     }
+
+     @Override
+     public Port getPort(DeviceId deviceId, PortNumber portNumber) {
+         return new MockPort(portNumber);
+     }
+
+     @Override
+     public boolean isAvailable(DeviceId d) {
+         return true;
+     }
+ }
+
+ /**
+  * Device mock; a DefaultDevice built straight from the given attributes.
+  */
+ class MockDevice extends DefaultDevice {
+
+     public MockDevice(ProviderId providerId, DeviceId id, Type type, String manufacturer,
+                       String hwVersion, String swVersion, String serialNumber,
+                       ChassisId chassisId, Annotations... annotations) {
+         super(providerId, id, type, manufacturer, hwVersion, swVersion,
+               serialNumber, chassisId, annotations);
+     }
+ }
+
+ /**
+  * Port mock with a configurable number and enabled state (enabled unless stated otherwise).
+  */
+ class MockPort implements Port {
+     private final PortNumber portNumber;
+     private final boolean isEnabled;
+     MockPort(PortNumber portNumber) {
+         this(portNumber, true);
+     }
+
+     MockPort(PortNumber portNumber, boolean isEnabled) {
+         this.portNumber = portNumber;
+         this.isEnabled = isEnabled;
+     }
+
+     @Override
+     public boolean isEnabled() {
+         return isEnabled;
+     }
+     @Override
+     public long portSpeed() {
+         return 1000;
+     }
+     @Override
+     public Element element() {
+         return null;
+     }
+     @Override
+     public PortNumber number() {
+         return portNumber;
+     }
+     @Override
+     public Annotations annotations() {
+         return new MockAnnotations();
+     }
+     @Override
+     public Type type() {
+         return Type.FIBER;
+     }
+
+     private class MockAnnotations implements Annotations {
+
+         @Override
+         public String value(String val) {
+             return CLIENT_NAS_PORT_ID; // same answer whatever key is asked for
+         }
+         @Override
+         public Set<String> keys() {
+             return null;
+         }
+     }
+ }
+
+ /**
+ * Mastership service mock that reports this node as the master of every device.
+ */
+ class MockMastershipService extends MastershipServiceAdapter {
+ @Override
+ public boolean isLocalMaster(DeviceId d) {
+ return true; // the device id is not inspected
+ }
+ }
+
+ /**
+  * Packet service mock: keeps the registered PacketProcessor and records every emitted packet.
+  */
+ class MockPacketService extends PacketServiceAdapter {
+
+     @Override
+     public void addProcessor(PacketProcessor processor, int priority) {
+         packetProcessor = processor;
+     }
+
+     @Override
+     public void emit(OutboundPacket packet) {
+         try {
+             byte[] frame = packet.data().array();
+             savePacket(Ethernet.deserializer().deserialize(frame, 0, frame.length));
+         } catch (Exception e) {
+             // Report the exception itself: getMessage() alone hides its type and may be null.
+             fail("Unable to deserialize emitted packet: " + e);
+         }
+     }
+ }
+
+ /**
+  * Sadis service mock exposing only the subscriber information service.
+  */
+ class MockSadisService implements SadisService {
+     @Override
+     public BaseInformationService<SubscriberAndDeviceInformation> getSubscriberInfoService() {
+         BaseInformationService<SubscriberAndDeviceInformation> subscribers = new MockSubService();
+         return subscribers;
+     }
+     @Override
+     public BaseInformationService<BandwidthProfileInformation> getBandwidthProfileService() {
+         return null; // bandwidth profiles are not mocked
+     }
+ }
+
+ /**
+  * Sadis content mock holding one device entry and one subscriber entry.
+  * Every lookup is answered with one of the two entries.
+  */
+ class MockSubService implements BaseInformationService<SubscriberAndDeviceInformation> {
+
+     // Entry returned for the OLT serial number.
+     MockSubscriberAndDeviceInformation device =
+             new MockSubscriberAndDeviceInformation(OLT_DEV_ID, VlanId.NONE, VlanId.NONE, VlanId.NONE,
+                                                    null, null, OLT_MAC_ADDRESS,
+                                                    Ip4Address.valueOf("10.10.10.10"), UPLINK_PORT, null);
+
+     // Entry returned for any other id.
+     MockSubscriberAndDeviceInformation sub =
+             new MockSubscriberAndDeviceInformation(CLIENT_ID_1, CLIENT_C_TAG, CLIENT_C_TAG_2, CLIENT_S_TAG,
+                                                    CLIENT_NAS_PORT_ID, CLIENT_CIRCUIT_ID, null, null,
+                                                    -1, CLIENT_REMOTE_ID);
+
+     @Override
+     public SubscriberAndDeviceInformation get(String id) {
+         // Only the OLT serial number selects the device entry.
+         return id.equals(OLT_SERIAL_NUMBER) ? device : sub;
+     }
+
+     // Invalidation requests are accepted and ignored.
+     @Override
+     public void invalidateAll() {}
+
+     @Override
+     public void invalidateId(String id) {}
+
+     @Override
+     public SubscriberAndDeviceInformation getfromCache(String id) {
+         return null; // this mock never answers from a cache
+     }
+ }
+
+ /**
+  * Sadis entry mock whose fields are filled from the constructor arguments.
+  */
+ class MockSubscriberAndDeviceInformation extends SubscriberAndDeviceInformation {
+
+     /**
+      * Fills the entry and attaches two UNI tag entries to it.
+      *
+      * @param cTag  C-tag of the first UNI tag entry
+      * @param cTag2 C-tag of the second UNI tag entry
+      */
+     MockSubscriberAndDeviceInformation(String id, VlanId cTag, VlanId cTag2,
+                                        VlanId sTag, String nasPortId,
+                                        String circuitId, MacAddress hardId,
+                                        Ip4Address ipAddress, int uplinkPort,
+                                        String remoteId) {
+         this.setHardwareIdentifier(hardId);
+         this.setId(id);
+         this.setIPAddress(ipAddress);
+         this.setNasPortId(nasPortId);
+         this.setCircuitId(circuitId);
+         this.setUplinkPort(uplinkPort);
+         this.setRemoteId(remoteId);
+
+         // One UNI tag entry per C-tag; both share the S-tag and the CLIENT_C_PBIT priority.
+         List<UniTagInformation> uniTags = new ArrayList<>();
+         for (VlanId ponCTag : new VlanId[] {cTag, cTag2}) {
+             UniTagInformation uniTag = new UniTagInformation.Builder()
+                     .setPonCTag(ponCTag)
+                     .setPonSTag(sTag)
+                     .setUsPonCTagPriority(CLIENT_C_PBIT)
+                     .build();
+             uniTags.add(uniTag);
+         }
+         this.setUniTagList(uniTags);
+     }
+ }
+
+ /**
+ * Component context mock: offers an empty property set and no services at all.
+ */
+ class MockComponentContext implements ComponentContext {
+ @Override
+ public Dictionary<String, Object> getProperties() {
+ Dictionary<String, Object> cfgDict = new Hashtable<String, Object>();
+ return cfgDict; // a new, empty dictionary on every call
+ }
+
+ @Override
+ public Object locateService(String name) {
+ return null; // no service can be located through this mock
+ }
+
+ @Override
+ public Object locateService(String name, ServiceReference reference) {
+ return null;
+ }
+
+ @Override
+ public Object[] locateServices(String name) {
+ return null;
+ }
+
+ @Override
+ public BundleContext getBundleContext() {
+ return null; // no bundle context is provided
+ }
+
+ @Override
+ public Bundle getUsingBundle() {
+ return null;
+ }
+
+ @Override
+ public ComponentInstance getComponentInstance() {
+ return null;
+ }
+
+ @Override
+ public void enableComponent(String name) {
+ }
+
+ @Override
+ public void disableComponent(String name) {
+ }
+
+ @Override
+ public ServiceReference getServiceReference() {
+ return null;
+ }
+ }
+
+ /**
+  * Network config registry mock that answers every lookup with a MockPppoeAgentConfig.
+  */
+ class MockNetworkConfigRegistry extends NetworkConfigRegistryAdapter {
+     @Override
+     @SuppressWarnings("unchecked")
+     public <S, C extends Config<S>> C getConfig(S subject, Class<C> configClass) {
+         // Subject and config class are ignored, hence the unchecked cast below.
+         PppoeAgentConfig config = new MockPppoeAgentConfig();
+         return (C) config;
+     }
+ }
+
+ /**
+ * Mocks the PPPoE agent config; getUseOltUplinkForServerPktInOut() always answers true.
+ */
+ class MockPppoeAgentConfig extends PppoeAgentConfig {
+ @Override
+ public boolean getUseOltUplinkForServerPktInOut() {
+ return true;
+ }
+ }
+
+ /**
+ * Packet context mock: a DefaultPacketContext whose send() does nothing.
+ */
+ final class MockPacketContext extends DefaultPacketContext {
+
+ private MockPacketContext(long time, InboundPacket inPkt,
+ OutboundPacket outPkt, boolean block) {
+ super(time, inPkt, outPkt, block); // all arguments go to the parent unchanged
+ }
+
+ @Override
+ public void send() {
+ // Intentionally empty: nothing is sent out by this mock.
+ }
+ }
+
+ /**
+ * Event delivery service mock: hands each event to its sink right away and records it.
+ */
+ static class MockEventDispatcher extends DefaultEventSinkRegistry
+ implements EventDeliveryService {
+ static List<Event> eventList = new LinkedList<>(); // static: shared by all instances, never cleared here
+ @Override
+ @SuppressWarnings("unchecked")
+ public synchronized void post(Event event) {
+ EventSink sink = getSink(event.getClass());
+ checkState(sink != null, "No sink for event %s", event); // fails for event types without a sink
+ sink.process(event);
+ eventList.add(event); // recorded only after the sink has processed the event
+ }
+
+ @Override
+ public void setDispatchTimeLimit(long millis) {
+ }
+
+ @Override
+ public long getDispatchTimeLimit() {
+ return 0;
+ }
+ }
+
+ /**
+  * Scheduled executor mock that records the arguments of the last scheduling call.
+  */
+ class MockExecutor implements ScheduledExecutorService {
+     private ScheduledExecutorService executor; // stored by the constructor, not used by any method here
+
+     MockExecutor(ScheduledExecutorService executor) {
+         this.executor = executor;
+     }
+
+     String lastMethodCalled = "";
+     long lastInitialDelay;
+     long lastDelay;
+     TimeUnit lastUnit;
+
+     public void assertLastMethodCalled(String method, long initialDelay, long delay, TimeUnit unit) {
+         assertEquals(method, this.lastMethodCalled);
+         assertEquals(initialDelay, this.lastInitialDelay);
+         assertEquals(delay, this.lastDelay);
+         assertEquals(unit, this.lastUnit);
+     }
+
+     @Override
+     public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+         this.lastMethodCalled = "scheduleRunnable";
+         this.lastDelay = delay; // lastInitialDelay keeps whatever an earlier call stored
+         this.lastUnit = unit;
+         return null;
+     }
+
+     @Override
+     public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+         this.lastMethodCalled = "scheduleCallable";
+         this.lastDelay = delay;
+         this.lastUnit = unit;
+         return null;
+     }
+
+     @Override
+     public ScheduledFuture<?> scheduleAtFixedRate(
+             Runnable command, long initialDelay, long period, TimeUnit unit) {
+         this.lastMethodCalled = "scheduleAtFixedRate";
+         this.lastInitialDelay = initialDelay;
+         this.lastDelay = period;
+         this.lastUnit = unit;
+         return null;
+     }
+
+     @Override
+     public ScheduledFuture<?> scheduleWithFixedDelay(
+             Runnable command, long initialDelay, long delay, TimeUnit unit) {
+         this.lastMethodCalled = "scheduleWithFixedDelay";
+         this.lastInitialDelay = initialDelay;
+         this.lastDelay = delay;
+         this.lastUnit = unit;
+         command.run(); // the only scheduling method that runs its command: once, on the caller's thread
+         return null;
+     }
+
+     @Override
+     public boolean awaitTermination(long timeout, TimeUnit unit) {
+         throw new UnsupportedOperationException();
+     }
+
+     @Override
+     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+             throws InterruptedException {
+         throw new UnsupportedOperationException();
+     }
+
+     @Override
+     public <T> List<Future<T>> invokeAll(
+             Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+             throws InterruptedException {
+         throw new UnsupportedOperationException();
+     }
+
+     @Override
+     public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+             throws ExecutionException, InterruptedException {
+         throw new UnsupportedOperationException();
+     }
+
+     @Override
+     public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+             throws ExecutionException, InterruptedException, TimeoutException {
+         throw new UnsupportedOperationException();
+     }
+
+     @Override
+     public boolean isShutdown() {
+         throw new UnsupportedOperationException();
+     }
+
+     @Override
+     public boolean isTerminated() {
+         throw new UnsupportedOperationException();
+     }
+
+     @Override
+     public void shutdown() {
+         throw new UnsupportedOperationException();
+     }
+
+     @Override
+     public List<Runnable> shutdownNow() {
+         return null; // unlike shutdown(), this call is tolerated
+     }
+
+     @Override
+     public <T> Future<T> submit(Callable<T> task) {
+         throw new UnsupportedOperationException();
+     }
+
+     @Override
+     public Future<?> submit(Runnable task) {
+         throw new UnsupportedOperationException();
+     }
+
+     @Override
+     public <T> Future<T> submit(Runnable task, T result) {
+         throw new UnsupportedOperationException();
+     }
+
+     @Override
+     public void execute(Runnable command) {
+         throw new UnsupportedOperationException();
+     }
+ }
+
+ /**
+  * Builds a double-tagged Ethernet frame that carries a PPPoED message.
+  *
+  * @param type PPPoED message code.
+  * @param source source MAC address.
+  * @param destination destination MAC address of the message.
+  * @param cVlan inner vlan.
+  * @param sVlan outer vlan.
+  * @param sessionId session-id number.
+  *
+  * @return Ethernet packet with PPPoED payload.
+  */
+ Ethernet constructPppoedPacket(byte type, MacAddress source, MacAddress destination,
+                                VlanId cVlan, VlanId sVlan, short sessionId) {
+     PPPoED discovery = new PPPoED();
+     discovery.setCode(type);
+     discovery.setSessionId(sessionId);
+
+     Ethernet frame = new Ethernet();
+     frame.setSourceMACAddress(source);
+     frame.setDestinationMACAddress(destination);
+     frame.setEtherType(Ethernet.TYPE_PPPOED);
+     frame.setVlanID(cVlan.toShort());
+     frame.setPriorityCode((byte) CLIENT_C_PBIT);
+     frame.setQinQTPID(Ethernet.TYPE_VLAN);
+     frame.setQinQVID(sVlan.toShort());
+     frame.setPayload(discovery);
+
+     return frame;
+ }
+
+ public class MockClusterCommunicationService<M> implements ClusterCommunicationService {
+ private Consumer handler; // set only by the Consumer-based addSubscriber overload below
+ @Override
+ public void addSubscriber(MessageSubject subject,
+ ClusterMessageHandler subscriber, ExecutorService executor) {
+ }
+ @Override
+ public <M> void broadcast(M message, MessageSubject subject, Function<M, byte[]> encoder) {
+ }
+ @Override
+ public <M> void broadcastIncludeSelf(M message, MessageSubject subject, Function<M, byte[]> encoder) {
+ handler.accept(message); // local delivery only, encoder unused; NPE if no consumer was registered
+ }
+ @Override
+ public <M> CompletableFuture<Void> unicast(M message, MessageSubject subject,
+ Function<M, byte[]> encoder, NodeId toNodeId) {
+ return null;
+ }
+ @Override
+ public <M> void multicast(M message, MessageSubject subject,
+ Function<M, byte[]> encoder, Set<NodeId> nodeIds) {
+ }
+ @Override
+ public <M, R> CompletableFuture<R> sendAndReceive(M message, MessageSubject subject,
+ Function<M, byte[]> encoder,
+ Function<byte[], R> decoder, NodeId toNodeId) {
+ return null;
+ }
+ @Override
+ public <M, R> CompletableFuture<R> sendAndReceive(M message, MessageSubject subject,
+ Function<M, byte[]> encoder, Function<byte[], R> decoder,
+ NodeId toNodeId, Duration timeout) {
+ return null;
+ }
+ @Override
+ public <M, R> void addSubscriber(MessageSubject subject, Function<byte[], M> decoder,
+ Function<M, R> handler, Function<R, byte[]> encoder, Executor executor) {
+ }
+ @Override
+ public <M, R> void addSubscriber(MessageSubject subject, Function<byte[], M> decoder,
+ Function<M, CompletableFuture<R>> handler, Function<R, byte[]> encoder) {
+ }
+ @Override
+ public <M> void addSubscriber(MessageSubject subject, Function<byte[], M> decoder,
+ Consumer<M> handler, Executor executor) {
+ this.handler = handler; // the latest consumer wins, whatever the subject
+ }
+ @Override
+ public void removeSubscriber(MessageSubject subject) {
+ }
+ }
+
+ /**
+  * Creates a random unicast, locally administered MAC address, so it is always a valid source.
+  *
+  * @return random MAC address object that never equals CLIENT_MAC, SERVER_MAC or OLT_MAC_ADDRESS.
+  */
+ MacAddress randomizeMacAddress() {
+     byte[] mac = new byte[6];
+     new Random().nextBytes(mac);
+     mac[0] = (byte) ((mac[0] & 0xFE) | 0x02); // clear the group bit, set the locally administered bit
+     return MacAddress.valueOf(mac);
+ }
+}
\ No newline at end of file