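SEBA-1009 Minor version upgrade

Make the Sadis service an optional, dynamically bound reference in Olt and
OltFlowService, guarding lookups when it is absent, and keep the pending
subscriber, EAPOL and meter maps in StorageService-backed consistent maps
instead of in-memory concurrent maps.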

Change-Id: I3fff2718ed28842872773fa0a93f73f127545f2f
diff --git a/impl/src/main/java/org/opencord/olt/impl/Olt.java b/impl/src/main/java/org/opencord/olt/impl/Olt.java
index 24073b0..b95bc08 100644
--- a/impl/src/main/java/org/opencord/olt/impl/Olt.java
+++ b/impl/src/main/java/org/opencord/olt/impl/Olt.java
@@ -68,6 +68,7 @@
 import org.osgi.service.component.annotations.Modified;
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.ReferencePolicy;
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
@@ -80,8 +81,6 @@
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -112,6 +111,7 @@
 public class Olt
         extends AbstractListenerManager<AccessDeviceEvent, AccessDeviceListener>
         implements AccessDeviceService {
+    private static final String SADIS_NOT_RUNNING = "Sadis is not running.";
     private static final String APP_NAME = "org.opencord.olt";
 
     private static final short EAPOL_DEFAULT_VLAN = 4091;
@@ -132,8 +132,11 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected CoreService coreService;
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected SadisService sadisService;
+    @Reference(cardinality = ReferenceCardinality.OPTIONAL,
+            bind = "bindSadisService",
+            unbind = "unbindSadisService",
+            policy = ReferencePolicy.DYNAMIC)
+    protected volatile SadisService sadisService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected AccessDeviceFlowService oltFlowService;
@@ -198,7 +201,7 @@
     private ConsistentMultimap<ConnectPoint, UniTagInformation> programmedSubs;
     private ConsistentMultimap<ConnectPoint, UniTagInformation> failedSubs;
 
-    private ConcurrentMap<DeviceId, BlockingQueue<SubscriberFlowInfo>> pendingSubscribersForDevice;
+    protected Map<DeviceId, BlockingQueue<SubscriberFlowInfo>> pendingSubscribersForDevice;
 
     @Activate
     public void activate(ComponentContext context) {
@@ -215,6 +218,8 @@
         KryoNamespace serializer = KryoNamespace.newBuilder()
                 .register(KryoNamespaces.API)
                 .register(UniTagInformation.class)
+                .register(SubscriberFlowInfo.class)
+                .register(LinkedBlockingQueue.class)
                 .build();
 
         programmedSubs = storageService.<ConnectPoint, UniTagInformation>consistentMultimapBuilder()
@@ -229,11 +234,19 @@
                 .withApplicationId(appId)
                 .build();
 
-        pendingSubscribersForDevice = new ConcurrentHashMap<>();
+        pendingSubscribersForDevice = storageService.<DeviceId, BlockingQueue<SubscriberFlowInfo>>consistentMapBuilder()
+                .withName("volt-pending-subs")
+                .withSerializer(Serializer.using(serializer))
+                .withApplicationId(appId)
+                .build().asJavaMap();
         eventDispatcher.addSink(AccessDeviceEvent.class, listenerRegistry);
 
-        subsService = sadisService.getSubscriberInfoService();
-        bpService = sadisService.getBandwidthProfileService();
+        if (sadisService != null) {
+            subsService = sadisService.getSubscriberInfoService();
+            bpService = sadisService.getBandwidthProfileService();
+        } else {
+            log.warn(SADIS_NOT_RUNNING);
+        }
 
         List<NodeId> readyNodes = clusterService.getNodes().stream()
                 .filter(c -> clusterService.getState(c.id()) == ControllerNode.State.READY)
@@ -292,6 +305,37 @@
         }
     }
 
+    /**
+     * Binds the Sadis service and caches the bandwidth profile and subscriber
+     * information services obtained from it.
+     *
+     * @param service the Sadis service being bound
+     */
+    protected void bindSadisService(SadisService service) {
+        sadisService = service;
+        // Use the parameter rather than re-reading the volatile field, which a
+        // concurrent unbind may already have reset to null.
+        bpService = service.getBandwidthProfileService();
+        subsService = service.getSubscriberInfoService();
+        log.info("Sadis-service binds to onos.");
+    }
+
+    /**
+     * Unbinds the Sadis service and clears the services derived from it.
+     *
+     * @param service the Sadis service being unbound
+     */
+    protected void unbindSadisService(SadisService service) {
+        // A dynamic unary reference may bind a replacement before unbinding the
+        // outgoing service; only clear state that still refers to the outgoing one.
+        if (sadisService == service) {
+            sadisService = null;
+            bpService = null;
+            subsService = null;
+        }
+        log.info("Sadis-service unbinds from onos.");
+    }
+
     @Override
     public boolean provisionSubscriber(ConnectPoint connectPoint) {
         log.info("Call to provision subscriber at {}", connectPoint);
@@ -590,6 +617,10 @@
      * @return the context of the bandwidth profile information
      */
     private BandwidthProfileInformation getBandwidthProfileInformation(String bandwidthProfile) {
+        if (bpService == null) {
+            log.warn(SADIS_NOT_RUNNING);
+            return null;
+        }
         if (bandwidthProfile == null) {
             return null;
         }
@@ -826,6 +857,7 @@
             }
         }
     }
+
     private void checkAndCreateDevMeter(DeviceId deviceId, BandwidthProfileInformation bwpInfo) {
         //If false the meter is already being installed, skipping installation
         if (!oltMeterService.checkAndAddPendingMeter(deviceId, bwpInfo)) {
@@ -851,6 +883,7 @@
                     SubscriberFlowInfo fi = queue.peek();
                     if (fi == null) {
                         log.debug("No more subscribers pending on {}", deviceId);
+                        pendingSubscribersForDevice.replace(deviceId, queue);
                         break;
                     }
                     if (result == null) {
@@ -891,6 +924,7 @@
         });
 
     }
+
     /**
      * Add subscriber flows given meter information for both upstream and
      * downstream directions.
@@ -1100,7 +1134,11 @@
      * @param cp ConnectPoint on which to find the subscriber
      * @return subscriber if found else null
      */
-    SubscriberAndDeviceInformation getSubscriber(ConnectPoint cp) {
+    protected SubscriberAndDeviceInformation getSubscriber(ConnectPoint cp) {
+        if (subsService == null) {
+            log.warn(SADIS_NOT_RUNNING);
+            return null;
+        }
         Port port = deviceService.getPort(cp);
         checkNotNull(port, "Invalid connect point");
         String portName = port.annotations().value(AnnotationKeys.PORT_NAME);
@@ -1137,6 +1175,10 @@
      * @return the olt information
      */
     private SubscriberAndDeviceInformation getOltInfo(Device dev) {
+        if (subsService == null) {
+            log.warn(SADIS_NOT_RUNNING);
+            return null;
+        }
         String devSerialNo = dev.serialNumber();
         return subsService.get(devSerialNo);
     }
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
index 14fa4f9..adaeeba 100644
--- a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
+++ b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
@@ -21,6 +21,7 @@
 import org.onlab.packet.MacAddress;
 import org.onlab.packet.TpPort;
 import org.onlab.packet.VlanId;
+import org.onlab.util.KryoNamespace;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.core.ApplicationId;
@@ -46,6 +47,9 @@
 import org.onosproject.net.flowobjective.ObjectiveContext;
 import org.onosproject.net.flowobjective.ObjectiveError;
 import org.onosproject.net.meter.MeterId;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
 import org.opencord.olt.internalapi.AccessDeviceFlowService;
 import org.opencord.olt.internalapi.AccessDeviceMeterService;
 import org.opencord.sadis.BandwidthProfileInformation;
@@ -60,14 +64,14 @@
 import org.osgi.service.component.annotations.Modified;
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.ReferencePolicy;
 import org.slf4j.Logger;
 
 import java.util.Dictionary;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import static com.google.common.base.Strings.isNullOrEmpty;
@@ -88,7 +92,7 @@
         DEFAULT_TP_ID + ":Integer=" + DEFAULT_TP_ID_DEFAULT
 })
 public class OltFlowService implements AccessDeviceFlowService {
-
+    private static final String SADIS_NOT_RUNNING = "Sadis is not running.";
     private static final String APP_NAME = "org.opencord.olt";
     private static final int NONE_TP_ID = -1;
     private static final int NO_PCP = -1;
@@ -112,8 +116,11 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected MastershipService mastershipService;
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected SadisService sadisService;
+    @Reference(cardinality = ReferenceCardinality.OPTIONAL,
+            bind = "bindSadisService",
+            unbind = "unbindSadisService",
+            policy = ReferencePolicy.DYNAMIC)
+    protected volatile SadisService sadisService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected DeviceService deviceService;
@@ -124,6 +131,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected ComponentConfigService componentConfigService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected StorageService storageService;
+
     /**
      * Create DHCP trap flow on NNI port(s).
      */
@@ -162,15 +172,29 @@
     protected ApplicationId appId;
     protected BaseInformationService<BandwidthProfileInformation> bpService;
     protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
-    private ConcurrentMap<DeviceId, BlockingQueue<SubscriberFlowInfo>> pendingEapolForDevice
-            = new ConcurrentHashMap<>();
+    protected Map<DeviceId, BlockingQueue<SubscriberFlowInfo>> pendingEapolForDevice;
 
     @Activate
     public void activate(ComponentContext context) {
-        bpService = sadisService.getBandwidthProfileService();
-        subsService = sadisService.getSubscriberInfoService();
+        if (sadisService != null) {
+            bpService = sadisService.getBandwidthProfileService();
+            subsService = sadisService.getSubscriberInfoService();
+        } else {
+            log.warn(SADIS_NOT_RUNNING);
+        }
         componentConfigService.registerProperties(getClass());
         appId = coreService.getAppId(APP_NAME);
+        KryoNamespace serializer = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.API)
+                .register(UniTagInformation.class)
+                .register(SubscriberFlowInfo.class)
+                .register(LinkedBlockingQueue.class)
+                .build();
+        pendingEapolForDevice = storageService.<DeviceId, BlockingQueue<SubscriberFlowInfo>>consistentMapBuilder()
+                .withName("volt-pending-eapol")
+                .withSerializer(Serializer.using(serializer))
+                .withApplicationId(appId)
+                .build().asJavaMap();
         log.info("started");
     }
 
@@ -228,6 +252,37 @@
 
     }
 
+    /**
+     * Binds the Sadis service and caches the bandwidth profile and subscriber
+     * information services obtained from it.
+     *
+     * @param service the Sadis service being bound
+     */
+    protected void bindSadisService(SadisService service) {
+        sadisService = service;
+        // Use the parameter rather than re-reading the volatile field, which a
+        // concurrent unbind may already have reset to null.
+        bpService = service.getBandwidthProfileService();
+        subsService = service.getSubscriberInfoService();
+        log.info("Sadis-service binds to onos.");
+    }
+
+    /**
+     * Unbinds the Sadis service and clears the services derived from it.
+     *
+     * @param service the Sadis service being unbound
+     */
+    protected void unbindSadisService(SadisService service) {
+        // A dynamic unary reference may bind a replacement before unbinding the
+        // outgoing service; only clear state that still refers to the outgoing one.
+        if (sadisService == service) {
+            sadisService = null;
+            bpService = null;
+            subsService = null;
+        }
+        log.info("Sadis-service unbinds from onos.");
+    }
+
     @Override
     public void processDhcpFilteringObjectives(DeviceId devId, PortNumber port,
                                                MeterId upstreamMeterId,
@@ -580,6 +618,7 @@
                 while (true) {
                     SubscriberFlowInfo fi = queue.remove();
                     if (fi == null) {
+                        pendingEapolForDevice.replace(devId, queue);
                         break;
                     }
                     //TODO this might return the reference and not the actual object
@@ -858,6 +897,10 @@
     }
 
     private BandwidthProfileInformation getBandwidthProfileInformation(String bandwidthProfile) {
+        if (bpService == null) {
+            log.warn(SADIS_NOT_RUNNING);
+            return null;
+        }
         if (bandwidthProfile == null) {
             return null;
         }
@@ -874,6 +917,10 @@
      * @return the default technology profile id
      */
     private int getDefaultTechProfileId(DeviceId devId, PortNumber portNumber) {
+        if (subsService == null) {
+            log.warn(SADIS_NOT_RUNNING);
+            return defaultTechProfileId;
+        }
         Port port = deviceService.getPort(devId, portNumber);
         if (port != null) {
             SubscriberAndDeviceInformation info = subsService.get(port.annotations().value(AnnotationKeys.PORT_NAME));
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltMeterService.java b/impl/src/main/java/org/opencord/olt/impl/OltMeterService.java
index d57f3c1..2acf10e 100644
--- a/impl/src/main/java/org/opencord/olt/impl/OltMeterService.java
+++ b/impl/src/main/java/org/opencord/olt/impl/OltMeterService.java
@@ -77,7 +77,6 @@
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
 
 /**
  * Provisions Meters on access devices.
@@ -113,9 +112,9 @@
 
     protected ExecutorService eventExecutor;
 
-    private Map<DeviceId, Set<BandwidthProfileInformation>> pendingMeters;
-    private Map<DeviceId, Map<MeterKey, AtomicInteger>> pendingRemoveMeters;
-    ConsistentMultimap<String, MeterKey> bpInfoToMeter;
+    protected Map<DeviceId, Set<BandwidthProfileInformation>> pendingMeters;
+    protected Map<DeviceId, Map<MeterKey, AtomicInteger>> pendingRemoveMeters;
+    protected ConsistentMultimap<String, MeterKey> bpInfoToMeter;
 
     @Activate
     public void activate(ComponentContext context) {
@@ -127,6 +126,7 @@
         KryoNamespace serializer = KryoNamespace.newBuilder()
                 .register(KryoNamespaces.API)
                 .register(MeterKey.class)
+                .register(BandwidthProfileInformation.class)
                 .build();
 
         bpInfoToMeter = storageService.<String, MeterKey>consistentMultimapBuilder()
@@ -137,8 +137,16 @@
 
         meterService.addListener(meterListener);
         componentConfigService.registerProperties(getClass());
-        pendingMeters = Maps.newConcurrentMap();
-        pendingRemoveMeters = Maps.newConcurrentMap();
+        pendingMeters = storageService.<DeviceId, Set<BandwidthProfileInformation>>consistentMapBuilder()
+                .withName("volt-pending-meters")
+                .withSerializer(Serializer.using(serializer))
+                .withApplicationId(appId)
+                .build().asJavaMap();
+        pendingRemoveMeters = storageService.<DeviceId, Map<MeterKey, AtomicInteger>>consistentMapBuilder()
+                .withName("volt-pending-remove-meters")
+                .withSerializer(Serializer.using(serializer))
+                .withApplicationId(appId)
+                .build().asJavaMap();
         log.info("Olt Meter service started");
     }