SEBA-1009-Minor version upgrade

Change-Id: I3fff2718ed28842872773fa0a93f73f127545f2f
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
index 14fa4f9..adaeeba 100644
--- a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
+++ b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
@@ -21,6 +21,7 @@
 import org.onlab.packet.MacAddress;
 import org.onlab.packet.TpPort;
 import org.onlab.packet.VlanId;
+import org.onlab.util.KryoNamespace;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.core.ApplicationId;
@@ -46,6 +47,9 @@
 import org.onosproject.net.flowobjective.ObjectiveContext;
 import org.onosproject.net.flowobjective.ObjectiveError;
 import org.onosproject.net.meter.MeterId;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
 import org.opencord.olt.internalapi.AccessDeviceFlowService;
 import org.opencord.olt.internalapi.AccessDeviceMeterService;
 import org.opencord.sadis.BandwidthProfileInformation;
@@ -60,14 +64,14 @@
 import org.osgi.service.component.annotations.Modified;
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.ReferencePolicy;
 import org.slf4j.Logger;
 
 import java.util.Dictionary;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import static com.google.common.base.Strings.isNullOrEmpty;
@@ -88,7 +92,7 @@
         DEFAULT_TP_ID + ":Integer=" + DEFAULT_TP_ID_DEFAULT
 })
 public class OltFlowService implements AccessDeviceFlowService {
-
+    private static final String SADIS_NOT_RUNNING = "Sadis is not running.";
     private static final String APP_NAME = "org.opencord.olt";
     private static final int NONE_TP_ID = -1;
     private static final int NO_PCP = -1;
@@ -112,8 +116,11 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected MastershipService mastershipService;
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected SadisService sadisService;
+    @Reference(cardinality = ReferenceCardinality.OPTIONAL,
+            bind = "bindSadisService",
+            unbind = "unbindSadisService",
+            policy = ReferencePolicy.DYNAMIC)
+    protected volatile SadisService sadisService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected DeviceService deviceService;
@@ -124,6 +131,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected ComponentConfigService componentConfigService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected StorageService storageService;
+
     /**
      * Create DHCP trap flow on NNI port(s).
      */
@@ -162,15 +172,29 @@
     protected ApplicationId appId;
     protected BaseInformationService<BandwidthProfileInformation> bpService;
     protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
-    private ConcurrentMap<DeviceId, BlockingQueue<SubscriberFlowInfo>> pendingEapolForDevice
-            = new ConcurrentHashMap<>();
+    protected Map<DeviceId, BlockingQueue<SubscriberFlowInfo>> pendingEapolForDevice;
 
     @Activate
     public void activate(ComponentContext context) {
-        bpService = sadisService.getBandwidthProfileService();
-        subsService = sadisService.getSubscriberInfoService();
+        if (sadisService != null) { // Sadis is an OPTIONAL reference, so it may not be bound yet
+            bpService = sadisService.getBandwidthProfileService();
+            subsService = sadisService.getSubscriberInfoService();
+        } else {
+            log.warn(SADIS_NOT_RUNNING);
+        }
         componentConfigService.registerProperties(getClass());
         appId = coreService.getAppId(APP_NAME);
+        KryoNamespace serializer = KryoNamespace.newBuilder() // serializer for the pending-EAPOL map entries
+                .register(KryoNamespaces.API)
+                .register(UniTagInformation.class)
+                .register(SubscriberFlowInfo.class)
+                .register(LinkedBlockingQueue.class) // map values are BlockingQueue instances
+                .build();
+        pendingEapolForDevice = storageService.<DeviceId, BlockingQueue<SubscriberFlowInfo>>consistentMapBuilder()
+                .withName("volt-pending-eapol")
+                .withSerializer(Serializer.using(serializer))
+                .withApplicationId(appId)
+                .build().asJavaMap(); // NOTE(review): confirm in-place queue mutations are written back to the store
         log.info("started");
     }
 
@@ -228,6 +252,37 @@
 
     }
 
+    /**
+     * Binds the optional Sadis service and caches its information services.
+     *
+     * @param service the Sadis service instance being bound
+     */
+    protected void bindSadisService(SadisService service) {
+        sadisService = service;
+        // Use the parameter rather than re-reading the volatile field, which a
+        // concurrent unbind could have cleared in the meantime.
+        bpService = service.getBandwidthProfileService();
+        subsService = service.getSubscriberInfoService();
+        log.info("Sadis-service binds to onos.");
+    }
+
+    /**
+     * Unbinds the Sadis service and drops the cached information services.
+     *
+     * @param service the Sadis service instance being unbound
+     */
+    protected void unbindSadisService(SadisService service) {
+        // For a dynamic unary reference the replacement service is bound before
+        // the old one is unbound; only clear state that still belongs to the
+        // departing instance so the replacement is not wiped out.
+        if (sadisService == service) {
+            sadisService = null;
+            bpService = null;
+            subsService = null;
+        }
+        log.info("Sadis-service unbinds from onos.");
+    }
+
     @Override
     public void processDhcpFilteringObjectives(DeviceId devId, PortNumber port,
                                                MeterId upstreamMeterId,
@@ -580,6 +618,7 @@
                 while (true) {
                     SubscriberFlowInfo fi = queue.remove();
                     if (fi == null) {
+                        pendingEapolForDevice.replace(devId, queue);
                         break;
                     }
                     //TODO this might return the reference and not the actual object
@@ -858,6 +897,10 @@
     }
 
     private BandwidthProfileInformation getBandwidthProfileInformation(String bandwidthProfile) {
+        if (bpService == null) {
+            log.warn(SADIS_NOT_RUNNING);
+            return null;
+        }
         if (bandwidthProfile == null) {
             return null;
         }
@@ -874,6 +917,10 @@
      * @return the default technology profile id
      */
     private int getDefaultTechProfileId(DeviceId devId, PortNumber portNumber) {
+        if (subsService == null) {
+            log.warn(SADIS_NOT_RUNNING);
+            return defaultTechProfileId;
+        }
         Port port = deviceService.getPort(devId, portNumber);
         if (port != null) {
             SubscriberAndDeviceInformation info = subsService.get(port.annotations().value(AnnotationKeys.PORT_NAME));