fix removal of rules in a distributed setting

Change-Id: I44cb49990b8051f5f1542c11cbda6846049906e3
diff --git a/app/src/main/java/org/onosproject/olt/impl/Olt.java b/app/src/main/java/org/onosproject/olt/impl/Olt.java
index 301039f..96573f2 100644
--- a/app/src/main/java/org/onosproject/olt/impl/Olt.java
+++ b/app/src/main/java/org/onosproject/olt/impl/Olt.java
@@ -16,7 +16,6 @@
 package org.onosproject.olt.impl;
 
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -61,6 +60,9 @@
 import org.onosproject.olt.AccessDeviceEvent;
 import org.onosproject.olt.AccessDeviceListener;
 import org.onosproject.olt.AccessDeviceService;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
 import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 
@@ -69,7 +71,6 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -90,6 +91,7 @@
         implements AccessDeviceService {
 
     private static final short DEFAULT_VLAN = 0;
+    private static final String SUBSCRIBERS = "existing-subscribers";
 
     private final Logger log = getLogger(getClass());
 
@@ -108,6 +110,8 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ComponentConfigService componentConfigService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
 
     @Property(name = "defaultVlan", intValue = DEFAULT_VLAN,
             label = "Default VLAN RG<->ONU traffic")
@@ -123,10 +127,7 @@
 
     private Map<DeviceId, AccessDeviceData> oltData = new ConcurrentHashMap<>();
 
-    private Map<ConnectPoint, Set<ForwardingObjective.Builder>> objectives =
-            Maps.newConcurrentMap();
-
-    private Map<ConnectPoint, VlanId> subscribers = Maps.newConcurrentMap();
+    private Map<ConnectPoint, VlanId> subscribers;
 
     private InternalNetworkConfigListener configListener =
             new InternalNetworkConfigListener();
@@ -172,6 +173,11 @@
                 .forEach(p -> processFilteringObjectives((DeviceId) p.element().id(),
                                                          p.number(), true));
 
+        subscribers = storageService.<ConnectPoint, VlanId>consistentMapBuilder()
+                .withName(SUBSCRIBERS)
+                .withSerializer(Serializer.using(KryoNamespaces.API))
+                .build().asJavaMap();
+
         deviceService.addListener(deviceListener);
 
         log.info("Started with Application ID {}", appId.id());
@@ -220,7 +226,15 @@
             return;
         }
 
-        unprovisionSubscriber(olt.deviceId(), olt.uplink(), port.port(), olt.vlan());
+        VlanId subscriberVlan = subscribers.remove(port);
+
+        if (subscriberVlan == null) {
+            log.warn("Unknown subscriber at location {}", port);
+            return;
+        }
+
+        unprovisionSubscriber(olt.deviceId(), olt.uplink(), port.port(), subscriberVlan,
+                              olt.vlan(), olt.defaultVlan());
 
     }
 
