CORD-1071 Refactor VTN node service

Done
- Separated interface, implementation and store for node management
- Added unit tests for node manager and handler
- Offloaded more of the event handling off of the Atomix event thread

Todo
- Add REST interface for the node service

Change-Id: Ibf90d3a621013497cc891ca3086db6648f5d49df
diff --git a/src/main/java/org/opencord/cordvtn/impl/CordVtnNodeManager.java b/src/main/java/org/opencord/cordvtn/impl/CordVtnNodeManager.java
index 97f5f77..14f96ea 100644
--- a/src/main/java/org/opencord/cordvtn/impl/CordVtnNodeManager.java
+++ b/src/main/java/org/opencord/cordvtn/impl/CordVtnNodeManager.java
@@ -16,99 +16,66 @@
 package org.opencord.cordvtn.impl;
 
 import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.jcraft.jsch.Session;
+import com.google.common.collect.ImmutableSet;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
-import org.onlab.packet.IpAddress;
-import org.onlab.util.KryoNamespace;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.LeadershipService;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
-import org.onosproject.net.ConnectPoint;
-import org.onosproject.net.Device;
+import org.onosproject.event.ListenerRegistry;
 import org.onosproject.net.DeviceId;
-import org.onosproject.net.Port;
-import org.onosproject.net.PortNumber;
-import org.onosproject.net.behaviour.BridgeConfig;
-import org.onosproject.net.behaviour.BridgeDescription;
-import org.onosproject.net.behaviour.BridgeName;
-import org.onosproject.net.behaviour.ControllerInfo;
-import org.onosproject.net.behaviour.DefaultBridgeDescription;
-import org.onosproject.net.behaviour.DefaultTunnelDescription;
-import org.onosproject.net.behaviour.InterfaceConfig;
-import org.onosproject.net.behaviour.TunnelDescription;
-import org.onosproject.net.behaviour.TunnelEndPoints;
-import org.onosproject.net.behaviour.TunnelKeys;
 import org.onosproject.net.config.NetworkConfigEvent;
 import org.onosproject.net.config.NetworkConfigListener;
 import org.onosproject.net.config.NetworkConfigService;
-import org.onosproject.net.device.DeviceAdminService;
-import org.onosproject.net.device.DeviceEvent;
-import org.onosproject.net.device.DeviceListener;
-import org.onosproject.net.device.DeviceService;
-import org.onosproject.net.host.HostService;
-import org.onosproject.ovsdb.controller.OvsdbClientService;
-import org.onosproject.ovsdb.controller.OvsdbController;
-import org.onosproject.ovsdb.controller.OvsdbNodeId;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.ConsistentMap;
-import org.onosproject.store.service.MapEvent;
-import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.StorageService;
-import org.onosproject.store.service.Versioned;
 import org.opencord.cordvtn.api.CordVtnConfig;
-import org.opencord.cordvtn.api.core.InstanceService;
-import org.opencord.cordvtn.api.net.CidrAddr;
-import org.opencord.cordvtn.api.node.ConnectionHandler;
 import org.opencord.cordvtn.api.node.CordVtnNode;
-import org.opencord.cordvtn.api.node.CordVtnNodeState;
-import org.opencord.cordvtn.api.node.SshAccessInfo;
+import org.opencord.cordvtn.api.node.CordVtnNodeAdminService;
+import org.opencord.cordvtn.api.node.CordVtnNodeEvent;
+import org.opencord.cordvtn.api.node.CordVtnNodeListener;
+import org.opencord.cordvtn.api.node.CordVtnNodeService;
+import org.opencord.cordvtn.api.node.CordVtnNodeStore;
+import org.opencord.cordvtn.api.node.CordVtnNodeStoreDelegate;
 import org.slf4j.Logger;
 
-import java.util.List;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import static java.lang.String.format;
 import static java.util.concurrent.Executors.newSingleThreadExecutor;
 import static org.onlab.util.Tools.groupedThreads;
-import static org.onosproject.net.AnnotationKeys.PORT_NAME;
-import static org.onosproject.net.Device.Type.SWITCH;
-import static org.onosproject.net.behaviour.TunnelDescription.Type.VXLAN;
 import static org.opencord.cordvtn.api.Constants.*;
-import static org.opencord.cordvtn.impl.RemoteIpCommandUtil.*;
+import static org.opencord.cordvtn.api.node.CordVtnNodeState.*;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
- * Reads node information from the network config file and handles the config
- * update events.
- * Only a leader controller performs the node addition or deletion.
+ * Manages the inventory of the cordvtn nodes provided via network configuration.
  */
 @Component(immediate = true)
