pom modifications to appify vtn
renaming package
diff --git a/src/main/java/org/opencord/cordvtn/impl/CordVtnNodeManager.java b/src/main/java/org/opencord/cordvtn/impl/CordVtnNodeManager.java
new file mode 100644
index 0000000..c037c05
--- /dev/null
+++ b/src/main/java/org/opencord/cordvtn/impl/CordVtnNodeManager.java
@@ -0,0 +1,1029 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.cordvtn.impl;
+
+import com.google.common.collect.Sets;
+import com.jcraft.jsch.Session;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.packet.IpAddress;
+import org.onlab.util.ItemNotFoundException;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.opencord.cordvtn.api.ConnectionHandler;
+import org.opencord.cordvtn.api.CordVtnConfig;
+import org.opencord.cordvtn.api.CordVtnNode;
+import org.opencord.cordvtn.api.CordVtnNodeState;
+import org.opencord.cordvtn.api.CordVtnService;
+import org.opencord.cordvtn.api.NetworkAddress;
+import org.opencord.cordvtn.api.SshAccessInfo;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DefaultAnnotations;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Host;
+import org.onosproject.net.Port;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.behaviour.BridgeConfig;
+import org.onosproject.net.behaviour.BridgeName;
+import org.onosproject.net.behaviour.ControllerInfo;
+import org.onosproject.net.behaviour.DefaultTunnelDescription;
+import org.onosproject.net.behaviour.TunnelConfig;
+import org.onosproject.net.behaviour.TunnelDescription;
+import org.onosproject.net.behaviour.TunnelName;
+import org.onosproject.net.config.NetworkConfigEvent;
+import org.onosproject.net.config.NetworkConfigListener;
+import org.onosproject.net.config.NetworkConfigRegistry;
+import org.onosproject.net.config.NetworkConfigService;
+import org.onosproject.net.device.DeviceAdminService;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.host.HostService;
+import org.onosproject.ovsdb.controller.OvsdbClientService;
+import org.onosproject.ovsdb.controller.OvsdbController;
+import org.onosproject.ovsdb.controller.OvsdbNodeId;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.opencord.cordvtn.impl.CordVtnPipeline.DEFAULT_TUNNEL;
+import static org.onosproject.net.Device.Type.SWITCH;
+import static org.onosproject.net.behaviour.TunnelDescription.Type.VXLAN;
+import static org.opencord.cordvtn.impl.RemoteIpCommandUtil.*;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Reads node information from the network config file and handles the config
+ * update events.
+ * Only a leader controller performs the node addition or deletion.
+ */
+@Component(immediate = true)
+@Service(value = CordVtnNodeManager.class)
+public class CordVtnNodeManager {
+
+ protected final Logger log = getLogger(getClass());
+
+ private static final KryoNamespace.Builder NODE_SERIALIZER = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(CordVtnNode.class)
+ .register(NodeState.class)
+ .register(SshAccessInfo.class)
+ .register(NetworkAddress.class);
+
+ private static final String DEFAULT_BRIDGE = "br-int";
+ private static final String VPORT_PREFIX = "tap";
+ private static final String OK = "OK";
+ private static final String NO = "NO";
+
+ private static final Map<String, String> DEFAULT_TUNNEL_OPTIONS = new HashMap<String, String>() {
+ {
+ put("key", "flow");
+ put("remote_ip", "flow");
+ }
+ };
+ private static final int DPID_BEGIN = 3;
+ private static final int OFPORT = 6653;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected NetworkConfigRegistry configRegistry;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected NetworkConfigService configService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DeviceAdminService adminService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected OvsdbController controller;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DeviceService deviceService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected HostService hostService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected LeadershipService leadershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected CordVtnInstanceManager instanceManager;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected CordVtnPipeline pipeline;
+
+ private final ExecutorService eventExecutor =
+ newSingleThreadScheduledExecutor(groupedThreads("onos/cordvtn-node", "event-handler"));
+
+ private final NetworkConfigListener configListener = new InternalConfigListener();
+ private final DeviceListener deviceListener = new InternalDeviceListener();
+ private final MapEventListener<String, CordVtnNode> nodeStoreListener = new InternalMapListener();
+
+ private final OvsdbHandler ovsdbHandler = new OvsdbHandler();
+ private final BridgeHandler bridgeHandler = new BridgeHandler();
+
+ private ConsistentMap<String, CordVtnNode> nodeStore;
+ private ApplicationId appId;
+ private NodeId localNodeId;
+
+ private enum NodeState implements CordVtnNodeState {
+
+ INIT {
+ @Override
+ public void process(CordVtnNodeManager nodeManager, CordVtnNode node) {
+ if (!nodeManager.isOvsdbConnected(node)) {
+ nodeManager.connectOvsdb(node);
+ } else {
+ nodeManager.createIntegrationBridge(node);
+ }
+ }
+ },
+ BRIDGE_CREATED {
+ @Override
+ public void process(CordVtnNodeManager nodeManager, CordVtnNode node) {
+ if (!nodeManager.isOvsdbConnected(node)) {
+ nodeManager.connectOvsdb(node);
+ } else {
+ nodeManager.createTunnelInterface(node);
+ nodeManager.addDataPlaneInterface(node);
+ }
+ }
+ },
+ PORTS_ADDED {
+ @Override
+ public void process(CordVtnNodeManager nodeManager, CordVtnNode node) {
+ nodeManager.setIpAddress(node);
+ }
+ },
+ COMPLETE {
+ @Override
+ public void process(CordVtnNodeManager nodeManager, CordVtnNode node) {
+ nodeManager.postInit(node);
+ }
+ },
+ INCOMPLETE {
+ @Override
+ public void process(CordVtnNodeManager nodeManager, CordVtnNode node) {
+ }
+ };
+
+ public abstract void process(CordVtnNodeManager nodeManager, CordVtnNode node);
+ }
+
+ @Activate
+ protected void activate() {
+ appId = coreService.getAppId(CordVtnService.CORDVTN_APP_ID);
+
+ localNodeId = clusterService.getLocalNode().id();
+ leadershipService.runForLeadership(appId.name());
+
+ nodeStore = storageService.<String, CordVtnNode>consistentMapBuilder()
+ .withSerializer(Serializer.using(NODE_SERIALIZER.build()))
+ .withName("cordvtn-nodestore")
+ .withApplicationId(appId)
+ .build();
+
+ nodeStore.addListener(nodeStoreListener);
+ deviceService.addListener(deviceListener);
+ configService.addListener(configListener);
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ protected void deactivate() {
+ configService.removeListener(configListener);
+ deviceService.removeListener(deviceListener);
+ nodeStore.removeListener(nodeStoreListener);
+
+ leadershipService.withdraw(appId.name());
+ eventExecutor.shutdown();
+
+ log.info("Stopped");
+ }
+
+ /**
+ * Adds or updates a new node to the service.
+ *
+ * @param node cordvtn node
+ */
+ public void addOrUpdateNode(CordVtnNode node) {
+ checkNotNull(node);
+ nodeStore.put(node.hostname(), CordVtnNode.getUpdatedNode(node, getNodeState(node)));
+ }
+
+ /**
+ * Deletes a node from the service.
+ *
+ * @param node cordvtn node
+ */
+ public void deleteNode(CordVtnNode node) {
+ checkNotNull(node);
+
+ if (isOvsdbConnected(node)) {
+ disconnectOvsdb(node);
+ }
+
+ nodeStore.remove(node.hostname());
+ }
+
+ /**
+ * Returns node initialization state.
+ *
+ * @param node cordvtn node
+ * @return true if initial node setup is completed, otherwise false
+ */
+ public boolean isNodeInitComplete(CordVtnNode node) {
+ checkNotNull(node);
+ return nodeStore.containsKey(node.hostname()) && getNodeState(node).equals(NodeState.COMPLETE);
+ }
+
+ /**
+ * Returns detailed node initialization state.
+ *
+ * @param node cordvtn node
+ * @return string including detailed node init state
+ */
+ public String checkNodeInitState(CordVtnNode node) {
+ checkNotNull(node);
+
+ if (!nodeStore.containsKey(node.hostname())) {
+ log.warn("Node {} does not exist, add node first", node.hostname());
+ return null;
+ }
+
+ Session session = connect(node.sshInfo());
+ if (session == null) {
+ log.debug("Failed to SSH to {}", node.hostname());
+ return null;
+ }
+
+ Set<IpAddress> intBrIps = getCurrentIps(session, DEFAULT_BRIDGE);
+ String result = String.format(
+ "Current state : %s%n" +
+ "br-int created and connected to ONOS : %s (%s)%n" +
+ "VXLAN interface added to br-int : %s%n" +
+ "Data plane interface is added to br-int and enabled : %s (%s)%n" +
+ "IP flushed from data plane interface : %s (%s)%n" +
+ "Data plane IP added to br-int : %s (%s)%n" +
+ "Local management IP added to br-int : %s (%s)",
+ node.state(),
+ isBrIntCreated(node) ? OK : NO, node.intBrId(),
+ isTunnelIntfCreated(node) ? OK : NO,
+ isDataPlaneIntfAdded(node) ? OK : NO, node.dpIntf(),
+ isInterfaceUp(session, node.dpIntf()) &&
+ getCurrentIps(session, node.dpIntf()).isEmpty() ? OK : NO, node.dpIntf(),
+ intBrIps.contains(node.dpIp().ip()) ? OK : NO, node.dpIp().cidr(),
+ intBrIps.contains(node.localMgmtIp().ip()) ? OK : NO, node.localMgmtIp().cidr());
+
+ disconnect(session);
+
+ return result;
+ }
+
+ /**
+ * Returns the number of the nodes known to the service.
+ *
+ * @return number of nodes
+ */
+ public int getNodeCount() {
+ return nodeStore.size();
+ }
+
+ /**
+ * Returns all nodes known to the service.
+ *
+ * @return list of nodes
+ */
+ public List<CordVtnNode> getNodes() {
+ return nodeStore.values().stream().map(Versioned::value).collect(Collectors.toList());
+ }
+
+ /**
+ * Returns all nodes in complete state.
+ *
+ * @return set of nodes
+ */
+ public Set<CordVtnNode> completeNodes() {
+ return getNodes().stream().filter(this::isNodeInitComplete).collect(Collectors.toSet());
+ }
+
+ /**
+ * Returns physical data plane port number of a given device.
+ *
+ * @param deviceId integration bridge device id
+ * @return port number; null otherwise
+ */
+ public PortNumber dpPort(DeviceId deviceId) {
+ CordVtnNode node = nodeByBridgeId(deviceId);
+ if (node == null) {
+ log.warn("Failed to get node for {}", deviceId);
+ return null;
+ }
+ Port port = deviceService.getPorts(deviceId).stream()
+ .filter(p -> portName(p).contains(node.dpIntf()) &&
+ p.isEnabled())
+ .findFirst().orElse(null);
+
+ return port == null ? null : port.number();
+ }
+
+ /**
+ * Returns physical data plane IP address of a given device.
+ *
+ * @param deviceId integration bridge device id
+ * @return ip address; null otherwise
+ */
+ public IpAddress dpIp(DeviceId deviceId) {
+ CordVtnNode node = nodeByBridgeId(deviceId);
+ if (node == null) {
+ log.warn("Failed to get node for {}", deviceId);
+ return null;
+ }
+ return node.dpIp().ip();
+ }
+
+ /**
+ * Returns tunnel port number of a given device.
+ *
+ * @param deviceId integration bridge device id
+ * @return port number
+ */
+ public PortNumber tunnelPort(DeviceId deviceId) {
+ Port port = deviceService.getPorts(deviceId).stream()
+ .filter(p -> portName(p).contains(DEFAULT_TUNNEL))
+ .findFirst().orElse(null);
+
+ return port == null ? null : port.number();
+ }
+
+ /**
+ * Returns if current node state saved in nodeStore is COMPLETE or not.
+ *
+ * @param node cordvtn node
+ * @return true if it's complete state, otherwise false
+ */
+ private boolean isNodeStateComplete(CordVtnNode node) {
+ checkNotNull(node);
+
+ // the state saved in nodeStore can be wrong if IP address settings are changed
+ // after the node init has been completed since there's no way to detect it
+ // getNodeState and checkNodeInitState always return correct answer but can be slow
+ Versioned<CordVtnNode> versionedNode = nodeStore.get(node.hostname());
+ CordVtnNodeState state = versionedNode.value().state();
+ return state != null && state.equals(NodeState.COMPLETE);
+ }
+
+ /**
+ * Initiates node to serve virtual tenant network.
+ *
+ * @param node cordvtn node
+ */
+ private void initNode(CordVtnNode node) {
+ checkNotNull(node);
+
+ NodeState state = (NodeState) node.state();
+ log.debug("Processing node: {} state: {}", node.hostname(), state);
+
+ state.process(this, node);
+ }
+
+ /**
+ * Performs tasks after node initialization.
+ * It disconnects unnecessary OVSDB connection and installs initial flow
+ * rules on the device.
+ *
+ * @param node cordvtn node
+ */
+ private void postInit(CordVtnNode node) {
+ disconnectOvsdb(node);
+ pipeline.initPipeline(node, dpPort(node.intBrId()), tunnelPort(node.intBrId()));
+
+ deviceService.getPorts(node.intBrId()).stream()
+ .filter(port -> portName(port).startsWith(VPORT_PREFIX) &&
+ port.isEnabled())
+ .forEach(port -> instanceManager.addInstance(connectPoint(port)));
+
+ hostService.getHosts().forEach(host -> {
+ if (deviceService.getPort(host.location().deviceId(),
+ host.location().port()) == null) {
+ instanceManager.removeInstance(connectPoint(host));
+ }
+ });
+
+ log.info("Finished init {}", node.hostname());
+ }
+
+ /**
+ * Sets a new state for a given cordvtn node.
+ *
+ * @param node cordvtn node
+ * @param newState new node state
+ */
+ private void setNodeState(CordVtnNode node, NodeState newState) {
+ checkNotNull(node);
+
+ log.debug("Changed {} state: {}", node.hostname(), newState);
+ nodeStore.put(node.hostname(), CordVtnNode.getUpdatedNode(node, newState));
+ }
+
+ /**
+ * Checks current state of a given cordvtn node and returns it.
+ *
+ * @param node cordvtn node
+ * @return node state
+ */
+ private NodeState getNodeState(CordVtnNode node) {
+ checkNotNull(node);
+
+ if (isBrIntCreated(node) && isTunnelIntfCreated(node) &&
+ isDataPlaneIntfAdded(node) && isIpAddressSet(node)) {
+ return NodeState.COMPLETE;
+ } else if (isDataPlaneIntfAdded(node) && isTunnelIntfCreated(node)) {
+ return NodeState.PORTS_ADDED;
+ } else if (isBrIntCreated(node)) {
+ return NodeState.BRIDGE_CREATED;
+ } else {
+ return NodeState.INIT;
+ }
+ }
+
+ /**
+ * Returns connection state of OVSDB server for a given node.
+ *
+ * @param node cordvtn node
+ * @return true if it is connected, false otherwise
+ */
+ private boolean isOvsdbConnected(CordVtnNode node) {
+ checkNotNull(node);
+
+ OvsdbClientService ovsdbClient = getOvsdbClient(node);
+ return deviceService.isAvailable(node.ovsdbId()) &&
+ ovsdbClient != null && ovsdbClient.isConnected();
+ }
+
+ /**
+ * Connects to OVSDB server for a given node.
+ *
+ * @param node cordvtn node
+ */
+ private void connectOvsdb(CordVtnNode node) {
+ checkNotNull(node);
+
+ if (!nodeStore.containsKey(node.hostname())) {
+ log.warn("Node {} does not exist", node.hostname());
+ return;
+ }
+
+ if (!isOvsdbConnected(node)) {
+ controller.connect(node.hostMgmtIp().ip(), node.ovsdbPort());
+ }
+ }
+
+ /**
+ * Disconnects OVSDB server for a given node.
+ *
+ * @param node cordvtn node
+ */
+ private void disconnectOvsdb(CordVtnNode node) {
+ checkNotNull(node);
+
+ if (!nodeStore.containsKey(node.hostname())) {
+ log.warn("Node {} does not exist", node.hostname());
+ return;
+ }
+
+ if (isOvsdbConnected(node)) {
+ OvsdbClientService ovsdbClient = getOvsdbClient(node);
+ ovsdbClient.disconnect();
+ }
+ }
+
+ /**
+ * Returns OVSDB client for a given node.
+ *
+ * @param node cordvtn node
+ * @return OVSDB client, or null if it fails to get OVSDB client
+ */
+ private OvsdbClientService getOvsdbClient(CordVtnNode node) {
+ checkNotNull(node);
+
+ OvsdbClientService ovsdbClient = controller.getOvsdbClient(
+ new OvsdbNodeId(node.hostMgmtIp().ip(), node.ovsdbPort().toInt()));
+ if (ovsdbClient == null) {
+ log.trace("Couldn't find OVSDB client for {}", node.hostname());
+ }
+ return ovsdbClient;
+ }
+
+ /**
+ * Creates an integration bridge for a given node.
+ *
+ * @param node cordvtn node
+ */
+ private void createIntegrationBridge(CordVtnNode node) {
+ if (isBrIntCreated(node)) {
+ return;
+ }
+
+ List<ControllerInfo> controllers = new ArrayList<>();
+ Sets.newHashSet(clusterService.getNodes()).stream()
+ .forEach(controller -> {
+ ControllerInfo ctrlInfo = new ControllerInfo(controller.ip(), OFPORT, "tcp");
+ controllers.add(ctrlInfo);
+ });
+
+ String dpid = node.intBrId().toString().substring(DPID_BEGIN);
+
+ try {
+ Device device = deviceService.getDevice(node.ovsdbId());
+ if (device.is(BridgeConfig.class)) {
+ BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
+ bridgeConfig.addBridge(BridgeName.bridgeName(DEFAULT_BRIDGE), dpid, controllers);
+ } else {
+ log.warn("The bridging behaviour is not supported in device {}", device.id());
+ }
+ } catch (ItemNotFoundException e) {
+ log.warn("Failed to create integration bridge on {}", node.hostname());
+ }
+ }
+
+ /**
+ * Creates tunnel interface to the integration bridge for a given node.
+ *
+ * @param node cordvtn node
+ */
+ private void createTunnelInterface(CordVtnNode node) {
+ if (isTunnelIntfCreated(node)) {
+ return;
+ }
+
+ DefaultAnnotations.Builder optionBuilder = DefaultAnnotations.builder();
+ for (String key : DEFAULT_TUNNEL_OPTIONS.keySet()) {
+ optionBuilder.set(key, DEFAULT_TUNNEL_OPTIONS.get(key));
+ }
+
+ TunnelDescription description = new DefaultTunnelDescription(
+ null, null, VXLAN, TunnelName.tunnelName(DEFAULT_TUNNEL),
+ optionBuilder.build());
+
+ try {
+ Device device = deviceService.getDevice(node.ovsdbId());
+ if (device.is(TunnelConfig.class)) {
+ TunnelConfig tunnelConfig = device.as(TunnelConfig.class);
+ tunnelConfig.createTunnelInterface(BridgeName.bridgeName(DEFAULT_BRIDGE), description);
+ } else {
+ log.warn("The tunneling behaviour is not supported in device {}", device.id());
+ }
+ } catch (ItemNotFoundException e) {
+ log.warn("Failed to create tunnel interface on {}", node.hostname());
+ }
+ }
+
+ /**
+ * Adds data plane interface to a given node.
+ *
+ * @param node cordvtn node
+ */
+ private void addDataPlaneInterface(CordVtnNode node) {
+ if (isDataPlaneIntfAdded(node)) {
+ return;
+ }
+
+ Session session = connect(node.sshInfo());
+ if (session == null) {
+ log.debug("Failed to SSH to {}", node.hostname());
+ return;
+ }
+
+ if (!isInterfaceUp(session, node.dpIntf())) {
+ log.warn("Interface {} is not available", node.dpIntf());
+ return;
+ }
+ disconnect(session);
+
+ try {
+ Device device = deviceService.getDevice(node.ovsdbId());
+ if (device.is(BridgeConfig.class)) {
+ BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
+ bridgeConfig.addPort(BridgeName.bridgeName(DEFAULT_BRIDGE), node.dpIntf());
+ } else {
+ log.warn("The bridging behaviour is not supported in device {}", device.id());
+ }
+ } catch (ItemNotFoundException e) {
+ log.warn("Failed to add {} on {}", node.dpIntf(), node.hostname());
+ }
+ }
+
+ /**
+ * Flushes IP address from data plane interface and adds data plane IP address
+ * to integration bridge.
+ *
+ * @param node cordvtn node
+ */
+ private void setIpAddress(CordVtnNode node) {
+ Session session = connect(node.sshInfo());
+ if (session == null) {
+ log.debug("Failed to SSH to {}", node.hostname());
+ return;
+ }
+
+ getCurrentIps(session, DEFAULT_BRIDGE).stream()
+ .filter(ip -> !ip.equals(node.localMgmtIp().ip()))
+ .filter(ip -> !ip.equals(node.dpIp().ip()))
+ .forEach(ip -> deleteIp(session, ip, DEFAULT_BRIDGE));
+
+ boolean result = flushIp(session, node.dpIntf()) &&
+ setInterfaceUp(session, node.dpIntf()) &&
+ addIp(session, node.dpIp(), DEFAULT_BRIDGE) &&
+ addIp(session, node.localMgmtIp(), DEFAULT_BRIDGE) &&
+ setInterfaceUp(session, DEFAULT_BRIDGE);
+
+ disconnect(session);
+
+ if (result) {
+ setNodeState(node, NodeState.COMPLETE);
+ }
+ }
+
+ /**
+ * Checks if integration bridge exists and available.
+ *
+ * @param node cordvtn node
+ * @return true if the bridge is available, false otherwise
+ */
+ private boolean isBrIntCreated(CordVtnNode node) {
+ return (deviceService.getDevice(node.intBrId()) != null
+ && deviceService.isAvailable(node.intBrId()));
+ }
+
+ /**
+ * Checks if tunnel interface exists.
+ *
+ * @param node cordvtn node
+ * @return true if the interface exists, false otherwise
+ */
+ private boolean isTunnelIntfCreated(CordVtnNode node) {
+ return deviceService.getPorts(node.intBrId())
+ .stream()
+ .filter(p -> portName(p).contains(DEFAULT_TUNNEL) &&
+ p.isEnabled())
+ .findAny().isPresent();
+ }
+
+ /**
+ * Checks if data plane interface exists.
+ *
+ * @param node cordvtn node
+ * @return true if the interface exists, false otherwise
+ */
+ private boolean isDataPlaneIntfAdded(CordVtnNode node) {
+ return deviceService.getPorts(node.intBrId())
+ .stream()
+ .filter(p -> portName(p).contains(node.dpIntf()) &&
+ p.isEnabled())
+ .findAny().isPresent();
+ }
+
+ /**
+ * Checks if the IP addresses are correctly set.
+ *
+ * @param node cordvtn node
+ * @return true if the IP is set, false otherwise
+ */
+ private boolean isIpAddressSet(CordVtnNode node) {
+ Session session = connect(node.sshInfo());
+ if (session == null) {
+ log.debug("Failed to SSH to {}", node.hostname());
+ return false;
+ }
+
+ Set<IpAddress> intBrIps = getCurrentIps(session, DEFAULT_BRIDGE);
+ boolean result = getCurrentIps(session, node.dpIntf()).isEmpty() &&
+ isInterfaceUp(session, node.dpIntf()) &&
+ intBrIps.contains(node.dpIp().ip()) &&
+ intBrIps.contains(node.localMgmtIp().ip()) &&
+ isInterfaceUp(session, DEFAULT_BRIDGE);
+
+ disconnect(session);
+ return result;
+ }
+
+ /**
+ * Returns connect point of a given port.
+ *
+ * @param port port
+ * @return connect point
+ */
+ private ConnectPoint connectPoint(Port port) {
+ return new ConnectPoint(port.element().id(), port.number());
+ }
+
+ /**
+ * Returns connect point of a given host.
+ *
+ * @param host host
+ * @return connect point
+ */
+ private ConnectPoint connectPoint(Host host) {
+ return new ConnectPoint(host.location().deviceId(), host.location().port());
+ }
+
+ /**
+ * Returns cordvtn node associated with a given OVSDB device.
+ *
+ * @param ovsdbId OVSDB device id
+ * @return cordvtn node, null if it fails to find the node
+ */
+ private CordVtnNode nodeByOvsdbId(DeviceId ovsdbId) {
+ return getNodes().stream()
+ .filter(node -> node.ovsdbId().equals(ovsdbId))
+ .findFirst().orElse(null);
+ }
+
+ /**
+ * Returns cordvtn node associated with a given integration bridge.
+ *
+ * @param bridgeId device id of integration bridge
+ * @return cordvtn node, null if it fails to find the node
+ */
+ private CordVtnNode nodeByBridgeId(DeviceId bridgeId) {
+ return getNodes().stream()
+ .filter(node -> node.intBrId().equals(bridgeId))
+ .findFirst().orElse(null);
+ }
+
+ /**
+ * Returns port name.
+ *
+ * @param port port
+ * @return port name
+ */
+ private String portName(Port port) {
+ return port.annotations().value("portName");
+ }
+
+ private class OvsdbHandler implements ConnectionHandler<Device> {
+
+ @Override
+ public void connected(Device device) {
+ CordVtnNode node = nodeByOvsdbId(device.id());
+ if (node != null) {
+ setNodeState(node, getNodeState(node));
+ } else {
+ log.debug("{} is detected on unregistered node, ignore it.", device.id());
+ }
+ }
+
+ @Override
+ public void disconnected(Device device) {
+ if (!deviceService.isAvailable(device.id())) {
+ log.debug("Device {} is disconnected", device.id());
+ adminService.removeDevice(device.id());
+ }
+ }
+ }
+
+ private class BridgeHandler implements ConnectionHandler<Device> {
+
+ @Override
+ public void connected(Device device) {
+ CordVtnNode node = nodeByBridgeId(device.id());
+ if (node != null) {
+ setNodeState(node, getNodeState(node));
+ } else {
+ log.debug("{} is detected on unregistered node, ignore it.", device.id());
+ }
+ }
+
+ @Override
+ public void disconnected(Device device) {
+ CordVtnNode node = nodeByBridgeId(device.id());
+ if (node != null) {
+ log.debug("Integration Bridge is disconnected from {}", node.hostname());
+ setNodeState(node, NodeState.INCOMPLETE);
+ }
+ }
+
+ /**
+ * Handles port added situation.
+ * If the added port is tunnel or data plane interface, proceed to the remaining
+ * node initialization. Otherwise, do nothing.
+ *
+ * @param port port
+ */
+ public void portAdded(Port port) {
+ CordVtnNode node = nodeByBridgeId((DeviceId) port.element().id());
+ String portName = portName(port);
+
+ if (node == null) {
+ log.debug("{} is added to unregistered node, ignore it.", portName);
+ return;
+ }
+
+ log.info("Port {} is added to {}", portName, node.hostname());
+
+ if (portName.startsWith(VPORT_PREFIX)) {
+ if (isNodeStateComplete(node)) {
+ instanceManager.addInstance(connectPoint(port));
+ } else {
+ log.debug("VM is detected on incomplete node, ignore it.", portName);
+ }
+ } else if (portName.contains(DEFAULT_TUNNEL) || portName.equals(node.dpIntf())) {
+ setNodeState(node, getNodeState(node));
+ }
+ }
+
+ /**
+ * Handles port removed situation.
+ * If the removed port is tunnel or data plane interface, proceed to the remaining
+ * node initialization.Others, do nothing.
+ *
+ * @param port port
+ */
+ public void portRemoved(Port port) {
+ CordVtnNode node = nodeByBridgeId((DeviceId) port.element().id());
+ String portName = portName(port);
+
+ if (node == null) {
+ return;
+ }
+
+ log.info("Port {} is removed from {}", portName, node.hostname());
+
+ if (portName.startsWith(VPORT_PREFIX)) {
+ if (isNodeStateComplete(node)) {
+ instanceManager.removeInstance(connectPoint(port));
+ } else {
+ log.debug("VM is vanished from incomplete node, ignore it.", portName);
+ }
+ } else if (portName.contains(DEFAULT_TUNNEL) || portName.equals(node.dpIntf())) {
+ setNodeState(node, NodeState.INCOMPLETE);
+ }
+ }
+ }
+
+ private class InternalDeviceListener implements DeviceListener {
+
+ @Override
+ public void event(DeviceEvent event) {
+
+ NodeId leaderNodeId = leadershipService.getLeader(appId.name());
+ if (!Objects.equals(localNodeId, leaderNodeId)) {
+ // do not allow to proceed without leadership
+ return;
+ }
+
+ Device device = event.subject();
+ ConnectionHandler<Device> handler =
+ (device.type().equals(SWITCH) ? bridgeHandler : ovsdbHandler);
+
+ switch (event.type()) {
+ case PORT_ADDED:
+ eventExecutor.execute(() -> bridgeHandler.portAdded(event.port()));
+ break;
+ case PORT_UPDATED:
+ if (!event.port().isEnabled()) {
+ eventExecutor.execute(() -> bridgeHandler.portRemoved(event.port()));
+ }
+ break;
+ case DEVICE_ADDED:
+ case DEVICE_AVAILABILITY_CHANGED:
+ if (deviceService.isAvailable(device.id())) {
+ eventExecutor.execute(() -> handler.connected(device));
+ } else {
+ eventExecutor.execute(() -> handler.disconnected(device));
+ }
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ /**
+ * Reads cordvtn nodes from config file.
+ */
+ private void readConfiguration() {
+ CordVtnConfig config = configRegistry.getConfig(appId, CordVtnConfig.class);
+ if (config == null) {
+ log.debug("No configuration found");
+ return;
+ }
+ config.cordVtnNodes().forEach(this::addOrUpdateNode);
+ }
+
+ private class InternalConfigListener implements NetworkConfigListener {
+
+ @Override
+ public void event(NetworkConfigEvent event) {
+ NodeId leaderNodeId = leadershipService.getLeader(appId.name());
+ if (!Objects.equals(localNodeId, leaderNodeId)) {
+ // do not allow to proceed without leadership
+ return;
+ }
+
+ if (!event.configClass().equals(CordVtnConfig.class)) {
+ return;
+ }
+
+ switch (event.type()) {
+ case CONFIG_ADDED:
+ case CONFIG_UPDATED:
+ eventExecutor.execute(CordVtnNodeManager.this::readConfiguration);
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ private class InternalMapListener implements MapEventListener<String, CordVtnNode> {
+
+ @Override
+ public void event(MapEvent<String, CordVtnNode> event) {
+ NodeId leaderNodeId = leadershipService.getLeader(appId.name());
+ if (!Objects.equals(localNodeId, leaderNodeId)) {
+ // do not allow to proceed without leadership
+ return;
+ }
+
+ CordVtnNode oldNode;
+ CordVtnNode newNode;
+
+ switch (event.type()) {
+ case UPDATE:
+ oldNode = event.oldValue().value();
+ newNode = event.newValue().value();
+
+ log.info("Reloaded {}", newNode.hostname());
+ if (!newNode.equals(oldNode)) {
+ log.debug("New node: {}", newNode);
+ }
+ // performs init procedure even if the node is not changed
+ // for robustness since it's no harm to run init procedure
+ // multiple times
+ eventExecutor.execute(() -> initNode(newNode));
+ break;
+ case INSERT:
+ newNode = event.newValue().value();
+ log.info("Added {}", newNode.hostname());
+ eventExecutor.execute(() -> initNode(newNode));
+ break;
+ case REMOVE:
+ oldNode = event.oldValue().value();
+ log.info("Removed {}", oldNode.hostname());
+ break;
+ default:
+ break;
+ }
+ }
+ }
+}