Enable operation in a multi-instance ONOS cluster.

Shared state has been moved to ONOS consistent maps to ensure it
is available throughout the cluster.

Event handling work (e.g. port-up events) is partitioned among the nodes
in the cluster using consistent hashing on the device ID.

Subscriber provisioning requests can be handled by any instance
(the instance that receives the request handles it).

Change-Id: I65cf24a7a7fe4397e1559e5d1c770449979f2566
diff --git a/app/src/test/java/org/opencord/olt/impl/ConsistentHasherTest.java b/app/src/test/java/org/opencord/olt/impl/ConsistentHasherTest.java
new file mode 100644
index 0000000..1f682cb
--- /dev/null
+++ b/app/src/test/java/org/opencord/olt/impl/ConsistentHasherTest.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.olt.impl;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.net.DeviceId;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+
+/**
+ * Unit tests for {@link ConsistentHasher}.
+ */
+public class ConsistentHasherTest {
+
+    private static final int WEIGHT = 10;
+
+    private static final NodeId N1 = new NodeId("10.0.0.1");
+    private static final NodeId N2 = new NodeId("10.0.0.2");
+    private static final NodeId N3 = new NodeId("10.0.0.3");
+
+    private ConsistentHasher hasher;
+
+    /**
+     * Builds a fresh hasher over nodes N1 and N2 before each test.
+     */
+    @Before
+    public void setUp() {
+        List<NodeId> initialNodes = new ArrayList<>();
+        initialNodes.add(N1);
+        initialNodes.add(N2);
+
+        hasher = new ConsistentHasher(initialNodes, WEIGHT);
+    }
+
+    /** Checks which node each of two fixed device IDs hashes to. */
+    @Test
+    public void testHasher() {
+        String fooKey = DeviceId.deviceId("foo").toString();
+        assertThat(hasher.hash(fooKey), equalTo(N1));
+
+        String bsafKey = DeviceId.deviceId("bsaf").toString();
+        assertThat(hasher.hash(bsafKey), equalTo(N2));
+    }
+
+    /** Checks that device ID "foo" moves from N1 to N3 once N3 is added. */
+    @Test
+    public void testAddServer() {
+        String fooKey = DeviceId.deviceId("foo").toString();
+        assertThat(hasher.hash(fooKey), equalTo(N1));
+
+        hasher.addServer(N3);
+
+        assertThat(hasher.hash(fooKey), equalTo(N3));
+    }
+}
diff --git a/app/src/test/java/org/opencord/olt/impl/OltFlowTest.java b/app/src/test/java/org/opencord/olt/impl/OltFlowTest.java
index 16b407b..e0bd109 100644
--- a/app/src/test/java/org/opencord/olt/impl/OltFlowTest.java
+++ b/app/src/test/java/org/opencord/olt/impl/OltFlowTest.java
@@ -188,11 +188,6 @@
         }
 
         @Override
-        public void addMeterIdToBpMapping(DeviceId deviceId, MeterId meterId, String bandwidthProfile) {
-
-        }
-
-        @Override
         public MeterId getMeterIdFromBpMapping(DeviceId deviceId, String bandwidthProfile) {
             return null;
         }
@@ -208,6 +203,10 @@
                                    CompletableFuture<Object> meterFuture) {
             return usMeterId;
         }