@@ -230,39 +244,43 @@
     }
 
     private void unprovisionSubscriber(DeviceId deviceId, PortNumber uplink,
-                                       PortNumber subscriberPort, VlanId deviceVlan) {
-
-        //FIXME: This method is slightly ugly but it'll do until we have a better
-        // way to remove flows from the flow store.
+                                       PortNumber subscriberPort, VlanId subscriberVlan,
+                                       VlanId deviceVlan, Optional<VlanId> defaultVlan) {
 
         CompletableFuture<ObjectiveError> downFuture = new CompletableFuture();
         CompletableFuture<ObjectiveError> upFuture = new CompletableFuture();
 
-        ConnectPoint cp = new ConnectPoint(deviceId, subscriberPort);
-
-        VlanId subscriberVlan = subscribers.remove(cp);
-
-        Set<ForwardingObjective.Builder> fwds = objectives.remove(cp);
-
-        if (fwds == null || fwds.size() != 2) {
-            log.warn("Unknown or incomplete subscriber at {}", cp);
-            return;
-        }
+        ForwardingObjective.Builder upFwd = upBuilder(uplink, subscriberPort,
+                                                      subscriberVlan, deviceVlan,
+                                                      defaultVlan);
+        ForwardingObjective.Builder downFwd = downBuilder(uplink, subscriberPort,
+                                                          subscriberVlan, deviceVlan,
+                                                          defaultVlan);
 
 
-        fwds.stream().forEach(
-                fwd -> flowObjectiveService.forward(deviceId,
-                                                    fwd.remove(new ObjectiveContext() {
-                                                        @Override
-                                                        public void onSuccess(Objective objective) {
-                                                            upFuture.complete(null);
-                                                        }
+        flowObjectiveService.forward(deviceId, upFwd.remove(new ObjectiveContext() {
+            @Override
+            public void onSuccess(Objective objective) {
+                upFuture.complete(null);
+            }
 
-                                                        @Override
-                                                        public void onError(Objective objective, ObjectiveError error) {
-                                                            upFuture.complete(error);
-                                                        }
-                                                    })));
+            @Override
+            public void onError(Objective objective, ObjectiveError error) {
+                upFuture.complete(error);
+            }
+        }));
+
+        flowObjectiveService.forward(deviceId, downFwd.remove(new ObjectiveContext() {
+            @Override
+            public void onSuccess(Objective objective) {
+                downFuture.complete(null);
+            }
+
+            @Override
+            public void onError(Objective objective, ObjectiveError error) {
+                downFuture.complete(error);
+            }
+        }));
 
         upFuture.thenAcceptBothAsync(downFuture, (upStatus, downStatus) -> {
             if (upStatus == null && downStatus == null) {
@@ -291,53 +309,17 @@
         CompletableFuture<ObjectiveError> downFuture = new CompletableFuture();
         CompletableFuture<ObjectiveError> upFuture = new CompletableFuture();
 
-        TrafficSelector upstream = DefaultTrafficSelector.builder()
-                .matchVlanId(defaultVlan.orElse(VlanId.vlanId((short) this.defaultVlan)))
-                .matchInPort(subscriberPort)
-                .build();
-
-        TrafficSelector downstream = DefaultTrafficSelector.builder()
-                .matchVlanId(deviceVlan)
-                .matchInPort(uplinkPort)
-                .matchInnerVlanId(subscriberVlan)
-                .build();
-
-        TrafficTreatment upstreamTreatment = DefaultTrafficTreatment.builder()
-                .pushVlan()
-                .setVlanId(subscriberVlan)
-                .pushVlan()
-                .setVlanId(deviceVlan)
-                .setOutput(uplinkPort)
-                .build();
-
-        TrafficTreatment downstreamTreatment = DefaultTrafficTreatment.builder()
-                .popVlan()
-                .setVlanId(defaultVlan.orElse(VlanId.vlanId((short) this.defaultVlan)))
-                .setOutput(subscriberPort)
-                .build();
+        ForwardingObjective.Builder upFwd = upBuilder(uplinkPort, subscriberPort,
+                                                      subscriberVlan, deviceVlan,
+                                                      defaultVlan);
 
 
-        ForwardingObjective.Builder upFwd = DefaultForwardingObjective.builder()
-                .withFlag(ForwardingObjective.Flag.VERSATILE)
-                .withPriority(1000)
-                .makePermanent()
-                .withSelector(upstream)
-                .fromApp(appId)
-                .withTreatment(upstreamTreatment);
-
-
-        ForwardingObjective.Builder downFwd = DefaultForwardingObjective.builder()
-                .withFlag(ForwardingObjective.Flag.VERSATILE)
-                .withPriority(1000)
-                .makePermanent()
-                .withSelector(downstream)
-                .fromApp(appId)
-                .withTreatment(downstreamTreatment);
+        ForwardingObjective.Builder downFwd = downBuilder(uplinkPort, subscriberPort,
+                                                          subscriberVlan, deviceVlan,
+                                                          defaultVlan);
 
         ConnectPoint cp = new ConnectPoint(deviceId, subscriberPort);
-
         subscribers.put(cp, subscriberVlan);
-        objectives.put(cp, Sets.newHashSet(upFwd, downFwd));
 
         flowObjectiveService.forward(deviceId, upFwd.add(new ObjectiveContext() {
             @Override
@@ -383,6 +365,60 @@
 
     }
 
+    private ForwardingObjective.Builder downBuilder(PortNumber uplinkPort,
+                                                    PortNumber subscriberPort,
+                                                    VlanId subscriberVlan,
+                                                    VlanId deviceVlan,
+                                                    Optional<VlanId> defaultVlan) {
+        TrafficSelector downstream = DefaultTrafficSelector.builder()
+                .matchVlanId(deviceVlan)
+                .matchInPort(uplinkPort)
+                .matchInnerVlanId(subscriberVlan)
+                .build();
+
+        TrafficTreatment downstreamTreatment = DefaultTrafficTreatment.builder()
+                .popVlan()
+                .setVlanId(defaultVlan.orElse(VlanId.vlanId((short) this.defaultVlan)))
+                .setOutput(subscriberPort)
+                .build();
+
+        return DefaultForwardingObjective.builder()
+                .withFlag(ForwardingObjective.Flag.VERSATILE)
+                .withPriority(1000)
+                .makePermanent()
+                .withSelector(downstream)
+                .fromApp(appId)
+                .withTreatment(downstreamTreatment);
+    }
+
+    private ForwardingObjective.Builder upBuilder(PortNumber uplinkPort,
+                                                  PortNumber subscriberPort,
+                                                  VlanId subscriberVlan,
+                                                  VlanId deviceVlan,
+                                                  Optional<VlanId> defaultVlan) {
+        TrafficSelector upstream = DefaultTrafficSelector.builder()
+                .matchVlanId(defaultVlan.orElse(VlanId.vlanId((short) this.defaultVlan)))
+                .matchInPort(subscriberPort)
+                .build();
+
+
+        TrafficTreatment upstreamTreatment = DefaultTrafficTreatment.builder()
+                .pushVlan()
+                .setVlanId(subscriberVlan)
+                .pushVlan()
+                .setVlanId(deviceVlan)
+                .setOutput(uplinkPort)
+                .build();
+
+        return DefaultForwardingObjective.builder()
+                .withFlag(ForwardingObjective.Flag.VERSATILE)
+                .withPriority(1000)
+                .makePermanent()
+                .withSelector(upstream)
+                .fromApp(appId)
+                .withTreatment(upstreamTreatment);
+    }
+
     private void processFilteringObjectives(DeviceId devId, PortNumber port, boolean install) {
         DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
 
@@ -430,9 +466,11 @@
                     break;
                 case PORT_REMOVED:
                     AccessDeviceData olt = oltData.get(devId);
+                    VlanId vlan = subscribers.get(new ConnectPoint(devId,
+                                                                   event.port().number()));
                     unprovisionSubscriber(devId, olt.uplink(),
                                           event.port().number(),
-                                          olt.vlan());
+                                          vlan, olt.vlan(), olt.defaultVlan());
                     if (!oltData.get(devId).uplink().equals(event.port().number()) &&
                             event.port().isEnabled()) {
                         processFilteringObjectives(devId, event.port().number(), false);