[VOL-3767] Moving the subscriber to REMOVED state to re-install the flows correctly
Change-Id: I1fdd77c957148086749e8239cb30b5b7bf47f624
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
index 88cd942..4fb56df 100644
--- a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
+++ b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
@@ -28,9 +28,11 @@
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
+import org.onosproject.net.AnnotationKeys;
import org.onosproject.net.Annotations;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DefaultAnnotations;
+import org.onosproject.net.DefaultPort;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Host;
@@ -1646,10 +1648,30 @@
switch (event.type()) {
case RULE_ADDED:
case RULE_REMOVED:
+ DeviceId deviceId = event.subject().deviceId();
Port port = getCpFromFlowRule(event.subject());
if (port == null) {
- log.error("Can't find port in flow {}", event.subject());
- return;
+ log.warn("Port is gone in ONOS, " +
+ "manually creating it {}", event.subject());
+ PortNumber inPort = getPortNumberFromFlowRule(event.subject());
+ cpStatusReadLock.lock();
+ Optional<ServiceKey> keyWithPort = cpStatus.keySet()
+ .stream().filter(key -> key.getPort().connectPoint()
+ .deviceId().equals(deviceId)
+ && key.getPort().connectPoint().port()
+ .equals(inPort)).findFirst();
+ cpStatusReadLock.unlock();
+ if (keyWithPort.isPresent()) {
+ port = new DefaultPort(deviceService.getDevice(deviceId),
+ inPort, false,
+ DefaultAnnotations.builder()
+ .set(AnnotationKeys.PORT_NAME,
+ keyWithPort.get().getPort().name())
+ .build());
+ } else {
+ log.warn("Can't find corresponding status for {}/{}", deviceId, inPort);
+ return;
+ }
}
if (log.isTraceEnabled()) {
log.trace("flow event {} on cp {}: {}", event.type(),
@@ -1676,7 +1698,7 @@
}
updateConnectPointStatus(sk, status, null, null, null);
} else if (isDhcpFlow(flowRule)) {
- ServiceKey sk = getSubscriberKeyFromFlowRule(flowRule);
+ ServiceKey sk = getSubscriberKeyFromFlowRule(flowRule, port);
if (sk == null) {
return;
}
@@ -1685,7 +1707,7 @@
}
updateConnectPointStatus(sk, null, null, status, null);
} else if (isPppoeFlow(flowRule)) {
- ServiceKey sk = getSubscriberKeyFromFlowRule(flowRule);
+ ServiceKey sk = getSubscriberKeyFromFlowRule(flowRule, port);
if (sk == null) {
return;
}
@@ -1694,14 +1716,17 @@
}
updateConnectPointStatus(sk, null, null, null, status);
} else if (isDataFlow(flowRule)) {
-
- if (oltDeviceService.isNniPort(deviceService.getDevice(flowRule.deviceId()),
- getCpFromFlowRule(flowRule).number())) {
+ PortNumber number = getPortNumberFromFlowRule(flowRule);
+ if (number == null) {
+ log.error("Can't capture the port number from flow {}", flowRule);
+ return;
+ }
+ if (oltDeviceService.isNniPort(deviceService.getDevice(flowRule.deviceId()), number)) {
// the NNI has data-plane for every subscriber, doesn't make sense to track them
return;
}
- ServiceKey sk = getSubscriberKeyFromFlowRule(flowRule);
+ ServiceKey sk = getSubscriberKeyFromFlowRule(flowRule, port);
if (sk == null) {
return;
}
@@ -1784,16 +1809,22 @@
private Port getCpFromFlowRule(FlowRule flowRule) {
DeviceId deviceId = flowRule.deviceId();
- PortCriterion inPort = (PortCriterion) flowRule.selector().getCriterion(Criterion.Type.IN_PORT);
+ PortNumber inPort = getPortNumberFromFlowRule(flowRule);
if (inPort != null) {
- PortNumber port = inPort.port();
- return deviceService.getPort(deviceId, port);
+ return deviceService.getPort(deviceId, inPort);
}
return null;
}
- private ServiceKey getSubscriberKeyFromFlowRule(FlowRule flowRule) {
- Port flowPort = getCpFromFlowRule(flowRule);
+ private PortNumber getPortNumberFromFlowRule(FlowRule flowRule) {
+ PortCriterion inPort = (PortCriterion) flowRule.selector().getCriterion(Criterion.Type.IN_PORT);
+ if (inPort != null) {
+ return inPort.port();
+ }
+ return null;
+ }
+
+ private ServiceKey getSubscriberKeyFromFlowRule(FlowRule flowRule, Port flowPort) {
SubscriberAndDeviceInformation si = subsService.get(getPortName(flowPort));
Boolean isNni = oltDeviceService.isNniPort(deviceService.getDevice(flowRule.deviceId()), flowPort.number());
diff --git a/impl/src/test/java/org/opencord/olt/impl/OltFlowServiceTest.java b/impl/src/test/java/org/opencord/olt/impl/OltFlowServiceTest.java
index 2f0f82e..ca90867 100644
--- a/impl/src/test/java/org/opencord/olt/impl/OltFlowServiceTest.java
+++ b/impl/src/test/java/org/opencord/olt/impl/OltFlowServiceTest.java
@@ -46,10 +46,13 @@
import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.DefaultFlowRule;
+import org.onosproject.net.flow.DefaultTrafficSelector;
import org.onosproject.net.flow.DefaultTrafficTreatment;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.FlowRuleEvent;
import org.onosproject.net.flow.FlowRuleService;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.flow.criteria.Criteria;
import org.onosproject.net.flowobjective.DefaultFilteringObjective;
import org.onosproject.net.flowobjective.FilteringObjective;
@@ -68,6 +71,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
@@ -89,6 +93,9 @@
public class OltFlowServiceTest extends OltTestHelpers {
+ public static final String PORT_1 = "port-1";
+ public static final String PORT_2 = "port-2";
+ public static final String PORT_3 = "port-3";
private OltFlowService component;
private OltFlowService oltFlowService;
OltFlowService.InternalFlowListener internalFlowListener;
@@ -144,11 +151,11 @@
Device device =
new DefaultDevice(pid, deviceId, Device.Type.OLT, "", "", "", "", null);
Port port1 = new DefaultPort(device, PortNumber.portNumber(1), true,
- DefaultAnnotations.builder().set(PORT_NAME, "port-1").build());
+ DefaultAnnotations.builder().set(PORT_NAME, PORT_1).build());
Port port2 = new DefaultPort(device, PortNumber.portNumber(2), true,
- DefaultAnnotations.builder().set(PORT_NAME, "port-2").build());
+ DefaultAnnotations.builder().set(PORT_NAME, PORT_2).build());
Port port3 = new DefaultPort(device, PortNumber.portNumber(3), true,
- DefaultAnnotations.builder().set(PORT_NAME, "port-3").build());
+ DefaultAnnotations.builder().set(PORT_NAME, PORT_3).build());
ServiceKey sk1 = new ServiceKey(new AccessDevicePort(port1), new UniTagInformation());
ServiceKey sk2 = new ServiceKey(new AccessDevicePort(port2), new UniTagInformation());
@@ -192,7 +199,7 @@
Device device =
new DefaultDevice(pid, deviceId, Device.Type.OLT, "", "", "", "", null);
Port port1 = new DefaultPort(device, PortNumber.portNumber(1), true,
- DefaultAnnotations.builder().set(PORT_NAME, "port-1").build());
+ DefaultAnnotations.builder().set(PORT_NAME, PORT_1).build());
ServiceKey sk1 = new ServiceKey(new AccessDevicePort(port1), new UniTagInformation());
@@ -822,7 +829,7 @@
Device device =
new DefaultDevice(pid, deviceId, Device.Type.OLT, "", "", "", "", null);
Port port = new DefaultPort(device, PortNumber.portNumber(1), true,
- DefaultAnnotations.builder().set(PORT_NAME, "port-1").build());
+ DefaultAnnotations.builder().set(PORT_NAME, PORT_1).build());
List<UniTagInformation> uniTagInformationList = new LinkedList<>();
UniTagInformation hsia = new UniTagInformation.Builder()
@@ -864,4 +871,65 @@
eq(OltFlowService.FlowOperation.ADD), eq(VlanId.vlanId(OltFlowService.EAPOL_DEFAULT_VLAN)));
Assert.assertTrue(res);
}
+
+ @Test
+ public void testRemovedFlowEvent() throws InterruptedException {
+ // test that we update the status in case of REMOVED flow even with non
+ // existing port in the onos device manager
+
+ DeviceId deviceId = DeviceId.deviceId("test-device");
+ ProviderId pid = new ProviderId("of", "foo");
+ Device device =
+ new DefaultDevice(pid, deviceId, Device.Type.OLT, "", "", "", "", null);
+
+ Port port1 = new DefaultPort(device, PortNumber.portNumber(1), true,
+ DefaultAnnotations.builder().set(PORT_NAME, PORT_1).build());
+ // create empty service for testing
+ List<UniTagInformation> uniTagInformationList = new LinkedList<>();
+ UniTagInformation vlanUniTag = new UniTagInformation.Builder().setPonCTag(VlanId.vlanId((short) 60))
+ .build();
+ uniTagInformationList.add(vlanUniTag);
+ SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
+ si.setUniTagList(uniTagInformationList);
+ ServiceKey sk1 = new ServiceKey(new AccessDevicePort(port1), vlanUniTag);
+
+ TrafficSelector selector = DefaultTrafficSelector.builder()
+ .matchInPort(port1.number())
+ .matchVlanId(VlanId.vlanId((short) 60))
+ .build();
+
+ TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+ .immediate()
+ .setOutput(port1.number())
+ .build();
+
+ FlowRule flowRule = DefaultFlowRule.builder()
+ .makePermanent()
+ .withPriority(1000)
+ .forTable(0)
+ .forDevice(deviceId)
+ .fromApp(testAppId)
+ .withSelector(selector)
+ .withTreatment(treatment)
+ .build();
+
+ // cpStatus map for the test
+ component.cpStatus = component.storageService.
+ <ServiceKey, OltPortStatus>consistentMapBuilder().build().asJavaMap();
+ OltPortStatus cp1Status = new OltPortStatus(NONE, PENDING_REMOVE, NONE, NONE);
+ component.cpStatus.put(sk1, cp1Status);
+
+ FlowRuleEvent event = new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVED, flowRule);
+ doReturn(true).when(component.oltDeviceService).isLocalLeader(any());
+ doReturn(device).when(component.deviceService).getDevice(deviceId);
+ doReturn(si).when(component.subsService).get(PORT_1);
+
+ oltFlowService.internalFlowListener.event(event);
+
+ //Some time to finish the operation
+ TimeUnit.MILLISECONDS.sleep(200);
+
+ OltPortStatus status = component.cpStatus.get(sk1);
+ Assert.assertEquals(REMOVED, status.subscriberFlowsStatus);
+ }
}
\ No newline at end of file