[VOL-4246] Feature parity with the previous implementation
Change-Id: I3741edb3c1b88b1cf8b5e6d4ff0900132e2e5e6a
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
index 1868421..ee184a5 100644
--- a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
+++ b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-present Open Networking Foundation
+ * Copyright 2021-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,8 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.opencord.olt.impl;
+import com.google.common.collect.ImmutableMap;
import org.onlab.packet.EthType;
import org.onlab.packet.IPv4;
import org.onlab.packet.IPv6;
@@ -26,18 +28,31 @@
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
-import org.onosproject.mastership.MastershipService;
import org.onosproject.net.Annotations;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DefaultAnnotations;
+import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.Host;
+import org.onosproject.net.Port;
import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.DefaultTrafficSelector;
import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.FlowRuleEvent;
+import org.onosproject.net.flow.FlowRuleListener;
+import org.onosproject.net.flow.FlowRuleService;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.flow.criteria.Criteria;
+import org.onosproject.net.flow.criteria.Criterion;
+import org.onosproject.net.flow.criteria.EthTypeCriterion;
+import org.onosproject.net.flow.criteria.IPProtocolCriterion;
+import org.onosproject.net.flow.criteria.PortCriterion;
+import org.onosproject.net.flow.criteria.UdpPortCriterion;
+import org.onosproject.net.flow.criteria.VlanIdCriterion;
+import org.onosproject.net.flow.instructions.L2ModificationInstruction;
import org.onosproject.net.flowobjective.DefaultFilteringObjective;
import org.onosproject.net.flowobjective.DefaultForwardingObjective;
import org.onosproject.net.flowobjective.FilteringObjective;
@@ -46,13 +61,11 @@
import org.onosproject.net.flowobjective.Objective;
import org.onosproject.net.flowobjective.ObjectiveContext;
import org.onosproject.net.flowobjective.ObjectiveError;
+import org.onosproject.net.host.HostService;
import org.onosproject.net.meter.MeterId;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
-import org.opencord.olt.AccessDevicePort;
-import org.opencord.olt.internalapi.AccessDeviceFlowService;
-import org.opencord.olt.internalapi.AccessDeviceMeterService;
import org.opencord.sadis.BandwidthProfileInformation;
import org.opencord.sadis.BaseInformationService;
import org.opencord.sadis.SadisService;
@@ -67,26 +80,46 @@
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.osgi.service.component.annotations.ReferencePolicy;
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.util.Arrays;
import java.util.Dictionary;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import static com.google.common.base.Strings.isNullOrEmpty;
import static org.onlab.util.Tools.get;
-import static org.opencord.olt.impl.OsgiPropertyConstants.*;
-import static org.slf4j.LoggerFactory.getLogger;
+import static org.onosproject.net.flow.instructions.Instruction.Type.L2MODIFICATION;
+import static org.opencord.olt.impl.OltUtils.completeFlowOpToString;
+import static org.opencord.olt.impl.OltUtils.flowOpToString;
+import static org.opencord.olt.impl.OltUtils.getPortName;
+import static org.opencord.olt.impl.OltUtils.portWithName;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_TP_ID;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_TP_ID_DEFAULT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DOWNSTREAM_OLT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DOWNSTREAM_ONU;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_ON_NNI;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_ON_NNI_DEFAULT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_V4;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_V4_DEFAULT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_V6;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_V6_DEFAULT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_EAPOL;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_EAPOL_DEFAULT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_IGMP_ON_NNI;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_IGMP_ON_NNI_DEFAULT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_PPPOE;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_PPPOE_DEFAULT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.UPSTREAM_OLT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.UPSTREAM_ONU;
+import static org.opencord.olt.impl.OsgiPropertyConstants.WAIT_FOR_REMOVAL;
+import static org.opencord.olt.impl.OsgiPropertyConstants.WAIT_FOR_REMOVAL_DEFAULT;
-/**
- * Provisions flow rules on access devices.
- */
@Component(immediate = true, property = {
ENABLE_DHCP_ON_NNI + ":Boolean=" + ENABLE_DHCP_ON_NNI_DEFAULT,
ENABLE_DHCP_V4 + ":Boolean=" + ENABLE_DHCP_V4_DEFAULT,
@@ -94,32 +127,20 @@
ENABLE_IGMP_ON_NNI + ":Boolean=" + ENABLE_IGMP_ON_NNI_DEFAULT,
ENABLE_EAPOL + ":Boolean=" + ENABLE_EAPOL_DEFAULT,
ENABLE_PPPOE + ":Boolean=" + ENABLE_PPPOE_DEFAULT,
- DEFAULT_TP_ID + ":Integer=" + DEFAULT_TP_ID_DEFAULT
+ DEFAULT_TP_ID + ":Integer=" + DEFAULT_TP_ID_DEFAULT,
+ // FIXME remove this option as potentially dangerous in production
+ WAIT_FOR_REMOVAL + ":Boolean=" + WAIT_FOR_REMOVAL_DEFAULT
})
-public class OltFlowService implements AccessDeviceFlowService {
- private static final String SADIS_NOT_RUNNING = "Sadis is not running.";
- private static final String APP_NAME = "org.opencord.olt";
- private static final int NONE_TP_ID = -1;
- private static final int NO_PCP = -1;
- private static final Integer MAX_PRIORITY = 10000;
- private static final Integer MIN_PRIORITY = 1000;
- private static final String INSTALLED = "installed";
- private static final String REMOVED = "removed";
- private static final String INSTALLATION = "installation";
- private static final String REMOVAL = "removal";
- private static final String V4 = "V4";
- private static final String V6 = "V6";
-
- private final Logger log = getLogger(getClass());
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected FlowObjectiveService flowObjectiveService;
+public class OltFlowService implements OltFlowServiceInterface {
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected CoreService coreService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected MastershipService mastershipService;
+ protected ComponentConfigService cfgService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected FlowObjectiveService flowObjectiveService;
@Reference(cardinality = ReferenceCardinality.OPTIONAL,
bind = "bindSadisService",
@@ -128,17 +149,65 @@
protected volatile SadisService sadisService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected OltMeterServiceInterface oltMeterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected OltDeviceServiceInterface oltDeviceService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected FlowRuleService flowRuleService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected HostService hostService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
protected DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected AccessDeviceMeterService oltMeterService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected ComponentConfigService componentConfigService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
protected StorageService storageService;
+ protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
+ protected BaseInformationService<BandwidthProfileInformation> bpService;
+
+ private static final String APP_NAME = "org.opencord.olt";
+ protected ApplicationId appId;
+ private static final Integer MAX_PRIORITY = 10000;
+ private static final Integer MIN_PRIORITY = 1000;
+ private static final short EAPOL_DEFAULT_VLAN = 4091;
+ private static final int NONE_TP_ID = -1;
+ private static final String V4 = "V4";
+ private static final String V6 = "V6";
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ protected UniTagInformation defaultEapolUniTag = new UniTagInformation.Builder()
+ .setServiceName("defaultEapol").build();
+ protected UniTagInformation nniUniTag = new UniTagInformation.Builder()
+ .setServiceName("nni")
+ .setTechnologyProfileId(NONE_TP_ID)
+ .setPonCTag(VlanId.NONE)
+ .setUniTagMatch(VlanId.ANY)
+ .setUsPonCTagPriority(-1)
+ .build();
+
+ /**
+ * Connect Point status map.
+ * Used to keep track of which cp has flows that needs to be removed when the status changes.
+ */
+ protected Map<ServiceKey, OltPortStatus> cpStatus;
+ private final ReentrantReadWriteLock cpStatusLock = new ReentrantReadWriteLock();
+ private final Lock cpStatusWriteLock = cpStatusLock.writeLock();
+ private final Lock cpStatusReadLock = cpStatusLock.readLock();
+
+ /**
+ * This map contains the subscriber that have been provisioned by the operator.
+ * They may or may not have flows, depending on the port status.
+ * The map is used to define whether flows need to be provisioned when a port comes up.
+ */
+ protected Map<ServiceKey, Boolean> provisionedSubscribers;
+ private final ReentrantReadWriteLock provisionedSubscribersLock = new ReentrantReadWriteLock();
+ private final Lock provisionedSubscribersWriteLock = provisionedSubscribersLock.writeLock();
+ private final Lock provisionedSubscribersReadLock = provisionedSubscribersLock.readLock();
+
/**
* Create DHCP trap flow on NNI port(s).
*/
@@ -174,42 +243,76 @@
**/
protected int defaultTechProfileId = DEFAULT_TP_ID_DEFAULT;
- protected ApplicationId appId;
- protected BaseInformationService<BandwidthProfileInformation> bpService;
- protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
- protected Map<DeviceId, BlockingQueue<SubscriberFlowInfo>> pendingEapolForDevice;
+ protected boolean waitForRemoval = WAIT_FOR_REMOVAL_DEFAULT;
+
+ public enum FlowOperation {
+ ADD,
+ REMOVE;
+
+
+ @Override
+ public String toString() {
+ return super.toString().toLowerCase();
+ }
+ }
+
+ public enum FlowDirection {
+ UPSTREAM,
+ DOWNSTREAM,
+ }
+
+ public enum OltFlowsStatus {
+ NONE,
+ PENDING_ADD,
+ ADDED,
+ PENDING_REMOVE,
+ REMOVED,
+ ERROR
+ }
+
+ protected InternalFlowListener internalFlowListener;
@Activate
public void activate(ComponentContext context) {
- if (sadisService != null) {
- bpService = sadisService.getBandwidthProfileService();
- subsService = sadisService.getSubscriberInfoService();
- } else {
- log.warn(SADIS_NOT_RUNNING);
- }
- componentConfigService.registerProperties(getClass());
- appId = coreService.getAppId(APP_NAME);
+ cfgService.registerProperties(getClass());
+ appId = coreService.registerApplication(APP_NAME);
+ internalFlowListener = new InternalFlowListener();
+
+ modified(context);
+
KryoNamespace serializer = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
- .register(UniTagInformation.class)
- .register(SubscriberFlowInfo.class)
+ .register(OltFlowsStatus.class)
+ .register(FlowDirection.class)
+ .register(OltPortStatus.class)
+ .register(OltFlowsStatus.class)
.register(AccessDevicePort.class)
- .register(AccessDevicePort.Type.class)
- .register(LinkedBlockingQueue.class)
+ .register(new ServiceKeySerializer(), ServiceKey.class)
+ .register(UniTagInformation.class)
.build();
- pendingEapolForDevice = storageService.<DeviceId, BlockingQueue<SubscriberFlowInfo>>consistentMapBuilder()
- .withName("volt-pending-eapol")
- .withSerializer(Serializer.using(serializer))
- .withApplicationId(appId)
- .build().asJavaMap();
- log.info("started");
- }
+ cpStatus = storageService.<ServiceKey, OltPortStatus>consistentMapBuilder()
+ .withName("volt-cp-status")
+ .withApplicationId(appId)
+ .withSerializer(Serializer.using(serializer))
+ .build().asJavaMap();
+
+ provisionedSubscribers = storageService.<ServiceKey, Boolean>consistentMapBuilder()
+ .withName("volt-provisioned-subscriber")
+ .withApplicationId(appId)
+ .withSerializer(Serializer.using(serializer))
+ .build().asJavaMap();
+
+ flowRuleService.addListener(internalFlowListener);
+
+ log.info("Started");
+ }
@Deactivate
public void deactivate(ComponentContext context) {
- componentConfigService.unregisterProperties(getClass(), false);
- log.info("stopped");
+ cfgService.unregisterProperties(getClass(), false);
+ flowRuleService.removeListener(internalFlowListener);
+ log.info("Stopped");
}
@Modified
@@ -247,106 +350,712 @@
enablePppoe = pppoe;
}
+ Boolean wait = Tools.isPropertyEnabled(properties, WAIT_FOR_REMOVAL);
+ if (wait != null) {
+ waitForRemoval = wait;
+ }
+
String tpId = get(properties, DEFAULT_TP_ID);
defaultTechProfileId = isNullOrEmpty(tpId) ? DEFAULT_TP_ID_DEFAULT : Integer.parseInt(tpId.trim());
- log.info("modified. Values = enableDhcpOnNni: {}, enableDhcpV4: {}, " +
- "enableDhcpV6:{}, enableIgmpOnNni:{}, " +
- "enableEapol:{}, enablePppoe:{}, defaultTechProfileId:{}",
- enableDhcpOnNni, enableDhcpV4, enableDhcpV6,
- enableIgmpOnNni, enableEapol, enablePppoe,
- defaultTechProfileId);
+ log.info("Modified. Values = enableDhcpOnNni: {}, enableDhcpV4: {}, " +
+ "enableDhcpV6:{}, enableIgmpOnNni:{}, " +
+ "enableEapol:{}, enablePppoe:{}, defaultTechProfileId:{}," +
+ "waitForRemoval:{}",
+ enableDhcpOnNni, enableDhcpV4, enableDhcpV6,
+ enableIgmpOnNni, enableEapol, enablePppoe,
+ defaultTechProfileId, waitForRemoval);
}
- protected void bindSadisService(SadisService service) {
- sadisService = service;
- bpService = sadisService.getBandwidthProfileService();
- subsService = sadisService.getSubscriberInfoService();
- log.info("Sadis-service binds to onos.");
- }
-
- protected void unbindSadisService(SadisService service) {
- sadisService = null;
- bpService = null;
- subsService = null;
- log.info("Sadis-service unbinds from onos.");
- }
-
@Override
- public void processDhcpFilteringObjectives(AccessDevicePort port,
- MeterId upstreamMeterId,
- MeterId upstreamOltMeterId,
- UniTagInformation tagInformation,
- boolean install,
- boolean upstream,
- Optional<CompletableFuture<ObjectiveError>> dhcpFuture) {
- if (upstream) {
- // for UNI ports
- if (tagInformation != null && !tagInformation.getIsDhcpRequired()) {
- log.debug("Dhcp provisioning is disabled for UNI port {} for service {}",
- port, tagInformation.getServiceName());
- dhcpFuture.ifPresent(f -> f.complete(null));
- return;
- }
- } else {
- // for NNI ports
- if (!enableDhcpOnNni) {
- log.debug("Dhcp provisioning is disabled for NNI port {}", port);
- dhcpFuture.ifPresent(f -> f.complete(null));
- return;
- }
- }
- int techProfileId = tagInformation != null ? tagInformation.getTechnologyProfileId() : NONE_TP_ID;
- VlanId cTag = tagInformation != null ? tagInformation.getPonCTag() : VlanId.NONE;
- VlanId unitagMatch = tagInformation != null ? tagInformation.getUniTagMatch() : VlanId.ANY;
- Byte vlanPcp = tagInformation != null && tagInformation.getUsPonCTagPriority() != NO_PCP
- ? (byte) tagInformation.getUsPonCTagPriority() : null;
-
-
- if (enableDhcpV4) {
- int udpSrc = (upstream) ? 68 : 67;
- int udpDst = (upstream) ? 67 : 68;
-
- EthType ethType = EthType.EtherType.IPV4.ethType();
- byte protocol = IPv4.PROTOCOL_UDP;
-
- addDhcpFilteringObjectives(port, udpSrc, udpDst, ethType,
- upstreamMeterId, upstreamOltMeterId, techProfileId, protocol, cTag, unitagMatch,
- vlanPcp, upstream, install, dhcpFuture);
- }
-
- if (enableDhcpV6) {
- int udpSrc = (upstream) ? 547 : 546;
- int udpDst = (upstream) ? 546 : 547;
-
- EthType ethType = EthType.EtherType.IPV6.ethType();
- byte protocol = IPv6.PROTOCOL_UDP;
-
- addDhcpFilteringObjectives(port, udpSrc, udpDst, ethType,
- upstreamMeterId, upstreamOltMeterId, techProfileId, protocol, cTag, unitagMatch,
- vlanPcp, upstream, install, dhcpFuture);
+ public ImmutableMap<ServiceKey, OltPortStatus> getConnectPointStatus() {
+ try {
+ cpStatusReadLock.lock();
+ return ImmutableMap.copyOf(cpStatus);
+ } finally {
+ cpStatusReadLock.unlock();
}
}
- private void addDhcpFilteringObjectives(AccessDevicePort port, int udpSrc, int udpDst,
- EthType ethType, MeterId upstreamMeterId, MeterId upstreamOltMeterId,
- int techProfileId, byte protocol, VlanId cTag, VlanId unitagMatch,
- Byte vlanPcp, boolean upstream,
- boolean install, Optional<CompletableFuture<ObjectiveError>> dhcpFuture) {
+ @Override
+ public ImmutableMap<ServiceKey, UniTagInformation> getProgrammedSubscribers() {
+ // NOTE we might want to remove this conversion and directly use cpStatus as it contains
+ // all the required information about suscribers
+ Map<ServiceKey, UniTagInformation> subscribers =
+ new HashMap<>();
+ try {
+ cpStatusReadLock.lock();
+
+ cpStatus.forEach((sk, status) -> {
+ if (
+ // not NNI Port
+ !oltDeviceService.isNniPort(deviceService.getDevice(sk.getPort().connectPoint().deviceId()),
+ sk.getPort().connectPoint().port()) &&
+ // not EAPOL flow
+ !sk.getService().equals(defaultEapolUniTag)
+ ) {
+ subscribers.put(sk, sk.getService());
+ }
+ });
+
+ return ImmutableMap.copyOf(subscribers);
+ } finally {
+ cpStatusReadLock.unlock();
+ }
+ }
+
+ @Override
+ public Map<ServiceKey, Boolean> getRequestedSubscribers() {
+ try {
+ provisionedSubscribersReadLock.lock();
+ return ImmutableMap.copyOf(provisionedSubscribers);
+ } finally {
+ provisionedSubscribersReadLock.unlock();
+ }
+ }
+
+ @Override
+ public void handleNniFlows(Device device, Port port, FlowOperation action) {
+
+ // always handle the LLDP flow
+ processLldpFilteringObjective(device.id(), port, action);
+
+ if (enableDhcpOnNni) {
+ if (enableDhcpV4) {
+ log.debug("Installing DHCPv4 trap flow on NNI {} for device {}", portWithName(port), device.id());
+ processDhcpFilteringObjectives(device.id(), port, action, FlowDirection.DOWNSTREAM,
+ 67, 68, EthType.EtherType.IPV4.ethType(), IPv4.PROTOCOL_UDP,
+ null, null, nniUniTag);
+ }
+ if (enableDhcpV6) {
+ log.debug("Installing DHCPv6 trap flow on NNI {} for device {}", portWithName(port), device.id());
+ processDhcpFilteringObjectives(device.id(), port, action, FlowDirection.DOWNSTREAM,
+ 546, 547, EthType.EtherType.IPV6.ethType(), IPv6.PROTOCOL_UDP,
+ null, null, nniUniTag);
+ }
+ } else {
+ log.info("DHCP is not required on NNI {} for device {}", portWithName(port), device.id());
+ }
+
+ if (enableIgmpOnNni) {
+ log.debug("Installing IGMP flow on NNI {} for device {}", portWithName(port), device.id());
+ processIgmpFilteringObjectives(device.id(), port, action, FlowDirection.DOWNSTREAM,
+ null, null, NONE_TP_ID, VlanId.NONE, VlanId.ANY, -1);
+ }
+
+ if (enablePppoe) {
+ log.debug("Installing PPPoE flow on NNI {} for device {}", port.number(), device.id());
+ processPPPoEDFilteringObjectives(device.id(), port, action, FlowDirection.DOWNSTREAM,
+ null, null, NONE_TP_ID, VlanId.NONE, VlanId.ANY, null);
+ }
+ }
+
+ @Override
+ public boolean handleBasicPortFlows(DiscoveredSubscriber sub, String bandwidthProfileId,
+ String oltBandwidthProfileId) {
+
+ // we only need to something if EAPOL is enabled
+ if (!enableEapol) {
+ return true;
+ }
+
+ if (sub.status == DiscoveredSubscriber.Status.ADDED) {
+ return addDefaultFlows(sub, bandwidthProfileId, oltBandwidthProfileId);
+ } else if (sub.status == DiscoveredSubscriber.Status.REMOVED) {
+ return removeDefaultFlows(sub, bandwidthProfileId, oltBandwidthProfileId);
+ } else {
+ log.error("Unknown Status {} on DiscoveredSubscriber {}", sub.status, sub);
+ return false;
+ }
+
+ }
+
+ private boolean addDefaultFlows(DiscoveredSubscriber sub, String bandwidthProfileId, String oltBandwidthProfileId) {
+
+ if (!oltMeterService.createMeter(sub.device.id(), bandwidthProfileId)) {
+ if (log.isTraceEnabled()) {
+ log.trace("waiting on meter for bp {} and sub {}", bandwidthProfileId, sub);
+ }
+ return false;
+ }
+ if (hasDefaultEapol(sub.port)) {
+ return true;
+ }
+ return handleEapolFlow(sub, bandwidthProfileId,
+ oltBandwidthProfileId, FlowOperation.ADD, VlanId.vlanId(EAPOL_DEFAULT_VLAN));
+
+ }
+
+ private boolean removeDefaultFlows(DiscoveredSubscriber sub, String bandwidthProfile, String oltBandwidthProfile) {
+ // NOTE that we are not checking for meters as they must have been created to install the flow in first place
+ return handleEapolFlow(sub, bandwidthProfile, oltBandwidthProfile,
+ FlowOperation.REMOVE, VlanId.vlanId(EAPOL_DEFAULT_VLAN));
+ }
+
+ @Override
+ public boolean handleSubscriberFlows(DiscoveredSubscriber sub, String defaultBandwidthProfile,
+ String multicastServiceName) {
+ // NOTE that we are taking defaultBandwithProfile as a parameter as that can be configured in the Olt component
+ if (sub.status == DiscoveredSubscriber.Status.ADDED) {
+ return addSubscriberFlows(sub, defaultBandwidthProfile, multicastServiceName);
+ } else if (sub.status == DiscoveredSubscriber.Status.REMOVED) {
+ return removeSubscriberFlows(sub, defaultBandwidthProfile, multicastServiceName);
+ } else {
+ log.error("don't know how to handle {}", sub);
+ return false;
+ }
+ }
+
+ private boolean addSubscriberFlows(DiscoveredSubscriber sub, String defaultBandwithProfile,
+ String multicastServiceName) {
+ if (log.isTraceEnabled()) {
+ log.trace("Provisioning of subscriber on {} started", portWithName(sub.port));
+ }
+ if (enableEapol) {
+ if (hasDefaultEapol(sub.port)) {
+ // remove EAPOL flow and throw exception so that we'll retry later
+ if (!isDefaultEapolPendingRemoval(sub.port)) {
+ removeDefaultFlows(sub, defaultBandwithProfile, defaultBandwithProfile);
+ }
+
+ if (waitForRemoval) {
+ // NOTE wait for removal is a flag only needed to make sure VOLTHA
+ // does not explode with the flows remove/add in the same batch
+ log.debug("Awaiting for default flows removal for {}", portWithName(sub.port));
+ return false;
+ } else {
+ log.warn("continuing provisioning on {}", portWithName(sub.port));
+ }
+ }
+
+ }
+
+ // NOTE createMeters will return if the meters are not installed
+ if (!oltMeterService.createMeters(sub.device.id(),
+ sub.subscriberAndDeviceInformation)) {
+ return false;
+ }
+
+ // NOTE we need to add the DHCP flow regardless so that the host can be discovered and the MacAddress added
+ handleSubscriberDhcpFlows(sub.device.id(), sub.port, FlowOperation.ADD,
+ sub.subscriberAndDeviceInformation);
+
+ if (isMacLearningEnabled(sub.subscriberAndDeviceInformation)
+ && !isMacAddressAvailable(sub.device.id(), sub.port,
+ sub.subscriberAndDeviceInformation)) {
+ log.debug("Awaiting for macAddress on {}", portWithName(sub.port));
+ return false;
+ }
+
+ handleSubscriberDataFlows(sub.device, sub.port, FlowOperation.ADD,
+ sub.subscriberAndDeviceInformation, multicastServiceName);
+
+ handleSubscriberEapolFlows(sub, FlowOperation.ADD, sub.subscriberAndDeviceInformation);
+
+ handleSubscriberIgmpFlows(sub, FlowOperation.ADD);
+
+ log.info("Provisioning of subscriber on {} completed", portWithName(sub.port));
+ return true;
+ }
+
+ private boolean removeSubscriberFlows(DiscoveredSubscriber sub, String defaultBandwithProfile,
+ String multicastServiceName) {
+
+ if (log.isTraceEnabled()) {
+ log.trace("Removal of subscriber on {} started",
+ portWithName(sub.port));
+ }
+ SubscriberAndDeviceInformation si = subsService.get(sub.portName());
+ if (si == null) {
+ log.error("Subscriber information not found in sadis for port {} during subscriber removal",
+ portWithName(sub.port));
+ // NOTE that we are returning true so that the subscriber is removed from the queue
+ // and we can move on provisioning others
+ return true;
+ }
+
+ handleSubscriberDhcpFlows(sub.device.id(), sub.port, FlowOperation.REMOVE, si);
+
+ if (enableEapol) {
+ // remove the tagged eapol
+ handleSubscriberEapolFlows(sub, FlowOperation.REMOVE, si);
+
+ // and add the default one back
+ if (sub.port.isEnabled()) {
+ // NOTE we remove the subscriber when the port goes down
+ // but in that case we don't need to add default eapol
+ handleEapolFlow(sub, defaultBandwithProfile, defaultBandwithProfile,
+ FlowOperation.ADD, VlanId.vlanId(EAPOL_DEFAULT_VLAN));
+ }
+ }
+ handleSubscriberDataFlows(sub.device, sub.port, FlowOperation.REMOVE, si, multicastServiceName);
+
+ handleSubscriberIgmpFlows(sub, FlowOperation.REMOVE);
+
+ // FIXME check the return status of the flow and return accordingly
+ log.info("Removal of subscriber on {} completed", portWithName(sub.port));
+ return true;
+ }
+
+ @Override
+ public boolean hasDefaultEapol(Port port) {
+ OltPortStatus status = getOltPortStatus(port, defaultEapolUniTag);
+ // NOTE we consider ERROR as a present EAPOL flow as ONOS
+ // will keep trying to add it
+ return status != null && (status.defaultEapolStatus == OltFlowsStatus.ADDED ||
+ status.defaultEapolStatus == OltFlowsStatus.PENDING_ADD ||
+ status.defaultEapolStatus == OltFlowsStatus.ERROR);
+ }
+
+ private OltPortStatus getOltPortStatus(Port port, UniTagInformation uniTagInformation) {
+ try {
+ cpStatusReadLock.lock();
+ ServiceKey sk = new ServiceKey(new AccessDevicePort(port), uniTagInformation);
+ OltPortStatus status = cpStatus.get(sk);
+ return status;
+ } finally {
+ cpStatusReadLock.unlock();
+ }
+ }
+
+ public boolean isDefaultEapolPendingRemoval(Port port) {
+ OltPortStatus status = getOltPortStatus(port, defaultEapolUniTag);
+ if (log.isTraceEnabled()) {
+ log.trace("Status during EAPOL flow check {} for port {} and UniTagInformation {}",
+ status, portWithName(port), defaultEapolUniTag);
+ }
+ return status != null && status.defaultEapolStatus == OltFlowsStatus.PENDING_REMOVE;
+ }
+
+ @Override
+ public boolean hasDhcpFlows(Port port, UniTagInformation uti) {
+ OltPortStatus status = getOltPortStatus(port, uti);
+ if (log.isTraceEnabled()) {
+ log.trace("Status during DHCP flow check {} for port {} and service {}",
+ status, portWithName(port), uti.getServiceName());
+ }
+ return status != null &&
+ (status.dhcpStatus == OltFlowsStatus.ADDED ||
+ status.dhcpStatus == OltFlowsStatus.PENDING_ADD);
+ }
+
+ @Override
+ public boolean hasSubscriberFlows(Port port, UniTagInformation uti) {
+
+ OltPortStatus status = getOltPortStatus(port, uti);
+ if (log.isTraceEnabled()) {
+ log.trace("Status during subscriber flow check {} for port {} and service {}",
+ status, portWithName(port), uti.getServiceName());
+ }
+ return status != null && (status.subscriberFlowsStatus == OltFlowsStatus.ADDED ||
+ status.subscriberFlowsStatus == OltFlowsStatus.PENDING_ADD);
+ }
+
+ @Override
+ public void purgeDeviceFlows(DeviceId deviceId) {
+ log.debug("Purging flows on device {}", deviceId);
+ flowRuleService.purgeFlowRules(deviceId);
+
+ // removing the status from the cpStatus map
+ try {
+ cpStatusWriteLock.lock();
+ Iterator<Map.Entry<ServiceKey, OltPortStatus>> iter = cpStatus.entrySet().iterator();
+ while (iter.hasNext()) {
+ Map.Entry<ServiceKey, OltPortStatus> entry = iter.next();
+ if (entry.getKey().getPort().connectPoint().deviceId().equals(deviceId)) {
+ cpStatus.remove(entry.getKey());
+ }
+ }
+ } finally {
+ cpStatusWriteLock.unlock();
+ }
+
+ // removing subscribers from the provisioned map
+ try {
+ provisionedSubscribersWriteLock.lock();
+ Iterator<Map.Entry<ServiceKey, Boolean>> iter = provisionedSubscribers.entrySet().iterator();
+ while (iter.hasNext()) {
+ Map.Entry<ServiceKey, Boolean> entry = iter.next();
+ if (entry.getKey().getPort().connectPoint().deviceId().equals(deviceId)) {
+ provisionedSubscribers.remove(entry.getKey());
+ }
+ }
+ } finally {
+ provisionedSubscribersWriteLock.unlock();
+ }
+ }
+
+ @Override
+ public boolean isSubscriberServiceProvisioned(AccessDevicePort cp) {
+ // if any service is programmed on this port, returns true
+ AtomicBoolean provisioned = new AtomicBoolean(false);
+ try {
+ provisionedSubscribersReadLock.lock();
+ for (Map.Entry<ServiceKey, Boolean> entry : provisionedSubscribers.entrySet()) {
+ if (entry.getKey().getPort().equals(cp) && entry.getValue()) {
+ provisioned.set(true);
+ break;
+ }
+ }
+ } finally {
+ provisionedSubscribersReadLock.unlock();
+ }
+ return provisioned.get();
+ }
+
+ @Override
+ public boolean isSubscriberServiceProvisioned(ServiceKey sk) {
+ try {
+ provisionedSubscribersReadLock.lock();
+ Boolean provisioned = provisionedSubscribers.get(sk);
+ if (provisioned == null || !provisioned) {
+ return false;
+ }
+ } finally {
+ provisionedSubscribersReadLock.unlock();
+ }
+ return true;
+ }
+
+ @Override
+ public void updateProvisionedSubscriberStatus(ServiceKey sk, Boolean status) {
+ try {
+ provisionedSubscribersWriteLock.lock();
+ provisionedSubscribers.put(sk, status);
+ } finally {
+ provisionedSubscribersWriteLock.unlock();
+ }
+ }
+
+ private boolean handleEapolFlow(DiscoveredSubscriber sub, String bandwidthProfile,
+ String oltBandwidthProfile, FlowOperation action, VlanId vlanId) {
+
+ // create a subscriberKey for the EAPOL flow
+ ServiceKey sk = new ServiceKey(new AccessDevicePort(sub.port), defaultEapolUniTag);
+
+ // NOTE we only need to keep track of the default EAPOL flow in the
+ // connectpoint status map
+ if (vlanId.id().equals(EAPOL_DEFAULT_VLAN)) {
+ OltFlowsStatus status = action == FlowOperation.ADD ?
+ OltFlowsStatus.PENDING_ADD : OltFlowsStatus.PENDING_REMOVE;
+ updateConnectPointStatus(sk, status, OltFlowsStatus.NONE, OltFlowsStatus.NONE);
+
+ }
+
+ DefaultFilteringObjective.Builder filterBuilder = DefaultFilteringObjective.builder();
+ TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
+
+ int techProfileId = getDefaultTechProfileId(sub.port);
+ MeterId meterId = oltMeterService.getMeterIdForBandwidthProfile(sub.device.id(), bandwidthProfile);
+
+ // in the delete case the meter should still be there as we remove
+ // the meters only if no flows are pointing to them
+ if (meterId == null) {
+ log.debug("MeterId is null for BandwidthProfile {} on device {}",
+ bandwidthProfile, sub.device.id());
+ return false;
+ }
+
+ MeterId oltMeterId = oltMeterService.getMeterIdForBandwidthProfile(sub.device.id(), oltBandwidthProfile);
+ if (oltMeterId == null) {
+ log.debug("MeterId is null for OltBandwidthProfile {} on device {}",
+ oltBandwidthProfile, sub.device.id());
+ return false;
+ }
+
+ log.info("{} EAPOL flow for {} with vlanId {} and BandwidthProfile {} (meterId {})",
+ flowOpToString(action), portWithName(sub.port), vlanId, bandwidthProfile, meterId);
+
+ FilteringObjective.Builder eapolAction;
+
+ if (action == FlowOperation.ADD) {
+ eapolAction = filterBuilder.permit();
+ } else if (action == FlowOperation.REMOVE) {
+ eapolAction = filterBuilder.deny();
+ } else {
+ log.error("Operation {} not supported", action);
+ return false;
+ }
+
+ FilteringObjective.Builder baseEapol = eapolAction
+ .withKey(Criteria.matchInPort(sub.port.number()))
+ .addCondition(Criteria.matchEthType(EthType.EtherType.EAPOL.ethType()));
+
+ // NOTE we only need to add the treatment to install the flow,
+ // we can remove it based in the match
+ FilteringObjective.Builder eapol;
+
+ TrafficTreatment treatment = treatmentBuilder
+ .meter(meterId)
+ .writeMetadata(createTechProfValueForWriteMetadata(
+ vlanId,
+ techProfileId, oltMeterId), 0)
+ .setOutput(PortNumber.CONTROLLER)
+ .pushVlan()
+ .setVlanId(vlanId)
+ .build();
+ eapol = baseEapol
+ .withMeta(treatment);
+
+ FilteringObjective eapolObjective = eapol
+ .fromApp(appId)
+ .withPriority(MAX_PRIORITY)
+ .add(new ObjectiveContext() {
+ @Override
+ public void onSuccess(Objective objective) {
+ log.info("EAPOL flow objective {} for {}",
+ completeFlowOpToString(action), portWithName(sub.port));
+ if (log.isTraceEnabled()) {
+ log.trace("EAPOL flow details for port {}: {}", portWithName(sub.port), objective);
+ }
+ }
+
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ log.error("Cannot {} eapol flow for {} : {}", action,
+ portWithName(sub.port), error);
+
+ if (vlanId.id().equals(EAPOL_DEFAULT_VLAN)) {
+ updateConnectPointStatus(sk,
+ OltFlowsStatus.ERROR, null, null);
+ }
+ }
+ });
+
+ flowObjectiveService.filter(sub.device.id(), eapolObjective);
+
+ log.info("{} EAPOL filter for {}", completeFlowOpToString(action), portWithName(sub.port));
+ return true;
+ }
+
+ // FIXME it's confusing that si is not necessarily inside the DiscoveredSubscriber
+ private boolean handleSubscriberEapolFlows(DiscoveredSubscriber sub, FlowOperation action,
+ SubscriberAndDeviceInformation si) {
+ if (!enableEapol) {
+ return true;
+ }
+ // TODO verify we need an EAPOL flow for EACH service
+ AtomicBoolean success = new AtomicBoolean(true);
+ si.uniTagList().forEach(u -> {
+ // we assume that if subscriber flows are installed, tagged EAPOL is installed as well
+ boolean hasFlows = hasSubscriberFlows(sub.port, u);
+
+ // if the subscriber flows are present the EAPOL flow is present thus we don't need to install it,
+ // if the subscriber does not have flows the EAPOL flow is not present thus we don't need to remove it
+ if (action == FlowOperation.ADD && hasFlows ||
+ action == FlowOperation.REMOVE && !hasFlows) {
+ log.debug("Not {} EAPOL on {}({}) as subscriber flow status is {}", flowOpToString(action),
+ portWithName(sub.port), u.getServiceName(), hasFlows);
+ return;
+ }
+ log.info("{} EAPOL flows for subscriber {} on {} and service {}",
+ flowOpToString(action), si.id(), portWithName(sub.port), u.getServiceName());
+ if (!handleEapolFlow(sub, u.getUpstreamBandwidthProfile(),
+ u.getUpstreamOltBandwidthProfile(),
+ action, u.getPonCTag())) {
+ //
+ log.error("Failed to {} EAPOL with suscriber tags", action);
+ //TODO this sets it for all services, maybe some services succeeded.
+ success.set(false);
+ }
+ });
+ return success.get();
+ }
+
+ private void handleSubscriberIgmpFlows(DiscoveredSubscriber sub, FlowOperation action) {
+ sub.subscriberAndDeviceInformation.uniTagList().forEach(uti -> {
+ if (uti.getIsIgmpRequired()) {
+ DeviceId deviceId = sub.device.id();
+ // if we reached here a meter already exists
+ MeterId meterId = oltMeterService
+ .getMeterIdForBandwidthProfile(deviceId, uti.getUpstreamBandwidthProfile());
+ MeterId oltMeterId = oltMeterService
+ .getMeterIdForBandwidthProfile(deviceId, uti.getUpstreamOltBandwidthProfile());
+
+ processIgmpFilteringObjectives(deviceId, sub.port,
+ action, FlowDirection.UPSTREAM, meterId, oltMeterId, uti.getTechnologyProfileId(),
+ uti.getPonCTag(), uti.getUniTagMatch(), uti.getUsPonCTagPriority());
+ }
+ });
+ }
+
+ private boolean checkSadisRunning() {
+ if (bpService == null) {
+ log.warn("Sadis is not running");
+ return false;
+ }
+ return true;
+ }
+
+ private int getDefaultTechProfileId(Port port) {
+ if (!checkSadisRunning()) {
+ return defaultTechProfileId;
+ }
+ if (port != null) {
+ SubscriberAndDeviceInformation info = subsService.get(getPortName(port));
+ if (info != null && info.uniTagList().size() == 1) {
+ return info.uniTagList().get(0).getTechnologyProfileId();
+ }
+ }
+ return defaultTechProfileId;
+ }
+
+ protected Long createTechProfValueForWriteMetadata(VlanId cVlan, int techProfileId, MeterId upstreamOltMeterId) {
+ Long writeMetadata;
+
+ if (cVlan == null || VlanId.NONE.equals(cVlan)) {
+ writeMetadata = (long) techProfileId << 32;
+ } else {
+ writeMetadata = ((long) (cVlan.id()) << 48 | (long) techProfileId << 32);
+ }
+ if (upstreamOltMeterId == null) {
+ return writeMetadata;
+ } else {
+ return writeMetadata | upstreamOltMeterId.id();
+ }
+ }
+
+ private void processLldpFilteringObjective(DeviceId deviceId, Port port, FlowOperation action) {
+ DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
+
+ FilteringObjective lldp = (action == FlowOperation.ADD ? builder.permit() : builder.deny())
+ .withKey(Criteria.matchInPort(port.number()))
+ .addCondition(Criteria.matchEthType(EthType.EtherType.LLDP.ethType()))
+ .withMeta(DefaultTrafficTreatment.builder()
+ .setOutput(PortNumber.CONTROLLER).build())
+ .fromApp(appId)
+ .withPriority(MAX_PRIORITY)
+ .add(new ObjectiveContext() {
+ @Override
+ public void onSuccess(Objective objective) {
+ log.info("{} LLDP filter for {}.", completeFlowOpToString(action), portWithName(port));
+ }
+
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ log.error("Falied to {} LLDP filter on {} because {}", action, portWithName(port),
+ error);
+ }
+ });
+
+ flowObjectiveService.filter(deviceId, lldp);
+ }
+
+ protected void handleSubscriberDhcpFlows(DeviceId deviceId, Port port,
+ FlowOperation action,
+ SubscriberAndDeviceInformation si) {
+ si.uniTagList().forEach(uti -> {
+
+ if (!uti.getIsDhcpRequired()) {
+ return;
+ }
+
+ // if it's an ADD skip if flows are there,
+ // if it's a DELETE skip if flows are not there
+ boolean hasFlows = hasDhcpFlows(port, uti);
+ if (action == FlowOperation.ADD && hasFlows ||
+ action == FlowOperation.REMOVE && !hasFlows) {
+ log.debug("Not dealing with DHCP {} on {} as DHCP flow status is {}", action,
+ uti.getServiceName(), hasFlows);
+ return;
+ }
+
+ log.info("{} DHCP flows for subscriber on {} and service {}",
+ flowOpToString(action), portWithName(port), uti.getServiceName());
+
+ // if we reached here a meter already exists
+ MeterId meterId = oltMeterService
+ .getMeterIdForBandwidthProfile(deviceId, uti.getUpstreamBandwidthProfile());
+ MeterId oltMeterId = oltMeterService
+ .getMeterIdForBandwidthProfile(deviceId, uti.getUpstreamOltBandwidthProfile());
+
+ if (enableDhcpV4) {
+ processDhcpFilteringObjectives(deviceId, port, action, FlowDirection.UPSTREAM, 68, 67,
+ EthType.EtherType.IPV4.ethType(), IPv4.PROTOCOL_UDP, meterId, oltMeterId,
+ uti);
+ }
+ if (enableDhcpV6) {
+ log.error("DHCP V6 not supported for subscribers");
+ }
+ });
+ }
+
+ // FIXME return boolean, if this fails we need to retry
+ protected void handleSubscriberDataFlows(Device device, Port port,
+ FlowOperation action,
+ SubscriberAndDeviceInformation si, String multicastServiceName) {
+
+ Optional<Port> nniPort = oltDeviceService.getNniPort(device);
+ if (nniPort.isEmpty()) {
+ log.error("Cannot configure DP flows as upstream port is not configured for subscriber {} on {}",
+ si.id(), portWithName(port));
+ return;
+ }
+ si.uniTagList().forEach(uti -> {
+
+ boolean hasFlows = hasSubscriberFlows(port, uti);
+ if (action == FlowOperation.ADD && hasFlows ||
+ action == FlowOperation.REMOVE && !hasFlows) {
+ log.debug("Not dealing with DP flows {} on {} as subscriber flow status is {}", action,
+ uti.getServiceName(), hasFlows);
+ return;
+ }
+
+ if (multicastServiceName.equals(uti.getServiceName())) {
+ log.debug("This is the multicast service ({}) for subscriber {} on {}, " +
+ "dataplane flows are not needed",
+ uti.getServiceName(), si.id(), portWithName(port));
+ return;
+ }
+
+ log.info("{} Data plane flows for subscriber {} on {} and service {}",
+ flowOpToString(action), si.id(), portWithName(port), uti.getServiceName());
+
+ // upstream flows
+ MeterId usMeterId = oltMeterService
+ .getMeterIdForBandwidthProfile(device.id(), uti.getUpstreamBandwidthProfile());
+ MeterId oltUsMeterId = oltMeterService
+ .getMeterIdForBandwidthProfile(device.id(), uti.getUpstreamOltBandwidthProfile());
+ processUpstreamDataFilteringObjects(device.id(), port, nniPort.get(), action, usMeterId,
+ oltUsMeterId, uti);
+
+ // downstream flows
+ MeterId dsMeterId = oltMeterService
+ .getMeterIdForBandwidthProfile(device.id(), uti.getDownstreamBandwidthProfile());
+ MeterId oltDsMeterId = oltMeterService
+ .getMeterIdForBandwidthProfile(device.id(), uti.getDownstreamOltBandwidthProfile());
+ processDownstreamDataFilteringObjects(device.id(), port, nniPort.get(), action, dsMeterId,
+ oltDsMeterId, uti, getMacAddress(device.id(), port, uti));
+ });
+ }
+
+ private void processDhcpFilteringObjectives(DeviceId deviceId, Port port,
+ FlowOperation action, FlowDirection direction,
+ int udpSrc, int udpDst, EthType ethType, byte protocol,
+ MeterId meterId, MeterId oltMeterId, UniTagInformation uti) {
+ ServiceKey sk = new ServiceKey(new AccessDevicePort(port), uti);
+ log.debug("{} DHCP filtering objectives on {}", flowOpToString(action), sk);
+
+
+ OltFlowsStatus status = action.equals(FlowOperation.ADD) ?
+ OltFlowsStatus.PENDING_ADD : OltFlowsStatus.PENDING_REMOVE;
+ updateConnectPointStatus(sk, null, null, status);
DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
- if (upstreamMeterId != null) {
- treatmentBuilder.meter(upstreamMeterId);
+ if (meterId != null) {
+ treatmentBuilder.meter(meterId);
}
- if (techProfileId != NONE_TP_ID) {
- treatmentBuilder.writeMetadata(createTechProfValueForWm(unitagMatch, techProfileId, upstreamOltMeterId), 0);
+ if (uti.getTechnologyProfileId() != NONE_TP_ID) {
+ treatmentBuilder.writeMetadata(
+ createTechProfValueForWriteMetadata(uti.getUniTagMatch(),
+ uti.getTechnologyProfileId(), oltMeterId), 0);
}
- FilteringObjective.Builder dhcpUpstreamBuilder = (install ? builder.permit() : builder.deny())
+ FilteringObjective.Builder dhcpBuilder = (action == FlowOperation.ADD ? builder.permit() : builder.deny())
.withKey(Criteria.matchInPort(port.number()))
.addCondition(Criteria.matchEthType(ethType))
.addCondition(Criteria.matchIPProtocol(protocol))
@@ -356,87 +1065,124 @@
.withPriority(MAX_PRIORITY);
//VLAN changes and PCP matching need to happen only in the upstream directions
- if (upstream) {
- treatmentBuilder.setVlanId(cTag);
- if (!VlanId.vlanId(VlanId.NO_VID).equals(unitagMatch)) {
- dhcpUpstreamBuilder.addCondition(Criteria.matchVlanId(unitagMatch));
+ if (direction == FlowDirection.UPSTREAM) {
+ treatmentBuilder.setVlanId(uti.getPonCTag());
+ if (!VlanId.vlanId(VlanId.NO_VID).equals(uti.getUniTagMatch())) {
+ dhcpBuilder.addCondition(Criteria.matchVlanId(uti.getUniTagMatch()));
}
- if (vlanPcp != null) {
- treatmentBuilder.setVlanPcp(vlanPcp);
+ if (uti.getUsPonCTagPriority() != -1) {
+ treatmentBuilder.setVlanPcp((byte) uti.getUsPonCTagPriority());
}
}
- dhcpUpstreamBuilder.withMeta(treatmentBuilder
- .setOutput(PortNumber.CONTROLLER).build());
+ dhcpBuilder.withMeta(treatmentBuilder
+ .setOutput(PortNumber.CONTROLLER).build());
- FilteringObjective dhcpUpstream = dhcpUpstreamBuilder.add(new ObjectiveContext() {
+ FilteringObjective dhcpUpstream = dhcpBuilder.add(new ObjectiveContext() {
@Override
public void onSuccess(Objective objective) {
- log.info("DHCP {} filter for {} {}.",
- (ethType.equals(EthType.EtherType.IPV4.ethType())) ? V4 : V6, port,
- (install) ? INSTALLED : REMOVED);
- dhcpFuture.ifPresent(f -> f.complete(null));
+ log.info("{} DHCP {} filter for {}.",
+ completeFlowOpToString(action), (ethType.equals(EthType.EtherType.IPV4.ethType())) ? V4 : V6,
+ portWithName(port));
}
@Override
public void onError(Objective objective, ObjectiveError error) {
log.error("DHCP {} filter for {} failed {} because {}",
- (ethType.equals(EthType.EtherType.IPV4.ethType())) ? V4 : V6, port,
- (install) ? INSTALLATION : REMOVAL,
+ (ethType.equals(EthType.EtherType.IPV4.ethType())) ? V4 : V6,
+ portWithName(port),
+ action,
error);
- dhcpFuture.ifPresent(f -> f.complete(error));
+ updateConnectPointStatus(sk, null, null, OltFlowsStatus.ERROR);
}
});
- flowObjectiveService.filter(port.deviceId(), dhcpUpstream);
+ flowObjectiveService.filter(deviceId, dhcpUpstream);
+ }
+
+ private void processIgmpFilteringObjectives(DeviceId deviceId, Port port,
+ FlowOperation action, FlowDirection direction,
+ MeterId meterId, MeterId oltMeterId, int techProfileId,
+ VlanId cTag, VlanId unitagMatch, int vlanPcp) {
+
+ DefaultFilteringObjective.Builder filterBuilder = DefaultFilteringObjective.builder();
+ TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
+ if (direction == FlowDirection.UPSTREAM) {
+
+ if (techProfileId != NONE_TP_ID) {
+ treatmentBuilder.writeMetadata(createTechProfValueForWriteMetadata(null,
+ techProfileId, oltMeterId), 0);
+ }
+
+
+ if (meterId != null) {
+ treatmentBuilder.meter(meterId);
+ }
+
+ if (!VlanId.vlanId(VlanId.NO_VID).equals(unitagMatch)) {
+ filterBuilder.addCondition(Criteria.matchVlanId(unitagMatch));
+ }
+
+ if (!VlanId.vlanId(VlanId.NO_VID).equals(cTag)) {
+ treatmentBuilder.setVlanId(cTag);
+ }
+
+ if (vlanPcp != -1) {
+ treatmentBuilder.setVlanPcp((byte) vlanPcp);
+ }
+ }
+
+ filterBuilder = (action == FlowOperation.ADD) ? filterBuilder.permit() : filterBuilder.deny();
+
+ FilteringObjective igmp = filterBuilder
+ .withKey(Criteria.matchInPort(port.number()))
+ .addCondition(Criteria.matchEthType(EthType.EtherType.IPV4.ethType()))
+ .addCondition(Criteria.matchIPProtocol(IPv4.PROTOCOL_IGMP))
+ .withMeta(treatmentBuilder
+ .setOutput(PortNumber.CONTROLLER).build())
+ .fromApp(appId)
+ .withPriority(MAX_PRIORITY)
+ .add(new ObjectiveContext() {
+ @Override
+ public void onSuccess(Objective objective) {
+ log.info("Igmp filter for {} {}.", portWithName(port), action);
+ }
+
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ log.error("Igmp filter for {} failed {} because {}.", portWithName(port), action,
+ error);
+ }
+ });
+
+ flowObjectiveService.filter(deviceId, igmp);
}
- @Override
- public void processPPPoEDFilteringObjectives(AccessDevicePort port,
- MeterId upstreamMeterId,
- MeterId upstreamOltMeterId,
- UniTagInformation tagInformation,
- boolean install,
- boolean upstream) {
- if (!enablePppoe) {
- log.debug("PPPoED filtering is disabled. Ignoring request.");
- return;
- }
-
- int techProfileId = NONE_TP_ID;
- VlanId cTag = VlanId.NONE;
- VlanId unitagMatch = VlanId.ANY;
- Byte vlanPcp = null;
-
- if (tagInformation != null) {
- techProfileId = tagInformation.getTechnologyProfileId();
- cTag = tagInformation.getPonCTag();
- unitagMatch = tagInformation.getUniTagMatch();
- if (tagInformation.getUsPonCTagPriority() != NO_PCP) {
- vlanPcp = (byte) tagInformation.getUsPonCTagPriority();
- }
- }
+ private void processPPPoEDFilteringObjectives(DeviceId deviceId, Port port,
+ FlowOperation action, FlowDirection direction,
+ MeterId meterId, MeterId oltMeterId, int techProfileId,
+ VlanId cTag, VlanId unitagMatch, Byte vlanPcp) {
DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
- CompletableFuture<Object> meterFuture = new CompletableFuture<>();
- if (upstreamMeterId != null) {
- treatmentBuilder.meter(upstreamMeterId);
+ if (meterId != null) {
+ treatmentBuilder.meter(meterId);
}
if (techProfileId != NONE_TP_ID) {
- treatmentBuilder.writeMetadata(createTechProfValueForWm(cTag, techProfileId, upstreamOltMeterId), 0);
+ treatmentBuilder.writeMetadata(createTechProfValueForWriteMetadata(cTag, techProfileId, oltMeterId), 0);
}
- DefaultFilteringObjective.Builder pppoedBuilder = (install ? builder.permit() : builder.deny())
+ DefaultFilteringObjective.Builder pppoedBuilder = ((action == FlowOperation.ADD)
+ ? builder.permit() : builder.deny())
.withKey(Criteria.matchInPort(port.number()))
.addCondition(Criteria.matchEthType(EthType.EtherType.PPPoED.ethType()))
.fromApp(appId)
.withPriority(10000);
- if (upstream) {
+ if (direction == FlowDirection.UPSTREAM) {
treatmentBuilder.setVlanId(cTag);
if (!VlanId.vlanId(VlanId.NO_VID).equals(unitagMatch)) {
pppoedBuilder.addCondition(Criteria.matchVlanId(unitagMatch));
@@ -451,398 +1197,50 @@
.add(new ObjectiveContext() {
@Override
public void onSuccess(Objective objective) {
- log.info("PPPoED filter for {} {}.", port, (install) ? INSTALLED : REMOVED);
+ log.info("PPPoED filter for {} {}.", portWithName(port), action);
}
@Override
public void onError(Objective objective, ObjectiveError error) {
- log.info("PPPoED filter for {} failed {} because {}", port,
- (install) ? INSTALLATION : REMOVAL, error);
+ log.info("PPPoED filter for {} failed {} because {}", portWithName(port),
+ action, error);
}
});
- flowObjectiveService.filter(port.deviceId(), pppoed);
+ flowObjectiveService.filter(deviceId, pppoed);
}
- @Override
- public void processIgmpFilteringObjectives(AccessDevicePort port,
- MeterId upstreamMeterId,
- MeterId upstreamOltMeterId,
- UniTagInformation tagInformation,
- boolean install,
- boolean upstream) {
- if (upstream) {
- // for UNI ports
- if (tagInformation != null && !tagInformation.getIsIgmpRequired()) {
- log.debug("Igmp provisioning is disabled for UNI port {} for service {}",
- port, tagInformation.getServiceName());
- return;
- }
- } else {
- // for NNI ports
- if (!enableIgmpOnNni) {
- log.debug("Igmp provisioning is disabled for NNI port {}", port);
- return;
- }
- }
-
- log.debug("{} IGMP flows on {}", (install) ? "Installing" : "Removing", port);
- DefaultFilteringObjective.Builder filterBuilder = DefaultFilteringObjective.builder();
- TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
- if (upstream) {
-
- if (tagInformation.getTechnologyProfileId() != NONE_TP_ID) {
- treatmentBuilder.writeMetadata(createTechProfValueForWm(null,
- tagInformation.getTechnologyProfileId(), upstreamOltMeterId), 0);
- }
-
-
- if (upstreamMeterId != null) {
- treatmentBuilder.meter(upstreamMeterId);
- }
-
- if (!VlanId.vlanId(VlanId.NO_VID).equals(tagInformation.getUniTagMatch())) {
- filterBuilder.addCondition(Criteria.matchVlanId(tagInformation.getUniTagMatch()));
- }
-
- if (!VlanId.vlanId(VlanId.NO_VID).equals(tagInformation.getPonCTag())) {
- treatmentBuilder.setVlanId(tagInformation.getPonCTag());
- }
-
- if (tagInformation.getUsPonCTagPriority() != NO_PCP) {
- treatmentBuilder.setVlanPcp((byte) tagInformation.getUsPonCTagPriority());
- }
- }
-
- filterBuilder = install ? filterBuilder.permit() : filterBuilder.deny();
-
- FilteringObjective igmp = filterBuilder
- .withKey(Criteria.matchInPort(port.number()))
- .addCondition(Criteria.matchEthType(EthType.EtherType.IPV4.ethType()))
- .addCondition(Criteria.matchIPProtocol(IPv4.PROTOCOL_IGMP))
- .withMeta(treatmentBuilder
- .setOutput(PortNumber.CONTROLLER).build())
- .fromApp(appId)
- .withPriority(MAX_PRIORITY)
- .add(new ObjectiveContext() {
- @Override
- public void onSuccess(Objective objective) {
- log.info("Igmp filter for {} {}.", port, (install) ? INSTALLED : REMOVED);
- }
-
- @Override
- public void onError(Objective objective, ObjectiveError error) {
- log.error("Igmp filter for {} failed {} because {}.", port, (install) ? INSTALLATION : REMOVAL,
- error);
- }
- });
-
- flowObjectiveService.filter(port.deviceId(), igmp);
- }
-
- @Override
- public void processEapolFilteringObjectives(AccessDevicePort port, String bpId, Optional<String> oltBpId,
- CompletableFuture<ObjectiveError> filterFuture,
- VlanId vlanId, boolean install) {
-
- if (!enableEapol) {
- log.debug("Eapol filtering is disabled. Completing filteringFuture immediately for the device {}",
- port.deviceId());
- if (filterFuture != null) {
- filterFuture.complete(null);
- }
- return;
- }
- log.info("Processing EAPOL with Bandwidth profile {} on {}", bpId, port);
- BandwidthProfileInformation bpInfo = getBandwidthProfileInformation(bpId);
- BandwidthProfileInformation oltBpInfo;
- if (oltBpId.isPresent()) {
- oltBpInfo = getBandwidthProfileInformation(oltBpId.get());
- } else {
- oltBpInfo = bpInfo;
- }
- if (bpInfo == null) {
- log.warn("Bandwidth profile {} is not found. Authentication flow"
- + " will not be installed on {}", bpId, port);
- if (filterFuture != null) {
- filterFuture.complete(ObjectiveError.BADPARAMS);
- }
- return;
- }
-
- ConnectPoint cp = new ConnectPoint(port.deviceId(), port.number());
- DefaultFilteringObjective.Builder filterBuilder = DefaultFilteringObjective.builder();
- TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
- // check if meter exists and create it only for an install
- final MeterId meterId = oltMeterService.getMeterIdFromBpMapping(port.deviceId(), bpInfo.id());
- MeterId oltMeterId = null;
- if (oltBpId.isPresent()) {
- oltMeterId = oltBpId.map(id -> oltMeterService.getMeterIdFromBpMapping(port.deviceId(), id)).orElse(null);
- }
- log.info("Meter id {} for Bandwidth profile {} and OLT meter id {} for OLT Bandwidth profile {} " +
- "associated to EAPOL on {}", meterId, bpInfo.id(), oltMeterId, oltBpId, port.deviceId());
- if (meterId == null || (oltBpId.isPresent() && oltMeterId == null)) {
- if (install) {
- log.debug("Need to install meter for EAPOL with bwp {} on {}", bpInfo.id(), port);
- SubscriberFlowInfo fi = new SubscriberFlowInfo(null, port,
- new UniTagInformation.Builder()
- .setPonCTag(vlanId).build(),
- null, meterId, null, oltMeterId,
- null, bpInfo.id(), null, oltBpInfo.id());
- pendingEapolForDevice.compute(port.deviceId(), (id, queue) -> {
- if (queue == null) {
- queue = new LinkedBlockingQueue<>();
- }
- queue.add(fi);
- return queue;
- });
-
- //If false the meter is already being installed, skipping installation
- if (!oltMeterService.checkAndAddPendingMeter(port.deviceId(), bpInfo) &&
- !oltMeterService.checkAndAddPendingMeter(port.deviceId(), oltBpInfo)) {
- return;
- }
- List<BandwidthProfileInformation> bwpList = Arrays.asList(bpInfo, oltBpInfo);
- bwpList.stream().distinct().filter(Objects::nonNull)
- .forEach(bwp -> createMeterAndProceedEapol(port, bwp, filterFuture, install,
- cp, filterBuilder, treatmentBuilder));
- } else {
- // this case should not happen as the request to remove an eapol
- // flow should mean that the flow points to a meter that exists.
- // Nevertheless we can still delete the flow as we only need the
- // correct 'match' to do so.
- log.warn("Unknown meter id for bp {}, still proceeding with "
- + "delete of eapol flow on {}", bpInfo.id(), port);
- SubscriberFlowInfo fi = new SubscriberFlowInfo(null, port,
- new UniTagInformation.Builder().setPonCTag(vlanId).build(),
- null, meterId, null, oltMeterId, null,
- bpInfo.id(), null, oltBpInfo.id());
- handleEapol(filterFuture, install, cp, filterBuilder, treatmentBuilder, fi, meterId, oltMeterId);
- }
- } else {
- log.debug("Meter {} was previously created for bp {} on {}", meterId, bpInfo.id(), port);
- SubscriberFlowInfo fi = new SubscriberFlowInfo(null, port,
- new UniTagInformation.Builder().setPonCTag(vlanId).build(),
- null, meterId, null, oltMeterId, null,
- bpInfo.id(), null, oltBpInfo.id());
- handleEapol(filterFuture, install, cp, filterBuilder, treatmentBuilder, fi, meterId, oltMeterId);
- //No need for the future, meter is present.
- return;
- }
- }
-
- private void createMeterAndProceedEapol(AccessDevicePort port, BandwidthProfileInformation bwpInfo,
- CompletableFuture<ObjectiveError> filterFuture,
- boolean install, ConnectPoint cp,
- DefaultFilteringObjective.Builder filterBuilder,
- TrafficTreatment.Builder treatmentBuilder) {
- CompletableFuture<Object> meterFuture = new CompletableFuture<>();
- MeterId meterId = oltMeterService.createMeter(port.deviceId(), bwpInfo, meterFuture);
- DeviceId deviceId = port.deviceId();
- meterFuture.thenAccept(result -> {
- //for each pending eapol flow we check if the meter is there.
- pendingEapolForDevice.compute(deviceId, (id, queue) -> {
- if (queue != null && !queue.isEmpty()) {
- while (true) {
- //TODO this might return the reference and not the actual object
- // so it can be actually swapped underneath us.
- SubscriberFlowInfo fi = queue.peek();
- if (fi == null) {
- log.debug("No more subscribers eapol flows on {}", deviceId);
- queue = new LinkedBlockingQueue<>();
- break;
- }
- log.debug("handing pending eapol on {} for {}", fi.getUniPort(), fi);
- if (result == null) {
- MeterId mId = oltMeterService
- .getMeterIdFromBpMapping(port.deviceId(), fi.getUpBpInfo());
- MeterId oltMeterId = oltMeterService
- .getMeterIdFromBpMapping(port.deviceId(), fi.getUpOltBpInfo());
- if (mId != null && oltMeterId != null) {
- log.debug("Meter installation completed for subscriber on {}, " +
- "handling EAPOL trap flow", port);
- fi.setUpMeterId(mId);
- fi.setUpOltMeterId(oltMeterId);
- handleEapol(filterFuture, install, cp, filterBuilder, treatmentBuilder, fi,
- mId, oltMeterId);
- queue.remove(fi);
- } else {
- log.debug("Not all meters for {} are yet installed for EAPOL meterID {}, " +
- "oltMeterId {}", fi, meterId, oltMeterId);
- }
- } else {
- log.warn("Meter installation error while sending EAPOL trap flow to {}. " +
- "Result {} and MeterId {}", port, result, meterId);
- queue.remove(fi);
- }
- oltMeterService.removeFromPendingMeters(port.deviceId(), bwpInfo);
- }
- } else {
- log.info("No pending EAPOLs on {}", port.deviceId());
- queue = new LinkedBlockingQueue<>();
- }
- return queue;
- });
- });
- }
-
- private void handleEapol(CompletableFuture<ObjectiveError> filterFuture,
- boolean install, ConnectPoint cp,
- DefaultFilteringObjective.Builder filterBuilder,
- TrafficTreatment.Builder treatmentBuilder,
- SubscriberFlowInfo fi, MeterId mId, MeterId oltMeterId) {
- log.info("Meter {} for {} on {} exists. {} EAPOL trap flow",
- mId, fi.getUpBpInfo(), fi.getUniPort(),
- (install) ? "Installing" : "Removing");
- int techProfileId = getDefaultTechProfileId(fi.getUniPort());
- // can happen in case of removal
- if (mId != null) {
- treatmentBuilder.meter(mId);
- }
- //Authentication trap flow uses only tech profile id as write metadata value
- FilteringObjective eapol = (install ? filterBuilder.permit() : filterBuilder.deny())
- .withKey(Criteria.matchInPort(fi.getUniPort().number()))
- .addCondition(Criteria.matchEthType(EthType.EtherType.EAPOL.ethType()))
- .withMeta(treatmentBuilder
- .writeMetadata(createTechProfValueForWm(
- fi.getTagInfo().getPonCTag(),
- techProfileId, oltMeterId), 0)
- .setOutput(PortNumber.CONTROLLER)
- .pushVlan()
- .setVlanId(fi.getTagInfo().getPonCTag())
- .build())
- .fromApp(appId)
- .withPriority(MAX_PRIORITY)
- .add(new ObjectiveContext() {
- @Override
- public void onSuccess(Objective objective) {
- log.info("Eapol filter {} for {} on {} with meter {}.",
- objective.id(), (install) ? INSTALLED : REMOVED, fi.getUniPort(), mId);
- if (filterFuture != null) {
- filterFuture.complete(null);
- }
- }
-
- @Override
- public void onError(Objective objective, ObjectiveError error) {
- log.error("Eapol filter {} for {} with meter {} " +
- "failed {} because {}", objective.id(), fi.getUniPort(), mId,
- (install) ? INSTALLATION : REMOVAL,
- error);
- if (filterFuture != null) {
- filterFuture.complete(error);
- }
- }
- });
- flowObjectiveService.filter(fi.getDevId(), eapol);
- }
-
- /**
- * Installs trap filtering objectives for particular traffic types on an
- * NNI port.
- *
- * @param nniPort NNI port
- * @param install true to install, false to remove
- */
- @Override
- public void processNniFilteringObjectives(AccessDevicePort nniPort, boolean install) {
- log.info("{} flows for NNI port {}",
- install ? "Adding" : "Removing", nniPort);
- processLldpFilteringObjective(nniPort, install);
- processDhcpFilteringObjectives(nniPort, null, null, null, install, false, Optional.empty());
- processIgmpFilteringObjectives(nniPort, null, null, null, install, false);
- processPPPoEDFilteringObjectives(nniPort, null, null, null, install, false);
- }
-
-
- @Override
- public void processLldpFilteringObjective(AccessDevicePort port, boolean install) {
- DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
-
- FilteringObjective lldp = (install ? builder.permit() : builder.deny())
- .withKey(Criteria.matchInPort(port.number()))
- .addCondition(Criteria.matchEthType(EthType.EtherType.LLDP.ethType()))
- .withMeta(DefaultTrafficTreatment.builder()
- .setOutput(PortNumber.CONTROLLER).build())
- .fromApp(appId)
- .withPriority(MAX_PRIORITY)
- .add(new ObjectiveContext() {
- @Override
- public void onSuccess(Objective objective) {
- log.info("LLDP filter for {} {}.", port, (install) ? INSTALLED : REMOVED);
- }
-
- @Override
- public void onError(Objective objective, ObjectiveError error) {
- log.error("LLDP filter for {} failed {} because {}", port, (install) ? INSTALLATION : REMOVAL,
- error);
- }
- });
-
- flowObjectiveService.filter(port.deviceId(), lldp);
- }
-
- @Override
- public ForwardingObjective.Builder createTransparentBuilder(AccessDevicePort uplinkPort,
- AccessDevicePort subscriberPort,
- MeterId meterId,
- UniTagInformation tagInfo,
- boolean upstream) {
-
+ private void processUpstreamDataFilteringObjects(DeviceId deviceId, Port port, Port nniPort,
+ FlowOperation action,
+ MeterId upstreamMeterId,
+ MeterId upstreamOltMeterId,
+ UniTagInformation uti) {
+ ServiceKey sk = new ServiceKey(new AccessDevicePort(port), uti);
TrafficSelector selector = DefaultTrafficSelector.builder()
- .matchVlanId(tagInfo.getPonSTag())
- .matchInPort(upstream ? subscriberPort.number() : uplinkPort.number())
- .matchInnerVlanId(tagInfo.getPonCTag())
- .build();
-
- TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
- if (meterId != null) {
- tBuilder.meter(meterId);
- }
-
- TrafficTreatment treatment = tBuilder
- .setOutput(upstream ? uplinkPort.number() : subscriberPort.number())
- .writeMetadata(createMetadata(upstream ? tagInfo.getPonSTag() : tagInfo.getPonCTag(),
- tagInfo.getTechnologyProfileId(),
- upstream ? uplinkPort.number() : subscriberPort.number()), 0)
- .build();
-
- return createForwardingObjectiveBuilder(selector, treatment, MIN_PRIORITY,
- DefaultAnnotations.builder().build());
- }
-
- @Override
- public ForwardingObjective.Builder createUpBuilder(AccessDevicePort uplinkPort,
- AccessDevicePort subscriberPort,
- MeterId upstreamMeterId,
- MeterId upstreamOltMeterId,
- UniTagInformation uniTagInformation) {
-
- TrafficSelector selector = DefaultTrafficSelector.builder()
- .matchInPort(subscriberPort.number())
- .matchVlanId(uniTagInformation.getUniTagMatch())
+ .matchInPort(port.number())
+ .matchVlanId(uti.getUniTagMatch())
.build();
TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
//if the subscriberVlan (cTag) is different than ANY it needs to set.
- if (uniTagInformation.getPonCTag().toShort() != VlanId.ANY_VALUE) {
+ if (uti.getPonCTag().toShort() != VlanId.ANY_VALUE) {
treatmentBuilder.pushVlan()
- .setVlanId(uniTagInformation.getPonCTag());
+ .setVlanId(uti.getPonCTag());
}
- if (uniTagInformation.getUsPonCTagPriority() != NO_PCP) {
- treatmentBuilder.setVlanPcp((byte) uniTagInformation.getUsPonCTagPriority());
+ if (uti.getUsPonCTagPriority() != -1) {
+ treatmentBuilder.setVlanPcp((byte) uti.getUsPonCTagPriority());
}
treatmentBuilder.pushVlan()
- .setVlanId(uniTagInformation.getPonSTag());
+ .setVlanId(uti.getPonSTag());
- if (uniTagInformation.getUsPonSTagPriority() != NO_PCP) {
- treatmentBuilder.setVlanPcp((byte) uniTagInformation.getUsPonSTagPriority());
+ if (uti.getUsPonSTagPriority() != -1) {
+ treatmentBuilder.setVlanPcp((byte) uti.getUsPonSTagPriority());
}
- treatmentBuilder.setOutput(uplinkPort.number())
- .writeMetadata(createMetadata(uniTagInformation.getPonCTag(),
- uniTagInformation.getTechnologyProfileId(), uplinkPort.number()), 0L);
+ treatmentBuilder.setOutput(nniPort.number())
+ .writeMetadata(createMetadata(uti.getPonCTag(),
+ uti.getTechnologyProfileId(), nniPort.number()), 0L);
DefaultAnnotations.Builder annotationBuilder = DefaultAnnotations.builder();
@@ -855,55 +1253,84 @@
annotationBuilder.set(UPSTREAM_OLT, upstreamOltMeterId.toString());
}
- return createForwardingObjectiveBuilder(selector, treatmentBuilder.build(), MIN_PRIORITY,
+ DefaultForwardingObjective.Builder flowBuilder = createForwardingObjectiveBuilder(selector,
+ treatmentBuilder.build(), MIN_PRIORITY,
annotationBuilder.build());
+
+ ObjectiveContext context = new ObjectiveContext() {
+ @Override
+ public void onSuccess(Objective objective) {
+ log.info("{} Upstream Data plane filter for {}.",
+ completeFlowOpToString(action), sk);
+ }
+
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ log.error("Upstream Data plane filter for {} failed {} because {}.",
+ sk, action, error);
+ updateConnectPointStatus(sk, null, OltFlowsStatus.ERROR, null);
+ }
+ };
+
+ ForwardingObjective flow = null;
+ if (action == FlowOperation.ADD) {
+ flow = flowBuilder.add(context);
+ } else if (action == FlowOperation.REMOVE) {
+ flow = flowBuilder.remove(context);
+ } else {
+ log.error("Flow action not supported: {}", action);
+ }
+
+ if (flow != null) {
+ flowObjectiveService.forward(deviceId, flow);
+ }
}
- @Override
- public ForwardingObjective.Builder createDownBuilder(AccessDevicePort uplinkPort,
- AccessDevicePort subscriberPort,
- MeterId downstreamMeterId,
- MeterId downstreamOltMeterId,
- UniTagInformation tagInformation,
- Optional<MacAddress> macAddress) {
-
+ private void processDownstreamDataFilteringObjects(DeviceId deviceId, Port port, Port nniPort,
+ FlowOperation action,
+ MeterId downstreamMeterId,
+ MeterId downstreamOltMeterId,
+ UniTagInformation uti,
+ MacAddress macAddress) {
+ ServiceKey sk = new ServiceKey(new AccessDevicePort(port), uti);
//subscriberVlan can be any valid Vlan here including ANY to make sure the packet is tagged
TrafficSelector.Builder selectorBuilder = DefaultTrafficSelector.builder()
- .matchVlanId(tagInformation.getPonSTag())
- .matchInPort(uplinkPort.number())
- .matchInnerVlanId(tagInformation.getPonCTag());
+ .matchVlanId(uti.getPonSTag())
+ .matchInPort(nniPort.number())
+ .matchInnerVlanId(uti.getPonCTag());
-
- if (tagInformation.getPonCTag().toShort() != VlanId.ANY_VALUE) {
- selectorBuilder.matchMetadata(tagInformation.getPonCTag().toShort());
+ if (uti.getPonCTag().toShort() != VlanId.ANY_VALUE) {
+ selectorBuilder.matchMetadata(uti.getPonCTag().toShort());
}
- if (tagInformation.getDsPonSTagPriority() != NO_PCP) {
- selectorBuilder.matchVlanPcp((byte) tagInformation.getDsPonSTagPriority());
+ if (uti.getDsPonCTagPriority() != -1) {
+ selectorBuilder.matchVlanPcp((byte) uti.getDsPonCTagPriority());
}
- macAddress.ifPresent(selectorBuilder::matchEthDst);
+ if (macAddress != null) {
+ selectorBuilder.matchEthDst(macAddress);
+ }
TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder()
.popVlan()
- .setOutput(subscriberPort.number());
+ .setOutput(port.number());
- treatmentBuilder.writeMetadata(createMetadata(tagInformation.getPonCTag(),
- tagInformation.getTechnologyProfileId(),
- subscriberPort.number()), 0);
+ treatmentBuilder.writeMetadata(createMetadata(uti.getPonCTag(),
+ uti.getTechnologyProfileId(),
+ port.number()), 0);
// Upstream pbit is used to remark inner vlan pbit.
// Upstream is used to avoid trusting the BNG to send the packet with correct pbit.
// this is done because ds mode 0 is used because ds mode 3 or 6 that allow for
// all pbit acceptance are not widely supported by vendors even though present in
// the OMCI spec.
- if (tagInformation.getUsPonCTagPriority() != NO_PCP) {
- treatmentBuilder.setVlanPcp((byte) tagInformation.getUsPonCTagPriority());
+ if (uti.getUsPonCTagPriority() != -1) {
+ treatmentBuilder.setVlanPcp((byte) uti.getUsPonCTagPriority());
}
- if (!VlanId.NONE.equals(tagInformation.getUniTagMatch()) &&
- tagInformation.getPonCTag().toShort() != VlanId.ANY_VALUE) {
- treatmentBuilder.setVlanId(tagInformation.getUniTagMatch());
+ if (!VlanId.NONE.equals(uti.getUniTagMatch()) &&
+ uti.getPonCTag().toShort() != VlanId.ANY_VALUE) {
+ treatmentBuilder.setVlanId(uti.getUniTagMatch());
}
DefaultAnnotations.Builder annotationBuilder = DefaultAnnotations.builder();
@@ -918,13 +1345,39 @@
annotationBuilder.set(DOWNSTREAM_OLT, downstreamOltMeterId.toString());
}
- return createForwardingObjectiveBuilder(selectorBuilder.build(), treatmentBuilder.build(), MIN_PRIORITY,
- annotationBuilder.build());
- }
+ DefaultForwardingObjective.Builder flowBuilder = createForwardingObjectiveBuilder(selectorBuilder.build(),
+ treatmentBuilder.build(), MIN_PRIORITY, annotationBuilder.build());
- @Override
- public void clearDeviceState(DeviceId deviceId) {
- pendingEapolForDevice.remove(deviceId);
+ ObjectiveContext context = new ObjectiveContext() {
+ @Override
+ public void onSuccess(Objective objective) {
+ log.info("{} Downstream Data plane filter for {}.",
+ completeFlowOpToString(action), sk);
+ }
+
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ log.info("Downstream Data plane filter for {} failed {} because {}.",
+ sk, action, error);
+ updateConnectPointStatus(sk, null, OltFlowsStatus.ERROR, null);
+ }
+ };
+
+ ForwardingObjective flow = null;
+ if (action == FlowOperation.ADD) {
+ flow = flowBuilder.add(context);
+ } else if (action == FlowOperation.REMOVE) {
+ flow = flowBuilder.remove(context);
+ } else {
+ log.error("Flow action not supported: {}", action);
+ }
+
+ if (flow != null) {
+ if (log.isTraceEnabled()) {
+ log.trace("Forwarding rule {}", flow);
+ }
+ flowObjectiveService.forward(deviceId, flow);
+ }
}
private DefaultForwardingObjective.Builder createForwardingObjectiveBuilder(TrafficSelector selector,
@@ -941,70 +1394,6 @@
.withTreatment(treatment);
}
- /**
- * Returns the write metadata value including tech profile reference and innerVlan.
- * For param cVlan, null can be sent
- *
- * @param cVlan c (customer) tag of one subscriber
- * @param techProfileId tech profile id of one subscriber
- * @param upstreamOltMeterId upstream meter id for OLT device.
- * @return the write metadata value including tech profile reference and innerVlan
- */
- private Long createTechProfValueForWm(VlanId cVlan, int techProfileId, MeterId upstreamOltMeterId) {
- Long writeMetadata;
-
- if (cVlan == null || VlanId.NONE.equals(cVlan)) {
- writeMetadata = (long) techProfileId << 32;
- } else {
- writeMetadata = ((long) (cVlan.id()) << 48 | (long) techProfileId << 32);
- }
- if (upstreamOltMeterId == null) {
- return writeMetadata;
- } else {
- return writeMetadata | upstreamOltMeterId.id();
- }
- }
-
- private BandwidthProfileInformation getBandwidthProfileInformation(String bandwidthProfile) {
- if (bpService == null) {
- log.warn(SADIS_NOT_RUNNING);
- return null;
- }
- if (bandwidthProfile == null) {
- return null;
- }
- return bpService.get(bandwidthProfile);
- }
-
- /**
- * It will be used to support AT&T use case (for EAPOL flows).
- * If multiple services are found in uniServiceList, returns default tech profile id
- * If one service is found, returns the found one
- *
- * @param port uni port
- * @return the default technology profile id
- */
- private int getDefaultTechProfileId(AccessDevicePort port) {
- if (subsService == null) {
- log.warn(SADIS_NOT_RUNNING);
- return defaultTechProfileId;
- }
- if (port != null) {
- SubscriberAndDeviceInformation info = subsService.get(port.name());
- if (info != null && info.uniTagList().size() == 1) {
- return info.uniTagList().get(0).getTechnologyProfileId();
- }
- }
- return defaultTechProfileId;
- }
-
- /**
- * Write metadata instruction value (metadata) is 8 bytes.
- * <p>
- * MS 2 bytes: C Tag
- * Next 2 bytes: Technology Profile Id
- * Next 4 bytes: Port number (uni or nni)
- */
private Long createMetadata(VlanId innerVlan, int techProfileId, PortNumber egressPort) {
if (techProfileId == NONE_TP_ID) {
techProfileId = DEFAULT_TP_ID_DEFAULT;
@@ -1013,5 +1402,318 @@
return ((long) (innerVlan.id()) << 48 | (long) techProfileId << 32) | egressPort.toLong();
}
+ private boolean isMacLearningEnabled(SubscriberAndDeviceInformation si) {
+ AtomicBoolean requiresMacLearning = new AtomicBoolean();
+ requiresMacLearning.set(false);
+ si.uniTagList().forEach(uniTagInfo -> {
+ if (uniTagInfo.getEnableMacLearning()) {
+ requiresMacLearning.set(true);
+ }
+ });
+
+ return requiresMacLearning.get();
+ }
+
+ /**
+ * Checks whether the subscriber has the MacAddress configured or discovered.
+ *
+ * @param deviceId DeviceId for this subscriber
+ * @param port Port for this subscriber
+ * @param si SubscriberAndDeviceInformation
+ * @return boolean
+ */
+ protected boolean isMacAddressAvailable(DeviceId deviceId, Port port, SubscriberAndDeviceInformation si) {
+ AtomicBoolean isConfigured = new AtomicBoolean();
+ isConfigured.set(true);
+
+ si.uniTagList().forEach(uniTagInfo -> {
+ boolean isMacLearningEnabled = uniTagInfo.getEnableMacLearning();
+ boolean configureMac = isMacAddressValid(uniTagInfo);
+ boolean discoveredMac = false;
+ Optional<Host> optHost = hostService.getConnectedHosts(new ConnectPoint(deviceId, port.number()))
+ .stream().filter(host -> host.vlan().equals(uniTagInfo.getPonCTag())).findFirst();
+ if (optHost.isPresent() && optHost.get().mac() != null) {
+ discoveredMac = true;
+ }
+ if (isMacLearningEnabled && !configureMac && !discoveredMac) {
+ log.debug("Awaiting for macAddress on {} for service {}",
+ portWithName(port), uniTagInfo.getServiceName());
+ isConfigured.set(false);
+ }
+ });
+
+ return isConfigured.get();
+ }
+
+ protected MacAddress getMacAddress(DeviceId deviceId, Port port, UniTagInformation uniTagInfo) {
+ boolean configuredMac = isMacAddressValid(uniTagInfo);
+ if (configuredMac) {
+ return MacAddress.valueOf(uniTagInfo.getConfiguredMacAddress());
+ } else if (uniTagInfo.getEnableMacLearning()) {
+ Optional<Host> optHost = hostService.getConnectedHosts(new ConnectPoint(deviceId, port.number()))
+ .stream().filter(host -> host.vlan().equals(uniTagInfo.getPonCTag())).findFirst();
+ if (optHost.isPresent() && optHost.get().mac() != null) {
+ return optHost.get().mac();
+ }
+ }
+ return null;
+ }
+
+ private boolean isMacAddressValid(UniTagInformation tagInformation) {
+ return tagInformation.getConfiguredMacAddress() != null &&
+ !tagInformation.getConfiguredMacAddress().trim().equals("") &&
+ !MacAddress.NONE.equals(MacAddress.valueOf(tagInformation.getConfiguredMacAddress()));
+ }
+
+ protected void updateConnectPointStatus(ServiceKey key, OltFlowsStatus eapolStatus,
+ OltFlowsStatus subscriberFlowsStatus, OltFlowsStatus dhcpStatus) {
+ try {
+ cpStatusWriteLock.lock();
+ OltPortStatus status = cpStatus.get(key);
+
+ if (status == null) {
+ status = new OltPortStatus(
+ eapolStatus != null ? eapolStatus : OltFlowsStatus.NONE,
+ subscriberFlowsStatus != null ? subscriberFlowsStatus : OltFlowsStatus.NONE,
+ dhcpStatus != null ? dhcpStatus : OltFlowsStatus.NONE
+ );
+ } else {
+ if (eapolStatus != null) {
+ status.defaultEapolStatus = eapolStatus;
+ }
+ if (subscriberFlowsStatus != null) {
+ status.subscriberFlowsStatus = subscriberFlowsStatus;
+ }
+ if (dhcpStatus != null) {
+ status.dhcpStatus = dhcpStatus;
+ }
+ }
+
+ cpStatus.put(key, status);
+ } finally {
+ cpStatusWriteLock.unlock();
+ }
+ }
+
+ protected class InternalFlowListener implements FlowRuleListener {
+ @Override
+ public void event(FlowRuleEvent event) {
+ if (appId.id() != (event.subject().appId())) {
+ return;
+ }
+
+ if (!oltDeviceService.isLocalLeader(event.subject().deviceId())) {
+ if (log.isTraceEnabled()) {
+ log.trace("ignoring flow event {} " +
+ "as not leader for {}", event, event.subject().deviceId());
+ }
+ return;
+ }
+
+ switch (event.type()) {
+ case RULE_ADDED:
+ case RULE_REMOVED:
+ Port port = getCpFromFlowRule(event.subject());
+ if (port == null) {
+ log.error("Can't find port in flow {}", event.subject());
+ return;
+ }
+ if (log.isTraceEnabled()) {
+ log.trace("flow event {} on cp {}: {}", event.type(),
+ portWithName(port), event.subject());
+ }
+ updateCpStatus(event.type(), port, event.subject());
+ return;
+ case RULE_ADD_REQUESTED:
+ case RULE_REMOVE_REQUESTED:
+ // NOTE that PENDING_ADD/REMOVE is set when we create the flowObjective
+ return;
+ default:
+ return;
+ }
+ }
+
+ protected void updateCpStatus(FlowRuleEvent.Type type, Port port, FlowRule flowRule) {
+ OltFlowsStatus status = flowRuleStatusToOltFlowStatus(type);
+ if (isDefaultEapolFlow(flowRule)) {
+ ServiceKey sk = new ServiceKey(new AccessDevicePort(port),
+ defaultEapolUniTag);
+ if (log.isTraceEnabled()) {
+ log.trace("update defaultEapolStatus {} on {}", status, sk);
+ }
+ updateConnectPointStatus(sk, status, null, null);
+ } else if (isDhcpFlow(flowRule)) {
+ ServiceKey sk = getSubscriberKeyFromFlowRule(flowRule);
+ if (sk == null) {
+ return;
+ }
+ if (log.isTraceEnabled()) {
+ log.trace("update dhcpStatus {} on {}", status, sk);
+ }
+ updateConnectPointStatus(sk, null, null, status);
+ } else if (isDataFlow(flowRule)) {
+
+ if (oltDeviceService.isNniPort(deviceService.getDevice(flowRule.deviceId()),
+ getCpFromFlowRule(flowRule).number())) {
+ // the NNI has data-plane for every subscriber, doesn't make sense to track them
+ return;
+ }
+
+ ServiceKey sk = getSubscriberKeyFromFlowRule(flowRule);
+ if (sk == null) {
+ return;
+ }
+ if (log.isTraceEnabled()) {
+ log.trace("update dataplaneStatus {} on {}", status, sk);
+ }
+ updateConnectPointStatus(sk, null, status, null);
+ }
+ }
+
+ private boolean isDefaultEapolFlow(FlowRule flowRule) {
+ EthTypeCriterion c = (EthTypeCriterion) flowRule.selector().getCriterion(Criterion.Type.ETH_TYPE);
+ if (c == null) {
+ return false;
+ }
+ if (c.ethType().equals(EthType.EtherType.EAPOL.ethType())) {
+ AtomicBoolean isDefault = new AtomicBoolean(false);
+ flowRule.treatment().allInstructions().forEach(instruction -> {
+ if (instruction.type() == L2MODIFICATION) {
+ L2ModificationInstruction modificationInstruction = (L2ModificationInstruction) instruction;
+ if (modificationInstruction.subtype() == L2ModificationInstruction.L2SubType.VLAN_ID) {
+ L2ModificationInstruction.ModVlanIdInstruction vlanInstruction =
+ (L2ModificationInstruction.ModVlanIdInstruction) modificationInstruction;
+ if (vlanInstruction.vlanId().id().equals(EAPOL_DEFAULT_VLAN)) {
+ isDefault.set(true);
+ return;
+ }
+ }
+ }
+ });
+ return isDefault.get();
+ }
+ return false;
+ }
+
+ /**
+ * Returns true if the flow is a DHCP flow.
+ * Matches both upstream and downstream flows.
+ *
+ * @param flowRule The FlowRule to evaluate
+ * @return boolean
+ */
+ private boolean isDhcpFlow(FlowRule flowRule) {
+ IPProtocolCriterion ipCriterion = (IPProtocolCriterion) flowRule.selector()
+ .getCriterion(Criterion.Type.IP_PROTO);
+ if (ipCriterion == null) {
+ return false;
+ }
+
+ UdpPortCriterion src = (UdpPortCriterion) flowRule.selector().getCriterion(Criterion.Type.UDP_SRC);
+
+ if (src == null) {
+ return false;
+ }
+ return ipCriterion.protocol() == IPv4.PROTOCOL_UDP &&
+ (src.udpPort().toInt() == 68 || src.udpPort().toInt() == 67);
+ }
+
+ private boolean isDataFlow(FlowRule flowRule) {
+ // we consider subscriber flows the one that matches on VLAN_VID
+ // method is valid only because it's the last check after EAPOL and DHCP.
+ // this matches mcast flows as well, if we want to avoid that we can
+ // filter out the elements that have groups in the treatment or
+ // mcastIp in the selector
+ // IPV4_DST:224.0.0.22/32
+ // treatment=[immediate=[GROUP:0x1]]
+
+ return flowRule.selector().getCriterion(Criterion.Type.VLAN_VID) != null;
+ }
+
+ private Port getCpFromFlowRule(FlowRule flowRule) {
+ DeviceId deviceId = flowRule.deviceId();
+ PortCriterion inPort = (PortCriterion) flowRule.selector().getCriterion(Criterion.Type.IN_PORT);
+ if (inPort != null) {
+ PortNumber port = inPort.port();
+ return deviceService.getPort(deviceId, port);
+ }
+ return null;
+ }
+
+ private ServiceKey getSubscriberKeyFromFlowRule(FlowRule flowRule) {
+ Port flowPort = getCpFromFlowRule(flowRule);
+ SubscriberAndDeviceInformation si = subsService.get(getPortName(flowPort));
+
+ Boolean isNni = oltDeviceService.isNniPort(deviceService.getDevice(flowRule.deviceId()), flowPort.number());
+ if (si == null && !isNni) {
+ log.error("Subscriber information not found in sadis for port {}", portWithName(flowPort));
+ return null;
+ }
+
+ if (isNni) {
+ return new ServiceKey(new AccessDevicePort(flowPort), nniUniTag);
+ }
+
+ Optional<UniTagInformation> found = Optional.empty();
+ VlanId flowVlan = null;
+ if (isDhcpFlow(flowRule)) {
+ // we need to make a special case for DHCP as in the ATT workflow DHCP flows don't match on tags
+ L2ModificationInstruction.ModVlanIdInstruction instruction =
+ (L2ModificationInstruction.ModVlanIdInstruction) flowRule.treatment().immediate().get(1);
+ flowVlan = instruction.vlanId();
+ } else {
+ // for now we assume that if it's not DHCP it's dataplane (or at least tagged)
+ VlanIdCriterion vlanIdCriterion =
+ (VlanIdCriterion) flowRule.selector().getCriterion(Criterion.Type.VLAN_VID);
+ if (vlanIdCriterion == null) {
+ log.warn("cannot match the flow to a subscriber service as it does not carry vlans");
+ return null;
+ }
+ flowVlan = vlanIdCriterion.vlanId();
+ }
+
+ VlanId finalFlowVlan = flowVlan;
+ found = si.uniTagList().stream().filter(uti ->
+ uti.getPonCTag().equals(finalFlowVlan) ||
+ uti.getPonSTag().equals(finalFlowVlan) ||
+ uti.getUniTagMatch().equals(finalFlowVlan)
+ ).findFirst();
+
+
+ if (found.isEmpty()) {
+ log.warn("Cannot map flow rule {} to Service in {}", flowRule, si);
+ }
+
+ return found.isPresent() ? new ServiceKey(new AccessDevicePort(flowPort), found.get()) : null;
+
+ }
+
+ private OltFlowsStatus flowRuleStatusToOltFlowStatus(FlowRuleEvent.Type type) {
+ switch (type) {
+ case RULE_ADD_REQUESTED:
+ return OltFlowsStatus.PENDING_ADD;
+ case RULE_ADDED:
+ return OltFlowsStatus.ADDED;
+ case RULE_REMOVE_REQUESTED:
+ return OltFlowsStatus.PENDING_REMOVE;
+ case RULE_REMOVED:
+ return OltFlowsStatus.REMOVED;
+ default:
+ return OltFlowsStatus.NONE;
+ }
+ }
+ }
+
+ protected void bindSadisService(SadisService service) {
+ this.subsService = service.getSubscriberInfoService();
+ this.bpService = service.getBandwidthProfileService();
+ log.info("Sadis service is loaded");
+ }
+
+ protected void unbindSadisService(SadisService service) {
+ this.subsService = null;
+ this.bpService = null;
+ log.info("Sadis service is unloaded");
+ }
}