+
+        @Override
+        public void clearMeters(DeviceId deviceId) {
+        }
     }
 
     private class MockOltFlowObjectiveService implements org.onosproject.net.flowobjective.FlowObjectiveService {
diff --git a/app/src/test/java/org/opencord/olt/impl/OltMeterTest.java b/app/src/test/java/org/opencord/olt/impl/OltMeterTest.java
index da7e3e1..000ea35 100644
--- a/app/src/test/java/org/opencord/olt/impl/OltMeterTest.java
+++ b/app/src/test/java/org/opencord/olt/impl/OltMeterTest.java
@@ -15,12 +15,11 @@
  */
 package org.opencord.olt.impl;
 
-import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Multimaps;
-import com.google.common.collect.Sets;
 import org.junit.Before;
 import org.junit.Test;
+import org.onosproject.cfg.ComponentConfigAdapter;
+import org.onosproject.core.CoreServiceAdapter;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.meter.DefaultMeter;
 import org.onosproject.net.meter.Meter;
@@ -28,6 +27,7 @@
 import org.onosproject.net.meter.MeterKey;
 import org.onosproject.net.meter.MeterListener;
 import org.onosproject.net.meter.MeterRequest;
+import org.onosproject.store.service.TestStorageService;
 import org.opencord.sadis.BandwidthProfileInformation;
 
 import java.util.Collection;
@@ -41,9 +41,12 @@
     @Before
     public void setUp() {
         oltMeterService = new OltMeterService();
-        oltMeterService.bpInfoToMeter = Multimaps.synchronizedSetMultimap(HashMultimap.create());
-        oltMeterService.programmedMeters = Sets.newConcurrentHashSet();
+        oltMeterService.storageService = new TestStorageService();
         oltMeterService.meterService = new MockMeterService();
+        oltMeterService.coreService = new CoreServiceAdapter();
+        oltMeterService.componentConfigService = new ComponentConfigAdapter();
+        oltMeterService.activate(null);
+        oltMeterService.bpInfoToMeter = new MockConsistentMultimap<>();
     }
 
     @Test
diff --git a/app/src/test/java/org/opencord/olt/impl/TestBase.java b/app/src/test/java/org/opencord/olt/impl/TestBase.java
index b8447a9..bc5a4d6 100644
--- a/app/src/test/java/org/opencord/olt/impl/TestBase.java
+++ b/app/src/test/java/org/opencord/olt/impl/TestBase.java
@@ -15,18 +15,33 @@
  */
 package org.opencord.olt.impl;
 
+import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Multiset;
 import org.onlab.packet.Ip4Address;
 import org.onlab.packet.MacAddress;
 import org.onosproject.core.DefaultApplicationId;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.meter.MeterId;
+import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.ConsistentMultimap;
+import org.onosproject.store.service.ConsistentMultimapBuilder;
+import org.onosproject.store.service.MultimapEventListener;
+import org.onosproject.store.service.TestConsistentMultimap;
+import org.onosproject.store.service.Versioned;
 import org.opencord.sadis.BandwidthProfileInformation;
 import org.opencord.sadis.BaseInformationService;
 import org.opencord.sadis.SadisService;
 import org.opencord.sadis.SubscriberAndDeviceInformation;
 
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 public class TestBase {
 
@@ -124,4 +139,187 @@
             this.setCircuitId(circuitId);
         }
     }
