VOL-1256: Support for transparent flows (with different Vlan IDs) on a ONU for DT FTTB Use case, after merge

Change-Id: I1fa25d826d87658e6951a2aa90f4577be81f301d
diff --git a/api/src/main/java/org/opencord/olt/AccessDeviceService.java b/api/src/main/java/org/opencord/olt/AccessDeviceService.java
index 6387a03..2652a31 100644
--- a/api/src/main/java/org/opencord/olt/AccessDeviceService.java
+++ b/api/src/main/java/org/opencord/olt/AccessDeviceService.java
@@ -19,6 +19,7 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import org.onlab.packet.VlanId;
 import org.onosproject.event.ListenerService;
@@ -54,17 +55,21 @@
      * Provisions flows for the specific subscriber.
      *
      * @param subscriberId Identification of the subscriber
+     * @param sTag additional outer tag on this port
+     * @param cTag additional inner tag on this port
      * @return true if successful false otherwise
      */
-    boolean provisionSubscriber(AccessSubscriberId subscriberId);
+    boolean provisionSubscriber(AccessSubscriberId subscriberId, Optional<VlanId> sTag, Optional<VlanId> cTag);
 
     /**
      * Removes flows for the specific subscriber.
      *
      * @param subscriberId Identification of the subscriber
+     * @param sTag additional outer tag on this port
+     * @param cTag additional inner tag on this port
      * @return true if successful false otherwise
      */
