VOL-542: Use Sadis data (if available) for OLT configuration data instead of AccessDevice Configuration. Removed caching of Sadis data in oltApp

Change-Id: I3736bc48aa985b7a9cffbdfb838789c4b034516f
diff --git a/app/src/main/java/org/opencord/olt/impl/Olt.java b/app/src/main/java/org/opencord/olt/impl/Olt.java
index 9717bad..a81ac75 100644
--- a/app/src/main/java/org/opencord/olt/impl/Olt.java
+++ b/app/src/main/java/org/opencord/olt/impl/Olt.java
@@ -15,7 +15,6 @@
  */
 package org.opencord.olt.impl;
 
-import com.google.common.collect.Maps;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -34,7 +33,9 @@
 import org.onosproject.core.CoreService;
 import org.onosproject.event.AbstractListenerManager;
 import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.AnnotationKeys;
 import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.Port;
 import org.onosproject.net.PortNumber;
@@ -59,17 +60,17 @@
 import org.onosproject.net.flowobjective.Objective;
 import org.onosproject.net.flowobjective.ObjectiveContext;
 import org.onosproject.net.flowobjective.ObjectiveError;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.StorageService;
 import org.opencord.cordconfig.access.AccessDeviceConfig;
-import org.opencord.cordconfig.access.AccessDeviceData;
 import org.opencord.olt.AccessDeviceEvent;
 import org.opencord.olt.AccessDeviceListener;
 import org.opencord.olt.AccessDeviceService;
+import org.opencord.sadis.SubscriberAndDeviceInformation;
+import org.opencord.sadis.SubscriberAndDeviceInformationService;
 import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 
+import java.util.AbstractMap;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Dictionary;
 import java.util.List;
@@ -77,7 +78,6 @@
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -98,7 +98,6 @@
     private static final String APP_NAME = "org.opencord.olt";
 
     private static final short DEFAULT_VLAN = 0;
-    private static final String SUBSCRIBERS = "existing-subscribers";
 
     private final Logger log = getLogger(getClass());
 
@@ -121,7 +120,7 @@
     protected ComponentConfigService componentConfigService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected StorageService storageService;
+    protected SubscriberAndDeviceInformationService subsService;
 
     @Property(name = "defaultVlan", intValue = DEFAULT_VLAN,
             label = "Default VLAN RG<->ONU traffic")
@@ -142,11 +141,6 @@
     private ExecutorService oltInstallers = Executors.newFixedThreadPool(4,
                                                                          groupedThreads("onos/olt-service",
                                                                                         "olt-installer-%d"));
-
-    private Map<DeviceId, AccessDeviceData> oltData = new ConcurrentHashMap<>();
-
-    private Map<ConnectPoint, VlanId> subscribers;
-
     private InternalNetworkConfigListener configListener =
             new InternalNetworkConfigListener();
     private static final Class<AccessDeviceConfig> CONFIG_CLASS =
@@ -173,26 +167,12 @@
         networkConfig.registerConfigFactory(configFactory);
         networkConfig.addListener(configListener);
 
-
-        networkConfig.getSubjects(DeviceId.class, AccessDeviceConfig.class).forEach(
-                subject -> {
-                    AccessDeviceConfig config = networkConfig.getConfig(subject, AccessDeviceConfig.class);
-                    if (config != null) {
-                        AccessDeviceData data = config.getOlt();
-                        oltData.put(data.deviceId(), data);
-                    }
-                }
-        );
-
-        oltData.keySet().stream()
-                .flatMap(did -> deviceService.getPorts(did).stream())
-                .filter(this::isUni)
-                .forEach(this::initializeUni);
-
-        subscribers = storageService.<ConnectPoint, VlanId>consistentMapBuilder()
-                .withName(SUBSCRIBERS)
-                .withSerializer(Serializer.using(KryoNamespaces.API))
-                .build().asJavaMap();
+        // look for all provisioned devices in Sadis and create EAPOL flows for the
+        // UNI ports
+        Iterable<Device> devices = deviceService.getDevices();
+        for (Device d : devices) {
+            checkAndCreateEapolFlows(d);
+        }
 
         deviceService.addListener(deviceListener);
 
@@ -233,65 +213,103 @@
     }
 
     @Override