+
+    /**
+     * In-memory test double for {@link ConsistentMultimap}, backed by a Guava {@link HashMultimap}.
+     * Each stored value is wrapped in a {@link Versioned}, so lookups and removals by plain value
+     * have to compare against the wrapped value rather than the wrapper itself.
+     * Bulk mutators and the values()/entries()/asMap() views are unimplemented stubs.
+     */
+    class MockConsistentMultimap<K, V> implements ConsistentMultimap<K, V> {
+        private final HashMultimap<K, Versioned<V>> innermap;
+        private final AtomicLong counter = new AtomicLong();
+
+        public MockConsistentMultimap() {
+            this.innermap = HashMultimap.create();
+        }
+
+        /** Wraps a value with the next version number and the current time. */
+        private Versioned<V> version(V v) {
+            return new Versioned<>(v, counter.incrementAndGet(), System.currentTimeMillis());
+        }
+
+        /** Wraps a collection of values as one versioned result. */
+        private Versioned<Collection<? extends V>> versionCollection(Collection<? extends V> collection) {
+            return new Versioned<>(collection, counter.incrementAndGet(), System.currentTimeMillis());
+        }
+
+        /** Tells whether a stored wrapper carries the given plain value (null-safe). */
+        private boolean wraps(Versioned<V> stored, V value) {
+            return value == null ? stored.value() == null : value.equals(stored.value());
+        }
+
+        @Override
+        public int size() {
+            return innermap.size();
+        }
+
+        @Override
+        public boolean isEmpty() {
+            return innermap.isEmpty();
+        }
+
+        @Override
+        public boolean containsKey(K key) {
+            return innermap.containsKey(key);
+        }
+
+        @Override
+        public boolean containsValue(V value) {
+            // Elements are Versioned wrappers, so a plain-value lookup must unwrap them.
+            return innermap.values().stream().anyMatch(stored -> wraps(stored, value));
+        }
+
+        @Override
+        public boolean containsEntry(K key, V value) {
+            return innermap.get(key).stream().anyMatch(stored -> wraps(stored, value));
+        }
+
+        @Override
+        public boolean put(K key, V value) {
+            return innermap.put(key, version(value));
+        }
+
+        @Override
+        public Versioned<Collection<? extends V>> putAndGet(K key, V value) {
+            innermap.put(key, version(value));
+            // Report the key's values the same way get() does.
+            return get(key);
+        }
+
+        @Override
+        public boolean remove(K key, V value) {
+            // get() is a live view of the backing multimap, so removeIf writes through to it.
+            return innermap.get(key).removeIf(stored -> wraps(stored, value));
+        }
+
+        @Override
+        public Versioned<Collection<? extends V>> removeAndGet(K key, V value) {
+            remove(key, value);
+            // Report the key's remaining values the same way get() does.
+            return get(key);
+        }
+
+        @Override
+        public boolean removeAll(K key, Collection<? extends V> values) {
+            return false;
+        }
+
+        @Override
+        public Versioned<Collection<? extends V>> removeAll(K key) {
+            return null;
+        }
+
+        @Override
+        public boolean putAll(K key, Collection<? extends V> values) {
+            return false;
+        }
+
+        @Override
+        public Versioned<Collection<? extends V>> replaceValues(K key, Collection<V> values) {
+            return null;
+        }
+
+        @Override
+        public void clear() {
+            innermap.clear();
+        }
+
+        @Override
+        public Versioned<Collection<? extends V>> get(K key) {
+            // Unwrap each stored element; the collection as a whole gets its own version.
+            Collection<? extends V> values = innermap.get(key).stream()
+                    .map(v -> v.value())
+                    .collect(Collectors.toList());
+            return versionCollection(values);
+        }
+
+        @Override
+        public Set<K> keySet() {
+            return innermap.keySet();
+        }
+
+        @Override
+        public Multiset<K> keys() {
+            return innermap.keys();
+        }
+
+        @Override
+        public Multiset<V> values() {
+            return null;
+        }
+
+        @Override
+        public Collection<Map.Entry<K, V>> entries() {
+            return null;
+        }
+
+        @Override
+        public Iterator<Map.Entry<K, V>> iterator() {
+            // Each entry is a detached copy holding the unwrapped value.
+            return innermap.entries().stream()
+                    .<Map.Entry<K, V>>map(e -> new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue().value()))
+                    .iterator();
+        }
+
+        @Override
+        public Map<K, Collection<V>> asMap() {
+            return null;
+        }
+
+        @Override
+        public void addListener(MultimapEventListener<K, V> listener, Executor executor) {
+        }
+
+        @Override
+        public void removeListener(MultimapEventListener<K, V> listener) {
+        }
+
+        @Override
+        public String name() {
+            return "mock multimap";
+        }
+
+        @Override
+        public Type primitiveType() {
+            return null;
+        }
+    }
+    /** Creates a new, raw-typed {@code TestConsistentMultimap.Builder}. */
+    public static TestConsistentMultimap.Builder builder() {
+        return new TestConsistentMultimap.Builder();
+    }
+
+    public static class Builder<K, V> extends ConsistentMultimapBuilder<K, V> {
+        // No asynchronous multimap is provided by this builder; this always returns null.
+        @Override
+        public AsyncConsistentMultimap<K, V> buildMultimap() {
+            return null;
+        }
+        // Returns a fresh TestConsistentMultimap on every call.
+        @Override
+        public ConsistentMultimap<K, V> build() {
+            return new TestConsistentMultimap<K, V>();
+        }
+    }
 }