-    boolean removeSubscriber(AccessSubscriberId subscriberId);
+    boolean removeSubscriber(AccessSubscriberId subscriberId, Optional<VlanId> sTag, Optional<VlanId> cTag);
 
     /**
      * Returns information about the provisioned subscribers.
diff --git a/app/src/main/java/org/opencord/olt/impl/Olt.java b/app/src/main/java/org/opencord/olt/impl/Olt.java
index e1d1dac..2116c70 100644
--- a/app/src/main/java/org/opencord/olt/impl/Olt.java
+++ b/app/src/main/java/org/opencord/olt/impl/Olt.java
@@ -23,6 +23,7 @@
 import static org.slf4j.LoggerFactory.getLogger;
 
 import java.util.AbstractMap;
+import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Dictionary;
@@ -74,6 +75,10 @@
 import org.onosproject.net.flowobjective.Objective;
 import org.onosproject.net.flowobjective.ObjectiveContext;
 import org.onosproject.net.flowobjective.ObjectiveError;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMultimap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
 import org.opencord.olt.AccessDeviceEvent;
 import org.opencord.olt.AccessDeviceListener;
 import org.opencord.olt.AccessDeviceService;
@@ -83,6 +88,7 @@
 import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 
+
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 
@@ -97,6 +103,7 @@
     private static final String APP_NAME = "org.opencord.olt";
 
     private static final short DEFAULT_VLAN = 0;
+    private static final String ADDITIONAL_VLANS = "additional-vlans";
 
     private final Logger log = getLogger(getClass());
 
@@ -118,6 +125,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected SubscriberAndDeviceInformationService subsService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
+
     @Property(name = "defaultVlan", intValue = DEFAULT_VLAN,
             label = "Default VLAN RG<->ONU traffic")
     private int defaultVlan = DEFAULT_VLAN;
@@ -138,6 +148,8 @@
             .newFixedThreadPool(4, groupedThreads("onos/olt-service",
                                                   "olt-installer-%d"));
 
+    private ConsistentMultimap<ConnectPoint, Map.Entry<VlanId, VlanId>> additionalVlans;
+
     protected ExecutorService eventExecutor;
 
     private Map<ConnectPoint, SubscriberAndDeviceInformation> programmedSubs;
@@ -159,6 +171,12 @@
             checkAndCreateDeviceFlows(d);
         }
 
+        additionalVlans = storageService.<ConnectPoint, Map.Entry<VlanId, VlanId>>consistentMultimapBuilder()
+                .withName(ADDITIONAL_VLANS)
+                .withSerializer(Serializer.using(Arrays.asList(KryoNamespaces.API),
+                        AbstractMap.SimpleEntry.class))
+                .build();
+
         deviceService.addListener(deviceListener);
 
         log.info("Started with Application ID {}", appId.id());
@@ -262,12 +280,25 @@
         if (enableIgmpOnProvisioning) {
             processIgmpFilteringObjectives(port.deviceId(), port.port(), false);
         }
+
+        // Remove if there are any flows for the additional Vlans
+        Collection<? extends Map.Entry<VlanId, VlanId>> vlansList = additionalVlans.get(port).value();
+
+        // Remove the flows for the additional vlans for this subscriber
+        for (Map.Entry<VlanId, VlanId> vlans : vlansList) {
+            unprovisionTransparentFlows(port.deviceId(), uplinkPort.number(), port.port(),
+                    vlans.getValue(), vlans.getKey());
+
+            // Remove it from the map also
+            additionalVlans.remove(port, vlans);
+        }
+
         programmedSubs.remove(port);
         return true;
     }
 
     @Override
-    public boolean provisionSubscriber(AccessSubscriberId subscriberId) {
+    public boolean provisionSubscriber(AccessSubscriberId subscriberId, Optional<VlanId> sTag, Optional<VlanId> cTag) {
         // Check if we can find the connect point to which this subscriber is connected
         ConnectPoint subsPort = findSubscriberConnectPoint(subscriberId.toString());
         if (subsPort == null) {
@@ -275,11 +306,26 @@
             return false;
         }
 
-        return provisionSubscriber(subsPort);
+        if (!sTag.isPresent() && !cTag.isPresent()) {
+            return provisionSubscriber(subsPort);
+        } else if (sTag.isPresent() && cTag.isPresent()) {
+            Port uplinkPort = getUplinkPort(deviceService.getDevice(subsPort.deviceId()));
+            if (uplinkPort == null) {
+                log.warn("No uplink port found for OLT device {}", subsPort.deviceId());
+                return false;
+            }
+
+            provisionTransparentFlows(subsPort.deviceId(), uplinkPort.number(), subsPort.port(),
+                    cTag.get(), sTag.get());
+            return true;
+        } else {
+            log.warn("Provisioning failed for subscriber: {}", subscriberId);
+            return false;
+        }
     }
 
     @Override
-    public boolean removeSubscriber(AccessSubscriberId subscriberId) {
+    public boolean removeSubscriber(AccessSubscriberId subscriberId, Optional<VlanId> sTag, Optional<VlanId> cTag) {
         // Check if we can find the connect point to which this subscriber is connected
         ConnectPoint subsPort = findSubscriberConnectPoint(subscriberId.toString());
         if (subsPort == null) {
@@ -287,7 +333,23 @@
             return false;
         }
 
-        return removeSubscriber(subsPort);
+        if (!sTag.isPresent() && !cTag.isPresent()) {
+            return removeSubscriber(subsPort);
+        } else if (sTag.isPresent() && cTag.isPresent()) {
+            // Get the uplink port
+            Port uplinkPort = getUplinkPort(deviceService.getDevice(subsPort.deviceId()));
+            if (uplinkPort == null) {
+                log.warn("No uplink port found for OLT device {}", subsPort.deviceId());
+                return false;
+            }
+
+            unprovisionTransparentFlows(subsPort.deviceId(), uplinkPort.number(), subsPort.port(),
+                    cTag.get(), sTag.get());
+            return true;
+        } else {
+            log.warn("Removing subscriber failed for: {}", subscriberId);
+            return false;
+        }
     }
 
     @Override
@@ -528,6 +590,164 @@
                 .withTreatment(upstreamTreatment);
     }
 
+    private void provisionTransparentFlows(DeviceId deviceId, PortNumber uplinkPort,
+                                           PortNumber subscriberPort,
+                                           VlanId innerVlan,
+                                           VlanId outerVlan) {
+
+        CompletableFuture<ObjectiveError> downFuture = new CompletableFuture();
+        CompletableFuture<ObjectiveError> upFuture = new CompletableFuture();
+
+        ForwardingObjective.Builder upFwd = transparentUpBuilder(uplinkPort, subscriberPort,
+                innerVlan, outerVlan);
+
+
+        ForwardingObjective.Builder downFwd = transparentDownBuilder(uplinkPort, subscriberPort,
+                innerVlan, outerVlan);
+
+        ConnectPoint cp = new ConnectPoint(deviceId, subscriberPort);
+
+        additionalVlans.put(cp, new AbstractMap.SimpleEntry(outerVlan, innerVlan));
+
+        flowObjectiveService.forward(deviceId, upFwd.add(new ObjectiveContext() {
+            @Override
+            public void onSuccess(Objective objective) {
+                upFuture.complete(null);
+            }
+
+            @Override
+            public void onError(Objective objective, ObjectiveError error) {
+                upFuture.complete(error);
+            }
+        }));
+
+        flowObjectiveService.forward(deviceId, downFwd.add(new ObjectiveContext() {
+            @Override
+            public void onSuccess(Objective objective) {
+                downFuture.complete(null);
+            }
+
+            @Override
+            public void onError(Objective objective, ObjectiveError error) {
+                downFuture.complete(error);
+            }
+        }));
+
+        upFuture.thenAcceptBothAsync(downFuture, (upStatus, downStatus) -> {
+            if (downStatus != null) {
+                log.error("Flow with innervlan {} and outerVlan {} on device {} " +
+                        "on port {} failed downstream installation: {}",
+                        innerVlan, outerVlan, deviceId, subscriberPort, downStatus);
+            } else if (upStatus != null) {
+                log.error("Flow with innerVlan {} and outerVlan {} on device {} " +
+                        "on port {} failed upstream installation: {}",
+                        innerVlan, outerVlan, deviceId, subscriberPort, upStatus);
+            }
+        }, oltInstallers);
+
+    }
+
+    private ForwardingObjective.Builder transparentDownBuilder(PortNumber uplinkPort,
+                                                               PortNumber subscriberPort,
+                                                               VlanId innerVlan,
+                                                               VlanId outerVlan) {
+        TrafficSelector downstream = DefaultTrafficSelector.builder()
+                .matchVlanId(outerVlan)
+                .matchInPort(uplinkPort)
+                .matchInnerVlanId(innerVlan)
+                .build();
+
+        TrafficTreatment downstreamTreatment = DefaultTrafficTreatment.builder()
+                .setOutput(subscriberPort)
+                .build();
+
+        return DefaultForwardingObjective.builder()
+                .withFlag(ForwardingObjective.Flag.VERSATILE)
+                .withPriority(1000)
+                .makePermanent()
+                .withSelector(downstream)
+                .fromApp(appId)
+                .withTreatment(downstreamTreatment);
+    }
+
+    private ForwardingObjective.Builder transparentUpBuilder(PortNumber uplinkPort,
+                                                             PortNumber subscriberPort,
+                                                             VlanId innerVlan,
+                                                             VlanId outerVlan) {
+        TrafficSelector upstream = DefaultTrafficSelector.builder()
+                .matchVlanId(outerVlan)
+                .matchInPort(subscriberPort)
+                .matchInnerVlanId(innerVlan)
+                .build();
+
+        TrafficTreatment upstreamTreatment = DefaultTrafficTreatment.builder()
+                .setOutput(uplinkPort)
+                .build();
+
+        return DefaultForwardingObjective.builder()
+                .withFlag(ForwardingObjective.Flag.VERSATILE)
+                .withPriority(1000)
+                .makePermanent()
+                .withSelector(upstream)
+                .fromApp(appId)
+                .withTreatment(upstreamTreatment);
+    }
+
+    private void unprovisionTransparentFlows(DeviceId deviceId, PortNumber uplink,
+                                             PortNumber subscriberPort, VlanId innerVlan,
+                                             VlanId outerVlan) {
+
+        ConnectPoint cp = new ConnectPoint(deviceId, subscriberPort);
+
+        additionalVlans.remove(cp, new AbstractMap.SimpleEntry(outerVlan, innerVlan));
+
+        CompletableFuture<ObjectiveError> downFuture = new CompletableFuture();
+        CompletableFuture<ObjectiveError> upFuture = new CompletableFuture();
+
+        ForwardingObjective.Builder upFwd = transparentUpBuilder(uplink, subscriberPort,
+                innerVlan, outerVlan);
+        ForwardingObjective.Builder downFwd = transparentDownBuilder(uplink, subscriberPort,
+                innerVlan, outerVlan);
+
+
+        flowObjectiveService.forward(deviceId, upFwd.remove(new ObjectiveContext() {
+            @Override
+            public void onSuccess(Objective objective) {
+                upFuture.complete(null);
+            }
+
+            @Override
+            public void onError(Objective objective, ObjectiveError error) {
+                upFuture.complete(error);
+            }
+        }));
+
+        flowObjectiveService.forward(deviceId, downFwd.remove(new ObjectiveContext() {
+            @Override
+            public void onSuccess(Objective objective) {
+                downFuture.complete(null);
+            }
+
+            @Override
+            public void onError(Objective objective, ObjectiveError error) {
+                downFuture.complete(error);
+            }
+        }));
+
+        upFuture.thenAcceptBothAsync(downFuture, (upStatus, downStatus) -> {
+            if (downStatus != null) {
+                log.error("Flow with innerVlan {} and outerVlan {} on device {} " +
+                        "on port {} failed downstream uninstallation: {}",
+                        innerVlan, outerVlan, deviceId, subscriberPort, downStatus);
+            } else if (upStatus != null) {
+                log.error("Flow with innerVlan {} and outerVlan {} on device {} " +
+                        "on port {} failed upstream uninstallation: {}",
+                        innerVlan, outerVlan, deviceId, subscriberPort, upStatus);
+            }
+        }, oltInstallers);
+
+    }
+
     private void processEapolFilteringObjectives(DeviceId devId, PortNumber port, boolean install) {
         if (!mastershipService.isLocalMaster(devId)) {
             return;
diff --git a/app/src/main/java/org/opencord/olt/rest/OltWebResource.java b/app/src/main/java/org/opencord/olt/rest/OltWebResource.java
index 4a786f0..1631b09 100644
--- a/app/src/main/java/org/opencord/olt/rest/OltWebResource.java
+++ b/app/src/main/java/org/opencord/olt/rest/OltWebResource.java
@@ -15,6 +15,7 @@
  */
 package org.opencord.olt.rest;
 