-@Service(value = CordVtnNodeManager.class)
-public class CordVtnNodeManager {
+@Service
+public class CordVtnNodeManager extends ListenerRegistry<CordVtnNodeEvent, CordVtnNodeListener>
+        implements CordVtnNodeAdminService, CordVtnNodeService {
 
     protected final Logger log = getLogger(getClass());
 
-    private static final KryoNamespace.Builder NODE_SERIALIZER = KryoNamespace.newBuilder()
-            .register(KryoNamespaces.API)
-            .register(CordVtnNode.class)
-            .register(NodeState.class)
-            .register(SshAccessInfo.class)
-            .register(CidrAddr.class);
+    private static final String MSG_NODE  = "Node %s %s";
+    private static final String MSG_CREATED = "created";
+    private static final String MSG_UPDATED = "updated";
+    private static final String MSG_REMOVED = "removed";
 
-    private static final int DPID_BEGIN = 3;
+    private static final String ERR_NULL_NODE = "CordVtn node cannot be null";
+    private static final String ERR_NULL_HOSTNAME = "CordVtn node hostname cannot be null";
+    private static final String ERR_NULL_DEVICE_ID = "Device ID cannot be null";
+    private static final String ERR_NOT_FOUND = "does not exist";
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected CoreService coreService;
@@ -117,114 +84,32 @@
     protected NetworkConfigService configService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected StorageService storageService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected DeviceAdminService adminService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected OvsdbController ovsdbController;
+    protected LeadershipService leadershipService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterService clusterService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected DeviceService deviceService;
+    protected CordVtnNodeStore nodeStore;
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected HostService hostService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected LeadershipService leadershipService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected InstanceService instanceService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected CordVtnPipeline pipeline;
-
-    private final ExecutorService eventExecutor =
-            newSingleThreadExecutor(groupedThreads("onos/cordvtn-node", "event-handler", log));
+    private final ExecutorService eventExecutor = newSingleThreadExecutor(
+            groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
 
     private final NetworkConfigListener configListener = new InternalConfigListener();
-    private final DeviceListener deviceListener = new InternalDeviceListener();
-    private final MapEventListener<String, CordVtnNode> nodeStoreListener = new InternalMapListener();
+    private final CordVtnNodeStoreDelegate delegate = new InternalCordVtnNodeStoreDelegate();
 
-    private final OvsdbHandler ovsdbHandler = new OvsdbHandler();
-    private final BridgeHandler bridgeHandler = new BridgeHandler();
-
-    private ConsistentMap<String, CordVtnNode> nodeStore;
-    private List<ControllerInfo> controllers = Lists.newArrayList();
     private ApplicationId appId;
     private NodeId localNodeId;
 
-    private enum NodeState implements CordVtnNodeState {
-
-        INIT {
-            @Override
-            public void process(CordVtnNodeManager nodeManager, CordVtnNode node) {
-                // make sure there is OVSDB connection
-                if (!nodeManager.isOvsdbConnected(node)) {
-                    nodeManager.connectOvsdb(node);
-                    return;
-                }
-                nodeManager.createIntegrationBridge(node);
-            }
-        },
-        BRIDGE_CREATED {
-            @Override
-            public void process(CordVtnNodeManager nodeManager, CordVtnNode node) {
-                // make sure there is OVSDB connection
-                if (!nodeManager.isOvsdbConnected(node)) {
-                    nodeManager.connectOvsdb(node);
-                    return;
-                }
-
-                nodeManager.createTunnelInterface(node);
-                nodeManager.addSystemInterface(node, node.dataIface());
-                if (node.hostMgmtIface().isPresent()) {
-                    nodeManager.addSystemInterface(node, node.hostMgmtIface().get());
-                }
-            }
-        },
-        PORTS_ADDED {
-            @Override
-            public void process(CordVtnNodeManager nodeManager, CordVtnNode node) {
-                nodeManager.setIpAddress(node);
-            }
-        },
-        COMPLETE {
-            @Override
-            public void process(CordVtnNodeManager nodeManager, CordVtnNode node) {
-                nodeManager.postInit(node);
-            }
-        },
-        INCOMPLETE {
-            @Override
-            public void process(CordVtnNodeManager nodeManager, CordVtnNode node) {
-            }
-        };
-
-        public abstract void process(CordVtnNodeManager nodeManager, CordVtnNode node);
-    }
-
     @Activate
     protected void activate() {
         appId = coreService.registerApplication(CORDVTN_APP_ID);
         leadershipService.runForLeadership(appId.name());
         localNodeId = clusterService.getLocalNode().id();
 
-        nodeStore = storageService.<String, CordVtnNode>consistentMapBuilder()
-                .withSerializer(Serializer.using(NODE_SERIALIZER.build()))
-                .withName("cordvtn-nodestore")
-                .withApplicationId(appId)
-                .build();
-
-        nodeStore.addListener(nodeStoreListener, eventExecutor);
-        deviceService.addListener(deviceListener);
+        nodeStore.setDelegate(delegate);
         configService.addListener(configListener);
 
-        readControllers();
         readNodes();
         log.info("Started");
     }
@@ -232,8 +117,7 @@
     @Deactivate
     protected void deactivate() {
         configService.removeListener(configListener);
-        deviceService.removeListener(deviceListener);
-        nodeStore.removeListener(nodeStoreListener);
+        nodeStore.unsetDelegate(delegate);
 
         leadershipService.withdraw(appId.name());
         eventExecutor.shutdown();
@@ -241,611 +125,60 @@
         log.info("Stopped");
     }
 
-    /**
-     * Adds or updates a new node to the service.
-     *
-     * @param node cordvtn node
-     */
-    public void addOrUpdateNode(CordVtnNode node) {
-        checkNotNull(node);
-        nodeStore.put(node.hostname(), CordVtnNode.getUpdatedNode(node, getNodeState(node)));
+    @Override
+    public void createNode(CordVtnNode node) {
+        checkNotNull(node, ERR_NULL_NODE);
+        nodeStore.createNode(node);
+        log.info(format(MSG_NODE, node.hostname(), MSG_CREATED));
     }
 
-    /**
-     * Deletes a node from the service.
-     *
-     * @param node cordvtn node
-     */
-    public void deleteNode(CordVtnNode node) {
-        checkNotNull(node);
-        OvsdbClientService ovsdbClient = getOvsdbClient(node);
-        if (ovsdbClient != null && ovsdbClient.isConnected()) {
-            ovsdbClient.disconnect();
+    @Override
+    public void updateNode(CordVtnNode node) {
+        checkNotNull(node, ERR_NULL_NODE);
+        nodeStore.updateNode(node);
+        log.debug(format(MSG_NODE, node.hostname(), MSG_UPDATED));
+    }
+
+    @Override
+    public CordVtnNode removeNode(String hostname) {
+        checkArgument(!Strings.isNullOrEmpty(hostname), ERR_NULL_HOSTNAME);
+        CordVtnNode removed = nodeStore.removeNode(hostname);
+        if (removed == null) {
+            log.warn(format(MSG_NODE, hostname, ERR_NOT_FOUND));
+            return null;
         }
-        nodeStore.remove(node.hostname());
+        log.info(format(MSG_NODE, hostname, MSG_REMOVED));
+        return removed;
     }
 
-    /**
-     * Returns node initialization state.
-     *
-     * @param node cordvtn node
-     * @return true if initial node setup is completed, otherwise false
-     */
-    public boolean isNodeInitComplete(CordVtnNode node) {
-        checkNotNull(node);
-        return isNodeStateComplete(node);
+    @Override
+    public Set<CordVtnNode> nodes() {
+        return nodeStore.nodes();
     }
 
-    /**
-     * Returns the number of the nodes known to the service.
-     *
-     * @return number of nodes
-     */
-    public int getNodeCount() {
-        return nodeStore.size();
-    }
-
-    /**
-     * Returns all nodes known to the service.
-     *
-     * @return list of nodes
-     */
-    public List<CordVtnNode> getNodes() {
-        return nodeStore.values().stream().map(Versioned::value).collect(Collectors.toList());
-    }
-
-    /**
-     * Returns all nodes in complete state.
-     *
-     * @return set of nodes
-     */
+    @Override
     public Set<CordVtnNode> completeNodes() {
-        return getNodes().stream().filter(this::isNodeStateComplete)
-                .collect(Collectors.toSet());
-    }
-
-    /**
-     * Returns physical data plane port number of a given device.
-     *
-     * @param deviceId integration bridge device id
-     * @return port number; null otherwise
-     */
-    public PortNumber dataPort(DeviceId deviceId) {
-        CordVtnNode node = nodeByBridgeId(deviceId);
-        if (node == null) {
-            log.debug("Failed to get node for {}", deviceId);
-            return null;
-        }
-
-        Optional<PortNumber> port = getPortNumber(deviceId, node.dataIface());
-        return port.isPresent() ? port.get() : null;
-    }
-
-    /**
-     * Returns physical data plane IP address of a given device.
-     *
-     * @param deviceId integration bridge device id
-     * @return ip address; null otherwise
-     */
-    public IpAddress dataIp(DeviceId deviceId) {
-        CordVtnNode node = nodeByBridgeId(deviceId);
-        if (node == null) {
-            log.debug("Failed to get node for {}", deviceId);
-            return null;
-        }
-        return node.dataIp().ip();
-    }
-
-    /**
-     * Returns tunnel port number of a given device.
-     *
-     * @param deviceId integration bridge device id
-     * @return port number
-     */
-    public PortNumber tunnelPort(DeviceId deviceId) {
-        Optional<PortNumber> port = getPortNumber(deviceId, DEFAULT_TUNNEL);
-        return port.isPresent() ? port.get() : null;
-    }
-
-    /**
-     * Returns host management interface port number if exists.
-     *
-     * @param deviceId integration bridge device id
-     * @return port number; null if it does not exist
-     */
-    public PortNumber hostManagementPort(DeviceId deviceId) {
-        CordVtnNode node = nodeByBridgeId(deviceId);
-        if (node == null) {
-            log.debug("Failed to get node for {}", deviceId);
-            return null;
-        }
-
-        if (node.hostMgmtIface().isPresent()) {
-            Optional<PortNumber> port = getPortNumber(deviceId, node.hostMgmtIface().get());
-            return port.isPresent() ? port.get() : null;
-        } else {
-            return null;
-        }
-    }
-
-    private Optional<PortNumber> getPortNumber(DeviceId deviceId, String portName) {
-        PortNumber port = deviceService.getPorts(deviceId).stream()
-                .filter(p -> p.annotations().value(PORT_NAME).equals(portName) &&
-                        p.isEnabled())
-                .map(Port::number)
-                .findAny()
-                .orElse(null);
-        return Optional.ofNullable(port);
-    }
-
-    /**
-     * Returns if current node state saved in nodeStore is COMPLETE or not.
-     *
-     * @param node cordvtn node
-     * @return true if it's complete state, otherwise false
-     */
-    private boolean isNodeStateComplete(CordVtnNode node) {
-        checkNotNull(node);
-
         // the state saved in nodeStore can be wrong if IP address settings are changed
         // after the node init has been completed since there's no way to detect it
-        // getNodeState and checkNodeInitState always return correct answer but can be slow
-        Versioned<CordVtnNode> versionedNode = nodeStore.get(node.hostname());
-        CordVtnNodeState state = versionedNode.value().state();
-        return state != null && state.equals(NodeState.COMPLETE);
+        Set<CordVtnNode> nodes = nodes().stream()
+                .filter(node -> node.state() == COMPLETE)
+                .collect(Collectors.toSet());
+        return ImmutableSet.copyOf(nodes);
     }
 
-    /**
-     * Initiates node to serve virtual tenant network.
-     *
-     * @param node cordvtn node
-     */
-    private void initNode(CordVtnNode node) {
-        checkNotNull(node);
-
-        NodeState state = (NodeState) node.state();
-        log.debug("Processing node: {} state: {}", node.hostname(), state);
-
-        state.process(this, node);
+    @Override
+    public CordVtnNode node(String hostname) {
+        checkArgument(!Strings.isNullOrEmpty(hostname), ERR_NULL_HOSTNAME);
+        return nodeStore.node(hostname);
     }
 
-    /**
-     * Performs tasks after node initialization.
-     * It disconnects unnecessary OVSDB connection and installs initial flow
-     * rules on the device.
-     *
-     * @param node cordvtn node
-     */
-    private void postInit(CordVtnNode node) {
-        checkNotNull(node);
-
-        // disconnect OVSDB session once the node bootstrap is done
-        OvsdbClientService ovsdbClient = getOvsdbClient(node);
-        if (ovsdbClient != null && ovsdbClient.isConnected()) {
-            ovsdbClient.disconnect();
-        }
-
-        pipeline.initPipeline(node);
-
-        // adds existing instances to the host list
-        deviceService.getPorts(node.integrationBridgeId()).stream()
-                .filter(port -> !node.systemIfaces().contains(portName(port)) &&
-                        !port.number().equals(PortNumber.LOCAL) &&
-                        port.isEnabled())
-                .forEach(port -> instanceService.addInstance(connectPoint(port)));
-
-        // removes stale instances from the host list
-        hostService.getHosts().forEach(host -> {
-            if (deviceService.getPort(
-                    host.location().deviceId(),
-                    host.location().port()) == null) {
-                instanceService.removeInstance(host.location());
-            }
-        });
-
-        log.info("Finished init {}", node.hostname());
-    }
-
-    /**
-     * Sets a new state for a given cordvtn node.
-     *
-     * @param node cordvtn node
-     * @param newState new node state
-     */
-    private void setNodeState(CordVtnNode node, NodeState newState) {
-        checkNotNull(node);
-        log.debug("Changed {} state: {}", node.hostname(), newState);
-        nodeStore.put(node.hostname(), CordVtnNode.getUpdatedNode(node, newState));
-    }
-
-    /**
-     * Checks current state of a given cordvtn node and returns it.
-     *
-     * @param node cordvtn node
-     * @return node state
-     */
-    private NodeState getNodeState(CordVtnNode node) {
-        checkNotNull(node);
-        if (!isIntegrationBridgeCreated(node)) {
-            return NodeState.INIT;
-        }
-        for (String iface : node.systemIfaces()) {
-            if (!isIfaceCreated(node, iface)) {
-                return NodeState.BRIDGE_CREATED;
-            }
-        }
-        if (!isIpAddressSet(node)) {
-            return NodeState.PORTS_ADDED;
-        }
-        return NodeState.COMPLETE;
-    }
-
-    /**
-     * Returns connection state of OVSDB server for a given node.
-     *
-     * @param node cordvtn node
-     * @return true if it is connected, false otherwise
-     */
-    private boolean isOvsdbConnected(CordVtnNode node) {
-        checkNotNull(node);
-        OvsdbClientService ovsdbClient = getOvsdbClient(node);
-        return deviceService.isAvailable(node.ovsdbId()) &&
-                ovsdbClient != null &&
-                ovsdbClient.isConnected();
-    }
-
-    /**
-     * Connects to OVSDB server for a given node.
-     *
-     * @param node cordvtn node
-     */
-    private void connectOvsdb(CordVtnNode node) {
-        checkNotNull(node);
-        ovsdbController.connect(node.hostMgmtIp().ip(), node.ovsdbPort());
-    }
-
-    /**
-     * Returns OVSDB client for a given node.
-     *
-     * @param node cordvtn node
-     * @return ovsdb client, or null if there's no ovsdb connection
-     */
-    private OvsdbClientService getOvsdbClient(CordVtnNode node) {
-        checkNotNull(node);
-        OvsdbNodeId ovsdb = new OvsdbNodeId(
-                node.hostMgmtIp().ip(), node.ovsdbPort().toInt());
-        return ovsdbController.getOvsdbClient(ovsdb);
-    }
-
-    private void createIntegrationBridge(CordVtnNode node) {
-        Device device = deviceService.getDevice(node.ovsdbId());
-        if (device == null || !device.is(BridgeConfig.class)) {
-            log.error("Failed to create integration bridge on {}", node.ovsdbId());
-            return;
-        }
-
-        String dpid = node.integrationBridgeId().toString().substring(DPID_BEGIN);
-        BridgeDescription bridgeDesc = DefaultBridgeDescription.builder()
-                .name(INTEGRATION_BRIDGE)
-                .failMode(BridgeDescription.FailMode.SECURE)
-                .datapathId(dpid)
-                .disableInBand()
-                .controllers(controllers)
-                .build();
-
-        BridgeConfig bridgeConfig =  device.as(BridgeConfig.class);
-        bridgeConfig.addBridge(bridgeDesc);
-    }
-
-    private void createTunnelInterface(CordVtnNode node) {
-        Device device = deviceService.getDevice(node.ovsdbId());
-        if (device == null || !device.is(InterfaceConfig.class)) {
-            log.error("Failed to create tunnel interface on {}", node.ovsdbId());
-            return;
-        }
-
-        TunnelDescription tunnelDesc = DefaultTunnelDescription.builder()
-                .deviceId(INTEGRATION_BRIDGE)
-                .ifaceName(DEFAULT_TUNNEL)
-                .type(VXLAN)
-                .remote(TunnelEndPoints.flowTunnelEndpoint())
-                .key(TunnelKeys.flowTunnelKey())
-                .build();
-
-        InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
-        ifaceConfig.addTunnelMode(DEFAULT_TUNNEL, tunnelDesc);
-    }
-
-    private void addSystemInterface(CordVtnNode node, String ifaceName) {
-        Session session = connect(node.sshInfo());
-        if (session == null || !isInterfaceUp(session, ifaceName)) {
-            log.warn("Interface {} is not available on {}", ifaceName, node.hostname());
-            disconnect(session);
-            return;
-        } else {
-            disconnect(session);
-        }
-
-        Device device = deviceService.getDevice(node.ovsdbId());
-        if (!device.is(BridgeConfig.class)) {
-            log.error("BridgeConfig is not supported for {}", node.ovsdbId());
-            return;
-        }
-
-        BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
-        bridgeConfig.addPort(BridgeName.bridgeName(INTEGRATION_BRIDGE), ifaceName);
-    }
-
-    /**
-     * Flushes IP address from data plane interface and adds data plane IP address
-     * to integration bridge.
-     *
-     * @param node cordvtn node
-     */
-    private void setIpAddress(CordVtnNode node) {
-        Session session = connect(node.sshInfo());
-        if (session == null) {
-            log.debug("Failed to SSH to {}", node.hostname());
-            return;
-        }
-        getCurrentIps(session, INTEGRATION_BRIDGE).stream()
-                .filter(ip -> !ip.equals(node.localMgmtIp().ip()))
-                .filter(ip -> !ip.equals(node.dataIp().ip()))
-                .forEach(ip -> deleteIp(session, ip, INTEGRATION_BRIDGE));
-
-        boolean result = flushIp(session, node.dataIface()) &&
-                setInterfaceUp(session, node.dataIface()) &&
-                addIp(session, node.dataIp(), INTEGRATION_BRIDGE) &&
-                addIp(session, node.localMgmtIp(), INTEGRATION_BRIDGE) &&
-                setInterfaceUp(session, INTEGRATION_BRIDGE);
-
-        disconnect(session);
-        if (result) {
-            setNodeState(node, NodeState.COMPLETE);
-        }
-    }
-
-    private boolean isIntegrationBridgeCreated(CordVtnNode node) {
-        return deviceService.getDevice(node.integrationBridgeId()) != null &&
-                deviceService.isAvailable(node.integrationBridgeId());
-    }
-
-    private boolean isIfaceCreated(CordVtnNode node, String ifaceName) {
-        if (Strings.isNullOrEmpty(ifaceName)) {
-            return false;
-        }
-        return deviceService.getPorts(node.integrationBridgeId()).stream()
-                .anyMatch(p -> portName(p).contains(ifaceName) && p.isEnabled());
-    }
-
-    /**
-     * Checks if the IP addresses are correctly set.
-     *
-     * @param node cordvtn node
-     * @return true if the IP is set, false otherwise
-     */
-    private boolean isIpAddressSet(CordVtnNode node) {
-        Session session = connect(node.sshInfo());
-        if (session == null) {
-            log.debug("Failed to SSH to {}", node.hostname());
-            return false;
-        }
-
-        Set<IpAddress> intBrIps = getCurrentIps(session, INTEGRATION_BRIDGE);
-        boolean result = getCurrentIps(session, node.dataIface()).isEmpty() &&
-                isInterfaceUp(session, node.dataIface()) &&
-                intBrIps.contains(node.dataIp().ip()) &&
-                intBrIps.contains(node.localMgmtIp().ip()) &&
-                isInterfaceUp(session, INTEGRATION_BRIDGE);
-
-        disconnect(session);
-        return result;
-    }
-
-    /**
-     * Returns connect point of a given port.
-     *
-     * @param port port
-     * @return connect point
-     */
-    private ConnectPoint connectPoint(Port port) {
-        return new ConnectPoint(port.element().id(), port.number());
-    }
-
-    /**
-     * Returns cordvtn node associated with a given OVSDB device.
-     *
-     * @param ovsdbId OVSDB device id
-     * @return cordvtn node, null if it fails to find the node
-     */
-    private CordVtnNode nodeByOvsdbId(DeviceId ovsdbId) {
-        return getNodes().stream()
-                .filter(node -> node.ovsdbId().equals(ovsdbId))
-                .findFirst().orElse(null);
-    }
-
-    /**
-     * Returns cordvtn node associated with a given integration bridge.
-     *
-     * @param bridgeId device id of integration bridge
-     * @return cordvtn node, null if it fails to find the node
-     */
-    private CordVtnNode nodeByBridgeId(DeviceId bridgeId) {
-        return getNodes().stream()
-                .filter(node -> node.integrationBridgeId().equals(bridgeId))
-                .findFirst().orElse(null);
-    }
-
-    /**
-     * Returns port name.
-     *
-     * @param port port
-     * @return port name
-     */
-    private String portName(Port port) {
-        return port.annotations().value(PORT_NAME);
-    }
-
-    private class OvsdbHandler implements ConnectionHandler<Device> {
-
-        @Override
-        public void connected(Device device) {
-            CordVtnNode node = nodeByOvsdbId(device.id());
-            if (node != null) {
-                setNodeState(node, getNodeState(node));
-            } else {
-                log.info("{} is detected on unregistered node, ignore it.", device.id());
-            }
-        }
-
-        @Override
-        public void disconnected(Device device) {
-            log.debug("Device {} is disconnected", device.id());
-            adminService.removeDevice(device.id());
-        }
-    }
-
-    private class BridgeHandler implements ConnectionHandler<Device> {
-
-        @Override
-        public void connected(Device device) {
-            CordVtnNode node = nodeByBridgeId(device.id());
-            if (node != null) {
-                setNodeState(node, getNodeState(node));
-            } else {
-                log.info("{} is detected on unregistered node, ignore it.", device.id());
-            }
-        }
-
-        @Override
-        public void disconnected(Device device) {
-            CordVtnNode node = nodeByBridgeId(device.id());
-            if (node != null) {
-                log.warn("Integration Bridge is disconnected from {}", node.hostname());
-                setNodeState(node, NodeState.INCOMPLETE);
-            }
-        }
-
-        /**
-         * Handles port added situation.
-         * If the added port is tunnel or data plane interface, proceed to the remaining
-         * node initialization. Otherwise, do nothing.
-         *
-         * @param port port
-         */
-        public void portAdded(Port port) {
-            CordVtnNode node = nodeByBridgeId((DeviceId) port.element().id());
-            String portName = portName(port);
-            if (node == null) {
-                log.info("{} is added to unregistered node, ignore it.", portName);
-                return;
-            }
-
-            if (node.systemIfaces().contains(portName)) {
-                setNodeState(node, getNodeState(node));
-            } else if (isNodeStateComplete(node)) {
-                // TODO move this logic to InstanceManager
-                instanceService.addInstance(connectPoint(port));
-            } else {
-                log.warn("Instance is detected on incomplete node, ignore it.", portName);
-            }
-        }
-
-        /**
-         * Handles port removed situation.
-         * If the removed port is tunnel or data plane interface, proceed to the remaining
-         * node initialization.Others, do nothing.
-         *
-         * @param port port
-         */
-        public void portRemoved(Port port) {
-            CordVtnNode node = nodeByBridgeId((DeviceId) port.element().id());
-            String portName = portName(port);
-            if (node == null) {
-                log.info("{} is removed from unregistered node, ignore it.", portName);
-                return;
-            }
-
-            if (node.systemIfaces().contains(portName)) {
-                setNodeState(node, NodeState.INCOMPLETE);
-            } else if (isNodeStateComplete(node)) {
-                // TODO move this logic to InstanceManager
-                instanceService.removeInstance(connectPoint(port));
-            } else {
-                log.warn("VM is vanished from incomplete node, ignore it.", portName);
-            }
-        }
-    }
-
-    private class InternalDeviceListener implements DeviceListener {
-
-        @Override
-        public boolean isRelevant(DeviceEvent event) {
-            NodeId leaderNodeId = leadershipService.getLeader(appId.name());
-            return Objects.equals(localNodeId, leaderNodeId);
-        }
-
-        @Override
-        public void event(DeviceEvent event) {
-
-            Device device = event.subject();
-            ConnectionHandler<Device> handler = device.type().equals(SWITCH) ?
-                    bridgeHandler : ovsdbHandler;
-
-            switch (event.type()) {
-                case PORT_ADDED:
-                    eventExecutor.execute(() -> {
-                        log.info("Port {} is added to {}",
-                                 event.port().annotations().value(PORT_NAME),
-                                 event.subject().id());
-                        bridgeHandler.portAdded(event.port());
-                    });
-                    break;
-                case PORT_UPDATED:
-                    if (event.port().isEnabled()) {
-                        eventExecutor.execute(() -> {
-                            log.info("Port {} is added to {}",
-                                    event.port().annotations().value(PORT_NAME),
-                                    event.subject().id());
-                            bridgeHandler.portAdded(event.port());
-                        });
-                    } else {
-                        eventExecutor.execute(() -> {
-                            log.info("Port {} is removed from {}",
-                                    event.port().annotations().value(PORT_NAME),
-                                    event.subject().id());
-                            bridgeHandler.portRemoved(event.port());
-                        });
-                    }
-                    break;
-                case PORT_REMOVED:
-                    eventExecutor.execute(() -> {
-                        log.info("Port {} is removed from {}",
-                                event.port().annotations().value(PORT_NAME),
-                                event.subject().id());
-                        bridgeHandler.portRemoved(event.port());
-                    });
-                break;
-                case DEVICE_ADDED:
-                case DEVICE_AVAILABILITY_CHANGED:
-                    if (deviceService.isAvailable(device.id())) {
-                        eventExecutor.execute(() -> {
-                            log.info("Device {} is connected",
-                                     event.subject().id());
-                            handler.connected(device);
-                        });
-                    } else {
-                        eventExecutor.execute(() -> {
-                            log.info("Device {} is disconnected",
-                                     event.subject().id());
-                            handler.disconnected(device);
-                        });
-                    }
-                    break;
-                default:
-                    break;
-            }
-        }
+    @Override
+    public CordVtnNode node(DeviceId deviceId) {
+        checkNotNull(deviceId, ERR_NULL_DEVICE_ID);
+        return nodes().stream()
+                .filter(node -> node.integrationBridgeId().equals(deviceId) ||
+                        node.ovsdbId().equals(deviceId))
+                .findAny().orElse(null);
     }
 
     /**
@@ -860,21 +193,18 @@
 
         CordVtnConfig config = configService.getConfig(appId, CordVtnConfig.class);
         if (config == null) {
-            log.debug("No configuration found");
+            log.warn("No configuration found");
             return;
         }
-        config.cordVtnNodes().forEach(this::addOrUpdateNode);
-    }
-
-    private void readControllers() {
-        CordVtnConfig config = configService.getConfig(appId, CordVtnConfig.class);
-        if (config == null) {
-            log.debug("No configuration found");
-            return;
-        }
-        controllers = config.controllers();
-        controllers.forEach(ctrl -> {
-            log.debug("Added controller {}:{}", ctrl.ip(), ctrl.port());
+        config.cordVtnNodes().forEach(node -> {
+            log.info("Read node from network config: {}", node.hostname());
+            CordVtnNode existing = node(node.hostname());
+            if (existing == null) {
+                createNode(node);
+            } else if (!existing.equals(node)) {
+                // FIXME maybe we need to re-check node states
+                updateNode(node);
+            }
         });
     }
 
@@ -889,10 +219,7 @@
             switch (event.type()) {
                 case CONFIG_ADDED:
                 case CONFIG_UPDATED:
-                    eventExecutor.execute(() -> {
-                        readControllers();
-                        readNodes();
-                    });
+                    eventExecutor.execute(CordVtnNodeManager.this::readNodes);
                     break;
                 default:
                     break;
@@ -900,44 +227,12 @@
         }
     }
 
-    private class InternalMapListener implements MapEventListener<String, CordVtnNode> {
+    private class InternalCordVtnNodeStoreDelegate implements CordVtnNodeStoreDelegate {
 
         @Override
-        public void event(MapEvent<String, CordVtnNode> event) {
-            NodeId leaderNodeId = leadershipService.getLeader(appId.name());
-            if (!Objects.equals(localNodeId, leaderNodeId)) {
-                // do not allow to proceed without leadership
-                return;
-            }
-
-            CordVtnNode oldNode;
-            CordVtnNode newNode;
-
-            switch (event.type()) {
-            case UPDATE:
-                oldNode = event.oldValue().value();
-                newNode = event.newValue().value();
-
-                log.info("Reloaded {}", newNode.hostname());
-                if (!newNode.equals(oldNode)) {
-                    log.debug("New node: {}", newNode);
-                }
-                // performs init procedure even if the node is not changed
-                // for robustness since it's no harm to run init procedure
-                // multiple times
-                initNode(newNode);
-                break;
-            case INSERT:
-                newNode = event.newValue().value();
-                log.info("Added {}", newNode.hostname());
-                initNode(newNode);
-                break;
-            case REMOVE:
-                oldNode = event.oldValue().value();
-                log.info("Removed {}", oldNode.hostname());
-                break;
-            default:
-                break;
+        public void notify(CordVtnNodeEvent event) {
+            if (event != null) {
+                process(event);
             }
         }
     }