Moving VTN away from old XOS APIs onto GRPC APIs.
Instead of the VTN app pulling the ServiceNetwork and SerivcePort
info from XOS, the VTN simply sets a flag to indicate it wants a
full resync of the data. The VTN synchronizer then knows to send
all ServiceNetwork and ServicePort information on next sync.
Change-Id: I84bd1e6bb691dcbb9667c30c252921894b06bb2a
diff --git a/src/main/java/org/opencord/cordvtn/cli/CordVtnNetworkListCommand.java b/src/main/java/org/opencord/cordvtn/cli/CordVtnNetworkListCommand.java
index 10ae833..9043cea 100644
--- a/src/main/java/org/opencord/cordvtn/cli/CordVtnNetworkListCommand.java
+++ b/src/main/java/org/opencord/cordvtn/cli/CordVtnNetworkListCommand.java
@@ -42,7 +42,7 @@
protected void execute() {
ServiceNetworkService service = AbstractShellCommand.get(ServiceNetworkService.class);
List<ServiceNetwork> networks = Lists.newArrayList(service.serviceNetworks());
- networks.sort(Comparator.comparing(ServiceNetwork::name));
+ networks.sort(Comparator.comparing(sn -> sn.id().id()));
if (outputJson()) {
try {
diff --git a/src/main/java/org/opencord/cordvtn/cli/CordVtnSyncXosStatesCommand.java b/src/main/java/org/opencord/cordvtn/cli/CordVtnSyncXosStatesCommand.java
index a5f5610..ba794d1 100644
--- a/src/main/java/org/opencord/cordvtn/cli/CordVtnSyncXosStatesCommand.java
+++ b/src/main/java/org/opencord/cordvtn/cli/CordVtnSyncXosStatesCommand.java
@@ -18,21 +18,15 @@
import org.apache.karaf.shell.commands.Argument;
import org.apache.karaf.shell.commands.Command;
import org.onosproject.cli.AbstractShellCommand;
-import org.opencord.cordvtn.api.core.ServiceNetworkAdminService;
-import org.opencord.cordvtn.api.net.ServiceNetwork;
-import org.opencord.cordvtn.api.net.ServicePort;
import org.opencord.cordvtn.rest.XosVtnNetworkingClient;
-import java.util.List;
-import java.util.stream.Collectors;
-
/**
* Synchronizes network states with XOS VTN service.
* This command can be used to actively synchronize XOS network with VTN
* service network.
*/
@Command(scope = "onos", name = "cordvtn-sync-xos-states",
- description = "Synchronizes network states with Neutron")
+ description = "Synchronizes network states with XOS")
public class CordVtnSyncXosStatesCommand extends AbstractShellCommand {
@Argument(index = 0, name = "endpoint", description = "XOS VTN service endpoint",
@@ -47,56 +41,15 @@
required = true, multiValued = false)
private String password = null;
- private static final String NET_FORMAT = "%-40s%-30s%-20s%-8s%-20s%s";
- private static final String PORT_FORMAT = "%-40s%-30s%-20s%-18s%-10s%s";
-
@Override
protected void execute() {
- ServiceNetworkAdminService snetService =
- AbstractShellCommand.get(ServiceNetworkAdminService.class);
-
XosVtnNetworkingClient client = XosVtnNetworkingClient.builder()
.endpoint(endpoint)
.user(user)
.password(password)
.build();
- print("Synchronizing service networks...");
- print(NET_FORMAT, "ID", "Name", "Type", "VNI", "Subnet", "Service IP");
- client.serviceNetworks().forEach(snet -> {
- if (snetService.serviceNetwork(snet.id()) != null) {
- snetService.updateServiceNetwork(snet);
- } else {
- snetService.createServiceNetwork(snet);
- }
- ServiceNetwork updated = snetService.serviceNetwork(snet.id());
- print(NET_FORMAT, updated.id(),
- updated.name(),
- updated.type(),
- updated.segmentId(),
- updated.subnet(),
- updated.serviceIp());
- });
-
- // FIXME creating a port fails until XOS service API provides network ID
- print("\nSynchronizing service ports...");
- print(PORT_FORMAT, "ID", "Name", "MAC", "IP", "VLAN", "WAN IPs");
- client.servicePorts().forEach(sport -> {
- if (snetService.servicePort(sport.id()) != null) {
- snetService.updateServicePort(sport);
- } else {
- snetService.createServicePort(sport);
- }
- ServicePort updated = snetService.servicePort(sport.id());
- List<String> floatingIps = updated.addressPairs().stream()
- .map(ip -> ip.ip().toString())
- .collect(Collectors.toList());
- print(PORT_FORMAT, updated.id(),
- updated.name(),
- updated.mac(),
- updated.ip(),
- updated.vlanId() != null ? updated.vlanId() : "",
- floatingIps.isEmpty() ? "" : floatingIps);
- });
+ print("Requesting state synchronization");
+ client.requestSync();
}
}
diff --git a/src/main/java/org/opencord/cordvtn/rest/XosVtnNetworkingClient.java b/src/main/java/org/opencord/cordvtn/rest/XosVtnNetworkingClient.java
index b0a3c25..34ad685 100644
--- a/src/main/java/org/opencord/cordvtn/rest/XosVtnNetworkingClient.java
+++ b/src/main/java/org/opencord/cordvtn/rest/XosVtnNetworkingClient.java
@@ -19,24 +19,19 @@
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableSet;
import org.glassfish.jersey.client.ClientProperties;
-import org.onlab.util.Tools;
+import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature;
import org.onosproject.rest.AbstractWebResource;
-import org.opencord.cordvtn.api.net.NetworkId;
-import org.opencord.cordvtn.api.net.PortId;
-import org.opencord.cordvtn.api.net.ServiceNetwork;
-import org.opencord.cordvtn.api.net.ServicePort;
import org.slf4j.Logger;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Entity;
import javax.ws.rs.client.Invocation;
import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.IOException;
-import java.util.Set;
-import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.net.MediaType.JSON_UTF_8;
@@ -50,17 +45,14 @@
protected final Logger log = getLogger(getClass());
- private static final String URL_BASE = "/api/service/vtn/";
- private static final String URL_SERVICE_NETWORKS = "serviceNetworks/";
- private static final String URL_SERVICE_PORTS = "servicePorts/";
+ private static final String URL_BASE = "/xosapi/v1/vtn/";
- private static final String SERVICE_PORTS = "servicePorts";
- private static final String SERVICE_PORT = "servicePort";
- private static final String SERVICE_NETWORKS = "serviceNetworks";
- private static final String SERVICE_NETWORK = "serviceNetwork";
private static final String EMPTY_JSON_STRING = "{}";
- private static final String MSG_RECEIVED = "Received ";
+ private static final String VTN_ID = "id";
+ private static final String VTN_RESYNC = "resync";
+ private static final String VTN_SERVICES = "vtnservices";
+
private static final String ERR_LOG = "Received %s result with wrong format: %s";
private static final int DEFAULT_TIMEOUT_MS = 2000;
@@ -75,86 +67,20 @@
this.user = user;
this.password = password;
+ client.register(HttpAuthenticationFeature.basic(user, password));
client.property(ClientProperties.CONNECT_TIMEOUT, DEFAULT_TIMEOUT_MS);
client.property(ClientProperties.READ_TIMEOUT, DEFAULT_TIMEOUT_MS);
mapper().enable(SerializationFeature.INDENT_OUTPUT);
}
- public Set<ServiceNetwork> serviceNetworks() {
- String response = restGet(URL_SERVICE_NETWORKS);
- final String error = String.format(ERR_LOG, SERVICE_NETWORKS, response);
- try {
- JsonNode jsonTree = mapper().readTree(response).get(SERVICE_NETWORKS);
- if (jsonTree == null) {
- return ImmutableSet.of();
- }
- log.trace(MSG_RECEIVED + SERVICE_NETWORKS);
- log.trace(mapper().writeValueAsString(jsonTree));
- return Tools.stream(jsonTree).map(snet -> codec(ServiceNetwork.class)
- .decode((ObjectNode) snet, this))
- .collect(Collectors.toSet());
- } catch (IOException e) {
- throw new IllegalArgumentException(error);
- }
- }
-
- public ServiceNetwork serviceNetwork(NetworkId netId) {
- String response = restGet(URL_SERVICE_NETWORKS + netId.id());
- final String error = String.format(ERR_LOG, SERVICE_NETWORK, response);
- try {
- JsonNode jsonTree = mapper().readTree(response).get(SERVICE_NETWORK);
- if (jsonTree == null) {
- throw new IllegalArgumentException(error);
- }
- log.trace(MSG_RECEIVED + SERVICE_NETWORK);
- log.trace(mapper().writeValueAsString(jsonTree));
- return codec(ServiceNetwork.class).decode((ObjectNode) jsonTree, this);
- } catch (IOException e) {
- throw new IllegalArgumentException(error);
- }
- }
-
- public Set<ServicePort> servicePorts() {
- String response = restGet(URL_SERVICE_PORTS);
- final String error = String.format(ERR_LOG, SERVICE_PORTS, response);
- try {
- JsonNode jsonTree = mapper().readTree(response).get(SERVICE_PORTS);
- if (jsonTree == null) {
- ImmutableSet.of();
- }
- log.trace(MSG_RECEIVED + SERVICE_PORTS);
- log.trace(mapper().writeValueAsString(jsonTree));
- return Tools.stream(jsonTree).map(sport -> codec(ServicePort.class)
- .decode((ObjectNode) sport, this))
- .collect(Collectors.toSet());
- } catch (IOException e) {
- throw new IllegalArgumentException(error);
- }
- }
-
- public ServicePort servicePort(PortId portId) {
- String response = restGet(URL_SERVICE_PORTS + portId.id());
- final String error = String.format(ERR_LOG, SERVICE_PORT, response);
- try {
- JsonNode jsonTree = mapper().readTree(response).get(SERVICE_PORT);
- if (jsonTree == null) {
- throw new IllegalArgumentException(error);
- }
- log.trace(MSG_RECEIVED + SERVICE_PORT);
- log.trace(mapper().writeValueAsString(jsonTree));
- return codec(ServicePort.class).decode((ObjectNode) jsonTree, this);
- } catch (IOException e) {
- throw new IllegalArgumentException(error);
- }
- }
-
private String restGet(String path) {
- WebTarget wt = client.target(endpoint + URL_BASE).path(path);
+ WebTarget wt = client.target("http://" + endpoint + URL_BASE).path(path);
Invocation.Builder builder = wt.request(JSON_UTF_8.toString());
try {
Response response = builder.get();
if (response.getStatus() != HTTP_OK) {
log.warn("Failed to get resource {}", endpoint + URL_BASE + path);
+ log.warn("reason {}", response.readEntity(String.class));
return EMPTY_JSON_STRING;
}
} catch (javax.ws.rs.ProcessingException e) {
@@ -163,6 +89,49 @@
return builder.get(String.class);
}
+ private String restPut(String path, JsonNode request) {
+ WebTarget wt = client.target("http://" + endpoint + URL_BASE).path(path);
+ Invocation.Builder builder = wt.request(MediaType.APPLICATION_JSON)
+ .accept(JSON_UTF_8.toString());
+
+ try {
+ Response response = builder.put(Entity.entity(request.toString(),
+ MediaType.APPLICATION_JSON_TYPE));
+ String strResponse = response.readEntity(String.class);
+ if (response.getStatus() != HTTP_OK) {
+ throw new IllegalArgumentException("Failed to put resource "
+ + response.getStatus() + ": " + strResponse);
+ }
+ return strResponse;
+ } catch (javax.ws.rs.ProcessingException e) {
+ return EMPTY_JSON_STRING;
+ }
+ }
+
+ public void requestSync() {
+ String response = restGet(VTN_SERVICES);
+ final String error = String.format(ERR_LOG, VTN_SERVICES, response);
+ try {
+ JsonNode jsonTree = mapper().readTree(response).path("items");
+ if (!jsonTree.isArray() || jsonTree.size() < 1) {
+ throw new IllegalArgumentException(error);
+ }
+ JsonNode vtnService = jsonTree.get(0);
+ String vtnServiceId = vtnService.path(VTN_ID).asText();
+ if (vtnServiceId == null || vtnServiceId.equals("")) {
+ throw new IllegalArgumentException(error);
+ }
+ log.info("Requesting sync for VTN service {}", vtnServiceId);
+
+ ObjectNode request = mapper().createObjectNode()
+ .put(VTN_RESYNC, true);
+
+ restPut(VTN_SERVICES + "/" + vtnServiceId, request);
+ } catch (IOException e) {
+ throw new IllegalArgumentException(error);
+ }
+ }
+
/**
* Returns endpoint url for the XOS service API access.
*
@@ -221,7 +190,6 @@
checkArgument(!Strings.isNullOrEmpty(user));
checkArgument(!Strings.isNullOrEmpty(password));
- // TODO perform authentication when XOS provides it
return new XosVtnNetworkingClient(endpoint, user, password);
}
diff --git a/xos/synchronizer/steps/sync_vtn_service.py b/xos/synchronizer/steps/sync_vtn_service.py
index 7b7aedb..862b0f6 100644
--- a/xos/synchronizer/steps/sync_vtn_service.py
+++ b/xos/synchronizer/steps/sync_vtn_service.py
@@ -211,12 +211,24 @@
logger.error("Received error from vtn service (%d)" % r.status_code)
def call(self, **args):
+ global glo_saved_networks
+ global glo_saved_ports
+
vtn_service = VTNService.objects.all()
if not vtn_service:
raise Exception("No VTN Service")
vtn_service = vtn_service[0]
+ if (vtn_service.resync):
+ # If the VTN app requested a full resync, clear our saved network
+ # so we will resync everything, then reset the 'resync' flag
+ glo_saved_networks = {}
+ glo_saved_ports = {}
+
+ vtn_service.resync = False
+ vtn_service.save()
+
if vtn_service.vtnAPIVersion>=2:
# version 2 means use new API
logger.info("Using New API")
diff --git a/xos/vtn.xproto b/xos/vtn.xproto
index bd7ff54..ec98877 100644
--- a/xos/vtn.xproto
+++ b/xos/vtn.xproto
@@ -15,4 +15,5 @@
required string xosPassword = 10 [default = "letmein", max_length = 255, content_type = "stripped", blank = False, null = False, db_index = False];
required int32 vtnAPIVersion = 11 [default = 1, null = False, db_index = False, blank = False];
required string controllerPort = 12 [default = "onos-cord:6653", max_length = 255, content_type = "stripped", blank = False, null = False, db_index = False];
+ required bool resync = 13 [default = False, null = False, db_index = False, blank = False];
}