+import org.onlab.packet.VlanId;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.PortNumber;
@@ -22,6 +23,7 @@
 import org.opencord.olt.AccessDeviceService;
 import org.opencord.olt.AccessSubscriberId;
 
+import java.util.Optional;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
@@ -93,7 +95,33 @@
             @PathParam("portName")String portName) {
         AccessDeviceService service = get(AccessDeviceService.class);
 
-        if (service.provisionSubscriber(new AccessSubscriberId(portName))) {
+        Optional<VlanId> emptyVlan = Optional.empty();
+        if (service.provisionSubscriber(new AccessSubscriberId(portName), emptyVlan, emptyVlan)) {
+            return ok("").build();
+        }
+        return Response.status(NOT_FOUND).build();
+    }
+
+    /**
+     * Provision service with particular tags for a subscriber.
+     *
+     * @param portName Name of the port on which the subscriber is connected
+     * @param sTagVal additional outer tag on this port
+     * @param cTagVal additional innter tag on this port
+     * @return 200 OK or 404 NOT_FOUND
+     */
+    @POST
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("services/{portName}/{sTag}/{cTag}")
+    public Response provisionAdditionalVlans(
+            @PathParam("portName")String portName,
+            @PathParam("sTag")String sTagVal,
+            @PathParam("cTag")String cTagVal) {
+        AccessDeviceService service = get(AccessDeviceService.class);
+        VlanId cTag = VlanId.vlanId(cTagVal);
+        VlanId sTag = VlanId.vlanId(sTagVal);
+
+        if (service.provisionSubscriber(new AccessSubscriberId(portName), Optional.of(sTag), Optional.of(cTag))) {
             return ok("").build();
         }
         return Response.status(NOT_FOUND).build();
@@ -112,7 +140,33 @@
             @PathParam("portName")String portName) {
         AccessDeviceService service = get(AccessDeviceService.class);
 
-        if (service.removeSubscriber(new AccessSubscriberId(portName))) {
+        Optional<VlanId> emptyVlan = Optional.empty();
+        if (service.removeSubscriber(new AccessSubscriberId(portName), emptyVlan, emptyVlan)) {
+            return ok("").build();
+        }
+        return Response.status(NOT_FOUND).build();
+    }
+
+    /**
+     * Removes additional vlans of a particular subscriber.
+     *
+     * @param portName Name of the port on which the subscriber is connected
+     * @param sTagVal additional outer tag on this port which needs to be removed
+     * @param cTagVal additional inner tag on this port which needs to be removed
+     * @return 200 OK or 404 NOT_FOUND
+     */
+    @DELETE
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("services/{portName}/{sTag}/{cTag}")
+    public Response removeAdditionalVlans(
+            @PathParam("portName")String portName,
+            @PathParam("sTag")String sTagVal,
+            @PathParam("cTag")String cTagVal) {
+        AccessDeviceService service = get(AccessDeviceService.class);
+        VlanId cTag = VlanId.vlanId(cTagVal);
+        VlanId sTag = VlanId.vlanId(sTagVal);
+
+        if (service.removeSubscriber(new AccessSubscriberId(portName), Optional.of(sTag), Optional.of(cTag))) {
             return ok("").build();
         }
         return Response.status(NOT_FOUND).build();