CORD-1071 Refactor VTN node service
Done
- Separated interface, implementation and store for node management
- Added unit tests for node manager and handler
- Offloaded more of the event handling off of the Atomix event thread
Todo
- Add REST interface for the node service
Change-Id: Ibf90d3a621013497cc891ca3086db6648f5d49df
diff --git a/pom.xml b/pom.xml
index f0a5782..ad76ead 100644
--- a/pom.xml
+++ b/pom.xml
@@ -116,18 +116,6 @@
<dependency>
<groupId>org.onosproject</groupId>
- <artifactId>onos-incubator-api</artifactId>
- <version>${onos.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.onosproject</groupId>
- <artifactId>onos-incubator-net</artifactId>
- <version>${onos.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.onosproject</groupId>
<artifactId>onos-protocols-ovsdb-rfc</artifactId>
<version>${onos.version}</version>
</dependency>
@@ -140,6 +128,56 @@
</dependency>
<dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-core-common</artifactId>
+ <version>${onos.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-api</artifactId>
+ <version>${onos.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava-testlib</artifactId>
+ <version>${guava.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.easymock</groupId>
+ <artifactId>easymock</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-core</artifactId>
+ <version>1.6.6</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ <version>1.6.6</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-easymock</artifactId>
+ <version>1.6.6</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
<version>0.1.53</version>
diff --git a/src/main/java/org/opencord/cordvtn/api/CordVtnConfig.java b/src/main/java/org/opencord/cordvtn/api/CordVtnConfig.java
index bb4ce5e..69ecbe6 100644
--- a/src/main/java/org/opencord/cordvtn/api/CordVtnConfig.java
+++ b/src/main/java/org/opencord/cordvtn/api/CordVtnConfig.java
@@ -28,12 +28,14 @@
import org.onlab.packet.TpPort;
import org.onosproject.cluster.ClusterService;
import org.onosproject.core.ApplicationId;
+import org.onosproject.net.DeviceId;
import org.onosproject.net.behaviour.ControllerInfo;
import org.onosproject.net.config.Config;
import org.onosproject.net.config.InvalidFieldException;
import org.opencord.cordvtn.api.net.CidrAddr;
import org.opencord.cordvtn.api.node.CordVtnNode;
import org.opencord.cordvtn.api.node.SshAccessInfo;
+import org.opencord.cordvtn.impl.DefaultCordVtnNode;
import org.slf4j.Logger;
import java.util.List;
@@ -196,31 +198,31 @@
String ovsdbPort = getConfig(object, OVSDB_PORT);
object.get(CORDVTN_NODES).forEach(vtnNode -> {
- CidrAddr localMgmt = CidrAddr.valueOf(get(LOCAL_MANAGEMENT_IP, ""));
- CidrAddr hostsMgmt = CidrAddr.valueOf(getConfig(vtnNode, HOST_MANAGEMENT_IP));
+ CidrAddr localMgmtIp = CidrAddr.valueOf(get(LOCAL_MANAGEMENT_IP, ""));
+ CidrAddr hostsMgmtIp = CidrAddr.valueOf(getConfig(vtnNode, HOST_MANAGEMENT_IP));
SshAccessInfo sshInfo = new SshAccessInfo(
- hostsMgmt.ip().getIp4Address(),
+ hostsMgmtIp.ip().getIp4Address(),
TpPort.tpPort(Integer.parseInt(getConfig(sshNode, SSH_PORT))),
getConfig(sshNode, SSH_USER),
getConfig(sshNode, SSH_KEY_FILE));
- CordVtnNode.Builder nodeBuilder = CordVtnNode.builder()
+ CordVtnNode.Builder nodeBuilder = DefaultCordVtnNode.builder()
.hostname(getConfig(vtnNode, HOSTNAME))
- .hostMgmtIp(hostsMgmt)
- .localMgmtIp(localMgmt)
- .dataIp(getConfig(vtnNode, DATA_IP))
+ .hostManagementIp(hostsMgmtIp)
+ .localManagementIp(localMgmtIp)
+ .dataIp(CidrAddr.valueOf(getConfig(vtnNode, DATA_IP)))
.sshInfo(sshInfo)
- .integrationBridgeId(getConfig(vtnNode, INTEGRATION_BRIDGE_ID))
- .dataIface(getConfig(vtnNode, DATA_IFACE));
+ .integrationBridgeId(DeviceId.deviceId(getConfig(vtnNode, INTEGRATION_BRIDGE_ID)))
+ .dataInterface(getConfig(vtnNode, DATA_IFACE));
if (!Strings.isNullOrEmpty(ovsdbPort)) {
- nodeBuilder.ovsdbPort(Integer.parseInt(ovsdbPort));
+ nodeBuilder.ovsdbPort(TpPort.tpPort(Integer.parseInt(ovsdbPort)));
}
String hostMgmtIface = getConfig(vtnNode, HOST_MANAGEMENT_IFACE);
if (!Strings.isNullOrEmpty(hostMgmtIface)) {
- nodeBuilder.hostMgmtIface(hostMgmtIface);
+ nodeBuilder.hostManagementInterface(hostMgmtIface);
}
nodes.add(nodeBuilder.build());
diff --git a/src/main/java/org/opencord/cordvtn/api/core/CordVtnPipeline.java b/src/main/java/org/opencord/cordvtn/api/core/CordVtnPipeline.java
new file mode 100644
index 0000000..278492a
--- /dev/null
+++ b/src/main/java/org/opencord/cordvtn/api/core/CordVtnPipeline.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.cordvtn.api.core;
+
+import org.onlab.packet.VlanId;
+import org.onosproject.net.flow.FlowRule;
+import org.opencord.cordvtn.api.node.CordVtnNode;
+
+/**
+ * Service providing cordvtn pipeline.
+ */
+public interface CordVtnPipeline {
+
+ // tables
+ int TABLE_ZERO = 0;
+ int TABLE_IN_PORT = 1;
+ int TABLE_ACCESS = 2;
+ int TABLE_IN_SERVICE = 3;
+ int TABLE_DST = 4;
+ int TABLE_TUNNEL_IN = 5;
+ int TABLE_VLAN = 6;
+
+ // priorities
+ int PRIORITY_MANAGEMENT = 55000;
+ int PRIORITY_HIGH = 50000;
+ int PRIORITY_DEFAULT = 5000;
+ int PRIORITY_LOW = 4000;
+ int PRIORITY_ZERO = 0;
+
+ VlanId VLAN_WAN = VlanId.vlanId((short) 500);
+
+ /**
+ * Initializes the pipeline for the supplied node.
+ *
+ * @param node cordvtn node
+ */
+ void initPipeline(CordVtnNode node);
+
+ /**
+ * Clean up the pipeline for all nodes.
+ */
+ void cleanupPipeline();
+
+ /**
+ * Processes the given flow rule.
+ *
+ * @param install install or remove
+ * @param rule flow rule to process
+ */
+ void processFlowRule(boolean install, FlowRule rule);
+}
diff --git a/src/main/java/org/opencord/cordvtn/api/node/ConnectionHandler.java b/src/main/java/org/opencord/cordvtn/api/node/ConnectionHandler.java
deleted file mode 100644
index f3e3ad4..0000000
--- a/src/main/java/org/opencord/cordvtn/api/node/ConnectionHandler.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright 2015-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.opencord.cordvtn.api.node;
-
-/**
- * Entity capable of handling a subject connected and disconnected situation.
- */
-public interface ConnectionHandler<T> {
-
- /**
- * Processes the connected subject.
- *
- * @param subject subject
- */
- void connected(T subject);
-
- /**
- * Processes the disconnected subject.
- *
- * @param subject subject.
- */
- void disconnected(T subject);
-}
diff --git a/src/main/java/org/opencord/cordvtn/api/node/CordVtnNode.java b/src/main/java/org/opencord/cordvtn/api/node/CordVtnNode.java
index 153eed3..1e873a4 100644
--- a/src/main/java/org/opencord/cordvtn/api/node/CordVtnNode.java
+++ b/src/main/java/org/opencord/cordvtn/api/node/CordVtnNode.java
@@ -15,489 +15,193 @@
*/
package org.opencord.cordvtn.api.node;
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Strings;
-import com.google.common.collect.Sets;
import org.onlab.packet.TpPort;
import org.onosproject.net.DeviceId;
import org.opencord.cordvtn.api.net.CidrAddr;
-import java.util.Comparator;
-import java.util.Objects;
-import java.util.Optional;
import java.util.Set;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.opencord.cordvtn.api.Constants.DEFAULT_OVSDB_PORT;
-import static org.opencord.cordvtn.api.Constants.DEFAULT_TUNNEL;
-
/**
- * Representation of a compute infrastructure node for CORD VTN service.
+ * Representation of a compute node for service instance provisioning.
*/
-public final class CordVtnNode {
-
- private final String hostname;
- private final CidrAddr hostMgmtIp;
- private final CidrAddr localMgmtIp;
- private final CidrAddr dataIp;
- private final Optional<TpPort> ovsdbPort;
- private final SshAccessInfo sshInfo;
- private final DeviceId integrationBridgeId;
- private final String dataIface;
- private final Optional<String> hostMgmtIface;
- private final CordVtnNodeState state;
-
- public static final Comparator<CordVtnNode> CORDVTN_NODE_COMPARATOR =
- (node1, node2) -> node1.hostname().compareTo(node2.hostname());
+public interface CordVtnNode {
/**
- * Creates a new node.
- *
- * @param hostname hostname
- * @param hostMgmtIp host management network address
- * @param localMgmtIp local management network address
- * @param dataIp data network address
- * @param ovsdbPort port number for ovsdb connection
- * @param sshInfo ssh access information
- * @param integrationBridgeId integration bridge identifier
- * @param dataIface data plane interface name
- * @param hostMgmtIface host management network interface
- * @param state cordvtn node state
- */
- private CordVtnNode(String hostname,
- CidrAddr hostMgmtIp,
- CidrAddr localMgmtIp,
- CidrAddr dataIp,
- Optional<TpPort> ovsdbPort,
- SshAccessInfo sshInfo,
- DeviceId integrationBridgeId,
- String dataIface,
- Optional<String> hostMgmtIface,
- CordVtnNodeState state) {
- this.hostname = hostname;
- this.hostMgmtIp = hostMgmtIp;
- this.localMgmtIp = localMgmtIp;
- this.dataIp = dataIp;
- this.ovsdbPort = ovsdbPort;
- this.sshInfo = sshInfo;
- this.integrationBridgeId = integrationBridgeId;
- this.dataIface = dataIface;
- this.hostMgmtIface = hostMgmtIface;
- this.state = state;
- }
-
- /**
- * Returns cordvtn node with new state.
- *
- * @param node cordvtn node
- * @param state cordvtn node init state
- * @return cordvtn node
- */
- public static CordVtnNode getUpdatedNode(CordVtnNode node, CordVtnNodeState state) {
- return new CordVtnNode(node.hostname,
- node.hostMgmtIp, node.localMgmtIp, node.dataIp,
- node.ovsdbPort,
- node.sshInfo,
- node.integrationBridgeId,
- node.dataIface, node.hostMgmtIface,
- state);
- }
-
- /**
- * Returns the hostname.
+ * Returns the hostname of the node.
*
* @return hostname
*/
- public String hostname() {
- return this.hostname;
- }
+ String hostname();
/**
- * Returns the host management network address.
+ * Returns the host management IP address of the node.
*
- * @return network address
+ * @return ip address with cidr notation
*/
- public CidrAddr hostMgmtIp() {
- return this.hostMgmtIp;
- }
+ CidrAddr hostManagementIp();
/**
- * Returns the local management network address.
+ * Returns the local management IP address of the node.
*
- * @return network address
+ * @return ip address with the cidr notation
*/
- public CidrAddr localMgmtIp() {
- return this.localMgmtIp;
- }
+ // TODO remove this after dynamic provisioning of local management network
+ CidrAddr localManagementIp();
/**
- * Returns the data network address.
+ * Returns the data network IP address of the node.
*
- * @return network address
+ * @return ip address with the cidr notation
*/
- public CidrAddr dataIp() {
- return this.dataIp;
- }
+ CidrAddr dataIp();
/**
- * Returns the port number used for OVSDB connection.
- * It returns default OVSDB port 6640, if it's not specified.
+ * Returns the integration bridge device identifier.
*
- * @return port number
+ * @return device id
*/
- public TpPort ovsdbPort() {
- if (this.ovsdbPort.isPresent()) {
- return this.ovsdbPort.get();
- } else {
- return TpPort.tpPort(DEFAULT_OVSDB_PORT);
- }
- }
+ DeviceId integrationBridgeId();
+
+ /**
+ * Returns the data network interface name.
+ *
+ * @return interface name
+ */
+ String dataInterface();
+
+ /**
+ * Returns host management network interface name.
+ *
+ * @return interface name; null if not set
+ */
+ String hostManagementInterface();
+
+ /**
+ * Returns the port number of the OVSDB server.
+ *
+ * @return port number; 6640 if not set
+ */
+ TpPort ovsdbPort();
/**
* Returns the SSH access information.
*
* @return ssh access information
*/
- public SshAccessInfo sshInfo() {
- return this.sshInfo;
- }
-
- /**
- * Returns the identifier of the integration bridge.
- *
- * @return device id
- */
- public DeviceId integrationBridgeId() {
- return this.integrationBridgeId;
- }
-
- /**
- * Returns the identifier of the OVSDB device.
- *
- * @return device id
- */
- public DeviceId ovsdbId() {
- return DeviceId.deviceId("ovsdb:" + this.hostMgmtIp.ip().toString());
- }
-
- /**
- * Returns data network interface name.
- *
- * @return data network interface name
- */
- public String dataIface() {
- return this.dataIface;
- }
-
- /**
- * Returns host management network interface name.
- *
- * @return host management network interface name
- */
- public Optional<String> hostMgmtIface() {
- return this.hostMgmtIface;
- }
-
- /**
- * Returns a set of network interfaces for the VTN service to work properly.
- *
- * @return set of interface names
- */
- public Set<String> systemIfaces() {
- Set<String> ifaces = Sets.newHashSet(DEFAULT_TUNNEL, dataIface);
- if (hostMgmtIface.isPresent()) {
- ifaces.add(hostMgmtIface.get());
- }
- return ifaces;
- }
+ SshAccessInfo sshInfo();
/**
* Returns the state of the node.
*
* @return state
*/
- public CordVtnNodeState state() {
- return this.state;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
-
- if (obj instanceof CordVtnNode) {
- CordVtnNode that = (CordVtnNode) obj;
- if (Objects.equals(hostname, that.hostname) &&
- Objects.equals(hostMgmtIp, that.hostMgmtIp) &&
- Objects.equals(localMgmtIp, that.localMgmtIp) &&
- Objects.equals(dataIp, that.dataIp) &&
- Objects.equals(ovsdbPort, that.ovsdbPort) &&
- Objects.equals(sshInfo, that.sshInfo) &&
- Objects.equals(integrationBridgeId, that.integrationBridgeId) &&
- Objects.equals(dataIface, that.dataIface) &&
- Objects.equals(hostMgmtIface, that.hostMgmtIface)) {
- return true;
- }
- }
- return false;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(hostname,
- hostMgmtIp,
- localMgmtIp,
- dataIp,
- ovsdbPort,
- sshInfo,
- integrationBridgeId,
- dataIface,
- hostMgmtIface);
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("hostname", hostname)
- .add("hostMgmtIp", hostMgmtIp)
- .add("localMgmtIp", localMgmtIp)
- .add("dataIp", dataIp)
- .add("port", ovsdbPort)
- .add("sshInfo", sshInfo)
- .add("integrationBridgeId", integrationBridgeId)
- .add("dataIface", dataIface)
- .add("hostMgmtIface", hostMgmtIface)
- .add("state", state)
- .toString();
- }
+ CordVtnNodeState state();
/**
- * Returns new node builder instance.
+ * Returns the identifier of the OVSDB device.
*
- * @return cordvtn node builder
+ * @return device id
*/
- public static Builder builder() {
- return new Builder();
- }
+ DeviceId ovsdbId();
/**
- * Builder of node entities.
+ * Returns system interfaces of the node.
+ *
+ * @return set of interface names
*/
- public static final class Builder {
- private String hostname;
- private CidrAddr hostMgmtIp;
- private CidrAddr localMgmtIp;
- private CidrAddr dataIp;
- private Optional<TpPort> ovsdbPort =
- Optional.of(TpPort.tpPort(DEFAULT_OVSDB_PORT));
- private SshAccessInfo sshInfo;
- private DeviceId integrationBridgeId;
- private String dataIface;
- private Optional<String> hostMgmtIface = Optional.empty();
- private CordVtnNodeState state = CordVtnNodeState.noState();
+ Set<String> systemInterfaces();
- private Builder() {
- }
+ /**
+ * Builder of cordvtn node entities.
+ */
+ interface Builder {
/**
- * Builds an immutable cordvtn node.
+ * Returns new cordvtn node.
*
* @return cordvtn node
*/
- public CordVtnNode build() {
- // validate attributes
- checkArgument(!Strings.isNullOrEmpty(hostname));
- checkNotNull(hostMgmtIp);
- checkNotNull(localMgmtIp);
- checkNotNull(dataIp);
- checkNotNull(ovsdbPort);
- checkNotNull(sshInfo);
- checkNotNull(integrationBridgeId);
- checkNotNull(dataIface);
- checkNotNull(hostMgmtIface);
- return new CordVtnNode(hostname,
- hostMgmtIp, localMgmtIp, dataIp,
- ovsdbPort,
- sshInfo,
- integrationBridgeId,
- dataIface,
- hostMgmtIface,
- state);
- }
+ CordVtnNode build();
/**
- * Returns cordvtn node builder with hostname.
+ * Returns cordvtn node builder with the supplied hostname.
*
- * @param hostname hostname
+ * @param hostname hostname of the node
* @return cordvtn node builder
*/
- public Builder hostname(String hostname) {
- checkArgument(!Strings.isNullOrEmpty(hostname));
- this.hostname = hostname;
- return this;
- }
+ Builder hostname(String hostname);
/**
- * Returns cordvtn node builder with host management network IP address.
+ * Returns cordvtn node builder with the supplied host management IP.
*
- * @param hostMgmtIp host management netework ip address
+ * @param hostMgmtIp ip address with cidr notation
* @return cordvtn node builder
*/
- public Builder hostMgmtIp(CidrAddr hostMgmtIp) {
- checkNotNull(hostMgmtIp);
- this.hostMgmtIp = hostMgmtIp;
- return this;
- }
+ Builder hostManagementIp(CidrAddr hostMgmtIp);
/**
- * Returns cordvtn node builder with host management network IP address.
+ * Returns cordvtn node builder with the supplied local management IP.
*
- * @param cidr string value of the host management network ip address
+ * @param localMgmtIp ip address with cidr notation
* @return cordvtn node builder
*/
- public Builder hostMgmtIp(String cidr) {
- this.hostMgmtIp = CidrAddr.valueOf(cidr);
- return this;
- }
+ // TODO remove this after dynamic provisioning of local management network
+ Builder localManagementIp(CidrAddr localMgmtIp);
/**
- * Returns cordvtn node builder with local management network IP address.
+ * Returns cordvtn node builder with the supplied data IP.
*
- * @param localMgmtIp local management network ip address
+ * @param dataIp ip address with cidr notation
* @return cordvtn node builder
*/
- public Builder localMgmtIp(CidrAddr localMgmtIp) {
- checkNotNull(localMgmtIp);
- this.localMgmtIp = localMgmtIp;
- return this;
- }
+ Builder dataIp(CidrAddr dataIp);
/**
- * Returns cordvtn node builder with local management netework IP address.
+ * Returns cordvtn node builder with the supplied integration bridge identifier.
*
- * @param cidr string value of the local management network ip address
+ * @param bridgeId bridge identifier
* @return cordvtn node builder
*/
- public Builder localMgmtIp(String cidr) {
- this.localMgmtIp = CidrAddr.valueOf(cidr);
- return this;
- }
+ Builder integrationBridgeId(DeviceId bridgeId);
/**
- * Returns cordvtn node builder with data network IP address.
+ * Returns cordvtn node builder with the supplied data interface.
*
- * @param dataIp data network ip address
+ * @param dataIface interface name
* @return cordvtn node builder
*/
- public Builder dataIp(CidrAddr dataIp) {
- checkNotNull(dataIp);
- this.dataIp = dataIp;
- return this;
- }
+ Builder dataInterface(String dataIface);
/**
- * Returns cordvtn node builder with data network IP address.
+ * Returns cordvtn node builder with the supplied host management interface.
*
- * @param cidr string value of the data network ip address
+ * @param hostMgmtIface interface name
* @return cordvtn node builder
*/
- public Builder dataIp(String cidr) {
- this.dataIp = CidrAddr.valueOf(cidr);
- return this;
- }
+ Builder hostManagementInterface(String hostMgmtIface);
/**
- * Returns cordvtn node builder with OVSDB server listen port number.
+ * Returns cordvtn node builder with the supplied OVSDB port.
*
- * @param port ovsdb server listen port number
+ * @param ovsdbPort transport layer port number
* @return cordvtn node builder
*/
- public Builder ovsdbPort(TpPort port) {
- checkNotNull(port);
- this.ovsdbPort = Optional.of(port);
- return this;
- }
+ Builder ovsdbPort(TpPort ovsdbPort);
/**
- * Returns cordvtn node builder with OVSDB server listen port number.
+ * Returns cordvtn node builder with the supplied SSH access information.
*
- * @param port int value of the ovsdb server listen port number
- * @return cordvtn node builder
- */
- public Builder ovsdbPort(int port) {
- this.ovsdbPort = Optional.of(TpPort.tpPort(port));
- return this;
- }
-
- /**
- * Returns cordvtn node builder with SSH access information.
* @param sshInfo ssh access information
* @return cordvtn node builder
*/
- public Builder sshInfo(SshAccessInfo sshInfo) {
- checkNotNull(sshInfo);
- this.sshInfo = sshInfo;
- return this;
- }
+ Builder sshInfo(SshAccessInfo sshInfo);
/**
- * Returns cordvtn node builder with integration bridge ID.
+ * Returns cordvtn node builder with the supplied initialize state.
*
- * @param deviceId device id of the integration bridge
+ * @param state cordvtn node state
* @return cordvtn node builder
*/
- public Builder integrationBridgeId(DeviceId deviceId) {
- checkNotNull(deviceId);
- this.integrationBridgeId = deviceId;
- return this;
- }
-
- /**
- * Returns cordvtn node builder with integration bridge ID.
- *
- * @param deviceId string value of the integration bridge device id
- * @return cordvtn node builder
- */
- public Builder integrationBridgeId(String deviceId) {
- this.integrationBridgeId = DeviceId.deviceId(deviceId);
- return this;
- }
-
- /**
- * Returns cordvtn node builder with data network interface name.
- *
- * @param dataIface data network interface name
- * @return cordvtn node builder
- */
- public Builder dataIface(String dataIface) {
- checkArgument(!Strings.isNullOrEmpty(dataIface));
- this.dataIface = dataIface;
- return this;
- }
-
- /**
- * Returns cordvtn node builder with host management network interface.
- *
- * @param hostMgmtIface host management network interface name
- * @return cordvtn node builder
- */
- public Builder hostMgmtIface(String hostMgmtIface) {
- this.hostMgmtIface = Optional.ofNullable(hostMgmtIface);
- return this;
- }
-
- /**
- * Returns cordvtn node builder with init state.
- *
- * @param state init state
- * @return cordvtn node builder
- */
- public Builder state(CordVtnNodeState state) {
- checkNotNull(state);
- this.state = state;
- return this;
- }
+ Builder state(CordVtnNodeState state);
}
}
diff --git a/src/main/java/org/opencord/cordvtn/api/node/CordVtnNodeAdminService.java b/src/main/java/org/opencord/cordvtn/api/node/CordVtnNodeAdminService.java
new file mode 100644
index 0000000..b312185
--- /dev/null
+++ b/src/main/java/org/opencord/cordvtn/api/node/CordVtnNodeAdminService.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.cordvtn.api.node;
+
+/**
+ * Service for administering the inventory of {@link CordVtnNode}.
+ */
+public interface CordVtnNodeAdminService {
+
+ /**
+ * Creates a new node.
+ *
+ * @param node cordvtn node
+ */
+ void createNode(CordVtnNode node);
+
+ /**
+ * Updates the node.
+ *
+ * @param node cordvtn node
+ */
+ void updateNode(CordVtnNode node);
+
+ /**
+ * Removes the node with the supplied hostname.
+ *
+ * @param hostname hostname of the node
+ * @return removed node; null if it fails
+ */
+ CordVtnNode removeNode(String hostname);
+}
diff --git a/src/main/java/org/opencord/cordvtn/api/node/CordVtnNodeEvent.java b/src/main/java/org/opencord/cordvtn/api/node/CordVtnNodeEvent.java
new file mode 100644
index 0000000..3b6651f
--- /dev/null
+++ b/src/main/java/org/opencord/cordvtn/api/node/CordVtnNodeEvent.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.cordvtn.api.node;
+
+import org.onosproject.event.AbstractEvent;
+
+/**
+ * Describes cordvtn node events.
+ */
+public class CordVtnNodeEvent extends AbstractEvent<CordVtnNodeEvent.Type, CordVtnNode> {
+
+ public enum Type {
+
+ /**
+ * Signifies that the new node is created.
+ */
+ NODE_CREATED,
+
+ /**
+ * Signifies that the node is updated.
+ */
+ NODE_UPDATED,
+
+ /**
+ * Signifies that the node is removed.
+ */
+ NODE_REMOVED,
+
+ /**
+ * Signifies that the node state is changed to complete.
+ */
+ NODE_COMPLETE,
+
+ /**
+ * Signifies that the node state is changed to incomplete.
+ */
+ NODE_INCOMPLETE
+ }
+
+ /**
+ * Creates an event of a given type and the specified node.
+ *
+ * @param type cordvtn node event type
+ * @param node cordvtn node subject
+ */
+ public CordVtnNodeEvent(Type type, CordVtnNode node) {
+ super(type, node);
+ }
+}
diff --git a/src/main/java/org/opencord/cordvtn/api/node/CordVtnNodeHandler.java b/src/main/java/org/opencord/cordvtn/api/node/CordVtnNodeHandler.java
new file mode 100644
index 0000000..7dc1eb2
--- /dev/null
+++ b/src/main/java/org/opencord/cordvtn/api/node/CordVtnNodeHandler.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.cordvtn.api.node;
+
+/**
+ * Service providing cordvtn node bootstrapping.
+ */
+public interface CordVtnNodeHandler {
+
+ /**
+ * Processes the init state node.
+ *
+ * @param node cordvtn node
+ */
+ void processInitState(CordVtnNode node);
+
+ /**
+ * Processes the device created state node.
+ *
+ * @param node cordvtn node
+ */
+ void processDeviceCreatedState(CordVtnNode node);
+
+ /**
+ * Processes the port created state node.
+ *
+ * @param node cordvtn node
+ */
+ void processPortCreatedState(CordVtnNode node);
+
+ /**
+ * Processes the complete state node.
+ *
+ * @param node cordvtn node
+ */
+ void processCompleteState(CordVtnNode node);
+}
diff --git a/src/main/java/org/opencord/cordvtn/api/node/CordVtnNodeListener.java b/src/main/java/org/opencord/cordvtn/api/node/CordVtnNodeListener.java
new file mode 100644
index 0000000..91bd7ed
--- /dev/null
+++ b/src/main/java/org/opencord/cordvtn/api/node/CordVtnNodeListener.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.cordvtn.api.node;
+
+import org.onosproject.event.EventListener;
+
+/**
+ * Listener for cordvtn node event.
+ */
+public interface CordVtnNodeListener extends EventListener<CordVtnNodeEvent> {
+}
diff --git a/src/main/java/org/opencord/cordvtn/api/node/CordVtnNodeService.java b/src/main/java/org/opencord/cordvtn/api/node/CordVtnNodeService.java
new file mode 100644
index 0000000..afa9b17
--- /dev/null
+++ b/src/main/java/org/opencord/cordvtn/api/node/CordVtnNodeService.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.cordvtn.api.node;
+
+import org.onosproject.event.ListenerService;
+import org.onosproject.net.DeviceId;
+
+import java.util.Set;
+
+/**
+ * Service for interfacing with the inventory of {@link CordVtnNode}.
+ */
+public interface CordVtnNodeService extends ListenerService<CordVtnNodeEvent, CordVtnNodeListener> {
+
+ /**
+ * Returns all nodes.
+ *
+ * @return set of nodes; empty set if no node presents
+ */
+ Set<CordVtnNode> nodes();
+
+ /**
+ * Returns nodes in complete state.
+ *
+ * @return set of nodes; empty set if no complete node presents
+ */
+ Set<CordVtnNode> completeNodes();
+
+ /**
+ * Returns the node with the given hostname.
+ *
+ * @param hostname hostname of the node
+ * @return cordvtn node; null if no node present with the hostname
+ */
+ CordVtnNode node(String hostname);
+
+ /**
+ * Returns the node with the given integration bridge device identifier.
+ *
+ * @param deviceId integration bridge device id
+ * @return cordvtn node; null if no node present with the device id
+ */
+ CordVtnNode node(DeviceId deviceId);
+}
diff --git a/src/main/java/org/opencord/cordvtn/api/node/CordVtnNodeState.java b/src/main/java/org/opencord/cordvtn/api/node/CordVtnNodeState.java
index e3d9cd5..49e02ff 100644
--- a/src/main/java/org/opencord/cordvtn/api/node/CordVtnNodeState.java
+++ b/src/main/java/org/opencord/cordvtn/api/node/CordVtnNodeState.java
@@ -18,13 +18,64 @@
/**
* Entity that defines possible init state of the cordvtn node.
*/
-public interface CordVtnNodeState {
+public enum CordVtnNodeState {
+
+ INIT {
+ @Override
+ public void process(CordVtnNodeHandler handler, CordVtnNode node) {
+ handler.processInitState(node);
+ }
+
+ @Override
+ public CordVtnNodeState nextState() {
+ return DEVICE_CREATED;
+ }
+ },
+ DEVICE_CREATED {
+ @Override
+ public void process(CordVtnNodeHandler handler, CordVtnNode node) {
+ handler.processDeviceCreatedState(node);
+ }
+
+ @Override
+ public CordVtnNodeState nextState() {
+ return PORT_CREATED;
+ }
+ },
+ PORT_CREATED {
+ @Override
+ public void process(CordVtnNodeHandler handler, CordVtnNode node) {
+ handler.processPortCreatedState(node);
+ }
+
+ @Override
+ public CordVtnNodeState nextState() {
+ return COMPLETE;
+ }
+ },
+ COMPLETE {
+ @Override
+ public void process(CordVtnNodeHandler handler, CordVtnNode node) {
+ handler.processCompleteState(node);
+ }
+
+ @Override
+ public CordVtnNodeState nextState() {
+ // last state
+ return COMPLETE;
+ }
+ };
+
/**
- * Returns null for no state.
+ * Processes the current node state to proceed to the next state.
*
- * @return null
+ * @param handler cordvtn node state handler
+ * @param node cordvtn node
*/
- static CordVtnNodeState noState() {
- return null;
- }
+ public abstract void process(CordVtnNodeHandler handler, CordVtnNode node);
+
+ /**
+ * Returns the next node state.
+ */
+ public abstract CordVtnNodeState nextState();
}
diff --git a/src/main/java/org/opencord/cordvtn/api/node/CordVtnNodeStore.java b/src/main/java/org/opencord/cordvtn/api/node/CordVtnNodeStore.java
new file mode 100644
index 0000000..d4626d5
--- /dev/null
+++ b/src/main/java/org/opencord/cordvtn/api/node/CordVtnNodeStore.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.cordvtn.api.node;
+
+import org.onosproject.store.Store;
+
+import java.util.Set;
+
+/**
+ * Manages the inventory of cordvtn nodes; not intended for direct use.
+ */
+public interface CordVtnNodeStore extends Store<CordVtnNodeEvent, CordVtnNodeStoreDelegate> {
+
+ /**
+ * Returns all nodes.
+ *
+ * @return set of nodes; empty set if no node presents
+ */
+ Set<CordVtnNode> nodes();
+
+ /**
+ * Returns the node with the given hostname.
+ *
+ * @param hostname hostname of the node
+ * @return cordvtn node; null if no node present with the hostname
+ */
+ CordVtnNode node(String hostname);
+
+ /**
+ * Creates a new node.
+ *
+ * @param node cordvtn node
+ */
+ void createNode(CordVtnNode node);
+
+ /**
+ * Updates the node.
+ *
+ * @param node cordvtn node
+ */
+ void updateNode(CordVtnNode node);
+
+ /**
+ * Removes the node with the supplied hostname.
+ *
+ * @param hostname hostname of the node
+ * @return removed node; null if it failed
+ */
+ CordVtnNode removeNode(String hostname);
+}
diff --git a/src/main/java/org/opencord/cordvtn/api/node/CordVtnNodeStoreDelegate.java b/src/main/java/org/opencord/cordvtn/api/node/CordVtnNodeStoreDelegate.java
new file mode 100644
index 0000000..366dea5
--- /dev/null
+++ b/src/main/java/org/opencord/cordvtn/api/node/CordVtnNodeStoreDelegate.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.cordvtn.api.node;
+
+import org.onosproject.store.StoreDelegate;
+
+/**
+ * CordVtn Node store delegate.
+ */
+public interface CordVtnNodeStoreDelegate extends StoreDelegate<CordVtnNodeEvent> {
+}
diff --git a/src/main/java/org/opencord/cordvtn/api/node/DeviceHandler.java b/src/main/java/org/opencord/cordvtn/api/node/DeviceHandler.java
new file mode 100644
index 0000000..b6e49c1
--- /dev/null
+++ b/src/main/java/org/opencord/cordvtn/api/node/DeviceHandler.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.cordvtn.api.node;
+
+import org.onosproject.net.Device;
+import org.onosproject.net.Port;
+
+/**
+ * Entity capable of handling a device state updates.
+ */
+public interface DeviceHandler {
+
+ /**
+ * Processes the connected device.
+ *
+ * @param device device
+ */
+ void connected(Device device);
+
+ /**
+ * Processes the disconnected device.
+ *
+ * @param device device.
+ */
+ void disconnected(Device device);
+
+ /**
+ * Processes newly added port.
+ *
+ * @param port port
+ */
+ default void portAdded(Port port) {
+ // do nothing by default
+ }
+
+ /**
+ * Processes the updated port.
+ *
+ * @param port port
+ */
+ default void portUpdated(Port port) {
+ // do nothing by default
+ }
+
+ /**
+ * Processes the removed port.
+ *
+ * @param port port
+ */
+ default void portRemoved(Port port) {
+ // do nothing by default
+ }
+}
diff --git a/src/main/java/org/opencord/cordvtn/cli/CordVtnNodeCheckCommand.java b/src/main/java/org/opencord/cordvtn/cli/CordVtnNodeCheckCommand.java
index 9e988ca..e3bd85d 100644
--- a/src/main/java/org/opencord/cordvtn/cli/CordVtnNodeCheckCommand.java
+++ b/src/main/java/org/opencord/cordvtn/cli/CordVtnNodeCheckCommand.java
@@ -23,7 +23,7 @@
import org.onosproject.net.DeviceId;
import org.onosproject.net.Port;
import org.opencord.cordvtn.api.node.CordVtnNode;
-import org.opencord.cordvtn.impl.CordVtnNodeManager;
+import org.opencord.cordvtn.api.node.CordVtnNodeService;
import org.onosproject.net.Device;
import org.onosproject.net.device.DeviceService;
@@ -44,18 +44,15 @@
required = true, multiValued = false)
private String hostname = null;
- private static final String COMPLETE = "COMPLETE";
- private static final String INCOMPLETE = "INCOMPLETE";
- private static final String HINT = "hint: try init again if the state is INCOMPLETE" +
- " but all settings OK";
+ private static final String HINT = "hint: try init again if the state is not" +
+ " COMPLETE but all settings are OK";
@Override
protected void execute() {
- CordVtnNodeManager nodeManager = AbstractShellCommand.get(CordVtnNodeManager.class);
+ CordVtnNodeService nodeService = AbstractShellCommand.get(CordVtnNodeService.class);
DeviceService deviceService = AbstractShellCommand.get(DeviceService.class);
- CordVtnNode node = nodeManager.getNodes()
- .stream()
+ CordVtnNode node = nodeService.nodes().stream()
.filter(n -> n.hostname().equals(hostname))
.findFirst()
.orElse(null);
@@ -64,7 +61,7 @@
print("Cannot find %s from registered nodes", hostname);
return;
}
- print("Current state: %s (%s)", getState(nodeManager, node), HINT);
+ print("Current state: %s (%s)", node.state().name(), HINT);
print("%n[Integration Bridge Status]");
Device device = deviceService.getDevice(node.integrationBridgeId());
if (device != null) {
@@ -75,7 +72,7 @@
deviceService.isAvailable(device.id()),
device.annotations());
- node.systemIfaces().stream().forEach(iface -> print(
+ node.systemInterfaces().forEach(iface -> print(
getPortState(deviceService, node.integrationBridgeId(), iface)));
} else {
print("%s %s=%s is not available",
@@ -89,7 +86,8 @@
if (session != null) {
Set<IpAddress> ips = getCurrentIps(session, INTEGRATION_BRIDGE);
boolean isUp = isInterfaceUp(session, INTEGRATION_BRIDGE);
- boolean isIp = ips.contains(node.dataIp().ip()) && ips.contains(node.localMgmtIp().ip());
+ boolean isIp = ips.contains(node.dataIp().ip()) &&
+ ips.contains(node.localManagementIp().ip());
print("%s %s up=%s Ips=%s",
isUp && isIp ? MSG_OK : MSG_NO,
@@ -97,9 +95,9 @@
isUp ? Boolean.TRUE : Boolean.FALSE,
getCurrentIps(session, INTEGRATION_BRIDGE));
- print(getSystemIfaceState(session, node.dataIface()));
- if (node.hostMgmtIface().isPresent()) {
- print(getSystemIfaceState(session, node.hostMgmtIface().get()));
+ print(getSystemIfaceState(session, node.dataInterface()));
+ if (node.hostManagementInterface() != null) {
+ print(getSystemIfaceState(session, node.hostManagementInterface()));
}
disconnect(session);
@@ -108,7 +106,8 @@
}
}
- private String getPortState(DeviceService deviceService, DeviceId deviceId, String portName) {
+ private String getPortState(DeviceService deviceService, DeviceId deviceId,
+ String portName) {
Port port = deviceService.getPorts(deviceId).stream()
.filter(p -> p.annotations().value(PORT_NAME).equals(portName) &&
p.isEnabled())
@@ -116,11 +115,11 @@
if (port != null) {
return String.format("%s %s portNum=%s enabled=%s %s",
- port.isEnabled() ? MSG_OK : MSG_NO,
- portName,
- port.number(),
- port.isEnabled() ? Boolean.TRUE : Boolean.FALSE,
- port.annotations());
+ port.isEnabled() ? MSG_OK : MSG_NO,
+ portName,
+ port.number(),
+ port.isEnabled() ? Boolean.TRUE : Boolean.FALSE,
+ port.annotations());
} else {
return String.format("%s %s does not exist", MSG_NO, portName);
}
@@ -135,8 +134,4 @@
isUp ? Boolean.TRUE : Boolean.FALSE,
isIp ? Boolean.TRUE : Boolean.FALSE);
}
-
- private String getState(CordVtnNodeManager nodeManager, CordVtnNode node) {
- return nodeManager.isNodeInitComplete(node) ? COMPLETE : INCOMPLETE;
- }
}
diff --git a/src/main/java/org/opencord/cordvtn/cli/CordVtnNodeDeleteCommand.java b/src/main/java/org/opencord/cordvtn/cli/CordVtnNodeDeleteCommand.java
index d098efe..eea9f44 100644
--- a/src/main/java/org/opencord/cordvtn/cli/CordVtnNodeDeleteCommand.java
+++ b/src/main/java/org/opencord/cordvtn/cli/CordVtnNodeDeleteCommand.java
@@ -19,11 +19,9 @@
import org.apache.karaf.shell.commands.Argument;
import org.apache.karaf.shell.commands.Command;
import org.onosproject.cli.AbstractShellCommand;
-import org.opencord.cordvtn.impl.CordVtnNodeManager;
+import org.opencord.cordvtn.api.node.CordVtnNodeAdminService;
import org.opencord.cordvtn.api.node.CordVtnNode;
-import java.util.NoSuchElementException;
-
/**
* Deletes nodes from the service.
*/
@@ -37,21 +35,14 @@
@Override
protected void execute() {
- CordVtnNodeManager nodeManager = AbstractShellCommand.get(CordVtnNodeManager.class);
+ CordVtnNodeAdminService nodeAdminService =
+ AbstractShellCommand.get(CordVtnNodeAdminService.class);
for (String hostname : hostnames) {
- CordVtnNode node;
- try {
- node = nodeManager.getNodes()
- .stream()
- .filter(n -> n.hostname().equals(hostname))
- .findFirst().get();
- } catch (NoSuchElementException e) {
+ CordVtnNode node = nodeAdminService.removeNode(hostname);
+ if (node == null) {
print("Unable to find %s", hostname);
- continue;
}
-
- nodeManager.deleteNode(node);
}
}
}
diff --git a/src/main/java/org/opencord/cordvtn/cli/CordVtnNodeInitCommand.java b/src/main/java/org/opencord/cordvtn/cli/CordVtnNodeInitCommand.java
index c7da215..1a2650e 100644
--- a/src/main/java/org/opencord/cordvtn/cli/CordVtnNodeInitCommand.java
+++ b/src/main/java/org/opencord/cordvtn/cli/CordVtnNodeInitCommand.java
@@ -19,10 +19,9 @@
import org.apache.karaf.shell.commands.Argument;
import org.apache.karaf.shell.commands.Command;
import org.onosproject.cli.AbstractShellCommand;
-import org.opencord.cordvtn.impl.CordVtnNodeManager;
+import org.opencord.cordvtn.api.node.CordVtnNodeAdminService;
import org.opencord.cordvtn.api.node.CordVtnNode;
-
-import java.util.NoSuchElementException;
+import org.opencord.cordvtn.api.node.CordVtnNodeService;
/**
* Initializes nodes for CordVtn service.
@@ -37,21 +36,17 @@
@Override
protected void execute() {
- CordVtnNodeManager nodeManager = AbstractShellCommand.get(CordVtnNodeManager.class);
+ CordVtnNodeService nodeService = AbstractShellCommand.get(CordVtnNodeService.class);
+ CordVtnNodeAdminService nodeAdminService =
+ AbstractShellCommand.get(CordVtnNodeAdminService.class);
for (String hostname : hostnames) {
- CordVtnNode node;
- try {
- node = nodeManager.getNodes()
- .stream()
- .filter(n -> n.hostname().equals(hostname))
- .findFirst().get();
- } catch (NoSuchElementException e) {
+ CordVtnNode node = nodeService.node(hostname);
+ if (node == null) {
print("Unable to find %s", hostname);
- continue;
+ } else {
+ nodeAdminService.updateNode(node);
}
-
- nodeManager.addOrUpdateNode(node);
}
}
}
diff --git a/src/main/java/org/opencord/cordvtn/cli/CordVtnNodeListCommand.java b/src/main/java/org/opencord/cordvtn/cli/CordVtnNodeListCommand.java
index 7086a09..f1db3fb 100644
--- a/src/main/java/org/opencord/cordvtn/cli/CordVtnNodeListCommand.java
+++ b/src/main/java/org/opencord/cordvtn/cli/CordVtnNodeListCommand.java
@@ -18,9 +18,10 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.google.common.collect.Lists;
import org.apache.karaf.shell.commands.Command;
import org.onosproject.cli.AbstractShellCommand;
-import org.opencord.cordvtn.impl.CordVtnNodeManager;
+import org.opencord.cordvtn.api.node.CordVtnNodeService;
import org.opencord.cordvtn.api.node.CordVtnNode;
import java.util.Comparator;
@@ -35,19 +36,17 @@
description = "Lists all nodes registered in CORD VTN service")
public class CordVtnNodeListCommand extends AbstractShellCommand {
- private static final String COMPLETE = "COMPLETE";
- private static final String INCOMPLETE = "INCOMPLETE";
private static final String FORMAT = "%-30s%-20s%-20s%-15s%-24s%s";
@Override
protected void execute() {
- CordVtnNodeManager nodeManager = AbstractShellCommand.get(CordVtnNodeManager.class);
- List<CordVtnNode> nodes = nodeManager.getNodes();
+ CordVtnNodeService nodeService = AbstractShellCommand.get(CordVtnNodeService.class);
+ List<CordVtnNode> nodes = Lists.newArrayList(nodeService.nodes());
nodes.sort(Comparator.comparing(CordVtnNode::hostname));
if (outputJson()) {
try {
- print("%s", mapper().writeValueAsString(json(nodeManager, nodes)));
+ print("%s", mapper().writeValueAsString(json(nodes)));
} catch (JsonProcessingException e) {
print("Failed to list networks in JSON format");
}
@@ -57,31 +56,27 @@
for (CordVtnNode node : nodes) {
print(FORMAT, node.hostname(),
- node.hostMgmtIp().cidr(),
+ node.hostManagementIp().cidr(),
node.dataIp().cidr(),
- node.dataIface(),
+ node.dataInterface(),
node.integrationBridgeId().toString(),
- getState(nodeManager, node));
+ node.state().name());
}
- print("Total %s nodes", nodeManager.getNodeCount());
+ print("Total %s nodes", nodes.size());
}
}
- private JsonNode json(CordVtnNodeManager nodeManager, List<CordVtnNode> nodes) {
+ private JsonNode json(List<CordVtnNode> nodes) {
ArrayNode result = mapper().enable(INDENT_OUTPUT).createArrayNode();
for (CordVtnNode node : nodes) {
result.add(mapper().createObjectNode()
.put("hostname", node.hostname())
- .put("managementIp", node.hostMgmtIp().cidr())
+ .put("managementIp", node.hostManagementIp().cidr())
.put("dataIp", node.dataIp().cidr())
- .put("dataInterface", node.dataIface())
+ .put("dataInterface", node.dataInterface())
.put("bridgeId", node.integrationBridgeId().toString())
- .put("state", getState(nodeManager, node)));
+ .put("state", node.state().name()));
}
return result;
}
-
- private String getState(CordVtnNodeManager nodeManager, CordVtnNode node) {
- return nodeManager.isNodeInitComplete(node) ? COMPLETE : INCOMPLETE;
- }
}
diff --git a/src/main/java/org/opencord/cordvtn/cli/CordVtnPurgeRulesCommand.java b/src/main/java/org/opencord/cordvtn/cli/CordVtnPurgeRulesCommand.java
index 06f5aee..e5a4ec5 100644
--- a/src/main/java/org/opencord/cordvtn/cli/CordVtnPurgeRulesCommand.java
+++ b/src/main/java/org/opencord/cordvtn/cli/CordVtnPurgeRulesCommand.java
@@ -17,7 +17,7 @@
import org.apache.karaf.shell.commands.Command;
import org.onosproject.cli.AbstractShellCommand;
-import org.opencord.cordvtn.impl.CordVtnPipeline;
+import org.opencord.cordvtn.api.core.CordVtnPipeline;
/**
* Purges all flow rules installed by CORD VTN service.
@@ -29,6 +29,6 @@
@Override
protected void execute() {
CordVtnPipeline pipeline = AbstractShellCommand.get(CordVtnPipeline.class);
- pipeline.flushRules();
+ pipeline.cleanupPipeline();
}
}
diff --git a/src/main/java/org/opencord/cordvtn/impl/CordVtnArpProxy.java b/src/main/java/org/opencord/cordvtn/impl/CordVtnArpProxy.java
index 4dc69a2..3d0c492 100644
--- a/src/main/java/org/opencord/cordvtn/impl/CordVtnArpProxy.java
+++ b/src/main/java/org/opencord/cordvtn/impl/CordVtnArpProxy.java
@@ -36,10 +36,13 @@
import org.onosproject.core.CoreService;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Host;
+import org.onosproject.net.HostId;
+import org.onosproject.net.Port;
import org.onosproject.net.PortNumber;
import org.onosproject.net.config.NetworkConfigEvent;
import org.onosproject.net.config.NetworkConfigListener;
import org.onosproject.net.config.NetworkConfigService;
+import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.DefaultTrafficSelector;
import org.onosproject.net.flow.DefaultTrafficTreatment;
import org.onosproject.net.flow.TrafficSelector;
@@ -57,6 +60,8 @@
import org.opencord.cordvtn.api.core.ServiceNetworkListener;
import org.opencord.cordvtn.api.core.ServiceNetworkService;
import org.opencord.cordvtn.api.net.ServiceNetwork;
+import org.opencord.cordvtn.api.node.CordVtnNode;
+import org.opencord.cordvtn.api.node.CordVtnNodeService;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
@@ -68,6 +73,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onosproject.net.AnnotationKeys.PORT_NAME;
import static org.opencord.cordvtn.api.Constants.DEFAULT_GATEWAY_MAC_STR;
import static org.opencord.cordvtn.api.net.ServiceNetwork.NetworkType.*;
import static org.slf4j.LoggerFactory.getLogger;
@@ -97,7 +103,10 @@
protected ComponentConfigService compConfigService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected CordVtnNodeManager nodeManager;
+ protected DeviceService deviceService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected CordVtnNodeService nodeService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ServiceNetworkService snetService;
@@ -303,13 +312,11 @@
private void forwardManagementArpRequest(PacketContext context, Ethernet ethPacket) {
DeviceId deviceId = context.inPacket().receivedFrom().deviceId();
- PortNumber hostMgmtPort = nodeManager.hostManagementPort(deviceId);
- Host host = hostService.getConnectedHosts(context.inPacket().receivedFrom())
- .stream()
- .findFirst().orElse(null);
+ PortNumber hostMgmtPort = hostMgmtPort(deviceId);
+ Host host = hostService.getHost(HostId.hostId(ethPacket.getSourceMAC()));
if (host == null ||
- !Instance.of(host).netType().name().contains("MANAGEMENT") ||
+ Instance.of(host).netType() != MANAGEMENT_HOST ||
hostMgmtPort == null) {
context.block();
return;
@@ -324,10 +331,22 @@
treatment,
ByteBuffer.wrap(ethPacket.serialize())));
- log.trace("Forward ARP request to management network");
context.block();
}
+ private PortNumber hostMgmtPort(DeviceId deviceId) {
+ CordVtnNode node = nodeService.node(deviceId);
+ if (node == null || node.hostManagementInterface() == null) {
+ return null;
+ }
+ Optional<Port> port = deviceService.getPorts(deviceId).stream()
+ .filter(p -> p.annotations().value(PORT_NAME)
+ .equals(node.hostManagementInterface()) &&
+ p.isEnabled())
+ .findAny();
+ return port.isPresent() ? port.get().number() : null;
+ }
+
/**
* Emits gratuitous ARP when a gateway mac address has been changed.
*
@@ -465,7 +484,7 @@
private void readPublicGateways() {
CordVtnConfig config = netConfigService.getConfig(appId, CordVtnConfig.class);
if (config == null) {
- log.debug("No configuration found");
+ log.warn("No configuration found");
return;
}
diff --git a/src/main/java/org/opencord/cordvtn/impl/CordVtnNodeManager.java b/src/main/java/org/opencord/cordvtn/impl/CordVtnNodeManager.java
index 97f5f77..14f96ea 100644
--- a/src/main/java/org/opencord/cordvtn/impl/CordVtnNodeManager.java
+++ b/src/main/java/org/opencord/cordvtn/impl/CordVtnNodeManager.java
@@ -16,99 +16,66 @@
package org.opencord.cordvtn.impl;
import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.jcraft.jsch.Session;
+import com.google.common.collect.ImmutableSet;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
-import org.onlab.packet.IpAddress;
-import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
-import org.onosproject.net.ConnectPoint;
-import org.onosproject.net.Device;
+import org.onosproject.event.ListenerRegistry;
import org.onosproject.net.DeviceId;
-import org.onosproject.net.Port;
-import org.onosproject.net.PortNumber;
-import org.onosproject.net.behaviour.BridgeConfig;
-import org.onosproject.net.behaviour.BridgeDescription;
-import org.onosproject.net.behaviour.BridgeName;
-import org.onosproject.net.behaviour.ControllerInfo;
-import org.onosproject.net.behaviour.DefaultBridgeDescription;
-import org.onosproject.net.behaviour.DefaultTunnelDescription;
-import org.onosproject.net.behaviour.InterfaceConfig;
-import org.onosproject.net.behaviour.TunnelDescription;
-import org.onosproject.net.behaviour.TunnelEndPoints;
-import org.onosproject.net.behaviour.TunnelKeys;
import org.onosproject.net.config.NetworkConfigEvent;
import org.onosproject.net.config.NetworkConfigListener;
import org.onosproject.net.config.NetworkConfigService;
-import org.onosproject.net.device.DeviceAdminService;
-import org.onosproject.net.device.DeviceEvent;
-import org.onosproject.net.device.DeviceListener;
-import org.onosproject.net.device.DeviceService;
-import org.onosproject.net.host.HostService;
-import org.onosproject.ovsdb.controller.OvsdbClientService;
-import org.onosproject.ovsdb.controller.OvsdbController;
-import org.onosproject.ovsdb.controller.OvsdbNodeId;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.ConsistentMap;
-import org.onosproject.store.service.MapEvent;
-import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.StorageService;
-import org.onosproject.store.service.Versioned;
import org.opencord.cordvtn.api.CordVtnConfig;
-import org.opencord.cordvtn.api.core.InstanceService;
-import org.opencord.cordvtn.api.net.CidrAddr;
-import org.opencord.cordvtn.api.node.ConnectionHandler;
import org.opencord.cordvtn.api.node.CordVtnNode;
-import org.opencord.cordvtn.api.node.CordVtnNodeState;
-import org.opencord.cordvtn.api.node.SshAccessInfo;
+import org.opencord.cordvtn.api.node.CordVtnNodeAdminService;
+import org.opencord.cordvtn.api.node.CordVtnNodeEvent;
+import org.opencord.cordvtn.api.node.CordVtnNodeListener;
+import org.opencord.cordvtn.api.node.CordVtnNodeService;
+import org.opencord.cordvtn.api.node.CordVtnNodeStore;
+import org.opencord.cordvtn.api.node.CordVtnNodeStoreDelegate;
import org.slf4j.Logger;
-import java.util.List;
import java.util.Objects;
-import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import static java.lang.String.format;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads;
-import static org.onosproject.net.AnnotationKeys.PORT_NAME;
-import static org.onosproject.net.Device.Type.SWITCH;
-import static org.onosproject.net.behaviour.TunnelDescription.Type.VXLAN;
import static org.opencord.cordvtn.api.Constants.*;
-import static org.opencord.cordvtn.impl.RemoteIpCommandUtil.*;
+import static org.opencord.cordvtn.api.node.CordVtnNodeState.*;
import static org.slf4j.LoggerFactory.getLogger;
/**
- * Reads node information from the network config file and handles the config
- * update events.
- * Only a leader controller performs the node addition or deletion.
+ * Manages the inventory of the cordvtn nodes provided via network configuration.
*/
@Component(immediate = true)
-@Service(value = CordVtnNodeManager.class)
-public class CordVtnNodeManager {
+@Service
+public class CordVtnNodeManager extends ListenerRegistry<CordVtnNodeEvent, CordVtnNodeListener>
+ implements CordVtnNodeAdminService, CordVtnNodeService {
protected final Logger log = getLogger(getClass());
- private static final KryoNamespace.Builder NODE_SERIALIZER = KryoNamespace.newBuilder()
- .register(KryoNamespaces.API)
- .register(CordVtnNode.class)
- .register(NodeState.class)
- .register(SshAccessInfo.class)
- .register(CidrAddr.class);
+ private static final String MSG_NODE = "Node %s %s";
+ private static final String MSG_CREATED = "created";
+ private static final String MSG_UPDATED = "updated";
+ private static final String MSG_REMOVED = "removed";
- private static final int DPID_BEGIN = 3;
+ private static final String ERR_NULL_NODE = "CordVtn node cannot be null";
+ private static final String ERR_NULL_HOSTNAME = "CordVtn node hostname cannot be null";
+ private static final String ERR_NULL_DEVICE_ID = "Device ID cannot be null";
+ private static final String ERR_NOT_FOUND = "does not exist";
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
@@ -117,114 +84,32 @@
protected NetworkConfigService configService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected StorageService storageService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected DeviceAdminService adminService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected OvsdbController ovsdbController;
+ protected LeadershipService leadershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected DeviceService deviceService;
+ protected CordVtnNodeStore nodeStore;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected HostService hostService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected LeadershipService leadershipService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected InstanceService instanceService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected CordVtnPipeline pipeline;
-
- private final ExecutorService eventExecutor =
- newSingleThreadExecutor(groupedThreads("onos/cordvtn-node", "event-handler", log));
+ private final ExecutorService eventExecutor = newSingleThreadExecutor(
+ groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
private final NetworkConfigListener configListener = new InternalConfigListener();
- private final DeviceListener deviceListener = new InternalDeviceListener();
- private final MapEventListener<String, CordVtnNode> nodeStoreListener = new InternalMapListener();
+ private final CordVtnNodeStoreDelegate delegate = new InternalCordVtnNodeStoreDelegate();
- private final OvsdbHandler ovsdbHandler = new OvsdbHandler();
- private final BridgeHandler bridgeHandler = new BridgeHandler();
-
- private ConsistentMap<String, CordVtnNode> nodeStore;
- private List<ControllerInfo> controllers = Lists.newArrayList();
private ApplicationId appId;
private NodeId localNodeId;
- private enum NodeState implements CordVtnNodeState {
-
- INIT {
- @Override
- public void process(CordVtnNodeManager nodeManager, CordVtnNode node) {
- // make sure there is OVSDB connection
- if (!nodeManager.isOvsdbConnected(node)) {
- nodeManager.connectOvsdb(node);
- return;
- }
- nodeManager.createIntegrationBridge(node);
- }
- },
- BRIDGE_CREATED {
- @Override
- public void process(CordVtnNodeManager nodeManager, CordVtnNode node) {
- // make sure there is OVSDB connection
- if (!nodeManager.isOvsdbConnected(node)) {
- nodeManager.connectOvsdb(node);
- return;
- }
-
- nodeManager.createTunnelInterface(node);
- nodeManager.addSystemInterface(node, node.dataIface());
- if (node.hostMgmtIface().isPresent()) {
- nodeManager.addSystemInterface(node, node.hostMgmtIface().get());
- }
- }
- },
- PORTS_ADDED {
- @Override
- public void process(CordVtnNodeManager nodeManager, CordVtnNode node) {
- nodeManager.setIpAddress(node);
- }
- },
- COMPLETE {
- @Override
- public void process(CordVtnNodeManager nodeManager, CordVtnNode node) {
- nodeManager.postInit(node);
- }
- },
- INCOMPLETE {
- @Override
- public void process(CordVtnNodeManager nodeManager, CordVtnNode node) {
- }
- };
-
- public abstract void process(CordVtnNodeManager nodeManager, CordVtnNode node);
- }
-
@Activate
protected void activate() {
appId = coreService.registerApplication(CORDVTN_APP_ID);
leadershipService.runForLeadership(appId.name());
localNodeId = clusterService.getLocalNode().id();
- nodeStore = storageService.<String, CordVtnNode>consistentMapBuilder()
- .withSerializer(Serializer.using(NODE_SERIALIZER.build()))
- .withName("cordvtn-nodestore")
- .withApplicationId(appId)
- .build();
-
- nodeStore.addListener(nodeStoreListener, eventExecutor);
- deviceService.addListener(deviceListener);
+ nodeStore.setDelegate(delegate);
configService.addListener(configListener);
- readControllers();
readNodes();
log.info("Started");
}
@@ -232,8 +117,7 @@
@Deactivate
protected void deactivate() {
configService.removeListener(configListener);
- deviceService.removeListener(deviceListener);
- nodeStore.removeListener(nodeStoreListener);
+ nodeStore.unsetDelegate(delegate);
leadershipService.withdraw(appId.name());
eventExecutor.shutdown();
@@ -241,611 +125,60 @@
log.info("Stopped");
}
- /**
- * Adds or updates a new node to the service.
- *
- * @param node cordvtn node
- */
- public void addOrUpdateNode(CordVtnNode node) {
- checkNotNull(node);
- nodeStore.put(node.hostname(), CordVtnNode.getUpdatedNode(node, getNodeState(node)));
+ @Override
+ public void createNode(CordVtnNode node) {
+ checkNotNull(node, ERR_NULL_NODE);
+ nodeStore.createNode(node);
+ log.info(format(MSG_NODE, node.hostname(), MSG_CREATED));
}
- /**
- * Deletes a node from the service.
- *
- * @param node cordvtn node
- */
- public void deleteNode(CordVtnNode node) {
- checkNotNull(node);
- OvsdbClientService ovsdbClient = getOvsdbClient(node);
- if (ovsdbClient != null && ovsdbClient.isConnected()) {
- ovsdbClient.disconnect();
+ @Override
+ public void updateNode(CordVtnNode node) {
+ checkNotNull(node, ERR_NULL_NODE);
+ nodeStore.updateNode(node);
+ log.debug(format(MSG_NODE, node.hostname(), MSG_UPDATED));
+ }
+
+ @Override
+ public CordVtnNode removeNode(String hostname) {
+ checkArgument(!Strings.isNullOrEmpty(hostname), ERR_NULL_HOSTNAME);
+ CordVtnNode removed = nodeStore.removeNode(hostname);
+ if (removed == null) {
+ log.warn(format(MSG_NODE, hostname, ERR_NOT_FOUND));
+ return null;
}
- nodeStore.remove(node.hostname());
+ log.info(format(MSG_NODE, hostname, MSG_REMOVED));
+ return removed;
}
- /**
- * Returns node initialization state.
- *
- * @param node cordvtn node
- * @return true if initial node setup is completed, otherwise false
- */
- public boolean isNodeInitComplete(CordVtnNode node) {
- checkNotNull(node);
- return isNodeStateComplete(node);
+ @Override
+ public Set<CordVtnNode> nodes() {
+ return nodeStore.nodes();
}
- /**
- * Returns the number of the nodes known to the service.
- *
- * @return number of nodes
- */
- public int getNodeCount() {
- return nodeStore.size();
- }
-
- /**
- * Returns all nodes known to the service.
- *
- * @return list of nodes
- */
- public List<CordVtnNode> getNodes() {
- return nodeStore.values().stream().map(Versioned::value).collect(Collectors.toList());
- }
-
- /**
- * Returns all nodes in complete state.
- *
- * @return set of nodes
- */
+ @Override
public Set<CordVtnNode> completeNodes() {
- return getNodes().stream().filter(this::isNodeStateComplete)
- .collect(Collectors.toSet());
- }
-
- /**
- * Returns physical data plane port number of a given device.
- *
- * @param deviceId integration bridge device id
- * @return port number; null otherwise
- */
- public PortNumber dataPort(DeviceId deviceId) {
- CordVtnNode node = nodeByBridgeId(deviceId);
- if (node == null) {
- log.debug("Failed to get node for {}", deviceId);
- return null;
- }
-
- Optional<PortNumber> port = getPortNumber(deviceId, node.dataIface());
- return port.isPresent() ? port.get() : null;
- }
-
- /**
- * Returns physical data plane IP address of a given device.
- *
- * @param deviceId integration bridge device id
- * @return ip address; null otherwise
- */
- public IpAddress dataIp(DeviceId deviceId) {
- CordVtnNode node = nodeByBridgeId(deviceId);
- if (node == null) {
- log.debug("Failed to get node for {}", deviceId);
- return null;
- }
- return node.dataIp().ip();
- }
-
- /**
- * Returns tunnel port number of a given device.
- *
- * @param deviceId integration bridge device id
- * @return port number
- */
- public PortNumber tunnelPort(DeviceId deviceId) {
- Optional<PortNumber> port = getPortNumber(deviceId, DEFAULT_TUNNEL);
- return port.isPresent() ? port.get() : null;
- }
-
- /**
- * Returns host management interface port number if exists.
- *
- * @param deviceId integration bridge device id
- * @return port number; null if it does not exist
- */
- public PortNumber hostManagementPort(DeviceId deviceId) {
- CordVtnNode node = nodeByBridgeId(deviceId);
- if (node == null) {
- log.debug("Failed to get node for {}", deviceId);
- return null;
- }
-
- if (node.hostMgmtIface().isPresent()) {
- Optional<PortNumber> port = getPortNumber(deviceId, node.hostMgmtIface().get());
- return port.isPresent() ? port.get() : null;
- } else {
- return null;
- }
- }
-
- private Optional<PortNumber> getPortNumber(DeviceId deviceId, String portName) {
- PortNumber port = deviceService.getPorts(deviceId).stream()
- .filter(p -> p.annotations().value(PORT_NAME).equals(portName) &&
- p.isEnabled())
- .map(Port::number)
- .findAny()
- .orElse(null);
- return Optional.ofNullable(port);
- }
-
- /**
- * Returns if current node state saved in nodeStore is COMPLETE or not.
- *
- * @param node cordvtn node
- * @return true if it's complete state, otherwise false
- */
- private boolean isNodeStateComplete(CordVtnNode node) {
- checkNotNull(node);
-
// the state saved in nodeStore can be wrong if IP address settings are changed
// after the node init has been completed since there's no way to detect it
- // getNodeState and checkNodeInitState always return correct answer but can be slow
- Versioned<CordVtnNode> versionedNode = nodeStore.get(node.hostname());
- CordVtnNodeState state = versionedNode.value().state();
- return state != null && state.equals(NodeState.COMPLETE);
+ Set<CordVtnNode> nodes = nodes().stream()
+ .filter(node -> node.state() == COMPLETE)
+ .collect(Collectors.toSet());
+ return ImmutableSet.copyOf(nodes);
}
- /**
- * Initiates node to serve virtual tenant network.
- *
- * @param node cordvtn node
- */
- private void initNode(CordVtnNode node) {
- checkNotNull(node);
-
- NodeState state = (NodeState) node.state();
- log.debug("Processing node: {} state: {}", node.hostname(), state);
-
- state.process(this, node);
+ @Override
+ public CordVtnNode node(String hostname) {
+ checkArgument(!Strings.isNullOrEmpty(hostname), ERR_NULL_HOSTNAME);
+ return nodeStore.node(hostname);
}
- /**
- * Performs tasks after node initialization.
- * It disconnects unnecessary OVSDB connection and installs initial flow
- * rules on the device.
- *
- * @param node cordvtn node
- */
- private void postInit(CordVtnNode node) {
- checkNotNull(node);
-
- // disconnect OVSDB session once the node bootstrap is done
- OvsdbClientService ovsdbClient = getOvsdbClient(node);
- if (ovsdbClient != null && ovsdbClient.isConnected()) {
- ovsdbClient.disconnect();
- }
-
- pipeline.initPipeline(node);
-
- // adds existing instances to the host list
- deviceService.getPorts(node.integrationBridgeId()).stream()
- .filter(port -> !node.systemIfaces().contains(portName(port)) &&
- !port.number().equals(PortNumber.LOCAL) &&
- port.isEnabled())
- .forEach(port -> instanceService.addInstance(connectPoint(port)));
-
- // removes stale instances from the host list
- hostService.getHosts().forEach(host -> {
- if (deviceService.getPort(
- host.location().deviceId(),
- host.location().port()) == null) {
- instanceService.removeInstance(host.location());
- }
- });
-
- log.info("Finished init {}", node.hostname());
- }
-
- /**
- * Sets a new state for a given cordvtn node.
- *
- * @param node cordvtn node
- * @param newState new node state
- */
- private void setNodeState(CordVtnNode node, NodeState newState) {
- checkNotNull(node);
- log.debug("Changed {} state: {}", node.hostname(), newState);
- nodeStore.put(node.hostname(), CordVtnNode.getUpdatedNode(node, newState));
- }
-
- /**
- * Checks current state of a given cordvtn node and returns it.
- *
- * @param node cordvtn node
- * @return node state
- */
- private NodeState getNodeState(CordVtnNode node) {
- checkNotNull(node);
- if (!isIntegrationBridgeCreated(node)) {
- return NodeState.INIT;
- }
- for (String iface : node.systemIfaces()) {
- if (!isIfaceCreated(node, iface)) {
- return NodeState.BRIDGE_CREATED;
- }
- }
- if (!isIpAddressSet(node)) {
- return NodeState.PORTS_ADDED;
- }
- return NodeState.COMPLETE;
- }
-
- /**
- * Returns connection state of OVSDB server for a given node.
- *
- * @param node cordvtn node
- * @return true if it is connected, false otherwise
- */
- private boolean isOvsdbConnected(CordVtnNode node) {
- checkNotNull(node);
- OvsdbClientService ovsdbClient = getOvsdbClient(node);
- return deviceService.isAvailable(node.ovsdbId()) &&
- ovsdbClient != null &&
- ovsdbClient.isConnected();
- }
-
- /**
- * Connects to OVSDB server for a given node.
- *
- * @param node cordvtn node
- */
- private void connectOvsdb(CordVtnNode node) {
- checkNotNull(node);
- ovsdbController.connect(node.hostMgmtIp().ip(), node.ovsdbPort());
- }
-
- /**
- * Returns OVSDB client for a given node.
- *
- * @param node cordvtn node
- * @return ovsdb client, or null if there's no ovsdb connection
- */
- private OvsdbClientService getOvsdbClient(CordVtnNode node) {
- checkNotNull(node);
- OvsdbNodeId ovsdb = new OvsdbNodeId(
- node.hostMgmtIp().ip(), node.ovsdbPort().toInt());
- return ovsdbController.getOvsdbClient(ovsdb);
- }
-
- private void createIntegrationBridge(CordVtnNode node) {
- Device device = deviceService.getDevice(node.ovsdbId());
- if (device == null || !device.is(BridgeConfig.class)) {
- log.error("Failed to create integration bridge on {}", node.ovsdbId());
- return;
- }
-
- String dpid = node.integrationBridgeId().toString().substring(DPID_BEGIN);
- BridgeDescription bridgeDesc = DefaultBridgeDescription.builder()
- .name(INTEGRATION_BRIDGE)
- .failMode(BridgeDescription.FailMode.SECURE)
- .datapathId(dpid)
- .disableInBand()
- .controllers(controllers)
- .build();
-
- BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
- bridgeConfig.addBridge(bridgeDesc);
- }
-
- private void createTunnelInterface(CordVtnNode node) {
- Device device = deviceService.getDevice(node.ovsdbId());
- if (device == null || !device.is(InterfaceConfig.class)) {
- log.error("Failed to create tunnel interface on {}", node.ovsdbId());
- return;
- }
-
- TunnelDescription tunnelDesc = DefaultTunnelDescription.builder()
- .deviceId(INTEGRATION_BRIDGE)
- .ifaceName(DEFAULT_TUNNEL)
- .type(VXLAN)
- .remote(TunnelEndPoints.flowTunnelEndpoint())
- .key(TunnelKeys.flowTunnelKey())
- .build();
-
- InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
- ifaceConfig.addTunnelMode(DEFAULT_TUNNEL, tunnelDesc);
- }
-
- private void addSystemInterface(CordVtnNode node, String ifaceName) {
- Session session = connect(node.sshInfo());
- if (session == null || !isInterfaceUp(session, ifaceName)) {
- log.warn("Interface {} is not available on {}", ifaceName, node.hostname());
- disconnect(session);
- return;
- } else {
- disconnect(session);
- }
-
- Device device = deviceService.getDevice(node.ovsdbId());
- if (!device.is(BridgeConfig.class)) {
- log.error("BridgeConfig is not supported for {}", node.ovsdbId());
- return;
- }
-
- BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
- bridgeConfig.addPort(BridgeName.bridgeName(INTEGRATION_BRIDGE), ifaceName);
- }
-
- /**
- * Flushes IP address from data plane interface and adds data plane IP address
- * to integration bridge.
- *
- * @param node cordvtn node
- */
- private void setIpAddress(CordVtnNode node) {
- Session session = connect(node.sshInfo());
- if (session == null) {
- log.debug("Failed to SSH to {}", node.hostname());
- return;
- }
- getCurrentIps(session, INTEGRATION_BRIDGE).stream()
- .filter(ip -> !ip.equals(node.localMgmtIp().ip()))
- .filter(ip -> !ip.equals(node.dataIp().ip()))
- .forEach(ip -> deleteIp(session, ip, INTEGRATION_BRIDGE));
-
- boolean result = flushIp(session, node.dataIface()) &&
- setInterfaceUp(session, node.dataIface()) &&
- addIp(session, node.dataIp(), INTEGRATION_BRIDGE) &&
- addIp(session, node.localMgmtIp(), INTEGRATION_BRIDGE) &&
- setInterfaceUp(session, INTEGRATION_BRIDGE);
-
- disconnect(session);
- if (result) {
- setNodeState(node, NodeState.COMPLETE);
- }
- }
-
- private boolean isIntegrationBridgeCreated(CordVtnNode node) {
- return deviceService.getDevice(node.integrationBridgeId()) != null &&
- deviceService.isAvailable(node.integrationBridgeId());
- }
-
- private boolean isIfaceCreated(CordVtnNode node, String ifaceName) {
- if (Strings.isNullOrEmpty(ifaceName)) {
- return false;
- }
- return deviceService.getPorts(node.integrationBridgeId()).stream()
- .anyMatch(p -> portName(p).contains(ifaceName) && p.isEnabled());
- }
-
- /**
- * Checks if the IP addresses are correctly set.
- *
- * @param node cordvtn node
- * @return true if the IP is set, false otherwise
- */
- private boolean isIpAddressSet(CordVtnNode node) {
- Session session = connect(node.sshInfo());
- if (session == null) {
- log.debug("Failed to SSH to {}", node.hostname());
- return false;
- }
-
- Set<IpAddress> intBrIps = getCurrentIps(session, INTEGRATION_BRIDGE);
- boolean result = getCurrentIps(session, node.dataIface()).isEmpty() &&
- isInterfaceUp(session, node.dataIface()) &&
- intBrIps.contains(node.dataIp().ip()) &&
- intBrIps.contains(node.localMgmtIp().ip()) &&
- isInterfaceUp(session, INTEGRATION_BRIDGE);
-
- disconnect(session);
- return result;
- }
-
- /**
- * Returns connect point of a given port.
- *
- * @param port port
- * @return connect point
- */
- private ConnectPoint connectPoint(Port port) {
- return new ConnectPoint(port.element().id(), port.number());
- }
-
- /**
- * Returns cordvtn node associated with a given OVSDB device.
- *
- * @param ovsdbId OVSDB device id
- * @return cordvtn node, null if it fails to find the node
- */
- private CordVtnNode nodeByOvsdbId(DeviceId ovsdbId) {
- return getNodes().stream()
- .filter(node -> node.ovsdbId().equals(ovsdbId))
- .findFirst().orElse(null);
- }
-
- /**
- * Returns cordvtn node associated with a given integration bridge.
- *
- * @param bridgeId device id of integration bridge
- * @return cordvtn node, null if it fails to find the node
- */
- private CordVtnNode nodeByBridgeId(DeviceId bridgeId) {
- return getNodes().stream()
- .filter(node -> node.integrationBridgeId().equals(bridgeId))
- .findFirst().orElse(null);
- }
-
- /**
- * Returns port name.
- *
- * @param port port
- * @return port name
- */
- private String portName(Port port) {
- return port.annotations().value(PORT_NAME);
- }
-
- private class OvsdbHandler implements ConnectionHandler<Device> {
-
- @Override
- public void connected(Device device) {
- CordVtnNode node = nodeByOvsdbId(device.id());
- if (node != null) {
- setNodeState(node, getNodeState(node));
- } else {
- log.info("{} is detected on unregistered node, ignore it.", device.id());
- }
- }
-
- @Override
- public void disconnected(Device device) {
- log.debug("Device {} is disconnected", device.id());
- adminService.removeDevice(device.id());
- }
- }
-
- private class BridgeHandler implements ConnectionHandler<Device> {
-
- @Override
- public void connected(Device device) {
- CordVtnNode node = nodeByBridgeId(device.id());
- if (node != null) {
- setNodeState(node, getNodeState(node));
- } else {
- log.info("{} is detected on unregistered node, ignore it.", device.id());
- }
- }
-
- @Override
- public void disconnected(Device device) {
- CordVtnNode node = nodeByBridgeId(device.id());
- if (node != null) {
- log.warn("Integration Bridge is disconnected from {}", node.hostname());
- setNodeState(node, NodeState.INCOMPLETE);
- }
- }
-
- /**
- * Handles port added situation.
- * If the added port is tunnel or data plane interface, proceed to the remaining
- * node initialization. Otherwise, do nothing.
- *
- * @param port port
- */
- public void portAdded(Port port) {
- CordVtnNode node = nodeByBridgeId((DeviceId) port.element().id());
- String portName = portName(port);
- if (node == null) {
- log.info("{} is added to unregistered node, ignore it.", portName);
- return;
- }
-
- if (node.systemIfaces().contains(portName)) {
- setNodeState(node, getNodeState(node));
- } else if (isNodeStateComplete(node)) {
- // TODO move this logic to InstanceManager
- instanceService.addInstance(connectPoint(port));
- } else {
- log.warn("Instance is detected on incomplete node, ignore it.", portName);
- }
- }
-
- /**
- * Handles port removed situation.
- * If the removed port is tunnel or data plane interface, proceed to the remaining
- * node initialization.Others, do nothing.
- *
- * @param port port
- */
- public void portRemoved(Port port) {
- CordVtnNode node = nodeByBridgeId((DeviceId) port.element().id());
- String portName = portName(port);
- if (node == null) {
- log.info("{} is removed from unregistered node, ignore it.", portName);
- return;
- }
-
- if (node.systemIfaces().contains(portName)) {
- setNodeState(node, NodeState.INCOMPLETE);
- } else if (isNodeStateComplete(node)) {
- // TODO move this logic to InstanceManager
- instanceService.removeInstance(connectPoint(port));
- } else {
- log.warn("VM is vanished from incomplete node, ignore it.", portName);
- }
- }
- }
-
- private class InternalDeviceListener implements DeviceListener {
-
- @Override
- public boolean isRelevant(DeviceEvent event) {
- NodeId leaderNodeId = leadershipService.getLeader(appId.name());
- return Objects.equals(localNodeId, leaderNodeId);
- }
-
- @Override
- public void event(DeviceEvent event) {
-
- Device device = event.subject();
- ConnectionHandler<Device> handler = device.type().equals(SWITCH) ?
- bridgeHandler : ovsdbHandler;
-
- switch (event.type()) {
- case PORT_ADDED:
- eventExecutor.execute(() -> {
- log.info("Port {} is added to {}",
- event.port().annotations().value(PORT_NAME),
- event.subject().id());
- bridgeHandler.portAdded(event.port());
- });
- break;
- case PORT_UPDATED:
- if (event.port().isEnabled()) {
- eventExecutor.execute(() -> {
- log.info("Port {} is added to {}",
- event.port().annotations().value(PORT_NAME),
- event.subject().id());
- bridgeHandler.portAdded(event.port());
- });
- } else {
- eventExecutor.execute(() -> {
- log.info("Port {} is removed from {}",
- event.port().annotations().value(PORT_NAME),
- event.subject().id());
- bridgeHandler.portRemoved(event.port());
- });
- }
- break;
- case PORT_REMOVED:
- eventExecutor.execute(() -> {
- log.info("Port {} is removed from {}",
- event.port().annotations().value(PORT_NAME),
- event.subject().id());
- bridgeHandler.portRemoved(event.port());
- });
- break;
- case DEVICE_ADDED:
- case DEVICE_AVAILABILITY_CHANGED:
- if (deviceService.isAvailable(device.id())) {
- eventExecutor.execute(() -> {
- log.info("Device {} is connected",
- event.subject().id());
- handler.connected(device);
- });
- } else {
- eventExecutor.execute(() -> {
- log.info("Device {} is disconnected",
- event.subject().id());
- handler.disconnected(device);
- });
- }
- break;
- default:
- break;
- }
- }
+ @Override
+ public CordVtnNode node(DeviceId deviceId) {
+ checkNotNull(deviceId, ERR_NULL_DEVICE_ID);
+ return nodes().stream()
+ .filter(node -> node.integrationBridgeId().equals(deviceId) ||
+ node.ovsdbId().equals(deviceId))
+ .findAny().orElse(null);
}
/**
@@ -860,21 +193,18 @@
CordVtnConfig config = configService.getConfig(appId, CordVtnConfig.class);
if (config == null) {
- log.debug("No configuration found");
+ log.warn("No configuration found");
return;
}
- config.cordVtnNodes().forEach(this::addOrUpdateNode);
- }
-
- private void readControllers() {
- CordVtnConfig config = configService.getConfig(appId, CordVtnConfig.class);
- if (config == null) {
- log.debug("No configuration found");
- return;
- }
- controllers = config.controllers();
- controllers.forEach(ctrl -> {
- log.debug("Added controller {}:{}", ctrl.ip(), ctrl.port());
+ config.cordVtnNodes().forEach(node -> {
+ log.info("Read node from network config: {}", node.hostname());
+ CordVtnNode existing = node(node.hostname());
+ if (existing == null) {
+ createNode(node);
+ } else if (!existing.equals(node)) {
+ // FIXME maybe we need to re-check node states
+ updateNode(node);
+ }
});
}
@@ -889,10 +219,7 @@
switch (event.type()) {
case CONFIG_ADDED:
case CONFIG_UPDATED:
- eventExecutor.execute(() -> {
- readControllers();
- readNodes();
- });
+ eventExecutor.execute(CordVtnNodeManager.this::readNodes);
break;
default:
break;
@@ -900,44 +227,12 @@
}
}
- private class InternalMapListener implements MapEventListener<String, CordVtnNode> {
+ private class InternalCordVtnNodeStoreDelegate implements CordVtnNodeStoreDelegate {
@Override
- public void event(MapEvent<String, CordVtnNode> event) {
- NodeId leaderNodeId = leadershipService.getLeader(appId.name());
- if (!Objects.equals(localNodeId, leaderNodeId)) {
- // do not allow to proceed without leadership
- return;
- }
-
- CordVtnNode oldNode;
- CordVtnNode newNode;
-
- switch (event.type()) {
- case UPDATE:
- oldNode = event.oldValue().value();
- newNode = event.newValue().value();
-
- log.info("Reloaded {}", newNode.hostname());
- if (!newNode.equals(oldNode)) {
- log.debug("New node: {}", newNode);
- }
- // performs init procedure even if the node is not changed
- // for robustness since it's no harm to run init procedure
- // multiple times
- initNode(newNode);
- break;
- case INSERT:
- newNode = event.newValue().value();
- log.info("Added {}", newNode.hostname());
- initNode(newNode);
- break;
- case REMOVE:
- oldNode = event.oldValue().value();
- log.info("Removed {}", oldNode.hostname());
- break;
- default:
- break;
+ public void notify(CordVtnNodeEvent event) {
+ if (event != null) {
+ process(event);
}
}
}
diff --git a/src/main/java/org/opencord/cordvtn/impl/DefaultCordVtnNode.java b/src/main/java/org/opencord/cordvtn/impl/DefaultCordVtnNode.java
new file mode 100644
index 0000000..e11e2d3
--- /dev/null
+++ b/src/main/java/org/opencord/cordvtn/impl/DefaultCordVtnNode.java
@@ -0,0 +1,322 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.cordvtn.impl;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.onlab.packet.TpPort;
+import org.onosproject.net.DeviceId;
+import org.opencord.cordvtn.api.net.CidrAddr;
+import org.opencord.cordvtn.api.node.CordVtnNode;
+import org.opencord.cordvtn.api.node.CordVtnNodeState;
+import org.opencord.cordvtn.api.node.SshAccessInfo;
+
+import java.util.Objects;
+import java.util.Set;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.opencord.cordvtn.api.Constants.DEFAULT_OVSDB_PORT;
+import static org.opencord.cordvtn.api.Constants.DEFAULT_TUNNEL;
+
+/**
+ * Representation of a compute infrastructure node for CORD VTN service.
+ */
+public final class DefaultCordVtnNode implements CordVtnNode {
+
+ private final String hostname;
+ private final CidrAddr hostMgmtIp;
+ private final CidrAddr localMgmtIp;
+ private final CidrAddr dataIp;
+ private final DeviceId integrationBridgeId;
+ private final String dataIface;
+ private final String hostMgmtIface;
+ private final TpPort ovsdbPort;
+ private final SshAccessInfo sshInfo;
+ private final CordVtnNodeState state;
+
+ private DefaultCordVtnNode(String hostname,
+ CidrAddr hostMgmtIp,
+ CidrAddr localMgmtIp,
+ CidrAddr dataIp,
+ DeviceId integrationBridgeId,
+ String dataIface,
+ String hostMgmtIface,
+ TpPort ovsdbPort,
+ SshAccessInfo sshInfo,
+ CordVtnNodeState state) {
+ this.hostname = hostname;
+ this.hostMgmtIp = hostMgmtIp;
+ this.localMgmtIp = localMgmtIp;
+ this.dataIp = dataIp;
+ this.integrationBridgeId = integrationBridgeId;
+ this.dataIface = dataIface;
+ this.hostMgmtIface = hostMgmtIface;
+ this.ovsdbPort = ovsdbPort;
+ this.sshInfo = sshInfo;
+ this.state = state;
+ }
+
+ /**
+ * Returns cordvtn node with the new state.
+ *
+ * @param node cordvtn node
+ * @param state cordvtn node state
+ * @return cordvtn node
+ */
+ public static CordVtnNode updatedState(CordVtnNode node, CordVtnNodeState state) {
+ return new DefaultCordVtnNode(node.hostname(),
+ node.hostManagementIp(),
+ node.localManagementIp(),
+ node.dataIp(),
+ node.integrationBridgeId(),
+ node.dataInterface(),
+ node.hostManagementInterface(),
+ node.ovsdbPort(),
+ node.sshInfo(),
+ state);
+ }
+
+ @Override
+ public String hostname() {
+ return this.hostname;
+ }
+
+ @Override
+ public CidrAddr hostManagementIp() {
+ return this.hostMgmtIp;
+ }
+
+ @Override
+ public CidrAddr localManagementIp() {
+ return this.localMgmtIp;
+ }
+
+ @Override
+ public CidrAddr dataIp() {
+ return this.dataIp;
+ }
+
+ @Override
+ public DeviceId integrationBridgeId() {
+ return this.integrationBridgeId;
+ }
+
+ @Override
+ public String dataInterface() {
+ return this.dataIface;
+ }
+
+ @Override
+ public String hostManagementInterface() {
+ return this.hostMgmtIface;
+ }
+
+ @Override
+ public TpPort ovsdbPort() {
+ return this.ovsdbPort;
+ }
+
+ @Override
+ public SshAccessInfo sshInfo() {
+ return this.sshInfo;
+ }
+
+ @Override
+ public CordVtnNodeState state() {
+ return this.state;
+ }
+
+ @Override
+ public DeviceId ovsdbId() {
+ return DeviceId.deviceId("ovsdb:" + this.hostMgmtIp.ip().toString());
+ }
+
+ @Override
+ public Set<String> systemInterfaces() {
+ Set<String> ifaces = Sets.newHashSet(DEFAULT_TUNNEL, dataIface);
+ if (hostMgmtIface != null) {
+ ifaces.add(hostMgmtIface);
+ }
+ return ImmutableSet.copyOf(ifaces);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+
+ if (obj instanceof DefaultCordVtnNode) {
+ DefaultCordVtnNode that = (DefaultCordVtnNode) obj;
+ if (Objects.equals(hostname, that.hostname) &&
+ Objects.equals(hostMgmtIp, that.hostMgmtIp) &&
+ Objects.equals(localMgmtIp, that.localMgmtIp) &&
+ Objects.equals(dataIp, that.dataIp) &&
+ Objects.equals(integrationBridgeId, that.integrationBridgeId) &&
+ Objects.equals(dataIface, that.dataIface) &&
+ Objects.equals(hostMgmtIface, that.hostMgmtIface) &&
+ Objects.equals(ovsdbPort, that.ovsdbPort) &&
+ Objects.equals(sshInfo, that.sshInfo)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(hostname,
+ hostMgmtIp,
+ localMgmtIp,
+ dataIp,
+ integrationBridgeId,
+ dataIface,
+ hostMgmtIface,
+ ovsdbPort,
+ sshInfo);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("hostname", hostname)
+ .add("hostMgmtIp", hostMgmtIp)
+ .add("localMgmtIp", localMgmtIp)
+ .add("dataIp", dataIp)
+ .add("integrationBridgeId", integrationBridgeId)
+ .add("dataIface", dataIface)
+ .add("hostMgmtIface", hostMgmtIface)
+ .add("ovsdbPort", ovsdbPort)
+ .add("sshInfo", sshInfo)
+ .add("state", state)
+ .toString();
+ }
+
+ /**
+ * Returns new node builder instance.
+ *
+ * @return cordvtn node builder
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static final class Builder implements CordVtnNode.Builder {
+
+ private String hostname;
+ private CidrAddr hostMgmtIp;
+ private CidrAddr localMgmtIp;
+ private CidrAddr dataIp;
+ private DeviceId integrationBridgeId;
+ private String dataIface;
+ private String hostMgmtIface;
+ private TpPort ovsdbPort = TpPort.tpPort(DEFAULT_OVSDB_PORT);
+ private SshAccessInfo sshInfo;
+ private CordVtnNodeState state = CordVtnNodeState.INIT;
+
+ private Builder() {
+ }
+
+ @Override
+ public CordVtnNode build() {
+ checkArgument(!Strings.isNullOrEmpty(hostname), "Hostname cannot be null");
+ checkNotNull(hostMgmtIp, "Host management IP address cannot be null");
+ checkNotNull(localMgmtIp, "Local management IP address cannot be null");
+ checkNotNull(dataIp, "Data IP address cannot be null");
+ checkNotNull(integrationBridgeId, "Integration bridge ID cannot be null");
+ checkArgument(!Strings.isNullOrEmpty(dataIface), "Data interface cannot be null");
+ if (hostMgmtIface != null) {
+ checkArgument(!Strings.isNullOrEmpty(hostMgmtIface),
+ "Host management interface cannot be empty string");
+ }
+ checkNotNull(sshInfo, "SSH access information cannot be null");
+ checkNotNull(state, "Node state cannot be null");
+
+ return new DefaultCordVtnNode(hostname,
+ hostMgmtIp,
+ localMgmtIp,
+ dataIp,
+ integrationBridgeId,
+ dataIface,
+ hostMgmtIface,
+ ovsdbPort,
+ sshInfo, state);
+ }
+
+ @Override
+ public Builder hostname(String hostname) {
+ this.hostname = hostname;
+ return this;
+ }
+
+ @Override
+ public Builder hostManagementIp(CidrAddr hostMgmtIp) {
+ this.hostMgmtIp = hostMgmtIp;
+ return this;
+ }
+
+ @Override
+ public Builder localManagementIp(CidrAddr localMgmtIp) {
+ this.localMgmtIp = localMgmtIp;
+ return this;
+ }
+
+ @Override
+ public Builder dataIp(CidrAddr dataIp) {
+ this.dataIp = dataIp;
+ return this;
+ }
+
+ @Override
+ public Builder integrationBridgeId(DeviceId deviceId) {
+ this.integrationBridgeId = deviceId;
+ return this;
+ }
+
+ @Override
+ public Builder dataInterface(String dataIface) {
+ this.dataIface = dataIface;
+ return this;
+ }
+
+ @Override
+ public Builder hostManagementInterface(String hostMgmtIface) {
+ this.hostMgmtIface = hostMgmtIface;
+ return this;
+ }
+
+ @Override
+ public Builder ovsdbPort(TpPort port) {
+ this.ovsdbPort = port;
+ return this;
+ }
+
+ @Override
+ public Builder sshInfo(SshAccessInfo sshInfo) {
+ this.sshInfo = sshInfo;
+ return this;
+ }
+
+ @Override
+ public Builder state(CordVtnNodeState state) {
+ this.state = state;
+ return this;
+ }
+ }
+}
diff --git a/src/main/java/org/opencord/cordvtn/impl/DefaultCordVtnNodeHandler.java b/src/main/java/org/opencord/cordvtn/impl/DefaultCordVtnNodeHandler.java
new file mode 100644
index 0000000..a663e38
--- /dev/null
+++ b/src/main/java/org/opencord/cordvtn/impl/DefaultCordVtnNodeHandler.java
@@ -0,0 +1,601 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.cordvtn.impl;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.jcraft.jsch.Session;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.packet.IpAddress;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Port;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.behaviour.BridgeConfig;
+import org.onosproject.net.behaviour.BridgeDescription;
+import org.onosproject.net.behaviour.BridgeName;
+import org.onosproject.net.behaviour.ControllerInfo;
+import org.onosproject.net.behaviour.DefaultBridgeDescription;
+import org.onosproject.net.behaviour.DefaultTunnelDescription;
+import org.onosproject.net.behaviour.InterfaceConfig;
+import org.onosproject.net.behaviour.TunnelDescription;
+import org.onosproject.net.behaviour.TunnelEndPoints;
+import org.onosproject.net.behaviour.TunnelKeys;
+import org.onosproject.net.config.NetworkConfigEvent;
+import org.onosproject.net.config.NetworkConfigListener;
+import org.onosproject.net.config.NetworkConfigService;
+import org.onosproject.net.device.DeviceAdminService;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.host.HostService;
+import org.onosproject.ovsdb.controller.OvsdbClientService;
+import org.onosproject.ovsdb.controller.OvsdbController;
+import org.onosproject.ovsdb.controller.OvsdbNodeId;
+import org.opencord.cordvtn.api.CordVtnConfig;
+import org.opencord.cordvtn.api.core.CordVtnPipeline;
+import org.opencord.cordvtn.api.core.InstanceService;
+import org.opencord.cordvtn.api.node.CordVtnNode;
+import org.opencord.cordvtn.api.node.CordVtnNodeAdminService;
+import org.opencord.cordvtn.api.node.CordVtnNodeEvent;
+import org.opencord.cordvtn.api.node.CordVtnNodeHandler;
+import org.opencord.cordvtn.api.node.CordVtnNodeListener;
+import org.opencord.cordvtn.api.node.CordVtnNodeService;
+import org.opencord.cordvtn.api.node.CordVtnNodeState;
+import org.opencord.cordvtn.api.node.DeviceHandler;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.net.AnnotationKeys.PORT_NAME;
+import static org.onosproject.net.Device.Type.SWITCH;
+import static org.onosproject.net.behaviour.TunnelDescription.Type.VXLAN;
+import static org.opencord.cordvtn.api.Constants.CORDVTN_APP_ID;
+import static org.opencord.cordvtn.api.Constants.DEFAULT_TUNNEL;
+import static org.opencord.cordvtn.api.Constants.INTEGRATION_BRIDGE;
+import static org.opencord.cordvtn.api.node.CordVtnNodeState.*;
+import static org.opencord.cordvtn.impl.DefaultCordVtnNode.updatedState;
+import static org.opencord.cordvtn.impl.RemoteIpCommandUtil.*;
+import static org.opencord.cordvtn.impl.RemoteIpCommandUtil.disconnect;
+import static org.opencord.cordvtn.impl.RemoteIpCommandUtil.isInterfaceUp;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Implementation of the {@link CordVtnNodeHandler} with OVSDB and SSH exec channel.
+ */
+@Component(immediate = true)
+@Service
+public class DefaultCordVtnNodeHandler implements CordVtnNodeHandler {
+
+ protected final Logger log = getLogger(getClass());
+
+ private static final String ERR_INCOMPLETE = "%s is %s from incomplete node, ignore it";
+ private static final String ERR_UNREGISTERED = "%s is %s from unregistered node, ignore it";
+ private static final String ERR_DETECTED = "detected";
+ private static final String ERR_VANISHED = "vanished";
+
+ private static final int DPID_BEGIN = 3;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected LeadershipService leadershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected NetworkConfigService configService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DeviceService deviceService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DeviceAdminService deviceAdminService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected HostService hostService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected OvsdbController ovsdbController;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected CordVtnNodeService nodeService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected CordVtnNodeAdminService nodeAdminService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected InstanceService instanceService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected CordVtnPipeline pipelineService;
+
+ private final ExecutorService eventExecutor = newSingleThreadExecutor(
+ groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
+ private final NetworkConfigListener configListener = new InternalConfigListener();
+ private final DeviceListener deviceListener = new InternalDeviceListener();
+ private final DeviceHandler ovsdbHandler = new OvsdbDeviceHandler();
+ private final DeviceHandler intgBridgeHandler = new IntegrationBridgeDeviceHandler();
+ private final CordVtnNodeListener nodeListener = new InternalCordVtnNodeListener();
+
+ private ApplicationId appId;
+ private NodeId localNodeId;
+ private List<ControllerInfo> controllers = ImmutableList.of();
+
+ @Activate
+ protected void activate() {
+ appId = coreService.registerApplication(CORDVTN_APP_ID);
+ leadershipService.runForLeadership(appId.name());
+ localNodeId = clusterService.getLocalNode().id();
+
+ configService.addListener(configListener);
+ deviceService.addListener(deviceListener);
+ nodeService.addListener(nodeListener);
+
+ readControllers();
+ log.info("Started");
+ }
+
+ @Deactivate
+ protected void deactivate() {
+ nodeService.removeListener(nodeListener);
+ deviceService.removeListener(deviceListener);
+ configService.removeListener(configListener);
+
+ leadershipService.withdraw(appId.name());
+ eventExecutor.shutdown();
+
+ log.info("Stopped");
+ }
+
+ @Override
+ public void processInitState(CordVtnNode node) {
+ if (!isOvsdbConnected(node)) {
+ ovsdbController.connect(node.hostManagementIp().ip(), node.ovsdbPort());
+ return;
+ }
+ createIntegrationBridge(node);
+ }
+
+ @Override
+ public void processDeviceCreatedState(CordVtnNode node) {
+ if (!isOvsdbConnected(node)) {
+ ovsdbController.connect(node.hostManagementIp().ip(), node.ovsdbPort());
+ return;
+ }
+ createTunnelInterface(node);
+ addSystemInterface(node, node.dataInterface());
+ if (node.hostManagementInterface() != null) {
+ addSystemInterface(node, node.hostManagementInterface());
+ }
+ }
+
+ @Override
+ public void processPortCreatedState(CordVtnNode node) {
+ configureInterface(node);
+ }
+
+ @Override
+ public void processCompleteState(CordVtnNode node) {
+ OvsdbClientService ovsdbClient = ovsdbController.getOvsdbClient(
+ new OvsdbNodeId(
+ node.hostManagementIp().ip(),
+ node.ovsdbPort().toInt()));
+ if (ovsdbClient != null && ovsdbClient.isConnected()) {
+ ovsdbClient.disconnect();
+ }
+ // TODO fix postInit to be done in the proper services
+ postInit(node);
+ log.info("Finished init {}", node.hostname());
+ }
+
+ private boolean isOvsdbConnected(CordVtnNode node) {
+ OvsdbClientService ovsdbClient = ovsdbController.getOvsdbClient(
+ new OvsdbNodeId(
+ node.hostManagementIp().ip(),
+ node.ovsdbPort().toInt()));
+ return deviceService.isAvailable(node.ovsdbId()) && ovsdbClient != null &&
+ ovsdbClient.isConnected();
+ }
+
+ private void createIntegrationBridge(CordVtnNode node) {
+ Device device = deviceService.getDevice(node.ovsdbId());
+ if (device == null || !device.is(BridgeConfig.class)) {
+ log.error("Failed to create integration bridge on {}", node.ovsdbId());
+ return;
+ }
+ String dpid = node.integrationBridgeId().toString().substring(DPID_BEGIN);
+ BridgeDescription bridgeDesc = DefaultBridgeDescription.builder()
+ .name(INTEGRATION_BRIDGE)
+ .failMode(BridgeDescription.FailMode.SECURE)
+ .datapathId(dpid)
+ .disableInBand()
+ .controllers(controllers)
+ .build();
+
+ BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
+ bridgeConfig.addBridge(bridgeDesc);
+ }
+
+ private void createTunnelInterface(CordVtnNode node) {
+ Device device = deviceService.getDevice(node.ovsdbId());
+ if (device == null || !device.is(InterfaceConfig.class)) {
+ log.error("Failed to create tunnel interface on {}", node.ovsdbId());
+ return;
+ }
+ TunnelDescription tunnelDesc = DefaultTunnelDescription.builder()
+ .deviceId(INTEGRATION_BRIDGE)
+ .ifaceName(DEFAULT_TUNNEL)
+ .type(VXLAN)
+ .remote(TunnelEndPoints.flowTunnelEndpoint())
+ .key(TunnelKeys.flowTunnelKey())
+ .build();
+
+ InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
+ ifaceConfig.addTunnelMode(DEFAULT_TUNNEL, tunnelDesc);
+ }
+
+ private void addSystemInterface(CordVtnNode node, String ifaceName) {
+ Session session = connect(node.sshInfo());
+ if (session == null || !isInterfaceUp(session, ifaceName)) {
+ log.error("Interface {} is not available on {}", ifaceName, node.hostname());
+ disconnect(session);
+ return;
+ } else {
+ disconnect(session);
+ }
+
+ Device device = deviceService.getDevice(node.ovsdbId());
+ if (!device.is(BridgeConfig.class)) {
+ log.error("BridgeConfig is not supported for {}", node.ovsdbId());
+ return;
+ }
+
+ BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
+ bridgeConfig.addPort(BridgeName.bridgeName(INTEGRATION_BRIDGE), ifaceName);
+ }
+
+ private void configureInterface(CordVtnNode node) {
+ Session session = connect(node.sshInfo());
+ if (session == null) {
+ log.error("Failed to SSH to {}", node.hostname());
+ return;
+ }
+ getCurrentIps(session, INTEGRATION_BRIDGE).stream()
+ .filter(ip -> !ip.equals(node.localManagementIp().ip()))
+ .filter(ip -> !ip.equals(node.dataIp().ip()))
+ .forEach(ip -> deleteIp(session, ip, INTEGRATION_BRIDGE));
+
+ final boolean result = flushIp(session, node.dataInterface()) &&
+ setInterfaceUp(session, node.dataInterface()) &&
+ addIp(session, node.dataIp(), INTEGRATION_BRIDGE) &&
+ addIp(session, node.localManagementIp(), INTEGRATION_BRIDGE) &&
+ setInterfaceUp(session, INTEGRATION_BRIDGE);
+
+ disconnect(session);
+ if (result) {
+ bootstrapNode(node);
+ }
+ }
+
+ private void postInit(CordVtnNode node) {
+ // TODO move the below line to DefaultCordVtnPipeline
+ pipelineService.initPipeline(node);
+
+ // TODO move the logic below to InstanceManager
+ // adds existing instances to the host list
+ deviceService.getPorts(node.integrationBridgeId()).stream()
+ .filter(port -> port.isEnabled() &&
+ !port.number().equals(PortNumber.LOCAL) &&
+ !node.systemInterfaces().contains(port.annotations().value(PORT_NAME)))
+ .forEach(port -> instanceService.addInstance(new ConnectPoint(
+ port.element().id(),
+ port.number()
+ )));
+ // removes stale instances from the host list
+ hostService.getHosts().forEach(host -> {
+ if (deviceService.getPort(
+ host.location().deviceId(),
+ host.location().port()) == null) {
+ instanceService.removeInstance(host.location());
+ }
+ });
+ }
+
+ private class OvsdbDeviceHandler implements DeviceHandler {
+
+ @Override
+ public void connected(Device device) {
+ CordVtnNode node = nodeService.node(device.id());
+ if (node != null) {
+ bootstrapNode(node);
+ }
+ }
+
+ @Override
+ public void disconnected(Device device) {
+ CordVtnNode node = nodeService.node(device.id());
+ if (node != null && node.state() == COMPLETE) {
+ log.debug("Device({}) from {} disconnected", device.id(), node.hostname());
+ deviceAdminService.removeDevice(device.id());
+ }
+ }
+ }
+
+ private class IntegrationBridgeDeviceHandler implements DeviceHandler {
+
+ @Override
+ public void connected(Device device) {
+ CordVtnNode node = nodeService.node(device.id());
+ if (node != null) {
+ bootstrapNode(node);
+ }
+ }
+
+ @Override
+ public void disconnected(Device device) {
+ CordVtnNode node = nodeService.node(device.id());
+ if (node != null) {
+ log.warn("Device({}) from {} disconnected", device.id(), node.hostname());
+ setState(node, INIT);
+ }
+ }
+
+ @Override
+ public void portAdded(Port port) {
+ CordVtnNode node = nodeService.node((DeviceId) port.element().id());
+ String portName = port.annotations().value(PORT_NAME);
+ if (node == null) {
+ log.warn(format(ERR_UNREGISTERED, portName, ERR_DETECTED));
+ return;
+ }
+ if (node.systemInterfaces().contains(portName)) {
+ if (node.state() == DEVICE_CREATED) {
+ bootstrapNode(node);
+ }
+ } else if (node.state() == COMPLETE) {
+ // TODO move this logic to InstanceManager
+ instanceService.addInstance(new ConnectPoint(port.element().id(),
+ port.number()));
+ } else {
+ log.warn(format(ERR_INCOMPLETE, portName, ERR_DETECTED));
+ }
+ }
+
+ @Override
+ public void portRemoved(Port port) {
+ CordVtnNode node = nodeService.node((DeviceId) port.element().id());
+ String portName = port.annotations().value(PORT_NAME);
+ if (node == null) {
+ log.warn(format(ERR_UNREGISTERED, portName, ERR_VANISHED));
+ return;
+ }
+ if (node.systemInterfaces().contains(portName)) {
+ if (node.state() == PORT_CREATED || node.state() == COMPLETE) {
+ // always falls back to INIT state to avoid a mess caused by
+ // the multiple events received out of order
+ setState(node, INIT);
+ }
+ } else if (node.state() == COMPLETE) {
+ // TODO move this logic to InstanceManager
+ instanceService.removeInstance(new ConnectPoint(port.element().id(),
+ port.number()));
+ } else {
+ log.warn(format(ERR_INCOMPLETE, portName, ERR_VANISHED));
+ }
+ }
+ }
+
+ private class InternalDeviceListener implements DeviceListener {
+
+ @Override
+ public void event(DeviceEvent event) {
+ eventExecutor.execute(() -> {
+ NodeId leader = leadershipService.getLeader(appId.name());
+ if (!Objects.equals(localNodeId, leader)) {
+ // do not allow to proceed without leadership
+ return;
+ }
+ handle(event);
+ });
+ }
+
+ private void handle(DeviceEvent event) {
+ DeviceHandler handler = event.subject().type().equals(SWITCH) ?
+ intgBridgeHandler : ovsdbHandler;
+ Device device = event.subject();
+
+ switch (event.type()) {
+ case DEVICE_AVAILABILITY_CHANGED:
+ case DEVICE_ADDED:
+ if (deviceService.isAvailable(device.id())) {
+ log.debug("Device {} is connected", device.id());
+ handler.connected(device);
+ } else {
+ log.debug("Device {} is disconnected", device.id());
+ handler.disconnected(device);
+ }
+ break;
+ case PORT_ADDED:
+ log.debug("Port {} is added to {}",
+ event.port().annotations().value(PORT_NAME),
+ device.id());
+ handler.portAdded(event.port());
+ break;
+ case PORT_UPDATED:
+ if (event.port().isEnabled()) {
+ log.debug("Port {} is added to {}",
+ event.port().annotations().value(PORT_NAME),
+ device.id());
+ handler.portAdded(event.port());
+ } else {
+ log.debug("Port {} is removed from {}",
+ event.port().annotations().value(PORT_NAME),
+ device.id());
+ handler.portRemoved(event.port());
+ }
+ break;
+ case PORT_REMOVED:
+ log.debug("Port {} is removed from {}",
+ event.port().annotations().value(PORT_NAME),
+ device.id());
+ handler.portRemoved(event.port());
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ private Set<String> activePorts(DeviceId deviceId) {
+ Set<String> activePorts = deviceService.getPorts(deviceId).stream()
+ .filter(Port::isEnabled)
+ .map(port -> port.annotations().value(PORT_NAME))
+ .collect(Collectors.toSet());
+ return ImmutableSet.copyOf(activePorts);
+ }
+
+ private boolean isIpAddressSet(CordVtnNode node) {
+ Session session = connect(node.sshInfo());
+ if (session == null) {
+ log.warn("Failed to SSH to {}", node.hostname());
+ return false;
+ }
+ Set<IpAddress> intBrIps = getCurrentIps(session, INTEGRATION_BRIDGE);
+ boolean result = getCurrentIps(session, node.dataInterface()).isEmpty() &&
+ isInterfaceUp(session, node.dataInterface()) &&
+ intBrIps.contains(node.dataIp().ip()) &&
+ intBrIps.contains(node.localManagementIp().ip()) &&
+ isInterfaceUp(session, INTEGRATION_BRIDGE);
+
+ disconnect(session);
+ return result;
+ }
+
+ private boolean isCurrentStateDone(CordVtnNode node) {
+ switch (node.state()) {
+ case INIT:
+ return deviceService.isAvailable(node.integrationBridgeId());
+ case DEVICE_CREATED:
+ Set<String> activePorts = activePorts(node.integrationBridgeId());
+ return node.systemInterfaces().stream().allMatch(activePorts::contains);
+ case PORT_CREATED:
+ return isIpAddressSet(node);
+ case COMPLETE:
+ return false;
+ default:
+ return false;
+ }
+ }
+
+ private void setState(CordVtnNode node, CordVtnNodeState newState) {
+ if (node.state() == newState) {
+ return;
+ }
+ CordVtnNode updated = updatedState(node, newState);
+ nodeAdminService.updateNode(updated);
+ log.info("Changed {} state: {}", node.hostname(), newState);
+ }
+
+ private void bootstrapNode(CordVtnNode node) {
+ if (isCurrentStateDone(node)) {
+ setState(node, node.state().nextState());
+ } else {
+ node.state().process(this, node);
+ }
+ }
+
+ private class InternalCordVtnNodeListener implements CordVtnNodeListener {
+
+ @Override
+ public void event(CordVtnNodeEvent event) {
+ eventExecutor.execute(() -> {
+ NodeId leader = leadershipService.getLeader(appId.name());
+ if (!Objects.equals(localNodeId, leader)) {
+ // do not allow to proceed without leadership
+ return;
+ }
+ handle(event);
+ });
+ }
+
+ private void handle(CordVtnNodeEvent event) {
+ switch (event.type()) {
+ case NODE_CREATED:
+ case NODE_UPDATED:
+ bootstrapNode(event.subject());
+ break;
+ case NODE_REMOVED:
+ case NODE_COMPLETE:
+ case NODE_INCOMPLETE:
+ default:
+ // do nothing
+ break;
+ }
+ }
+ }
+
+ private void readControllers() {
+ CordVtnConfig config = configService.getConfig(appId, CordVtnConfig.class);
+ if (config == null) {
+ log.warn("No configuration found");
+ return;
+ }
+ controllers = config.controllers();
+ controllers.forEach(ctrl -> {
+ log.debug("Added controller {}:{}", ctrl.ip(), ctrl.port());
+ });
+ }
+
+ private class InternalConfigListener implements NetworkConfigListener {
+
+ @Override
+ public void event(NetworkConfigEvent event) {
+ if (!event.configClass().equals(CordVtnConfig.class)) {
+ return;
+ }
+
+ switch (event.type()) {
+ case CONFIG_ADDED:
+ case CONFIG_UPDATED:
+ readControllers();
+ break;
+ default:
+ break;
+ }
+ }
+ }
+}
diff --git a/src/main/java/org/opencord/cordvtn/impl/CordVtnPipeline.java b/src/main/java/org/opencord/cordvtn/impl/DefaultCordVtnPipeline.java
similarity index 81%
rename from src/main/java/org/opencord/cordvtn/impl/CordVtnPipeline.java
rename to src/main/java/org/opencord/cordvtn/impl/DefaultCordVtnPipeline.java
index af6f27c..812f99c 100644
--- a/src/main/java/org/opencord/cordvtn/impl/CordVtnPipeline.java
+++ b/src/main/java/org/opencord/cordvtn/impl/DefaultCordVtnPipeline.java
@@ -30,13 +30,12 @@
import org.onosproject.net.AnnotationKeys;
import org.onosproject.net.Port;
import org.opencord.cordvtn.api.Constants;
+import org.opencord.cordvtn.api.core.CordVtnPipeline;
import org.opencord.cordvtn.api.node.CordVtnNode;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
-import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.PortNumber;
-import org.onosproject.net.behaviour.ExtensionTreatmentResolver;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.DefaultFlowRule;
import org.onosproject.net.flow.DefaultTrafficSelector;
@@ -47,23 +46,20 @@
import org.onosproject.net.flow.FlowRuleService;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flow.TrafficTreatment;
-import org.onosproject.net.flow.instructions.ExtensionPropertyException;
-import org.onosproject.net.flow.instructions.ExtensionTreatment;
import org.slf4j.Logger;
-import java.util.Optional;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onosproject.net.flow.instructions.ExtensionTreatmentType.ExtensionTreatmentTypes.NICIRA_SET_TUNNEL_DST;
+import static com.google.common.base.Preconditions.checkArgument;
import static org.opencord.cordvtn.api.Constants.DEFAULT_TUNNEL;
+import static org.opencord.cordvtn.api.node.CordVtnNodeState.COMPLETE;
import static org.slf4j.LoggerFactory.getLogger;
/**
- * Provides CORD VTN pipeline.
+ * Implementation of cordvtn pipeline.
*/
@Component(immediate = true)
-@Service(value = CordVtnPipeline.class)
-public final class CordVtnPipeline {
+@Service
+public class DefaultCordVtnPipeline implements CordVtnPipeline {
protected final Logger log = getLogger(getClass());
@@ -76,26 +72,7 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
- // tables
- public static final int TABLE_ZERO = 0;
- public static final int TABLE_IN_PORT = 1;
- public static final int TABLE_ACCESS = 2;
- public static final int TABLE_IN_SERVICE = 3;
- public static final int TABLE_DST = 4;
- public static final int TABLE_TUNNEL_IN = 5;
- public static final int TABLE_VLAN = 6;
-
- // priorities
- public static final int PRIORITY_MANAGEMENT = 55000;
- public static final int PRIORITY_HIGH = 50000;
- public static final int PRIORITY_DEFAULT = 5000;
- public static final int PRIORITY_LOW = 4000;
- public static final int PRIORITY_ZERO = 0;
-
- public static final int VXLAN_UDP_PORT = 4789;
- public static final VlanId VLAN_WAN = VlanId.vlanId((short) 500);
-
- public static final String PROPERTY_TUNNEL_DST = "tunnelDst";
+ private static final int VXLAN_UDP_PORT = 4789;
private ApplicationId appId;
@@ -110,45 +87,45 @@
log.info("Stopped");
}
- /**
- * Flush flows installed by this application.
- */
- public void flushRules() {
+ @Override
+ public void cleanupPipeline() {
flowRuleService.getFlowRulesById(appId).forEach(flowRule -> processFlowRule(false, flowRule));
}
- /**
- * Installs table miss rule to a give device.
- *
- * @param node cordvtn node
- */
+ @Override
public void initPipeline(CordVtnNode node) {
- checkNotNull(node);
+ checkArgument(node.state() == COMPLETE, "Node is not in COMPLETE state");
- Optional<PortNumber> dataPort = getPortNumber(node.integrationBridgeId(), node.dataIface());
- Optional<PortNumber> tunnelPort = getPortNumber(node.integrationBridgeId(), DEFAULT_TUNNEL);
- if (!dataPort.isPresent() || !tunnelPort.isPresent()) {
- log.warn("Node is not in COMPLETE state");
- return;
- }
-
- Optional<PortNumber> hostMgmtPort = Optional.empty();
- if (node.hostMgmtIface().isPresent()) {
- hostMgmtPort = getPortNumber(node.integrationBridgeId(), node.hostMgmtIface().get());
- }
+ PortNumber dataPort = getPortNumber(node.integrationBridgeId(), node.dataInterface());
+ PortNumber tunnelPort = getPortNumber(node.integrationBridgeId(), DEFAULT_TUNNEL);
+ PortNumber hostMgmtPort = node.hostManagementInterface() == null ?
+ null : getPortNumber(node.integrationBridgeId(), node.hostManagementInterface());
processTableZero(node.integrationBridgeId(),
- dataPort.get(),
- node.dataIp().ip(),
- node.localMgmtIp().ip());
+ dataPort,
+ node.dataIp().ip(),
+ node.localManagementIp().ip());
processInPortTable(node.integrationBridgeId(),
- tunnelPort.get(),
- dataPort.get(),
- hostMgmtPort);
+ tunnelPort,
+ dataPort,
+ hostMgmtPort);
- processAccessTypeTable(node.integrationBridgeId(), dataPort.get());
- processVlanTable(node.integrationBridgeId(), dataPort.get());
+ processAccessTypeTable(node.integrationBridgeId(), dataPort);
+ processVlanTable(node.integrationBridgeId(), dataPort);
+ }
+
+ @Override
+ public void processFlowRule(boolean install, FlowRule rule) {
+ FlowRuleOperations.Builder oBuilder = FlowRuleOperations.builder();
+ oBuilder = install ? oBuilder.add(rule) : oBuilder.remove(rule);
+
+ flowRuleService.apply(oBuilder.build(new FlowRuleOperationsContext() {
+ @Override
+ public void onError(FlowRuleOperations ops) {
+ log.error(String.format("Failed %s, %s", ops.toString(), rule.toString()));
+ }
+ }));
}
private void processTableZero(DeviceId deviceId, PortNumber dataPort, IpAddress dataIp,
@@ -380,7 +357,7 @@
}
private void processInPortTable(DeviceId deviceId, PortNumber tunnelPort, PortNumber dataPort,
- Optional<PortNumber> hostMgmtPort) {
+ PortNumber hostMgmtPort) {
TrafficSelector selector = DefaultTrafficSelector.builder()
.matchInPort(tunnelPort)
.build();
@@ -421,9 +398,9 @@
processFlowRule(true, flowRule);
- if (hostMgmtPort.isPresent()) {
+ if (hostMgmtPort != null) {
selector = DefaultTrafficSelector.builder()
- .matchInPort(hostMgmtPort.get())
+ .matchInPort(hostMgmtPort)
.build();
treatment = DefaultTrafficTreatment.builder()
@@ -510,43 +487,11 @@
processFlowRule(true, flowRule);
}
- public void processFlowRule(boolean install, FlowRule rule) {
- FlowRuleOperations.Builder oBuilder = FlowRuleOperations.builder();
- oBuilder = install ? oBuilder.add(rule) : oBuilder.remove(rule);
-
- flowRuleService.apply(oBuilder.build(new FlowRuleOperationsContext() {
- @Override
- public void onError(FlowRuleOperations ops) {
- log.error(String.format("Failed %s, %s", ops.toString(), rule.toString()));
- }
- }));
- }
-
- public ExtensionTreatment tunnelDstTreatment(DeviceId deviceId, Ip4Address remoteIp) {
- Device device = deviceService.getDevice(deviceId);
- if (device != null && !device.is(ExtensionTreatmentResolver.class)) {
- log.error("The extension treatment is not supported");
- return null;
- }
-
- ExtensionTreatmentResolver resolver = device.as(ExtensionTreatmentResolver.class);
- ExtensionTreatment treatment = resolver.getExtensionInstruction(NICIRA_SET_TUNNEL_DST.type());
- try {
- treatment.setPropertyValue(PROPERTY_TUNNEL_DST, remoteIp);
- return treatment;
- } catch (ExtensionPropertyException e) {
- log.warn("Failed to get tunnelDst extension treatment for {}", deviceId);
- return null;
- }
- }
-
- private Optional<PortNumber> getPortNumber(DeviceId deviceId, String portName) {
- PortNumber port = deviceService.getPorts(deviceId).stream()
+ private PortNumber getPortNumber(DeviceId deviceId, String portName) {
+ return deviceService.getPorts(deviceId).stream()
.filter(p -> p.annotations().value(AnnotationKeys.PORT_NAME).equals(portName) &&
p.isEnabled())
.map(Port::number)
- .findAny()
- .orElse(null);
- return Optional.ofNullable(port);
+ .findAny().orElse(null);
}
}
diff --git a/src/main/java/org/opencord/cordvtn/impl/DistributedCordVtnNodeStore.java b/src/main/java/org/opencord/cordvtn/impl/DistributedCordVtnNodeStore.java
new file mode 100644
index 0000000..6bd3875
--- /dev/null
+++ b/src/main/java/org/opencord/cordvtn/impl/DistributedCordVtnNodeStore.java
@@ -0,0 +1,207 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.cordvtn.impl;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
+import org.opencord.cordvtn.api.net.CidrAddr;
+import org.opencord.cordvtn.api.node.CordVtnNode;
+import org.opencord.cordvtn.api.node.CordVtnNodeEvent;
+import org.opencord.cordvtn.api.node.CordVtnNodeHandler;
+import org.opencord.cordvtn.api.node.CordVtnNodeState;
+import org.opencord.cordvtn.api.node.CordVtnNodeStore;
+import org.opencord.cordvtn.api.node.CordVtnNodeStoreDelegate;
+import org.opencord.cordvtn.api.node.SshAccessInfo;
+import org.slf4j.Logger;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.opencord.cordvtn.api.Constants.CORDVTN_APP_ID;
+import static org.opencord.cordvtn.api.node.CordVtnNodeState.COMPLETE;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Manages the inventory of cordvtn nodes using a {@link ConsistentMap}.
+ */
+@Component(immediate = true)
+@Service
+public class DistributedCordVtnNodeStore extends AbstractStore<CordVtnNodeEvent, CordVtnNodeStoreDelegate>
+ implements CordVtnNodeStore {
+
+ protected final Logger log = getLogger(getClass());
+
+ private static final String ERR_NOT_FOUND = " does not exist";
+ private static final String ERR_DUPLICATE = " already exists";
+
+ private static final KryoNamespace SERIALIZER_CORDVTN_NODE = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(CordVtnNode.class)
+ .register(DefaultCordVtnNode.class)
+ .register(CidrAddr.class)
+ .register(SshAccessInfo.class)
+ .register(CordVtnNodeState.class)
+ .register(CordVtnNodeHandler.class)
+ .register(DefaultCordVtnNodeHandler.class)
+ .build();
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
+
+ private final ExecutorService eventExecutor = newSingleThreadExecutor(
+ groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
+ private final MapEventListener<String, CordVtnNode> nodeStoreListener = new InternalMapListener();
+ private ConsistentMap<String, CordVtnNode> nodeStore;
+
+ @Activate
+ protected void activate() {
+ ApplicationId appId = coreService.registerApplication(CORDVTN_APP_ID);
+ nodeStore = storageService.<String, CordVtnNode>consistentMapBuilder()
+ .withSerializer(Serializer.using(SERIALIZER_CORDVTN_NODE))
+ .withName("cordvtn-nodestore")
+ .withApplicationId(appId)
+ .build();
+ nodeStore.addListener(nodeStoreListener);
+ log.info("Started");
+ }
+
+ @Deactivate
+ protected void deactivate() {
+ nodeStore.removeListener(nodeStoreListener);
+ log.info("Stopped");
+ }
+
+ @Override
+ public Set<CordVtnNode> nodes() {
+ Set<CordVtnNode> nodes = nodeStore.values().stream()
+ .map(Versioned::value)
+ .collect(Collectors.toSet());
+ return ImmutableSet.copyOf(nodes);
+ }
+
+ @Override
+ public CordVtnNode node(String hostname) {
+ Versioned<CordVtnNode> versioned = nodeStore.get(hostname);
+ return versioned == null ? null : versioned.value();
+ }
+
+ @Override
+ public void createNode(CordVtnNode node) {
+ nodeStore.compute(node.hostname(), (hostname, existing) -> {
+ final String error = node.hostname() + ERR_DUPLICATE;
+ checkArgument(existing == null, error);
+ return node;
+ });
+ }
+
+ @Override
+ public void updateNode(CordVtnNode node) {
+ nodeStore.compute(node.hostname(), (hostname, existing) -> {
+ final String error = node.hostname() + ERR_NOT_FOUND;
+ checkArgument(existing != null, error);
+ return node;
+ });
+ }
+
+ @Override
+ public CordVtnNode removeNode(String hostname) {
+ Versioned<CordVtnNode> removed = nodeStore.remove(hostname);
+ return removed == null ? null : removed.value();
+ }
+
+ private class InternalMapListener implements MapEventListener<String, CordVtnNode> {
+
+ @Override
+ public void event(MapEvent<String, CordVtnNode> event) {
+ switch (event.type()) {
+ case INSERT:
+ log.debug("CordVtn node is created {}", event.newValue().value());
+ eventExecutor.execute(() -> {
+ notifyDelegate(new CordVtnNodeEvent(
+ CordVtnNodeEvent.Type.NODE_CREATED,
+ event.newValue().value()
+ ));
+ if (event.newValue().value().state() == COMPLETE) {
+ notifyDelegate(new CordVtnNodeEvent(
+ CordVtnNodeEvent.Type.NODE_COMPLETE,
+ event.newValue().value()
+ ));
+ }
+ });
+ break;
+ case UPDATE:
+ log.debug("CordVtn node is updated {}", event.newValue().value());
+ eventExecutor.execute(() -> {
+ notifyDelegate(new CordVtnNodeEvent(
+ CordVtnNodeEvent.Type.NODE_UPDATED,
+ event.newValue().value()
+ ));
+ processUpdated(event.oldValue().value(), event.newValue().value());
+ });
+ break;
+ case REMOVE:
+ log.debug("CordVtn node is removed {}", event.oldValue().value());
+ eventExecutor.execute(() -> {
+ notifyDelegate(new CordVtnNodeEvent(
+ CordVtnNodeEvent.Type.NODE_REMOVED,
+ event.oldValue().value()
+ ));
+ });
+ break;
+ default:
+ // do nothing
+ break;
+ }
+ }
+
+ private void processUpdated(CordVtnNode oldNode, CordVtnNode newNode) {
+ if (oldNode.state() != COMPLETE && newNode.state() == COMPLETE) {
+ notifyDelegate(new CordVtnNodeEvent(
+ CordVtnNodeEvent.Type.NODE_COMPLETE,
+ newNode
+ ));
+ } else if (oldNode.state() == COMPLETE && newNode.state() != COMPLETE) {
+ notifyDelegate(new CordVtnNodeEvent(
+ CordVtnNodeEvent.Type.NODE_INCOMPLETE,
+ newNode
+ ));
+ }
+ }
+ }
+}
diff --git a/src/main/java/org/opencord/cordvtn/impl/DistributedServiceNetworkStore.java b/src/main/java/org/opencord/cordvtn/impl/DistributedServiceNetworkStore.java
index dc98ced..212a90e 100644
--- a/src/main/java/org/opencord/cordvtn/impl/DistributedServiceNetworkStore.java
+++ b/src/main/java/org/opencord/cordvtn/impl/DistributedServiceNetworkStore.java
@@ -95,7 +95,7 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
- private final ExecutorService eventExecutor = newSingleThreadExecutor(
+ private final ExecutorService eventExecutor = newSingleThreadExecutor(
groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
private final MapEventListener<PortId, ServicePort> servicePortListener =
new ServicePortMapListener();
@@ -108,7 +108,6 @@
@Activate
protected void activate() {
ApplicationId appId = coreService.registerApplication(CORDVTN_APP_ID);
-
serviceNetworkStore = storageService.<NetworkId, ServiceNetwork>consistentMapBuilder()
.withSerializer(Serializer.using(SERIALIZER_SERVICE))
.withName("cordvtn-servicenetstore")
diff --git a/src/main/java/org/opencord/cordvtn/impl/InstanceManager.java b/src/main/java/org/opencord/cordvtn/impl/InstanceManager.java
index e8983c6..bd5e528 100644
--- a/src/main/java/org/opencord/cordvtn/impl/InstanceManager.java
+++ b/src/main/java/org/opencord/cordvtn/impl/InstanceManager.java
@@ -268,23 +268,26 @@
@Override
public void event(ServiceNetworkEvent event) {
- NodeId leader = leadershipService.getLeader(appId.name());
- if (!Objects.equals(localNodeId, leader)) {
- // do not allow to proceed without leadership
- return;
- }
+ eventExecutor.execute(() -> {
+ NodeId leader = leadershipService.getLeader(appId.name());
+ if (!Objects.equals(localNodeId, leader)) {
+ // do not allow to proceed without leadership
+ return;
+ }
+ handle(event);
+ });
+ }
+ private void handle(ServiceNetworkEvent event) {
switch (event.type()) {
case SERVICE_PORT_CREATED:
case SERVICE_PORT_UPDATED:
log.debug("Processing service port {}", event.servicePort());
PortId portId = event.servicePort().id();
- eventExecutor.execute(() -> {
- Instance instance = getInstance(portId);
- if (instance != null) {
- addInstance(instance.host().location());
- }
- });
+ Instance instance = getInstance(portId);
+ if (instance != null) {
+ addInstance(instance.host().location());
+ }
break;
case SERVICE_PORT_REMOVED:
case SERVICE_NETWORK_CREATED:
diff --git a/src/main/java/org/opencord/cordvtn/impl/handler/AbstractInstanceHandler.java b/src/main/java/org/opencord/cordvtn/impl/handler/AbstractInstanceHandler.java
index 64cda2e..59cb7f7 100644
--- a/src/main/java/org/opencord/cordvtn/impl/handler/AbstractInstanceHandler.java
+++ b/src/main/java/org/opencord/cordvtn/impl/handler/AbstractInstanceHandler.java
@@ -18,11 +18,21 @@
import com.google.common.collect.ImmutableSet;
import org.onlab.osgi.DefaultServiceDirectory;
import org.onlab.osgi.ServiceDirectory;
+import org.onlab.packet.Ip4Address;
+import org.onlab.packet.IpAddress;
import org.onlab.util.Tools;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
import org.onosproject.net.Host;
+import org.onosproject.net.Port;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.behaviour.ExtensionTreatmentResolver;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.flow.instructions.ExtensionPropertyException;
+import org.onosproject.net.flow.instructions.ExtensionTreatment;
import org.onosproject.net.host.HostEvent;
import org.onosproject.net.host.HostListener;
import org.onosproject.net.host.HostService;
@@ -33,15 +43,21 @@
import org.opencord.cordvtn.api.net.NetworkId;
import org.opencord.cordvtn.api.net.ServiceNetwork;
import org.opencord.cordvtn.api.net.ServicePort;
+import org.opencord.cordvtn.api.node.CordVtnNode;
+import org.opencord.cordvtn.api.node.CordVtnNodeService;
import org.slf4j.Logger;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.net.AnnotationKeys.PORT_NAME;
+import static org.onosproject.net.flow.instructions.ExtensionTreatmentType.ExtensionTreatmentTypes.NICIRA_SET_TUNNEL_DST;
+import static org.opencord.cordvtn.api.Constants.DEFAULT_TUNNEL;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -53,11 +69,14 @@
protected static final String ERR_VTN_NETWORK = "Failed to get VTN network for %s";
protected static final String ERR_VTN_PORT = "Failed to get VTN port for %s";
+ protected static final String PROPERTY_TUNNEL_DST = "tunnelDst";
protected CoreService coreService;
protected MastershipService mastershipService;
protected HostService hostService;
+ protected DeviceService deviceService;
protected ServiceNetworkService snetService;
+ protected CordVtnNodeService nodeService;
protected ApplicationId appId;
protected Set<ServiceNetwork.NetworkType> netTypes = ImmutableSet.of();
@@ -71,7 +90,9 @@
coreService = services.get(CoreService.class);
mastershipService = services.get(MastershipService.class);
hostService = services.get(HostService.class);
+ deviceService = services.get(DeviceService.class);
snetService = services.get(ServiceNetworkService.class);
+ nodeService = services.get(CordVtnNodeService.class);
appId = coreService.registerApplication(Constants.CORDVTN_APP_ID);
hostService.addListener(hostListener);
@@ -118,6 +139,75 @@
return sport;
}
+ protected PortNumber dataPort(DeviceId deviceId) {
+ CordVtnNode node = nodeService.node(deviceId);
+ if (node == null) {
+ log.debug("Failed to get node for {}", deviceId);
+ return null;
+ }
+ Optional<PortNumber> port = getPortNumber(deviceId, node.dataInterface());
+ return port.isPresent() ? port.get() : null;
+
+ }
+
+ protected PortNumber tunnelPort(DeviceId deviceId) {
+ Optional<PortNumber> port = getPortNumber(deviceId, DEFAULT_TUNNEL);
+ return port.isPresent() ? port.get() : null;
+ }
+
+ protected PortNumber hostManagementPort(DeviceId deviceId) {
+ CordVtnNode node = nodeService.node(deviceId);
+ if (node == null) {
+ log.debug("Failed to get node for {}", deviceId);
+ return null;
+ }
+
+ if (node.hostManagementInterface() != null) {
+ Optional<PortNumber> port =
+ getPortNumber(deviceId, node.hostManagementInterface());
+ return port.isPresent() ? port.get() : null;
+ } else {
+ return null;
+ }
+ }
+
+ protected ExtensionTreatment tunnelDstTreatment(DeviceId deviceId, Ip4Address remoteIp) {
+ Device device = deviceService.getDevice(deviceId);
+ if (device != null && !device.is(ExtensionTreatmentResolver.class)) {
+ log.error("The extension treatment is not supported");
+ return null;
+ }
+
+ ExtensionTreatmentResolver resolver = device.as(ExtensionTreatmentResolver.class);
+ ExtensionTreatment treatment = resolver.getExtensionInstruction(NICIRA_SET_TUNNEL_DST.type());
+ try {
+ treatment.setPropertyValue(PROPERTY_TUNNEL_DST, remoteIp);
+ return treatment;
+ } catch (ExtensionPropertyException e) {
+ log.warn("Failed to get tunnelDst extension treatment for {}", deviceId);
+ return null;
+ }
+ }
+
+ protected IpAddress dataIp(DeviceId deviceId) {
+ CordVtnNode node = nodeService.node(deviceId);
+ if (node == null) {
+ log.debug("Failed to get node for {}", deviceId);
+ return null;
+ }
+ return node.dataIp().ip();
+ }
+
+ private Optional<PortNumber> getPortNumber(DeviceId deviceId, String portName) {
+ PortNumber port = deviceService.getPorts(deviceId).stream()
+ .filter(p -> p.annotations().value(PORT_NAME).equals(portName) &&
+ p.isEnabled())
+ .map(Port::number)
+ .findAny()
+ .orElse(null);
+ return Optional.ofNullable(port);
+ }
+
private class InternalHostListener implements HostListener {
@Override
diff --git a/src/main/java/org/opencord/cordvtn/impl/handler/AccessAgentInstanceHandler.java b/src/main/java/org/opencord/cordvtn/impl/handler/AccessAgentInstanceHandler.java
index 460ba25..82de7fc 100644
--- a/src/main/java/org/opencord/cordvtn/impl/handler/AccessAgentInstanceHandler.java
+++ b/src/main/java/org/opencord/cordvtn/impl/handler/AccessAgentInstanceHandler.java
@@ -28,10 +28,11 @@
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flow.TrafficTreatment;
+import org.opencord.cordvtn.api.core.CordVtnPipeline;
import org.opencord.cordvtn.api.core.Instance;
import org.opencord.cordvtn.api.core.InstanceHandler;
-import org.opencord.cordvtn.impl.CordVtnNodeManager;
-import org.opencord.cordvtn.impl.CordVtnPipeline;
+import org.opencord.cordvtn.api.core.ServiceNetworkService;
+import org.opencord.cordvtn.api.node.CordVtnNodeService;
import static org.opencord.cordvtn.api.net.ServiceNetwork.NetworkType.ACCESS_AGENT;
@@ -42,10 +43,13 @@
public class AccessAgentInstanceHandler extends AbstractInstanceHandler implements InstanceHandler {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected CordVtnPipeline pipeline;
+ protected ServiceNetworkService snetService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected CordVtnNodeManager nodeManager;
+ protected CordVtnNodeService nodeService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected CordVtnPipeline pipeline;
@Activate
protected void activate() {
@@ -96,7 +100,7 @@
.build();
treatment = DefaultTrafficTreatment.builder()
- .setOutput(nodeManager.dataPort(instance.deviceId()))
+ .setOutput(dataPort(instance.deviceId()))
.build();
flowRule = DefaultFlowRule.builder()
diff --git a/src/main/java/org/opencord/cordvtn/impl/handler/DefaultInstanceHandler.java b/src/main/java/org/opencord/cordvtn/impl/handler/DefaultInstanceHandler.java
index 3f68c22..8279d34 100644
--- a/src/main/java/org/opencord/cordvtn/impl/handler/DefaultInstanceHandler.java
+++ b/src/main/java/org/opencord/cordvtn/impl/handler/DefaultInstanceHandler.java
@@ -47,15 +47,16 @@
import org.onosproject.net.flow.instructions.L2ModificationInstruction;
import org.onosproject.net.host.DefaultHostDescription;
import org.onosproject.net.host.HostDescription;
+import org.opencord.cordvtn.api.core.CordVtnPipeline;
import org.opencord.cordvtn.api.core.Instance;
import org.opencord.cordvtn.api.core.InstanceHandler;
import org.opencord.cordvtn.api.core.InstanceService;
+import org.opencord.cordvtn.api.core.ServiceNetworkService;
import org.opencord.cordvtn.api.net.AddressPair;
import org.opencord.cordvtn.api.net.ServiceNetwork;
import org.opencord.cordvtn.api.net.ServicePort;
import org.opencord.cordvtn.api.node.CordVtnNode;
-import org.opencord.cordvtn.impl.CordVtnNodeManager;
-import org.opencord.cordvtn.impl.CordVtnPipeline;
+import org.opencord.cordvtn.api.node.CordVtnNodeService;
import java.util.Set;
import java.util.stream.Collectors;
@@ -74,10 +75,13 @@
protected FlowRuleService flowRuleService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected CordVtnPipeline pipeline;
+ protected ServiceNetworkService snetService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected CordVtnNodeManager nodeManager;
+ protected CordVtnNodeService nodeService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected CordVtnPipeline pipeline;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected InstanceService instanceService;
@@ -95,10 +99,10 @@
@Override
public void instanceDetected(Instance instance) {
+ log.info("Instance is detected or updated {}", instance);
if (instance.isAdditionalInstance()) {
return;
}
- log.info("Instance is detected or updated {}", instance);
ServiceNetwork snet = getServiceNetwork(instance);
populateDefaultRules(instance, snet, true);
@@ -109,7 +113,7 @@
populateVlanRule(
instance,
sport.vlanId(),
- nodeManager.dataPort(instance.deviceId()),
+ dataPort(instance.deviceId()),
true);
}
// FIXME don't add the existing instance again
@@ -138,7 +142,7 @@
populateVlanRule(
instance,
sport.vlanId(),
- nodeManager.dataPort(instance.deviceId()),
+ dataPort(instance.deviceId()),
false);
}
boolean isOriginalInstance = !instance.isAdditionalInstance();
@@ -246,7 +250,7 @@
}
private void populateDstIpRule(Instance instance, long vni, boolean install) {
- Ip4Address tunnelIp = nodeManager.dataIp(instance.deviceId()).getIp4Address();
+ Ip4Address tunnelIp = dataIp(instance.deviceId()).getIp4Address();
TrafficSelector selector = DefaultTrafficSelector.builder()
.matchEthType(Ethernet.TYPE_IPV4)
@@ -270,13 +274,13 @@
pipeline.processFlowRule(install, flowRule);
- for (CordVtnNode node : nodeManager.completeNodes()) {
+ for (CordVtnNode node : nodeService.completeNodes()) {
if (node.integrationBridgeId().equals(instance.deviceId())) {
continue;
}
ExtensionTreatment tunnelDst =
- pipeline.tunnelDstTreatment(node.integrationBridgeId(), tunnelIp);
+ tunnelDstTreatment(node.integrationBridgeId(), tunnelIp);
if (tunnelDst == null) {
continue;
}
@@ -285,7 +289,7 @@
.setEthDst(instance.mac())
.setTunnelId(vni)
.extension(tunnelDst, node.integrationBridgeId())
- .setOutput(nodeManager.tunnelPort(node.integrationBridgeId()))
+ .setOutput(tunnelPort(node.integrationBridgeId()))
.build();
flowRule = DefaultFlowRule.builder()
@@ -337,7 +341,7 @@
.build();
- nodeManager.completeNodes().forEach(node -> {
+ nodeService.completeNodes().forEach(node -> {
FlowRule flowRuleDirect = DefaultFlowRule.builder()
.fromApp(appId)
.withSelector(selector)
@@ -362,7 +366,7 @@
.drop()
.build();
- nodeManager.completeNodes().forEach(node -> {
+ nodeService.completeNodes().forEach(node -> {
FlowRule flowRuleDirect = DefaultFlowRule.builder()
.fromApp(appId)
.withSelector(selector)
diff --git a/src/main/java/org/opencord/cordvtn/impl/handler/DependencyHandler.java b/src/main/java/org/opencord/cordvtn/impl/handler/DependencyHandler.java
index 772e1bc..ba63cdd 100644
--- a/src/main/java/org/opencord/cordvtn/impl/handler/DependencyHandler.java
+++ b/src/main/java/org/opencord/cordvtn/impl/handler/DependencyHandler.java
@@ -50,16 +50,16 @@
import org.onosproject.net.group.GroupDescription;
import org.onosproject.net.group.GroupKey;
import org.onosproject.net.group.GroupService;
+import org.opencord.cordvtn.api.core.CordVtnPipeline;
import org.opencord.cordvtn.api.core.Instance;
-import org.opencord.cordvtn.api.core.ServiceNetworkAdminService;
import org.opencord.cordvtn.api.core.ServiceNetworkEvent;
import org.opencord.cordvtn.api.core.ServiceNetworkListener;
+import org.opencord.cordvtn.api.core.ServiceNetworkService;
import org.opencord.cordvtn.api.net.NetworkId;
import org.opencord.cordvtn.api.net.ServiceNetwork;
import org.opencord.cordvtn.api.net.ServiceNetwork.DependencyType;
import org.opencord.cordvtn.api.node.CordVtnNode;
-import org.opencord.cordvtn.impl.CordVtnNodeManager;
-import org.opencord.cordvtn.impl.CordVtnPipeline;
+import org.opencord.cordvtn.api.node.CordVtnNodeService;
import org.slf4j.Logger;
import java.util.List;
@@ -69,9 +69,9 @@
import java.util.stream.Collectors;
import static org.onosproject.net.group.DefaultGroupBucket.createSelectGroupBucket;
+import static org.opencord.cordvtn.api.core.CordVtnPipeline.*;
import static org.opencord.cordvtn.api.net.ServiceNetwork.DependencyType.BIDIRECTIONAL;
import static org.opencord.cordvtn.api.net.ServiceNetwork.NetworkType.*;
-import static org.opencord.cordvtn.impl.CordVtnPipeline.*;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -97,14 +97,14 @@
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ServiceNetworkService snetService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected CordVtnNodeService nodeService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CordVtnPipeline pipeline;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected CordVtnNodeManager nodeManager;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ServiceNetworkAdminService snetService;
-
private final ServiceNetworkListener snetListener = new InternalServiceNetworkListener();
private NodeId localNodeId;
@@ -172,7 +172,7 @@
}
private void updateProviderInstances(ServiceNetwork provider) {
- Set<DeviceId> devices = nodeManager.completeNodes().stream()
+ Set<DeviceId> devices = nodeService.completeNodes().stream()
.map(CordVtnNode::integrationBridgeId)
.collect(Collectors.toSet());
@@ -239,7 +239,7 @@
private void removeGroup(NetworkId netId) {
GroupKey groupKey = getGroupKey(netId);
- nodeManager.completeNodes().forEach(node -> {
+ nodeService.completeNodes().forEach(node -> {
DeviceId deviceId = node.integrationBridgeId();
Group group = groupService.getGroup(deviceId, groupKey);
if (group != null) {
@@ -285,7 +285,7 @@
Map<DeviceId, GroupId> providerGroups = Maps.newHashMap();
Map<DeviceId, Set<PortNumber>> subscriberPorts = Maps.newHashMap();
- nodeManager.completeNodes().forEach(node -> {
+ nodeService.completeNodes().forEach(node -> {
DeviceId deviceId = node.integrationBridgeId();
GroupId groupId = getProviderGroup(provider, deviceId);
providerGroups.put(deviceId, groupId);
@@ -355,7 +355,7 @@
.transition(TABLE_DST)
.build();
- nodeManager.completeNodes().forEach(node -> {
+ nodeService.completeNodes().forEach(node -> {
DeviceId deviceId = node.integrationBridgeId();
FlowRule flowRuleDirect = DefaultFlowRule.builder()
.fromApp(appId)
@@ -409,7 +409,7 @@
Set<Instance> instances) {
List<GroupBucket> buckets = Lists.newArrayList();
instances.forEach(instance -> {
- Ip4Address tunnelIp = nodeManager.dataIp(instance.deviceId()).getIp4Address();
+ Ip4Address tunnelIp = dataIp(instance.deviceId()).getIp4Address();
if (deviceId.equals(instance.deviceId())) {
TrafficTreatment treatment = DefaultTrafficTreatment.builder()
@@ -418,13 +418,12 @@
.build();
buckets.add(createSelectGroupBucket(treatment));
} else {
- ExtensionTreatment tunnelDst =
- pipeline.tunnelDstTreatment(deviceId, tunnelIp);
+ ExtensionTreatment tunnelDst = tunnelDstTreatment(deviceId, tunnelIp);
TrafficTreatment treatment = DefaultTrafficTreatment.builder()
.setEthDst(instance.mac())
.extension(tunnelDst, deviceId)
.setTunnelId(tunnelId)
- .setOutput(nodeManager.tunnelPort(instance.deviceId()))
+ .setOutput(tunnelPort(instance.deviceId()))
.build();
buckets.add(createSelectGroupBucket(treatment));
}
@@ -435,33 +434,30 @@
private class InternalServiceNetworkListener implements ServiceNetworkListener {
@Override
- public boolean isRelevant(ServiceNetworkEvent event) {
- // do not allow to proceed without leadership
- NodeId leader = leadershipService.getLeader(appId.name());
- return Objects.equals(localNodeId, leader);
+ public void event(ServiceNetworkEvent event) {
+ eventExecutor.execute(() -> {
+ NodeId leader = leadershipService.getLeader(appId.name());
+ if (!Objects.equals(localNodeId, leader)) {
+ // do not allow to proceed without leadership
+ return;
+ }
+ handle(event);
+ });
}
- @Override
- public void event(ServiceNetworkEvent event) {
-
+ private void handle(ServiceNetworkEvent event) {
switch (event.type()) {
case SERVICE_NETWORK_PROVIDER_ADDED:
log.debug("Dependency added: {}", event);
- eventExecutor.execute(() -> {
- dependencyAdded(
- event.subject(),
- event.provider().provider(),
- event.provider().type());
- });
+ dependencyAdded(event.subject(),
+ event.provider().provider(),
+ event.provider().type());
break;
case SERVICE_NETWORK_PROVIDER_REMOVED:
log.debug("Dependency removed: {}", event);
- eventExecutor.execute(() -> {
- dependencyRemoved(
- event.subject(),
- event.provider().provider(),
- event.provider().type());
- });
+ dependencyRemoved(event.subject(),
+ event.provider().provider(),
+ event.provider().type());
break;
case SERVICE_NETWORK_CREATED:
case SERVICE_NETWORK_UPDATED:
diff --git a/src/main/java/org/opencord/cordvtn/impl/handler/ManagementInstanceHandler.java b/src/main/java/org/opencord/cordvtn/impl/handler/ManagementInstanceHandler.java
index 0475f07..aea5da2 100644
--- a/src/main/java/org/opencord/cordvtn/impl/handler/ManagementInstanceHandler.java
+++ b/src/main/java/org/opencord/cordvtn/impl/handler/ManagementInstanceHandler.java
@@ -29,11 +29,12 @@
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flow.TrafficTreatment;
+import org.opencord.cordvtn.api.core.CordVtnPipeline;
import org.opencord.cordvtn.api.core.Instance;
import org.opencord.cordvtn.api.core.InstanceHandler;
+import org.opencord.cordvtn.api.core.ServiceNetworkService;
import org.opencord.cordvtn.api.net.ServiceNetwork;
-import org.opencord.cordvtn.impl.CordVtnNodeManager;
-import org.opencord.cordvtn.impl.CordVtnPipeline;
+import org.opencord.cordvtn.api.node.CordVtnNodeService;
import static org.opencord.cordvtn.api.net.ServiceNetwork.NetworkType.MANAGEMENT_HOST;
import static org.opencord.cordvtn.api.net.ServiceNetwork.NetworkType.MANAGEMENT_LOCAL;
@@ -46,10 +47,13 @@
public class ManagementInstanceHandler extends AbstractInstanceHandler implements InstanceHandler {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected CordVtnPipeline pipeline;
+ protected ServiceNetworkService snetService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected CordVtnNodeManager nodeManager;
+ protected CordVtnNodeService nodeService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected CordVtnPipeline pipeline;
@Activate
protected void activate() {
@@ -123,7 +127,7 @@
}
private void populateHostsManagementRules(Instance instance, boolean install) {
- PortNumber hostMgmtPort = nodeManager.hostManagementPort(instance.deviceId());
+ PortNumber hostMgmtPort = hostManagementPort(instance.deviceId());
if (hostMgmtPort == null) {
log.warn("Can not find host management port in {}", instance.deviceId());
return;
diff --git a/src/test/java/org/opencord/cordvtn/impl/CordVtnNodeManagerTest.java b/src/test/java/org/opencord/cordvtn/impl/CordVtnNodeManagerTest.java
new file mode 100644
index 0000000..ab27957
--- /dev/null
+++ b/src/test/java/org/opencord/cordvtn/impl/CordVtnNodeManagerTest.java
@@ -0,0 +1,271 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.cordvtn.impl;
+
+import com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onlab.junit.TestTools;
+import org.onlab.junit.TestUtils;
+import org.onosproject.cluster.ClusterServiceAdapter;
+import org.onosproject.cluster.LeadershipServiceAdapter;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreServiceAdapter;
+import org.onosproject.core.DefaultApplicationId;
+import org.onosproject.event.Event;
+import org.onosproject.net.Device;
+import org.onosproject.net.config.NetworkConfigServiceAdapter;
+import org.onosproject.store.service.TestStorageService;
+import org.opencord.cordvtn.api.node.CordVtnNode;
+import org.opencord.cordvtn.api.node.CordVtnNodeEvent;
+import org.opencord.cordvtn.api.node.CordVtnNodeListener;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.opencord.cordvtn.api.node.CordVtnNodeEvent.Type.*;
+import static org.opencord.cordvtn.api.node.CordVtnNodeState.COMPLETE;
+import static org.opencord.cordvtn.api.node.CordVtnNodeState.INIT;
+import static org.opencord.cordvtn.impl.DefaultCordVtnNode.updatedState;
+
+/**
+ * Unit tests for {@link CordVtnNodeManager}.
+ */
+public class CordVtnNodeManagerTest extends CordVtnNodeTest {
+
+ private static final ApplicationId TEST_APP_ID = new DefaultApplicationId(1, "test");
+ private static final String ERR_SIZE = "Number of nodes did not match";
+ private static final String ERR_NODE = "Node did not match";
+ private static final String ERR_NOT_FOUND = "Node did not exist";
+ private static final String ERR_STATE = "Node state did not match";
+
+ private static final String HOSTNAME_1 = "node-01";
+ private static final String HOSTNAME_2 = "node-02";
+ private static final String HOSTNAME_3 = "node-03";
+
+ private static final Device OF_DEVICE_1 = createDevice(1);
+ private static final Device OF_DEVICE_2 = createDevice(2);
+ private static final Device OF_DEVICE_3 = createDevice(3);
+
+ private static final CordVtnNode NODE_1 = createNode(HOSTNAME_1, OF_DEVICE_1, INIT);
+ private static final CordVtnNode NODE_2 = createNode(HOSTNAME_2, OF_DEVICE_2, INIT);
+ private static final CordVtnNode NODE_3 = createNode(HOSTNAME_3, OF_DEVICE_3, COMPLETE);
+
+ private final TestCordVtnNodeListener testListener = new TestCordVtnNodeListener();
+
+ private CordVtnNodeManager target;
+ private DistributedCordVtnNodeStore nodeStore;
+
+ @Before
+ public void setUp() throws Exception {
+ nodeStore = new DistributedCordVtnNodeStore();
+ TestUtils.setField(nodeStore, "coreService", new TestCoreService());
+ TestUtils.setField(nodeStore, "storageService", new TestStorageService());
+ nodeStore.activate();
+
+ nodeStore.createNode(NODE_2);
+ nodeStore.createNode(NODE_3);
+
+ target = new CordVtnNodeManager();
+ target.coreService = new TestCoreService();
+ target.leadershipService = new TestLeadershipService();
+ target.configService = new TestConfigService();
+ target.clusterService = new TestClusterService();
+ target.nodeStore = nodeStore;
+
+ target.activate();
+ target.addListener(testListener);
+ clearEvents();
+ }
+
+ @After
+ public void tearDown() {
+ target.removeListener(testListener);
+ nodeStore.deactivate();
+ target.deactivate();
+ nodeStore = null;
+ target = null;
+ }
+
+ /**
+ * Checks getting nodes returns correct set of the existing nodes.
+ */
+ @Test
+ public void testGetNodes() {
+ assertEquals(ERR_SIZE, 2, target.nodes().size());
+ assertTrue(ERR_NOT_FOUND, target.nodes().contains(NODE_2));
+ assertTrue(ERR_NOT_FOUND, target.nodes().contains(NODE_3));
+ }
+
+ /**
+ * Checks getting complete nodes method returns correct set of the
+ * existing complete nodes.
+ */
+ @Test
+ public void testGetCompleteNodes() {
+ assertEquals(ERR_SIZE, 1, target.completeNodes().size());
+ assertTrue(ERR_NOT_FOUND, target.completeNodes().contains(NODE_3));
+ }
+
+ /**
+ * Checks if getting node by hostname returns correct node.
+ */
+ @Test
+ public void testGetNodeByHostname() {
+ CordVtnNode node = target.node(HOSTNAME_2);
+ assertEquals(ERR_NODE, NODE_2, node);
+ }
+
+ /**
+ * Checks if getting node by device ID returns correct node.
+ */
+ @Test
+ public void testGetNodeByDeviceId() {
+ CordVtnNode node = target.node(OF_DEVICE_2.id());
+ assertEquals(ERR_NODE, NODE_2, node);
+ }
+
+ /**
+ * Checks if node creation with null fails with exception.
+ */
+ @Test(expected = NullPointerException.class)
+ public void testCreateNullNode() {
+ target.createNode(null);
+ }
+
+ /**
+ * Checks if node removal with null hostname fails with exception.
+ */
+ @Test(expected = IllegalArgumentException.class)
+ public void testRemoveNullHostname() {
+ target.removeNode(null);
+ }
+
+ /**
+ * Checks if node update with null fails with exception.
+ */
+ @Test(expected = NullPointerException.class)
+ public void testUpdateNullNode() {
+ target.updateNode(null);
+ }
+
+ /**
+ * Checks if duplicate node creation fails with exception.
+ */
+ @Test(expected = IllegalArgumentException.class)
+ public void testCreateDuplicateNode() {
+ target.createNode(NODE_1);
+ target.createNode(NODE_1);
+ }
+
+ /**
+ * Checks if unregistered node update fails with exception.
+ */
+ @Test(expected = IllegalArgumentException.class)
+ public void testUpdateUnregisteredNode() {
+ target.updateNode(NODE_1);
+ }
+
+ /**
+ * Checks the basic node creation and removal.
+ */
+ @Test
+ public void testCreateAndRemoveNode() {
+ target.createNode(NODE_1);
+ assertEquals(ERR_SIZE, 3, target.nodes().size());
+ assertTrue(target.node(OF_DEVICE_1.id()) != null);
+
+ target.removeNode(HOSTNAME_1);
+ assertEquals(ERR_SIZE, 2, target.nodes().size());
+ assertTrue(target.node(OF_DEVICE_1.id()) == null);
+
+ validateEvents(NODE_CREATED, NODE_REMOVED);
+ }
+
+ /**
+ * Checks if node complete event is triggered when the node state changes
+ * to COMPLETE.
+ */
+ @Test
+ public void testUpdateNodeStateComplete() {
+ target.updateNode(updatedState(NODE_2, COMPLETE));
+ CordVtnNode node = target.node(HOSTNAME_2);
+ assertEquals(ERR_STATE, COMPLETE, node.state());
+
+ validateEvents(NODE_UPDATED, NODE_COMPLETE);
+ }
+
+ /**
+ * Checks if the node incomplete event is triggered when the node state
+ * falls back from COMPLETE to INIT.
+ */
+ @Test
+ public void testUpdateNodeStateIncomplete() {
+ target.updateNode(updatedState(NODE_3, INIT));
+ CordVtnNode node = target.node(HOSTNAME_3);
+ assertEquals(ERR_STATE, INIT, node.state());
+
+ validateEvents(NODE_UPDATED, NODE_INCOMPLETE);
+ }
+
+ private void clearEvents() {
+ TestTools.delay(100);
+ testListener.events.clear();
+ }
+
+ private void validateEvents(Enum... types) {
+ TestTools.assertAfter(100, () -> {
+ int i = 0;
+ assertEquals("Number of events did not match", types.length, testListener.events.size());
+ for (Event event : testListener.events) {
+ assertEquals("Incorrect event received", types[i], event.type());
+ i++;
+ }
+ testListener.events.clear();
+ });
+ }
+
+ private static class TestCordVtnNodeListener implements CordVtnNodeListener {
+ private List<CordVtnNodeEvent> events = Lists.newArrayList();
+
+ @Override
+ public void event(CordVtnNodeEvent event) {
+ events.add(event);
+ }
+ }
+
+ private static class TestCoreService extends CoreServiceAdapter {
+
+ @Override
+ public ApplicationId registerApplication(String name) {
+ return TEST_APP_ID;
+ }
+ }
+
+ private class TestConfigService extends NetworkConfigServiceAdapter {
+
+ }
+
+ private class TestClusterService extends ClusterServiceAdapter {
+
+ }
+
+ private static class TestLeadershipService extends LeadershipServiceAdapter {
+
+ }
+}
diff --git a/src/test/java/org/opencord/cordvtn/impl/CordVtnNodeTest.java b/src/test/java/org/opencord/cordvtn/impl/CordVtnNodeTest.java
new file mode 100644
index 0000000..1bc666f
--- /dev/null
+++ b/src/test/java/org/opencord/cordvtn/impl/CordVtnNodeTest.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.cordvtn.impl;
+
+import org.onlab.packet.ChassisId;
+import org.onlab.packet.Ip4Address;
+import org.onlab.packet.TpPort;
+import org.onosproject.net.DefaultAnnotations;
+import org.onosproject.net.DefaultDevice;
+import org.onosproject.net.DefaultPort;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Port;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.provider.ProviderId;
+import org.opencord.cordvtn.api.net.CidrAddr;
+import org.opencord.cordvtn.api.node.CordVtnNode;
+import org.opencord.cordvtn.api.node.CordVtnNodeState;
+import org.opencord.cordvtn.api.node.SshAccessInfo;
+
+import static org.onosproject.net.AnnotationKeys.PORT_NAME;
+import static org.onosproject.net.Device.Type.SWITCH;
+
+/**
+ * Provides a set of test CordVtnNode parameters for use with CordVtnNode related tests.
+ */
+public abstract class CordVtnNodeTest {
+
+ public static final SshAccessInfo TEST_SSH_INFO = new SshAccessInfo(
+ Ip4Address.valueOf("192.168.0.1"),
+ TpPort.tpPort(22),
+ "root",
+ "/root/.ssh/id_rsa");
+ public static final CidrAddr TEST_CIDR_ADDR = CidrAddr.valueOf("192.168.0.1/24");
+ public static final String TEST_DATA_IFACE = "eth0";
+ public static final String TEST_VXLAN_IFACE = "vxlan";
+
+ public static Device createDevice(long devIdNum) {
+ return new DefaultDevice(new ProviderId("of", "foo"),
+ DeviceId.deviceId(String.format("of:%016d", devIdNum)),
+ SWITCH,
+ "manufacturer",
+ "hwVersion",
+ "swVersion",
+ "serialNumber",
+ new ChassisId(1));
+ }
+
+ public static Port createPort(Device device, long portNumber, String portName) {
+ return new DefaultPort(device,
+ PortNumber.portNumber(portNumber),
+ true,
+ DefaultAnnotations.builder().set(PORT_NAME, portName).build());
+ }
+
+ public static CordVtnNode createNode(String hostname, Device device, CordVtnNodeState state) {
+ return DefaultCordVtnNode.builder()
+ .hostname(hostname)
+ .hostManagementIp(TEST_CIDR_ADDR)
+ .localManagementIp(TEST_CIDR_ADDR)
+ .dataIp(TEST_CIDR_ADDR)
+ .integrationBridgeId(device.id())
+ .dataInterface(TEST_DATA_IFACE)
+ .sshInfo(TEST_SSH_INFO)
+ .state(state)
+ .build();
+ }
+}
diff --git a/src/test/java/org/opencord/cordvtn/impl/DefaultCordVtnNodeHandlerTest.java b/src/test/java/org/opencord/cordvtn/impl/DefaultCordVtnNodeHandlerTest.java
new file mode 100644
index 0000000..038d2da
--- /dev/null
+++ b/src/test/java/org/opencord/cordvtn/impl/DefaultCordVtnNodeHandlerTest.java
@@ -0,0 +1,599 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.cordvtn.impl;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.jcraft.jsch.Session;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.onlab.junit.TestTools;
+import org.onlab.packet.ChassisId;
+import org.onlab.packet.IpAddress;
+import org.onosproject.cluster.ClusterServiceAdapter;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.DefaultControllerNode;
+import org.onosproject.cluster.LeadershipServiceAdapter;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreServiceAdapter;
+import org.onosproject.core.DefaultApplicationId;
+import org.onosproject.event.DefaultEventSinkRegistry;
+import org.onosproject.event.Event;
+import org.onosproject.event.EventDeliveryService;
+import org.onosproject.event.EventSink;
+import org.onosproject.net.Annotations;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DefaultDevice;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Host;
+import org.onosproject.net.HostId;
+import org.onosproject.net.Port;
+import org.onosproject.net.behaviour.BridgeConfig;
+import org.onosproject.net.behaviour.BridgeDescription;
+import org.onosproject.net.behaviour.InterfaceConfig;
+import org.onosproject.net.config.NetworkConfigServiceAdapter;
+import org.onosproject.net.device.DeviceAdminService;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
+import org.onosproject.net.device.DeviceServiceAdapter;
+import org.onosproject.net.driver.Behaviour;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.host.HostDescription;
+import org.onosproject.net.host.HostServiceAdapter;
+import org.onosproject.net.provider.ProviderId;
+import org.onosproject.ovsdb.controller.OvsdbClientService;
+import org.onosproject.ovsdb.controller.OvsdbController;
+import org.opencord.cordvtn.api.core.CordVtnPipeline;
+import org.opencord.cordvtn.api.core.InstanceService;
+import org.opencord.cordvtn.api.node.CordVtnNode;
+import org.opencord.cordvtn.api.node.CordVtnNodeAdminService;
+import org.opencord.cordvtn.api.node.CordVtnNodeEvent;
+import org.opencord.cordvtn.api.node.CordVtnNodeListener;
+import org.opencord.cordvtn.api.node.CordVtnNodeService;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.easymock.EasyMock.*;
+import static org.junit.Assert.assertEquals;
+import static org.onosproject.net.Device.Type.CONTROLLER;
+import static org.onosproject.net.NetTestTools.injectEventDispatcher;
+import static org.onosproject.net.device.DeviceEvent.Type.*;
+import static org.opencord.cordvtn.api.Constants.INTEGRATION_BRIDGE;
+import static org.opencord.cordvtn.api.node.CordVtnNodeEvent.Type.NODE_CREATED;
+import static org.opencord.cordvtn.api.node.CordVtnNodeEvent.Type.NODE_UPDATED;
+import static org.opencord.cordvtn.api.node.CordVtnNodeState.*;
+import static org.opencord.cordvtn.impl.RemoteIpCommandUtil.*;
+import static org.powermock.api.easymock.PowerMock.mockStatic;
+
+/**
+ * Unit test for CordVtnNodeHandler which provides cordvtn node bootstrap state machine.
+ */
+@RunWith(PowerMockRunner.class)
+public class DefaultCordVtnNodeHandlerTest extends CordVtnNodeTest {
+
+ private static final String ERR_STATE = "Node state did not match";
+
+ private static final ApplicationId TEST_APP_ID = new DefaultApplicationId(1, "test");
+ private static final NodeId LOCAL_NODE_ID = new NodeId("local");
+ private static final ControllerNode LOCAL_CTRL =
+ new DefaultControllerNode(LOCAL_NODE_ID, IpAddress.valueOf("127.0.0.1"));
+
+ private static final Device OVSDB_DEVICE = new TestDevice(
+ new ProviderId("of", "foo"),
+ DeviceId.deviceId("ovsdb:" + TEST_CIDR_ADDR.ip().toString()),
+ CONTROLLER,
+ "manufacturer",
+ "hwVersion",
+ "swVersion",
+ "serialNumber",
+ new ChassisId(1));
+
+ private static final Device OF_DEVICE_1 = createDevice(1);
+
+ private static final Device OF_DEVICE_2 = createDevice(2);
+ private static final Port OF_DEVICE_2_PORT_1 = createPort(OF_DEVICE_2, 1, TEST_DATA_IFACE);
+ private static final Port OF_DEVICE_2_PORT_2 = createPort(OF_DEVICE_2, 2, TEST_VXLAN_IFACE);
+
+ private static final Device OF_DEVICE_3 = createDevice(3);
+ private static final Port OF_DEVICE_3_PORT_1 = createPort(OF_DEVICE_3, 1, TEST_DATA_IFACE);
+ private static final Port OF_DEVICE_3_PORT_2 = createPort(OF_DEVICE_3, 2, TEST_VXLAN_IFACE);
+
+ private static final Device OF_DEVICE_4 = createDevice(4);
+ private static final Port OF_DEVICE_4_PORT_1 = createPort(OF_DEVICE_4, 1, TEST_DATA_IFACE);
+ private static final Port OF_DEVICE_4_PORT_2 = createPort(OF_DEVICE_4, 2, TEST_VXLAN_IFACE);
+
+ private static final CordVtnNode NODE_1 = createNode("node-01", OF_DEVICE_1, INIT);
+ private static final CordVtnNode NODE_2 = createNode("node-02", OF_DEVICE_2, DEVICE_CREATED);
+ private static final CordVtnNode NODE_3 = createNode("node-03", OF_DEVICE_3, PORT_CREATED);
+ private static final CordVtnNode NODE_4 = createNode("node-04", OF_DEVICE_4, COMPLETE);
+
+ private TestDeviceService deviceService;
+ private TestNodeManager nodeManager;
+ private DefaultCordVtnNodeHandler target;
+
+ @Before
+ public void setUp() throws Exception {
+ this.deviceService = new TestDeviceService();
+ this.nodeManager = new TestNodeManager();
+
+ // add fake ovsdb device
+ this.deviceService.devMap.put(OVSDB_DEVICE.id(), OVSDB_DEVICE);
+
+ // add fake OF devices
+ this.deviceService.devMap.put(OF_DEVICE_1.id(), OF_DEVICE_1);
+ this.deviceService.devMap.put(OF_DEVICE_2.id(), OF_DEVICE_2);
+ this.deviceService.devMap.put(OF_DEVICE_3.id(), OF_DEVICE_3);
+ this.deviceService.devMap.put(OF_DEVICE_4.id(), OF_DEVICE_4);
+
+ // add fake OF ports
+ this.deviceService.portList.add(OF_DEVICE_2_PORT_1);
+ this.deviceService.portList.add(OF_DEVICE_2_PORT_2);
+ this.deviceService.portList.add(OF_DEVICE_3_PORT_1);
+ this.deviceService.portList.add(OF_DEVICE_3_PORT_2);
+ this.deviceService.portList.add(OF_DEVICE_4_PORT_1);
+ this.deviceService.portList.add(OF_DEVICE_4_PORT_2);
+
+ // add fake nodes
+ this.nodeManager.nodeMap.put(OF_DEVICE_1.id(), NODE_1);
+ this.nodeManager.nodeMap.put(OF_DEVICE_2.id(), NODE_2);
+ this.nodeManager.nodeMap.put(OF_DEVICE_3.id(), NODE_3);
+ this.nodeManager.nodeMap.put(OF_DEVICE_4.id(), NODE_4);
+
+ OvsdbClientService mockOvsdbClient = createMock(OvsdbClientService.class);
+ expect(mockOvsdbClient.isConnected())
+ .andReturn(true)
+ .anyTimes();
+ replay(mockOvsdbClient);
+
+ OvsdbController mockOvsdbController = createMock(OvsdbController.class);
+ expect(mockOvsdbController.getOvsdbClient(anyObject()))
+ .andReturn(mockOvsdbClient)
+ .anyTimes();
+ replay(mockOvsdbController);
+
+ DeviceAdminService mockDeviceAdminService = createMock(DeviceAdminService.class);
+ mockDeviceAdminService.removeDevice(anyObject());
+ replay(mockDeviceAdminService);
+
+ target = new DefaultCordVtnNodeHandler();
+ target.coreService = new TestCoreService();
+ target.leadershipService = new TestLeadershipService();
+ target.clusterService = new TestClusterService();
+ target.configService = new TestConfigService();
+ target.deviceService = this.deviceService;
+ target.deviceAdminService = mockDeviceAdminService;
+ target.hostService = new TestHostService();
+ target.ovsdbController = mockOvsdbController;
+ target.nodeService = this.nodeManager;
+ target.nodeAdminService = this.nodeManager;
+ target.instanceService = new TestInstanceService();
+ target.pipelineService = new TestCordVtnPipeline();
+ injectEventDispatcher(target, new TestEventDispatcher());
+ target.activate();
+ }
+
+ @After
+ public void tearDown() {
+ target.deactivate();
+ deviceService = null;
+ nodeManager = null;
+ target = null;
+ }
+
+ /**
+ * Checks if the node state changes from INIT to DEVICE_CREATED when
+ * the integration bridge created.
+ */
+ @Test
+ public void testProcessInitState() {
+ deviceService.addDevice(OF_DEVICE_1);
+ TestTools.assertAfter(100, () -> {
+ CordVtnNode current = nodeManager.node(NODE_1.integrationBridgeId());
+ assertEquals(ERR_STATE, DEVICE_CREATED, current.state());
+ });
+ }
+
+ /**
+ * Checks if the node state changes from DEVICE_CREATED to PORT_CREATED
+ * when the data interface and vxlan interface are added.
+ */
+ @Test
+ public void testProcessDeviceCreatedState() {
+ // Add the data port and check if the state is still in DEVICE_CREAGED
+ deviceService.addPort(OF_DEVICE_2, OF_DEVICE_2_PORT_1);
+ TestTools.assertAfter(100, () -> {
+ CordVtnNode current = nodeManager.node(NODE_2.integrationBridgeId());
+ assertEquals(ERR_STATE, DEVICE_CREATED, current.state());
+ });
+
+ // Add the vxlan port and check if the state changes to PORT_CREATED
+ deviceService.addPort(OF_DEVICE_2, OF_DEVICE_2_PORT_2);
+ TestTools.assertAfter(100, () -> {
+ CordVtnNode current = nodeManager.node(NODE_2.integrationBridgeId());
+ assertEquals(ERR_STATE, PORT_CREATED, current.state());
+ });
+ }
+
+ /**
+ * Checks if the node state changes to COMPLETE when the network interface
+ * configuration is done.
+ */
+ @PrepareForTest(RemoteIpCommandUtil.class)
+ @Test
+ public void testProcessPortCreatedState() {
+ Session mockSession = createMock(Session.class);
+ mockStatic(RemoteIpCommandUtil.class);
+ expect(connect(anyObject())).andReturn(mockSession);
+ expect(getCurrentIps(mockSession, INTEGRATION_BRIDGE)).andReturn(Sets.newHashSet(TEST_CIDR_ADDR.ip()));
+ expect(getCurrentIps(mockSession, TEST_DATA_IFACE)).andReturn(Sets.newHashSet());
+ expect(isInterfaceUp(anyObject(), anyObject())).andReturn(true).anyTimes();
+ RemoteIpCommandUtil.disconnect(anyObject());
+ PowerMock.replay(RemoteIpCommandUtil.class);
+
+ // There's no events for IP address changes on the interfaces, so just
+ // Set node state updated to trigger node bootstrap
+ nodeManager.updateNode(NODE_3);
+ TestTools.assertAfter(100, () -> {
+ CordVtnNode current = nodeManager.node(NODE_3.integrationBridgeId());
+ assertEquals(ERR_STATE, COMPLETE, current.state());
+ });
+ }
+
+ /**
+ * Checks if the node state falls back to INIT when the integration bridge
+ * is removed.
+ */
+ @Test
+ public void testBackToInitStateWhenDeviceRemoved() {
+ // Remove the device from DEVICE_CREATED state node
+ deviceService.removeDevice(OF_DEVICE_2);
+ TestTools.assertAfter(100, () -> {
+ CordVtnNode current = nodeManager.node(NODE_2.integrationBridgeId());
+ assertEquals(ERR_STATE, INIT, current.state());
+ });
+
+ // Remove the device from PORT_CREATED state node
+ deviceService.removeDevice(OF_DEVICE_3);
+ TestTools.assertAfter(100, () -> {
+ CordVtnNode current = nodeManager.node(NODE_3.integrationBridgeId());
+ assertEquals(ERR_STATE, INIT, current.state());
+ });
+
+ // Remove the device from COMPLETE state node
+ deviceService.removeDevice(OF_DEVICE_4);
+ TestTools.assertAfter(100, () -> {
+ CordVtnNode current = nodeManager.node(NODE_4.integrationBridgeId());
+ assertEquals(ERR_STATE, INIT, current.state());
+ });
+ }
+
+ /**
+ * Checks if the node state falls back to DEVICE_CREATED when the ports
+ * are removed.
+ */
+ @Test
+ public void testBackToDeviceCreatedStateWhenPortRemoved() {
+ // Remove the device from PORT_CREATED state node
+ deviceService.removePort(OF_DEVICE_3, OF_DEVICE_3_PORT_1);
+ TestTools.assertAfter(100, () -> {
+ CordVtnNode current = nodeManager.node(NODE_3.integrationBridgeId());
+ assertEquals(ERR_STATE, DEVICE_CREATED, current.state());
+ });
+
+ // Remove the device from COMPLETE state node
+ deviceService.removePort(OF_DEVICE_4, OF_DEVICE_4_PORT_1);
+ TestTools.assertAfter(100, () -> {
+ CordVtnNode current = nodeManager.node(NODE_4.integrationBridgeId());
+ assertEquals(ERR_STATE, DEVICE_CREATED, current.state());
+ });
+ }
+
+ /**
+ * Checks if the node state falls back to PORT_CREATED when the interface
+ * configurations are incomplete.
+ */
+ @PrepareForTest(RemoteIpCommandUtil.class)
+ @Test
+ public void testBackToPortCreatedStateWhenIpRemoved() {
+ // Mocks SSH connection failure
+ mockStatic(RemoteIpCommandUtil.class);
+ expect(connect(anyObject())).andReturn(null);
+ PowerMock.replay(RemoteIpCommandUtil.class);
+
+ // ONOS is not able to detect IP is removed from the interface
+ // Just triggers node update event for the PORT_CREATED node and
+ // check if it stays in PORT_CREATED state
+ nodeManager.updateNode(NODE_3);
+ TestTools.assertAfter(100, () -> {
+ CordVtnNode current = nodeManager.node(NODE_3.integrationBridgeId());
+ assertEquals(ERR_STATE, PORT_CREATED, current.state());
+ });
+ }
+
+ private static final class TestDevice extends DefaultDevice {
+ InterfaceConfig mockInterfaceConfig = createMock(InterfaceConfig.class);
+ BridgeConfig mockBridgeConfig = createMock(BridgeConfig.class);
+
+ private TestDevice(ProviderId providerId,
+ DeviceId id,
+ Type type,
+ String manufacturer,
+ String hwVersion,
+ String swVersion,
+ String serialNumber,
+ ChassisId chassisId,
+ Annotations... annotations) {
+ super(providerId,
+ id,
+ type,
+ manufacturer,
+ hwVersion,
+ swVersion,
+ serialNumber,
+ chassisId,
+ annotations);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public <B extends Behaviour> B as(Class<B> projectionClass) {
+ if (projectionClass.equals(BridgeConfig.class)) {
+ expect(this.mockBridgeConfig.addBridge((BridgeDescription) anyObject()))
+ .andReturn(true)
+ .anyTimes();
+ this.mockBridgeConfig.addPort(anyObject(), anyString());
+ replay(mockBridgeConfig);
+ return (B) mockBridgeConfig;
+ } else if (projectionClass.equals(InterfaceConfig.class)) {
+ expect(this.mockInterfaceConfig.addTunnelMode(anyString(), anyObject()))
+ .andReturn(true)
+ .anyTimes();
+ replay(mockInterfaceConfig);
+ return (B) mockInterfaceConfig;
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public <B extends Behaviour> boolean is(Class<B> projectionClass) {
+ return true;
+ }
+ }
+
+
+ private static class TestCoreService extends CoreServiceAdapter {
+
+ @Override
+ public ApplicationId registerApplication(String name) {
+ return TEST_APP_ID;
+ }
+ }
+
+ private static class TestLeadershipService extends LeadershipServiceAdapter {
+
+ @Override
+ public NodeId getLeader(String path) {
+ return LOCAL_NODE_ID;
+ }
+ }
+
+ private static class TestClusterService extends ClusterServiceAdapter {
+
+ @Override
+ public ControllerNode getLocalNode() {
+ return LOCAL_CTRL;
+ }
+ }
+
+ private static class TestConfigService extends NetworkConfigServiceAdapter {
+
+ }
+
+ private static class TestNodeManager implements CordVtnNodeService, CordVtnNodeAdminService {
+ Map<DeviceId, CordVtnNode> nodeMap = Maps.newHashMap();
+ List<CordVtnNodeListener> listeners = Lists.newArrayList();
+
+ @Override
+ public Set<CordVtnNode> nodes() {
+ return ImmutableSet.copyOf(nodeMap.values());
+ }
+
+ @Override
+ public Set<CordVtnNode> completeNodes() {
+ return null;
+ }
+
+ @Override
+ public CordVtnNode node(String hostname) {
+ return null;
+ }
+
+ @Override
+ public CordVtnNode node(DeviceId deviceId) {
+ return nodeMap.get(deviceId);
+ }
+
+ @Override
+ public void addListener(CordVtnNodeListener listener) {
+ listeners.add(listener);
+ }
+
+ @Override
+ public void removeListener(CordVtnNodeListener listener) {
+ listeners.remove(listener);
+ }
+
+ @Override
+ public void createNode(CordVtnNode node) {
+ nodeMap.put(node.integrationBridgeId(), node);
+ CordVtnNodeEvent event = new CordVtnNodeEvent(NODE_CREATED, node);
+ listeners.forEach(l -> l.event(event));
+ }
+
+ @Override
+ public void updateNode(CordVtnNode node) {
+ nodeMap.put(node.integrationBridgeId(), node);
+ CordVtnNodeEvent event = new CordVtnNodeEvent(NODE_UPDATED, node);
+ listeners.forEach(l -> l.event(event));
+ }
+
+ @Override
+ public CordVtnNode removeNode(String hostname) {
+ return null;
+ }
+ }
+
+ private static class TestDeviceService extends DeviceServiceAdapter {
+ Map<DeviceId, Device> devMap = Maps.newHashMap();
+ List<Port> portList = Lists.newArrayList();
+ List<DeviceListener> listeners = Lists.newArrayList();
+
+ @Override
+ public void addListener(DeviceListener listener) {
+ listeners.add(listener);
+ }
+
+ @Override
+ public void removeListener(DeviceListener listener) {
+ listeners.remove(listener);
+ }
+
+ @Override
+ public Device getDevice(DeviceId deviceId) {
+ return devMap.get(deviceId);
+ }
+
+ @Override
+ public List<Port> getPorts(DeviceId deviceId) {
+ return this.portList.stream()
+ .filter(p -> p.element().id().equals(deviceId))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public boolean isAvailable(DeviceId deviceId) {
+ return devMap.containsKey(deviceId);
+ }
+
+ void addDevice(Device device) {
+ devMap.put(device.id(), device);
+ DeviceEvent event = new DeviceEvent(DEVICE_ADDED, device);
+ listeners.forEach(l -> l.event(event));
+ }
+
+ void removeDevice(Device device) {
+ devMap.remove(device.id());
+ DeviceEvent event = new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device);
+ listeners.forEach(l -> l.event(event));
+ }
+
+ void addPort(Device device, Port port) {
+ portList.add(port);
+ DeviceEvent event = new DeviceEvent(PORT_ADDED, device, port);
+ listeners.forEach(l -> l.event(event));
+ }
+
+ void removePort(Device device, Port port) {
+ portList.remove(port);
+ DeviceEvent event = new DeviceEvent(PORT_REMOVED, device, port);
+ listeners.forEach(l -> l.event(event));
+ }
+ }
+
+ private static class TestHostService extends HostServiceAdapter {
+
+ @Override
+ public Iterable<Host> getHosts() {
+ return Lists.newArrayList();
+ }
+ }
+
+ private static class TestInstanceService implements InstanceService {
+
+ @Override
+ public void addInstance(ConnectPoint connectPoint) {
+
+ }
+
+ @Override
+ public void addInstance(HostId hostId, HostDescription description) {
+
+ }
+
+ @Override
+ public void removeInstance(ConnectPoint connectPoint) {
+
+ }
+
+ @Override
+ public void removeInstance(HostId hostId) {
+
+ }
+ }
+
+ private static class TestCordVtnPipeline implements CordVtnPipeline {
+
+ @Override
+ public void initPipeline(CordVtnNode node) {
+
+ }
+
+ @Override
+ public void cleanupPipeline() {
+
+ }
+
+ @Override
+ public void processFlowRule(boolean install, FlowRule rule) {
+
+ }
+ }
+
+ public class TestEventDispatcher extends DefaultEventSinkRegistry
+ implements EventDeliveryService {
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public synchronized void post(Event event) {
+ EventSink sink = getSink(event.getClass());
+ checkState(sink != null, "No sink for event %s", event);
+ sink.process(event);
+ }
+
+ @Override
+ public void setDispatchTimeLimit(long millis) {
+ }
+
+ @Override
+ public long getDispatchTimeLimit() {
+ return 0;
+ }
+ }
+}
diff --git a/src/test/java/org/opencord/cordvtn/impl/DefaultCordVtnNodeTest.java b/src/test/java/org/opencord/cordvtn/impl/DefaultCordVtnNodeTest.java
new file mode 100644
index 0000000..022904e
--- /dev/null
+++ b/src/test/java/org/opencord/cordvtn/impl/DefaultCordVtnNodeTest.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.cordvtn.impl;
+
+import com.google.common.testing.EqualsTester;
+import org.junit.Test;
+import org.onosproject.net.Device;
+import org.opencord.cordvtn.api.node.CordVtnNode;
+
+import static org.onlab.junit.ImmutableClassChecker.assertThatClassIsImmutable;
+import static org.opencord.cordvtn.api.node.CordVtnNodeState.COMPLETE;
+import static org.opencord.cordvtn.api.node.CordVtnNodeState.INIT;
+
+/**
+ * Unit test of {@link DefaultCordVtnNode} model entity.
+ */
+public class DefaultCordVtnNodeTest extends CordVtnNodeTest {
+
+ private static final Device OF_DEVICE_1 = createDevice(1);
+ private static final CordVtnNode NODE_1 = createNode("node-01", OF_DEVICE_1, INIT);
+ private static final CordVtnNode NODE_2 = createNode("node-01", OF_DEVICE_1, COMPLETE);
+ private static final CordVtnNode NODE_3 = createNode("node-03", OF_DEVICE_1, INIT);
+
+ @Test
+ public void testImmutability() {
+ assertThatClassIsImmutable(DefaultCordVtnNode.class);
+ }
+
+ @Test
+ public void testEquality() {
+ new EqualsTester().addEqualityGroup(NODE_1, NODE_2)
+ .addEqualityGroup(NODE_3)
+ .testEquals();
+ }
+}