OLT ability to remove a subscriber
Change-Id: I5fee9dd8189ae374bf39b0a74da5bd33304a3346
diff --git a/src/main/java/org/onosproject/olt/Olt.java b/src/main/java/org/onosproject/olt/Olt.java
index fdf82a0..d3120dd 100644
--- a/src/main/java/org/onosproject/olt/Olt.java
+++ b/src/main/java/org/onosproject/olt/Olt.java
@@ -15,6 +15,8 @@
*/
package org.onosproject.olt;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -52,6 +54,7 @@
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -89,11 +92,16 @@
private static final VlanId DEFAULT_VLAN = VlanId.vlanId((short) 0);
private ExecutorService oltInstallers = Executors.newFixedThreadPool(4,
- groupedThreads("onos/olt-service",
- "olt-installer-%d"));
+ groupedThreads("onos/olt-service",
+ "olt-installer-%d"));
private Map<DeviceId, AccessDeviceData> oltData = new ConcurrentHashMap<>();
+ private Map<ConnectPoint, Set<ForwardingObjective.Builder>> objectives =
+ Maps.newConcurrentMap();
+
+ private Map<ConnectPoint, VlanId> subscribers = Maps.newConcurrentMap();
+
private InternalNetworkConfigListener configListener =
new InternalNetworkConfigListener();
private static final Class<AccessDeviceConfig> CONFIG_CLASS =
@@ -102,11 +110,12 @@
private ConfigFactory<DeviceId, AccessDeviceConfig> configFactory =
new ConfigFactory<DeviceId, AccessDeviceConfig>(
SubjectFactories.DEVICE_SUBJECT_FACTORY, CONFIG_CLASS, "accessDevice") {
- @Override
- public AccessDeviceConfig createConfig() {
- return new AccessDeviceConfig();
- }
- };
+ @Override
+ public AccessDeviceConfig createConfig() {
+ return new AccessDeviceConfig();
+ }
+ };
+
@Activate
public void activate() {
@@ -152,7 +161,68 @@
@Override
public void removeSubscriber(ConnectPoint port) {
- throw new UnsupportedOperationException();
+ AccessDeviceData olt = oltData.get(port.deviceId());
+
+ if (olt == null) {
+ log.warn("No data found for OLT device {}", port.deviceId());
+ return;
+ }
+
+ unprovisionSubscriber(olt.deviceId(), olt.uplink(), port.port(), olt.vlan());
+
+ }
+
+ private void unprovisionSubscriber(DeviceId deviceId, PortNumber uplink,
+ PortNumber subscriberPort, VlanId deviceVlan) {
+
+ //FIXME: This method is slightly ugly but it'll do until we have a better
+ // way to remove flows from the flow store.
+
+ CompletableFuture<ObjectiveError> downFuture = new CompletableFuture();
+ CompletableFuture<ObjectiveError> upFuture = new CompletableFuture();
+
+ ConnectPoint cp = new ConnectPoint(deviceId, subscriberPort);
+
+ VlanId subscriberVlan = subscribers.remove(cp);
+
+ Set<ForwardingObjective.Builder> fwds = objectives.remove(cp);
+
+ if (fwds == null || fwds.size() != 2) {
+ log.warn("Unknown or incomplete subscriber at {}", cp);
+ return;
+ }
+
+
+ fwds.stream().forEach(
+ fwd -> flowObjectiveService.forward(deviceId,
+ fwd.remove(new ObjectiveContext() {
+ @Override
+ public void onSuccess(Objective objective) {
+ upFuture.complete(null);
+ }
+
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ upFuture.complete(error);
+ }
+ })));
+
+ upFuture.thenAcceptBothAsync(downFuture, (upStatus, downStatus) -> {
+ if (upStatus == null && downStatus == null) {
+ post(new AccessDeviceEvent(AccessDeviceEvent.Type.SUBSCRIBER_UNREGISTERED,
+ deviceId,
+ deviceVlan,
+ subscriberVlan));
+ } else if (downStatus != null) {
+ log.error("Subscriber with vlan {} on device {} " +
+ "on port {} failed downstream uninstallation: {}",
+ subscriberVlan, deviceId, subscriberPort, downStatus);
+ } else if (upStatus != null) {
+ log.error("Subscriber with vlan {} on device {} " +
+ "on port {} failed upstream uninstallation: {}",
+ subscriberVlan, deviceId, subscriberPort, upStatus);
+ }
+ }, oltInstallers);
}
@@ -190,46 +260,53 @@
.build();
- ForwardingObjective upFwd = DefaultForwardingObjective.builder()
+ ForwardingObjective.Builder upFwd = DefaultForwardingObjective.builder()
.withFlag(ForwardingObjective.Flag.VERSATILE)
.withPriority(1000)
.makePermanent()
.withSelector(upstream)
.fromApp(appId)
- .withTreatment(upstreamTreatment)
- .add(new ObjectiveContext() {
- @Override
- public void onSuccess(Objective objective) {
- upFuture.complete(null);
- }
+ .withTreatment(upstreamTreatment);
- @Override
- public void onError(Objective objective, ObjectiveError error) {
- upFuture.complete(error);
- }
- });
- ForwardingObjective downFwd = DefaultForwardingObjective.builder()
+ ForwardingObjective.Builder downFwd = DefaultForwardingObjective.builder()
.withFlag(ForwardingObjective.Flag.VERSATILE)
.withPriority(1000)
.makePermanent()
.withSelector(downstream)
.fromApp(appId)
- .withTreatment(downstreamTreatment)
- .add(new ObjectiveContext() {
- @Override
- public void onSuccess(Objective objective) {
- downFuture.complete(null);
- }
+ .withTreatment(downstreamTreatment);
- @Override
- public void onError(Objective objective, ObjectiveError error) {
- downFuture.complete(error);
- }
- });
+ ConnectPoint cp = new ConnectPoint(deviceId, subscriberPort);
- flowObjectiveService.forward(deviceId, upFwd);
- flowObjectiveService.forward(deviceId, downFwd);
+ subscribers.put(cp, subscriberVlan);
+ objectives.put(cp, Sets.newHashSet(upFwd, downFwd));
+
+
+ flowObjectiveService.forward(deviceId, upFwd.add(new ObjectiveContext() {
+ @Override
+ public void onSuccess(Objective objective) {
+ upFuture.complete(null);
+ }
+
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ upFuture.complete(error);
+ }
+ }));
+
+
+ flowObjectiveService.forward(deviceId, downFwd.add(new ObjectiveContext() {
+ @Override
+ public void onSuccess(Objective objective) {
+ downFuture.complete(null);
+ }
+
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ downFuture.complete(error);
+ }
+ }));
upFuture.thenAcceptBothAsync(downFuture, (upStatus, downStatus) -> {
if (upStatus == null && downStatus == null) {
@@ -288,20 +365,20 @@
public void event(NetworkConfigEvent event) {
switch (event.type()) {
- case CONFIG_ADDED:
- case CONFIG_UPDATED:
- if (event.configClass().equals(CONFIG_CLASS)) {
- AccessDeviceConfig config =
- networkConfig.getConfig((DeviceId) event.subject(), CONFIG_CLASS);
- if (config != null) {
- oltData.put(config.getOlt().deviceId(), config.getOlt());
+ case CONFIG_ADDED:
+ case CONFIG_UPDATED:
+ if (event.configClass().equals(CONFIG_CLASS)) {
+ AccessDeviceConfig config =
+ networkConfig.getConfig((DeviceId) event.subject(), CONFIG_CLASS);
+ if (config != null) {
+ oltData.put(config.getOlt().deviceId(), config.getOlt());
+ }
}
- }
- break;
- case CONFIG_UNREGISTERED:
- case CONFIG_REMOVED:
- default:
- break;
+ break;
+ case CONFIG_UNREGISTERED:
+ case CONFIG_REMOVED:
+ default:
+ break;
}
}
}