[VOL-3767] Moving the subscriber to REMOVED state to re-install the flows correctly

Change-Id: I1fdd77c957148086749e8239cb30b5b7bf47f624
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
index 88cd942..4fb56df 100644
--- a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
+++ b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
@@ -28,9 +28,11 @@
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
+import org.onosproject.net.AnnotationKeys;
 import org.onosproject.net.Annotations;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.DefaultAnnotations;
+import org.onosproject.net.DefaultPort;
 import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.Host;
@@ -1646,10 +1648,30 @@
             switch (event.type()) {
                 case RULE_ADDED:
                 case RULE_REMOVED:
+                    DeviceId deviceId = event.subject().deviceId();
                     Port port = getCpFromFlowRule(event.subject());
                     if (port == null) {
-                        log.error("Can't find port in flow {}", event.subject());
-                        return;
+                        log.warn("Port is gone in ONOS, " +
+                                         "manually creating it {}", event.subject());
+                        PortNumber inPort = getPortNumberFromFlowRule(event.subject());
+                        cpStatusReadLock.lock();
+                        Optional<ServiceKey> keyWithPort = cpStatus.keySet()
+                                .stream().filter(key -> key.getPort().connectPoint()
+                                        .deviceId().equals(deviceId)
+                                         && key.getPort().connectPoint().port()
+                                        .equals(inPort)).findFirst();
+                        cpStatusReadLock.unlock();
+                        if (keyWithPort.isPresent()) {
+                            port = new DefaultPort(deviceService.getDevice(deviceId),
+                                                   inPort, false,
+                                                   DefaultAnnotations.builder()
+                                                           .set(AnnotationKeys.PORT_NAME,
+                                                                keyWithPort.get().getPort().name())
+                                                           .build());
+                        } else {
+                            log.warn("Can't find corresponding status for {}/{}", deviceId, inPort);
+                            return;
+                        }
                     }
                     if (log.isTraceEnabled()) {
                         log.trace("flow event {} on cp {}: {}", event.type(),
@@ -1676,7 +1698,7 @@
                 }
                 updateConnectPointStatus(sk, status, null, null, null);
             } else if (isDhcpFlow(flowRule)) {
-                ServiceKey sk = getSubscriberKeyFromFlowRule(flowRule);
+                ServiceKey sk = getSubscriberKeyFromFlowRule(flowRule, port);
                 if (sk == null) {
                     return;
                 }
@@ -1685,7 +1707,7 @@
                 }
                 updateConnectPointStatus(sk, null, null, status, null);
             } else if (isPppoeFlow(flowRule)) {
-                ServiceKey sk = getSubscriberKeyFromFlowRule(flowRule);
+                ServiceKey sk = getSubscriberKeyFromFlowRule(flowRule, port);
                 if (sk == null) {
                     return;
                 }
@@ -1694,14 +1716,17 @@
                 }
                 updateConnectPointStatus(sk, null, null, null, status);
             } else if (isDataFlow(flowRule)) {
-
-                if (oltDeviceService.isNniPort(deviceService.getDevice(flowRule.deviceId()),
-                        getCpFromFlowRule(flowRule).number())) {
+                PortNumber number = getPortNumberFromFlowRule(flowRule);
+                if (number == null) {
+                    log.error("Can't capture the port number from flow {}", flowRule);
+                    return;
+                }
+                if (oltDeviceService.isNniPort(deviceService.getDevice(flowRule.deviceId()), number)) {
                     // the NNI has data-plane for every subscriber, doesn't make sense to track them
                     return;
                 }
 
-                ServiceKey sk = getSubscriberKeyFromFlowRule(flowRule);
+                ServiceKey sk = getSubscriberKeyFromFlowRule(flowRule, port);
                 if (sk == null) {
                     return;
                 }
@@ -1784,16 +1809,22 @@
 
         private Port getCpFromFlowRule(FlowRule flowRule) {
             DeviceId deviceId = flowRule.deviceId();
-            PortCriterion inPort = (PortCriterion) flowRule.selector().getCriterion(Criterion.Type.IN_PORT);
+            PortNumber inPort = getPortNumberFromFlowRule(flowRule);
             if (inPort != null) {
-                PortNumber port = inPort.port();
-                return deviceService.getPort(deviceId, port);
+                return deviceService.getPort(deviceId, inPort);
             }
             return null;
         }
 
-        private ServiceKey getSubscriberKeyFromFlowRule(FlowRule flowRule) {
-            Port flowPort = getCpFromFlowRule(flowRule);
+        private PortNumber getPortNumberFromFlowRule(FlowRule flowRule) {
+            PortCriterion inPort = (PortCriterion) flowRule.selector().getCriterion(Criterion.Type.IN_PORT);
+            if (inPort != null) {
+                return inPort.port();
+            }
+            return null;
+        }
+
+        private ServiceKey getSubscriberKeyFromFlowRule(FlowRule flowRule, Port flowPort) {
             SubscriberAndDeviceInformation si = subsService.get(getPortName(flowPort));
 
             Boolean isNni = oltDeviceService.isNniPort(deviceService.getDevice(flowRule.deviceId()), flowPort.number());
diff --git a/impl/src/test/java/org/opencord/olt/impl/OltFlowServiceTest.java b/impl/src/test/java/org/opencord/olt/impl/OltFlowServiceTest.java
index 2f0f82e..ca90867 100644
--- a/impl/src/test/java/org/opencord/olt/impl/OltFlowServiceTest.java
+++ b/impl/src/test/java/org/opencord/olt/impl/OltFlowServiceTest.java
@@ -46,10 +46,13 @@
 import org.onosproject.net.PortNumber;
 import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.flow.DefaultFlowRule;
+import org.onosproject.net.flow.DefaultTrafficSelector;
 import org.onosproject.net.flow.DefaultTrafficTreatment;
 import org.onosproject.net.flow.FlowRule;
 import org.onosproject.net.flow.FlowRuleEvent;
 import org.onosproject.net.flow.FlowRuleService;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flow.TrafficTreatment;
 import org.onosproject.net.flow.criteria.Criteria;
 import org.onosproject.net.flowobjective.DefaultFilteringObjective;
 import org.onosproject.net.flowobjective.FilteringObjective;
@@ -68,6 +71,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.argThat;
@@ -89,6 +93,9 @@
 
 public class OltFlowServiceTest extends OltTestHelpers {
 
+    public static final String PORT_1 = "port-1";
+    public static final String PORT_2 = "port-2";
+    public static final String PORT_3 = "port-3";
     private OltFlowService component;
     private OltFlowService oltFlowService;
     OltFlowService.InternalFlowListener internalFlowListener;
@@ -144,11 +151,11 @@
         Device device =
                 new DefaultDevice(pid, deviceId, Device.Type.OLT, "", "", "", "", null);
         Port port1 = new DefaultPort(device, PortNumber.portNumber(1), true,
-                DefaultAnnotations.builder().set(PORT_NAME, "port-1").build());
+                DefaultAnnotations.builder().set(PORT_NAME, PORT_1).build());
         Port port2 = new DefaultPort(device, PortNumber.portNumber(2), true,
-                DefaultAnnotations.builder().set(PORT_NAME, "port-2").build());
+                DefaultAnnotations.builder().set(PORT_NAME, PORT_2).build());
         Port port3 = new DefaultPort(device, PortNumber.portNumber(3), true,
-                DefaultAnnotations.builder().set(PORT_NAME, "port-3").build());
+                DefaultAnnotations.builder().set(PORT_NAME, PORT_3).build());
 
         ServiceKey sk1 = new ServiceKey(new AccessDevicePort(port1), new UniTagInformation());
         ServiceKey sk2 = new ServiceKey(new AccessDevicePort(port2), new UniTagInformation());
@@ -192,7 +199,7 @@
         Device device =
                 new DefaultDevice(pid, deviceId, Device.Type.OLT, "", "", "", "", null);
         Port port1 = new DefaultPort(device, PortNumber.portNumber(1), true,
-                DefaultAnnotations.builder().set(PORT_NAME, "port-1").build());
+                DefaultAnnotations.builder().set(PORT_NAME, PORT_1).build());
 
         ServiceKey sk1 = new ServiceKey(new AccessDevicePort(port1), new UniTagInformation());
 
@@ -822,7 +829,7 @@
         Device device =
                 new DefaultDevice(pid, deviceId, Device.Type.OLT, "", "", "", "", null);
         Port port = new DefaultPort(device, PortNumber.portNumber(1), true,
-                DefaultAnnotations.builder().set(PORT_NAME, "port-1").build());
+                DefaultAnnotations.builder().set(PORT_NAME, PORT_1).build());
 
         List<UniTagInformation> uniTagInformationList = new LinkedList<>();
         UniTagInformation hsia = new UniTagInformation.Builder()
