[VOL-4246] Feature parity with the previous implementation
Change-Id: I3741edb3c1b88b1cf8b5e6d4ff0900132e2e5e6a
diff --git a/impl/src/main/java/org/opencord/olt/impl/AccessDevicePort.java b/impl/src/main/java/org/opencord/olt/impl/AccessDevicePort.java
new file mode 100644
index 0000000..b6d2213
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/impl/AccessDevicePort.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.olt.impl;
+
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.Port;
+
+import java.util.Objects;
+
+/**
+ * OLT device port.
+ */
+public class AccessDevicePort {
+
+ private ConnectPoint cp;
+ private String name;
+
+ /**
+ * Creates an AccessDevicePort with given ONOS port.
+ *
+ * @param port ONOS port
+ */
+ public AccessDevicePort(Port port) {
+ this.cp = ConnectPoint.deviceConnectPoint(port.element().id() + "/" + port.number().toLong());
+ this.name = OltUtils.getPortName(port);
+ }
+
+ /**
+ * Creates an AccessDevicePort with given ONOS connectPoint and name.
+ *
+ * @param cp ONOS connect point
+ * @param name OLT port name
+ */
+ public AccessDevicePort(ConnectPoint cp, String name) {
+ this.cp = cp;
+ this.name = name;
+ }
+
+ /**
+ * Get ONOS ConnectPoint object.
+ *
+ * @return ONOS connect point
+ */
+ public ConnectPoint connectPoint() {
+ return this.cp;
+ }
+
+ /**
+ * Get OLT port name which is combination of serial number and uni index.
+ *
+ * @return OLT port name (ex: BBSM00010001-1)
+ */
+ public String name() {
+ return this.name;
+ }
+
+ @Override
+ public String toString() {
+ return cp.toString() + '[' + name + ']';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ AccessDevicePort that = (AccessDevicePort) o;
+ return Objects.equals(cp, that.cp) &&
+ Objects.equals(name, that.name);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(cp, name);
+ }
+}
diff --git a/impl/src/main/java/org/opencord/olt/impl/ConsistentHasher.java b/impl/src/main/java/org/opencord/olt/impl/ConsistentHasher.java
deleted file mode 100644
index 52e9b96..0000000
--- a/impl/src/main/java/org/opencord/olt/impl/ConsistentHasher.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Copyright 2020-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.opencord.olt.impl;
-
-import com.google.common.hash.Hashing;
-import org.onosproject.cluster.NodeId;
-
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Returns a server hosting a given key based on consistent hashing.
- */
-public class ConsistentHasher {
-
- private static class Entry implements Comparable<Entry> {
- private final NodeId server;
- private final int hash;
-
- public Entry(NodeId server, int hash) {
- this.server = server;
- this.hash = hash;
- }
-
- public Entry(int hash) {
- server = null;
- this.hash = hash;
- }
-
- @Override
- public int compareTo(Entry o) {
- if (this.hash > o.hash) {
- return 1;
- } else if (this.hash < o.hash) {
- return -1;
- } // else
- return 0;
- }
- }
-
- private final int weight;
-
- private List<Entry> table;
-
- /**
- * Creates a new hasher with the given list of servers.
- *
- * @param servers list of servers
- * @param weight weight
- */
- public ConsistentHasher(List<NodeId> servers, int weight) {
- this.weight = weight;
-
- this.table = new ArrayList<>();
-
- servers.forEach(this::addServer);
- }
-
- /**
- * Adds a new server to the list of servers.
- *
- * @param server server ID
- */
- public synchronized void addServer(NodeId server) {
- // Add weight number of buckets for each server
- for (int i = 0; i < weight; i++) {
- String label = server.toString() + i;
- int hash = getHash(label);
- Entry e = new Entry(server, hash);
- table.add(e);
- }
-
- Collections.sort(table);
- }
-
- /**
- * Removes a server from the list of servers.
- *
- * @param server server ID
- */
- public synchronized void removeServer(NodeId server) {
- table.removeIf(e -> e.server.equals(server));
- }
-
- /**
- * Hashes a given input string and finds that server that should
- * handle the given ID.
- *
- * @param s input
- * @return server ID
- */
- public synchronized NodeId hash(String s) {
- Entry temp = new Entry(getHash(s));
-
- int pos = Collections.binarySearch(this.table, temp);
-
- // translate a negative not-found result into the closest following match
- if (pos < 0) {
- pos = Math.abs(pos + 1);
- }
-
- // wraparound if the hash was after the last entry in the table
- if (pos == this.table.size()) {
- pos = 0;
- }
-
- return table.get(pos).server;
- }
-
- private int getHash(String s) {
- // stable, uniformly-distributed hash
- return Hashing.murmur3_128().hashString(s, Charset.defaultCharset()).asInt();
- }
-}
diff --git a/impl/src/main/java/org/opencord/olt/impl/DiscoveredSubscriber.java b/impl/src/main/java/org/opencord/olt/impl/DiscoveredSubscriber.java
new file mode 100644
index 0000000..4280eaf
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/impl/DiscoveredSubscriber.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.impl;
+
+import org.onosproject.net.Device;
+import org.onosproject.net.Port;
+import org.opencord.sadis.SubscriberAndDeviceInformation;
+
+import java.util.Objects;
+
+import static org.opencord.olt.impl.OltUtils.portWithName;
+
+/**
+ * Contains a subscriber's information and status for a specific device and port.
+ */
+public class DiscoveredSubscriber {
+
+ /**
+ * Describe whether the subscriber needs to be added or removed.
+ */
+ public enum Status {
+ ADDED,
+ REMOVED,
+ }
+
+ public Port port;
+ public Device device;
+ public Enum<Status> status;
+ public boolean hasSubscriber;
+ public SubscriberAndDeviceInformation subscriberAndDeviceInformation;
+
+ /**
+ * Creates the class with the proper information.
+ *
+ * @param device the device of the subscriber
+ * @param port the port
+ * @param status the status for this specific subscriber
+ * @param hasSubscriber is the subscriber present
+ * @param si the information about the tags/dhcp and other info.
+ */
+ public DiscoveredSubscriber(Device device, Port port, Status status, boolean hasSubscriber,
+ SubscriberAndDeviceInformation si) {
+ this.device = device;
+ this.port = port;
+ this.status = status;
+ this.hasSubscriber = hasSubscriber;
+ subscriberAndDeviceInformation = si;
+ }
+
+ /**
+ * Returns the port name for the subscriber.
+ *
+ * @return the port name.
+ */
+ public String portName() {
+ return OltUtils.getPortName(port);
+ }
+
+ @Override
+ public String toString() {
+
+ return String.format("%s (status: %s, provisionSubscriber: %s)",
+ portWithName(this.port),
+ this.status, this.hasSubscriber
+ );
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ DiscoveredSubscriber that = (DiscoveredSubscriber) o;
+ return hasSubscriber == that.hasSubscriber &&
+ port.equals(that.port) &&
+ device.equals(that.device) &&
+ status.equals(that.status);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(port, device, status, hasSubscriber, subscriberAndDeviceInformation);
+ }
+}
diff --git a/impl/src/main/java/org/opencord/olt/impl/MeterData.java b/impl/src/main/java/org/opencord/olt/impl/MeterData.java
new file mode 100644
index 0000000..208383d
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/impl/MeterData.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.impl;
+
+import org.onosproject.net.meter.MeterCellId;
+import org.onosproject.net.meter.MeterId;
+import org.onosproject.net.meter.MeterState;
+
+import java.util.Objects;
+
+/**
+ * Class containing Meter Data.
+ */
+public class MeterData {
+ private MeterCellId meterCellId;
+ private MeterState meterStatus;
+ private String bandwidthProfileName;
+
+ public MeterData(MeterCellId meterCellId, MeterState meterStatus, String bandwidthProfile) {
+ this.meterCellId = meterCellId;
+ this.meterStatus = meterStatus;
+ this.bandwidthProfileName = bandwidthProfile;
+ }
+
+ public void setMeterCellId(MeterCellId meterCellId) {
+ this.meterCellId = meterCellId;
+ }
+
+ public void setMeterStatus(MeterState meterStatus) {
+ this.meterStatus = meterStatus;
+ }
+
+ public MeterId getMeterId() {
+ return (MeterId) meterCellId;
+ }
+
+ public MeterCellId getMeterCellId() {
+ return meterCellId;
+ }
+
+ public MeterState getMeterStatus() {
+ return meterStatus;
+ }
+
+ public String getBandwidthProfileName() {
+ return bandwidthProfileName;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ MeterData meterData = (MeterData) o;
+ return Objects.equals(meterCellId, meterData.meterCellId) &&
+ meterStatus == meterData.meterStatus &&
+ Objects.equals(bandwidthProfileName, meterData.bandwidthProfileName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(meterCellId, meterStatus, bandwidthProfileName);
+ }
+
+ @Override
+ public String toString() {
+ return "MeterData{" +
+ "meterCellId=" + meterCellId +
+ ", meterStatus=" + meterStatus +
+ ", bandwidthProfile='" + bandwidthProfileName + '\'' +
+ '}';
+ }
+}
diff --git a/impl/src/main/java/org/opencord/olt/impl/Olt.java b/impl/src/main/java/org/opencord/olt/impl/Olt.java
index 74c5811..747a0b4 100644
--- a/impl/src/main/java/org/opencord/olt/impl/Olt.java
+++ b/impl/src/main/java/org/opencord/olt/impl/Olt.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-present Open Networking Foundation
+ * Copyright 2021-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -15,18 +15,11 @@
*/
package org.opencord.olt.impl;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Sets;
-import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
import org.onlab.util.KryoNamespace;
import org.onosproject.cfg.ComponentConfigService;
-import org.onosproject.cluster.ClusterEvent;
-import org.onosproject.cluster.ClusterEventListener;
import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.LeadershipService;
-import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.event.AbstractListenerManager;
@@ -35,36 +28,17 @@
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
-import org.onosproject.net.Host;
import org.onosproject.net.Port;
import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
-import org.onosproject.net.driver.DriverService;
-import org.onosproject.net.flow.FlowRuleService;
-import org.onosproject.net.flowobjective.FlowObjectiveService;
-import org.onosproject.net.flowobjective.ForwardingObjective;
-import org.onosproject.net.flowobjective.Objective;
-import org.onosproject.net.flowobjective.ObjectiveContext;
-import org.onosproject.net.flowobjective.ObjectiveError;
-import org.onosproject.net.host.HostEvent;
-import org.onosproject.net.host.HostListener;
-import org.onosproject.net.host.HostService;
-import org.onosproject.net.meter.MeterId;
-import org.onosproject.store.primitives.DefaultConsistentMap;
import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.ConsistentMultimap;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.opencord.olt.AccessDeviceEvent;
import org.opencord.olt.AccessDeviceListener;
-import org.opencord.olt.AccessDevicePort;
import org.opencord.olt.AccessDeviceService;
-import org.opencord.olt.AccessSubscriberId;
-import org.opencord.olt.internalapi.AccessDeviceFlowService;
-import org.opencord.olt.internalapi.AccessDeviceMeterService;
-import org.opencord.sadis.BandwidthProfileInformation;
import org.opencord.sadis.BaseInformationService;
import org.opencord.sadis.SadisService;
import org.opencord.sadis.SubscriberAndDeviceInformation;
@@ -78,73 +52,60 @@
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.osgi.service.component.annotations.ReferencePolicy;
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Dictionary;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Properties;
import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
-import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Strings.isNullOrEmpty;
-import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
-import static java.util.stream.Collectors.*;
import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads;
+import static org.opencord.olt.impl.OltUtils.getPortName;
+import static org.opencord.olt.impl.OltUtils.portWithName;
import static org.opencord.olt.impl.OsgiPropertyConstants.*;
-import static org.slf4j.LoggerFactory.getLogger;
/**
- * Provisions rules on access devices.
+ * OLT Application.
*/
@Component(immediate = true,
property = {
DEFAULT_BP_ID + ":String=" + DEFAULT_BP_ID_DEFAULT,
DEFAULT_MCAST_SERVICE_NAME + ":String=" + DEFAULT_MCAST_SERVICE_NAME_DEFAULT,
- EAPOL_DELETE_RETRY_MAX_ATTEMPS + ":Integer=" +
- EAPOL_DELETE_RETRY_MAX_ATTEMPS_DEFAULT,
- PROVISION_DELAY + ":Integer=" + PROVISION_DELAY_DEFAULT,
+ FLOW_PROCESSING_THREADS + ":Integer=" + FLOW_PROCESSING_THREADS_DEFAULT,
+ SUBSCRIBER_PROCESSING_THREADS + ":Integer=" + SUBSCRIBER_PROCESSING_THREADS_DEFAULT,
+ REQUEUE_DELAY + ":Integer=" + REQUEUE_DELAY_DEFAULT
})
public class Olt
extends AbstractListenerManager<AccessDeviceEvent, AccessDeviceListener>
implements AccessDeviceService {
- private static final String SADIS_NOT_RUNNING = "Sadis is not running.";
- private static final String APP_NAME = "org.opencord.olt";
-
- private static final short EAPOL_DEFAULT_VLAN = 4091;
- private static final String NO_UPLINK_PORT = "No uplink port found for OLT device {}";
-
- public static final int HASH_WEIGHT = 10;
- public static final long PENDING_SUBS_MAP_TIMEOUT_MILLIS = 30000L;
-
- private final Logger log = getLogger(getClass());
-
- private static final String NNI = "nni-";
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected FlowObjectiveService flowObjectiveService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected DeviceService deviceService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ComponentConfigService cfgService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected CoreService coreService;
+ protected MastershipService mastershipService;
- //Dependency on driver service is to ensure correct startup order
@Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected DriverService driverService;
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected LeadershipService leadershipService;
@Reference(cardinality = ReferenceCardinality.OPTIONAL,
bind = "bindSadisService",
@@ -153,31 +114,23 @@
protected volatile SadisService sadisService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected AccessDeviceFlowService oltFlowService;
+ protected OltDeviceServiceInterface oltDeviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected AccessDeviceMeterService oltMeterService;
+ protected OltFlowServiceInterface oltFlowService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected OltMeterServiceInterface oltMeterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected StorageService storageService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected ClusterService clusterService;
+ protected CoreService coreService;
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected MastershipService mastershipService;
+ protected ApplicationId appId;
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected LeadershipService leadershipService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected FlowRuleService flowRuleService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected ComponentConfigService componentConfigService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected HostService hostService;
+ private static final String ONOS_OLT_SERVICE = "onos/olt-service";
/**
* Default bandwidth profile id that is used for authentication trap flows.
@@ -190,456 +143,322 @@
protected String multicastServiceName = DEFAULT_MCAST_SERVICE_NAME_DEFAULT;
/**
- * Default amounts of eapol retry.
+ * Number of threads used to process flows.
**/
- protected int eapolDeleteRetryMaxAttempts = EAPOL_DELETE_RETRY_MAX_ATTEMPS_DEFAULT;
+ protected int flowProcessingThreads = FLOW_PROCESSING_THREADS_DEFAULT;
/**
- * Delay between EAPOL removal and data plane flows provisioning.
+ * Number of threads used to process flows.
+ **/
+ protected int subscriberProcessingThreads = SUBSCRIBER_PROCESSING_THREADS_DEFAULT;
+
+ /**
+ * Delay in ms to put an event back in the queue, used to avoid retrying things to often if conditions are not met.
+ **/
+ protected int requeueDelay = REQUEUE_DELAY_DEFAULT;
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ /**
+ * A queue to asynchronously process events.
*/
- protected int provisionDelay = PROVISION_DELAY_DEFAULT;
-
- private final DeviceListener deviceListener = new InternalDeviceListener();
- private final ClusterEventListener clusterListener = new InternalClusterListener();
- private final HostListener hostListener = new InternalHostListener();
-
- private ConsistentHasher hasher;
+ protected Map<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>> eventsQueues;
protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
- private BaseInformationService<BandwidthProfileInformation> bpService;
- private ExecutorService oltInstallers = Executors.newFixedThreadPool(4,
- groupedThreads("onos/olt-service",
- "olt-installer-%d"));
+ /**
+ * Listener for OLT devices events.
+ */
+ protected OltDeviceListener deviceListener = new OltDeviceListener();
+ protected ScheduledExecutorService discoveredSubscriberExecutor =
+ Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/olt",
+ "discovered-cp-%d", log));
- protected ExecutorService eventExecutor;
- protected ExecutorService hostEventExecutor;
- protected ExecutorService retryExecutor;
- protected ScheduledExecutorService provisionExecutor;
+ protected ScheduledExecutorService queueExecutor =
+ Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/olt",
+ "discovered-cp-restore-%d", log));
- private ConsistentMultimap<ConnectPoint, UniTagInformation> programmedSubs;
- private ConsistentMultimap<ConnectPoint, UniTagInformation> failedSubs;
+ /**
+ * Executor used to defer flow provisioning to a different thread pool.
+ */
+ protected ExecutorService flowsExecutor;
- protected Map<DeviceId, BlockingQueue<SubscriberFlowInfo>> pendingSubscribersForDevice;
- private ConsistentMultimap<ConnectPoint, SubscriberFlowInfo> waitingMacSubscribers;
+ /**
+ * Executor used to defer subscriber handling from API call to a different thread pool.
+ */
+ protected ExecutorService subscriberExecutor;
+
+ private static final String APP_NAME = "org.opencord.olt";
+
+ private final ReentrantReadWriteLock queueLock = new ReentrantReadWriteLock();
+ private final Lock queueWriteLock = queueLock.writeLock();
+ private final Lock queueReadLock = queueLock.readLock();
@Activate
- public void activate(ComponentContext context) {
- eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("onos/olt",
- "events-%d", log));
- hostEventExecutor = Executors.newFixedThreadPool(8, groupedThreads("onos/olt", "mac-events-%d", log));
- retryExecutor = Executors.newCachedThreadPool();
- provisionExecutor = Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/olt",
- "provision-%d", log));
+ protected void activate(ComponentContext context) {
+ cfgService.registerProperties(getClass());
modified(context);
- ApplicationId appId = coreService.registerApplication(APP_NAME);
- componentConfigService.registerProperties(getClass());
+ appId = coreService.registerApplication(APP_NAME);
KryoNamespace serializer = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
+ .register(ConnectPoint.class)
+ .register(DiscoveredSubscriber.class)
+ .register(DiscoveredSubscriber.Status.class)
+ .register(SubscriberAndDeviceInformation.class)
.register(UniTagInformation.class)
- .register(SubscriberFlowInfo.class)
- .register(AccessDevicePort.class)
- .register(AccessDevicePort.Type.class)
.register(LinkedBlockingQueue.class)
.build();
- programmedSubs = storageService.<ConnectPoint, UniTagInformation>consistentMultimapBuilder()
- .withName("volt-programmed-subs")
+ eventsQueues = storageService.<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>>consistentMapBuilder()
+ .withName("volt-subscriber-queues")
+ .withApplicationId(appId)
.withSerializer(Serializer.using(serializer))
- .withApplicationId(appId)
- .build();
-
- failedSubs = storageService.<ConnectPoint, UniTagInformation>consistentMultimapBuilder()
- .withName("volt-failed-subs")
- .withSerializer(Serializer.using(serializer))
- .withApplicationId(appId)
- .build();
-
- KryoNamespace macSerializer = KryoNamespace.newBuilder()
- .register(KryoNamespaces.API)
- .register(SubscriberFlowInfo.class)
- .register(AccessDevicePort.class)
- .register(AccessDevicePort.Type.class)
- .register(UniTagInformation.class)
- .build();
-
- waitingMacSubscribers = storageService.<ConnectPoint, SubscriberFlowInfo>consistentMultimapBuilder()
- .withName("volt-waiting-mac-subs")
- .withSerializer(Serializer.using(macSerializer))
- .withApplicationId(appId)
- .build();
- //TODO possibly use consistent multimap with compute on key and element to avoid
- // lock on all objects of the map, while instead obtaining and releasing the lock
- // on a per subscriber basis.
- pendingSubscribersForDevice = new DefaultConsistentMap<>(
- storageService.<DeviceId, BlockingQueue<SubscriberFlowInfo>>consistentMapBuilder()
- .withName("volt-pending-subs")
- .withSerializer(Serializer.using(serializer))
- .withApplicationId(appId)
- .buildAsyncMap(), PENDING_SUBS_MAP_TIMEOUT_MILLIS).asJavaMap();
- eventDispatcher.addSink(AccessDeviceEvent.class, listenerRegistry);
-
- if (sadisService != null) {
- subsService = sadisService.getSubscriberInfoService();
- bpService = sadisService.getBandwidthProfileService();
- } else {
- log.warn(SADIS_NOT_RUNNING);
- }
-
- List<NodeId> readyNodes = clusterService.getNodes().stream()
- .filter(c -> clusterService.getState(c.id()) == ControllerNode.State.READY)
- .map(ControllerNode::id)
- .collect(toList());
- hasher = new ConsistentHasher(readyNodes, HASH_WEIGHT);
- clusterService.addListener(clusterListener);
-
- // look for all provisioned devices in Sadis and create EAPOL flows for the
- // UNI ports
- Iterable<Device> devices = deviceService.getDevices();
- for (Device d : devices) {
- if (isLocalLeader(d.id())) {
- checkAndCreateDeviceFlows(d);
- }
- }
+ .build().asJavaMap();
deviceService.addListener(deviceListener);
- hostService.addListener(hostListener);
- log.info("Started with Application ID {}", appId.id());
+
+ discoveredSubscriberExecutor.execute(this::processDiscoveredSubscribers);
+ eventDispatcher.addSink(AccessDeviceEvent.class, listenerRegistry);
+ log.info("Started");
+
+ deviceListener.handleExistingPorts();
}
@Deactivate
- public void deactivate() {
- componentConfigService.unregisterProperties(getClass(), false);
- clusterService.removeListener(clusterListener);
- deviceService.removeListener(deviceListener);
- hostService.removeListener(hostListener);
- eventDispatcher.removeSink(AccessDeviceEvent.class);
- eventExecutor.shutdown();
- hostEventExecutor.shutdown();
- retryExecutor.shutdown();
- provisionExecutor.shutdown();
+ protected void deactivate(ComponentContext context) {
+ cfgService.unregisterProperties(getClass(), false);
+ discoveredSubscriberExecutor.shutdown();
+ flowsExecutor.shutdown();
+ subscriberExecutor.shutdown();
+ deviceListener.deactivate();
log.info("Stopped");
}
@Modified
public void modified(ComponentContext context) {
Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
-
- try {
+ if (context != null) {
String bpId = get(properties, DEFAULT_BP_ID);
defaultBpId = isNullOrEmpty(bpId) ? defaultBpId : bpId;
String mcastSN = get(properties, DEFAULT_MCAST_SERVICE_NAME);
multicastServiceName = isNullOrEmpty(mcastSN) ? multicastServiceName : mcastSN;
- String eapolDeleteRetryNew = get(properties, EAPOL_DELETE_RETRY_MAX_ATTEMPS);
- eapolDeleteRetryMaxAttempts = isNullOrEmpty(eapolDeleteRetryNew) ? EAPOL_DELETE_RETRY_MAX_ATTEMPS_DEFAULT :
- Integer.parseInt(eapolDeleteRetryNew.trim());
+ String flowThreads = get(properties, FLOW_PROCESSING_THREADS);
+ int oldFlowProcessingThreads = flowProcessingThreads;
+ flowProcessingThreads = isNullOrEmpty(flowThreads) ?
+ oldFlowProcessingThreads : Integer.parseInt(flowThreads.trim());
- log.debug("OLT properties: DefaultBpId: {}, MulticastServiceName: {}, EapolDeleteRetryMaxAttempts: {}",
- defaultBpId, multicastServiceName, eapolDeleteRetryMaxAttempts);
+ if (flowsExecutor == null || oldFlowProcessingThreads != flowProcessingThreads) {
+ if (flowsExecutor != null) {
+ flowsExecutor.shutdown();
+ }
+ flowsExecutor = Executors.newFixedThreadPool(flowProcessingThreads,
+ groupedThreads(ONOS_OLT_SERVICE,
+ "flows-installer-%d"));
+ }
- } catch (Exception e) {
- log.error("Error while modifying the properties", e);
- defaultBpId = DEFAULT_BP_ID_DEFAULT;
- multicastServiceName = DEFAULT_MCAST_SERVICE_NAME_DEFAULT;
+ String subscriberThreads = get(properties, SUBSCRIBER_PROCESSING_THREADS);
+ int oldSubscriberProcessingThreads = subscriberProcessingThreads;
+ subscriberProcessingThreads = isNullOrEmpty(subscriberThreads) ?
+ oldSubscriberProcessingThreads : Integer.parseInt(subscriberThreads.trim());
+
+ if (subscriberExecutor == null || oldSubscriberProcessingThreads != subscriberProcessingThreads) {
+ if (subscriberExecutor != null) {
+ subscriberExecutor.shutdown();
+ }
+ subscriberExecutor = Executors.newFixedThreadPool(subscriberProcessingThreads,
+ groupedThreads(ONOS_OLT_SERVICE,
+ "subscriber-installer-%d"));
+ }
+
+ String queueDelay = get(properties, REQUEUE_DELAY);
+ requeueDelay = isNullOrEmpty(queueDelay) ?
+ REQUEUE_DELAY_DEFAULT : Integer.parseInt(queueDelay.trim());
}
+ log.info("Modified. Values = {}: {}, {}: {}, " +
+ "{}:{}, {}:{}, {}:{}",
+ DEFAULT_BP_ID, defaultBpId,
+ DEFAULT_MCAST_SERVICE_NAME, multicastServiceName,
+ FLOW_PROCESSING_THREADS, flowProcessingThreads,
+ SUBSCRIBER_PROCESSING_THREADS, subscriberProcessingThreads,
+ REQUEUE_DELAY, requeueDelay);
}
- protected void bindSadisService(SadisService service) {
- sadisService = service;
- bpService = sadisService.getBandwidthProfileService();
- subsService = sadisService.getSubscriberInfoService();
- log.info("Sadis-service binds to onos.");
- }
-
- protected void unbindSadisService(SadisService service) {
- sadisService = null;
- bpService = null;
- subsService = null;
- log.info("Sadis-service unbinds from onos.");
- }
@Override
- public boolean provisionSubscriber(ConnectPoint connectPoint) {
- log.info("Call to provision subscriber at {}", connectPoint);
- DeviceId deviceId = connectPoint.deviceId();
- Port subscriberPortOnos = deviceService.getPort(deviceId, connectPoint.port());
- checkNotNull(subscriberPortOnos, "Invalid connect point:" + connectPoint);
- AccessDevicePort subscriberPort = new AccessDevicePort(subscriberPortOnos, AccessDevicePort.Type.UNI);
+ public boolean provisionSubscriber(ConnectPoint cp) {
+ subscriberExecutor.submit(() -> {
+ Device device = deviceService.getDevice(cp.deviceId());
+ Port port = deviceService.getPort(device.id(), cp.port());
+ AccessDevicePort accessDevicePort = new AccessDevicePort(port);
- if (isSubscriberInstalled(connectPoint)) {
- log.warn("Subscriber at {} already provisioned or in the process .."
- + " not taking any more action", connectPoint);
- return true;
- }
-
- // Find the subscriber config at this connect point
- SubscriberAndDeviceInformation sub = getSubscriber(connectPoint);
- if (sub == null) {
- log.warn("No subscriber found for {}", connectPoint);
- return false;
- }
-
- // Get the uplink port
- AccessDevicePort uplinkPort = getUplinkPort(deviceService.getDevice(deviceId));
- if (uplinkPort == null) {
- log.warn(NO_UPLINK_PORT, deviceId);
- return false;
- }
-
- // delete Eapol authentication flow with default bandwidth
- // wait until Eapol rule with defaultBpId is removed to install subscriber-based rules
- // retry deletion if it fails/times-out
- retryExecutor.execute(new DeleteEapolInstallSub(subscriberPort, uplinkPort, sub, 1));
- return true;
- }
-
- // returns true if subscriber is programmed or in the process of being programmed
- private boolean isSubscriberInstalled(ConnectPoint connectPoint) {
- Collection<? extends UniTagInformation> uniTagInformationSet =
- programmedSubs.get(connectPoint).value();
- if (!uniTagInformationSet.isEmpty()) {
- return true;
- }
- //Check if the subscriber is already getting provisioned
- // so we do not provision twice
- AtomicBoolean isPending = new AtomicBoolean(false);
- pendingSubscribersForDevice.computeIfPresent(connectPoint.deviceId(), (id, infos) -> {
- for (SubscriberFlowInfo fi : infos) {
- if (fi.getUniPort().equals(connectPoint.port())) {
- log.debug("Subscriber is already pending, {}", fi);
- isPending.set(true);
- break;
- }
+ if (oltDeviceService.isNniPort(device, port.number())) {
+ log.warn("will not provision a subscriber on the NNI {}", accessDevicePort);
+ return false;
}
- return infos;
+
+ log.info("Provisioning subscriber on {}", accessDevicePort);
+
+ if (oltFlowService.isSubscriberServiceProvisioned(accessDevicePort)) {
+ log.error("Subscriber on {} is already provisioned", accessDevicePort);
+ return false;
+ }
+
+ SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
+ if (si == null) {
+ log.error("Subscriber information not found in sadis for port {}", accessDevicePort);
+ return false;
+ }
+ DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
+ DiscoveredSubscriber.Status.ADDED, true, si);
+
+ // NOTE we need to keep a list of the subscribers that are provisioned on a port,
+ // regardless of the flow status
+ si.uniTagList().forEach(uti -> {
+ ServiceKey sk = new ServiceKey(accessDevicePort, uti);
+ oltFlowService.updateProvisionedSubscriberStatus(sk, true);
+ });
+
+ addSubscriberToQueue(sub);
+ return true;
});
-
- return isPending.get();
- }
-
- private class DeleteEapolInstallSub implements Runnable {
- AccessDevicePort subscriberPort;
- AccessDevicePort uplinkPort;
- SubscriberAndDeviceInformation sub;
- private int attemptNumber;
-
- DeleteEapolInstallSub(AccessDevicePort subscriberPort, AccessDevicePort uplinkPort,
- SubscriberAndDeviceInformation sub,
- int attemptNumber) {
- this.subscriberPort = subscriberPort;
- this.uplinkPort = uplinkPort;
- this.sub = sub;
- this.attemptNumber = attemptNumber;
- }
-
- @Override
- public void run() {
- CompletableFuture<ObjectiveError> filterFuture = new CompletableFuture<>();
- oltFlowService.processEapolFilteringObjectives(subscriberPort,
- defaultBpId, Optional.empty(), filterFuture,
- VlanId.vlanId(EAPOL_DEFAULT_VLAN),
- false);
- filterFuture.thenAcceptAsync(filterStatus -> {
- if (filterStatus == null) {
- log.info("Default eapol flow deleted in attempt {} of {}"
- + "... provisioning subscriber flows on {}",
- attemptNumber, eapolDeleteRetryMaxAttempts, subscriberPort);
-
- // FIXME this is needed to prevent that default EAPOL flow removal and
- // data plane flows install are received by the device at the same time
- provisionExecutor.schedule(
- () -> provisionUniTagList(subscriberPort, uplinkPort, sub),
- provisionDelay, TimeUnit.MILLISECONDS);
- } else {
- if (attemptNumber <= eapolDeleteRetryMaxAttempts) {
- log.warn("The filtering future failed {} for subscriber on {}"
- + "... retrying {} of {} attempts",
- filterStatus, subscriberPort, attemptNumber, eapolDeleteRetryMaxAttempts);
- retryExecutor.execute(
- new DeleteEapolInstallSub(subscriberPort, uplinkPort, sub,
- attemptNumber + 1));
- } else {
- log.error("The filtering future failed {} for subscriber on {}"
- + "after {} attempts. Subscriber provisioning failed",
- filterStatus, subscriberPort, eapolDeleteRetryMaxAttempts);
- sub.uniTagList().forEach(ut ->
- failedSubs.put(
- new ConnectPoint(subscriberPort.deviceId(), subscriberPort.number()), ut));
- }
- }
- });
- }
-
- }
-
- @Override
- public boolean removeSubscriber(ConnectPoint connectPoint) {
- Port subscriberPort = deviceService.getPort(connectPoint);
- if (subscriberPort == null) {
- log.error("Subscriber port not found at: {}", connectPoint);
- return false;
- }
- return removeSubscriber(new AccessDevicePort(subscriberPort, AccessDevicePort.Type.UNI));
- }
-
- private boolean removeSubscriber(AccessDevicePort subscriberPort) {
- log.info("Call to un-provision subscriber at {}", subscriberPort);
- //TODO we need to check if the subscriber is pending
- // Get the subscriber connected to this port from the local cache
- // If we don't know about the subscriber there's no need to remove it
- DeviceId deviceId = subscriberPort.deviceId();
-
- ConnectPoint connectPoint = new ConnectPoint(deviceId, subscriberPort.number());
- Collection<? extends UniTagInformation> uniTagInformationSet = programmedSubs.get(connectPoint).value();
- if (uniTagInformationSet == null || uniTagInformationSet.isEmpty()) {
- log.warn("Subscriber on connectionPoint {} was not previously programmed, " +
- "no need to remove it", connectPoint);
- return true;
- }
-
- // Get the uplink port
- AccessDevicePort uplinkPort = getUplinkPort(deviceService.getDevice(deviceId));
- if (uplinkPort == null) {
- log.warn(NO_UPLINK_PORT, deviceId);
- return false;
- }
-
- for (UniTagInformation uniTag : uniTagInformationSet) {
-
- if (multicastServiceName.equals(uniTag.getServiceName())) {
- continue;
- }
-
- unprovisionVlans(uplinkPort, subscriberPort, uniTag);
-
- // remove eapol with subscriber bandwidth profile
- Optional<String> upstreamOltBw = uniTag.getUpstreamOltBandwidthProfile() == null ?
- Optional.empty() : Optional.of(uniTag.getUpstreamOltBandwidthProfile());
- oltFlowService.processEapolFilteringObjectives(subscriberPort,
- uniTag.getUpstreamBandwidthProfile(),
- upstreamOltBw,
- null, uniTag.getPonCTag(), false);
-
- if (subscriberPort.port() != null && subscriberPort.isEnabled()) {
- // reinstall eapol with default bandwidth profile
- oltFlowService.processEapolFilteringObjectives(subscriberPort, defaultBpId, Optional.empty(),
- null, VlanId.vlanId(EAPOL_DEFAULT_VLAN), true);
- } else {
- log.debug("Port {} is no longer enabled or it's unavailable. Not "
- + "reprogramming default eapol flow", connectPoint);
- }
- }
+ //NOTE this only means we have taken the request in, nothing more.
return true;
}
-
@Override
- public boolean provisionSubscriber(AccessSubscriberId subscriberId, Optional<VlanId> sTag,
- Optional<VlanId> cTag, Optional<Integer> tpId) {
+ public boolean removeSubscriber(ConnectPoint cp) {
+ subscriberExecutor.submit(() -> {
+ Device device = deviceService.getDevice(DeviceId.deviceId(cp.deviceId().toString()));
+ Port port = deviceService.getPort(device.id(), cp.port());
+ AccessDevicePort accessDevicePort = new AccessDevicePort(port);
- log.info("Provisioning subscriber using subscriberId {}, sTag {}, cTag {}, tpId {}",
- subscriberId, sTag, cTag, tpId);
-
- // Check if we can find the connect point to which this subscriber is connected
- ConnectPoint cp = findSubscriberConnectPoint(subscriberId.toString());
- if (cp == null) {
- log.warn("ConnectPoint for {} not found", subscriberId);
- return false;
- }
- AccessDevicePort subscriberPort = new AccessDevicePort(deviceService.getPort(cp), AccessDevicePort.Type.UNI);
-
- if (!sTag.isPresent() && !cTag.isPresent()) {
- return provisionSubscriber(cp);
- } else if (sTag.isPresent() && cTag.isPresent() && tpId.isPresent()) {
- AccessDevicePort uplinkPort = getUplinkPort(deviceService.getDevice(cp.deviceId()));
- if (uplinkPort == null) {
- log.warn(NO_UPLINK_PORT, cp.deviceId());
+ if (oltDeviceService.isNniPort(device, port.number())) {
+ log.warn("will not un-provision a subscriber on the NNI {}",
+ accessDevicePort);
return false;
}
- //delete Eapol authentication flow with default bandwidth
- //wait until Eapol rule with defaultBpId is removed to install subscriber-based rules
- //install subscriber flows
- CompletableFuture<ObjectiveError> filterFuture = new CompletableFuture<>();
- oltFlowService.processEapolFilteringObjectives(subscriberPort, defaultBpId, Optional.empty(),
- filterFuture, VlanId.vlanId(EAPOL_DEFAULT_VLAN), false);
- filterFuture.thenAcceptAsync(filterStatus -> {
- if (filterStatus == null) {
- provisionUniTagInformation(uplinkPort, subscriberPort, cTag.get(), sTag.get(), tpId.get());
- }
+ log.info("Un-provisioning subscriber on {}", accessDevicePort);
+
+ if (!oltFlowService.isSubscriberServiceProvisioned(accessDevicePort)) {
+ log.error("Subscriber on {} is not provisioned", accessDevicePort);
+ return false;
+ }
+
+ SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
+ if (si == null) {
+ log.error("Subscriber information not found in sadis for port {}",
+ accessDevicePort);
+ // NOTE that we are returning true so that the subscriber is removed from the queue
+ // and we can move on provisioning others
+ return false;
+ }
+ DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
+ DiscoveredSubscriber.Status.REMOVED, true, si);
+
+ // NOTE we need to keep a list of the subscribers that are provisioned on a port,
+ // regardless of the flow status
+ si.uniTagList().forEach(uti -> {
+ ServiceKey sk = new ServiceKey(accessDevicePort, uti);
+ oltFlowService.updateProvisionedSubscriberStatus(sk, false);
});
+
+ addSubscriberToQueue(sub);
return true;
- } else {
- log.warn("Provisioning failed for subscriber: {}", subscriberId);
+ });
+ //NOTE this only means we have taken the request in, nothing more.
+ return true;
+ }
+
+ @Override
+ public boolean provisionSubscriber(ConnectPoint cp, VlanId cTag, VlanId sTag, Integer tpId) {
+ log.debug("Provisioning subscriber on {}, with cTag {}, stag {}, tpId {}",
+ cp, cTag, sTag, tpId);
+ Device device = deviceService.getDevice(cp.deviceId());
+ Port port = deviceService.getPort(device.id(), cp.port());
+ AccessDevicePort accessDevicePort = new AccessDevicePort(port);
+
+ if (oltDeviceService.isNniPort(device, port.number())) {
+ log.warn("will not provision a subscriber on the NNI {}", accessDevicePort);
return false;
}
- }
- @Override
- public boolean removeSubscriber(AccessSubscriberId subscriberId, Optional<VlanId> sTag,
- Optional<VlanId> cTag, Optional<Integer> tpId) {
- // Check if we can find the connect point to which this subscriber is connected
- ConnectPoint cp = findSubscriberConnectPoint(subscriberId.toString());
- if (cp == null) {
- log.warn("ConnectPoint for {} not found", subscriberId);
+ SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
+ UniTagInformation specificService = getUniTagInformation(getPortName(port), cTag, sTag, tpId);
+ if (specificService == null) {
+ log.error("Can't find Information for subscriber on {}, with cTag {}, " +
+ "stag {}, tpId {}", cp, cTag, sTag, tpId);
return false;
}
- AccessDevicePort subscriberPort = new AccessDevicePort(deviceService.getPort(cp), AccessDevicePort.Type.UNI);
+ List<UniTagInformation> uniTagInformationList = new LinkedList<>();
+ uniTagInformationList.add(specificService);
+ si.setUniTagList(uniTagInformationList);
+ DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
+ DiscoveredSubscriber.Status.ADDED, true, si);
- if (!sTag.isPresent() && !cTag.isPresent()) {
- return removeSubscriber(cp);
- } else if (sTag.isPresent() && cTag.isPresent() && tpId.isPresent()) {
- // Get the uplink port
- AccessDevicePort uplinkPort = getUplinkPort(deviceService.getDevice(cp.deviceId()));
- if (uplinkPort == null) {
- log.warn(NO_UPLINK_PORT, cp.deviceId());
- return false;
- }
-
- Optional<UniTagInformation> tagInfo = getUniTagInformation(subscriberPort, cTag.get(),
- sTag.get(), tpId.get());
- if (!tagInfo.isPresent()) {
- log.warn("UniTagInformation does not exist for {}, cTag {}, sTag {}, tpId {}",
- subscriberPort, cTag, sTag, tpId);
- return false;
- }
-
- unprovisionVlans(uplinkPort, subscriberPort, tagInfo.get());
- return true;
- } else {
- log.warn("Removing subscriber is not possible - please check the provided information" +
- "for the subscriber: {}", subscriberId);
+ // NOTE we need to keep a list of the subscribers that are provisioned on a port,
+ // regardless of the flow status
+ ServiceKey sk = new ServiceKey(accessDevicePort, specificService);
+ if (oltFlowService.isSubscriberServiceProvisioned(sk)) {
+ log.error("Subscriber on {} is already provisioned", sk);
return false;
}
+ oltFlowService.updateProvisionedSubscriberStatus(sk, true);
+
+ addSubscriberToQueue(sub);
+ return true;
}
@Override
- public ImmutableMap<ConnectPoint, Set<UniTagInformation>> getProgSubs() {
- return programmedSubs.stream()
- .collect(collectingAndThen(
- groupingBy(Map.Entry::getKey, mapping(Map.Entry::getValue, toSet())),
- ImmutableMap::copyOf));
+ public boolean removeSubscriber(ConnectPoint cp, VlanId cTag, VlanId sTag, Integer tpId) {
+ log.debug("Un-provisioning subscriber on {} with cTag {}, stag {}, tpId {}",
+ cp, cTag, sTag, tpId);
+ Device device = deviceService.getDevice(cp.deviceId());
+ Port port = deviceService.getPort(device.id(), cp.port());
+ AccessDevicePort accessDevicePort = new AccessDevicePort(port);
+
+ if (oltDeviceService.isNniPort(device, port.number())) {
+ log.warn("will not un-provision a subscriber on the NNI {}",
+ accessDevicePort);
+ return false;
+ }
+
+ SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
+ UniTagInformation specificService = getUniTagInformation(getPortName(port), cTag, sTag, tpId);
+ if (specificService == null) {
+ log.error("Can't find Information for subscriber on {}, with cTag {}, " +
+ "stag {}, tpId {}", cp, cTag, sTag, tpId);
+ return false;
+ }
+ List<UniTagInformation> uniTagInformationList = new LinkedList<>();
+ uniTagInformationList.add(specificService);
+ si.setUniTagList(uniTagInformationList);
+ DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
+ DiscoveredSubscriber.Status.ADDED, true, si);
+
+ // NOTE we need to keep a list of the subscribers that are provisioned on a port,
+ // regardless of the flow status
+ ServiceKey sk = new ServiceKey(accessDevicePort, specificService);
+ if (!oltFlowService.isSubscriberServiceProvisioned(sk)) {
+ log.error("Subscriber on {} is not provisioned", sk);
+ return false;
+ }
+ oltFlowService.updateProvisionedSubscriberStatus(sk, false);
+
+ addSubscriberToQueue(sub);
+ return true;
}
@Override
- public ImmutableMap<ConnectPoint, Set<UniTagInformation>> getFailedSubs() {
- return failedSubs.stream()
- .collect(collectingAndThen(
- groupingBy(Map.Entry::getKey, mapping(Map.Entry::getValue, toSet())),
- ImmutableMap::copyOf));
- }
-
- @Override
- public List<DeviceId> fetchOlts() {
- // look through all the devices and find the ones that are OLTs as per Sadis
+ public List<DeviceId> getConnectedOlts() {
List<DeviceId> olts = new ArrayList<>();
Iterable<Device> devices = deviceService.getDevices();
for (Device d : devices) {
- if (getOltInfo(d) != null) {
+ if (oltDeviceService.isOlt(d)) {
// So this is indeed an OLT device
olts.add(d.id());
}
@@ -648,19 +467,20 @@
}
/**
- * Finds the connect point to which a subscriber is connected.
+ * Finds the connect-point to which a subscriber is connected.
*
* @param id The id of the subscriber, this is the same ID as in Sadis
* @return Subscribers ConnectPoint if found else null
*/
- private ConnectPoint findSubscriberConnectPoint(String id) {
+ @Override
+ public ConnectPoint findSubscriberConnectPoint(String id) {
Iterable<Device> devices = deviceService.getDevices();
for (Device d : devices) {
for (Port p : deviceService.getPorts(d.id())) {
log.trace("Comparing {} with {}", p.annotations().value(AnnotationKeys.PORT_NAME), id);
if (p.annotations().value(AnnotationKeys.PORT_NAME).equals(id)) {
- log.debug("Found on device {} port {}", d.id(), p.number());
+ log.debug("Found on {}", portWithName(p));
return new ConnectPoint(d.id(), p.number());
}
}
@@ -668,584 +488,96 @@
return null;
}
- /**
- * Gets the context of the bandwidth profile information for the given parameter.
- *
- * @param bandwidthProfile the bandwidth profile id
- * @return the context of the bandwidth profile information
- */
- private BandwidthProfileInformation getBandwidthProfileInformation(String bandwidthProfile) {
- if (bpService == null) {
- log.warn(SADIS_NOT_RUNNING);
- return null;
- }
- if (bandwidthProfile == null) {
- return null;
- }
- return bpService.get(bandwidthProfile);
- }
+ protected void processDiscoveredSubscribers() {
+ log.info("Started processDiscoveredSubscribers loop");
+ while (true) {
+ Set<ConnectPoint> discoveredCps;
+ try {
+ queueReadLock.lock();
+ discoveredCps = eventsQueues.keySet();
+ } catch (Exception e) {
+ log.error("Cannot read keys from queue map", e);
+ return;
+ } finally {
+ queueReadLock.unlock();
+ }
- /**
- * Removes subscriber vlan flows.
- *
- * @param uplink uplink port of the OLT
- * @param subscriberPort uni port
- * @param uniTag uni tag information
- */
- private void unprovisionVlans(AccessDevicePort uplink, AccessDevicePort subscriberPort, UniTagInformation uniTag) {
- log.info("Unprovisioning vlans for {} at {}", uniTag, subscriberPort);
- DeviceId deviceId = subscriberPort.deviceId();
+ discoveredCps.forEach(cp -> {
+ LinkedBlockingQueue<DiscoveredSubscriber> eventsQueue;
- CompletableFuture<ObjectiveError> downFuture = new CompletableFuture<>();
- CompletableFuture<ObjectiveError> upFuture = new CompletableFuture<>();
-
- VlanId deviceVlan = uniTag.getPonSTag();
- VlanId subscriberVlan = uniTag.getPonCTag();
-
- MeterId upstreamMeterId = oltMeterService
- .getMeterIdFromBpMapping(deviceId, uniTag.getUpstreamBandwidthProfile());
- MeterId downstreamMeterId = oltMeterService
- .getMeterIdFromBpMapping(deviceId, uniTag.getDownstreamBandwidthProfile());
- MeterId upstreamOltMeterId = oltMeterService
- .getMeterIdFromBpMapping(deviceId, uniTag.getUpstreamOltBandwidthProfile());
- MeterId downstreamOltMeterId = oltMeterService
- .getMeterIdFromBpMapping(deviceId, uniTag.getDownstreamOltBandwidthProfile());
-
- Optional<SubscriberFlowInfo> waitingMacSubFlowInfo =
- getAndRemoveWaitingMacSubFlowInfoForCTag(new ConnectPoint(deviceId, subscriberPort.number()),
- subscriberVlan);
- if (waitingMacSubFlowInfo.isPresent()) {
- // only dhcp objectives applied previously, so only dhcp uninstallation objective will be processed
- log.debug("Waiting MAC service removed and dhcp uninstallation objective will be processed. " +
- "waitingMacSubFlowInfo:{}", waitingMacSubFlowInfo.get());
- CompletableFuture<ObjectiveError> dhcpFuture = new CompletableFuture<>();
- oltFlowService.processDhcpFilteringObjectives(subscriberPort,
- upstreamMeterId, upstreamOltMeterId, uniTag, false, true, Optional.of(dhcpFuture));
- dhcpFuture.thenAcceptAsync(dhcpStatus -> {
- AccessDeviceEvent.Type type;
- if (dhcpStatus == null) {
- type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_UNREGISTERED;
- log.debug("Dhcp uninstallation objective was processed successfully for cTag {}, sTag {}, " +
- "tpId {} and Device/Port:{}", uniTag.getPonCTag(), uniTag.getPonSTag(),
- uniTag.getTechnologyProfileId(), subscriberPort);
- updateProgrammedSubscriber(subscriberPort, uniTag, false);
- } else {
- type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_UNREGISTRATION_FAILED;
- log.error("Dhcp uninstallation objective was failed for cTag {}, sTag {}, " +
- "tpId {} and Device/Port:{} :{}", uniTag.getPonCTag(), uniTag.getPonSTag(),
- uniTag.getTechnologyProfileId(), subscriberPort, dhcpStatus);
+ try {
+ queueReadLock.lock();
+ eventsQueue = eventsQueues.get(cp);
+ } catch (Exception e) {
+ log.error("Cannot get key from queue map", e);
+ return;
+ } finally {
+ queueReadLock.unlock();
}
- post(new AccessDeviceEvent(type, deviceId, subscriberPort.port(),
- deviceVlan, subscriberVlan, uniTag.getTechnologyProfileId()));
- });
- return;
- } else {
- log.debug("There is no waiting MAC service for {} and subscriberVlan: {}", subscriberPort, subscriberVlan);
- }
- ForwardingObjective.Builder upFwd =
- oltFlowService.createUpBuilder(uplink, subscriberPort, upstreamMeterId, upstreamOltMeterId, uniTag);
-
- Optional<MacAddress> macAddress = getMacAddress(deviceId, subscriberPort, uniTag);
- ForwardingObjective.Builder downFwd =
- oltFlowService.createDownBuilder(uplink, subscriberPort, downstreamMeterId, downstreamOltMeterId,
- uniTag, macAddress);
-
- oltFlowService.processIgmpFilteringObjectives(subscriberPort, upstreamMeterId, upstreamOltMeterId, uniTag,
- false, true);
- oltFlowService.processDhcpFilteringObjectives(subscriberPort,
- upstreamMeterId, upstreamOltMeterId, uniTag, false, true, Optional.empty());
- oltFlowService.processPPPoEDFilteringObjectives(subscriberPort, upstreamMeterId, upstreamOltMeterId, uniTag,
- false, true);
-
- flowObjectiveService.forward(deviceId, upFwd.remove(new ObjectiveContext() {
- @Override
- public void onSuccess(Objective objective) {
- upFuture.complete(null);
- }
-
- @Override
- public void onError(Objective objective, ObjectiveError error) {
- upFuture.complete(error);
- }
- }));
-
- flowObjectiveService.forward(deviceId, downFwd.remove(new ObjectiveContext() {
- @Override
- public void onSuccess(Objective objective) {
- downFuture.complete(null);
- }
-
- @Override
- public void onError(Objective objective, ObjectiveError error) {
- downFuture.complete(error);
- }
- }));
-
- upFuture.thenAcceptBothAsync(downFuture, (upStatus, downStatus) -> {
- AccessDeviceEvent.Type type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_UNREGISTERED;
- if (upStatus == null && downStatus == null) {
- log.debug("Uni tag information is unregistered successfully for cTag {}, sTag {}, tpId {} on {}",
- uniTag.getPonCTag(), uniTag.getPonSTag(), uniTag.getTechnologyProfileId(), subscriberPort);
- updateProgrammedSubscriber(subscriberPort, uniTag, false);
- } else if (downStatus != null) {
- log.error("Subscriber with vlan {} on {} failed downstream uninstallation: {}",
- subscriberVlan, subscriberPort, downStatus);
- type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_UNREGISTRATION_FAILED;
- } else if (upStatus != null) {
- log.error("Subscriber with vlan {} on {} failed upstream uninstallation: {}",
- subscriberVlan, subscriberPort, upStatus);
- type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_UNREGISTRATION_FAILED;
- }
- post(new AccessDeviceEvent(type, deviceId, subscriberPort.port(), deviceVlan, subscriberVlan,
- uniTag.getTechnologyProfileId()));
- }, oltInstallers);
- }
-
- private Optional<SubscriberFlowInfo> getAndRemoveWaitingMacSubFlowInfoForCTag(ConnectPoint cp, VlanId cTag) {
- SubscriberFlowInfo returnSubFlowInfo = null;
- Collection<? extends SubscriberFlowInfo> subFlowInfoSet = waitingMacSubscribers.get(cp).value();
- for (SubscriberFlowInfo subFlowInfo : subFlowInfoSet) {
- if (subFlowInfo.getTagInfo().getPonCTag().equals(cTag)) {
- returnSubFlowInfo = subFlowInfo;
- break;
- }
- }
- if (returnSubFlowInfo != null) {
- waitingMacSubscribers.remove(cp, returnSubFlowInfo);
- return Optional.of(returnSubFlowInfo);
- }
- return Optional.empty();
- }
-
- /**
- * Adds subscriber vlan flows, dhcp, eapol and igmp trap flows for the related uni port.
- *
- * @param subPort the connection point of the subscriber
- * @param uplinkPort uplink port of the OLT (the nni port)
- * @param sub subscriber information that includes s, c tags, tech profile and bandwidth profile references
- */
- private void provisionUniTagList(AccessDevicePort subPort, AccessDevicePort uplinkPort,
- SubscriberAndDeviceInformation sub) {
-
- log.debug("Provisioning vlans for subscriber on {}", subPort);
- if (log.isTraceEnabled()) {
- log.trace("Subscriber informations {}", sub);
- }
-
- if (sub.uniTagList() == null || sub.uniTagList().isEmpty()) {
- log.warn("Unitaglist doesn't exist for the subscriber {} on {}", sub.id(), subPort);
- return;
- }
-
- for (UniTagInformation uniTag : sub.uniTagList()) {
- handleSubscriberFlows(uplinkPort, subPort, uniTag);
- }
- }
-
- /**
- * Finds the uni tag information and provisions the found information.
- * If the uni tag information is not found, returns
- *
- * @param uplinkPort the nni port
- * @param subscriberPort the uni port
- * @param innerVlan the pon c tag
- * @param outerVlan the pon s tag
- * @param tpId the technology profile id
- */
- private void provisionUniTagInformation(AccessDevicePort uplinkPort,
- AccessDevicePort subscriberPort,
- VlanId innerVlan,
- VlanId outerVlan,
- Integer tpId) {
-
- Optional<UniTagInformation> gotTagInformation = getUniTagInformation(subscriberPort, innerVlan,
- outerVlan, tpId);
- if (!gotTagInformation.isPresent()) {
- return;
- }
- UniTagInformation tagInformation = gotTagInformation.get();
- handleSubscriberFlows(uplinkPort, subscriberPort, tagInformation);
- }
-
- private void updateProgrammedSubscriber(AccessDevicePort port, UniTagInformation tagInformation, boolean add) {
- if (add) {
- programmedSubs.put(new ConnectPoint(port.deviceId(), port.number()), tagInformation);
- } else {
- programmedSubs.remove(new ConnectPoint(port.deviceId(), port.number()), tagInformation);
- }
- }
-
- /**
- * Installs a uni tag information flow.
- *
- * @param uplinkPort the nni port
- * @param subscriberPort the uni port
- * @param tagInfo the uni tag information
- */
- private void handleSubscriberFlows(AccessDevicePort uplinkPort, AccessDevicePort subscriberPort,
- UniTagInformation tagInfo) {
- log.debug("Provisioning vlan-based flows for the uniTagInformation {} on {}", tagInfo, subscriberPort);
- DeviceId deviceId = subscriberPort.deviceId();
-
- if (multicastServiceName.equals(tagInfo.getServiceName())) {
- // IGMP flows are taken care of along with VOD service
- // Please note that for each service, Subscriber Registered event will be sent
- post(new AccessDeviceEvent(AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_REGISTERED, deviceId,
- subscriberPort.port(), tagInfo.getPonSTag(), tagInfo.getPonCTag(),
- tagInfo.getTechnologyProfileId()));
- return;
- }
-
- BandwidthProfileInformation upstreamBpInfo =
- getBandwidthProfileInformation(tagInfo.getUpstreamBandwidthProfile());
- BandwidthProfileInformation downstreamBpInfo =
- getBandwidthProfileInformation(tagInfo.getDownstreamBandwidthProfile());
- BandwidthProfileInformation upstreamOltBpInfo =
- getBandwidthProfileInformation(tagInfo.getUpstreamOltBandwidthProfile());
- BandwidthProfileInformation downstreamOltBpInfo =
- getBandwidthProfileInformation(tagInfo.getDownstreamOltBandwidthProfile());
- if (upstreamBpInfo == null) {
- log.warn("No meter installed since no Upstream BW Profile definition found for "
- + "ctag {} stag {} tpId {} on {}",
- tagInfo.getPonCTag(), tagInfo.getPonSTag(), tagInfo.getTechnologyProfileId(), subscriberPort);
- return;
- }
- if (downstreamBpInfo == null) {
- log.warn("No meter installed since no Downstream BW Profile definition found for "
- + "ctag {} stag {} tpId {} on {}",
- tagInfo.getPonCTag(), tagInfo.getPonSTag(),
- tagInfo.getTechnologyProfileId(), subscriberPort);
- return;
- }
- if ((upstreamOltBpInfo != null && downstreamOltBpInfo == null) ||
- (upstreamOltBpInfo == null && downstreamOltBpInfo != null)) {
- log.warn("No meter installed since only one olt BW Profile definition found for "
- + "ctag {} stag {} tpId {} and Device/port: {}:{}",
- tagInfo.getPonCTag(), tagInfo.getPonSTag(),
- tagInfo.getTechnologyProfileId(), deviceId,
- subscriberPort);
- return;
- }
-
- MeterId upOltMeterId = null;
- MeterId downOltMeterId = null;
-
- // check for meterIds for the upstream and downstream bandwidth profiles
- MeterId upMeterId = oltMeterService
- .getMeterIdFromBpMapping(deviceId, upstreamBpInfo.id());
- MeterId downMeterId = oltMeterService
- .getMeterIdFromBpMapping(deviceId, downstreamBpInfo.id());
-
- if (upstreamOltBpInfo != null) {
- // Multi UNI service
- upOltMeterId = oltMeterService
- .getMeterIdFromBpMapping(deviceId, upstreamOltBpInfo.id());
- downOltMeterId = oltMeterService
- .getMeterIdFromBpMapping(deviceId, downstreamOltBpInfo.id());
- } else {
- // NOT Multi UNI service
- log.debug("OLT bandwidth profiles fields are set to ONU bandwidth profiles");
- upstreamOltBpInfo = upstreamBpInfo;
- downstreamOltBpInfo = downstreamBpInfo;
- upOltMeterId = upMeterId;
- downOltMeterId = downMeterId;
- }
- SubscriberFlowInfo fi = new SubscriberFlowInfo(uplinkPort, subscriberPort,
- tagInfo, downMeterId, upMeterId, downOltMeterId, upOltMeterId,
- downstreamBpInfo.id(), upstreamBpInfo.id(),
- downstreamOltBpInfo.id(), upstreamOltBpInfo.id());
-
- if (upMeterId != null && downMeterId != null && upOltMeterId != null && downOltMeterId != null) {
- log.debug("Meters are existing for upstream {} and downstream {} on {}",
- upstreamBpInfo.id(), downstreamBpInfo.id(), subscriberPort);
- handleSubFlowsWithMeters(fi);
- } else {
- log.debug("Adding {} on {} to pending subs", fi, subscriberPort);
- // one or both meters are not ready. It's possible they are in the process of being
- // created for other subscribers that share the same bandwidth profile.
- pendingSubscribersForDevice.compute(deviceId, (id, queue) -> {
- if (queue == null) {
- queue = new LinkedBlockingQueue<>();
- }
- queue.add(fi);
- log.info("Added {} to pending subscribers on {}", fi, subscriberPort);
- return queue;
- });
-
- List<BandwidthProfileInformation> bws = new ArrayList<>();
- // queue up the meters to be created
- if (upMeterId == null) {
- log.debug("Missing meter for upstream {} on {}", upstreamBpInfo.id(), subscriberPort);
- bws.add(upstreamBpInfo);
- }
- if (downMeterId == null) {
- log.debug("Missing meter for downstream {} on {}", downstreamBpInfo.id(), subscriberPort);
- bws.add(downstreamBpInfo);
- }
- if (upOltMeterId == null) {
- log.debug("Missing meter for upstreamOlt {} on {}", upstreamOltBpInfo.id(), subscriberPort);
- bws.add(upstreamOltBpInfo);
- }
- if (downOltMeterId == null) {
- log.debug("Missing meter for downstreamOlt {} on {}", downstreamOltBpInfo.id(), subscriberPort);
- bws.add(downstreamOltBpInfo);
- }
- bws.stream().distinct().forEach(bw -> checkAndCreateDevMeter(deviceId, bw));
- }
- }
-
- private void checkAndCreateDevMeter(DeviceId deviceId, BandwidthProfileInformation bwpInfo) {
- log.debug("Checking and Creating Meter with {} on {}", bwpInfo, deviceId);
- if (bwpInfo == null) {
- log.error("Can't create meter. Bandwidth profile is null for device : {}", deviceId);
- return;
- }
- //If false the meter is already being installed, skipping installation
- if (!oltMeterService.checkAndAddPendingMeter(deviceId, bwpInfo)) {
- log.debug("Meter is already being installed on {} for {}", deviceId, bwpInfo);
- return;
- }
- createMeter(deviceId, bwpInfo);
- }
-
- private void createMeter(DeviceId deviceId, BandwidthProfileInformation bwpInfo) {
- log.info("Creating Meter with {} on {}", bwpInfo, deviceId);
- CompletableFuture<Object> meterFuture = new CompletableFuture<>();
-
- MeterId meterId = oltMeterService.createMeter(deviceId, bwpInfo,
- meterFuture);
-
- meterFuture.thenAcceptAsync(result -> {
- log.info("Meter Future for {} has completed", meterId);
- pendingSubscribersForDevice.compute(deviceId, (id, queue) -> {
- // iterate through the subscribers on hold
- if (queue != null && !queue.isEmpty()) {
- while (true) {
- //TODO this might return the reference and not the actual object so
- // it can be actually swapped underneath us.
- SubscriberFlowInfo fi = queue.peek();
- if (fi == null) {
- log.info("No more subscribers pending on {}", deviceId);
- queue = new LinkedBlockingQueue<>();
- break;
- }
- if (result == null) {
- // meter install sent to device
- log.debug("Meter {} installed for bw {} on {}", meterId, bwpInfo, deviceId);
-
- MeterId upMeterId = oltMeterService
- .getMeterIdFromBpMapping(deviceId, fi.getUpBpInfo());
- MeterId downMeterId = oltMeterService
- .getMeterIdFromBpMapping(deviceId, fi.getDownBpInfo());
- MeterId upOltMeterId = oltMeterService
- .getMeterIdFromBpMapping(deviceId, fi.getUpOltBpInfo());
- MeterId downOltMeterId = oltMeterService
- .getMeterIdFromBpMapping(deviceId, fi.getDownOltBpInfo());
- if (upMeterId != null && downMeterId != null &&
- upOltMeterId != null && downOltMeterId != null) {
- log.debug("Provisioning subscriber after meter {} " +
- "installation and all meters are present " +
- "upstream {} , downstream {} , oltUpstream {} " +
- "and oltDownstream {} on {}",
- meterId, upMeterId, downMeterId, upOltMeterId,
- downOltMeterId, fi.getUniPort());
- // put in the meterIds because when fi was first
- // created there may or may not have been a meterId
- // depending on whether the meter was created or
- // not at that time.
- //TODO possibly spawn this in a separate thread.
- fi.setUpMeterId(upMeterId);
- fi.setDownMeterId(downMeterId);
- fi.setUpOltMeterId(upOltMeterId);
- fi.setDownOltMeterId(downOltMeterId);
- handleSubFlowsWithMeters(fi);
- queue.remove(fi);
- } else {
- log.debug("Not all meters for {} are yet installed up {}, " +
- "down {}, oltUp {}, oltDown {}", fi, upMeterId,
- downMeterId, upOltMeterId, downOltMeterId);
- }
- oltMeterService.removeFromPendingMeters(deviceId, bwpInfo);
- } else {
- // meter install failed
- log.error("Addition of subscriber {} on {} failed due to meter " +
- "{} with result {}", fi, fi.getUniPort(), meterId, result);
- oltMeterService.removeFromPendingMeters(deviceId, bwpInfo);
- queue.remove(fi);
- }
+ if (!oltDeviceService.isLocalLeader(cp.deviceId())) {
+ // if we're not local leader for this device, ignore this queue
+ if (log.isTraceEnabled()) {
+ log.trace("Ignoring queue on CP {} as not master of the device", cp);
}
- } else {
- log.info("No pending subscribers on {}", deviceId);
- queue = new LinkedBlockingQueue<>();
- }
- return queue;
- });
- });
-
- }
-
- /**
- * Add subscriber flows given meter information for both upstream and
- * downstream directions.
- *
- * @param subscriberFlowInfo relevant information for subscriber
- */
- private void handleSubFlowsWithMeters(SubscriberFlowInfo subscriberFlowInfo) {
- log.info("Provisioning subscriber flows based on {}", subscriberFlowInfo);
- UniTagInformation tagInfo = subscriberFlowInfo.getTagInfo();
- if (tagInfo.getIsDhcpRequired()) {
- Optional<MacAddress> macAddress =
- getMacAddress(subscriberFlowInfo.getDevId(), subscriberFlowInfo.getUniPort(), tagInfo);
- if (subscriberFlowInfo.getTagInfo().getEnableMacLearning()) {
- ConnectPoint cp = new ConnectPoint(subscriberFlowInfo.getDevId(),
- subscriberFlowInfo.getUniPort().number());
- if (macAddress.isPresent()) {
- log.debug("MAC Address {} obtained for {}", macAddress.get(), subscriberFlowInfo);
- } else {
- waitingMacSubscribers.put(cp, subscriberFlowInfo);
- log.debug("Adding sub to waiting mac map: {}", subscriberFlowInfo);
+ return;
}
- CompletableFuture<ObjectiveError> dhcpFuture = new CompletableFuture<>();
- oltFlowService.processDhcpFilteringObjectives(subscriberFlowInfo.getUniPort(),
- subscriberFlowInfo.getUpId(), subscriberFlowInfo.getUpOltId(),
- tagInfo, true, true, Optional.of(dhcpFuture));
- dhcpFuture.thenAcceptAsync(dhcpStatus -> {
- if (dhcpStatus != null) {
- log.error("Dhcp Objective failed for {}: {}", subscriberFlowInfo, dhcpStatus);
- if (macAddress.isEmpty()) {
- waitingMacSubscribers.remove(cp, subscriberFlowInfo);
+ flowsExecutor.execute(() -> {
+ if (!eventsQueue.isEmpty()) {
+ // we do not remove the event from the queue until it has been processed
+ // in that way we guarantee that events are processed in order
+ DiscoveredSubscriber sub = eventsQueue.peek();
+ if (sub == null) {
+ // the queue is empty
+ return;
}
- post(new AccessDeviceEvent(AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_REGISTRATION_FAILED,
- subscriberFlowInfo.getDevId(), subscriberFlowInfo.getUniPort().port(),
- tagInfo.getPonSTag(), tagInfo.getPonCTag(), tagInfo.getTechnologyProfileId()));
- } else {
- log.debug("Dhcp Objective success for: {}", subscriberFlowInfo);
- if (macAddress.isPresent()) {
- continueProvisioningSubs(subscriberFlowInfo, macAddress);
+
+ if (log.isTraceEnabled()) {
+ log.trace("Processing subscriber on port {} with status {}",
+ portWithName(sub.port), sub.status);
+ }
+
+ if (sub.hasSubscriber) {
+ // this is a provision subscriber call
+ if (oltFlowService.handleSubscriberFlows(sub, defaultBpId, multicastServiceName)) {
+ if (log.isTraceEnabled()) {
+ log.trace("Provisioning of subscriber on {} completed",
+ portWithName(sub.port));
+ }
+ removeSubscriberFromQueue(sub);
+ }
+ } else {
+ // this is a port event (ENABLED/DISABLED)
+ // means no subscriber was provisioned on that port
+
+ if (!deviceService.isAvailable(sub.device.id()) ||
+ deviceService.getPort(sub.device.id(), sub.port.number()) == null) {
+ // If the device is not connected or the port is not available do nothing
+ // This can happen when we disable and then immediately delete the device,
+ // the queue is populated but the meters and flows are already gone
+ // thus there is nothing left to do
+ return;
+ }
+
+ if (oltFlowService.handleBasicPortFlows(sub, defaultBpId, defaultBpId)) {
+ if (log.isTraceEnabled()) {
+ log.trace("Processing of port {} completed",
+ portWithName(sub.port));
+ }
+ removeSubscriberFromQueue(sub);
+ }
}
}
});
- } else {
- log.debug("Dynamic MAC Learning disabled, so will not learn for: {}", subscriberFlowInfo);
- // dhcp flows will handle after data plane flows
- continueProvisioningSubs(subscriberFlowInfo, macAddress);
- }
- } else {
- // dhcp not required for this service
- continueProvisioningSubs(subscriberFlowInfo, Optional.empty());
- }
- }
+ });
- private void continueProvisioningSubs(SubscriberFlowInfo subscriberFlowInfo, Optional<MacAddress> macAddress) {
- AccessDevicePort uniPort = subscriberFlowInfo.getUniPort();
- log.debug("Provisioning subscriber flows on {} based on {}", uniPort, subscriberFlowInfo);
- UniTagInformation tagInfo = subscriberFlowInfo.getTagInfo();
- CompletableFuture<ObjectiveError> upFuture = new CompletableFuture<>();
- CompletableFuture<ObjectiveError> downFuture = new CompletableFuture<>();
-
- ForwardingObjective.Builder upFwd =
- oltFlowService.createUpBuilder(subscriberFlowInfo.getNniPort(), uniPort, subscriberFlowInfo.getUpId(),
- subscriberFlowInfo.getUpOltId(), subscriberFlowInfo.getTagInfo());
- flowObjectiveService.forward(subscriberFlowInfo.getDevId(), upFwd.add(new ObjectiveContext() {
- @Override
- public void onSuccess(Objective objective) {
- log.debug("Upstream HSIA flow {} installed successfully on {}", subscriberFlowInfo, uniPort);
- upFuture.complete(null);
- }
-
- @Override
- public void onError(Objective objective, ObjectiveError error) {
- upFuture.complete(error);
- }
- }));
-
- ForwardingObjective.Builder downFwd =
- oltFlowService.createDownBuilder(subscriberFlowInfo.getNniPort(), uniPort,
- subscriberFlowInfo.getDownId(), subscriberFlowInfo.getDownOltId(),
- subscriberFlowInfo.getTagInfo(), macAddress);
- flowObjectiveService.forward(subscriberFlowInfo.getDevId(), downFwd.add(new ObjectiveContext() {
- @Override
- public void onSuccess(Objective objective) {
- log.debug("Downstream HSIA flow {} installed successfully on {}", subscriberFlowInfo, uniPort);
- downFuture.complete(null);
- }
-
- @Override
- public void onError(Objective objective, ObjectiveError error) {
- downFuture.complete(error);
- }
- }));
-
- upFuture.thenAcceptBothAsync(downFuture, (upStatus, downStatus) -> {
- AccessDeviceEvent.Type type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_REGISTERED;
- if (downStatus != null) {
- log.error("Flow with innervlan {} and outerVlan {} on {} failed downstream installation: {}",
- tagInfo.getPonCTag(), tagInfo.getPonSTag(), uniPort, downStatus);
- type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_REGISTRATION_FAILED;
- } else if (upStatus != null) {
- log.error("Flow with innervlan {} and outerVlan {} on {} failed upstream installation: {}",
- tagInfo.getPonCTag(), tagInfo.getPonSTag(), uniPort, upStatus);
- type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_REGISTRATION_FAILED;
- } else {
- log.debug("Upstream and downstream data plane flows are installed successfully on {}", uniPort);
- Optional<String> upstreamOltBw = tagInfo.getUpstreamOltBandwidthProfile() == null ?
- Optional.empty() : Optional.of(tagInfo.getUpstreamOltBandwidthProfile());
- oltFlowService.processEapolFilteringObjectives(uniPort, tagInfo.getUpstreamBandwidthProfile(),
- upstreamOltBw, null,
- tagInfo.getPonCTag(), true);
-
- if (!tagInfo.getEnableMacLearning()) {
- oltFlowService.processDhcpFilteringObjectives(uniPort, subscriberFlowInfo.getUpId(),
- subscriberFlowInfo.getUpOltId(), tagInfo, true, true, Optional.empty());
- }
-
- oltFlowService.processIgmpFilteringObjectives(uniPort, subscriberFlowInfo.getUpId(),
- subscriberFlowInfo.getUpOltId(), tagInfo, true, true);
-
- oltFlowService.processPPPoEDFilteringObjectives(uniPort, subscriberFlowInfo.getUpId(),
- subscriberFlowInfo.getUpOltId(), tagInfo, true, true);
-
- updateProgrammedSubscriber(uniPort, tagInfo, true);
- }
- post(new AccessDeviceEvent(type, subscriberFlowInfo.getDevId(), uniPort.port(),
- tagInfo.getPonSTag(), tagInfo.getPonCTag(),
- tagInfo.getTechnologyProfileId()));
- }, oltInstallers);
- }
-
- /**
- * Gets mac address from tag info if present, else checks the host service.
- *
- * @param deviceId device ID
- * @param port uni port
- * @param tagInformation tag info
- * @return MAC Address of subscriber
- */
- private Optional<MacAddress> getMacAddress(DeviceId deviceId, AccessDevicePort port,
- UniTagInformation tagInformation) {
- if (isMacAddressValid(tagInformation)) {
- log.debug("Got MAC Address {} from the uniTagInformation for {} and cTag {}",
- tagInformation.getConfiguredMacAddress(), port, tagInformation.getPonCTag());
- return Optional.of(MacAddress.valueOf(tagInformation.getConfiguredMacAddress()));
- } else if (tagInformation.getEnableMacLearning()) {
- Optional<Host> optHost = hostService.getConnectedHosts(new ConnectPoint(deviceId, port.number()))
- .stream().filter(host -> host.vlan().equals(tagInformation.getPonCTag())).findFirst();
- if (optHost.isPresent()) {
- log.debug("Got MAC Address {} from the hostService for {} and cTag {}",
- optHost.get().mac(), port, tagInformation.getPonCTag());
- return Optional.of(optHost.get().mac());
+ try {
+ TimeUnit.MILLISECONDS.sleep(requeueDelay);
+ } catch (InterruptedException e) {
+ continue;
}
}
- log.debug("Could not obtain MAC Address for {} and cTag {}", port, tagInformation.getPonCTag());
- return Optional.empty();
- }
-
- private boolean isMacAddressValid(UniTagInformation tagInformation) {
- return tagInformation.getConfiguredMacAddress() != null &&
- !tagInformation.getConfiguredMacAddress().trim().equals("") &&
- !MacAddress.NONE.equals(MacAddress.valueOf(tagInformation.getConfiguredMacAddress()));
}
/**
@@ -1253,26 +585,26 @@
* using the pon c tag, pon s tag and the technology profile id
* May return Optional<null>
*
- * @param port port of the subscriber
+ * @param portName port of the subscriber
* @param innerVlan pon c tag
* @param outerVlan pon s tag
* @param tpId the technology profile id
* @return the found uni tag information
*/
- private Optional<UniTagInformation> getUniTagInformation(AccessDevicePort port, VlanId innerVlan,
- VlanId outerVlan, int tpId) {
+ private UniTagInformation getUniTagInformation(String portName, VlanId innerVlan,
+ VlanId outerVlan, int tpId) {
log.debug("Getting uni tag information for {}, innerVlan: {}, outerVlan: {}, tpId: {}",
- port, innerVlan, outerVlan, tpId);
- SubscriberAndDeviceInformation subInfo = getSubscriber(new ConnectPoint(port.deviceId(), port.number()));
+ portName, innerVlan, outerVlan, tpId);
+ SubscriberAndDeviceInformation subInfo = subsService.get(portName);
if (subInfo == null) {
- log.warn("Subscriber information doesn't exist for {}", port);
- return Optional.empty();
+ log.warn("Subscriber information doesn't exist for {}", portName);
+ return null;
}
List<UniTagInformation> uniTagList = subInfo.uniTagList();
if (uniTagList == null) {
- log.warn("Uni tag list is not found for the subscriber {} on {}", subInfo.id(), port);
- return Optional.empty();
+ log.warn("Uni tag list is not found for the subscriber {} on {}", subInfo.id(), portName);
+ return null;
}
UniTagInformation service = null;
@@ -1286,438 +618,298 @@
if (service == null) {
log.warn("SADIS doesn't include the service with ponCtag {} ponStag {} and tpId {} on {}",
- innerVlan, outerVlan, tpId, port);
- return Optional.empty();
- }
-
- return Optional.of(service);
- }
-
- /**
- * Creates trap flows for device, including DHCP and LLDP trap on NNI and
- * EAPOL trap on the UNIs, if device is present in Sadis config.
- *
- * @param dev Device to look for
- */
- private void checkAndCreateDeviceFlows(Device dev) {
- // check if this device is provisioned in Sadis
- SubscriberAndDeviceInformation deviceInfo = getOltInfo(dev);
- log.info("checkAndCreateDeviceFlows: deviceInfo {}", deviceInfo);
-
- if (deviceInfo != null) {
- log.debug("Driver for device {} is {}", dev.id(),
- driverService.getDriver(dev.id()));
- for (Port p : deviceService.getPorts(dev.id())) {
- if (PortNumber.LOCAL.equals(p.number()) || !p.isEnabled()) {
- continue;
- }
- if (isUniPort(dev, p)) {
- AccessDevicePort port = new AccessDevicePort(p, AccessDevicePort.Type.UNI);
- if (!programmedSubs.containsKey(new ConnectPoint(dev.id(), p.number()))) {
- log.info("Creating Eapol on {}", port);
- oltFlowService.processEapolFilteringObjectives(port, defaultBpId, Optional.empty(),
- null, VlanId.vlanId(EAPOL_DEFAULT_VLAN), true);
- } else {
- log.debug("Subscriber Eapol on {} is already provisioned, not installing default", port);
- }
- } else {
- AccessDevicePort port = new AccessDevicePort(p, AccessDevicePort.Type.NNI);
- oltFlowService.processNniFilteringObjectives(port, true);
- }
- }
- }
- }
-
-
- /**
- * Get the uplink for of the OLT device.
- * <p>
- * This assumes that the OLT has a single uplink port. When more uplink ports need to be supported
- * this logic needs to be changed
- *
- * @param dev Device to look for
- * @return The uplink Port of the OLT
- */
- private AccessDevicePort getUplinkPort(Device dev) {
- // check if this device is provisioned in Sadis
- SubscriberAndDeviceInformation deviceInfo = getOltInfo(dev);
- log.trace("getUplinkPort: deviceInfo {}", deviceInfo);
- if (deviceInfo == null) {
- log.warn("Device {} is not configured in SADIS .. cannot fetch device"
- + " info", dev.id());
+ innerVlan, outerVlan, tpId, portName);
return null;
}
- // Return the port that has been configured as the uplink port of this OLT in Sadis
- Optional<Port> optionalPort = deviceService.getPorts(dev.id()).stream()
- .filter(port -> isNniPort(port) ||
- (port.number().toLong() == deviceInfo.uplinkPort()))
- .findFirst();
- if (optionalPort.isPresent()) {
- log.trace("getUplinkPort: Found port {}", optionalPort.get());
- return new AccessDevicePort(optionalPort.get(), AccessDevicePort.Type.NNI);
- }
- log.warn("getUplinkPort: " + NO_UPLINK_PORT, dev.id());
- return null;
+ return service;
}
- /**
- * Return the subscriber on a port.
- *
- * @param cp ConnectPoint on which to find the subscriber
- * @return subscriber if found else null
- */
- protected SubscriberAndDeviceInformation getSubscriber(ConnectPoint cp) {
- if (subsService == null) {
- log.warn(SADIS_NOT_RUNNING);
- return null;
- }
- Port port = deviceService.getPort(cp);
- checkNotNull(port, "Invalid connect point");
- String portName = port.annotations().value(AnnotationKeys.PORT_NAME);
- return subsService.get(portName);
+ protected void bindSadisService(SadisService service) {
+ sadisService = service;
+ subsService = sadisService.getSubscriberInfoService();
+
+ log.info("Sadis-service binds to onos.");
}
- /**
- * Checks whether the given port of the device is a uni port or not.
- *
- * @param d the access device
- * @param p the port of the device
- * @return true if the given port is a uni port
- */
- private boolean isUniPort(Device d, Port p) {
- AccessDevicePort ulPort = getUplinkPort(d);
- if (ulPort != null) {
- return (ulPort.number().toLong() != p.number().toLong());
- }
- //handles a special case where NNI port is misconfigured in SADIS and getUplinkPort(d) returns null
- //checks whether the port name starts with nni- which is the signature of an NNI Port
- if (p.annotations().value(AnnotationKeys.PORT_NAME) != null &&
- p.annotations().value(AnnotationKeys.PORT_NAME).startsWith(NNI)) {
- log.error("NNI port number {} is not matching with configured value", p.number().toLong());
- return false;
- }
- return true;
+ protected void unbindSadisService(SadisService service) {
+ deviceService.removeListener(deviceListener);
+ deviceListener = null;
+ sadisService = null;
+ subsService = null;
+ log.info("Sadis-service unbinds from onos.");
}
- /**
- * Gets the given device details from SADIS.
- * If the device is not found, returns null
- *
- * @param dev the access device
- * @return the olt information
- */
- private SubscriberAndDeviceInformation getOltInfo(Device dev) {
- if (subsService == null) {
- log.warn(SADIS_NOT_RUNNING);
- return null;
+ protected void addSubscriberToQueue(DiscoveredSubscriber sub) {
+ ConnectPoint cp = new ConnectPoint(sub.device.id(), sub.port.number());
+ LinkedBlockingQueue<DiscoveredSubscriber> q = null;
+ try {
+ queueReadLock.lock();
+ q = eventsQueues.getOrDefault(cp, new LinkedBlockingQueue<>());
+ } finally {
+ queueReadLock.unlock();
}
- String devSerialNo = dev.serialNumber();
- return subsService.get(devSerialNo);
- }
-
- /**
- * Checks for mastership or falls back to leadership on deviceId.
- * If the device is available use mastership,
- * otherwise fallback on leadership.
- * Leadership on the device topic is needed because the master can be NONE
- * in case the device went away, we still need to handle events
- * consistently
- */
- private boolean isLocalLeader(DeviceId deviceId) {
- if (deviceService.isAvailable(deviceId)) {
- return mastershipService.isLocalMaster(deviceId);
+ if (!q.contains(sub)) {
+ log.info("Adding subscriber to queue: {} with status {} and subscriber {}",
+ portWithName(sub.port), sub.status, sub.hasSubscriber);
+ q.add(sub);
} else {
- // Fallback with Leadership service - device id is used as topic
- NodeId leader = leadershipService.runForLeadership(
- deviceId.toString()).leaderNodeId();
- // Verify if this node is the leader
- return clusterService.getLocalNode().id().equals(leader);
+ log.debug("Not adding subscriber to queue as already present: {} with status {}",
+ portWithName(sub.port), sub.status);
+ // no need to update the queue in the map if nothing has changed
+ return;
+ }
+ try {
+ queueWriteLock.lock();
+ eventsQueues.put(cp, q);
+ } catch (UnsupportedOperationException | ClassCastException |
+ NullPointerException | IllegalArgumentException e) {
+ log.error("Cannot add subscriber to queue: {}", e.getMessage());
+ } finally {
+ queueWriteLock.unlock();
}
}
- private boolean isNniPort(Port port) {
- if (port.annotations().keys().contains(AnnotationKeys.PORT_NAME)) {
- return port.annotations().value(AnnotationKeys.PORT_NAME).contains(NNI);
+ protected void removeSubscriberFromQueue(DiscoveredSubscriber sub) {
+ ConnectPoint cp = new ConnectPoint(sub.device.id(), sub.port.number());
+ LinkedBlockingQueue<DiscoveredSubscriber> q = null;
+ if (log.isTraceEnabled()) {
+ log.trace("removing subscriber {} from queue", sub);
}
- return false;
- }
-
- private class InternalHostListener implements HostListener {
- @Override
- public void event(HostEvent event) {
- hostEventExecutor.execute(() -> {
- Host host = event.subject();
- switch (event.type()) {
- case HOST_ADDED:
- ConnectPoint cp = new ConnectPoint(host.location().deviceId(), host.location().port());
- Optional<SubscriberFlowInfo> optSubFlowInfo =
- getAndRemoveWaitingMacSubFlowInfoForCTag(cp, host.vlan());
- if (optSubFlowInfo.isPresent()) {
- log.debug("Continuing provisioning for waiting mac service. event: {}", event);
- continueProvisioningSubs(optSubFlowInfo.get(), Optional.of(host.mac()));
- } else {
- log.debug("There is no waiting mac sub. event: {}", event);
- }
- break;
- case HOST_UPDATED:
- if (event.prevSubject() != null && !event.prevSubject().mac().equals(event.subject().mac())) {
- log.debug("Subscriber's MAC address changed from {} to {}. " +
- "devId/portNumber: {}/{} vlan: {}", event.prevSubject().mac(),
- event.subject().mac(), host.location().deviceId(), host.location().port(),
- host.vlan());
- // TODO handle subscriber MAC Address changed
- } else {
- log.debug("Unhandled HOST_UPDATED event: {}", event);
- }
- break;
- default:
- log.debug("Unhandled host event received. event: {}", event);
- }
- });
+ try {
+ queueReadLock.lock();
+ q = eventsQueues.get(cp);
+ } finally {
+ queueReadLock.unlock();
+ }
+ if (q == null) {
+ log.warn("Cannot find queue for connectPoint {}", cp);
+ return;
+ }
+ boolean removed = q.remove(sub);
+ if (!removed) {
+ log.warn("Subscriber {} has not been removed from queue, is it still there? {}", sub, q);
+ return;
+ } else {
+ log.debug("Subscriber {} has been removed from the queue", sub);
}
- @Override
- public boolean isRelevant(HostEvent event) {
- return isLocalLeader(event.subject().location().deviceId());
+ try {
+ queueWriteLock.lock();
+ eventsQueues.remove(cp); // am I needed??
+ eventsQueues.put(cp, q);
+ } catch (UnsupportedOperationException | ClassCastException |
+ NullPointerException | IllegalArgumentException e) {
+ log.error("Cannot remove subscriber {} from queue: {}", sub, e.getMessage());
+ } finally {
+ queueWriteLock.unlock();
}
}
- private class InternalDeviceListener implements DeviceListener {
- private Set<DeviceId> programmedDevices = Sets.newConcurrentHashSet();
+ protected class OltDeviceListener
+ implements DeviceListener {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ protected ExecutorService eventExecutor;
+
+ /**
+ * Builds the listener with all the proper services and information needed.
+ */
+ public OltDeviceListener() {
+ this.eventExecutor = Executors.newFixedThreadPool(flowProcessingThreads,
+ groupedThreads("onos/olt-device-listener-event", "event-%d", log));
+ }
+
+ public void deactivate() {
+ this.eventExecutor.shutdown();
+ }
@Override
public void event(DeviceEvent event) {
eventExecutor.execute(() -> {
- DeviceId devId = event.subject().id();
- Device dev = event.subject();
- Port p = event.port();
- DeviceEvent.Type eventType = event.type();
-
- if (DeviceEvent.Type.PORT_STATS_UPDATED.equals(eventType) ||
- DeviceEvent.Type.DEVICE_SUSPENDED.equals(eventType) ||
- DeviceEvent.Type.DEVICE_UPDATED.equals(eventType)) {
- return;
- }
-
- boolean isLocalLeader = isLocalLeader(devId);
- // Only handle the event if the device belongs to us
- if (!isLocalLeader && event.type().equals(DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED)
- && !deviceService.isAvailable(devId) && deviceService.getPorts(devId).isEmpty()) {
- log.info("Cleaning local state for non master instance upon " +
- "device disconnection {}", devId);
- // Since no mastership of the device is present upon disconnection
- // the method in the FlowRuleManager only empties the local copy
- // of the DeviceFlowTable thus this method needs to get called
- // on every instance, see how it's done in the InternalDeviceListener
- // in FlowRuleManager: no mastership check for purgeOnDisconnection
- handleDeviceDisconnection(dev, false, false);
- return;
- } else if (!isLocalLeader) {
- log.debug("Not handling event because instance is not leader for {}", devId);
- return;
- }
-
- log.debug("OLT got {} event for {}/{}", eventType, event.subject(), event.port());
-
- if (getOltInfo(dev) == null) {
- // it's possible that we got an event for a previously
- // programmed OLT that is no longer available in SADIS
- // we let such events go through
- if (!programmedDevices.contains(devId)) {
- log.warn("No device info found for {}, this is either "
- + "not an OLT or not known to sadis", dev);
- return;
- }
- }
- AccessDevicePort port = null;
- if (p != null) {
- if (isUniPort(dev, p)) {
- port = new AccessDevicePort(p, AccessDevicePort.Type.UNI);
- } else {
- port = new AccessDevicePort(p, AccessDevicePort.Type.NNI);
- }
- }
-
+ boolean isOlt = oltDeviceService.isOlt(event.subject());
+ DeviceId deviceId = event.subject().id();
switch (event.type()) {
- //TODO: Port handling and bookkeeping should be improved once
- // olt firmware handles correct behaviour.
- case PORT_ADDED:
- if (!deviceService.isAvailable(devId)) {
- log.warn("Received {} for disconnected device {}, ignoring", event, devId);
- return;
- }
- if (port.type().equals(AccessDevicePort.Type.UNI)) {
- post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_ADDED, devId, port.port()));
-
- if (port.isEnabled() && !port.number().equals(PortNumber.LOCAL)) {
- log.info("eapol will be sent for port added {}", port);
- oltFlowService.processEapolFilteringObjectives(port, defaultBpId, Optional.empty(),
- null,
- VlanId.vlanId(EAPOL_DEFAULT_VLAN),
- true);
- }
- } else {
- SubscriberAndDeviceInformation deviceInfo = getOltInfo(dev);
- if (deviceInfo != null) {
- oltFlowService.processNniFilteringObjectives(port, true);
- }
- }
- break;
- case PORT_REMOVED:
- if (port.type().equals(AccessDevicePort.Type.UNI)) {
- // if no subscriber is provisioned we need to remove the default EAPOL
- // if a subscriber was provisioned the default EAPOL will not be there and we can skip.
- // The EAPOL with subscriber tag will be removed by removeSubscriber call.
- Collection<? extends UniTagInformation> uniTagInformationSet =
- programmedSubs.get(new ConnectPoint(port.deviceId(), port.number())).value();
- if (uniTagInformationSet == null || uniTagInformationSet.isEmpty()) {
- log.info("No subscriber provisioned on port {} in PORT_REMOVED event, " +
- "removing default EAPOL flow", port);
- oltFlowService.processEapolFilteringObjectives(port, defaultBpId, Optional.empty(),
- null, VlanId.vlanId(EAPOL_DEFAULT_VLAN), false);
- } else {
- removeSubscriber(port);
- }
-
- post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_REMOVED, devId, port.port()));
- }
- break;
- case PORT_UPDATED:
- if (!deviceService.isAvailable(devId)) {
- log.warn("Received {} for disconnected device {}, ignoring", event, devId);
- return;
- }
- if (port.type().equals(AccessDevicePort.Type.NNI)) {
- SubscriberAndDeviceInformation deviceInfo = getOltInfo(dev);
- if (deviceInfo != null && port.isEnabled()) {
- log.debug("NNI {} enabled", port);
- oltFlowService.processNniFilteringObjectives(port, true);
- }
- return;
- }
- ConnectPoint cp = new ConnectPoint(devId, port.number());
- Collection<? extends UniTagInformation> uniTagInformationSet = programmedSubs.get(cp).value();
- if (uniTagInformationSet == null || uniTagInformationSet.isEmpty()) {
- if (!port.number().equals(PortNumber.LOCAL)) {
- log.info("eapol will be {} updated for {} with default vlan {}",
- (port.isEnabled()) ? "added" : "removed", port, EAPOL_DEFAULT_VLAN);
- oltFlowService.processEapolFilteringObjectives(port, defaultBpId, Optional.empty(),
- null, VlanId.vlanId(EAPOL_DEFAULT_VLAN), port.isEnabled());
- }
- } else {
- log.info("eapol will be {} updated for {}", (port.isEnabled()) ? "added" : "removed",
- port);
- for (UniTagInformation uniTag : uniTagInformationSet) {
- oltFlowService.processEapolFilteringObjectives(port,
- uniTag.getUpstreamBandwidthProfile(),
- Optional.of(uniTag.getUpstreamOltBandwidthProfile()),
- null, uniTag.getPonCTag(), port.isEnabled());
- }
- }
- if (port.isEnabled()) {
- post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_ADDED, devId, port.port()));
- } else {
- post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_REMOVED, devId, port.port()));
- }
- break;
+ case PORT_STATS_UPDATED:
case DEVICE_ADDED:
- handleDeviceConnection(dev, true);
- break;
- case DEVICE_REMOVED:
- handleDeviceDisconnection(dev, true, true);
- break;
- case DEVICE_AVAILABILITY_CHANGED:
- if (deviceService.isAvailable(devId)) {
- log.info("Handling available device: {}", dev.id());
- handleDeviceConnection(dev, false);
- } else {
- if (deviceService.getPorts(devId).isEmpty()) {
- log.info("Handling controlled device disconnection .. "
- + "flushing all state for dev:{}", devId);
- handleDeviceDisconnection(dev, true, false);
- } else {
- log.info("Disconnected device has available ports .. "
- + "assuming temporary disconnection, "
- + "retaining state for device {}", devId);
- }
- }
- break;
- default:
- log.debug("Not handling event {}", event);
return;
+ case PORT_ADDED:
+ case PORT_UPDATED:
+ case PORT_REMOVED:
+ if (!isOlt) {
+ log.trace("Ignoring event {}, this is not an OLT device", deviceId);
+ return;
+ }
+ if (!oltDeviceService.isLocalLeader(deviceId)) {
+ log.trace("Device {} is not local to this node", deviceId);
+ return;
+ }
+ // port added, updated and removed are treated in the same way as we only care whether the port
+ // is enabled or not
+ handleOltPort(event.type(), event.subject(), event.port());
+ return;
+ case DEVICE_AVAILABILITY_CHANGED:
+ if (!isOlt) {
+ log.trace("Ignoring event {}, this is not an OLT device", deviceId);
+ return;
+ }
+ if (deviceService.isAvailable(deviceId)) {
+ if (!oltDeviceService.isLocalLeader(deviceId)) {
+ if (log.isTraceEnabled()) {
+ log.trace("Device {} is not local to this node, not handling available device",
+ deviceId);
+ }
+ } else {
+ log.info("Handling available device: {}", deviceId);
+ handleExistingPorts();
+ }
+ } else if (!deviceService.isAvailable(deviceId) && deviceService.getPorts(deviceId).isEmpty()) {
+ // NOTE that upon disconnection there is no mastership on the device,
+ // and we should anyway clear the local cache of the flows/meters across instances.
+ // We're only clearing the device if there are no available ports,
+ // otherwise we assume it's a temporary disconnection
+ log.info("Device {} availability changed to false and ports are empty, " +
+ "purging meters and flows", deviceId);
+ //NOTE all the instances will call these methods
+ oltFlowService.purgeDeviceFlows(deviceId);
+ oltMeterService.purgeDeviceMeters(deviceId);
+ clearQueueForDevice(deviceId);
+ } else {
+ log.info("Device {} availability changed to false, but ports are still available, " +
+ "assuming temporary disconnection. Ports: {}",
+ deviceId, deviceService.getPorts(deviceId));
+ }
+ return;
+ case DEVICE_REMOVED:
+ if (!isOlt) {
+ log.trace("Ignoring event {}, this is not an OLT device", deviceId);
+ return;
+ }
+ log.info("Device {} Removed, purging meters and flows", deviceId);
+ oltFlowService.purgeDeviceFlows(deviceId);
+ oltMeterService.purgeDeviceMeters(deviceId);
+ clearQueueForDevice(deviceId);
+ return;
+ default:
+ if (log.isTraceEnabled()) {
+ log.trace("Not handling event: {}, ", event);
+ }
}
});
}
- private void sendUniEvent(Device device, AccessDeviceEvent.Type eventType) {
- deviceService.getPorts(device.id()).stream()
- .filter(p -> !PortNumber.LOCAL.equals(p.number()))
- .filter(p -> isUniPort(device, p))
- .forEach(p -> post(new AccessDeviceEvent(eventType, device.id(), p)));
- }
-
- private void handleDeviceDisconnection(Device device, boolean sendDisconnectedEvent, boolean sendUniEvent) {
- programmedDevices.remove(device.id());
- removeAllSubscribers(device.id());
- removeWaitingMacSubs(device.id());
- //Handle case where OLT disconnects during subscriber provisioning
- pendingSubscribersForDevice.remove(device.id());
- oltFlowService.clearDeviceState(device.id());
-
- //Complete meter and flow purge
- flowRuleService.purgeFlowRules(device.id());
- oltMeterService.clearMeters(device.id());
- if (sendDisconnectedEvent) {
- post(new AccessDeviceEvent(
- AccessDeviceEvent.Type.DEVICE_DISCONNECTED, device.id(),
- null, null, null));
- }
- if (sendUniEvent) {
- sendUniEvent(device, AccessDeviceEvent.Type.UNI_REMOVED);
+ protected void clearQueueForDevice(DeviceId devId) {
+ try {
+ queueWriteLock.lock();
+ Iterator<Map.Entry<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>>> iter =
+ eventsQueues.entrySet().iterator();
+ while (iter.hasNext()) {
+ Map.Entry<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>> entry = iter.next();
+ if (entry.getKey().deviceId().equals(devId)) {
+ eventsQueues.remove(entry.getKey());
+ }
+ }
+ } finally {
+ queueWriteLock.unlock();
}
}
- private void handleDeviceConnection(Device dev, boolean sendUniEvent) {
- post(new AccessDeviceEvent(
- AccessDeviceEvent.Type.DEVICE_CONNECTED, dev.id(),
- null, null, null));
- programmedDevices.add(dev.id());
- checkAndCreateDeviceFlows(dev);
- if (sendUniEvent) {
- sendUniEvent(dev, AccessDeviceEvent.Type.UNI_ADDED);
+ protected void handleOltPort(DeviceEvent.Type type, Device device, Port port) {
+ log.info("OltDeviceListener receives event {} for port {} with status {} on device {}", type,
+ portWithName(port), port.isEnabled() ? "ENABLED" : "DISABLED", device.id());
+ if (port.isEnabled()) {
+ if (oltDeviceService.isNniPort(device, port.number())) {
+ // NOTE in the NNI case we receive a PORT_REMOVED event with status ENABLED, thus we need to
+ // pass the floeAction to the handleNniFlows method
+ OltFlowService.FlowOperation action = OltFlowService.FlowOperation.ADD;
+ if (type == DeviceEvent.Type.PORT_REMOVED) {
+ action = OltFlowService.FlowOperation.REMOVE;
+ }
+ oltFlowService.handleNniFlows(device, port, action);
+ } else {
+ post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_ADDED, device.id(), port));
+ // NOTE if the subscriber was previously provisioned,
+ // then add it back to the queue to be re-provisioned
+ boolean provisionSubscriber = oltFlowService.
+ isSubscriberServiceProvisioned(new AccessDevicePort(port));
+ SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
+ if (si == null) {
+ //NOTE this should not happen given that the subscriber was provisioned before
+ log.error("Subscriber information not found in sadis for port {}",
+ portWithName(port));
+ return;
+ }
+
+ DiscoveredSubscriber.Status status = DiscoveredSubscriber.Status.ADDED;
+ if (type == DeviceEvent.Type.PORT_REMOVED) {
+ status = DiscoveredSubscriber.Status.REMOVED;
+ }
+
+ DiscoveredSubscriber sub =
+ new DiscoveredSubscriber(device, port,
+ status, provisionSubscriber, si);
+ addSubscriberToQueue(sub);
+ }
+ } else {
+ if (oltDeviceService.isNniPort(device, port.number())) {
+ // NOTE this may need to be handled on DEVICE_REMOVE as we don't disable the NNI
+ oltFlowService.handleNniFlows(device, port, OltFlowService.FlowOperation.REMOVE);
+ } else {
+ post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_REMOVED, device.id(), port));
+ // NOTE we are assuming that if a subscriber has default eapol
+ // it does not have subscriber flows
+ if (oltFlowService.hasDefaultEapol(port)) {
+ SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
+ if (si == null) {
+ //NOTE this should not happen given that the subscriber was provisioned before
+ log.error("Subscriber information not found in sadis for port {}",
+ portWithName(port));
+ return;
+ }
+ DiscoveredSubscriber sub =
+ new DiscoveredSubscriber(device, port,
+ DiscoveredSubscriber.Status.REMOVED, false, si);
+
+ addSubscriberToQueue(sub);
+
+ } else if (oltFlowService.isSubscriberServiceProvisioned(new AccessDevicePort(port))) {
+ SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
+ if (si == null) {
+ //NOTE this should not happen given that the subscriber was provisioned before
+ log.error("Subscriber information not found in sadis for port {}",
+ portWithName(port));
+ return;
+ }
+ DiscoveredSubscriber sub =
+ new DiscoveredSubscriber(device, port,
+ DiscoveredSubscriber.Status.REMOVED, true, si);
+ addSubscriberToQueue(sub);
+ }
+ }
}
}
- private void removeAllSubscribers(DeviceId deviceId) {
- List<Map.Entry<ConnectPoint, UniTagInformation>> subs = programmedSubs.stream()
- .filter(e -> e.getKey().deviceId().equals(deviceId))
- .collect(toList());
-
- subs.forEach(e -> programmedSubs.remove(e.getKey(), e.getValue()));
- }
-
- private void removeWaitingMacSubs(DeviceId deviceId) {
- List<ConnectPoint> waitingMacKeys = waitingMacSubscribers.stream()
- .filter(cp -> cp.getKey().deviceId().equals(deviceId))
- .map(Map.Entry::getKey)
- .collect(toList());
- waitingMacKeys.forEach(cp -> waitingMacSubscribers.removeAll(cp));
- }
-
- }
-
- private class InternalClusterListener implements ClusterEventListener {
-
- @Override
- public void event(ClusterEvent event) {
- if (event.type() == ClusterEvent.Type.INSTANCE_READY) {
- hasher.addServer(event.subject().id());
- }
- if (event.type() == ClusterEvent.Type.INSTANCE_DEACTIVATED) {
- hasher.removeServer(event.subject().id());
+ /**
+ * This method is invoked on app activation in order to deal
+ * with devices and ports that are already existing in the system
+ * and thus won't trigger an event.
+ * It is also needed on instance reboot and device reconnect
+ */
+ protected void handleExistingPorts() {
+ Iterable<DeviceId> devices = getConnectedOlts();
+ for (DeviceId deviceId : devices) {
+ log.info("Handling existing OLT Ports for device {}", deviceId);
+ if (oltDeviceService.isLocalLeader(deviceId)) {
+ List<Port> ports = deviceService.getPorts(deviceId);
+ for (Port p : ports) {
+ if (PortNumber.LOCAL.equals(p.number()) || !p.isEnabled()) {
+ continue;
+ }
+ Device device = deviceService.getDevice(deviceId);
+ deviceListener.handleOltPort(DeviceEvent.Type.PORT_UPDATED, device, p);
+ }
+ }
}
}
}
-
}
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltDeviceService.java b/impl/src/main/java/org/opencord/olt/impl/OltDeviceService.java
new file mode 100644
index 0000000..b336c28
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/impl/OltDeviceService.java
@@ -0,0 +1,156 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.impl;
+
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Port;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.device.DeviceService;
+import org.opencord.sadis.BaseInformationService;
+import org.opencord.sadis.SadisService;
+import org.opencord.sadis.SubscriberAndDeviceInformation;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.ReferencePolicy;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.Optional;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * The implementation of the OltDeviceService.
+ */
+@Component(immediate = true)
+public class OltDeviceService implements OltDeviceServiceInterface {
+
+ protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
+ private final Logger log = getLogger(getClass());
+
+ @Reference(cardinality = ReferenceCardinality.OPTIONAL,
+ bind = "bindSadisService",
+ unbind = "unbindSadisService",
+ policy = ReferencePolicy.DYNAMIC)
+ protected volatile SadisService sadisService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected DeviceService deviceService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected MastershipService mastershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected LeadershipService leadershipService;
+
+ @Activate
+ public void activate() {
+ log.info("Activated");
+ }
+
+ /**
+ * Returns true if the device is an OLT.
+ *
+ * @param device the Device to be checked
+ * @return boolean
+ */
+ public boolean isOlt(Device device) {
+ return getOltInfo(device) != null;
+ }
+
+ private SubscriberAndDeviceInformation getOltInfo(Device dev) {
+ if (subsService == null) {
+ return null;
+ }
+ String devSerialNo = dev.serialNumber();
+ return subsService.get(devSerialNo);
+ }
+
+
+ /**
+ * Returns true if the port is an NNI Port on the OLT.
+ * NOTE: We can check if a port is a NNI based on the SADIS config, specifically the uplinkPort section
+ *
+ * @param dev the Device this port belongs to
+ * @param portNumber the PortNumber to be checked
+ * @return boolean
+ */
+ @Override
+ public boolean isNniPort(Device dev, PortNumber portNumber) {
+ SubscriberAndDeviceInformation deviceInfo = getOltInfo(dev);
+ return deviceInfo != null && portNumber.toLong() == deviceInfo.uplinkPort();
+ }
+
+ @Override
+ public Optional<Port> getNniPort(Device device) {
+ SubscriberAndDeviceInformation deviceInfo = getOltInfo(device);
+ if (deviceInfo == null) {
+ return Optional.empty();
+ }
+ List<Port> ports = deviceService.getPorts(device.id());
+ if (log.isTraceEnabled()) {
+ log.trace("Get NNI Port looks for NNI in: {}", ports);
+ }
+ return ports.stream()
+ .filter(p -> p.number().toLong() == deviceInfo.uplinkPort())
+ .findFirst();
+ }
+
+ protected void bindSadisService(SadisService service) {
+ this.subsService = service.getSubscriberInfoService();
+ log.info("Sadis service is loaded");
+ }
+
+ protected void unbindSadisService(SadisService service) {
+ this.subsService = null;
+ log.info("Sadis service is unloaded");
+ }
+
+ /**
+ * Checks for mastership or falls back to leadership on deviceId.
+ * If the device is available use mastership,
+ * otherwise fallback on leadership.
+ * Leadership on the device topic is needed because the master can be NONE
+ * in case the device went away, we still need to handle events
+ * consistently
+ *
+ * @param deviceId The device ID to check.
+ * @return boolean (true if the current instance is managing the device)
+ */
+ @Override
+ public boolean isLocalLeader(DeviceId deviceId) {
+ if (deviceService.isAvailable(deviceId)) {
+ return mastershipService.isLocalMaster(deviceId);
+ } else {
+ // Fallback with Leadership service - device id is used as topic
+ NodeId leader = leadershipService.runForLeadership(
+ deviceId.toString()).leaderNodeId();
+ // Verify if this node is the leader
+ return clusterService.getLocalNode().id().equals(leader);
+ }
+ }
+}
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltDeviceServiceInterface.java b/impl/src/main/java/org/opencord/olt/impl/OltDeviceServiceInterface.java
new file mode 100644
index 0000000..e8fcccb
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/impl/OltDeviceServiceInterface.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.impl;
+
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Port;
+import org.onosproject.net.PortNumber;
+
+import java.util.Optional;
+
+/**
+ * Service for olt device handling.
+ */
+public interface OltDeviceServiceInterface {
+ /**
+ * Returns true if the device is a known OLT to sadis/config.
+ * @param device the device
+ * @return true if a configured olt
+ */
+ boolean isOlt(Device device);
+
+ /**
+ * Returns true if the port is an NNI port of the OLT device.
+ * @param device the device
+ * @param port the port
+ * @return true if an NNI port of that OLT
+ */
+ boolean isNniPort(Device device, PortNumber port);
+
+ /**
+ * Returns the NNi port fo the OLT device if present.
+ * @param device the device
+ * @return the nni Port, if present
+ */
+ Optional<Port> getNniPort(Device device);
+
+ /**
+ * Returns true if the instance is leader for the OLT device.
+ * @param deviceId the device
+ * @return true if master, false otherwise.
+ */
+ boolean isLocalLeader(DeviceId deviceId);
+}
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
index 1868421..ee184a5 100644
--- a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
+++ b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-present Open Networking Foundation
+ * Copyright 2021-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,8 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.opencord.olt.impl;
+import com.google.common.collect.ImmutableMap;
import org.onlab.packet.EthType;
import org.onlab.packet.IPv4;
import org.onlab.packet.IPv6;
@@ -26,18 +28,31 @@
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
-import org.onosproject.mastership.MastershipService;
import org.onosproject.net.Annotations;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DefaultAnnotations;
+import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.Host;
+import org.onosproject.net.Port;
import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.DefaultTrafficSelector;
import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.FlowRuleEvent;
+import org.onosproject.net.flow.FlowRuleListener;
+import org.onosproject.net.flow.FlowRuleService;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.flow.criteria.Criteria;
+import org.onosproject.net.flow.criteria.Criterion;
+import org.onosproject.net.flow.criteria.EthTypeCriterion;
+import org.onosproject.net.flow.criteria.IPProtocolCriterion;
+import org.onosproject.net.flow.criteria.PortCriterion;
+import org.onosproject.net.flow.criteria.UdpPortCriterion;
+import org.onosproject.net.flow.criteria.VlanIdCriterion;
+import org.onosproject.net.flow.instructions.L2ModificationInstruction;
import org.onosproject.net.flowobjective.DefaultFilteringObjective;
import org.onosproject.net.flowobjective.DefaultForwardingObjective;
import org.onosproject.net.flowobjective.FilteringObjective;
@@ -46,13 +61,11 @@
import org.onosproject.net.flowobjective.Objective;
import org.onosproject.net.flowobjective.ObjectiveContext;
import org.onosproject.net.flowobjective.ObjectiveError;
+import org.onosproject.net.host.HostService;
import org.onosproject.net.meter.MeterId;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
-import org.opencord.olt.AccessDevicePort;
-import org.opencord.olt.internalapi.AccessDeviceFlowService;
-import org.opencord.olt.internalapi.AccessDeviceMeterService;
import org.opencord.sadis.BandwidthProfileInformation;
import org.opencord.sadis.BaseInformationService;
import org.opencord.sadis.SadisService;
@@ -67,26 +80,46 @@
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.osgi.service.component.annotations.ReferencePolicy;
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.util.Arrays;
import java.util.Dictionary;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import static com.google.common.base.Strings.isNullOrEmpty;
import static org.onlab.util.Tools.get;
-import static org.opencord.olt.impl.OsgiPropertyConstants.*;
-import static org.slf4j.LoggerFactory.getLogger;
+import static org.onosproject.net.flow.instructions.Instruction.Type.L2MODIFICATION;
+import static org.opencord.olt.impl.OltUtils.completeFlowOpToString;
+import static org.opencord.olt.impl.OltUtils.flowOpToString;
+import static org.opencord.olt.impl.OltUtils.getPortName;
+import static org.opencord.olt.impl.OltUtils.portWithName;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_TP_ID;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_TP_ID_DEFAULT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DOWNSTREAM_OLT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DOWNSTREAM_ONU;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_ON_NNI;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_ON_NNI_DEFAULT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_V4;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_V4_DEFAULT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_V6;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_V6_DEFAULT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_EAPOL;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_EAPOL_DEFAULT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_IGMP_ON_NNI;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_IGMP_ON_NNI_DEFAULT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_PPPOE;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_PPPOE_DEFAULT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.UPSTREAM_OLT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.UPSTREAM_ONU;
+import static org.opencord.olt.impl.OsgiPropertyConstants.WAIT_FOR_REMOVAL;
+import static org.opencord.olt.impl.OsgiPropertyConstants.WAIT_FOR_REMOVAL_DEFAULT;
-/**
- * Provisions flow rules on access devices.
- */
@Component(immediate = true, property = {
ENABLE_DHCP_ON_NNI + ":Boolean=" + ENABLE_DHCP_ON_NNI_DEFAULT,
ENABLE_DHCP_V4 + ":Boolean=" + ENABLE_DHCP_V4_DEFAULT,
@@ -94,32 +127,20 @@
ENABLE_IGMP_ON_NNI + ":Boolean=" + ENABLE_IGMP_ON_NNI_DEFAULT,
ENABLE_EAPOL + ":Boolean=" + ENABLE_EAPOL_DEFAULT,
ENABLE_PPPOE + ":Boolean=" + ENABLE_PPPOE_DEFAULT,
- DEFAULT_TP_ID + ":Integer=" + DEFAULT_TP_ID_DEFAULT
+ DEFAULT_TP_ID + ":Integer=" + DEFAULT_TP_ID_DEFAULT,
+ // FIXME remove this option as potentially dangerous in production
+ WAIT_FOR_REMOVAL + ":Boolean=" + WAIT_FOR_REMOVAL_DEFAULT
})
-public class OltFlowService implements AccessDeviceFlowService {
- private static final String SADIS_NOT_RUNNING = "Sadis is not running.";
- private static final String APP_NAME = "org.opencord.olt";
- private static final int NONE_TP_ID = -1;
- private static final int NO_PCP = -1;
- private static final Integer MAX_PRIORITY = 10000;
- private static final Integer MIN_PRIORITY = 1000;
- private static final String INSTALLED = "installed";
- private static final String REMOVED = "removed";
- private static final String INSTALLATION = "installation";
- private static final String REMOVAL = "removal";
- private static final String V4 = "V4";
- private static final String V6 = "V6";
-
- private final Logger log = getLogger(getClass());
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected FlowObjectiveService flowObjectiveService;
+public class OltFlowService implements OltFlowServiceInterface {
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected CoreService coreService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected MastershipService mastershipService;
+ protected ComponentConfigService cfgService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected FlowObjectiveService flowObjectiveService;
@Reference(cardinality = ReferenceCardinality.OPTIONAL,
bind = "bindSadisService",
@@ -128,17 +149,65 @@
protected volatile SadisService sadisService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected OltMeterServiceInterface oltMeterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected OltDeviceServiceInterface oltDeviceService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected FlowRuleService flowRuleService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected HostService hostService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
protected DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected AccessDeviceMeterService oltMeterService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected ComponentConfigService componentConfigService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
protected StorageService storageService;
+ protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
+ protected BaseInformationService<BandwidthProfileInformation> bpService;
+
+ private static final String APP_NAME = "org.opencord.olt";
+ protected ApplicationId appId;
+ private static final Integer MAX_PRIORITY = 10000;
+ private static final Integer MIN_PRIORITY = 1000;
+ private static final short EAPOL_DEFAULT_VLAN = 4091;
+ private static final int NONE_TP_ID = -1;
+ private static final String V4 = "V4";
+ private static final String V6 = "V6";
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ protected UniTagInformation defaultEapolUniTag = new UniTagInformation.Builder()
+ .setServiceName("defaultEapol").build();
+ protected UniTagInformation nniUniTag = new UniTagInformation.Builder()
+ .setServiceName("nni")
+ .setTechnologyProfileId(NONE_TP_ID)
+ .setPonCTag(VlanId.NONE)
+ .setUniTagMatch(VlanId.ANY)
+ .setUsPonCTagPriority(-1)
+ .build();
+
+ /**
+ * Connect Point status map.
+ * Used to keep track of which cp has flows that needs to be removed when the status changes.
+ */
+ protected Map<ServiceKey, OltPortStatus> cpStatus;
+ private final ReentrantReadWriteLock cpStatusLock = new ReentrantReadWriteLock();
+ private final Lock cpStatusWriteLock = cpStatusLock.writeLock();
+ private final Lock cpStatusReadLock = cpStatusLock.readLock();
+
+ /**
+ * This map contains the subscriber that have been provisioned by the operator.
+ * They may or may not have flows, depending on the port status.
+ * The map is used to define whether flows need to be provisioned when a port comes up.
+ */
+ protected Map<ServiceKey, Boolean> provisionedSubscribers;
+ private final ReentrantReadWriteLock provisionedSubscribersLock = new ReentrantReadWriteLock();
+ private final Lock provisionedSubscribersWriteLock = provisionedSubscribersLock.writeLock();
+ private final Lock provisionedSubscribersReadLock = provisionedSubscribersLock.readLock();
+
/**
* Create DHCP trap flow on NNI port(s).
*/
@@ -174,42 +243,76 @@
**/
protected int defaultTechProfileId = DEFAULT_TP_ID_DEFAULT;
- protected ApplicationId appId;
- protected BaseInformationService<BandwidthProfileInformation> bpService;
- protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
- protected Map<DeviceId, BlockingQueue<SubscriberFlowInfo>> pendingEapolForDevice;
+ protected boolean waitForRemoval = WAIT_FOR_REMOVAL_DEFAULT;
+
+ public enum FlowOperation {
+ ADD,
+ REMOVE;
+
+
+ @Override
+ public String toString() {
+ return super.toString().toLowerCase();
+ }
+ }
+
+ public enum FlowDirection {
+ UPSTREAM,
+ DOWNSTREAM,
+ }
+
+ public enum OltFlowsStatus {
+ NONE,
+ PENDING_ADD,
+ ADDED,
+ PENDING_REMOVE,
+ REMOVED,
+ ERROR
+ }
+
+ protected InternalFlowListener internalFlowListener;
@Activate
public void activate(ComponentContext context) {
- if (sadisService != null) {
- bpService = sadisService.getBandwidthProfileService();
- subsService = sadisService.getSubscriberInfoService();
- } else {
- log.warn(SADIS_NOT_RUNNING);
- }
- componentConfigService.registerProperties(getClass());
- appId = coreService.getAppId(APP_NAME);
+ cfgService.registerProperties(getClass());
+ appId = coreService.registerApplication(APP_NAME);
+ internalFlowListener = new InternalFlowListener();
+
+ modified(context);
+
KryoNamespace serializer = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
- .register(UniTagInformation.class)
- .register(SubscriberFlowInfo.class)
+ .register(OltFlowsStatus.class)
+ .register(FlowDirection.class)
+ .register(OltPortStatus.class)
+ .register(OltFlowsStatus.class)
.register(AccessDevicePort.class)
- .register(AccessDevicePort.Type.class)
- .register(LinkedBlockingQueue.class)
+ .register(new ServiceKeySerializer(), ServiceKey.class)
+ .register(UniTagInformation.class)
.build();
- pendingEapolForDevice = storageService.<DeviceId, BlockingQueue<SubscriberFlowInfo>>consistentMapBuilder()
- .withName("volt-pending-eapol")
- .withSerializer(Serializer.using(serializer))
- .withApplicationId(appId)
- .build().asJavaMap();
- log.info("started");
- }
+ cpStatus = storageService.<ServiceKey, OltPortStatus>consistentMapBuilder()
+ .withName("volt-cp-status")
+ .withApplicationId(appId)
+ .withSerializer(Serializer.using(serializer))
+ .build().asJavaMap();
+
+ provisionedSubscribers = storageService.<ServiceKey, Boolean>consistentMapBuilder()
+ .withName("volt-provisioned-subscriber")
+ .withApplicationId(appId)
+ .withSerializer(Serializer.using(serializer))
+ .build().asJavaMap();
+
+ flowRuleService.addListener(internalFlowListener);
+
+ log.info("Started");
+ }
@Deactivate
public void deactivate(ComponentContext context) {
- componentConfigService.unregisterProperties(getClass(), false);
- log.info("stopped");
+ cfgService.unregisterProperties(getClass(), false);
+ flowRuleService.removeListener(internalFlowListener);
+ log.info("Stopped");
}
@Modified
@@ -247,106 +350,712 @@
enablePppoe = pppoe;
}
+ Boolean wait = Tools.isPropertyEnabled(properties, WAIT_FOR_REMOVAL);
+ if (wait != null) {
+ waitForRemoval = wait;
+ }
+
String tpId = get(properties, DEFAULT_TP_ID);
defaultTechProfileId = isNullOrEmpty(tpId) ? DEFAULT_TP_ID_DEFAULT : Integer.parseInt(tpId.trim());
- log.info("modified. Values = enableDhcpOnNni: {}, enableDhcpV4: {}, " +
- "enableDhcpV6:{}, enableIgmpOnNni:{}, " +
- "enableEapol:{}, enablePppoe:{}, defaultTechProfileId:{}",
- enableDhcpOnNni, enableDhcpV4, enableDhcpV6,
- enableIgmpOnNni, enableEapol, enablePppoe,
- defaultTechProfileId);
+ log.info("Modified. Values = enableDhcpOnNni: {}, enableDhcpV4: {}, " +
+ "enableDhcpV6:{}, enableIgmpOnNni:{}, " +
+ "enableEapol:{}, enablePppoe:{}, defaultTechProfileId:{}," +
+ "waitForRemoval:{}",
+ enableDhcpOnNni, enableDhcpV4, enableDhcpV6,
+ enableIgmpOnNni, enableEapol, enablePppoe,
+ defaultTechProfileId, waitForRemoval);
}
- protected void bindSadisService(SadisService service) {
- sadisService = service;
- bpService = sadisService.getBandwidthProfileService();
- subsService = sadisService.getSubscriberInfoService();
- log.info("Sadis-service binds to onos.");
- }
-
- protected void unbindSadisService(SadisService service) {
- sadisService = null;
- bpService = null;
- subsService = null;
- log.info("Sadis-service unbinds from onos.");
- }
-
@Override
- public void processDhcpFilteringObjectives(AccessDevicePort port,
- MeterId upstreamMeterId,
- MeterId upstreamOltMeterId,
- UniTagInformation tagInformation,
- boolean install,
- boolean upstream,
- Optional<CompletableFuture<ObjectiveError>> dhcpFuture) {
- if (upstream) {
- // for UNI ports
- if (tagInformation != null && !tagInformation.getIsDhcpRequired()) {
- log.debug("Dhcp provisioning is disabled for UNI port {} for service {}",
- port, tagInformation.getServiceName());
- dhcpFuture.ifPresent(f -> f.complete(null));
- return;
- }
- } else {
- // for NNI ports
- if (!enableDhcpOnNni) {
- log.debug("Dhcp provisioning is disabled for NNI port {}", port);
- dhcpFuture.ifPresent(f -> f.complete(null));
- return;
- }
- }
- int techProfileId = tagInformation != null ? tagInformation.getTechnologyProfileId() : NONE_TP_ID;
- VlanId cTag = tagInformation != null ? tagInformation.getPonCTag() : VlanId.NONE;
- VlanId unitagMatch = tagInformation != null ? tagInformation.getUniTagMatch() : VlanId.ANY;
- Byte vlanPcp = tagInformation != null && tagInformation.getUsPonCTagPriority() != NO_PCP
- ? (byte) tagInformation.getUsPonCTagPriority() : null;
-
-
- if (enableDhcpV4) {
- int udpSrc = (upstream) ? 68 : 67;
- int udpDst = (upstream) ? 67 : 68;
-
- EthType ethType = EthType.EtherType.IPV4.ethType();
- byte protocol = IPv4.PROTOCOL_UDP;
-
- addDhcpFilteringObjectives(port, udpSrc, udpDst, ethType,
- upstreamMeterId, upstreamOltMeterId, techProfileId, protocol, cTag, unitagMatch,
- vlanPcp, upstream, install, dhcpFuture);
- }
-
- if (enableDhcpV6) {
- int udpSrc = (upstream) ? 547 : 546;
- int udpDst = (upstream) ? 546 : 547;
-
- EthType ethType = EthType.EtherType.IPV6.ethType();
- byte protocol = IPv6.PROTOCOL_UDP;
-
- addDhcpFilteringObjectives(port, udpSrc, udpDst, ethType,
- upstreamMeterId, upstreamOltMeterId, techProfileId, protocol, cTag, unitagMatch,
- vlanPcp, upstream, install, dhcpFuture);
+ public ImmutableMap<ServiceKey, OltPortStatus> getConnectPointStatus() {
+ try {
+ cpStatusReadLock.lock();
+ return ImmutableMap.copyOf(cpStatus);
+ } finally {
+ cpStatusReadLock.unlock();
}
}
- private void addDhcpFilteringObjectives(AccessDevicePort port, int udpSrc, int udpDst,
- EthType ethType, MeterId upstreamMeterId, MeterId upstreamOltMeterId,
- int techProfileId, byte protocol, VlanId cTag, VlanId unitagMatch,
- Byte vlanPcp, boolean upstream,
- boolean install, Optional<CompletableFuture<ObjectiveError>> dhcpFuture) {
+ @Override
+ public ImmutableMap<ServiceKey, UniTagInformation> getProgrammedSubscribers() {
+ // NOTE we might want to remove this conversion and directly use cpStatus as it contains
+ // all the required information about suscribers
+ Map<ServiceKey, UniTagInformation> subscribers =
+ new HashMap<>();
+ try {
+ cpStatusReadLock.lock();
+
+ cpStatus.forEach((sk, status) -> {
+ if (
+ // not NNI Port
+ !oltDeviceService.isNniPort(deviceService.getDevice(sk.getPort().connectPoint().deviceId()),
+ sk.getPort().connectPoint().port()) &&
+ // not EAPOL flow
+ !sk.getService().equals(defaultEapolUniTag)
+ ) {
+ subscribers.put(sk, sk.getService());
+ }
+ });
+
+ return ImmutableMap.copyOf(subscribers);
+ } finally {
+ cpStatusReadLock.unlock();
+ }
+ }
+
+ @Override
+ public Map<ServiceKey, Boolean> getRequestedSubscribers() {
+ try {
+ provisionedSubscribersReadLock.lock();
+ return ImmutableMap.copyOf(provisionedSubscribers);
+ } finally {
+ provisionedSubscribersReadLock.unlock();
+ }
+ }
+
+ @Override
+ public void handleNniFlows(Device device, Port port, FlowOperation action) {
+
+ // always handle the LLDP flow
+ processLldpFilteringObjective(device.id(), port, action);
+
+ if (enableDhcpOnNni) {
+ if (enableDhcpV4) {
+ log.debug("Installing DHCPv4 trap flow on NNI {} for device {}", portWithName(port), device.id());
+ processDhcpFilteringObjectives(device.id(), port, action, FlowDirection.DOWNSTREAM,
+ 67, 68, EthType.EtherType.IPV4.ethType(), IPv4.PROTOCOL_UDP,
+ null, null, nniUniTag);
+ }
+ if (enableDhcpV6) {
+ log.debug("Installing DHCPv6 trap flow on NNI {} for device {}", portWithName(port), device.id());
+ processDhcpFilteringObjectives(device.id(), port, action, FlowDirection.DOWNSTREAM,
+ 546, 547, EthType.EtherType.IPV6.ethType(), IPv6.PROTOCOL_UDP,
+ null, null, nniUniTag);
+ }
+ } else {
+ log.info("DHCP is not required on NNI {} for device {}", portWithName(port), device.id());
+ }
+
+ if (enableIgmpOnNni) {
+ log.debug("Installing IGMP flow on NNI {} for device {}", portWithName(port), device.id());
+ processIgmpFilteringObjectives(device.id(), port, action, FlowDirection.DOWNSTREAM,
+ null, null, NONE_TP_ID, VlanId.NONE, VlanId.ANY, -1);
+ }
+
+ if (enablePppoe) {
+ log.debug("Installing PPPoE flow on NNI {} for device {}", port.number(), device.id());
+ processPPPoEDFilteringObjectives(device.id(), port, action, FlowDirection.DOWNSTREAM,
+ null, null, NONE_TP_ID, VlanId.NONE, VlanId.ANY, null);
+ }
+ }
+
+ @Override
+ public boolean handleBasicPortFlows(DiscoveredSubscriber sub, String bandwidthProfileId,
+ String oltBandwidthProfileId) {
+
+ // we only need to something if EAPOL is enabled
+ if (!enableEapol) {
+ return true;
+ }
+
+ if (sub.status == DiscoveredSubscriber.Status.ADDED) {
+ return addDefaultFlows(sub, bandwidthProfileId, oltBandwidthProfileId);
+ } else if (sub.status == DiscoveredSubscriber.Status.REMOVED) {
+ return removeDefaultFlows(sub, bandwidthProfileId, oltBandwidthProfileId);
+ } else {
+ log.error("Unknown Status {} on DiscoveredSubscriber {}", sub.status, sub);
+ return false;
+ }
+
+ }
+
+ private boolean addDefaultFlows(DiscoveredSubscriber sub, String bandwidthProfileId, String oltBandwidthProfileId) {
+
+ if (!oltMeterService.createMeter(sub.device.id(), bandwidthProfileId)) {
+ if (log.isTraceEnabled()) {
+ log.trace("waiting on meter for bp {} and sub {}", bandwidthProfileId, sub);
+ }
+ return false;
+ }
+ if (hasDefaultEapol(sub.port)) {
+ return true;
+ }
+ return handleEapolFlow(sub, bandwidthProfileId,
+ oltBandwidthProfileId, FlowOperation.ADD, VlanId.vlanId(EAPOL_DEFAULT_VLAN));
+
+ }
+
+ private boolean removeDefaultFlows(DiscoveredSubscriber sub, String bandwidthProfile, String oltBandwidthProfile) {
+ // NOTE that we are not checking for meters as they must have been created to install the flow in first place
+ return handleEapolFlow(sub, bandwidthProfile, oltBandwidthProfile,
+ FlowOperation.REMOVE, VlanId.vlanId(EAPOL_DEFAULT_VLAN));
+ }
+
+ @Override
+ public boolean handleSubscriberFlows(DiscoveredSubscriber sub, String defaultBandwidthProfile,
+ String multicastServiceName) {
+ // NOTE that we are taking defaultBandwithProfile as a parameter as that can be configured in the Olt component
+ if (sub.status == DiscoveredSubscriber.Status.ADDED) {
+ return addSubscriberFlows(sub, defaultBandwidthProfile, multicastServiceName);
+ } else if (sub.status == DiscoveredSubscriber.Status.REMOVED) {
+ return removeSubscriberFlows(sub, defaultBandwidthProfile, multicastServiceName);
+ } else {
+ log.error("don't know how to handle {}", sub);
+ return false;
+ }
+ }
+
+ private boolean addSubscriberFlows(DiscoveredSubscriber sub, String defaultBandwithProfile,
+ String multicastServiceName) {
+ if (log.isTraceEnabled()) {
+ log.trace("Provisioning of subscriber on {} started", portWithName(sub.port));
+ }
+ if (enableEapol) {
+ if (hasDefaultEapol(sub.port)) {
+ // remove EAPOL flow and throw exception so that we'll retry later
+ if (!isDefaultEapolPendingRemoval(sub.port)) {
+ removeDefaultFlows(sub, defaultBandwithProfile, defaultBandwithProfile);
+ }
+
+ if (waitForRemoval) {
+ // NOTE wait for removal is a flag only needed to make sure VOLTHA
+ // does not explode with the flows remove/add in the same batch
+ log.debug("Awaiting for default flows removal for {}", portWithName(sub.port));
+ return false;
+ } else {
+ log.warn("continuing provisioning on {}", portWithName(sub.port));
+ }
+ }
+
+ }
+
+ // NOTE createMeters will return if the meters are not installed
+ if (!oltMeterService.createMeters(sub.device.id(),
+ sub.subscriberAndDeviceInformation)) {
+ return false;
+ }
+
+ // NOTE we need to add the DHCP flow regardless so that the host can be discovered and the MacAddress added
+ handleSubscriberDhcpFlows(sub.device.id(), sub.port, FlowOperation.ADD,
+ sub.subscriberAndDeviceInformation);
+
+ if (isMacLearningEnabled(sub.subscriberAndDeviceInformation)
+ && !isMacAddressAvailable(sub.device.id(), sub.port,
+ sub.subscriberAndDeviceInformation)) {
+ log.debug("Awaiting for macAddress on {}", portWithName(sub.port));
+ return false;
+ }
+
+ handleSubscriberDataFlows(sub.device, sub.port, FlowOperation.ADD,
+ sub.subscriberAndDeviceInformation, multicastServiceName);
+
+ handleSubscriberEapolFlows(sub, FlowOperation.ADD, sub.subscriberAndDeviceInformation);
+
+ handleSubscriberIgmpFlows(sub, FlowOperation.ADD);
+
+ log.info("Provisioning of subscriber on {} completed", portWithName(sub.port));
+ return true;
+ }
+
+ private boolean removeSubscriberFlows(DiscoveredSubscriber sub, String defaultBandwithProfile,
+ String multicastServiceName) {
+
+ if (log.isTraceEnabled()) {
+ log.trace("Removal of subscriber on {} started",
+ portWithName(sub.port));
+ }
+ SubscriberAndDeviceInformation si = subsService.get(sub.portName());
+ if (si == null) {
+ log.error("Subscriber information not found in sadis for port {} during subscriber removal",
+ portWithName(sub.port));
+ // NOTE that we are returning true so that the subscriber is removed from the queue
+ // and we can move on provisioning others
+ return true;
+ }
+
+ handleSubscriberDhcpFlows(sub.device.id(), sub.port, FlowOperation.REMOVE, si);
+
+ if (enableEapol) {
+ // remove the tagged eapol
+ handleSubscriberEapolFlows(sub, FlowOperation.REMOVE, si);
+
+ // and add the default one back
+ if (sub.port.isEnabled()) {
+ // NOTE we remove the subscriber when the port goes down
+ // but in that case we don't need to add default eapol
+ handleEapolFlow(sub, defaultBandwithProfile, defaultBandwithProfile,
+ FlowOperation.ADD, VlanId.vlanId(EAPOL_DEFAULT_VLAN));
+ }
+ }
+ handleSubscriberDataFlows(sub.device, sub.port, FlowOperation.REMOVE, si, multicastServiceName);
+
+ handleSubscriberIgmpFlows(sub, FlowOperation.REMOVE);
+
+ // FIXME check the return status of the flow and return accordingly
+ log.info("Removal of subscriber on {} completed", portWithName(sub.port));
+ return true;
+ }
+
+ @Override
+ public boolean hasDefaultEapol(Port port) {
+ OltPortStatus status = getOltPortStatus(port, defaultEapolUniTag);
+ // NOTE we consider ERROR as a present EAPOL flow as ONOS
+ // will keep trying to add it
+ return status != null && (status.defaultEapolStatus == OltFlowsStatus.ADDED ||
+ status.defaultEapolStatus == OltFlowsStatus.PENDING_ADD ||
+ status.defaultEapolStatus == OltFlowsStatus.ERROR);
+ }
+
+ private OltPortStatus getOltPortStatus(Port port, UniTagInformation uniTagInformation) {
+ try {
+ cpStatusReadLock.lock();
+ ServiceKey sk = new ServiceKey(new AccessDevicePort(port), uniTagInformation);
+ OltPortStatus status = cpStatus.get(sk);
+ return status;
+ } finally {
+ cpStatusReadLock.unlock();
+ }
+ }
+
+ public boolean isDefaultEapolPendingRemoval(Port port) {
+ OltPortStatus status = getOltPortStatus(port, defaultEapolUniTag);
+ if (log.isTraceEnabled()) {
+ log.trace("Status during EAPOL flow check {} for port {} and UniTagInformation {}",
+ status, portWithName(port), defaultEapolUniTag);
+ }
+ return status != null && status.defaultEapolStatus == OltFlowsStatus.PENDING_REMOVE;
+ }
+
+ @Override
+ public boolean hasDhcpFlows(Port port, UniTagInformation uti) {
+ OltPortStatus status = getOltPortStatus(port, uti);
+ if (log.isTraceEnabled()) {
+ log.trace("Status during DHCP flow check {} for port {} and service {}",
+ status, portWithName(port), uti.getServiceName());
+ }
+ return status != null &&
+ (status.dhcpStatus == OltFlowsStatus.ADDED ||
+ status.dhcpStatus == OltFlowsStatus.PENDING_ADD);
+ }
+
+ @Override
+ public boolean hasSubscriberFlows(Port port, UniTagInformation uti) {
+
+ OltPortStatus status = getOltPortStatus(port, uti);
+ if (log.isTraceEnabled()) {
+ log.trace("Status during subscriber flow check {} for port {} and service {}",
+ status, portWithName(port), uti.getServiceName());
+ }
+ return status != null && (status.subscriberFlowsStatus == OltFlowsStatus.ADDED ||
+ status.subscriberFlowsStatus == OltFlowsStatus.PENDING_ADD);
+ }
+
+ @Override
+ public void purgeDeviceFlows(DeviceId deviceId) {
+ log.debug("Purging flows on device {}", deviceId);
+ flowRuleService.purgeFlowRules(deviceId);
+
+ // removing the status from the cpStatus map
+ try {
+ cpStatusWriteLock.lock();
+ Iterator<Map.Entry<ServiceKey, OltPortStatus>> iter = cpStatus.entrySet().iterator();
+ while (iter.hasNext()) {
+ Map.Entry<ServiceKey, OltPortStatus> entry = iter.next();
+ if (entry.getKey().getPort().connectPoint().deviceId().equals(deviceId)) {
+ cpStatus.remove(entry.getKey());
+ }
+ }
+ } finally {
+ cpStatusWriteLock.unlock();
+ }
+
+ // removing subscribers from the provisioned map
+ try {
+ provisionedSubscribersWriteLock.lock();
+ Iterator<Map.Entry<ServiceKey, Boolean>> iter = provisionedSubscribers.entrySet().iterator();
+ while (iter.hasNext()) {
+ Map.Entry<ServiceKey, Boolean> entry = iter.next();
+ if (entry.getKey().getPort().connectPoint().deviceId().equals(deviceId)) {
+ provisionedSubscribers.remove(entry.getKey());
+ }
+ }
+ } finally {
+ provisionedSubscribersWriteLock.unlock();
+ }
+ }
+
+ @Override
+ public boolean isSubscriberServiceProvisioned(AccessDevicePort cp) {
+ // if any service is programmed on this port, returns true
+ AtomicBoolean provisioned = new AtomicBoolean(false);
+ try {
+ provisionedSubscribersReadLock.lock();
+ for (Map.Entry<ServiceKey, Boolean> entry : provisionedSubscribers.entrySet()) {
+ if (entry.getKey().getPort().equals(cp) && entry.getValue()) {
+ provisioned.set(true);
+ break;
+ }
+ }
+ } finally {
+ provisionedSubscribersReadLock.unlock();
+ }
+ return provisioned.get();
+ }
+
+ @Override
+ public boolean isSubscriberServiceProvisioned(ServiceKey sk) {
+ try {
+ provisionedSubscribersReadLock.lock();
+ Boolean provisioned = provisionedSubscribers.get(sk);
+ if (provisioned == null || !provisioned) {
+ return false;
+ }
+ } finally {
+ provisionedSubscribersReadLock.unlock();
+ }
+ return true;
+ }
+
+ @Override
+ public void updateProvisionedSubscriberStatus(ServiceKey sk, Boolean status) {
+ try {
+ provisionedSubscribersWriteLock.lock();
+ provisionedSubscribers.put(sk, status);
+ } finally {
+ provisionedSubscribersWriteLock.unlock();
+ }
+ }
+
+ private boolean handleEapolFlow(DiscoveredSubscriber sub, String bandwidthProfile,
+ String oltBandwidthProfile, FlowOperation action, VlanId vlanId) {
+
+ // create a subscriberKey for the EAPOL flow
+ ServiceKey sk = new ServiceKey(new AccessDevicePort(sub.port), defaultEapolUniTag);
+
+ // NOTE we only need to keep track of the default EAPOL flow in the
+ // connectpoint status map
+ if (vlanId.id().equals(EAPOL_DEFAULT_VLAN)) {
+ OltFlowsStatus status = action == FlowOperation.ADD ?
+ OltFlowsStatus.PENDING_ADD : OltFlowsStatus.PENDING_REMOVE;
+ updateConnectPointStatus(sk, status, OltFlowsStatus.NONE, OltFlowsStatus.NONE);
+
+ }
+
+ DefaultFilteringObjective.Builder filterBuilder = DefaultFilteringObjective.builder();
+ TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
+
+ int techProfileId = getDefaultTechProfileId(sub.port);
+ MeterId meterId = oltMeterService.getMeterIdForBandwidthProfile(sub.device.id(), bandwidthProfile);
+
+ // in the delete case the meter should still be there as we remove
+ // the meters only if no flows are pointing to them
+ if (meterId == null) {
+ log.debug("MeterId is null for BandwidthProfile {} on device {}",
+ bandwidthProfile, sub.device.id());
+ return false;
+ }
+
+ MeterId oltMeterId = oltMeterService.getMeterIdForBandwidthProfile(sub.device.id(), oltBandwidthProfile);
+ if (oltMeterId == null) {
+ log.debug("MeterId is null for OltBandwidthProfile {} on device {}",
+ oltBandwidthProfile, sub.device.id());
+ return false;
+ }
+
+ log.info("{} EAPOL flow for {} with vlanId {} and BandwidthProfile {} (meterId {})",
+ flowOpToString(action), portWithName(sub.port), vlanId, bandwidthProfile, meterId);
+
+ FilteringObjective.Builder eapolAction;
+
+ if (action == FlowOperation.ADD) {
+ eapolAction = filterBuilder.permit();
+ } else if (action == FlowOperation.REMOVE) {
+ eapolAction = filterBuilder.deny();
+ } else {
+ log.error("Operation {} not supported", action);
+ return false;
+ }
+
+ FilteringObjective.Builder baseEapol = eapolAction
+ .withKey(Criteria.matchInPort(sub.port.number()))
+ .addCondition(Criteria.matchEthType(EthType.EtherType.EAPOL.ethType()));
+
+ // NOTE we only need to add the treatment to install the flow,
+ // we can remove it based in the match
+ FilteringObjective.Builder eapol;
+
+ TrafficTreatment treatment = treatmentBuilder
+ .meter(meterId)
+ .writeMetadata(createTechProfValueForWriteMetadata(
+ vlanId,
+ techProfileId, oltMeterId), 0)
+ .setOutput(PortNumber.CONTROLLER)
+ .pushVlan()
+ .setVlanId(vlanId)
+ .build();
+ eapol = baseEapol
+ .withMeta(treatment);
+
+ FilteringObjective eapolObjective = eapol
+ .fromApp(appId)
+ .withPriority(MAX_PRIORITY)
+ .add(new ObjectiveContext() {
+ @Override
+ public void onSuccess(Objective objective) {
+ log.info("EAPOL flow objective {} for {}",
+ completeFlowOpToString(action), portWithName(sub.port));
+ if (log.isTraceEnabled()) {
+ log.trace("EAPOL flow details for port {}: {}", portWithName(sub.port), objective);
+ }
+ }
+
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ log.error("Cannot {} eapol flow for {} : {}", action,
+ portWithName(sub.port), error);
+
+ if (vlanId.id().equals(EAPOL_DEFAULT_VLAN)) {
+ updateConnectPointStatus(sk,
+ OltFlowsStatus.ERROR, null, null);
+ }
+ }
+ });
+
+ flowObjectiveService.filter(sub.device.id(), eapolObjective);
+
+ log.info("{} EAPOL filter for {}", completeFlowOpToString(action), portWithName(sub.port));
+ return true;
+ }
+
+ // FIXME it's confusing that si is not necessarily inside the DiscoveredSubscriber
+ private boolean handleSubscriberEapolFlows(DiscoveredSubscriber sub, FlowOperation action,
+ SubscriberAndDeviceInformation si) {
+ if (!enableEapol) {
+ return true;
+ }
+ // TODO verify we need an EAPOL flow for EACH service
+ AtomicBoolean success = new AtomicBoolean(true);
+ si.uniTagList().forEach(u -> {
+ // we assume that if subscriber flows are installed, tagged EAPOL is installed as well
+ boolean hasFlows = hasSubscriberFlows(sub.port, u);
+
+ // if the subscriber flows are present the EAPOL flow is present thus we don't need to install it,
+ // if the subscriber does not have flows the EAPOL flow is not present thus we don't need to remove it
+ if (action == FlowOperation.ADD && hasFlows ||
+ action == FlowOperation.REMOVE && !hasFlows) {
+ log.debug("Not {} EAPOL on {}({}) as subscriber flow status is {}", flowOpToString(action),
+ portWithName(sub.port), u.getServiceName(), hasFlows);
+ return;
+ }
+ log.info("{} EAPOL flows for subscriber {} on {} and service {}",
+ flowOpToString(action), si.id(), portWithName(sub.port), u.getServiceName());
+ if (!handleEapolFlow(sub, u.getUpstreamBandwidthProfile(),
+ u.getUpstreamOltBandwidthProfile(),
+ action, u.getPonCTag())) {
+ //
+ log.error("Failed to {} EAPOL with suscriber tags", action);
+ //TODO this sets it for all services, maybe some services succeeded.
+ success.set(false);
+ }
+ });
+ return success.get();
+ }
+
+ private void handleSubscriberIgmpFlows(DiscoveredSubscriber sub, FlowOperation action) {
+ sub.subscriberAndDeviceInformation.uniTagList().forEach(uti -> {
+ if (uti.getIsIgmpRequired()) {
+ DeviceId deviceId = sub.device.id();
+ // if we reached here a meter already exists
+ MeterId meterId = oltMeterService
+ .getMeterIdForBandwidthProfile(deviceId, uti.getUpstreamBandwidthProfile());
+ MeterId oltMeterId = oltMeterService
+ .getMeterIdForBandwidthProfile(deviceId, uti.getUpstreamOltBandwidthProfile());
+
+ processIgmpFilteringObjectives(deviceId, sub.port,
+ action, FlowDirection.UPSTREAM, meterId, oltMeterId, uti.getTechnologyProfileId(),
+ uti.getPonCTag(), uti.getUniTagMatch(), uti.getUsPonCTagPriority());
+ }
+ });
+ }
+
+ private boolean checkSadisRunning() {
+ if (bpService == null) {
+ log.warn("Sadis is not running");
+ return false;
+ }
+ return true;
+ }
+
+ private int getDefaultTechProfileId(Port port) {
+ if (!checkSadisRunning()) {
+ return defaultTechProfileId;
+ }
+ if (port != null) {
+ SubscriberAndDeviceInformation info = subsService.get(getPortName(port));
+ if (info != null && info.uniTagList().size() == 1) {
+ return info.uniTagList().get(0).getTechnologyProfileId();
+ }
+ }
+ return defaultTechProfileId;
+ }
+
+ protected Long createTechProfValueForWriteMetadata(VlanId cVlan, int techProfileId, MeterId upstreamOltMeterId) {
+ Long writeMetadata;
+
+ if (cVlan == null || VlanId.NONE.equals(cVlan)) {
+ writeMetadata = (long) techProfileId << 32;
+ } else {
+ writeMetadata = ((long) (cVlan.id()) << 48 | (long) techProfileId << 32);
+ }
+ if (upstreamOltMeterId == null) {
+ return writeMetadata;
+ } else {
+ return writeMetadata | upstreamOltMeterId.id();
+ }
+ }
+
+ private void processLldpFilteringObjective(DeviceId deviceId, Port port, FlowOperation action) {
+ DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
+
+ FilteringObjective lldp = (action == FlowOperation.ADD ? builder.permit() : builder.deny())
+ .withKey(Criteria.matchInPort(port.number()))
+ .addCondition(Criteria.matchEthType(EthType.EtherType.LLDP.ethType()))
+ .withMeta(DefaultTrafficTreatment.builder()
+ .setOutput(PortNumber.CONTROLLER).build())
+ .fromApp(appId)
+ .withPriority(MAX_PRIORITY)
+ .add(new ObjectiveContext() {
+ @Override
+ public void onSuccess(Objective objective) {
+ log.info("{} LLDP filter for {}.", completeFlowOpToString(action), portWithName(port));
+ }
+
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ log.error("Falied to {} LLDP filter on {} because {}", action, portWithName(port),
+ error);
+ }
+ });
+
+ flowObjectiveService.filter(deviceId, lldp);
+ }
+
+ protected void handleSubscriberDhcpFlows(DeviceId deviceId, Port port,
+ FlowOperation action,
+ SubscriberAndDeviceInformation si) {
+ si.uniTagList().forEach(uti -> {
+
+ if (!uti.getIsDhcpRequired()) {
+ return;
+ }
+
+ // if it's an ADD skip if flows are there,
+ // if it's a DELETE skip if flows are not there
+ boolean hasFlows = hasDhcpFlows(port, uti);
+ if (action == FlowOperation.ADD && hasFlows ||
+ action == FlowOperation.REMOVE && !hasFlows) {
+ log.debug("Not dealing with DHCP {} on {} as DHCP flow status is {}", action,
+ uti.getServiceName(), hasFlows);
+ return;
+ }
+
+ log.info("{} DHCP flows for subscriber on {} and service {}",
+ flowOpToString(action), portWithName(port), uti.getServiceName());
+
+ // if we reached here a meter already exists
+ MeterId meterId = oltMeterService
+ .getMeterIdForBandwidthProfile(deviceId, uti.getUpstreamBandwidthProfile());
+ MeterId oltMeterId = oltMeterService
+ .getMeterIdForBandwidthProfile(deviceId, uti.getUpstreamOltBandwidthProfile());
+
+ if (enableDhcpV4) {
+ processDhcpFilteringObjectives(deviceId, port, action, FlowDirection.UPSTREAM, 68, 67,
+ EthType.EtherType.IPV4.ethType(), IPv4.PROTOCOL_UDP, meterId, oltMeterId,
+ uti);
+ }
+ if (enableDhcpV6) {
+ log.error("DHCP V6 not supported for subscribers");
+ }
+ });
+ }
+
+ // FIXME return boolean, if this fails we need to retry
+ protected void handleSubscriberDataFlows(Device device, Port port,
+ FlowOperation action,
+ SubscriberAndDeviceInformation si, String multicastServiceName) {
+
+ Optional<Port> nniPort = oltDeviceService.getNniPort(device);
+ if (nniPort.isEmpty()) {
+ log.error("Cannot configure DP flows as upstream port is not configured for subscriber {} on {}",
+ si.id(), portWithName(port));
+ return;
+ }
+ si.uniTagList().forEach(uti -> {
+
+ boolean hasFlows = hasSubscriberFlows(port, uti);
+ if (action == FlowOperation.ADD && hasFlows ||
+ action == FlowOperation.REMOVE && !hasFlows) {
+ log.debug("Not dealing with DP flows {} on {} as subscriber flow status is {}", action,
+ uti.getServiceName(), hasFlows);
+ return;
+ }
+
+ if (multicastServiceName.equals(uti.getServiceName())) {
+ log.debug("This is the multicast service ({}) for subscriber {} on {}, " +
+ "dataplane flows are not needed",
+ uti.getServiceName(), si.id(), portWithName(port));
+ return;
+ }
+
+ log.info("{} Data plane flows for subscriber {} on {} and service {}",
+ flowOpToString(action), si.id(), portWithName(port), uti.getServiceName());
+
+ // upstream flows
+ MeterId usMeterId = oltMeterService
+ .getMeterIdForBandwidthProfile(device.id(), uti.getUpstreamBandwidthProfile());
+ MeterId oltUsMeterId = oltMeterService
+ .getMeterIdForBandwidthProfile(device.id(), uti.getUpstreamOltBandwidthProfile());
+ processUpstreamDataFilteringObjects(device.id(), port, nniPort.get(), action, usMeterId,
+ oltUsMeterId, uti);
+
+ // downstream flows
+ MeterId dsMeterId = oltMeterService
+ .getMeterIdForBandwidthProfile(device.id(), uti.getDownstreamBandwidthProfile());
+ MeterId oltDsMeterId = oltMeterService
+ .getMeterIdForBandwidthProfile(device.id(), uti.getDownstreamOltBandwidthProfile());
+ processDownstreamDataFilteringObjects(device.id(), port, nniPort.get(), action, dsMeterId,
+ oltDsMeterId, uti, getMacAddress(device.id(), port, uti));
+ });
+ }
+
+ private void processDhcpFilteringObjectives(DeviceId deviceId, Port port,
+ FlowOperation action, FlowDirection direction,
+ int udpSrc, int udpDst, EthType ethType, byte protocol,
+ MeterId meterId, MeterId oltMeterId, UniTagInformation uti) {
+ ServiceKey sk = new ServiceKey(new AccessDevicePort(port), uti);
+ log.debug("{} DHCP filtering objectives on {}", flowOpToString(action), sk);
+
+
+ OltFlowsStatus status = action.equals(FlowOperation.ADD) ?
+ OltFlowsStatus.PENDING_ADD : OltFlowsStatus.PENDING_REMOVE;
+ updateConnectPointStatus(sk, null, null, status);
DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
- if (upstreamMeterId != null) {
- treatmentBuilder.meter(upstreamMeterId);
+ if (meterId != null) {
+ treatmentBuilder.meter(meterId);
}
- if (techProfileId != NONE_TP_ID) {
- treatmentBuilder.writeMetadata(createTechProfValueForWm(unitagMatch, techProfileId, upstreamOltMeterId), 0);
+ if (uti.getTechnologyProfileId() != NONE_TP_ID) {
+ treatmentBuilder.writeMetadata(
+ createTechProfValueForWriteMetadata(uti.getUniTagMatch(),
+ uti.getTechnologyProfileId(), oltMeterId), 0);
}
- FilteringObjective.Builder dhcpUpstreamBuilder = (install ? builder.permit() : builder.deny())
+ FilteringObjective.Builder dhcpBuilder = (action == FlowOperation.ADD ? builder.permit() : builder.deny())
.withKey(Criteria.matchInPort(port.number()))
.addCondition(Criteria.matchEthType(ethType))
.addCondition(Criteria.matchIPProtocol(protocol))
@@ -356,87 +1065,124 @@
.withPriority(MAX_PRIORITY);
//VLAN changes and PCP matching need to happen only in the upstream directions
- if (upstream) {
- treatmentBuilder.setVlanId(cTag);
- if (!VlanId.vlanId(VlanId.NO_VID).equals(unitagMatch)) {
- dhcpUpstreamBuilder.addCondition(Criteria.matchVlanId(unitagMatch));
+ if (direction == FlowDirection.UPSTREAM) {
+ treatmentBuilder.setVlanId(uti.getPonCTag());
+ if (!VlanId.vlanId(VlanId.NO_VID).equals(uti.getUniTagMatch())) {
+ dhcpBuilder.addCondition(Criteria.matchVlanId(uti.getUniTagMatch()));
}
- if (vlanPcp != null) {
- treatmentBuilder.setVlanPcp(vlanPcp);
+ if (uti.getUsPonCTagPriority() != -1) {
+ treatmentBuilder.setVlanPcp((byte) uti.getUsPonCTagPriority());
}
}
- dhcpUpstreamBuilder.withMeta(treatmentBuilder
- .setOutput(PortNumber.CONTROLLER).build());
+ dhcpBuilder.withMeta(treatmentBuilder
+ .setOutput(PortNumber.CONTROLLER).build());
- FilteringObjective dhcpUpstream = dhcpUpstreamBuilder.add(new ObjectiveContext() {
+ FilteringObjective dhcpUpstream = dhcpBuilder.add(new ObjectiveContext() {
@Override
public void onSuccess(Objective objective) {
- log.info("DHCP {} filter for {} {}.",
- (ethType.equals(EthType.EtherType.IPV4.ethType())) ? V4 : V6, port,
- (install) ? INSTALLED : REMOVED);
- dhcpFuture.ifPresent(f -> f.complete(null));
+ log.info("{} DHCP {} filter for {}.",
+ completeFlowOpToString(action), (ethType.equals(EthType.EtherType.IPV4.ethType())) ? V4 : V6,
+ portWithName(port));
}
@Override
public void onError(Objective objective, ObjectiveError error) {
log.error("DHCP {} filter for {} failed {} because {}",
- (ethType.equals(EthType.EtherType.IPV4.ethType())) ? V4 : V6, port,
- (install) ? INSTALLATION : REMOVAL,
+ (ethType.equals(EthType.EtherType.IPV4.ethType())) ? V4 : V6,
+ portWithName(port),
+ action,
error);
- dhcpFuture.ifPresent(f -> f.complete(error));
+ updateConnectPointStatus(sk, null, null, OltFlowsStatus.ERROR);
}
});
- flowObjectiveService.filter(port.deviceId(), dhcpUpstream);
+ flowObjectiveService.filter(deviceId, dhcpUpstream);
+ }
+
+ private void processIgmpFilteringObjectives(DeviceId deviceId, Port port,
+ FlowOperation action, FlowDirection direction,
+ MeterId meterId, MeterId oltMeterId, int techProfileId,
+ VlanId cTag, VlanId unitagMatch, int vlanPcp) {
+
+ DefaultFilteringObjective.Builder filterBuilder = DefaultFilteringObjective.builder();
+ TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
+ if (direction == FlowDirection.UPSTREAM) {
+
+ if (techProfileId != NONE_TP_ID) {
+ treatmentBuilder.writeMetadata(createTechProfValueForWriteMetadata(null,
+ techProfileId, oltMeterId), 0);
+ }
+
+
+ if (meterId != null) {
+ treatmentBuilder.meter(meterId);
+ }
+
+ if (!VlanId.vlanId(VlanId.NO_VID).equals(unitagMatch)) {
+ filterBuilder.addCondition(Criteria.matchVlanId(unitagMatch));
+ }
+
+ if (!VlanId.vlanId(VlanId.NO_VID).equals(cTag)) {
+ treatmentBuilder.setVlanId(cTag);
+ }
+
+ if (vlanPcp != -1) {
+ treatmentBuilder.setVlanPcp((byte) vlanPcp);
+ }
+ }
+
+ filterBuilder = (action == FlowOperation.ADD) ? filterBuilder.permit() : filterBuilder.deny();
+
+ FilteringObjective igmp = filterBuilder
+ .withKey(Criteria.matchInPort(port.number()))
+ .addCondition(Criteria.matchEthType(EthType.EtherType.IPV4.ethType()))
+ .addCondition(Criteria.matchIPProtocol(IPv4.PROTOCOL_IGMP))
+ .withMeta(treatmentBuilder
+ .setOutput(PortNumber.CONTROLLER).build())
+ .fromApp(appId)
+ .withPriority(MAX_PRIORITY)
+ .add(new ObjectiveContext() {
+ @Override
+ public void onSuccess(Objective objective) {
+ log.info("Igmp filter for {} {}.", portWithName(port), action);
+ }
+
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ log.error("Igmp filter for {} failed {} because {}.", portWithName(port), action,
+ error);
+ }
+ });
+
+ flowObjectiveService.filter(deviceId, igmp);
}
- @Override
- public void processPPPoEDFilteringObjectives(AccessDevicePort port,
- MeterId upstreamMeterId,
- MeterId upstreamOltMeterId,
- UniTagInformation tagInformation,
- boolean install,
- boolean upstream) {
- if (!enablePppoe) {
- log.debug("PPPoED filtering is disabled. Ignoring request.");
- return;
- }
-
- int techProfileId = NONE_TP_ID;
- VlanId cTag = VlanId.NONE;
- VlanId unitagMatch = VlanId.ANY;
- Byte vlanPcp = null;
-
- if (tagInformation != null) {
- techProfileId = tagInformation.getTechnologyProfileId();
- cTag = tagInformation.getPonCTag();
- unitagMatch = tagInformation.getUniTagMatch();
- if (tagInformation.getUsPonCTagPriority() != NO_PCP) {
- vlanPcp = (byte) tagInformation.getUsPonCTagPriority();
- }
- }
+ private void processPPPoEDFilteringObjectives(DeviceId deviceId, Port port,
+ FlowOperation action, FlowDirection direction,
+ MeterId meterId, MeterId oltMeterId, int techProfileId,
+ VlanId cTag, VlanId unitagMatch, Byte vlanPcp) {
DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
- CompletableFuture<Object> meterFuture = new CompletableFuture<>();
- if (upstreamMeterId != null) {
- treatmentBuilder.meter(upstreamMeterId);
+ if (meterId != null) {
+ treatmentBuilder.meter(meterId);
}
if (techProfileId != NONE_TP_ID) {
- treatmentBuilder.writeMetadata(createTechProfValueForWm(cTag, techProfileId, upstreamOltMeterId), 0);
+ treatmentBuilder.writeMetadata(createTechProfValueForWriteMetadata(cTag, techProfileId, oltMeterId), 0);
}
- DefaultFilteringObjective.Builder pppoedBuilder = (install ? builder.permit() : builder.deny())
+ DefaultFilteringObjective.Builder pppoedBuilder = ((action == FlowOperation.ADD)
+ ? builder.permit() : builder.deny())
.withKey(Criteria.matchInPort(port.number()))
.addCondition(Criteria.matchEthType(EthType.EtherType.PPPoED.ethType()))
.fromApp(appId)
.withPriority(10000);
- if (upstream) {
+ if (direction == FlowDirection.UPSTREAM) {
treatmentBuilder.setVlanId(cTag);
if (!VlanId.vlanId(VlanId.NO_VID).equals(unitagMatch)) {
pppoedBuilder.addCondition(Criteria.matchVlanId(unitagMatch));
@@ -451,398 +1197,50 @@
.add(new ObjectiveContext() {
@Override
public void onSuccess(Objective objective) {
- log.info("PPPoED filter for {} {}.", port, (install) ? INSTALLED : REMOVED);
+ log.info("PPPoED filter for {} {}.", portWithName(port), action);
}
@Override
public void onError(Objective objective, ObjectiveError error) {
- log.info("PPPoED filter for {} failed {} because {}", port,
- (install) ? INSTALLATION : REMOVAL, error);
+ log.info("PPPoED filter for {} failed {} because {}", portWithName(port),
+ action, error);
}
});
- flowObjectiveService.filter(port.deviceId(), pppoed);
+ flowObjectiveService.filter(deviceId, pppoed);
}
- @Override
- public void processIgmpFilteringObjectives(AccessDevicePort port,
- MeterId upstreamMeterId,
- MeterId upstreamOltMeterId,
- UniTagInformation tagInformation,
- boolean install,
- boolean upstream) {
- if (upstream) {
- // for UNI ports
- if (tagInformation != null && !tagInformation.getIsIgmpRequired()) {
- log.debug("Igmp provisioning is disabled for UNI port {} for service {}",
- port, tagInformation.getServiceName());
- return;
- }
- } else {
- // for NNI ports
- if (!enableIgmpOnNni) {
- log.debug("Igmp provisioning is disabled for NNI port {}", port);
- return;
- }
- }
-
- log.debug("{} IGMP flows on {}", (install) ? "Installing" : "Removing", port);
- DefaultFilteringObjective.Builder filterBuilder = DefaultFilteringObjective.builder();
- TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
- if (upstream) {
-
- if (tagInformation.getTechnologyProfileId() != NONE_TP_ID) {
- treatmentBuilder.writeMetadata(createTechProfValueForWm(null,
- tagInformation.getTechnologyProfileId(), upstreamOltMeterId), 0);
- }
-
-
- if (upstreamMeterId != null) {
- treatmentBuilder.meter(upstreamMeterId);
- }
-
- if (!VlanId.vlanId(VlanId.NO_VID).equals(tagInformation.getUniTagMatch())) {
- filterBuilder.addCondition(Criteria.matchVlanId(tagInformation.getUniTagMatch()));
- }
-
- if (!VlanId.vlanId(VlanId.NO_VID).equals(tagInformation.getPonCTag())) {
- treatmentBuilder.setVlanId(tagInformation.getPonCTag());
- }
-
- if (tagInformation.getUsPonCTagPriority() != NO_PCP) {
- treatmentBuilder.setVlanPcp((byte) tagInformation.getUsPonCTagPriority());
- }
- }
-
- filterBuilder = install ? filterBuilder.permit() : filterBuilder.deny();
-
- FilteringObjective igmp = filterBuilder
- .withKey(Criteria.matchInPort(port.number()))
- .addCondition(Criteria.matchEthType(EthType.EtherType.IPV4.ethType()))
- .addCondition(Criteria.matchIPProtocol(IPv4.PROTOCOL_IGMP))
- .withMeta(treatmentBuilder
- .setOutput(PortNumber.CONTROLLER).build())
- .fromApp(appId)
- .withPriority(MAX_PRIORITY)
- .add(new ObjectiveContext() {
- @Override
- public void onSuccess(Objective objective) {
- log.info("Igmp filter for {} {}.", port, (install) ? INSTALLED : REMOVED);
- }
-
- @Override
- public void onError(Objective objective, ObjectiveError error) {
- log.error("Igmp filter for {} failed {} because {}.", port, (install) ? INSTALLATION : REMOVAL,
- error);
- }
- });
-
- flowObjectiveService.filter(port.deviceId(), igmp);
- }
-
- @Override
- public void processEapolFilteringObjectives(AccessDevicePort port, String bpId, Optional<String> oltBpId,
- CompletableFuture<ObjectiveError> filterFuture,
- VlanId vlanId, boolean install) {
-
- if (!enableEapol) {
- log.debug("Eapol filtering is disabled. Completing filteringFuture immediately for the device {}",
- port.deviceId());
- if (filterFuture != null) {
- filterFuture.complete(null);
- }
- return;
- }
- log.info("Processing EAPOL with Bandwidth profile {} on {}", bpId, port);
- BandwidthProfileInformation bpInfo = getBandwidthProfileInformation(bpId);
- BandwidthProfileInformation oltBpInfo;
- if (oltBpId.isPresent()) {
- oltBpInfo = getBandwidthProfileInformation(oltBpId.get());
- } else {
- oltBpInfo = bpInfo;
- }
- if (bpInfo == null) {
- log.warn("Bandwidth profile {} is not found. Authentication flow"
- + " will not be installed on {}", bpId, port);
- if (filterFuture != null) {
- filterFuture.complete(ObjectiveError.BADPARAMS);
- }
- return;
- }
-
- ConnectPoint cp = new ConnectPoint(port.deviceId(), port.number());
- DefaultFilteringObjective.Builder filterBuilder = DefaultFilteringObjective.builder();
- TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
- // check if meter exists and create it only for an install
- final MeterId meterId = oltMeterService.getMeterIdFromBpMapping(port.deviceId(), bpInfo.id());
- MeterId oltMeterId = null;
- if (oltBpId.isPresent()) {
- oltMeterId = oltBpId.map(id -> oltMeterService.getMeterIdFromBpMapping(port.deviceId(), id)).orElse(null);
- }
- log.info("Meter id {} for Bandwidth profile {} and OLT meter id {} for OLT Bandwidth profile {} " +
- "associated to EAPOL on {}", meterId, bpInfo.id(), oltMeterId, oltBpId, port.deviceId());
- if (meterId == null || (oltBpId.isPresent() && oltMeterId == null)) {
- if (install) {
- log.debug("Need to install meter for EAPOL with bwp {} on {}", bpInfo.id(), port);
- SubscriberFlowInfo fi = new SubscriberFlowInfo(null, port,
- new UniTagInformation.Builder()
- .setPonCTag(vlanId).build(),
- null, meterId, null, oltMeterId,
- null, bpInfo.id(), null, oltBpInfo.id());
- pendingEapolForDevice.compute(port.deviceId(), (id, queue) -> {
- if (queue == null) {
- queue = new LinkedBlockingQueue<>();
- }
- queue.add(fi);
- return queue;
- });
-
- //If false the meter is already being installed, skipping installation
- if (!oltMeterService.checkAndAddPendingMeter(port.deviceId(), bpInfo) &&
- !oltMeterService.checkAndAddPendingMeter(port.deviceId(), oltBpInfo)) {
- return;
- }
- List<BandwidthProfileInformation> bwpList = Arrays.asList(bpInfo, oltBpInfo);
- bwpList.stream().distinct().filter(Objects::nonNull)
- .forEach(bwp -> createMeterAndProceedEapol(port, bwp, filterFuture, install,
- cp, filterBuilder, treatmentBuilder));
- } else {
- // this case should not happen as the request to remove an eapol
- // flow should mean that the flow points to a meter that exists.
- // Nevertheless we can still delete the flow as we only need the
- // correct 'match' to do so.
- log.warn("Unknown meter id for bp {}, still proceeding with "
- + "delete of eapol flow on {}", bpInfo.id(), port);
- SubscriberFlowInfo fi = new SubscriberFlowInfo(null, port,
- new UniTagInformation.Builder().setPonCTag(vlanId).build(),
- null, meterId, null, oltMeterId, null,
- bpInfo.id(), null, oltBpInfo.id());
- handleEapol(filterFuture, install, cp, filterBuilder, treatmentBuilder, fi, meterId, oltMeterId);
- }
- } else {
- log.debug("Meter {} was previously created for bp {} on {}", meterId, bpInfo.id(), port);
- SubscriberFlowInfo fi = new SubscriberFlowInfo(null, port,
- new UniTagInformation.Builder().setPonCTag(vlanId).build(),
- null, meterId, null, oltMeterId, null,
- bpInfo.id(), null, oltBpInfo.id());
- handleEapol(filterFuture, install, cp, filterBuilder, treatmentBuilder, fi, meterId, oltMeterId);
- //No need for the future, meter is present.
- return;
- }
- }
-
- private void createMeterAndProceedEapol(AccessDevicePort port, BandwidthProfileInformation bwpInfo,
- CompletableFuture<ObjectiveError> filterFuture,
- boolean install, ConnectPoint cp,
- DefaultFilteringObjective.Builder filterBuilder,
- TrafficTreatment.Builder treatmentBuilder) {
- CompletableFuture<Object> meterFuture = new CompletableFuture<>();
- MeterId meterId = oltMeterService.createMeter(port.deviceId(), bwpInfo, meterFuture);
- DeviceId deviceId = port.deviceId();
- meterFuture.thenAccept(result -> {
- //for each pending eapol flow we check if the meter is there.
- pendingEapolForDevice.compute(deviceId, (id, queue) -> {
- if (queue != null && !queue.isEmpty()) {
- while (true) {
- //TODO this might return the reference and not the actual object
- // so it can be actually swapped underneath us.
- SubscriberFlowInfo fi = queue.peek();
- if (fi == null) {
- log.debug("No more subscribers eapol flows on {}", deviceId);
- queue = new LinkedBlockingQueue<>();
- break;
- }
- log.debug("handing pending eapol on {} for {}", fi.getUniPort(), fi);
- if (result == null) {
- MeterId mId = oltMeterService
- .getMeterIdFromBpMapping(port.deviceId(), fi.getUpBpInfo());
- MeterId oltMeterId = oltMeterService
- .getMeterIdFromBpMapping(port.deviceId(), fi.getUpOltBpInfo());
- if (mId != null && oltMeterId != null) {
- log.debug("Meter installation completed for subscriber on {}, " +
- "handling EAPOL trap flow", port);
- fi.setUpMeterId(mId);
- fi.setUpOltMeterId(oltMeterId);
- handleEapol(filterFuture, install, cp, filterBuilder, treatmentBuilder, fi,
- mId, oltMeterId);
- queue.remove(fi);
- } else {
- log.debug("Not all meters for {} are yet installed for EAPOL meterID {}, " +
- "oltMeterId {}", fi, meterId, oltMeterId);
- }
- } else {
- log.warn("Meter installation error while sending EAPOL trap flow to {}. " +
- "Result {} and MeterId {}", port, result, meterId);
- queue.remove(fi);
- }
- oltMeterService.removeFromPendingMeters(port.deviceId(), bwpInfo);
- }
- } else {
- log.info("No pending EAPOLs on {}", port.deviceId());
- queue = new LinkedBlockingQueue<>();
- }
- return queue;
- });
- });
- }
-
- private void handleEapol(CompletableFuture<ObjectiveError> filterFuture,
- boolean install, ConnectPoint cp,
- DefaultFilteringObjective.Builder filterBuilder,
- TrafficTreatment.Builder treatmentBuilder,
- SubscriberFlowInfo fi, MeterId mId, MeterId oltMeterId) {
- log.info("Meter {} for {} on {} exists. {} EAPOL trap flow",
- mId, fi.getUpBpInfo(), fi.getUniPort(),
- (install) ? "Installing" : "Removing");
- int techProfileId = getDefaultTechProfileId(fi.getUniPort());
- // can happen in case of removal
- if (mId != null) {
- treatmentBuilder.meter(mId);
- }
- //Authentication trap flow uses only tech profile id as write metadata value
- FilteringObjective eapol = (install ? filterBuilder.permit() : filterBuilder.deny())
- .withKey(Criteria.matchInPort(fi.getUniPort().number()))
- .addCondition(Criteria.matchEthType(EthType.EtherType.EAPOL.ethType()))
- .withMeta(treatmentBuilder
- .writeMetadata(createTechProfValueForWm(
- fi.getTagInfo().getPonCTag(),
- techProfileId, oltMeterId), 0)
- .setOutput(PortNumber.CONTROLLER)
- .pushVlan()
- .setVlanId(fi.getTagInfo().getPonCTag())
- .build())
- .fromApp(appId)
- .withPriority(MAX_PRIORITY)
- .add(new ObjectiveContext() {
- @Override
- public void onSuccess(Objective objective) {
- log.info("Eapol filter {} for {} on {} with meter {}.",
- objective.id(), (install) ? INSTALLED : REMOVED, fi.getUniPort(), mId);
- if (filterFuture != null) {
- filterFuture.complete(null);
- }
- }
-
- @Override
- public void onError(Objective objective, ObjectiveError error) {
- log.error("Eapol filter {} for {} with meter {} " +
- "failed {} because {}", objective.id(), fi.getUniPort(), mId,
- (install) ? INSTALLATION : REMOVAL,
- error);
- if (filterFuture != null) {
- filterFuture.complete(error);
- }
- }
- });
- flowObjectiveService.filter(fi.getDevId(), eapol);
- }
-
- /**
- * Installs trap filtering objectives for particular traffic types on an
- * NNI port.
- *
- * @param nniPort NNI port
- * @param install true to install, false to remove
- */
- @Override
- public void processNniFilteringObjectives(AccessDevicePort nniPort, boolean install) {
- log.info("{} flows for NNI port {}",
- install ? "Adding" : "Removing", nniPort);
- processLldpFilteringObjective(nniPort, install);
- processDhcpFilteringObjectives(nniPort, null, null, null, install, false, Optional.empty());
- processIgmpFilteringObjectives(nniPort, null, null, null, install, false);
- processPPPoEDFilteringObjectives(nniPort, null, null, null, install, false);
- }
-
-
- @Override
- public void processLldpFilteringObjective(AccessDevicePort port, boolean install) {
- DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
-
- FilteringObjective lldp = (install ? builder.permit() : builder.deny())
- .withKey(Criteria.matchInPort(port.number()))
- .addCondition(Criteria.matchEthType(EthType.EtherType.LLDP.ethType()))
- .withMeta(DefaultTrafficTreatment.builder()
- .setOutput(PortNumber.CONTROLLER).build())
- .fromApp(appId)
- .withPriority(MAX_PRIORITY)
- .add(new ObjectiveContext() {
- @Override
- public void onSuccess(Objective objective) {
- log.info("LLDP filter for {} {}.", port, (install) ? INSTALLED : REMOVED);
- }
-
- @Override
- public void onError(Objective objective, ObjectiveError error) {
- log.error("LLDP filter for {} failed {} because {}", port, (install) ? INSTALLATION : REMOVAL,
- error);
- }
- });
-
- flowObjectiveService.filter(port.deviceId(), lldp);
- }
-
- @Override
- public ForwardingObjective.Builder createTransparentBuilder(AccessDevicePort uplinkPort,
- AccessDevicePort subscriberPort,
- MeterId meterId,
- UniTagInformation tagInfo,
- boolean upstream) {
-
+ private void processUpstreamDataFilteringObjects(DeviceId deviceId, Port port, Port nniPort,
+ FlowOperation action,
+ MeterId upstreamMeterId,
+ MeterId upstreamOltMeterId,
+ UniTagInformation uti) {
+ ServiceKey sk = new ServiceKey(new AccessDevicePort(port), uti);
TrafficSelector selector = DefaultTrafficSelector.builder()
- .matchVlanId(tagInfo.getPonSTag())
- .matchInPort(upstream ? subscriberPort.number() : uplinkPort.number())
- .matchInnerVlanId(tagInfo.getPonCTag())
- .build();
-
- TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
- if (meterId != null) {
- tBuilder.meter(meterId);
- }
-
- TrafficTreatment treatment = tBuilder
- .setOutput(upstream ? uplinkPort.number() : subscriberPort.number())
- .writeMetadata(createMetadata(upstream ? tagInfo.getPonSTag() : tagInfo.getPonCTag(),
- tagInfo.getTechnologyProfileId(),
- upstream ? uplinkPort.number() : subscriberPort.number()), 0)
- .build();
-
- return createForwardingObjectiveBuilder(selector, treatment, MIN_PRIORITY,
- DefaultAnnotations.builder().build());
- }
-
- @Override
- public ForwardingObjective.Builder createUpBuilder(AccessDevicePort uplinkPort,
- AccessDevicePort subscriberPort,
- MeterId upstreamMeterId,
- MeterId upstreamOltMeterId,
- UniTagInformation uniTagInformation) {
-
- TrafficSelector selector = DefaultTrafficSelector.builder()
- .matchInPort(subscriberPort.number())
- .matchVlanId(uniTagInformation.getUniTagMatch())
+ .matchInPort(port.number())
+ .matchVlanId(uti.getUniTagMatch())
.build();
TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
//if the subscriberVlan (cTag) is different than ANY it needs to set.
- if (uniTagInformation.getPonCTag().toShort() != VlanId.ANY_VALUE) {
+ if (uti.getPonCTag().toShort() != VlanId.ANY_VALUE) {
treatmentBuilder.pushVlan()
- .setVlanId(uniTagInformation.getPonCTag());
+ .setVlanId(uti.getPonCTag());
}
- if (uniTagInformation.getUsPonCTagPriority() != NO_PCP) {
- treatmentBuilder.setVlanPcp((byte) uniTagInformation.getUsPonCTagPriority());
+ if (uti.getUsPonCTagPriority() != -1) {
+ treatmentBuilder.setVlanPcp((byte) uti.getUsPonCTagPriority());
}
treatmentBuilder.pushVlan()
- .setVlanId(uniTagInformation.getPonSTag());
+ .setVlanId(uti.getPonSTag());
- if (uniTagInformation.getUsPonSTagPriority() != NO_PCP) {
- treatmentBuilder.setVlanPcp((byte) uniTagInformation.getUsPonSTagPriority());
+ if (uti.getUsPonSTagPriority() != -1) {
+ treatmentBuilder.setVlanPcp((byte) uti.getUsPonSTagPriority());
}
- treatmentBuilder.setOutput(uplinkPort.number())
- .writeMetadata(createMetadata(uniTagInformation.getPonCTag(),
- uniTagInformation.getTechnologyProfileId(), uplinkPort.number()), 0L);
+ treatmentBuilder.setOutput(nniPort.number())
+ .writeMetadata(createMetadata(uti.getPonCTag(),
+ uti.getTechnologyProfileId(), nniPort.number()), 0L);
DefaultAnnotations.Builder annotationBuilder = DefaultAnnotations.builder();
@@ -855,55 +1253,84 @@
annotationBuilder.set(UPSTREAM_OLT, upstreamOltMeterId.toString());
}
- return createForwardingObjectiveBuilder(selector, treatmentBuilder.build(), MIN_PRIORITY,
+ DefaultForwardingObjective.Builder flowBuilder = createForwardingObjectiveBuilder(selector,
+ treatmentBuilder.build(), MIN_PRIORITY,
annotationBuilder.build());
+
+ ObjectiveContext context = new ObjectiveContext() {
+ @Override
+ public void onSuccess(Objective objective) {
+ log.info("{} Upstream Data plane filter for {}.",
+ completeFlowOpToString(action), sk);
+ }
+
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ log.error("Upstream Data plane filter for {} failed {} because {}.",
+ sk, action, error);
+ updateConnectPointStatus(sk, null, OltFlowsStatus.ERROR, null);
+ }
+ };
+
+ ForwardingObjective flow = null;
+ if (action == FlowOperation.ADD) {
+ flow = flowBuilder.add(context);
+ } else if (action == FlowOperation.REMOVE) {
+ flow = flowBuilder.remove(context);
+ } else {
+ log.error("Flow action not supported: {}", action);
+ }
+
+ if (flow != null) {
+ flowObjectiveService.forward(deviceId, flow);
+ }
}
- @Override
- public ForwardingObjective.Builder createDownBuilder(AccessDevicePort uplinkPort,
- AccessDevicePort subscriberPort,
- MeterId downstreamMeterId,
- MeterId downstreamOltMeterId,
- UniTagInformation tagInformation,
- Optional<MacAddress> macAddress) {
-
+ private void processDownstreamDataFilteringObjects(DeviceId deviceId, Port port, Port nniPort,
+ FlowOperation action,
+ MeterId downstreamMeterId,
+ MeterId downstreamOltMeterId,
+ UniTagInformation uti,
+ MacAddress macAddress) {
+ ServiceKey sk = new ServiceKey(new AccessDevicePort(port), uti);
//subscriberVlan can be any valid Vlan here including ANY to make sure the packet is tagged
TrafficSelector.Builder selectorBuilder = DefaultTrafficSelector.builder()
- .matchVlanId(tagInformation.getPonSTag())
- .matchInPort(uplinkPort.number())
- .matchInnerVlanId(tagInformation.getPonCTag());
+ .matchVlanId(uti.getPonSTag())
+ .matchInPort(nniPort.number())
+ .matchInnerVlanId(uti.getPonCTag());
-
- if (tagInformation.getPonCTag().toShort() != VlanId.ANY_VALUE) {
- selectorBuilder.matchMetadata(tagInformation.getPonCTag().toShort());
+ if (uti.getPonCTag().toShort() != VlanId.ANY_VALUE) {
+ selectorBuilder.matchMetadata(uti.getPonCTag().toShort());
}
- if (tagInformation.getDsPonSTagPriority() != NO_PCP) {
- selectorBuilder.matchVlanPcp((byte) tagInformation.getDsPonSTagPriority());
+ if (uti.getDsPonCTagPriority() != -1) {
+ selectorBuilder.matchVlanPcp((byte) uti.getDsPonCTagPriority());
}
- macAddress.ifPresent(selectorBuilder::matchEthDst);
+ if (macAddress != null) {
+ selectorBuilder.matchEthDst(macAddress);
+ }
TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder()
.popVlan()
- .setOutput(subscriberPort.number());
+ .setOutput(port.number());
- treatmentBuilder.writeMetadata(createMetadata(tagInformation.getPonCTag(),
- tagInformation.getTechnologyProfileId(),
- subscriberPort.number()), 0);
+ treatmentBuilder.writeMetadata(createMetadata(uti.getPonCTag(),
+ uti.getTechnologyProfileId(),
+ port.number()), 0);
// Upstream pbit is used to remark inner vlan pbit.
// Upstream is used to avoid trusting the BNG to send the packet with correct pbit.
// this is done because ds mode 0 is used because ds mode 3 or 6 that allow for
// all pbit acceptance are not widely supported by vendors even though present in
// the OMCI spec.
- if (tagInformation.getUsPonCTagPriority() != NO_PCP) {
- treatmentBuilder.setVlanPcp((byte) tagInformation.getUsPonCTagPriority());
+ if (uti.getUsPonCTagPriority() != -1) {
+ treatmentBuilder.setVlanPcp((byte) uti.getUsPonCTagPriority());
}
- if (!VlanId.NONE.equals(tagInformation.getUniTagMatch()) &&
- tagInformation.getPonCTag().toShort() != VlanId.ANY_VALUE) {
- treatmentBuilder.setVlanId(tagInformation.getUniTagMatch());
+ if (!VlanId.NONE.equals(uti.getUniTagMatch()) &&
+ uti.getPonCTag().toShort() != VlanId.ANY_VALUE) {
+ treatmentBuilder.setVlanId(uti.getUniTagMatch());
}
DefaultAnnotations.Builder annotationBuilder = DefaultAnnotations.builder();
@@ -918,13 +1345,39 @@
annotationBuilder.set(DOWNSTREAM_OLT, downstreamOltMeterId.toString());
}
- return createForwardingObjectiveBuilder(selectorBuilder.build(), treatmentBuilder.build(), MIN_PRIORITY,
- annotationBuilder.build());
- }
+ DefaultForwardingObjective.Builder flowBuilder = createForwardingObjectiveBuilder(selectorBuilder.build(),
+ treatmentBuilder.build(), MIN_PRIORITY, annotationBuilder.build());
- @Override
- public void clearDeviceState(DeviceId deviceId) {
- pendingEapolForDevice.remove(deviceId);
+ ObjectiveContext context = new ObjectiveContext() {
+ @Override
+ public void onSuccess(Objective objective) {
+ log.info("{} Downstream Data plane filter for {}.",
+ completeFlowOpToString(action), sk);
+ }
+
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ log.info("Downstream Data plane filter for {} failed {} because {}.",
+ sk, action, error);
+ updateConnectPointStatus(sk, null, OltFlowsStatus.ERROR, null);
+ }
+ };
+
+ ForwardingObjective flow = null;
+ if (action == FlowOperation.ADD) {
+ flow = flowBuilder.add(context);
+ } else if (action == FlowOperation.REMOVE) {
+ flow = flowBuilder.remove(context);
+ } else {
+ log.error("Flow action not supported: {}", action);
+ }
+
+ if (flow != null) {
+ if (log.isTraceEnabled()) {
+ log.trace("Forwarding rule {}", flow);
+ }
+ flowObjectiveService.forward(deviceId, flow);
+ }
}
private DefaultForwardingObjective.Builder createForwardingObjectiveBuilder(TrafficSelector selector,
@@ -941,70 +1394,6 @@
.withTreatment(treatment);
}
- /**
- * Returns the write metadata value including tech profile reference and innerVlan.
- * For param cVlan, null can be sent
- *
- * @param cVlan c (customer) tag of one subscriber
- * @param techProfileId tech profile id of one subscriber
- * @param upstreamOltMeterId upstream meter id for OLT device.
- * @return the write metadata value including tech profile reference and innerVlan
- */
- private Long createTechProfValueForWm(VlanId cVlan, int techProfileId, MeterId upstreamOltMeterId) {
- Long writeMetadata;
-
- if (cVlan == null || VlanId.NONE.equals(cVlan)) {
- writeMetadata = (long) techProfileId << 32;
- } else {
- writeMetadata = ((long) (cVlan.id()) << 48 | (long) techProfileId << 32);
- }
- if (upstreamOltMeterId == null) {
- return writeMetadata;
- } else {
- return writeMetadata | upstreamOltMeterId.id();
- }
- }
-
- private BandwidthProfileInformation getBandwidthProfileInformation(String bandwidthProfile) {
- if (bpService == null) {
- log.warn(SADIS_NOT_RUNNING);
- return null;
- }
- if (bandwidthProfile == null) {
- return null;
- }
- return bpService.get(bandwidthProfile);
- }
-
- /**
- * It will be used to support AT&T use case (for EAPOL flows).
- * If multiple services are found in uniServiceList, returns default tech profile id
- * If one service is found, returns the found one
- *
- * @param port uni port
- * @return the default technology profile id
- */
- private int getDefaultTechProfileId(AccessDevicePort port) {
- if (subsService == null) {
- log.warn(SADIS_NOT_RUNNING);
- return defaultTechProfileId;
- }
- if (port != null) {
- SubscriberAndDeviceInformation info = subsService.get(port.name());
- if (info != null && info.uniTagList().size() == 1) {
- return info.uniTagList().get(0).getTechnologyProfileId();
- }
- }
- return defaultTechProfileId;
- }
-
- /**
- * Write metadata instruction value (metadata) is 8 bytes.
- * <p>
- * MS 2 bytes: C Tag
- * Next 2 bytes: Technology Profile Id
- * Next 4 bytes: Port number (uni or nni)
- */
private Long createMetadata(VlanId innerVlan, int techProfileId, PortNumber egressPort) {
if (techProfileId == NONE_TP_ID) {
techProfileId = DEFAULT_TP_ID_DEFAULT;
@@ -1013,5 +1402,318 @@
return ((long) (innerVlan.id()) << 48 | (long) techProfileId << 32) | egressPort.toLong();
}
+ private boolean isMacLearningEnabled(SubscriberAndDeviceInformation si) {
+ AtomicBoolean requiresMacLearning = new AtomicBoolean();
+ requiresMacLearning.set(false);
+ si.uniTagList().forEach(uniTagInfo -> {
+ if (uniTagInfo.getEnableMacLearning()) {
+ requiresMacLearning.set(true);
+ }
+ });
+
+ return requiresMacLearning.get();
+ }
+
+ /**
+ * Checks whether the subscriber has the MacAddress configured or discovered.
+ *
+ * @param deviceId DeviceId for this subscriber
+ * @param port Port for this subscriber
+ * @param si SubscriberAndDeviceInformation
+ * @return boolean
+ */
+ protected boolean isMacAddressAvailable(DeviceId deviceId, Port port, SubscriberAndDeviceInformation si) {
+ AtomicBoolean isConfigured = new AtomicBoolean();
+ isConfigured.set(true);
+
+ si.uniTagList().forEach(uniTagInfo -> {
+ boolean isMacLearningEnabled = uniTagInfo.getEnableMacLearning();
+ boolean configureMac = isMacAddressValid(uniTagInfo);
+ boolean discoveredMac = false;
+ Optional<Host> optHost = hostService.getConnectedHosts(new ConnectPoint(deviceId, port.number()))
+ .stream().filter(host -> host.vlan().equals(uniTagInfo.getPonCTag())).findFirst();
+ if (optHost.isPresent() && optHost.get().mac() != null) {
+ discoveredMac = true;
+ }
+ if (isMacLearningEnabled && !configureMac && !discoveredMac) {
+ log.debug("Awaiting for macAddress on {} for service {}",
+ portWithName(port), uniTagInfo.getServiceName());
+ isConfigured.set(false);
+ }
+ });
+
+ return isConfigured.get();
+ }
+
+ protected MacAddress getMacAddress(DeviceId deviceId, Port port, UniTagInformation uniTagInfo) {
+ boolean configuredMac = isMacAddressValid(uniTagInfo);
+ if (configuredMac) {
+ return MacAddress.valueOf(uniTagInfo.getConfiguredMacAddress());
+ } else if (uniTagInfo.getEnableMacLearning()) {
+ Optional<Host> optHost = hostService.getConnectedHosts(new ConnectPoint(deviceId, port.number()))
+ .stream().filter(host -> host.vlan().equals(uniTagInfo.getPonCTag())).findFirst();
+ if (optHost.isPresent() && optHost.get().mac() != null) {
+ return optHost.get().mac();
+ }
+ }
+ return null;
+ }
+
+ private boolean isMacAddressValid(UniTagInformation tagInformation) {
+ return tagInformation.getConfiguredMacAddress() != null &&
+ !tagInformation.getConfiguredMacAddress().trim().equals("") &&
+ !MacAddress.NONE.equals(MacAddress.valueOf(tagInformation.getConfiguredMacAddress()));
+ }
+
+ protected void updateConnectPointStatus(ServiceKey key, OltFlowsStatus eapolStatus,
+ OltFlowsStatus subscriberFlowsStatus, OltFlowsStatus dhcpStatus) {
+ try {
+ cpStatusWriteLock.lock();
+ OltPortStatus status = cpStatus.get(key);
+
+ if (status == null) {
+ status = new OltPortStatus(
+ eapolStatus != null ? eapolStatus : OltFlowsStatus.NONE,
+ subscriberFlowsStatus != null ? subscriberFlowsStatus : OltFlowsStatus.NONE,
+ dhcpStatus != null ? dhcpStatus : OltFlowsStatus.NONE
+ );
+ } else {
+ if (eapolStatus != null) {
+ status.defaultEapolStatus = eapolStatus;
+ }
+ if (subscriberFlowsStatus != null) {
+ status.subscriberFlowsStatus = subscriberFlowsStatus;
+ }
+ if (dhcpStatus != null) {
+ status.dhcpStatus = dhcpStatus;
+ }
+ }
+
+ cpStatus.put(key, status);
+ } finally {
+ cpStatusWriteLock.unlock();
+ }
+ }
+
+ protected class InternalFlowListener implements FlowRuleListener {
+ @Override
+ public void event(FlowRuleEvent event) {
+ if (appId.id() != (event.subject().appId())) {
+ return;
+ }
+
+ if (!oltDeviceService.isLocalLeader(event.subject().deviceId())) {
+ if (log.isTraceEnabled()) {
+ log.trace("ignoring flow event {} " +
+ "as not leader for {}", event, event.subject().deviceId());
+ }
+ return;
+ }
+
+ switch (event.type()) {
+ case RULE_ADDED:
+ case RULE_REMOVED:
+ Port port = getCpFromFlowRule(event.subject());
+ if (port == null) {
+ log.error("Can't find port in flow {}", event.subject());
+ return;
+ }
+ if (log.isTraceEnabled()) {
+ log.trace("flow event {} on cp {}: {}", event.type(),
+ portWithName(port), event.subject());
+ }
+ updateCpStatus(event.type(), port, event.subject());
+ return;
+ case RULE_ADD_REQUESTED:
+ case RULE_REMOVE_REQUESTED:
+ // NOTE that PENDING_ADD/REMOVE is set when we create the flowObjective
+ return;
+ default:
+ return;
+ }
+ }
+
+ protected void updateCpStatus(FlowRuleEvent.Type type, Port port, FlowRule flowRule) {
+ OltFlowsStatus status = flowRuleStatusToOltFlowStatus(type);
+ if (isDefaultEapolFlow(flowRule)) {
+ ServiceKey sk = new ServiceKey(new AccessDevicePort(port),
+ defaultEapolUniTag);
+ if (log.isTraceEnabled()) {
+ log.trace("update defaultEapolStatus {} on {}", status, sk);
+ }
+ updateConnectPointStatus(sk, status, null, null);
+ } else if (isDhcpFlow(flowRule)) {
+ ServiceKey sk = getSubscriberKeyFromFlowRule(flowRule);
+ if (sk == null) {
+ return;
+ }
+ if (log.isTraceEnabled()) {
+ log.trace("update dhcpStatus {} on {}", status, sk);
+ }
+ updateConnectPointStatus(sk, null, null, status);
+ } else if (isDataFlow(flowRule)) {
+
+ if (oltDeviceService.isNniPort(deviceService.getDevice(flowRule.deviceId()),
+ getCpFromFlowRule(flowRule).number())) {
+ // the NNI has data-plane for every subscriber, doesn't make sense to track them
+ return;
+ }
+
+ ServiceKey sk = getSubscriberKeyFromFlowRule(flowRule);
+ if (sk == null) {
+ return;
+ }
+ if (log.isTraceEnabled()) {
+ log.trace("update dataplaneStatus {} on {}", status, sk);
+ }
+ updateConnectPointStatus(sk, null, status, null);
+ }
+ }
+
+ private boolean isDefaultEapolFlow(FlowRule flowRule) {
+ EthTypeCriterion c = (EthTypeCriterion) flowRule.selector().getCriterion(Criterion.Type.ETH_TYPE);
+ if (c == null) {
+ return false;
+ }
+ if (c.ethType().equals(EthType.EtherType.EAPOL.ethType())) {
+ AtomicBoolean isDefault = new AtomicBoolean(false);
+ flowRule.treatment().allInstructions().forEach(instruction -> {
+ if (instruction.type() == L2MODIFICATION) {
+ L2ModificationInstruction modificationInstruction = (L2ModificationInstruction) instruction;
+ if (modificationInstruction.subtype() == L2ModificationInstruction.L2SubType.VLAN_ID) {
+ L2ModificationInstruction.ModVlanIdInstruction vlanInstruction =
+ (L2ModificationInstruction.ModVlanIdInstruction) modificationInstruction;
+ if (vlanInstruction.vlanId().id().equals(EAPOL_DEFAULT_VLAN)) {
+ isDefault.set(true);
+ return;
+ }
+ }
+ }
+ });
+ return isDefault.get();
+ }
+ return false;
+ }
+
+ /**
+ * Returns true if the flow is a DHCP flow.
+ * Matches both upstream and downstream flows.
+ *
+ * @param flowRule The FlowRule to evaluate
+ * @return boolean
+ */
+ private boolean isDhcpFlow(FlowRule flowRule) {
+ IPProtocolCriterion ipCriterion = (IPProtocolCriterion) flowRule.selector()
+ .getCriterion(Criterion.Type.IP_PROTO);
+ if (ipCriterion == null) {
+ return false;
+ }
+
+ UdpPortCriterion src = (UdpPortCriterion) flowRule.selector().getCriterion(Criterion.Type.UDP_SRC);
+
+ if (src == null) {
+ return false;
+ }
+ return ipCriterion.protocol() == IPv4.PROTOCOL_UDP &&
+ (src.udpPort().toInt() == 68 || src.udpPort().toInt() == 67);
+ }
+
+ private boolean isDataFlow(FlowRule flowRule) {
+ // we consider subscriber flows the one that matches on VLAN_VID
+ // method is valid only because it's the last check after EAPOL and DHCP.
+ // this matches mcast flows as well, if we want to avoid that we can
+ // filter out the elements that have groups in the treatment or
+ // mcastIp in the selector
+ // IPV4_DST:224.0.0.22/32
+ // treatment=[immediate=[GROUP:0x1]]
+
+ return flowRule.selector().getCriterion(Criterion.Type.VLAN_VID) != null;
+ }
+
+ private Port getCpFromFlowRule(FlowRule flowRule) {
+ DeviceId deviceId = flowRule.deviceId();
+ PortCriterion inPort = (PortCriterion) flowRule.selector().getCriterion(Criterion.Type.IN_PORT);
+ if (inPort != null) {
+ PortNumber port = inPort.port();
+ return deviceService.getPort(deviceId, port);
+ }
+ return null;
+ }
+
+ private ServiceKey getSubscriberKeyFromFlowRule(FlowRule flowRule) {
+ Port flowPort = getCpFromFlowRule(flowRule);
+ SubscriberAndDeviceInformation si = subsService.get(getPortName(flowPort));
+
+ Boolean isNni = oltDeviceService.isNniPort(deviceService.getDevice(flowRule.deviceId()), flowPort.number());
+ if (si == null && !isNni) {
+ log.error("Subscriber information not found in sadis for port {}", portWithName(flowPort));
+ return null;
+ }
+
+ if (isNni) {
+ return new ServiceKey(new AccessDevicePort(flowPort), nniUniTag);
+ }
+
+ Optional<UniTagInformation> found = Optional.empty();
+ VlanId flowVlan = null;
+ if (isDhcpFlow(flowRule)) {
+ // we need to make a special case for DHCP as in the ATT workflow DHCP flows don't match on tags
+ L2ModificationInstruction.ModVlanIdInstruction instruction =
+ (L2ModificationInstruction.ModVlanIdInstruction) flowRule.treatment().immediate().get(1);
+ flowVlan = instruction.vlanId();
+ } else {
+ // for now we assume that if it's not DHCP it's dataplane (or at least tagged)
+ VlanIdCriterion vlanIdCriterion =
+ (VlanIdCriterion) flowRule.selector().getCriterion(Criterion.Type.VLAN_VID);
+ if (vlanIdCriterion == null) {
+ log.warn("cannot match the flow to a subscriber service as it does not carry vlans");
+ return null;
+ }
+ flowVlan = vlanIdCriterion.vlanId();
+ }
+
+ VlanId finalFlowVlan = flowVlan;
+ found = si.uniTagList().stream().filter(uti ->
+ uti.getPonCTag().equals(finalFlowVlan) ||
+ uti.getPonSTag().equals(finalFlowVlan) ||
+ uti.getUniTagMatch().equals(finalFlowVlan)
+ ).findFirst();
+
+
+ if (found.isEmpty()) {
+ log.warn("Cannot map flow rule {} to Service in {}", flowRule, si);
+ }
+
+ return found.isPresent() ? new ServiceKey(new AccessDevicePort(flowPort), found.get()) : null;
+
+ }
+
+ private OltFlowsStatus flowRuleStatusToOltFlowStatus(FlowRuleEvent.Type type) {
+ switch (type) {
+ case RULE_ADD_REQUESTED:
+ return OltFlowsStatus.PENDING_ADD;
+ case RULE_ADDED:
+ return OltFlowsStatus.ADDED;
+ case RULE_REMOVE_REQUESTED:
+ return OltFlowsStatus.PENDING_REMOVE;
+ case RULE_REMOVED:
+ return OltFlowsStatus.REMOVED;
+ default:
+ return OltFlowsStatus.NONE;
+ }
+ }
+ }
+
+ protected void bindSadisService(SadisService service) {
+ this.subsService = service.getSubscriberInfoService();
+ this.bpService = service.getBandwidthProfileService();
+ log.info("Sadis service is loaded");
+ }
+
+ protected void unbindSadisService(SadisService service) {
+ this.subsService = null;
+ this.bpService = null;
+ log.info("Sadis service is unloaded");
+ }
}
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltFlowServiceInterface.java b/impl/src/main/java/org/opencord/olt/impl/OltFlowServiceInterface.java
new file mode 100644
index 0000000..e55c789
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/impl/OltFlowServiceInterface.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.impl;
+
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Port;
+import org.opencord.sadis.UniTagInformation;
+
+import java.util.Map;
+
+/**
+ * Interface for flow installation/removal methods for different types of traffic.
+ */
+public interface OltFlowServiceInterface {
+ /**
+ * Installs or removes default flows for the port to trap to controller.
+ * @param sub the information about the port
+ * @param defaultBpId the default bandwidth profile
+ * @param oltBandwidthProfile the olt bandwidth profile.
+ * @return true if successful
+ */
+ boolean handleBasicPortFlows(
+ DiscoveredSubscriber sub, String defaultBpId, String oltBandwidthProfile);
+
+ /**
+ * Installs or removes subscriber specific flows.
+ * @param sub the information about the subscriber
+ * @param defaultBpId the default bandwidth profile
+ * @param multicastServiceName the multicast service name.
+ * @return true if successful
+ */
+ boolean handleSubscriberFlows(DiscoveredSubscriber sub, String defaultBpId, String multicastServiceName);
+
+ /**
+ * Installs or removes flows on the NNI port.
+ * @param device the OLT
+ * @param port the NNI port
+ * @param action the operatio, ADD or REMOVE.
+ */
+ void handleNniFlows(Device device, Port port, OltFlowService.FlowOperation action);
+
+ /**
+ * Checks if the default eapol flow is already installed.
+ * @param port the port
+ * @return true if installed, false otherwise.
+ */
+ boolean hasDefaultEapol(Port port);
+
+ /**
+ * Checks if the dhcp flows are installed.
+ * @param port the port
+ * @param uti the UniTagInformation to check for
+ * @return true if installed, false otherwise.
+ */
+ boolean hasDhcpFlows(Port port, UniTagInformation uti);
+
+ /**
+ * Checks if the subscriber flows are installed.
+ * @param port the port
+ * @param uti the UniTagInformation to check for
+ * @return true if installed, false otherwise.
+ */
+ boolean hasSubscriberFlows(Port port, UniTagInformation uti);
+
+ /**
+ * Removes all device flows.
+ * @param deviceId the olt.
+ */
+ void purgeDeviceFlows(DeviceId deviceId);
+
+ /**
+ * Return the status of installation on the connect points.
+ * @return the status map
+ */
+ Map<ServiceKey, OltPortStatus> getConnectPointStatus();
+
+ /**
+ * Returns all the programmed subscribers.
+ * @return the subscribers
+ */
+ Map<ServiceKey, UniTagInformation> getProgrammedSubscribers();
+
+ /**
+ * Returns the list of requested subscribers to be installed with status.
+ * @return the list
+ */
+ Map<ServiceKey, Boolean> getRequestedSubscribers();
+
+ /**
+ * Returns if a subscriber on a port is provisioned or not.
+ * @param cp the port
+ * @return true if any service on that port is provisioned, false otherwise
+ */
+ boolean isSubscriberServiceProvisioned(AccessDevicePort cp);
+
+ /**
+ * Returns if a subscriber on a port is provisioned or not.
+ * @param sk the SubscriberKey
+ * @return true if provisioned, false otherwise
+ */
+ boolean isSubscriberServiceProvisioned(ServiceKey sk);
+
+ /**
+ * Updates the subscriber provisioning status.
+ * @param sk the SubscriberKey
+ * @param status the next status
+ */
+ void updateProvisionedSubscriberStatus(ServiceKey sk, Boolean status);
+}
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltMeterService.java b/impl/src/main/java/org/opencord/olt/impl/OltMeterService.java
index 4b88344..be04449 100644
--- a/impl/src/main/java/org/opencord/olt/impl/OltMeterService.java
+++ b/impl/src/main/java/org/opencord/olt/impl/OltMeterService.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-present Open Networking Foundation
+ * Copyright 2021-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,47 +13,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.opencord.olt.impl;
-import static com.google.common.base.Strings.isNullOrEmpty;
-import static java.util.stream.Collectors.collectingAndThen;
-import static java.util.stream.Collectors.groupingBy;
-import static java.util.stream.Collectors.mapping;
-import static java.util.stream.Collectors.toSet;
-import static org.onlab.util.Tools.get;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.opencord.olt.impl.OsgiPropertyConstants.*;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Dictionary;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-
+import com.google.common.collect.ImmutableMap;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.LeadershipService;
-import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
-import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
-import org.onosproject.net.device.DeviceService;
-import org.onosproject.net.flowobjective.ObjectiveError;
import org.onosproject.net.meter.Band;
import org.onosproject.net.meter.DefaultBand;
import org.onosproject.net.meter.DefaultMeterRequest;
@@ -66,12 +35,14 @@
import org.onosproject.net.meter.MeterListener;
import org.onosproject.net.meter.MeterRequest;
import org.onosproject.net.meter.MeterService;
+import org.onosproject.net.meter.MeterState;
import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.ConsistentMultimap;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
-import org.opencord.olt.internalapi.AccessDeviceMeterService;
import org.opencord.sadis.BandwidthProfileInformation;
+import org.opencord.sadis.BaseInformationService;
+import org.opencord.sadis.SadisService;
+import org.opencord.sadis.SubscriberAndDeviceInformation;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
@@ -79,219 +50,386 @@
import org.osgi.service.component.annotations.Modified;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.ReferencePolicy;
import org.slf4j.Logger;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
+import java.util.ArrayList;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
-/**
- * Provisions Meters on access devices.
- */
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.onlab.util.Tools.get;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.opencord.olt.impl.OsgiPropertyConstants.*;
+import static org.slf4j.LoggerFactory.getLogger;
+
@Component(immediate = true, property = {
DELETE_METERS + ":Boolean=" + DELETE_METERS_DEFAULT,
ZERO_REFERENCE_METER_COUNT + ":Integer=" + ZERO_REFERENCE_METER_COUNT_DEFAULT,
- })
-public class OltMeterService implements AccessDeviceMeterService {
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected MeterService meterService;
+})
+public class OltMeterService implements OltMeterServiceInterface {
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected CoreService coreService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected ComponentConfigService componentConfigService;
+ protected ComponentConfigService cfgService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected StorageService storageService;
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected DeviceService deviceService;
+ @Reference(cardinality = ReferenceCardinality.OPTIONAL,
+ bind = "bindSadisService",
+ unbind = "unbindSadisService",
+ policy = ReferencePolicy.DYNAMIC)
+ protected volatile SadisService sadisService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected ClusterService clusterService;
+ protected MeterService meterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected MastershipService mastershipService;
+ protected OltDeviceServiceInterface oltDeviceService;
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected LeadershipService leadershipService;
+ private final Logger log = getLogger(getClass());
+ protected BaseInformationService<BandwidthProfileInformation> bpService;
+ private ApplicationId appId;
+ private static final String APP_NAME = "org.opencord.olt";
+ private final ReentrantReadWriteLock programmedMeterLock = new ReentrantReadWriteLock();
+ private final Lock programmedMeterWriteLock = programmedMeterLock.writeLock();
+ private final Lock programmedMeterReadLock = programmedMeterLock.readLock();
+
+ /**
+ * Programmed Meters status map.
+ * Keeps track of which meter is programmed on which device for which BandwidthProfile.
+ * The String key is the BandwidthProfile
+ */
+ protected Map<DeviceId, Map<String, MeterData>> programmedMeters;
+
+ private final MeterListener meterListener = new InternalMeterListener();
+ protected ExecutorService pendingRemovalMetersExecutor =
+ Executors.newFixedThreadPool(5, groupedThreads("onos/olt",
+ "pending-removal-meters-%d", log));
+
+ /**
+ * Map that contains a list of meters that needs to be removed.
+ * We wait to get 3 METER_REFERENCE_COUNT_ZERO events before removing the meter
+ * so that we're sure no flow is referencing it.
+ */
+ protected Map<DeviceId, Map<MeterKey, AtomicInteger>> pendingRemoveMeters;
+
+ /**
+ * Number of consecutive meter events with empty reference count
+ * after which a meter gets removed from the device.
+ */
+ protected int zeroReferenceMeterCount = 3;
/**
* Delete meters when reference count drops to zero.
*/
protected boolean deleteMeters = DELETE_METERS_DEFAULT;
- /**
- * Number of Zero References received before deleting the meter.
- */
- protected int zeroReferenceMeterCount = ZERO_REFERENCE_METER_COUNT_DEFAULT;
-
- private ApplicationId appId;
- private static final String APP_NAME = "org.opencord.olt";
-
- private final MeterListener meterListener = new InternalMeterListener();
-
- private final Logger log = getLogger(getClass());
-
- protected ExecutorService eventExecutor;
-
- protected Map<DeviceId, Set<BandwidthProfileInformation>> pendingMeters;
- protected Map<DeviceId, Map<MeterKey, AtomicInteger>> pendingRemoveMeters;
- protected ConsistentMultimap<String, MeterKey> bpInfoToMeter;
-
@Activate
public void activate(ComponentContext context) {
- eventExecutor = Executors.newFixedThreadPool(5, groupedThreads("onos/olt",
- "events-%d", log));
appId = coreService.registerApplication(APP_NAME);
modified(context);
-
KryoNamespace serializer = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
+ .register(List.class)
+ .register(MeterData.class)
+ .register(MeterState.class)
.register(MeterKey.class)
- .register(BandwidthProfileInformation.class)
.build();
- bpInfoToMeter = storageService.<String, MeterKey>consistentMultimapBuilder()
- .withName("volt-bp-info-to-meter")
- .withSerializer(Serializer.using(serializer))
- .withApplicationId(appId)
- .build();
-
- meterService.addListener(meterListener);
- componentConfigService.registerProperties(getClass());
- pendingMeters = storageService.<DeviceId, Set<BandwidthProfileInformation>>consistentMapBuilder()
- .withName("volt-pending-meters")
+ programmedMeters = storageService.<DeviceId, Map<String, MeterData>>consistentMapBuilder()
+ .withName("volt-programmed-meters")
.withSerializer(Serializer.using(serializer))
.withApplicationId(appId)
.build().asJavaMap();
+
pendingRemoveMeters = storageService.<DeviceId, Map<MeterKey, AtomicInteger>>consistentMapBuilder()
.withName("volt-pending-remove-meters")
.withSerializer(Serializer.using(serializer))
.withApplicationId(appId)
.build().asJavaMap();
- log.info("Olt Meter service started");
- }
- @Deactivate
- public void deactivate() {
- meterService.removeListener(meterListener);
- }
+ cfgService.registerProperties(getClass());
+ meterService.addListener(meterListener);
+
+ log.info("Started");
+ }
@Modified
public void modified(ComponentContext context) {
Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
- Boolean d = Tools.isPropertyEnabled(properties, "deleteMeters");
+ Boolean d = Tools.isPropertyEnabled(properties, DELETE_METERS);
if (d != null) {
deleteMeters = d;
}
- String zeroReferenceMeterCountNew = get(properties, ZERO_REFERENCE_METER_COUNT);
- zeroReferenceMeterCount = isNullOrEmpty(zeroReferenceMeterCountNew) ? ZERO_REFERENCE_METER_COUNT_DEFAULT :
- Integer.parseInt(zeroReferenceMeterCountNew.trim());
+ String zeroCount = get(properties, ZERO_REFERENCE_METER_COUNT);
+ int oldSubscriberProcessingThreads = zeroReferenceMeterCount;
+ zeroReferenceMeterCount = isNullOrEmpty(zeroCount) ?
+ oldSubscriberProcessingThreads : Integer.parseInt(zeroCount.trim());
+ log.info("Modified. Values = deleteMeters: {}, zeroReferenceMeterCount: {}",
+ deleteMeters, zeroReferenceMeterCount);
+ }
+
+ @Deactivate
+ public void deactivate(ComponentContext context) {
+ cfgService.unregisterProperties(getClass(), false);
+ meterService.removeListener(meterListener);
+ log.info("Stopped");
}
@Override
- public ImmutableMap<String, Collection<MeterKey>> getBpMeterMappings() {
- return bpInfoToMeter.stream()
- .collect(collectingAndThen(
- groupingBy(Map.Entry::getKey, mapping(Map.Entry::getValue, toSet())),
- ImmutableMap::copyOf));
+ public Map<DeviceId, Map<String, MeterData>> getProgrammedMeters() {
+ try {
+ programmedMeterReadLock.lock();
+ return ImmutableMap.copyOf(programmedMeters);
+ } finally {
+ programmedMeterReadLock.unlock();
+ }
}
- boolean addMeterIdToBpMapping(DeviceId deviceId, MeterId meterId, String bandwidthProfile) {
- log.debug("adding bp {} to meter {} mapping for device {}",
- bandwidthProfile, meterId, deviceId);
- return bpInfoToMeter.put(bandwidthProfile, MeterKey.key(deviceId, meterId));
+ /**
+ * Will create a meter if needed and return true once available.
+ *
+ * @param deviceId DeviceId
+ * @param bandwidthProfile Bandwidth Profile Id
+ * @return true
+ */
+ @Override
+ public synchronized boolean createMeter(DeviceId deviceId, String bandwidthProfile) {
+
+ // NOTE it is possible that hasMeterByBandwidthProfile returns false has the meter is in PENDING_ADD
+ // then a different thread changes the meter to ADDED
+ // and thus hasPendingMeterByBandwidthProfile return false as well and we install the meter a second time
+ // this causes an inconsistency between the existing meter and meterId stored in the map
+
+ if (!hasMeterByBandwidthProfile(deviceId, bandwidthProfile)) {
+ // NOTE this is at trace level as it's constantly called by the queue processor
+ if (log.isTraceEnabled()) {
+ log.trace("Missing meter for BandwidthProfile {} on device {}", bandwidthProfile, deviceId);
+ }
+
+ if (!hasPendingMeterByBandwidthProfile(deviceId, bandwidthProfile)) {
+ createMeterForBp(deviceId, bandwidthProfile);
+ }
+ if (log.isTraceEnabled()) {
+ log.trace("Meter is not yet available for {} on device {}",
+ bandwidthProfile, deviceId);
+ }
+ return false;
+ }
+ log.debug("Meter found for BandwidthProfile {} on device {}", bandwidthProfile, deviceId);
+ return true;
}
@Override
- public MeterId getMeterIdFromBpMapping(DeviceId deviceId, String bandwidthProfile) {
- if (bandwidthProfile == null) {
- log.warn("Bandwidth Profile requested is null");
- return null;
+ public boolean createMeters(DeviceId deviceId, SubscriberAndDeviceInformation si) {
+ // Each UniTagInformation has up to 4 meters,
+ // check and/or create all of them
+ AtomicBoolean waitingOnMeter = new AtomicBoolean();
+ waitingOnMeter.set(false);
+ Map<String, List<String>> pendingMeters = new HashMap<>();
+ si.uniTagList().forEach(uniTagInfo -> {
+ String serviceName = uniTagInfo.getServiceName();
+ pendingMeters.put(serviceName, new LinkedList<>());
+ String usBp = uniTagInfo.getUpstreamBandwidthProfile();
+ String dsBp = uniTagInfo.getDownstreamBandwidthProfile();
+ String oltUBp = uniTagInfo.getDownstreamOltBandwidthProfile();
+ String oltDsBp = uniTagInfo.getUpstreamOltBandwidthProfile();
+ if (!createMeter(deviceId, usBp)) {
+ pendingMeters.get(serviceName).add(usBp);
+ waitingOnMeter.set(true);
+ }
+ if (!createMeter(deviceId, dsBp)) {
+ pendingMeters.get(serviceName).add(usBp);
+ waitingOnMeter.set(true);
+ }
+ if (!createMeter(deviceId, oltUBp)) {
+ pendingMeters.get(serviceName).add(usBp);
+ waitingOnMeter.set(true);
+ }
+ if (!createMeter(deviceId, oltDsBp)) {
+ pendingMeters.get(serviceName).add(usBp);
+ waitingOnMeter.set(true);
+ }
+ });
+ if (waitingOnMeter.get()) {
+ if (log.isTraceEnabled()) {
+ log.trace("Meters {} on device {} are not " +
+ "installed yet (requested by subscriber {})",
+ pendingMeters, deviceId, si.id());
+ }
+ return false;
}
- if (bpInfoToMeter.get(bandwidthProfile) == null) {
- log.warn("Bandwidth Profile '{}' is not present in the map",
- bandwidthProfile);
- return null;
- }
- if (bpInfoToMeter.get(bandwidthProfile).value().isEmpty()) {
- log.warn("Bandwidth Profile '{}' is not currently mapped to a meter",
- bandwidthProfile);
- return null;
- }
+ return true;
+ }
- Optional<? extends MeterKey> meterKeyForDevice = bpInfoToMeter.get(bandwidthProfile).value()
- .stream()
- .filter(meterKey -> meterKey.deviceId().equals(deviceId))
- .findFirst();
- if (meterKeyForDevice.isPresent()) {
- log.debug("Found meter {} for bandwidth profile {} on {}",
- meterKeyForDevice.get().meterId(), bandwidthProfile, deviceId);
- return meterKeyForDevice.get().meterId();
- } else {
- log.warn("Bandwidth Profile '{}' is not currently mapped to a meter on {} , {}",
- bandwidthProfile, deviceId, bpInfoToMeter.get(bandwidthProfile).value());
- return null;
+ /**
+ * Returns true if a meter is present in the programmed meters map, only if status is ADDED.
+ *
+ * @param deviceId the DeviceId on which to look for the meter
+ * @param bandwidthProfile the Bandwidth profile associated with this meter
+ * @return true if the meter is found
+ */
+ public boolean hasMeterByBandwidthProfile(DeviceId deviceId, String bandwidthProfile) {
+ try {
+ programmedMeterReadLock.lock();
+ Map<String, MeterData> metersOnDevice = programmedMeters.get(deviceId);
+ if (metersOnDevice == null || metersOnDevice.isEmpty()) {
+ return false;
+ }
+ if (log.isTraceEnabled()) {
+ log.trace("added metersOnDevice {}: {}", deviceId, metersOnDevice);
+ }
+ return metersOnDevice.get(bandwidthProfile) != null &&
+ metersOnDevice.get(bandwidthProfile).getMeterStatus().equals(MeterState.ADDED);
+ } finally {
+ programmedMeterReadLock.unlock();
+ }
+ }
+
+ public boolean hasPendingMeterByBandwidthProfile(DeviceId deviceId, String bandwidthProfile) {
+ try {
+ programmedMeterReadLock.lock();
+ Map<String, MeterData> metersOnDevice = programmedMeters.get(deviceId);
+ if (metersOnDevice == null || metersOnDevice.isEmpty()) {
+ return false;
+ }
+ if (log.isTraceEnabled()) {
+ log.trace("pending metersOnDevice {}: {}", deviceId, metersOnDevice);
+ }
+ // NOTE that we check in order if the meter was ADDED and if it wasn't we check for PENDING_ADD
+ // it is possible that a different thread move the meter state from PENDING_ADD
+ // to ADDED between these two checks
+ // to avoid creating the meter twice we return true event if the meter is already added
+ return metersOnDevice.get(bandwidthProfile) != null && (
+ metersOnDevice.get(bandwidthProfile).getMeterStatus().equals(MeterState.ADDED) ||
+ metersOnDevice.get(bandwidthProfile).getMeterStatus().equals(MeterState.PENDING_ADD)
+ );
+
+ } finally {
+ programmedMeterReadLock.unlock();
+ }
+ }
+
+ public MeterId getMeterIdForBandwidthProfile(DeviceId deviceId, String bandwidthProfile) {
+ try {
+ programmedMeterReadLock.lock();
+ Map<String, MeterData> metersOnDevice = programmedMeters.get(deviceId);
+ if (metersOnDevice == null || metersOnDevice.isEmpty()) {
+ return null;
+ }
+ MeterData meterData = metersOnDevice.get(bandwidthProfile);
+ if (meterData == null || meterData.getMeterStatus() != MeterState.ADDED) {
+ return null;
+ }
+ if (log.isTraceEnabled()) {
+ log.debug("Found meter {} on device {} for bandwidth profile {}",
+ meterData.getMeterId(), deviceId, bandwidthProfile);
+ }
+ return meterData.getMeterId();
+ } finally {
+ programmedMeterReadLock.unlock();
}
}
@Override
- public ImmutableSet<MeterKey> getProgMeters() {
- return bpInfoToMeter.stream()
- .map(Map.Entry::getValue)
- .collect(ImmutableSet.toImmutableSet());
+ public void purgeDeviceMeters(DeviceId deviceId) {
+ log.debug("Purging meters on device {}", deviceId);
+ meterService.purgeMeters(deviceId);
+
+ // after we purge the meters we also need to clear the map
+ try {
+ programmedMeterWriteLock.lock();
+ programmedMeters.remove(deviceId);
+ } finally {
+ programmedMeterWriteLock.unlock();
+ }
+
+ // and clear the event count
+ // NOTE do we need a lock?
+ pendingRemoveMeters.remove(deviceId);
}
- @Override
- public MeterId createMeter(DeviceId deviceId, BandwidthProfileInformation bpInfo,
- CompletableFuture<Object> meterFuture) {
- log.debug("Creating meter on {} for {}", deviceId, bpInfo);
+ /**
+ * Creates of a meter for a given Bandwidth Profile on a given device.
+ *
+ * @param deviceId the DeviceId
+ * @param bandwidthProfile the BandwidthProfile ID
+ */
+ public void createMeterForBp(DeviceId deviceId, String bandwidthProfile) {
+ // adding meter in pending state to the programmedMeter map
+ try {
+ programmedMeterWriteLock.lock();
+ programmedMeters.compute(deviceId, (d, deviceMeters) -> {
+
+ if (deviceMeters == null) {
+ deviceMeters = new HashMap<>();
+ }
+ // NOTE that this method is only called after verifying a
+ // meter for this BP does not already exist
+ MeterData meterData = new MeterData(
+ null,
+ MeterState.PENDING_ADD,
+ bandwidthProfile
+ );
+ deviceMeters.put(bandwidthProfile, meterData);
+
+ return deviceMeters;
+ });
+ } finally {
+ programmedMeterWriteLock.unlock();
+ }
+
+ BandwidthProfileInformation bpInfo = getBandwidthProfileInformation(bandwidthProfile);
if (bpInfo == null) {
- log.warn("Requested bandwidth profile on {} information is NULL", deviceId);
- meterFuture.complete(ObjectiveError.BADPARAMS);
- return null;
+ log.error("BandwidthProfile {} information not found in sadis", bandwidthProfile);
+ return;
}
- MeterId meterId = getMeterIdFromBpMapping(deviceId, bpInfo.id());
- if (meterId != null) {
- log.debug("Meter {} was previously created for bp {}", meterId, bpInfo.id());
- meterFuture.complete(null);
- return meterId;
+ log.info("Creating meter for BandwidthProfile {} on device {}", bpInfo.id(), deviceId);
+
+ if (log.isTraceEnabled()) {
+ log.trace("BandwidthProfile: {}", bpInfo);
}
List<Band> meterBands = createMeterBands(bpInfo);
- final AtomicReference<MeterId> meterIdRef = new AtomicReference<>();
+ CompletableFuture<Object> meterFuture = new CompletableFuture<>();
+
MeterRequest meterRequest = DefaultMeterRequest.builder()
.withBands(meterBands)
.withUnit(Meter.Unit.KB_PER_SEC)
.withContext(new MeterContext() {
@Override
public void onSuccess(MeterRequest op) {
- log.debug("Meter {} for {} is installed on the device {}",
- meterIdRef.get(), bpInfo.id(), deviceId);
- boolean added = addMeterIdToBpMapping(deviceId, meterIdRef.get(), bpInfo.id());
- if (added) {
- meterFuture.complete(null);
- } else {
- log.error("Failed to add Meter {} for {} on {} to the meter-bandwidth mapping",
- meterIdRef.get(), bpInfo.id(), deviceId);
- meterFuture.complete(ObjectiveError.UNKNOWN);
- }
+ log.info("Meter for BandwidthProfile {} is installed on the device {}",
+ bandwidthProfile, deviceId);
+ meterFuture.complete(null);
}
@Override
public void onError(MeterRequest op, MeterFailReason reason) {
- log.error("Failed installing meter {} on {} for {}",
- meterIdRef.get(), deviceId, bpInfo.id());
- bpInfoToMeter.remove(bpInfo.id(),
- MeterKey.key(deviceId, meterIdRef.get()));
+ log.error("Failed installing meter on {} for {}",
+ deviceId, bandwidthProfile);
meterFuture.complete(reason);
}
})
@@ -300,63 +438,34 @@
.burst()
.add();
+ // creating the meter
Meter meter = meterService.submit(meterRequest);
- meterIdRef.set(meter.id());
- log.info("Meter {} created and sent for installation on {} for {}",
- meter.id(), deviceId, bpInfo);
- return meter.id();
- }
- @Override
- public void removeFromPendingMeters(DeviceId deviceId, BandwidthProfileInformation bwpInfo) {
- if (deviceId == null) {
- return;
- }
- pendingMeters.computeIfPresent(deviceId, (id, bwps) -> {
- bwps.remove(bwpInfo);
- return bwps;
- });
- }
-
- @Override
- public synchronized boolean checkAndAddPendingMeter(DeviceId deviceId, BandwidthProfileInformation bwpInfo) {
- if (bwpInfo == null) {
- log.debug("Bandwidth profile is null for device: {}", deviceId);
- return false;
- }
- if (pendingMeters.containsKey(deviceId)
- && pendingMeters.get(deviceId).contains(bwpInfo)) {
- log.debug("Meter is already pending on {} with bp {}",
- deviceId, bwpInfo);
- return false;
- }
- log.debug("Adding bandwidth profile {} to pending on {}",
- bwpInfo, deviceId);
- pendingMeters.compute(deviceId, (id, bwps) -> {
- if (bwps == null) {
- bwps = new HashSet<>();
+ // wait for the meter to be completed
+ meterFuture.thenAccept(error -> {
+ if (error != null) {
+ log.error("Cannot create meter, TODO address me");
}
- bwps.add(bwpInfo);
- return bwps;
+
+ // then update the map with the MeterId
+ try {
+ programmedMeterWriteLock.lock();
+ programmedMeters.compute(deviceId, (d, entry) -> {
+ if (entry != null) {
+ entry.compute(bandwidthProfile, (bp, meterData) -> {
+ if (meterData != null) {
+ meterData.setMeterCellId(meter.meterCellId());
+ meterData.setMeterStatus(MeterState.ADDED);
+ }
+ return meterData;
+ });
+ }
+ return entry;
+ });
+ } finally {
+ programmedMeterWriteLock.unlock();
+ }
});
-
- return true;
- }
-
- @Override
- public void clearMeters(DeviceId deviceId) {
- log.debug("Removing all meters for device {}", deviceId);
- clearDeviceState(deviceId);
- meterService.purgeMeters(deviceId);
- }
-
- @Override
- public void clearDeviceState(DeviceId deviceId) {
- log.info("Clearing local device state for {}", deviceId);
- pendingRemoveMeters.remove(deviceId);
- removeMetersFromBpMapping(deviceId);
- //Following call handles cornercase of OLT delete during meter provisioning
- pendingMeters.remove(deviceId);
}
private List<Band> createMeterBands(BandwidthProfileInformation bpInfo) {
@@ -408,119 +517,137 @@
.build();
}
- private void removeMeterFromBpMapping(MeterKey meterKey) {
- List<Map.Entry<String, MeterKey>> meters = bpInfoToMeter.stream()
- .filter(e -> e.getValue().equals(meterKey))
- .collect(Collectors.toList());
-
- meters.forEach(e -> bpInfoToMeter.remove(e.getKey(), e.getValue()));
- }
-
- private void removeMetersFromBpMapping(DeviceId deviceId) {
- List<Map.Entry<String, MeterKey>> meters = bpInfoToMeter.stream()
- .filter(e -> e.getValue().deviceId().equals(deviceId))
- .collect(Collectors.toList());
-
- meters.forEach(e -> bpInfoToMeter.remove(e.getKey(), e.getValue()));
- }
-
- /**
- * Checks for mastership or falls back to leadership on deviceId.
- * If the device is available use mastership,
- * otherwise fallback on leadership.
- * Leadership on the device topic is needed because the master can be NONE
- * in case the device went away, we still need to handle events
- * consistently
- */
- private boolean isLocalLeader(DeviceId deviceId) {
- if (deviceService.isAvailable(deviceId)) {
- return mastershipService.isLocalMaster(deviceId);
- } else {
- // Fallback with Leadership service - device id is used as topic
- NodeId leader = leadershipService.runForLeadership(
- deviceId.toString()).leaderNodeId();
- // Verify if this node is the leader
- return clusterService.getLocalNode().id().equals(leader);
+ private BandwidthProfileInformation getBandwidthProfileInformation(String bandwidthProfile) {
+ if (!checkSadisRunning()) {
+ return null;
}
+ if (bandwidthProfile == null) {
+ return null;
+ }
+ return bpService.get(bandwidthProfile);
+ }
+
+ private boolean checkSadisRunning() {
+ if (bpService == null) {
+ log.warn("Sadis is not running");
+ return false;
+ }
+ return true;
}
private class InternalMeterListener implements MeterListener {
-
@Override
public void event(MeterEvent meterEvent) {
- eventExecutor.execute(() -> {
+ pendingRemovalMetersExecutor.execute(() -> {
+
Meter meter = meterEvent.subject();
- if (meter == null) {
- log.error("Meter in event {} is null", meterEvent);
+ if (!appId.equals(meter.appId())) {
return;
}
- if (isLocalLeader(meter.deviceId())) {
- MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
- if (deleteMeters &&
- MeterEvent.Type.METER_REFERENCE_COUNT_ZERO.equals(meterEvent.type())) {
- log.info("Zero Count Meter Event is received. Meter is {} on {}",
- meter.id(), meter.deviceId());
- incrementMeterCount(meter.deviceId(), key);
- if (appId.equals(meter.appId()) && pendingRemoveMeters.get(meter.deviceId())
- .get(key).get() == zeroReferenceMeterCount) {
- log.info("Deleting unreferenced, no longer programmed Meter {} on {}",
- meter.id(), meter.deviceId());
+ if (log.isTraceEnabled()) {
+ log.trace("Received meter event {}", meterEvent);
+ }
+ MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
+ if (meterEvent.type().equals(MeterEvent.Type.METER_REFERENCE_COUNT_ZERO)) {
+ if (!oltDeviceService.isLocalLeader(meter.deviceId())) {
+ if (log.isTraceEnabled()) {
+ log.trace("ignoring meter event {} " +
+ "as not leader for {}", meterEvent, meter.deviceId());
+ }
+ return;
+ }
+ log.info("Zero Count Reference event is received for meter {} on {}, " +
+ "incrementing counter",
+ meter.id(), meter.deviceId());
+ incrementMeterCount(meter.deviceId(), key);
+ if (pendingRemoveMeters.get(meter.deviceId())
+ .get(key).get() == zeroReferenceMeterCount) {
+ // only delete the meters if the app is configured to do so
+ if (deleteMeters) {
+ log.info("Meter {} on device {} is unused, removing it", meter.id(), meter.deviceId());
deleteMeter(meter.deviceId(), meter.id());
}
}
- if (MeterEvent.Type.METER_REMOVED.equals(meterEvent.type())) {
- log.info("Meter Removed Event is received for {} on {}",
- meter.id(), meter.deviceId());
- pendingRemoveMeters.computeIfPresent(meter.deviceId(),
- (id, meters) -> {
- if (meters.get(key) == null) {
- log.info("Meters is not pending " +
- "{} on {}", key, id);
- return meters;
- }
- meters.remove(key);
- return meters;
- });
- removeMeterFromBpMapping(key);
- }
- } else {
- log.trace("Ignoring meter event, not leader of {}, {}", meter.deviceId(), meterEvent);
+ }
+
+ if (meterEvent.type().equals(MeterEvent.Type.METER_REMOVED)) {
+ removeMeterCount(meter, key);
}
});
}
+ private void removeMeterCount(Meter meter, MeterKey key) {
+ pendingRemoveMeters.computeIfPresent(meter.deviceId(),
+ (id, meters) -> {
+ if (meters.get(key) == null) {
+ log.info("Meters is not pending " +
+ "{} on {}", key, id);
+ return meters;
+ }
+ meters.remove(key);
+ return meters;
+ });
+ }
+
private void incrementMeterCount(DeviceId deviceId, MeterKey key) {
if (key == null) {
return;
}
pendingRemoveMeters.compute(deviceId,
- (id, meters) -> {
- if (meters == null) {
- meters = new HashMap<>();
+ (id, meters) -> {
+ if (meters == null) {
+ meters = new HashMap<>();
- }
- if (meters.get(key) == null) {
- meters.put(key, new AtomicInteger(1));
- }
- meters.get(key).addAndGet(1);
- return meters;
- });
+ }
+ if (meters.get(key) == null) {
+ meters.put(key, new AtomicInteger(1));
+ }
+ meters.get(key).addAndGet(1);
+ return meters;
+ });
+ }
+ }
+
+ private void deleteMeter(DeviceId deviceId, MeterId meterId) {
+ Meter meter = meterService.getMeter(deviceId, meterId);
+ if (meter != null) {
+ MeterRequest meterRequest = DefaultMeterRequest.builder()
+ .withBands(meter.bands())
+ .withUnit(meter.unit())
+ .forDevice(deviceId)
+ .fromApp(appId)
+ .burst()
+ .remove();
+
+ meterService.withdraw(meterRequest, meterId);
}
- private void deleteMeter(DeviceId deviceId, MeterId meterId) {
- Meter meter = meterService.getMeter(deviceId, meterId);
- if (meter != null) {
- MeterRequest meterRequest = DefaultMeterRequest.builder()
- .withBands(meter.bands())
- .withUnit(meter.unit())
- .forDevice(deviceId)
- .fromApp(appId)
- .burst()
- .remove();
-
- meterService.withdraw(meterRequest, meterId);
- }
+ // remove the meter from local caching
+ try {
+ programmedMeterWriteLock.lock();
+ programmedMeters.computeIfPresent(deviceId, (d, deviceMeters) -> {
+ Iterator<Map.Entry<String, MeterData>> iter = deviceMeters.entrySet().iterator();
+ while (iter.hasNext()) {
+ Map.Entry<String, MeterData> entry = iter.next();
+ if (entry.getValue().getMeterId().equals(meterId)) {
+ deviceMeters.remove(entry.getKey());
+ }
+ }
+ return deviceMeters;
+ });
+ } finally {
+ programmedMeterWriteLock.unlock();
}
}
+
+ protected void bindSadisService(SadisService service) {
+ this.bpService = service.getBandwidthProfileService();
+ log.info("Sadis service is loaded");
+ }
+
+ protected void unbindSadisService(SadisService service) {
+ this.bpService = null;
+ log.info("Sadis service is unloaded");
+ }
}
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltMeterServiceInterface.java b/impl/src/main/java/org/opencord/olt/impl/OltMeterServiceInterface.java
new file mode 100644
index 0000000..a10455b
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/impl/OltMeterServiceInterface.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.impl;
+
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.meter.MeterId;
+import org.opencord.sadis.SubscriberAndDeviceInformation;
+
+import java.util.Map;
+
+/**
+ * Interface for meter installation/removal methods
+ * for different types of bandwidth profiles.
+ */
+public interface OltMeterServiceInterface {
+ /**
+ * Checks for a meter, if not present it will create it and return false.
+ * @param deviceId DeviceId
+ * @param bandwidthProfile Bandwidth Profile Id
+ * @return boolean
+ */
+ boolean createMeter(DeviceId deviceId, String bandwidthProfile);
+
+ /**
+ * Checks for all the meters specified in the sadis uniTagList,
+ * if not present it will create them and return false.
+ * @param deviceId DeviceId
+ * @param si SubscriberAndDeviceInformation
+ * @return boolean
+ */
+ boolean createMeters(DeviceId deviceId, SubscriberAndDeviceInformation si);
+
+ /**
+ * Checks if a meter for the specified bandwidthProfile exists
+ * and is in ADDED state.
+ * @param deviceId DeviceId
+ * @param bandwidthProfileId bandwidth profile id
+ * @return true if present and in ADDED state
+ */
+ boolean hasMeterByBandwidthProfile(DeviceId deviceId, String bandwidthProfileId);
+
+ /**
+ * Checks if a meter for the specified bandwidthProfile exists
+ * and is in PENDING_ADD state.
+ * @param deviceId DeviceId
+ * @param bandwidthProfileId bandwidth profile id
+ * @return true if present and in PENDING_ADD state
+ */
+ boolean hasPendingMeterByBandwidthProfile(DeviceId deviceId, String bandwidthProfileId);
+
+ /**
+ * Creates a meter on a device for the given BandwidthProfile Id.
+ * @param deviceId the device id
+ * @param bandwidthProfileId the bandwidth profile Id
+ */
+ void createMeterForBp(DeviceId deviceId, String bandwidthProfileId);
+
+ /**
+ * Returns the meter Id for a given bandwidth profile Id.
+ * @param deviceId the device id
+ * @param bandwidthProfileId the bandwidth profile Id
+ * @return the meter Id
+ */
+ MeterId getMeterIdForBandwidthProfile(DeviceId deviceId, String bandwidthProfileId);
+
+ /**
+ * Purges all the meters on a device.
+ * @param deviceId the device
+ */
+ void purgeDeviceMeters(DeviceId deviceId);
+
+ /**
+ * Return all programmed meters for all OLTs controlled by this ONOS cluster.
+ * @return a map, with the device keys, and entry of map with bp Id and corresponding meter
+ */
+ Map<DeviceId, Map<String, MeterData>> getProgrammedMeters();
+
+}
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltPortStatus.java b/impl/src/main/java/org/opencord/olt/impl/OltPortStatus.java
new file mode 100644
index 0000000..655a0ac
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/impl/OltPortStatus.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.impl;
+
+import java.util.Objects;
+
+/**
+ * OltPortStatus is used to keep track of the flow status for a subscriber service.
+ */
+public class OltPortStatus {
+ // TODO consider adding a lastUpdated field, it may help with debugging
+ public OltFlowService.OltFlowsStatus defaultEapolStatus;
+ public OltFlowService.OltFlowsStatus subscriberFlowsStatus;
+ // NOTE we need to keep track of the DHCP status as that is installed before the other flows
+ // if macLearning is enabled (DHCP is needed to learn the MacAddress from the host)
+ public OltFlowService.OltFlowsStatus dhcpStatus;
+
+ public OltPortStatus(OltFlowService.OltFlowsStatus defaultEapolStatus,
+ OltFlowService.OltFlowsStatus subscriberFlowsStatus,
+ OltFlowService.OltFlowsStatus dhcpStatus) {
+ this.defaultEapolStatus = defaultEapolStatus;
+ this.subscriberFlowsStatus = subscriberFlowsStatus;
+ this.dhcpStatus = dhcpStatus;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ OltPortStatus that = (OltPortStatus) o;
+ return defaultEapolStatus == that.defaultEapolStatus
+ && subscriberFlowsStatus == that.subscriberFlowsStatus
+ && dhcpStatus == that.dhcpStatus;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(defaultEapolStatus, subscriberFlowsStatus, dhcpStatus);
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("OltPortStatus{");
+ sb.append("defaultEapolStatus=").append(defaultEapolStatus);
+ sb.append(", subscriberFlowsStatus=").append(subscriberFlowsStatus);
+ sb.append(", dhcpStatus=").append(dhcpStatus);
+ sb.append('}');
+ return sb.toString();
+ }
+}
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltUtils.java b/impl/src/main/java/org/opencord/olt/impl/OltUtils.java
new file mode 100644
index 0000000..6300ff9
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/impl/OltUtils.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.impl;
+
+import org.onosproject.net.AnnotationKeys;
+import org.onosproject.net.Port;
+
+import static org.opencord.olt.impl.OltFlowService.FlowOperation.ADD;
+
+/**
+ * Utility class for OLT app.
+ */
+final class OltUtils {
+
+ private OltUtils() {
+ }
+
+ /**
+ * Returns the port name if present in the annotations.
+ * @param port the port
+ * @return the annotated port name
+ */
+ static String getPortName(Port port) {
+ String name = port.annotations().value(AnnotationKeys.PORT_NAME);
+ return name == null ? "" : name;
+ }
+
+ /**
+ * Returns a port printed as a connect point and with the name appended.
+ * @param port the port
+ * @return the formatted string
+ */
+ static String portWithName(Port port) {
+ return port.element().id().toString() + '/' +
+ port.number() + '[' +
+ getPortName(port) + ']';
+ }
+
+ static String flowOpToString(OltFlowService.FlowOperation op) {
+ return op == ADD ? "Adding" : "Removing";
+ }
+
+ static String completeFlowOpToString(OltFlowService.FlowOperation op) {
+ return op == ADD ? "Added" : "Removed";
+ }
+}
diff --git a/impl/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java b/impl/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java
index 3657436..a73a42b 100644
--- a/impl/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java
+++ b/impl/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java
@@ -58,15 +58,21 @@
public static final String ENABLE_PPPOE = "enablePppoe";
public static final boolean ENABLE_PPPOE_DEFAULT = false;
- public static final String EAPOL_DELETE_RETRY_MAX_ATTEMPS = "eapolDeleteRetryMaxAttempts";
- public static final int EAPOL_DELETE_RETRY_MAX_ATTEMPS_DEFAULT = 3;
-
- public static final String PROVISION_DELAY = "provisionDelay";
- public static final int PROVISION_DELAY_DEFAULT = 100;
+ public static final String WAIT_FOR_REMOVAL = "waitForRemoval";
+ public static final boolean WAIT_FOR_REMOVAL_DEFAULT = true;
public static final String REQUIRED_DRIVERS_PROPERTY_DELAY = "requiredDriversPropertyDelay";
public static final int REQUIRED_DRIVERS_PROPERTY_DELAY_DEFAULT = 5;
+ public static final String FLOW_PROCESSING_THREADS = "flowProcessingThreads";
+ public static final int FLOW_PROCESSING_THREADS_DEFAULT = 8;
+
+ public static final String SUBSCRIBER_PROCESSING_THREADS = "subscriberProcessingThreads";
+ public static final int SUBSCRIBER_PROCESSING_THREADS_DEFAULT = 8;
+
+ public static final String REQUEUE_DELAY = "requeueDelay";
+ public static final int REQUEUE_DELAY_DEFAULT = 500;
+
public static final String UPSTREAM_ONU = "upstreamOnu";
public static final String UPSTREAM_OLT = "upstreamOlt";
public static final String DOWNSTREAM_ONU = "downstreamOnu";
diff --git a/impl/src/main/java/org/opencord/olt/impl/ServiceKey.java b/impl/src/main/java/org/opencord/olt/impl/ServiceKey.java
new file mode 100644
index 0000000..c878062
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/impl/ServiceKey.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.impl;
+
+import org.opencord.sadis.UniTagInformation;
+
+import java.util.Objects;
+
+/**
+ * SubscriberKey is used to identify the combination of a subscriber and a service.
+ */
+public class ServiceKey {
+ private AccessDevicePort port;
+ private UniTagInformation service;
+
+ public ServiceKey(AccessDevicePort port, UniTagInformation service) {
+ this.port = port;
+ this.service = service;
+ }
+
+ public AccessDevicePort getPort() {
+ return port;
+ }
+
+ public void setPort(AccessDevicePort port) {
+ this.port = port;
+ }
+
+ public UniTagInformation getService() {
+ return service;
+ }
+
+ public void setService(UniTagInformation service) {
+ this.service = service;
+ }
+
+ @Override
+ public String toString() {
+ return this.port.toString() + " - " + this.service.getServiceName();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ServiceKey that = (ServiceKey) o;
+ boolean isPortEqual = Objects.equals(port, that.port);
+ boolean isServiceEqual = Objects.equals(service, that.service);
+
+ return isPortEqual && isServiceEqual;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(port, service);
+ }
+}
diff --git a/impl/src/main/java/org/opencord/olt/impl/ServiceKeySerializer.java b/impl/src/main/java/org/opencord/olt/impl/ServiceKeySerializer.java
new file mode 100644
index 0000000..4035bdd
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/impl/ServiceKeySerializer.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.impl;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.Serializer;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+import org.opencord.sadis.UniTagInformation;
+
+/**
+ * ServiceKeySerializer is a custom serializer to store a ServiceKey in an Atomix ditributed map.
+ */
+class ServiceKeySerializer extends Serializer<ServiceKey> {
+
+ ServiceKeySerializer() {
+ // non-null, immutable
+ super(false, true);
+ }
+
+ @Override
+ public void write(Kryo kryo, Output output, ServiceKey object) {
+ kryo.writeClassAndObject(output, object.getPort().connectPoint().port());
+ output.writeString(object.getPort().name());
+ output.writeString(object.getPort().connectPoint().deviceId().toString());
+ kryo.writeClassAndObject(output, object.getService());
+ }
+
+ @Override
+ public ServiceKey read(Kryo kryo, Input input, Class<ServiceKey> type) {
+ PortNumber port = (PortNumber) kryo.readClassAndObject(input);
+ final String portName = input.readString();
+ final String devId = input.readString();
+
+ UniTagInformation uti = (UniTagInformation) kryo.readClassAndObject(input);
+ ConnectPoint cp = new ConnectPoint(DeviceId.deviceId(devId), port);
+ AccessDevicePort adp = new AccessDevicePort(cp, portName);
+
+ return new ServiceKey(adp, uti);
+ }
+}
diff --git a/impl/src/main/java/org/opencord/olt/impl/SubscriberFlowInfo.java b/impl/src/main/java/org/opencord/olt/impl/SubscriberFlowInfo.java
deleted file mode 100644
index 225e48f..0000000
--- a/impl/src/main/java/org/opencord/olt/impl/SubscriberFlowInfo.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Copyright 2020-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.opencord.olt.impl;
-
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.meter.MeterId;
-import org.opencord.olt.AccessDevicePort;
-import org.opencord.sadis.UniTagInformation;
-
-import java.util.Objects;
-
-/**
- * Contains the mapping of a given port to flow information, including bandwidth profile.
- */
-class SubscriberFlowInfo {
- private final DeviceId devId;
- private final AccessDevicePort nniPort;
- private final AccessDevicePort uniPort;
- private final UniTagInformation tagInfo;
- private MeterId downId;
- private MeterId upId;
- private MeterId downOltId;
- private MeterId upOltId;
- private final String downBpInfo;
- private final String upBpInfo;
- private final String downOltBpInfo;
- private final String upOltBpInfo;
-
- /**
- * Builds the mapper of information.
- * @param nniPort the nni port
- * @param uniPort the uni port
- * @param tagInfo the tag info
- * @param downId the downstream meter id
- * @param upId the upstream meter id
- * @param downOltId the downstream meter id of OLT device
- * @param upOltId the upstream meter id of OLT device
- * @param downBpInfo the downstream bandwidth profile
- * @param upBpInfo the upstream bandwidth profile
- * @param downOltBpInfo the downstream bandwidth profile of OLT device
- * @param upOltBpInfo the upstream bandwidth profile of OLT device
- */
- SubscriberFlowInfo(AccessDevicePort nniPort, AccessDevicePort uniPort,
- UniTagInformation tagInfo, MeterId downId, MeterId upId,
- MeterId downOltId, MeterId upOltId,
- String downBpInfo, String upBpInfo,
- String downOltBpInfo, String upOltBpInfo) {
- this.devId = uniPort.deviceId();
- this.nniPort = nniPort;
- this.uniPort = uniPort;
- this.tagInfo = tagInfo;
- this.downId = downId;
- this.upId = upId;
- this.downOltId = downOltId;
- this.upOltId = upOltId;
- this.downBpInfo = downBpInfo;
- this.upBpInfo = upBpInfo;
- this.downOltBpInfo = downOltBpInfo;
- this.upOltBpInfo = upOltBpInfo;
- }
-
- /**
- * Gets the device id of this subscriber and flow information.
- *
- * @return device id
- */
- DeviceId getDevId() {
- return devId;
- }
-
- /**
- * Gets the nni of this subscriber and flow information.
- *
- * @return nni port
- */
- AccessDevicePort getNniPort() {
- return nniPort;
- }
-
- /**
- * Gets the uni port of this subscriber and flow information.
- *
- * @return uni port
- */
- AccessDevicePort getUniPort() {
- return uniPort;
- }
-
- /**
- * Gets the tag of this subscriber and flow information.
- *
- * @return tag of the subscriber
- */
- UniTagInformation getTagInfo() {
- return tagInfo;
- }
-
- /**
- * Gets the downstream meter id of this subscriber and flow information.
- *
- * @return downstream meter id
- */
- MeterId getDownId() {
- return downId;
- }
-
- /**
- * Gets the upstream meter id of this subscriber and flow information.
- *
- * @return upstream meter id
- */
- MeterId getUpId() {
- return upId;
- }
-
- /**
- * Gets the downstream meter id of this subscriber and flow information of OLT device.
- *
- * @return downstream olt meter id
- */
- MeterId getDownOltId() {
- return downOltId;
- }
-
- /**
- * Gets the upstream meter id of this subscriber and flow information of OLT device.
- *
- * @return upstream olt meter id
- */
- MeterId getUpOltId() {
- return upOltId;
- }
-
- /**
- * Gets the downstream bandwidth profile of this subscriber and flow information.
- *
- * @return downstream bandwidth profile
- */
- String getDownBpInfo() {
- return downBpInfo;
- }
-
- /**
- * Gets the upstream bandwidth profile of this subscriber and flow information.
- *
- * @return upstream bandwidth profile.
- */
- String getUpBpInfo() {
- return upBpInfo;
- }
-
- /**
- * Gets the downstream bandwidth profile of this subscriber and flow information of OLT device.
- *
- * @return downstream OLT bandwidth profile
- */
- String getDownOltBpInfo() {
- return downOltBpInfo;
- }
-
- /**
- * Gets the upstream bandwidth profile of this subscriber and flow information of OLT device.
- *
- * @return upstream OLT bandwidth profile.
- */
- String getUpOltBpInfo() {
- return upOltBpInfo;
- }
-
- /**
- * Sets the upstream meter id.
- *
- * @param upMeterId the upstream meter id
- */
- void setUpMeterId(MeterId upMeterId) {
- this.upId = upMeterId;
- }
-
- /**
- * Sets the downstream meter id.
- *
- * @param downMeterId the downstream meter id
- */
- void setDownMeterId(MeterId downMeterId) {
- this.downId = downMeterId;
- }
-
- /**
- * Sets the upstream meter id of OLT.
- *
- * @param upOltMeterId the upstream meter id of OLT
- */
- void setUpOltMeterId(MeterId upOltMeterId) {
- this.upOltId = upOltMeterId;
- }
-
- /**
- * Sets the downstream meter id of OLT.
- *
- * @param downOltMeterId the downstream meter id of OLT
- */
- void setDownOltMeterId(MeterId downOltMeterId) {
- this.downOltId = downOltMeterId;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- SubscriberFlowInfo flowInfo = (SubscriberFlowInfo) o;
- return devId.equals(flowInfo.devId) &&
- nniPort.equals(flowInfo.nniPort) &&
- uniPort.equals(flowInfo.uniPort) &&
- tagInfo.equals(flowInfo.tagInfo) &&
- downBpInfo.equals(flowInfo.downBpInfo) &&
- upBpInfo.equals(flowInfo.upBpInfo) &&
- Objects.equals(downOltBpInfo, flowInfo.downOltBpInfo) &&
- Objects.equals(upOltBpInfo, flowInfo.upOltBpInfo);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(devId, nniPort, uniPort, tagInfo, downBpInfo, upBpInfo, downOltBpInfo, upOltBpInfo);
- }
-
- @Override
- public String toString() {
- return com.google.common.base.MoreObjects.toStringHelper(this)
- .add("devId", devId)
- .add("nniPort", nniPort)
- .add("uniPort", uniPort)
- .add("tagInfo", tagInfo)
- .add("downId", downId)
- .add("upId", upId)
- .add("downOltId", downOltId)
- .add("upOltId", upOltId)
- .add("downBpInfo", downBpInfo)
- .add("upBpInfo", upBpInfo)
- .add("downOltBpInfo", downOltBpInfo)
- .add("upOltBpInfo", upOltBpInfo)
- .toString();
- }
-}
diff --git a/impl/src/main/java/org/opencord/olt/impl/package-info.java b/impl/src/main/java/org/opencord/olt/impl/package-info.java
index b5ba320..ccdfda7 100644
--- a/impl/src/main/java/org/opencord/olt/impl/package-info.java
+++ b/impl/src/main/java/org/opencord/olt/impl/package-info.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-present Open Networking Foundation
+ * Copyright 2021-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -15,6 +15,6 @@
*/
/**
- * OLT application handling PMC OLT hardware.
+ * ONOS application archetype.
*/
-package org.opencord.olt.impl;
+package org.opencord.olt.impl;
\ No newline at end of file