SEBA-1009-Minor version upgrade

Change-Id: I3fff2718ed28842872773fa0a93f73f127545f2f
diff --git a/impl/src/main/java/org/opencord/olt/impl/Olt.java b/impl/src/main/java/org/opencord/olt/impl/Olt.java
index 24073b0..b95bc08 100644
--- a/impl/src/main/java/org/opencord/olt/impl/Olt.java
+++ b/impl/src/main/java/org/opencord/olt/impl/Olt.java
@@ -68,6 +68,7 @@
 import org.osgi.service.component.annotations.Modified;
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.ReferencePolicy;
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
@@ -80,8 +81,6 @@
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -112,6 +111,7 @@
 public class Olt
         extends AbstractListenerManager<AccessDeviceEvent, AccessDeviceListener>
         implements AccessDeviceService {
+    private static final String SADIS_NOT_RUNNING = "Sadis is not running.";
     private static final String APP_NAME = "org.opencord.olt";
 
     private static final short EAPOL_DEFAULT_VLAN = 4091;
@@ -132,8 +132,11 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected CoreService coreService;
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected SadisService sadisService;
+    @Reference(cardinality = ReferenceCardinality.OPTIONAL,
+            bind = "bindSadisService",
+            unbind = "unbindSadisService",
+            policy = ReferencePolicy.DYNAMIC)
+    protected volatile SadisService sadisService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected AccessDeviceFlowService oltFlowService;
@@ -198,7 +201,7 @@
     private ConsistentMultimap<ConnectPoint, UniTagInformation> programmedSubs;
     private ConsistentMultimap<ConnectPoint, UniTagInformation> failedSubs;
 
-    private ConcurrentMap<DeviceId, BlockingQueue<SubscriberFlowInfo>> pendingSubscribersForDevice;
+    protected Map<DeviceId, BlockingQueue<SubscriberFlowInfo>> pendingSubscribersForDevice;
 
     @Activate
     public void activate(ComponentContext context) {
@@ -215,6 +218,8 @@
         KryoNamespace serializer = KryoNamespace.newBuilder()
                 .register(KryoNamespaces.API)
                 .register(UniTagInformation.class)
+                .register(SubscriberFlowInfo.class)
+                .register(LinkedBlockingQueue.class)
                 .build();
 
         programmedSubs = storageService.<ConnectPoint, UniTagInformation>consistentMultimapBuilder()
@@ -229,11 +234,19 @@
                 .withApplicationId(appId)
                 .build();
 
-        pendingSubscribersForDevice = new ConcurrentHashMap<>();
+        pendingSubscribersForDevice = storageService.<DeviceId, BlockingQueue<SubscriberFlowInfo>>consistentMapBuilder()
+                .withName("volt-pending-subs")
+                .withSerializer(Serializer.using(serializer))
+                .withApplicationId(appId)
+                .build().asJavaMap();
         eventDispatcher.addSink(AccessDeviceEvent.class, listenerRegistry);
 
-        subsService = sadisService.getSubscriberInfoService();
-        bpService = sadisService.getBandwidthProfileService();
+        if (sadisService != null) {
+            subsService = sadisService.getSubscriberInfoService();
+            bpService = sadisService.getBandwidthProfileService();
+        } else {
+            log.warn(SADIS_NOT_RUNNING);
+        }
 
         List<NodeId> readyNodes = clusterService.getNodes().stream()
                 .filter(c -> clusterService.getState(c.id()) == ControllerNode.State.READY)
@@ -292,6 +305,38 @@
         }
     }
 
+    /**
+     * Binds the Sadis service and caches its bandwidth-profile and subscriber-info services.
+     *
+     * @param service the Sadis service being bound
+     */
+    protected void bindSadisService(SadisService service) {
+        sadisService = service;
+        // Use the parameter rather than re-reading the volatile field, which a
+        // concurrent unbind could already have reset to null.
+        bpService = service.getBandwidthProfileService();
+        subsService = service.getSubscriberInfoService();
+        log.info("Sadis-service binds to onos.");
+    }
+
+    /**
+     * Unbinds the Sadis service and drops the cached sub-services.
+     *
+     * <p>For a DYNAMIC reference with optional cardinality, a replacement service is
+     * bound before the outgoing one is unbound, so the cached state is cleared only
+     * when the outgoing service is still the one currently in use.
+     *
+     * @param service the Sadis service being unbound
+     */
+    protected void unbindSadisService(SadisService service) {
+        if (sadisService == service) {
+            sadisService = null;
+            bpService = null;
+            subsService = null;
+        }
+        log.info("Sadis-service unbinds from onos.");
+    }
+
     @Override
     public boolean provisionSubscriber(ConnectPoint connectPoint) {
         log.info("Call to provision subscriber at {}", connectPoint);
@@ -590,6 +617,10 @@
      * @return the context of the bandwidth profile information
      */
     private BandwidthProfileInformation getBandwidthProfileInformation(String bandwidthProfile) {
+        if (bpService == null) {
+            log.warn(SADIS_NOT_RUNNING);
+            return null;
+        }
         if (bandwidthProfile == null) {
             return null;
         }
@@ -826,6 +857,7 @@
             }
         }
     }
+
     private void checkAndCreateDevMeter(DeviceId deviceId, BandwidthProfileInformation bwpInfo) {
         //If false the meter is already being installed, skipping installation
         if (!oltMeterService.checkAndAddPendingMeter(deviceId, bwpInfo)) {
@@ -851,6 +883,7 @@
                     SubscriberFlowInfo fi = queue.peek();
                     if (fi == null) {
                         log.debug("No more subscribers pending on {}", deviceId);
+                        pendingSubscribersForDevice.replace(deviceId, queue);
                         break;
                     }
                     if (result == null) {
@@ -891,6 +924,7 @@
         });
 
     }
+
     /**
      * Add subscriber flows given meter information for both upstream and
      * downstream directions.
@@ -1100,7 +1134,11 @@
      * @param cp ConnectPoint on which to find the subscriber
      * @return subscriber if found else null
      */
-    SubscriberAndDeviceInformation getSubscriber(ConnectPoint cp) {
+    protected SubscriberAndDeviceInformation getSubscriber(ConnectPoint cp) {
+        if (subsService == null) {
+            log.warn(SADIS_NOT_RUNNING);
+            return null;
+        }
         Port port = deviceService.getPort(cp);
         checkNotNull(port, "Invalid connect point");
         String portName = port.annotations().value(AnnotationKeys.PORT_NAME);
@@ -1137,6 +1175,10 @@
      * @return the olt information
      */
     private SubscriberAndDeviceInformation getOltInfo(Device dev) {
+        if (subsService == null) { // no subscriber service is available while Sadis is unbound
+            log.warn(SADIS_NOT_RUNNING);
+            return null;
+        }
         String devSerialNo = dev.serialNumber();
         return subsService.get(devSerialNo);
     }