@@ -864,4 +871,65 @@
                         eq(OltFlowService.FlowOperation.ADD), eq(VlanId.vlanId(OltFlowService.EAPOL_DEFAULT_VLAN)));
         Assert.assertTrue(res);
     }
+
+    @Test
+    public void testRemovedFlowEvent() throws InterruptedException {
+        // test that we update the status in case of REMOVED flow even with non
+        // existing port in the onos device manager
+
+        DeviceId deviceId = DeviceId.deviceId("test-device");
+        ProviderId pid = new ProviderId("of", "foo");
+        Device device =
+                new DefaultDevice(pid, deviceId, Device.Type.OLT, "", "", "", "", null);
+
+        Port port1 = new DefaultPort(device, PortNumber.portNumber(1), true,
+                                     DefaultAnnotations.builder().set(PORT_NAME, PORT_1).build());
+        // create empty service for testing
+        List<UniTagInformation> uniTagInformationList = new LinkedList<>();
+        UniTagInformation vlanUniTag = new UniTagInformation.Builder().setPonCTag(VlanId.vlanId((short) 60))
+                .build();
+        uniTagInformationList.add(vlanUniTag);
+        SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
+        si.setUniTagList(uniTagInformationList);
+        ServiceKey sk1 = new ServiceKey(new AccessDevicePort(port1), vlanUniTag);
+
+        TrafficSelector selector = DefaultTrafficSelector.builder()
+                .matchInPort(port1.number())
+                .matchVlanId(VlanId.vlanId((short) 60))
+                .build();
+
+        TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+                .immediate()
+                .setOutput(port1.number())
+                .build();
+
+        FlowRule flowRule = DefaultFlowRule.builder()
+                .makePermanent()
+                .withPriority(1000)
+                .forTable(0)
+                .forDevice(deviceId)
+                .fromApp(testAppId)
+                .withSelector(selector)
+                .withTreatment(treatment)
+                .build();
+
+        // cpStatus map for the test
+        component.cpStatus = component.storageService.
+                <ServiceKey, OltPortStatus>consistentMapBuilder().build().asJavaMap();
+        OltPortStatus cp1Status = new OltPortStatus(NONE, PENDING_REMOVE, NONE, NONE);
+        component.cpStatus.put(sk1, cp1Status);
+
+        FlowRuleEvent event = new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVED, flowRule);
+        doReturn(true).when(component.oltDeviceService).isLocalLeader(any());
+        doReturn(device).when(component.deviceService).getDevice(deviceId);
+        doReturn(si).when(component.subsService).get(PORT_1);
+
+        oltFlowService.internalFlowListener.event(event);
+
+        //Some time to finish the operation
+        TimeUnit.MILLISECONDS.sleep(200);
+
+        OltPortStatus status = component.cpStatus.get(sk1);
+        Assert.assertEquals(REMOVED, status.subscriberFlowsStatus);
+    }
 }
\ No newline at end of file