CORD-151 Refactor cordvtn service to reduce complexity
Change-Id: I489e1d3df7f08d04d6b6a2aa23b9d4e6d7a054e4
diff --git a/src/main/java/org/onosproject/cordvtn/CordVtn.java b/src/main/java/org/onosproject/cordvtn/CordVtn.java
index 072254d..cb8acab 100644
--- a/src/main/java/org/onosproject/cordvtn/CordVtn.java
+++ b/src/main/java/org/onosproject/cordvtn/CordVtn.java
@@ -21,24 +21,11 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
-import org.onlab.packet.IpAddress;
-import org.onlab.packet.TpPort;
import org.onlab.util.KryoNamespace;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.LeadershipEvent;
-import org.onosproject.cluster.LeadershipEventListener;
-import org.onosproject.cluster.LeadershipService;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
-import org.onosproject.mastership.MastershipService;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Host;
-import org.onosproject.net.config.ConfigFactory;
-import org.onosproject.net.config.NetworkConfigRegistry;
-import org.onosproject.net.config.NetworkConfigService;
-import org.onosproject.net.config.basics.SubjectFactories;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
@@ -57,11 +44,15 @@
import java.util.stream.Collectors;
import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.cordvtn.OvsdbNode.State;
import static org.onosproject.cordvtn.OvsdbNode.State.INIT;
+import static org.onosproject.cordvtn.OvsdbNode.State.DISCONNECT;
+import static org.onosproject.net.Device.Type.SWITCH;
import static org.slf4j.LoggerFactory.getLogger;
/**
- * CORD VTN Application that provisions overlay virtual tenant networks.
+ * Provides initial setup or cleanup for provisioning virtual tenant networks
+ * on ovsdb, integration bridge and vm when they are added or deleted.
*/
@Component(immediate = true)
@Service
@@ -69,6 +60,11 @@
protected final Logger log = getLogger(getClass());
+ private static final int NUM_THREADS = 1;
+ private static final KryoNamespace.Builder NODE_SERIALIZER = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(OvsdbNode.class);
+
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
@@ -79,112 +75,81 @@
protected LogicalClockService clockService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterService clusterService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected LeadershipService leadershipService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected NetworkConfigService configService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected NetworkConfigRegistry configRegistry;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected HostService hostService;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected MastershipService mastershipService;
+ private final ExecutorService eventExecutor = Executors
+ .newFixedThreadPool(NUM_THREADS, groupedThreads("onos/cordvtn", "event-handler"));
- private static final int DEFAULT_NUM_THREADS = 1;
- private static final KryoNamespace.Builder NODE_SERIALIZER = KryoNamespace.newBuilder()
- .register(KryoNamespaces.API)
- .register(OvsdbNode.class);
-
- private final ExecutorService eventExecutor = Executors.newFixedThreadPool(
- DEFAULT_NUM_THREADS, groupedThreads("onos/cordvtn", "event-handler"));
-
- private final LeadershipEventListener leadershipListener = new InternalLeadershipListener();
private final DeviceListener deviceListener = new InternalDeviceListener();
private final HostListener hostListener = new InternalHostListener();
- private final NodeHandler nodeHandler = new NodeHandler();
+
+ private final OvsdbHandler ovsdbHandler = new OvsdbHandler();
private final BridgeHandler bridgeHandler = new BridgeHandler();
- private final VirtualMachineHandler vmHandler = new VirtualMachineHandler();
+ private final VmHandler vmHandler = new VmHandler();
- private final ConfigFactory configFactory =
- new ConfigFactory(SubjectFactories.APP_SUBJECT_FACTORY, CordVtnConfig.class, "cordvtn") {
- @Override
- public CordVtnConfig createConfig() {
- return new CordVtnConfig();
- }
- };
-
- private ApplicationId appId;
- private NodeId local;
private EventuallyConsistentMap<DeviceId, OvsdbNode> nodeStore;
- private NodeConnectionManager nodeConnectionManager;
@Activate
protected void activate() {
- appId = coreService.registerApplication("org.onosproject.cordvtn");
-
- local = clusterService.getLocalNode().id();
+ coreService.registerApplication("org.onosproject.cordvtn");
nodeStore = storageService.<DeviceId, OvsdbNode>eventuallyConsistentMapBuilder()
.withName("cordvtn-nodestore")
.withSerializer(NODE_SERIALIZER)
.withTimestampProvider((k, v) -> clockService.getTimestamp())
.build();
- configRegistry.registerConfigFactory(configFactory);
deviceService.addListener(deviceListener);
hostService.addListener(hostListener);
- leadershipService.addListener(leadershipListener);
- leadershipService.runForLeadership(appId.name());
- nodeConnectionManager = new NodeConnectionManager(appId, local, nodeStore,
- mastershipService, leadershipService);
- nodeConnectionManager.start();
+
log.info("Started");
}
@Deactivate
protected void deactivate() {
- nodeConnectionManager.stop();
- leadershipService.removeListener(leadershipListener);
- leadershipService.withdraw(appId.name());
deviceService.removeListener(deviceListener);
hostService.removeListener(hostListener);
+
eventExecutor.shutdown();
nodeStore.destroy();
- configRegistry.unregisterConfigFactory(configFactory);
+
log.info("Stopped");
}
@Override
- public void addNode(String hostname, IpAddress ip, TpPort port) {
- DefaultOvsdbNode node = new DefaultOvsdbNode(hostname, ip, port, DeviceId.NONE, INIT);
-
- if (nodeStore.containsKey(node.deviceId())) {
- log.warn("Node {} with ovsdb-server {}:{} already exists", hostname, ip, port);
+ public void addNode(OvsdbNode ovsdbNode) {
+ if (nodeStore.containsKey(ovsdbNode.deviceId())) {
+ log.warn("Node {} already exists", ovsdbNode.host());
return;
}
- nodeStore.put(node.deviceId(), node);
- log.info("New node {} with ovsdb-server {}:{} has been added", hostname, ip, port);
+ nodeStore.put(ovsdbNode.deviceId(), ovsdbNode);
+ if (ovsdbNode.state() != INIT) {
+ updateNode(ovsdbNode, INIT);
+ }
}
@Override
- public void deleteNode(IpAddress ip, TpPort port) {
- DeviceId deviceId = DeviceId.deviceId("ovsdb:" + ip + ":" + port);
- OvsdbNode node = nodeStore.get(deviceId);
-
- if (node == null) {
- log.warn("Node with ovsdb-server on {}:{} does not exist", ip, port);
+ public void deleteNode(OvsdbNode ovsdbNode) {
+ if (!nodeStore.containsKey(ovsdbNode.deviceId())) {
+ log.warn("Node {} does not exist", ovsdbNode.host());
return;
}
- nodeConnectionManager.disconnectNode(node);
- nodeStore.remove(node.deviceId());
+ updateNode(ovsdbNode, DISCONNECT);
+ }
+
+ @Override
+ public void updateNode(OvsdbNode ovsdbNode, State state) {
+ if (!nodeStore.containsKey(ovsdbNode.deviceId())) {
+ log.warn("Node {} does not exist", ovsdbNode.host());
+ return;
+ }
+ DefaultOvsdbNode updatedNode = new DefaultOvsdbNode(ovsdbNode.host(),
+ ovsdbNode.ip(),
+ ovsdbNode.port(),
+ state);
+ nodeStore.put(ovsdbNode.deviceId(), updatedNode);
}
@Override
@@ -193,58 +158,33 @@
}
@Override
+ public OvsdbNode getNode(DeviceId deviceId) {
+ return nodeStore.get(deviceId);
+ }
+
+ @Override
public List<OvsdbNode> getNodes() {
return nodeStore.values()
.stream()
.collect(Collectors.toList());
}
- private void initialSetup() {
- // Read ovsdb nodes from network config
- CordVtnConfig config = configService.getConfig(appId, CordVtnConfig.class);
- if (config == null) {
- log.warn("No configuration found");
- return;
- }
- config.ovsdbNodes().forEach(
- node -> addNode(node.hostname(), node.ip(), node.port()));
- }
-
- private synchronized void processLeadershipChange(NodeId leader) {
- // Only the leader performs the initial setup
- if (leader == null || !leader.equals(local)) {
- return;
- }
- initialSetup();
- }
-
- private class InternalLeadershipListener implements LeadershipEventListener {
-
- @Override
- public void event(LeadershipEvent event) {
- if (event.subject().topic().equals(appId.name())) {
- processLeadershipChange(event.subject().leader());
- }
- }
- }
-
private class InternalDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
Device device = event.subject();
- ConnectionHandler handler =
- (device.type() == Device.Type.CONTROLLER ? nodeHandler : bridgeHandler);
+ ConnectionHandler handler = (device.type() == SWITCH ? bridgeHandler : ovsdbHandler);
switch (event.type()) {
- case DEVICE_ADDED:
- eventExecutor.submit(() -> handler.connected(device));
- break;
- case DEVICE_AVAILABILITY_CHANGED:
- eventExecutor.submit(() -> handler.disconnected(device));
- break;
- default:
- break;
+ case DEVICE_ADDED:
+ eventExecutor.submit(() -> handler.connected(device));
+ break;
+ case DEVICE_AVAILABILITY_CHANGED:
+ eventExecutor.submit(() -> handler.disconnected(device));
+ break;
+ default:
+ break;
}
}
}
@@ -268,7 +208,7 @@
}
}
- private class NodeHandler implements ConnectionHandler<Device> {
+ private class OvsdbHandler implements ConnectionHandler<Device> {
@Override
public void connected(Device device) {
@@ -296,7 +236,7 @@
}
}
- private class VirtualMachineHandler implements ConnectionHandler<Host> {
+ private class VmHandler implements ConnectionHandler<Host> {
@Override
public void connected(Host host) {
diff --git a/src/main/java/org/onosproject/cordvtn/CordVtnConfig.java b/src/main/java/org/onosproject/cordvtn/CordVtnConfig.java
index c2c37ab..fdaf752 100644
--- a/src/main/java/org/onosproject/cordvtn/CordVtnConfig.java
+++ b/src/main/java/org/onosproject/cordvtn/CordVtnConfig.java
@@ -27,12 +27,12 @@
import static com.google.common.base.Preconditions.checkNotNull;
/**
- * Configuration object for CORD VTN service.
+ * Configuration object for CordVtn service.
*/
public class CordVtnConfig extends Config<ApplicationId> {
public static final String OVSDB_NODES = "ovsdbNodes";
- public static final String HOSTNAME = "hostname";
+ public static final String HOST = "host";
public static final String IP = "ip";
public static final String PORT = "port";
@@ -49,7 +49,7 @@
return null;
}
nodes.forEach(jsonNode -> ovsdbNodes.add(new OvsdbNodeConfig(
- jsonNode.path(HOSTNAME).asText(),
+ jsonNode.path(HOST).asText(),
IpAddress.valueOf(jsonNode.path(IP).asText()),
TpPort.tpPort(jsonNode.path(PORT).asInt()))));
@@ -57,27 +57,27 @@
}
/**
- * Configuration for an OVSDB node.
+ * Configuration for an ovsdb node.
*/
public static class OvsdbNodeConfig {
- private final String hostname;
+ private final String host;
private final IpAddress ip;
private final TpPort port;
- public OvsdbNodeConfig(String hostname, IpAddress ip, TpPort port) {
- this.hostname = checkNotNull(hostname);
+ public OvsdbNodeConfig(String host, IpAddress ip, TpPort port) {
+ this.host = checkNotNull(host);
this.ip = checkNotNull(ip);
this.port = checkNotNull(port);
}
/**
- * Returns hostname of the node.
+ * Returns host information of the node.
*
- * @return hostname
+ * @return host
*/
- public String hostname() {
- return this.hostname;
+ public String host() {
+ return this.host;
}
/**
diff --git a/src/main/java/org/onosproject/cordvtn/CordVtnConfigManager.java b/src/main/java/org/onosproject/cordvtn/CordVtnConfigManager.java
new file mode 100644
index 0000000..043b376
--- /dev/null
+++ b/src/main/java/org/onosproject/cordvtn/CordVtnConfigManager.java
@@ -0,0 +1,144 @@
+/*
+ * Copyright 2014-2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cordvtn;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipEvent;
+import org.onosproject.cluster.LeadershipEventListener;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.net.config.ConfigFactory;
+import org.onosproject.net.config.NetworkConfigEvent;
+import org.onosproject.net.config.NetworkConfigListener;
+import org.onosproject.net.config.NetworkConfigRegistry;
+import org.onosproject.net.config.NetworkConfigService;
+import org.onosproject.net.config.basics.SubjectFactories;
+import org.slf4j.Logger;
+
+import static org.onosproject.cordvtn.OvsdbNode.State.INIT;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Reads node information from the network config file and handles the config
+ * update events.
+ * Only a leader controller performs the node addition or deletion.
+ */
+@Component(immediate = true)
+public class CordVtnConfigManager {
+
+ protected final Logger log = getLogger(getClass());
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected NetworkConfigRegistry configRegistry;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected NetworkConfigService configService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected LeadershipService leadershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected CordVtnService cordVtnService;
+
+ private final ConfigFactory configFactory =
+ new ConfigFactory(SubjectFactories.APP_SUBJECT_FACTORY, CordVtnConfig.class, "cordvtn") {
+ @Override
+ public CordVtnConfig createConfig() {
+ return new CordVtnConfig();
+ }
+ };
+
+ private final LeadershipEventListener leadershipListener = new InternalLeadershipListener();
+ private final NetworkConfigListener configListener = new InternalConfigListener();
+
+ private NodeId local;
+ private ApplicationId appId;
+
+ @Activate
+ protected void active() {
+ local = clusterService.getLocalNode().id();
+ appId = coreService.getAppId(CordVtnService.CORDVTN_APP_ID);
+
+ configService.addListener(configListener);
+ configRegistry.registerConfigFactory(configFactory);
+
+ leadershipService.addListener(leadershipListener);
+ leadershipService.runForLeadership(CordVtnService.CORDVTN_APP_ID);
+ }
+
+ @Deactivate
+ protected void deactivate() {
+ leadershipService.removeListener(leadershipListener);
+ leadershipService.withdraw(appId.name());
+
+ configRegistry.unregisterConfigFactory(configFactory);
+ configService.removeListener(configListener);
+ }
+
+ private void readConfiguration() {
+ CordVtnConfig config = configRegistry.getConfig(appId, CordVtnConfig.class);
+
+ if (config == null) {
+ log.warn("No configuration found");
+ return;
+ }
+
+ config.ovsdbNodes().forEach(node -> {
+ DefaultOvsdbNode ovsdbNode =
+ new DefaultOvsdbNode(node.host(), node.ip(), node.port(), INIT);
+ cordVtnService.addNode(ovsdbNode);
+ log.info("Add new node {}", node.host());
+ });
+ }
+
+ private synchronized void processLeadershipChange(NodeId leader) {
+ if (leader == null || !leader.equals(local)) {
+ return;
+ }
+ readConfiguration();
+ }
+
+ private class InternalLeadershipListener implements LeadershipEventListener {
+
+ @Override
+ public void event(LeadershipEvent event) {
+ if (event.subject().topic().equals(appId.name())) {
+ processLeadershipChange(event.subject().leader());
+ }
+ }
+ }
+
+ private class InternalConfigListener implements NetworkConfigListener {
+
+ @Override
+ public void event(NetworkConfigEvent event) {
+ // TODO handle update event
+ }
+ }
+}
diff --git a/src/main/java/org/onosproject/cordvtn/CordVtnService.java b/src/main/java/org/onosproject/cordvtn/CordVtnService.java
index d26a10a..1f75dce 100644
--- a/src/main/java/org/onosproject/cordvtn/CordVtnService.java
+++ b/src/main/java/org/onosproject/cordvtn/CordVtnService.java
@@ -15,8 +15,8 @@
*/
package org.onosproject.cordvtn;
-import org.onlab.packet.IpAddress;
-import org.onlab.packet.TpPort;
+import org.onosproject.cordvtn.OvsdbNode.State;
+import org.onosproject.net.DeviceId;
import java.util.List;
@@ -24,22 +24,30 @@
* Service for provisioning overlay virtual networks on compute nodes.
*/
public interface CordVtnService {
+
+ String CORDVTN_APP_ID = "org.onosproject.cordvtn";
/**
* Adds a new node to the service.
*
- * @param hostname hostname of the node
- * @param ip ip address to access the ovsdb server running on the node
- * @param port port number to access the ovsdb server running on the node
+ * @param ovsdbNode ovsdb node
*/
- void addNode(String hostname, IpAddress ip, TpPort port);
+ void addNode(OvsdbNode ovsdbNode);
/**
- * Deletes the node from the service.
+ * Deletes a node from the service.
*
- * @param ip ip address to access the ovsdb server running on the node
- * @param port port number to access the ovsdb server running on the node
+ * @param ovsdbNode ovsdb node
*/
- void deleteNode(IpAddress ip, TpPort port);
+ void deleteNode(OvsdbNode ovsdbNode);
+
+ /**
+ * Updates ovsdb node.
+ * It only used for updating node's connection state.
+ *
+ * @param ovsdbNode ovsdb node
+ * @param state ovsdb connection state
+ */
+ void updateNode(OvsdbNode ovsdbNode, State state);
/**
* Returns the number of the nodes known to the service.
@@ -49,6 +57,14 @@
int getNodeCount();
/**
+ * Returns OvsdbNode with given device id.
+ *
+ * @param deviceId device id
+ * @return ovsdb node
+ */
+ OvsdbNode getNode(DeviceId deviceId);
+
+ /**
* Returns all nodes known to the service.
*
* @return list of nodes
diff --git a/src/main/java/org/onosproject/cordvtn/DefaultOvsdbNode.java b/src/main/java/org/onosproject/cordvtn/DefaultOvsdbNode.java
index b8cdbe9..ce8b0f1 100644
--- a/src/main/java/org/onosproject/cordvtn/DefaultOvsdbNode.java
+++ b/src/main/java/org/onosproject/cordvtn/DefaultOvsdbNode.java
@@ -15,6 +15,7 @@
*/
package org.onosproject.cordvtn;
+import com.google.common.base.MoreObjects;
import org.onlab.packet.IpAddress;
import org.onlab.packet.TpPort;
import org.onosproject.net.DeviceId;
@@ -26,21 +27,15 @@
*/
public class DefaultOvsdbNode implements OvsdbNode {
- private final String hostname;
+ private final String host;
private final IpAddress ip;
private final TpPort port;
- private final DeviceId deviceId;
- private final DeviceId bridgeId;
private final State state;
- public DefaultOvsdbNode(String hostname, IpAddress ip, TpPort port,
- DeviceId bridgeId, State state) {
- this.hostname = hostname;
+ public DefaultOvsdbNode(String host, IpAddress ip, TpPort port, State state) {
+ this.host = host;
this.ip = ip;
this.port = port;
- this.deviceId = DeviceId.deviceId(
- "ovsdb:" + ip.toString() + ":" + port.toString());
- this.bridgeId = bridgeId;
this.state = state;
}
@@ -55,8 +50,8 @@
}
@Override
- public String hostname() {
- return this.hostname;
+ public String host() {
+ return this.host;
}
@Override
@@ -66,12 +61,12 @@
@Override
public DeviceId deviceId() {
- return this.deviceId;
+ return DeviceId.deviceId("ovsdb:" + this.ip.toString() + ":" + this.port.toString());
}
@Override
- public DeviceId bridgeId() {
- return this.bridgeId;
+ public DeviceId intBrId() {
+ return DeviceId.deviceId("of:" + this.host);
}
@Override
@@ -82,8 +77,9 @@
if (o instanceof DefaultOvsdbNode) {
DefaultOvsdbNode that = (DefaultOvsdbNode) o;
- // We compare the ip and port only.
- if (this.ip.equals(that.ip) && this.port.equals(that.port)) {
+ if (this.host.equals(that.host) &&
+ this.ip.equals(that.ip) &&
+ this.port.equals(that.port)) {
return true;
}
}
@@ -92,6 +88,16 @@
@Override
public int hashCode() {
- return Objects.hash(ip, port);
+ return Objects.hash(host, ip, port);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("host", host)
+ .add("ip", ip)
+ .add("port", port)
+ .add("state", state)
+ .toString();
}
}
diff --git a/src/main/java/org/onosproject/cordvtn/NodeConnectionManager.java b/src/main/java/org/onosproject/cordvtn/NodeConnectionManager.java
index 908aff2..ebba4cd 100644
--- a/src/main/java/org/onosproject/cordvtn/NodeConnectionManager.java
+++ b/src/main/java/org/onosproject/cordvtn/NodeConnectionManager.java
@@ -15,12 +15,19 @@
*/
package org.onosproject.cordvtn;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
-import org.onosproject.core.ApplicationId;
import org.onosproject.mastership.MastershipService;
-import org.onosproject.net.DeviceId;
-import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.net.Device;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
+import org.onosproject.net.device.DeviceService;
import org.slf4j.Logger;
import java.util.concurrent.Executors;
@@ -28,120 +35,131 @@
import java.util.concurrent.TimeUnit;
import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.cordvtn.OvsdbNode.State.CONNECTED;
+import static org.onosproject.cordvtn.OvsdbNode.State.DISCONNECTED;
+import static org.onosproject.cordvtn.OvsdbNode.State.READY;
import static org.slf4j.LoggerFactory.getLogger;
/**
- * Node connection manager.
+ * Provides the connection state management of all nodes registered to the service
+ * so that the nodes keep connected unless it is requested to be deleted.
*/
+@Component(immediate = true)
public class NodeConnectionManager {
protected final Logger log = getLogger(getClass());
- private final ApplicationId appId;
- private final NodeId localId;
- private final EventuallyConsistentMap<DeviceId, OvsdbNode> nodeStore;
- private final MastershipService mastershipService;
- private final LeadershipService leadershipService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ MastershipService mastershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ LeadershipService leadershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ DeviceService deviceService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ CordVtnService cordVtnService;
private static final int DELAY_SEC = 5;
- private ScheduledExecutorService connectionExecutor;
- /**
- * Creates a new NodeConnectionManager.
- *
- * @param appId app id
- * @param localId local id
- * @param nodeStore node store
- * @param mastershipService mastership service
- * @param leadershipService leadership service
- */
- public NodeConnectionManager(ApplicationId appId, NodeId localId,
- EventuallyConsistentMap<DeviceId, OvsdbNode> nodeStore,
- MastershipService mastershipService,
- LeadershipService leadershipService) {
- this.appId = appId;
- this.localId = localId;
- this.nodeStore = nodeStore;
- this.mastershipService = mastershipService;
- this.leadershipService = leadershipService;
- }
+ private final DeviceListener deviceListener = new InternalDeviceListener();
+ private final ScheduledExecutorService connectionExecutor = Executors
+ .newSingleThreadScheduledExecutor(groupedThreads("onos/cordvtn", "connection-manager"));
- /**
- * Starts the node connection manager.
- */
- public void start() {
- connectionExecutor = Executors.newSingleThreadScheduledExecutor(
- groupedThreads("onos/cordvtn", "connection-executor"));
- connectionExecutor.scheduleWithFixedDelay(() -> nodeStore.values()
+ private NodeId localId;
+
+ @Activate
+ protected void activate() {
+ localId = clusterService.getLocalNode().id();
+ deviceService.addListener(deviceListener);
+
+ connectionExecutor.scheduleWithFixedDelay(() -> cordVtnService.getNodes()
.stream()
.filter(node -> localId.equals(getMaster(node)))
- .forEach(this::connectNode), 0, DELAY_SEC, TimeUnit.SECONDS);
+ .forEach(node -> {
+ connect(node);
+ disconnect(node);
+ }), 0, DELAY_SEC, TimeUnit.SECONDS);
}
- /**
- * Stops the node connection manager.
- */
+ @Deactivate
public void stop() {
connectionExecutor.shutdown();
+ deviceService.removeListener(deviceListener);
}
- /**
- * Adds a new node to the system.
- *
- * @param ovsdbNode ovsdb node
- */
- public void connectNode(OvsdbNode ovsdbNode) {
+ public void connect(OvsdbNode ovsdbNode) {
switch (ovsdbNode.state()) {
case INIT:
case DISCONNECTED:
- // TODO: set the node to passive mode
+ setPassiveMode(ovsdbNode);
case READY:
- // TODO: initiate connection
- break;
- case CONNECTED:
+ setupConnection(ovsdbNode);
break;
default:
+ break;
}
}
- /**
- * Deletes the ovsdb node.
- *
- * @param ovsdbNode ovsdb node
- */
- public void disconnectNode(OvsdbNode ovsdbNode) {
+ public void disconnect(OvsdbNode ovsdbNode) {
switch (ovsdbNode.state()) {
- case CONNECTED:
+ case DISCONNECT:
// TODO: disconnect
break;
- case INIT:
- case READY:
- case DISCONNECTED:
- break;
default:
+ break;
+ }
+ }
+
+ private class InternalDeviceListener implements DeviceListener {
+
+ @Override
+ public void event(DeviceEvent event) {
+ Device device = event.subject();
+ if (device.type() != Device.Type.CONTROLLER) {
+ return;
+ }
+
+ DefaultOvsdbNode node;
+ switch (event.type()) {
+ case DEVICE_ADDED:
+ node = (DefaultOvsdbNode) cordVtnService.getNode(device.id());
+ if (node != null) {
+ cordVtnService.updateNode(node, CONNECTED);
+ }
+ break;
+ case DEVICE_AVAILABILITY_CHANGED:
+ node = (DefaultOvsdbNode) cordVtnService.getNode(device.id());
+ if (node != null) {
+ cordVtnService.updateNode(node, DISCONNECTED);
+ }
+ break;
+ default:
+ break;
+ }
}
}
private NodeId getMaster(OvsdbNode ovsdbNode) {
- // Return the master of the bridge(switch) if it exist or
- // return the current leader
- if (ovsdbNode.bridgeId() == DeviceId.NONE) {
- return leadershipService.getLeader(this.appId.name());
- } else {
- return mastershipService.getMasterFor(ovsdbNode.bridgeId());
+ NodeId master = mastershipService.getMasterFor(ovsdbNode.intBrId());
+
+ // master is null if there's no such device
+ if (master == null) {
+ master = leadershipService.getLeader(CordVtnService.CORDVTN_APP_ID);
}
+ return master;
}
private void setPassiveMode(OvsdbNode ovsdbNode) {
// TODO: need ovsdb client implementation first
// TODO: set the remove ovsdb server passive mode
- // TODO: set the node state READY if it succeed
+ cordVtnService.updateNode(ovsdbNode, READY);
}
- private void connect(OvsdbNode ovsdbNode) {
- // TODO: need ovsdb client implementation first
- }
-
- private void disconnect(OvsdbNode ovsdbNode) {
- // TODO: need ovsdb client implementation first
+ private void setupConnection(OvsdbNode ovsdbNode) {
+ // TODO initiate connection
}
}
diff --git a/src/main/java/org/onosproject/cordvtn/OvsdbNode.java b/src/main/java/org/onosproject/cordvtn/OvsdbNode.java
index bb2a0b7..296bd43 100644
--- a/src/main/java/org/onosproject/cordvtn/OvsdbNode.java
+++ b/src/main/java/org/onosproject/cordvtn/OvsdbNode.java
@@ -24,51 +24,52 @@
*/
public interface OvsdbNode {
/**
- * State of the ovsdb node.
+ * Ovsdb connection state.
*/
enum State {
- INIT, READY, CONNECTED, DISCONNECTED
+ INIT, READY, CONNECTED, DISCONNECT, DISCONNECTED
}
/**
- * Returns the IP address of ovsdb server.
+ * Returns the IP address of the ovsdb server.
*
* @return ip address
*/
IpAddress ip();
/**
- * Returns the port number of ovsdb server.
+ * Returns the port number of the ovsdb server.
*
* @return port number
*/
TpPort port();
/**
- * Returns the hostname of the node.
+ * Returns the host information of the ovsdb server.
+ * It could be hostname or ip address.
*
- * @return hostname
+ * @return host
*/
- String hostname();
+ String host();
/**
- * Returns the state of the node.
+ * Returns the connection state of the ovsdb server.
*
- * @return state of the node
+ * @return connection state
*/
State state();
/**
- * Returns the device ID of the node.
+ * Returns the device id of the ovsdb server.
*
* @return device id
*/
DeviceId deviceId();
/**
- * Returns the device ID of the bridge associated with this node.
+ * Returns the device id of the integration bridge associated with the node.
*
* @return device id
*/
- DeviceId bridgeId();
+ DeviceId intBrId();
}