Enable operation in a multi-instance ONOS cluster.

Shared state has been moved to ONOS consistent maps to ensure it
is available throughout the cluster.

Event handling work (e.g. port up, etc) is partitioned between nodes
in the cluster using consistent hashing based on device ID.

Subscriber provisioning requests can be handled by any instance
(the instance that receives the request handles it).

Change-Id: I65cf24a7a7fe4397e1559e5d1c770449979f2566
diff --git a/app/src/main/java/org/opencord/olt/impl/OltMeterService.java b/app/src/main/java/org/opencord/olt/impl/OltMeterService.java
index 825604c..7a3a3a3 100644
--- a/app/src/main/java/org/opencord/olt/impl/OltMeterService.java
+++ b/app/src/main/java/org/opencord/olt/impl/OltMeterService.java
@@ -15,13 +15,10 @@
  */
 package org.opencord.olt.impl;
 
-import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Multimaps;
-import com.google.common.collect.SetMultimap;
-import com.google.common.collect.Sets;
+import org.onlab.util.KryoNamespace;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.core.ApplicationId;
@@ -40,6 +37,10 @@
 import org.onosproject.net.meter.MeterListener;
 import org.onosproject.net.meter.MeterRequest;
 import org.onosproject.net.meter.MeterService;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMultimap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
 import org.opencord.olt.internalapi.AccessDeviceMeterService;
 import org.opencord.sadis.BandwidthProfileInformation;
 import org.osgi.service.component.ComponentContext;
@@ -54,18 +55,21 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Dictionary;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
+import static java.util.stream.Collectors.collectingAndThen;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.mapping;
+import static java.util.stream.Collectors.toSet;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.opencord.olt.impl.OsgiPropertyConstants.DELETE_METERS;
 import static org.opencord.olt.impl.OsgiPropertyConstants.DELETE_METERS_DEFAULT;
@@ -88,11 +92,13 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected ComponentConfigService componentConfigService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected StorageService storageService;
+
     protected boolean deleteMeters = true;
 
-    protected SetMultimap<String, MeterKey> bpInfoToMeter =
-            Multimaps.synchronizedSetMultimap(HashMultimap.create());
-    protected Set<MeterKey> programmedMeters;
+    ConsistentMultimap<String, MeterKey> bpInfoToMeter;
+
     private ApplicationId appId;
     private static final String APP_NAME = "org.opencord.olt";
 
@@ -107,7 +113,19 @@
         eventExecutor = Executors.newFixedThreadPool(5, groupedThreads("onos/olt",
                 "events-%d", log));
         appId = coreService.registerApplication(APP_NAME);
-        programmedMeters = Sets.newConcurrentHashSet();
+        modified(context);
+
+        KryoNamespace serializer = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.API)
+                .register(MeterKey.class)
+                .build();
+
+        bpInfoToMeter = storageService.<String, MeterKey>consistentMultimapBuilder()
+                .withName("volt-bp-info-to-meter")
+                .withSerializer(Serializer.using(serializer))
+                .withApplicationId(appId)
+                .build();
+
         meterService.addListener(meterListener);
         componentConfigService.registerProperties(getClass());
         log.info("Olt Meter service started");
@@ -131,23 +149,25 @@
 
     @Override
     public ImmutableMap<String, Collection<MeterKey>> getBpMeterMappings() {
-        return ImmutableMap.copyOf(bpInfoToMeter.asMap());
+        return bpInfoToMeter.stream()
+                .collect(collectingAndThen(
+                        groupingBy(Map.Entry::getKey, mapping(Map.Entry::getValue, toSet())),
+                        ImmutableMap::copyOf));
     }
 
-    @Override
-    public void addMeterIdToBpMapping(DeviceId deviceId, MeterId meterId, String bandwidthProfile) {
+    void addMeterIdToBpMapping(DeviceId deviceId, MeterId meterId, String bandwidthProfile) {
         bpInfoToMeter.put(bandwidthProfile, MeterKey.key(deviceId, meterId));
     }
 
     @Override
     public MeterId getMeterIdFromBpMapping(DeviceId deviceId, String bandwidthProfile) {
-        if (bpInfoToMeter.get(bandwidthProfile) == null) {
+        if (bpInfoToMeter.get(bandwidthProfile).value().isEmpty()) {
             log.warn("Bandwidth Profile '{}' is not currently mapped to a meter",
                     bandwidthProfile);
             return null;
         }
 
-        Optional<MeterKey> meterKeyForDevice = bpInfoToMeter.get(bandwidthProfile)
+        Optional<? extends MeterKey> meterKeyForDevice = bpInfoToMeter.get(bandwidthProfile).value()
                 .stream()
                 .filter(meterKey -> meterKey.deviceId().equals(deviceId))
                 .findFirst();
@@ -164,7 +184,9 @@
 
     @Override
     public ImmutableSet<MeterKey> getProgMeters() {
-        return ImmutableSet.copyOf(programmedMeters);
+        return bpInfoToMeter.stream()
+                .map(Map.Entry::getValue)
+                .collect(ImmutableSet.toImmutableSet());
     }
 
     @Override
@@ -210,11 +232,19 @@
         Meter meter = meterService.submit(meterRequest);
         meterIdRef.set(meter.id());
         addMeterIdToBpMapping(deviceId, meterIdRef.get(), bpInfo.id());
-        programmedMeters.add(MeterKey.key(deviceId, meter.id()));
         log.info("Meter is created. Meter Id {}", meter.id());
         return meter.id();
     }
 
+    @Override
+    public void clearMeters(DeviceId deviceId) {
+        List<Map.Entry<String, MeterKey>> meters = bpInfoToMeter.stream()
+                .filter(e -> e.getValue().deviceId().equals(deviceId))
+                .collect(Collectors.toList());
+
+        meters.forEach(e -> bpInfoToMeter.remove(e.getKey(), e.getValue()));
+    }
+
     private List<Band> createMeterBands(BandwidthProfileInformation bpInfo) {
         List<Band> meterBands = new ArrayList<>();
 
@@ -257,7 +287,6 @@
                 }
                 if (MeterEvent.Type.METER_REMOVED.equals(meterEvent.type())) {
                     log.info("Meter Removed Event is received for {}", meter.id());
-                    programmedMeters.remove(key);
                     pendingRemoveMeters.remove(key);
                     removeMeterFromBpMapping(key);
                 }
@@ -294,15 +323,11 @@
         }
 
         private void removeMeterFromBpMapping(MeterKey meterKey) {
-            Iterator<Map.Entry<String, MeterKey>> iterator = bpInfoToMeter.entries().iterator();
-            while (iterator.hasNext()) {
-                Map.Entry<String, MeterKey> entry = iterator.next();
-                if (entry.getValue().equals(meterKey)) {
-                    iterator.remove();
-                    log.info("Deleted meter for MeterKey {} - Last prog meters {}", meterKey, programmedMeters);
-                    break;
-                }
-            }
+            List<Map.Entry<String, MeterKey>> meters = bpInfoToMeter.stream()
+                    .filter(e -> e.getValue().equals(meterKey))
+                    .collect(Collectors.toList());
+
+            meters.forEach(e -> bpInfoToMeter.remove(e.getKey(), e.getValue()));
         }
     }
 }