-    public void provisionSubscriber(ConnectPoint port, VlanId vlan) {
+    public void provisionSubscriber(ConnectPoint port) {
         checkNotNull(deviceService.getPort(port.deviceId(), port.port()),
                 "Invalid connect point");
-        AccessDeviceData olt = oltData.get(port.deviceId());
+        // Find the subscriber on this connect point
+        SubscriberAndDeviceInformation sub = getSubscriber(port);
+        if (sub == null) {
+            log.warn("No subscriber found for {}", port);
+            return;
+        }
 
-        if (olt == null) {
-            log.warn("No data found for OLT device {}", port.deviceId());
-            throw new IllegalArgumentException("Missing OLT configuration for device");
+        // Get the uplink port
+        Port uplinkPort = getUplinkPort(deviceService.getDevice(port.deviceId()));
+        if (uplinkPort == null) {
+            log.warn("No uplink port found for OLT device {}", port.deviceId());
+            return;
         }
 
         if (enableDhcpOnProvisioning) {
-            processDhcpFilteringObjectives(olt.deviceId(), port.port(), true);
+            processDhcpFilteringObjectives(port.deviceId(), port.port(), true);
         }
 
-        provisionVlans(olt.deviceId(), olt.uplink(), port.port(), vlan, olt.vlan(),
-                olt.defaultVlan());
+        Optional<VlanId> defaultVlan = Optional.empty();
+        provisionVlans(port.deviceId(), uplinkPort.number(), port.port(), sub.cTag(), sub.sTag(),
+                defaultVlan);
 
         if (enableIgmpOnProvisioning) {
-            processIgmpFilteringObjectives(olt.deviceId(), port.port(), true);
+            processIgmpFilteringObjectives(port.deviceId(), port.port(), true);
         }
     }
 
     @Override
     public void removeSubscriber(ConnectPoint port) {
-        AccessDeviceData olt = oltData.get(port.deviceId());
-
-        if (olt == null) {
-            log.warn("No data found for OLT device {}", port.deviceId());
+        // Get the subscriber connected to this port from Sadis
+        SubscriberAndDeviceInformation subscriber = getSubscriber(port);
+        if (subscriber == null) {
+            log.warn("Subscriber on port {} not found", port);
             return;
         }
 
-        VlanId subscriberVlan = subscribers.remove(port);
-
-        if (subscriberVlan == null) {
-            log.warn("Unknown subscriber at location {}", port);
+        // Get the uplink port
+        Port uplinkPort = getUplinkPort(deviceService.getDevice(port.deviceId()));
+        if (uplinkPort == null) {
+            log.warn("No uplink port found for OLT device {}", port.deviceId());
             return;
         }
 
         if (enableDhcpOnProvisioning) {
-            processDhcpFilteringObjectives(olt.deviceId(), port.port(), false);
+            processDhcpFilteringObjectives(port.deviceId(), port.port(), false);
         }
 
-        unprovisionSubscriber(olt.deviceId(), olt.uplink(), port.port(), subscriberVlan,
-                              olt.vlan(), olt.defaultVlan());
+        Optional<VlanId> defaultVlan = Optional.empty();
+        unprovisionSubscriber(port.deviceId(), uplinkPort.number(), port.port(), subscriber.cTag(),
+                              subscriber.sTag(), defaultVlan);
 
         if (enableIgmpOnProvisioning) {
-            processIgmpFilteringObjectives(olt.deviceId(), port.port(), false);
+            processIgmpFilteringObjectives(port.deviceId(), port.port(), false);
         }
     }
 
 
     @Override
     public Collection<Map.Entry<ConnectPoint, VlanId>> getSubscribers() {
-        return subscribers.entrySet();
+        ArrayList<Map.Entry<ConnectPoint, VlanId>> subs = new ArrayList<>();
+
+        // Get the subscribers for all the devices
+        // If the port is UNI, is enabled and exists in Sadis then copy it
+        for (Device d : deviceService.getDevices()) {
+            for (Port p: deviceService.getPorts(d.id())) {
+                if (isUniPort(d, p) && p.isEnabled()) {
+                    ConnectPoint cp = new ConnectPoint(d.id(), p.number());
+
+                    SubscriberAndDeviceInformation sub = getSubscriber(cp);
+                    if (sub != null) {
+                        subs.add(new AbstractMap.SimpleEntry(cp, sub.cTag()));
+                    }
+                }
+            }
+        }
+
+        return subs;
     }
 
     @Override
-    public Map<DeviceId, AccessDeviceData> fetchOlts() {
-        return Maps.newHashMap(oltData);
+    public List<DeviceId> fetchOlts() {
+        // look through all the devices and find the ones that are OLTs as per Sadis
+        List<DeviceId> olts = new ArrayList<>();
+        Iterable<Device> devices = deviceService.getDevices();
+        for (Device d : devices) {
+            String devSerialNo = d.serialNumber();
+            SubscriberAndDeviceInformation deviceInfo = subsService.get(devSerialNo);
+
+            if (deviceInfo != null) {
+                // So this is indeed a OLT device
+                olts.add(d.id());
+            }
+        }
+        return olts;
     }
 
     private void initializeUni(Port port) {
@@ -379,9 +397,6 @@
                                                           subscriberVlan, deviceVlan,
                                                           defaultVlan);
 
-        ConnectPoint cp = new ConnectPoint(deviceId, subscriberPort);
-        subscribers.put(cp, subscriberVlan);
-
         flowObjectiveService.forward(deviceId, upFwd.add(new ObjectiveContext() {
             @Override
             public void onSuccess(Objective objective) {
@@ -578,17 +593,88 @@
         flowObjectiveService.filter(devId, igmp);
     }
 
-    private boolean isUni(Port port) {
-        return !oltData.get(port.element().id()).uplink().equals(port.number());
+    /**
+     * Creates EAPOL flows on the UNIs, if device is
+     * present in Sadis config.
+     *
+     * @param dev Device to look for
+     */
+    private void checkAndCreateEapolFlows(Device dev) {
+        // we create only for the ones we are master of
+        if (!mastershipService.isLocalMaster(dev.id())) {
+                return;
+        }
+        // check if this device is provisioned in Sadis
+        String devSerialNo = dev.serialNumber();
+        SubscriberAndDeviceInformation deviceInfo = subsService.get(devSerialNo);
+        log.debug("checkAndCreateEapolFlows: deviceInfo {}", deviceInfo);
+
+        if (deviceInfo != null) {
+            // This is an OLT device as per Sadis, we need to create EAPOL flows
+            // on all it's UNIs
+            for (Port p : deviceService.getPorts(dev.id())) {
+                if (isUniPort(dev, p)) {
+                    processFilteringObjectives(dev.id(), p.number(), true);
+                }
+            }
+        }
+    }
+
+    /**
+     * Get the uplink for of the OLT device.
+     *
+     * This assumes that the OLT has a single uplink port. When more uplink ports need to be supported
+     * this logic needs to be changed
+     *
+     * @param dev Device to look for
+     * @return The uplink Port of the OLT
+     */
+    private Port getUplinkPort(Device dev) {
+        // check if this device is provisioned in Sadis
+        String devSerialNo = dev.serialNumber();
+        SubscriberAndDeviceInformation deviceInfo = subsService.get(devSerialNo);
+        log.debug("getUplinkPort: deviceInfo {}", deviceInfo);
+
+        if (deviceInfo != null) {
+            // Return the port that has been configured as the uplink port of this OLT in Sadis
+            for (Port p: deviceService.getPorts(dev.id())) {
+                if (p.number().toLong() == deviceInfo.uplinkPort()) {
+                    log.debug("getUplinkPort: Found port {}", p);
+                    return p;
+                }
+            }
+        }
+
+        log.debug("getUplinkPort: No uplink port found for OLT {}", dev);
+        return null;
+    }
+
+    /**
+     * Return the subscriber on a port.
+     *
+     * @param port On which to find the subscriber
+     * @return subscriber if found else null
+     */
+    private SubscriberAndDeviceInformation getSubscriber(ConnectPoint port) {
+        String portName = deviceService.getPort(port).annotations()
+                .value(AnnotationKeys.PORT_NAME);
+
+        return subsService.get(portName);
+    }
+
+    private boolean isUniPort(Device d, Port p) {
+        Port ulPort = getUplinkPort(d);
+        if (ulPort != null) {
+            return (ulPort.number().toLong() != p.number().toLong());
+        }
+        return false;
     }
 
     private class InternalDeviceListener implements DeviceListener {
         @Override
         public void event(DeviceEvent event) {
             DeviceId devId = event.subject().id();
-            if (!oltData.containsKey(devId)) {
-                return;
-            }
+            Device dev = event.subject();
             if (event.type() != DeviceEvent.Type.PORT_STATS_UPDATED) {
                 log.debug("Olt got {} event for {}", event.type(), event.subject());
             }
@@ -596,7 +682,7 @@
                 //TODO: Port handling and bookkeeping should be improved once
                 // olt firmware handles correct behaviour.
                 case PORT_ADDED:
-                    if (!oltData.get(devId).uplink().equals(event.port().number())) {
+                    if (isUniPort(dev, event.port())) {
                         post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_ADDED, devId, event.port()));
 
                         if (event.port().isEnabled()) {
@@ -605,15 +691,25 @@
                     }
                     break;
                 case PORT_REMOVED:
-                    AccessDeviceData olt = oltData.get(devId);
-                    VlanId vlan = subscribers.get(new ConnectPoint(devId,
-                                                                   event.port().number()));
-                    if (vlan != null) {
-                        unprovisionSubscriber(devId, olt.uplink(),
-                                event.port().number(),
-                                vlan, olt.vlan(), olt.defaultVlan());
+                    String portName = event.port().annotations().value(AnnotationKeys.PORT_NAME);
+                    SubscriberAndDeviceInformation subscriber = subsService.get(portName);
+                    if (subscriber == null) {
+                        log.warn("Subscriber {} not found", portName);
+                        break;
                     }
-                    if (!oltData.get(devId).uplink().equals(event.port().number())) {
+
+                    Port uplinkPort = getUplinkPort(dev);
+                    if (uplinkPort == null) {
+                        log.warn("No uplink port found for device {}", dev);
+                        break;
+                    }
+
+                    Optional<VlanId> defaultVlan = Optional.empty();
+                    unprovisionSubscriber(devId, uplinkPort.number(),
+                            event.port().number(),
+                            subscriber.cTag(), subscriber.sTag(), defaultVlan);
+
+                    if (isUniPort(dev, event.port())) {
                         if (event.port().isEnabled()) {
                             processFilteringObjectives(devId, event.port().number(), false);
                         }
@@ -623,9 +719,10 @@
 
                     break;
                 case PORT_UPDATED:
-                    if (oltData.get(devId).uplink().equals(event.port().number())) {
+                    if (!isUniPort(dev, event.port())) {
                         break;
                     }
+
                     if (event.port().isEnabled()) {
                         processFilteringObjectives(devId, event.port().number(), true);
                         post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_ADDED, devId, event.port()));
@@ -640,16 +737,16 @@
 
                     // Send UNI_ADDED events for all existing ports
                     deviceService.getPorts(devId).stream()
-                            .filter(Olt.this::isUni)
+                            .filter(p -> isUniPort(dev, p))
                             .filter(Port::isEnabled)
                             .forEach(p -> post(new AccessDeviceEvent(
                                     AccessDeviceEvent.Type.UNI_ADDED, devId, p)));
 
-                    provisionDefaultFlows(devId);
+                    checkAndCreateEapolFlows(dev);
                     break;
                 case DEVICE_REMOVED:
                     deviceService.getPorts(devId).stream()
-                            .filter(Olt.this::isUni)
+                            .filter(p -> isUniPort(dev, p))
                             .forEach(p -> post(new AccessDeviceEvent(
                                     AccessDeviceEvent.Type.UNI_REMOVED, devId, p)));
 
@@ -685,19 +782,11 @@
                 case CONFIG_ADDED:
                 case CONFIG_UPDATED:
 
-                    AccessDeviceConfig config =
-                            networkConfig.getConfig((DeviceId) event.subject(), CONFIG_CLASS);
-                    if (config != null) {
-                        oltData.put(config.getOlt().deviceId(), config.getOlt());
-                        provisionDefaultFlows((DeviceId) event.subject());
-                    }
-
                     break;
                 case CONFIG_REGISTERED:
                 case CONFIG_UNREGISTERED:
                     break;
                 case CONFIG_REMOVED:
-                    oltData.remove(event.subject());
                 default:
                     break;
             }
@@ -708,19 +797,4 @@
             return event.configClass().equals(CONFIG_CLASS);
         }
     }
-
-    private void provisionDefaultFlows(DeviceId deviceId) {
-        if (!mastershipService.isLocalMaster(deviceId)) {
-            return;
-        }
-        List<Port> ports = deviceService.getPorts(deviceId);
-
-        ports.stream()
-                .filter(p -> !oltData.get(p.element().id()).uplink().equals(p.number()))
-                .filter(p -> p.isEnabled())
-                .forEach(p -> processFilteringObjectives((DeviceId) p.element().id(),
-                                                         p.number(), true));
-
-    }
-
 }