Bug-fix: Remove flows from VOLTHA for a deleted subscriber, even if SADIS cannot
fetch information about that subscriber.
Other changes:
- Added a CLI command that details subscribers that have actually been programmed in the data plane
- Changed all CLI commands to start with volt-*, as all app CLI commands should
- Removed DeviceIdCompleter from the 'olts' CLI command, as it is not needed
- Removed an unused method, renamed another one, and fixed some logs
- S-tags are no longer 'DeviceVlans', so changed the variable name accordingly
Change-Id: I7cfde850669ed5b581fbac3195f68da28c4514c3
diff --git a/app/src/main/java/org/opencord/olt/impl/Olt.java b/app/src/main/java/org/opencord/olt/impl/Olt.java
index 3c5108e..e1d1dac 100644
--- a/app/src/main/java/org/opencord/olt/impl/Olt.java
+++ b/app/src/main/java/org/opencord/olt/impl/Olt.java
@@ -15,6 +15,25 @@
*/
package org.opencord.olt.impl;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.get;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Dictionary;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -64,24 +83,8 @@
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
-import java.util.AbstractMap;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Dictionary;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Strings.isNullOrEmpty;
-import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
-import static org.onlab.util.Tools.get;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.slf4j.LoggerFactory.getLogger;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
/**
* Provisions rules on access devices.
@@ -131,18 +134,21 @@
private ApplicationId appId;
- private ExecutorService oltInstallers = Executors.newFixedThreadPool(4,
- groupedThreads("onos/olt-service",
- "olt-installer-%d"));
+ private ExecutorService oltInstallers = Executors
+ .newFixedThreadPool(4, groupedThreads("onos/olt-service",
+ "olt-installer-%d"));
protected ExecutorService eventExecutor;
+ private Map<ConnectPoint, SubscriberAndDeviceInformation> programmedSubs;
+
@Activate
public void activate(ComponentContext context) {
eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("onos/olt", "events-%d", log));
modified(context);
appId = coreService.registerApplication(APP_NAME);
componentConfigService.registerProperties(getClass());
+ programmedSubs = Maps.newConcurrentMap();
eventDispatcher.addSink(AccessDeviceEvent.class, listenerRegistry);
@@ -210,14 +216,16 @@
if (enableDhcpOnProvisioning) {
processDhcpFilteringObjectives(port.deviceId(), port.port(), true);
}
-
+ log.info("Programming vlans for subscriber: {}", sub);
Optional<VlanId> defaultVlan = Optional.empty();
- provisionVlans(port.deviceId(), uplinkPort.number(), port.port(), sub.cTag(), sub.sTag(),
- defaultVlan);
+ provisionVlans(port.deviceId(), uplinkPort.number(), port.port(),
+ sub.cTag(), sub.sTag(), defaultVlan);
if (enableIgmpOnProvisioning) {
processIgmpFilteringObjectives(port.deviceId(), port.port(), true);
}
+ // cache subscriber info
+ programmedSubs.put(port, sub);
return true;
}
@@ -226,8 +234,13 @@
// Get the subscriber connected to this port from Sadis
SubscriberAndDeviceInformation subscriber = getSubscriber(port);
if (subscriber == null) {
- log.warn("Subscriber on port {} not found", port);
- return false;
+ log.warn("Subscriber on port {} not found in sadis .. checking "
+ + "local cache", port);
+ subscriber = programmedSubs.get(port);
+ if (subscriber == null) {
+ log.warn("Subscriber on port {} was not previously programmed", port);
+ return false;
+ }
}
// Get the uplink port
@@ -241,13 +254,15 @@
processDhcpFilteringObjectives(port.deviceId(), port.port(), false);
}
+ log.info("Removing programmed vlans for subscriber: {}", subscriber);
Optional<VlanId> defaultVlan = Optional.empty();
- unprovisionSubscriber(port.deviceId(), uplinkPort.number(), port.port(), subscriber.cTag(),
- subscriber.sTag(), defaultVlan);
+ unprovisionSubscriber(port.deviceId(), uplinkPort.number(), port.port(),
+ subscriber.cTag(), subscriber.sTag(), defaultVlan);
if (enableIgmpOnProvisioning) {
processIgmpFilteringObjectives(port.deviceId(), port.port(), false);
}
+ programmedSubs.remove(port);
return true;
}
@@ -279,9 +294,12 @@
public Collection<Map.Entry<ConnectPoint, Map.Entry<VlanId, VlanId>>> getSubscribers() {
ArrayList<Map.Entry<ConnectPoint, Map.Entry<VlanId, VlanId>>> subs = new ArrayList<>();
- // Get the subscribers for all the devices
+ // Get the subscribers for all the devices configured in sadis
// If the port is UNI, is enabled and exists in Sadis then copy it
for (Device d : deviceService.getDevices()) {
+ if (getOltInfo(d) == null) {
+ continue; // not an olt, or not configured in sadis
+ }
for (Port p: deviceService.getPorts(d.id())) {
if (isUniPort(d, p) && p.isEnabled()) {
ConnectPoint cp = new ConnectPoint(d.id(), p.number());
@@ -299,16 +317,18 @@
}
@Override
+ public ImmutableMap<ConnectPoint, SubscriberAndDeviceInformation> getProgSubs() {
+ return ImmutableMap.copyOf(programmedSubs);
+ }
+
+ @Override
public List<DeviceId> fetchOlts() {
// look through all the devices and find the ones that are OLTs as per Sadis
List<DeviceId> olts = new ArrayList<>();
Iterable<Device> devices = deviceService.getDevices();
for (Device d : devices) {
- String devSerialNo = d.serialNumber();
- SubscriberAndDeviceInformation deviceInfo = subsService.get(devSerialNo);
-
- if (deviceInfo != null) {
- // So this is indeed a OLT device
+ if (getOltInfo(d) != null) {
+ // So this is indeed an OLT device
olts.add(d.id());
}
}
@@ -336,31 +356,20 @@
return null;
}
- private void initializeUni(Port port) {
- DeviceId deviceId = (DeviceId) port.element().id();
-
- post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_ADDED, deviceId, port));
-
- if (port.isEnabled()) {
- processFilteringObjectives(deviceId, port.number(), true);
- }
- }
-
private void unprovisionSubscriber(DeviceId deviceId, PortNumber uplink,
- PortNumber subscriberPort, VlanId subscriberVlan,
- VlanId deviceVlan, Optional<VlanId> defaultVlan) {
+ PortNumber subscriberPort, VlanId cVlan,
+ VlanId sVlan, Optional<VlanId> defaultVlan) {
CompletableFuture<ObjectiveError> downFuture = new CompletableFuture();
CompletableFuture<ObjectiveError> upFuture = new CompletableFuture();
ForwardingObjective.Builder upFwd = upBuilder(uplink, subscriberPort,
- subscriberVlan, deviceVlan,
+ cVlan, sVlan,
defaultVlan);
ForwardingObjective.Builder downFwd = downBuilder(uplink, subscriberPort,
- subscriberVlan, deviceVlan,
+ cVlan, sVlan,
defaultVlan);
-
flowObjectiveService.forward(deviceId, upFwd.remove(new ObjectiveContext() {
@Override
public void onSuccess(Objective objective) {
@@ -389,16 +398,16 @@
if (upStatus == null && downStatus == null) {
post(new AccessDeviceEvent(AccessDeviceEvent.Type.SUBSCRIBER_UNREGISTERED,
deviceId,
- deviceVlan,
- subscriberVlan));
+ sVlan,
+ cVlan));
} else if (downStatus != null) {
log.error("Subscriber with vlan {} on device {} " +
"on port {} failed downstream uninstallation: {}",
- subscriberVlan, deviceId, subscriberPort, downStatus);
+ cVlan, deviceId, subscriberPort, downStatus);
} else if (upStatus != null) {
log.error("Subscriber with vlan {} on device {} " +
"on port {} failed upstream uninstallation: {}",
- subscriberVlan, deviceId, subscriberPort, upStatus);
+ cVlan, deviceId, subscriberPort, upStatus);
}
}, oltInstallers);
@@ -519,7 +528,7 @@
.withTreatment(upstreamTreatment);
}
- private void processFilteringObjectives(DeviceId devId, PortNumber port, boolean install) {
+ private void processEapolFilteringObjectives(DeviceId devId, PortNumber port, boolean install) {
if (!mastershipService.isLocalMaster(devId)) {
return;
}
@@ -535,14 +544,15 @@
.add(new ObjectiveContext() {
@Override
public void onSuccess(Objective objective) {
- log.info("Eapol filter for {} on {} installed.",
- devId, port);
+ log.info("Eapol filter for {} on {} {}.",
+ devId, port, (install) ? "installed" : "removed");
}
@Override
public void onError(Objective objective, ObjectiveError error) {
- log.info("Eapol filter for {} on {} failed because {}",
- devId, port, error);
+ log.info("Eapol filter for {} on {} failed {} because {}",
+ devId, port, (install) ? "installation" : "removal",
+ error);
}
});
@@ -579,14 +589,15 @@
.add(new ObjectiveContext() {
@Override
public void onSuccess(Objective objective) {
- log.info("LLDP filter for {} on {} installed.",
- devId, port);
+ log.info("LLDP filter for {} on {} {}.",
+ devId, port, (install) ? "installed" : "removed");
}
@Override
public void onError(Objective objective, ObjectiveError error) {
- log.info("LLDP filter for {} on {} failed because {}",
- devId, port, error);
+ log.info("LLDP filter for {} on {} failed {} because {}",
+ devId, port, (install) ? "installation" : "removal",
+ error);
}
});
@@ -613,14 +624,15 @@
.add(new ObjectiveContext() {
@Override
public void onSuccess(Objective objective) {
- log.info("DHCP filter for {} on {} installed.",
- devId, port);
+ log.info("DHCP filter for {} on {} {}.",
+ devId, port, (install) ? "installed" : "removed");
}
@Override
public void onError(Objective objective, ObjectiveError error) {
- log.info("DHCP filter for {} on {} failed because {}",
- devId, port, error);
+ log.info("DHCP filter for {} on {} failed {} because {}",
+ devId, port, (install) ? "installation" : "removal",
+ error);
}
});
@@ -647,14 +659,15 @@
.add(new ObjectiveContext() {
@Override
public void onSuccess(Objective objective) {
- log.info("Igmp filter for {} on {} installed.",
- devId, port);
+ log.info("Igmp filter for {} on {} {}.",
+ devId, port, (install) ? "installed" : "removed");
}
@Override
public void onError(Objective objective, ObjectiveError error) {
- log.info("Igmp filter for {} on {} failed because {}.",
- devId, port, error);
+ log.info("Igmp filter for {} on {} failed {} because {}.",
+ devId, port, (install) ? "installation" : "removal",
+ error);
}
});
@@ -681,7 +694,7 @@
// This is an OLT device as per Sadis, we create flows for UNI and NNI ports
for (Port p : deviceService.getPorts(dev.id())) {
if (isUniPort(dev, p)) {
- processFilteringObjectives(dev.id(), p.number(), true);
+ processEapolFilteringObjectives(dev.id(), p.number(), true);
} else {
processNniFilteringObjectives(dev.id(), p.number(), true);
}
@@ -704,18 +717,20 @@
String devSerialNo = dev.serialNumber();
SubscriberAndDeviceInformation deviceInfo = subsService.get(devSerialNo);
log.debug("getUplinkPort: deviceInfo {}", deviceInfo);
-
- if (deviceInfo != null) {
- // Return the port that has been configured as the uplink port of this OLT in Sadis
- for (Port p: deviceService.getPorts(dev.id())) {
- if (p.number().toLong() == deviceInfo.uplinkPort()) {
- log.debug("getUplinkPort: Found port {}", p);
- return p;
- }
+ if (deviceInfo == null) {
+ log.warn("Device {} is not configured in SADIS .. cannot fetch device"
+ + " info", dev.id());
+ return null;
+ }
+ // Return the port that has been configured as the uplink port of this OLT in Sadis
+ for (Port p: deviceService.getPorts(dev.id())) {
+ if (p.number().toLong() == deviceInfo.uplinkPort()) {
+ log.debug("getUplinkPort: Found port {}", p);
+ return p;
}
}
- log.debug("getUplinkPort: No uplink port found for OLT {}", dev);
+ log.debug("getUplinkPort: No uplink port found for OLT {}", dev.id());
return null;
}
@@ -772,7 +787,7 @@
post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_ADDED, devId, event.port()));
if (event.port().isEnabled()) {
- processFilteringObjectives(devId, event.port().number(), true);
+ processEapolFilteringObjectives(devId, event.port().number(), true);
}
} else {
checkAndCreateDeviceFlows(dev);
@@ -781,7 +796,7 @@
case PORT_REMOVED:
if (isUniPort(dev, event.port())) {
if (event.port().isEnabled()) {
- processFilteringObjectives(devId, event.port().number(), false);
+ processEapolFilteringObjectives(devId, event.port().number(), false);
removeSubscriber(new ConnectPoint(devId, event.port().number()));
}
@@ -795,10 +810,10 @@
}
if (event.port().isEnabled()) {
- processFilteringObjectives(devId, event.port().number(), true);
+ processEapolFilteringObjectives(devId, event.port().number(), true);
post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_ADDED, devId, event.port()));
} else {
- processFilteringObjectives(devId, event.port().number(), false);
+ processEapolFilteringObjectives(devId, event.port().number(), false);
post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_REMOVED, devId, event.port()));
}
break;