[VOL-4246] Feature parity with the previous implementation

Change-Id: I3741edb3c1b88b1cf8b5e6d4ff0900132e2e5e6a
diff --git a/impl/src/test/java/org/opencord/olt/impl/OltTest.java b/impl/src/test/java/org/opencord/olt/impl/OltTest.java
index af63004..e8a76d0 100644
--- a/impl/src/test/java/org/opencord/olt/impl/OltTest.java
+++ b/impl/src/test/java/org/opencord/olt/impl/OltTest.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-present Open Networking Foundation
+ * Copyright 2021-present Open Networking Foundation
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -13,155 +13,196 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.opencord.olt.impl;
 
-import static org.junit.Assert.assertEquals;
-
-import java.util.Set;
-
-import com.google.common.collect.Maps;
+import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
 import org.onlab.packet.ChassisId;
+import org.onosproject.cfg.ComponentConfigAdapter;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreServiceAdapter;
+import org.onosproject.core.DefaultApplicationId;
 import org.onosproject.net.AnnotationKeys;
-import org.onosproject.net.Annotations;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.DefaultAnnotations;
 import org.onosproject.net.DefaultDevice;
 import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
-import org.onosproject.net.Element;
 import org.onosproject.net.Port;
 import org.onosproject.net.PortNumber;
-import org.onosproject.net.device.DeviceServiceAdapter;
+import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.provider.ProviderId;
+import org.onosproject.store.service.TestStorageService;
+import org.opencord.sadis.SadisService;
 import org.opencord.sadis.SubscriberAndDeviceInformation;
+import org.opencord.sadis.UniTagInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class OltTest extends TestBase {
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_BP_ID_DEFAULT;
+
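+/**
+ * Unit tests for the {@link Olt} component: processing of the discovered-subscriber queues
+ * and de-duplication of queued subscribers, with the collaborating services mocked.
+ */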
+@RunWith(MockitoJUnitRunner.class)
+public class OltTest extends OltTestHelpers {
+
+    private Olt component;
+    private final ApplicationId testAppId = new DefaultApplicationId(1, "org.opencord.olt.test");
     private final Logger log = LoggerFactory.getLogger(getClass());
-    private Olt olt;
 
-
-    private static final String SCHEME_NAME = "olt";
-    private static final DefaultAnnotations DEVICE_ANNOTATIONS = DefaultAnnotations.builder()
-            .set(AnnotationKeys.PROTOCOL, SCHEME_NAME.toUpperCase()).build();
+    private DeviceId deviceId = DeviceId.deviceId("test-device");
+    private Device testDevice = new DefaultDevice(ProviderId.NONE, deviceId, Device.Type.OLT,
+            "testManufacturer", "1.0", "1.0", "SN", new ChassisId(1));
+    private Port uniUpdateEnabled = new OltPort(testDevice, true, PortNumber.portNumber(16),
+                                                DefaultAnnotations.builder().set(AnnotationKeys.PORT_NAME, "uni-1")
+                                                        .build());
+    private ConnectPoint cp = new ConnectPoint(deviceId, uniUpdateEnabled.number());
+    private DiscoveredSubscriber sub;
 
     @Before
     public void setUp() {
-        olt = new Olt();
-        olt.deviceService = new MockDeviceService();
-        olt.sadisService = new MockSadisService();
-        olt.subsService = olt.sadisService.getSubscriberInfoService();
-        olt.pendingSubscribersForDevice = Maps.newConcurrentMap();
+        component = new Olt();
+        component.requeueDelay = 0; // avoid delays in the queue add to make things easier in testing
+        component.cfgService = new ComponentConfigAdapter();
+        component.deviceService = Mockito.mock(DeviceService.class);
+        component.storageService = new TestStorageService();
+        component.coreService = Mockito.spy(new CoreServiceAdapter());
+        component.oltDeviceService = Mockito.mock(OltDeviceService.class);
+
+        doReturn(testAppId).when(component.coreService).registerApplication("org.opencord.olt");
+
+        component.discoveredSubscriberExecutor =
+                Executors.newSingleThreadScheduledExecutor(
+                        groupedThreads("onos/olt",
+                "discovered-cp-%d", log));
+        component.flowsExecutor =
+                Executors.newFixedThreadPool(component.flowProcessingThreads,
+                                             groupedThreads("onos/olt-service",
+                                                            "flows-installer-%d"));
+
+        component.subscriberExecutor = Executors.newFixedThreadPool(component.subscriberProcessingThreads,
+                                                          groupedThreads("onos/olt-service",
+                                                                         "subscriber-installer-%d"));
+        SadisService sadisService = Mockito.mock(SadisService.class);
+        component.oltFlowService = Mockito.mock(OltFlowService.class);
+        component.sadisService = sadisService;
+
+        // oltFlowService is a Mockito mock (not a spy); reset clears its stubbing and recorded interactions
+        reset(component.oltFlowService);
+
+        component.bindSadisService(sadisService);
+        component.eventsQueues = component.storageService.
+                <ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>>consistentMapBuilder().build().asJavaMap();
+        component.eventsQueues.put(cp, new LinkedBlockingQueue<>());
+
+        component.discoveredSubscriberExecutor.execute(() -> {
+            component.processDiscoveredSubscribers();
+        });
+        // build the test subscriber: its only service is a default-built (empty) UniTagInformation
+        List<UniTagInformation> uniTagInformationList = new LinkedList<>();
+        UniTagInformation empty = new UniTagInformation.Builder().build();
+        uniTagInformationList.add(empty);
+        SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
+        si.setUniTagList(uniTagInformationList);
+        sub = new DiscoveredSubscriber(testDevice,
+                                         uniUpdateEnabled, DiscoveredSubscriber.Status.ADDED,
+                                         false, si);
     }
 
-    /**
-     * Tests that the getSubscriber method does throw a NullPointerException with a meaningful message.
-     */
+    @After
+    public void tearDown() {
+        component.deactivate(null); // NOTE(review): presumably stops the executors created in setUp - confirm
+    }
+
     @Test
-    public void testGetSubscriberError() {
-        ConnectPoint cp = ConnectPoint.deviceConnectPoint(OLT_DEV_ID + "/" + 1);
-        try {
-            olt.getSubscriber(cp);
-        } catch (NullPointerException e) {
-            assertEquals(e.getMessage(), "Invalid connect point");
-        }
+    public void testProcessDiscoveredSubscribersBasicPortSuccess() throws Exception {
+        doReturn(true).when(component.deviceService).isAvailable(any());
+        doReturn(sub.port).when(component.deviceService).getPort(any(), any());
+        doReturn(true).when(component.oltFlowService).handleBasicPortFlows(eq(sub), eq(DEFAULT_BP_ID_DEFAULT),
+                eq(DEFAULT_BP_ID_DEFAULT));
+        doReturn(true).when(component.oltDeviceService).isLocalLeader(cp.deviceId());
+
+        // adding the discovered subscriber to the queue
+        LinkedBlockingQueue<DiscoveredSubscriber> q = component.eventsQueues.get(cp);
+        q.add(sub);
+        component.eventsQueues.put(cp, q);
+
+        // wait for the processing task submitted in setUp, then check the flow method was invoked
+        TimeUnit.MILLISECONDS.sleep(600);
+        verify(component.oltFlowService, atLeastOnce()).handleBasicPortFlows(eq(sub), eq(DEFAULT_BP_ID_DEFAULT),
+                eq(DEFAULT_BP_ID_DEFAULT));
+
+        // on success the subscriber must be gone from the queue (JUnit assertion: does not depend on -ea)
+        LinkedBlockingQueue<DiscoveredSubscriber> updatedQueue = component.eventsQueues.get(cp);
+        Assert.assertTrue(updatedQueue.isEmpty());
     }
 
-    /**
-     * Tests that the getSubscriber method returns Subscriber informations.
-     */
     @Test
-    public void testGetSubscriber() {
-        ConnectPoint cp = ConnectPoint.deviceConnectPoint(OLT_DEV_ID + "/" + 2);
+    public void testProcessDiscoveredSubscribersBasicPortException() throws Exception {
+        doReturn(true).when(component.deviceService).isAvailable(any());
+        doReturn(sub.port).when(component.deviceService).getPort(any(), any());
+        doReturn(false).when(component.oltFlowService).handleBasicPortFlows(any(), eq(DEFAULT_BP_ID_DEFAULT),
+                eq(DEFAULT_BP_ID_DEFAULT));
+        doReturn(true).when(component.oltDeviceService).isLocalLeader(cp.deviceId());
+        // wrap the queue in a Mockito spy so that calls to remove() can be verified below
+        LinkedBlockingQueue<DiscoveredSubscriber> q = component.eventsQueues.get(cp);
+        LinkedBlockingQueue<DiscoveredSubscriber> spiedQueue = spy(q);
+        // adding the discovered subscriber to the queue
+        spiedQueue.add(sub);
+        component.eventsQueues.put(cp, spiedQueue);
 
-        SubscriberAndDeviceInformation s = olt.getSubscriber(cp);
+        TimeUnit.MILLISECONDS.sleep(600);
 
-        assertEquals(s.circuitId(), CLIENT_CIRCUIT_ID);
-        assertEquals(s.nasPortId(), CLIENT_NAS_PORT_ID);
+        // check that we're calling the correct method; since the subscriber is not removed
+        // from the queue, the method may be called multiple times during the wait
+        verify(component.oltFlowService, atLeastOnce()).handleBasicPortFlows(eq(sub), eq(DEFAULT_BP_ID_DEFAULT),
+                eq(DEFAULT_BP_ID_DEFAULT));
+
+        // the stub returns false (no exception is thrown): the subscriber must not be removed from the queue
+        verify(spiedQueue, never()).remove(sub);
     }
 
-    private class MockDevice extends DefaultDevice {
+    /**
+     * Adding the same discovered subscriber twice must leave a single entry in its queue.
+     */
+    @Test
+    public void testAddSubscriberToQueue() {
 
-        public MockDevice(ProviderId providerId, DeviceId id, Type type,
-                          String manufacturer, String hwVersion, String swVersion,
-                          String serialNumber, ChassisId chassisId, Annotations... annotations) {
-            super(providerId, id, type, manufacturer, hwVersion, swVersion, serialNumber,
-                    chassisId, annotations);
-        }
+        // replace the queue with a spy
+        LinkedBlockingQueue<DiscoveredSubscriber> q = component.eventsQueues.get(cp);
+        LinkedBlockingQueue<DiscoveredSubscriber> spiedQueue = spy(q);
+        component.eventsQueues.put(cp, spiedQueue);
+
+        // trying to add the same event twice should result in a single item in the queue
+        component.addSubscriberToQueue(sub);
+        component.addSubscriberToQueue(sub);
+
+        verify(spiedQueue, times(1)).add(sub);
+        Assert.assertEquals(1, spiedQueue.size());
     }
 
-    private class MockDeviceService extends DeviceServiceAdapter {
-
-        private ProviderId providerId = new ProviderId("of", "foo");
-        private final Device device1 = new MockDevice(providerId, DEVICE_ID_1, Device.Type.SWITCH,
-                "foo.inc", "0", "0", OLT_DEV_ID, new ChassisId(),
-                DEVICE_ANNOTATIONS);
-
-        @Override
-        public Device getDevice(DeviceId devId) {
-            return device1;
-
-        }
-
-        @Override
-        public Port getPort(ConnectPoint cp) {
-            log.info("Looking up port {}", cp.port().toString());
-            if (cp.port().toString().equals("1")) {
-                return null;
-            }
-            return new MockPort();
-        }
-    }
-
-    private class MockPort implements Port {
-
-        @Override
-        public boolean isEnabled() {
-            return true;
-        }
-
-        @Override
-        public long portSpeed() {
-            return 1000;
-        }
-
-        @Override
-        public Element element() {
-            return null;
-        }
-
-        @Override
-        public PortNumber number() {
-            return null;
-        }
-
-        @Override
-        public Annotations annotations() {
-            return new MockAnnotations();
-        }
-
-        @Override
-        public Type type() {
-            return Port.Type.FIBER;
-        }
-
-        private class MockAnnotations implements Annotations {
-
-            @Override
-            public String value(String val) {
-                return "BRCM12345678";
-            }
-
-            @Override
-            public Set<String> keys() {
-                return null;
-            }
-        }
-    }
-
-
-}
\ No newline at end of file
+}