VOL-1256: Support for transparent flows (with different Vlan IDs) on a ONU for DT FTTB Use case, after merge
Change-Id: I1fa25d826d87658e6951a2aa90f4577be81f301d
diff --git a/api/src/main/java/org/opencord/olt/AccessDeviceService.java b/api/src/main/java/org/opencord/olt/AccessDeviceService.java
index 6387a03..2652a31 100644
--- a/api/src/main/java/org/opencord/olt/AccessDeviceService.java
+++ b/api/src/main/java/org/opencord/olt/AccessDeviceService.java
@@ -19,6 +19,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import org.onlab.packet.VlanId;
import org.onosproject.event.ListenerService;
@@ -54,17 +55,21 @@
* Provisions flows for the specific subscriber.
*
* @param subscriberId Identification of the subscriber
+ * @param sTag additional outer tag on this port
+ * @param cTag additional inner tag on this port
* @return true if successful false otherwise
*/
- boolean provisionSubscriber(AccessSubscriberId subscriberId);
+ boolean provisionSubscriber(AccessSubscriberId subscriberId, Optional<VlanId> sTag, Optional<VlanId> cTag);
/**
* Removes flows for the specific subscriber.
*
* @param subscriberId Identification of the subscriber
+ * @param sTag additional outer tag on this port
+ * @param cTag additional inner tag on this port
* @return true if successful false otherwise
*/
- boolean removeSubscriber(AccessSubscriberId subscriberId);
+ boolean removeSubscriber(AccessSubscriberId subscriberId, Optional<VlanId> sTag, Optional<VlanId> cTag);
/**
* Returns information about the provisioned subscribers.
diff --git a/app/src/main/java/org/opencord/olt/impl/Olt.java b/app/src/main/java/org/opencord/olt/impl/Olt.java
index e1d1dac..2116c70 100644
--- a/app/src/main/java/org/opencord/olt/impl/Olt.java
+++ b/app/src/main/java/org/opencord/olt/impl/Olt.java
@@ -23,6 +23,7 @@
import static org.slf4j.LoggerFactory.getLogger;
import java.util.AbstractMap;
+import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Dictionary;
@@ -74,6 +75,10 @@
import org.onosproject.net.flowobjective.Objective;
import org.onosproject.net.flowobjective.ObjectiveContext;
import org.onosproject.net.flowobjective.ObjectiveError;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMultimap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
import org.opencord.olt.AccessDeviceEvent;
import org.opencord.olt.AccessDeviceListener;
import org.opencord.olt.AccessDeviceService;
@@ -83,6 +88,7 @@
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
+
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
@@ -97,6 +103,7 @@
private static final String APP_NAME = "org.opencord.olt";
private static final short DEFAULT_VLAN = 0;
+ private static final String ADDITIONAL_VLANS = "additional-vlans";
private final Logger log = getLogger(getClass());
@@ -118,6 +125,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected SubscriberAndDeviceInformationService subsService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
+
@Property(name = "defaultVlan", intValue = DEFAULT_VLAN,
label = "Default VLAN RG<->ONU traffic")
private int defaultVlan = DEFAULT_VLAN;
@@ -138,6 +148,8 @@
.newFixedThreadPool(4, groupedThreads("onos/olt-service",
"olt-installer-%d"));
+ private ConsistentMultimap<ConnectPoint, Map.Entry<VlanId, VlanId>> additionalVlans;
+
protected ExecutorService eventExecutor;
private Map<ConnectPoint, SubscriberAndDeviceInformation> programmedSubs;
@@ -159,6 +171,12 @@
checkAndCreateDeviceFlows(d);
}
+ additionalVlans = storageService.<ConnectPoint, Map.Entry<VlanId, VlanId>>consistentMultimapBuilder()
+ .withName(ADDITIONAL_VLANS)
+ .withSerializer(Serializer.using(Arrays.asList(KryoNamespaces.API),
+ AbstractMap.SimpleEntry.class))
+ .build();
+
deviceService.addListener(deviceListener);
log.info("Started with Application ID {}", appId.id());
@@ -262,12 +280,25 @@
if (enableIgmpOnProvisioning) {
processIgmpFilteringObjectives(port.deviceId(), port.port(), false);
}
+
+ // Remove if there are any flows for the additional Vlans
+ Collection<? extends Map.Entry<VlanId, VlanId>> vlansList = additionalVlans.get(port).value();
+
+ // Remove the flows for the additional vlans for this subscriber
+ for (Map.Entry<VlanId, VlanId> vlans : vlansList) {
+ unprovisionTransparentFlows(port.deviceId(), uplinkPort.number(), port.port(),
+ vlans.getValue(), vlans.getKey());
+
+ // Remove it from the map also
+ additionalVlans.remove(port, vlans);
+ }
+
programmedSubs.remove(port);
return true;
}
@Override
- public boolean provisionSubscriber(AccessSubscriberId subscriberId) {
+ public boolean provisionSubscriber(AccessSubscriberId subscriberId, Optional<VlanId> sTag, Optional<VlanId> cTag) {
// Check if we can find the connect point to which this subscriber is connected
ConnectPoint subsPort = findSubscriberConnectPoint(subscriberId.toString());
if (subsPort == null) {
@@ -275,11 +306,26 @@
return false;
}
- return provisionSubscriber(subsPort);
+ if (!sTag.isPresent() && !cTag.isPresent()) {
+ return provisionSubscriber(subsPort);
+ } else if (sTag.isPresent() && cTag.isPresent()) {
+ Port uplinkPort = getUplinkPort(deviceService.getDevice(subsPort.deviceId()));
+ if (uplinkPort == null) {
+ log.warn("No uplink port found for OLT device {}", subsPort.deviceId());
+ return false;
+ }
+
+ provisionTransparentFlows(subsPort.deviceId(), uplinkPort.number(), subsPort.port(),
+ cTag.get(), sTag.get());
+ return true;
+ } else {
+ log.warn("Provisioning failed for subscriber: {}", subscriberId);
+ return false;
+ }
}
@Override
- public boolean removeSubscriber(AccessSubscriberId subscriberId) {
+ public boolean removeSubscriber(AccessSubscriberId subscriberId, Optional<VlanId> sTag, Optional<VlanId> cTag) {
// Check if we can find the connect point to which this subscriber is connected
ConnectPoint subsPort = findSubscriberConnectPoint(subscriberId.toString());
if (subsPort == null) {
@@ -287,7 +333,23 @@
return false;
}
- return removeSubscriber(subsPort);
+ if (!sTag.isPresent() && !cTag.isPresent()) {
+ return removeSubscriber(subsPort);
+ } else if (sTag.isPresent() && cTag.isPresent()) {
+ // Get the uplink port
+ Port uplinkPort = getUplinkPort(deviceService.getDevice(subsPort.deviceId()));
+ if (uplinkPort == null) {
+ log.warn("No uplink port found for OLT device {}", subsPort.deviceId());
+ return false;
+ }
+
+ unprovisionTransparentFlows(subsPort.deviceId(), uplinkPort.number(), subsPort.port(),
+ cTag.get(), sTag.get());
+ return true;
+ } else {
+ log.warn("Removing subscriber failed for: {}", subscriberId);
+ return false;
+ }
}
@Override
@@ -528,6 +590,164 @@
.withTreatment(upstreamTreatment);
}
+ private void provisionTransparentFlows(DeviceId deviceId, PortNumber uplinkPort,
+ PortNumber subscriberPort,
+ VlanId innerVlan,
+ VlanId outerVlan) {
+
+ CompletableFuture<ObjectiveError> downFuture = new CompletableFuture();
+ CompletableFuture<ObjectiveError> upFuture = new CompletableFuture();
+
+ ForwardingObjective.Builder upFwd = transparentUpBuilder(uplinkPort, subscriberPort,
+ innerVlan, outerVlan);
+
+
+ ForwardingObjective.Builder downFwd = transparentDownBuilder(uplinkPort, subscriberPort,
+ innerVlan, outerVlan);
+
+ ConnectPoint cp = new ConnectPoint(deviceId, subscriberPort);
+
+ additionalVlans.put(cp, new AbstractMap.SimpleEntry(outerVlan, innerVlan));
+
+ flowObjectiveService.forward(deviceId, upFwd.add(new ObjectiveContext() {
+ @Override
+ public void onSuccess(Objective objective) {
+ upFuture.complete(null);
+ }
+
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ upFuture.complete(error);
+ }
+ }));
+
+ flowObjectiveService.forward(deviceId, downFwd.add(new ObjectiveContext() {
+ @Override
+ public void onSuccess(Objective objective) {
+ downFuture.complete(null);
+ }
+
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ downFuture.complete(error);
+ }
+ }));
+
+ upFuture.thenAcceptBothAsync(downFuture, (upStatus, downStatus) -> {
+ if (downStatus != null) {
+ log.error("Flow with innervlan {} and outerVlan {} on device {} " +
+ "on port {} failed downstream installation: {}",
+ innerVlan, outerVlan, deviceId, subscriberPort, downStatus);
+ } else if (upStatus != null) {
+ log.error("Flow with innerVlan {} and outerVlan {} on device {} " +
+ "on port {} failed upstream installation: {}",
+ innerVlan, outerVlan, deviceId, subscriberPort, upStatus);
+ }
+ }, oltInstallers);
+
+ }
+
+ private ForwardingObjective.Builder transparentDownBuilder(PortNumber uplinkPort,
+ PortNumber subscriberPort,
+ VlanId innerVlan,
+ VlanId outerVlan) {
+ TrafficSelector downstream = DefaultTrafficSelector.builder()
+ .matchVlanId(outerVlan)
+ .matchInPort(uplinkPort)
+ .matchInnerVlanId(innerVlan)
+ .build();
+
+ TrafficTreatment downstreamTreatment = DefaultTrafficTreatment.builder()
+ .setOutput(subscriberPort)
+ .build();
+
+ return DefaultForwardingObjective.builder()
+ .withFlag(ForwardingObjective.Flag.VERSATILE)
+ .withPriority(1000)
+ .makePermanent()
+ .withSelector(downstream)
+ .fromApp(appId)
+ .withTreatment(downstreamTreatment);
+ }
+
+ private ForwardingObjective.Builder transparentUpBuilder(PortNumber uplinkPort,
+ PortNumber subscriberPort,
+ VlanId innerVlan,
+ VlanId outerVlan) {
+ TrafficSelector upstream = DefaultTrafficSelector.builder()
+ .matchVlanId(outerVlan)
+ .matchInPort(subscriberPort)
+ .matchInnerVlanId(innerVlan)
+ .build();
+
+ TrafficTreatment upstreamTreatment = DefaultTrafficTreatment.builder()
+ .setOutput(uplinkPort)
+ .build();
+
+ return DefaultForwardingObjective.builder()
+ .withFlag(ForwardingObjective.Flag.VERSATILE)
+ .withPriority(1000)
+ .makePermanent()
+ .withSelector(upstream)
+ .fromApp(appId)
+ .withTreatment(upstreamTreatment);
+ }
+
+ private void unprovisionTransparentFlows(DeviceId deviceId, PortNumber uplink,
+ PortNumber subscriberPort, VlanId innerVlan,
+ VlanId outerVlan) {
+
+ ConnectPoint cp = new ConnectPoint(deviceId, subscriberPort);
+
+ additionalVlans.remove(cp, new AbstractMap.SimpleEntry(outerVlan, innerVlan));
+
+ CompletableFuture<ObjectiveError> downFuture = new CompletableFuture();
+ CompletableFuture<ObjectiveError> upFuture = new CompletableFuture();
+
+ ForwardingObjective.Builder upFwd = transparentUpBuilder(uplink, subscriberPort,
+ innerVlan, outerVlan);
+ ForwardingObjective.Builder downFwd = transparentDownBuilder(uplink, subscriberPort,
+ innerVlan, outerVlan);
+
+
+ flowObjectiveService.forward(deviceId, upFwd.remove(new ObjectiveContext() {
+ @Override
+ public void onSuccess(Objective objective) {
+ upFuture.complete(null);
+ }
+
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ upFuture.complete(error);
+ }
+ }));
+
+ flowObjectiveService.forward(deviceId, downFwd.remove(new ObjectiveContext() {
+ @Override
+ public void onSuccess(Objective objective) {
+ downFuture.complete(null);
+ }
+
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ downFuture.complete(error);
+ }
+ }));
+
+ upFuture.thenAcceptBothAsync(downFuture, (upStatus, downStatus) -> {
+ if (downStatus != null) {
+ log.error("Flow with innerVlan {} and outerVlan {} on device {} " +
+ "on port {} failed downstream uninstallation: {}",
+ innerVlan, outerVlan, deviceId, subscriberPort, downStatus);
+ } else if (upStatus != null) {
+ log.error("Flow with innerVlan {} and outerVlan {} on device {} " +
+ "on port {} failed upstream uninstallation: {}",
+ innerVlan, outerVlan, deviceId, subscriberPort, upStatus);
+ }
+ }, oltInstallers);
+
+ }
+
private void processEapolFilteringObjectives(DeviceId devId, PortNumber port, boolean install) {
if (!mastershipService.isLocalMaster(devId)) {
return;
diff --git a/app/src/main/java/org/opencord/olt/rest/OltWebResource.java b/app/src/main/java/org/opencord/olt/rest/OltWebResource.java
index 4a786f0..1631b09 100644
--- a/app/src/main/java/org/opencord/olt/rest/OltWebResource.java
+++ b/app/src/main/java/org/opencord/olt/rest/OltWebResource.java
@@ -15,6 +15,7 @@
*/
package org.opencord.olt.rest;
+import org.onlab.packet.VlanId;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DeviceId;
import org.onosproject.net.PortNumber;
@@ -22,6 +23,7 @@
import org.opencord.olt.AccessDeviceService;
import org.opencord.olt.AccessSubscriberId;
+import java.util.Optional;
import javax.ws.rs.DELETE;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
@@ -93,7 +95,33 @@
@PathParam("portName")String portName) {
AccessDeviceService service = get(AccessDeviceService.class);
- if (service.provisionSubscriber(new AccessSubscriberId(portName))) {
+ Optional<VlanId> emptyVlan = Optional.empty();
+ if (service.provisionSubscriber(new AccessSubscriberId(portName), emptyVlan, emptyVlan)) {
+ return ok("").build();
+ }
+ return Response.status(NOT_FOUND).build();
+ }
+
+ /**
+ * Provision service with particular tags for a subscriber.
+ *
+ * @param portName Name of the port on which the subscriber is connected
+ * @param sTagVal additional outer tag on this port
+ * @param cTagVal additional innter tag on this port
+ * @return 200 OK or 404 NOT_FOUND
+ */
+ @POST
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("services/{portName}/{sTag}/{cTag}")
+ public Response provisionAdditionalVlans(
+ @PathParam("portName")String portName,
+ @PathParam("sTag")String sTagVal,
+ @PathParam("cTag")String cTagVal) {
+ AccessDeviceService service = get(AccessDeviceService.class);
+ VlanId cTag = VlanId.vlanId(cTagVal);
+ VlanId sTag = VlanId.vlanId(sTagVal);
+
+ if (service.provisionSubscriber(new AccessSubscriberId(portName), Optional.of(sTag), Optional.of(cTag))) {
return ok("").build();
}
return Response.status(NOT_FOUND).build();
@@ -112,7 +140,33 @@
@PathParam("portName")String portName) {
AccessDeviceService service = get(AccessDeviceService.class);
- if (service.removeSubscriber(new AccessSubscriberId(portName))) {
+ Optional<VlanId> emptyVlan = Optional.empty();
+ if (service.removeSubscriber(new AccessSubscriberId(portName), emptyVlan, emptyVlan)) {
+ return ok("").build();
+ }
+ return Response.status(NOT_FOUND).build();
+ }
+
+ /**
+ * Removes additional vlans of a particular subscriber.
+ *
+ * @param portName Name of the port on which the subscriber is connected
+ * @param sTagVal additional outer tag on this port which needs to be removed
+ * @param cTagVal additional inner tag on this port which needs to be removed
+ * @return 200 OK or 404 NOT_FOUND
+ */
+ @DELETE
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("services/{portName}/{sTag}/{cTag}")
+ public Response removeAdditionalVlans(
+ @PathParam("portName")String portName,
+ @PathParam("sTag")String sTagVal,
+ @PathParam("cTag")String cTagVal) {
+ AccessDeviceService service = get(AccessDeviceService.class);
+ VlanId cTag = VlanId.vlanId(cTagVal);
+ VlanId sTag = VlanId.vlanId(sTagVal);
+
+ if (service.removeSubscriber(new AccessSubscriberId(portName), Optional.of(sTag), Optional.of(cTag))) {
return ok("").build();
}
return Response.status(NOT_FOUND).build();