Public IP configurable
Change-Id: I5b5f1a846701326abbdc81295646d431c1ff62bf
diff --git a/global/config-samples/ecord-global-config.json b/global/config-samples/ecord-global-config.json
index 9d41fb0..61bbda4 100644
--- a/global/config-samples/ecord-global-config.json
+++ b/global/config-samples/ecord-global-config.json
@@ -20,9 +20,7 @@
[
{
"domainId" : "10.128.14.50",
- "clusterIps" : [
- "10.128.14.50"
- ],
+ "publicIp" : "10.128.14.50",
"port" : "8181",
"username" : "sdn",
"password" : "rocks",
@@ -30,9 +28,7 @@
},
{
"domainId" : "10.128.14.30",
- "clusterIps" : [
- "10.128.14.30"
- ],
+ "publicIp" : "10.128.14.30",
"port" : "8181",
"username" : "sdn",
"password" : "rocks",
@@ -42,5 +38,4 @@
}
}
}
-
}
diff --git a/global/http-channel/src/main/java/org/opencord/ce/global/channel/HttpClientComponent.java b/global/http-channel/src/main/java/org/opencord/ce/global/channel/HttpClientComponent.java
deleted file mode 100644
index d470f9f..0000000
--- a/global/http-channel/src/main/java/org/opencord/ce/global/channel/HttpClientComponent.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.opencord.ce.global.channel;
-
-
-import com.google.common.collect.Sets;
-
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-
-import org.onlab.packet.IpAddress;
-import org.onlab.util.KryoNamespace;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.LeadershipService;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.core.ApplicationId;
-import org.onosproject.core.CoreService;
-import org.onosproject.net.config.ConfigFactory;
-import org.onosproject.net.config.NetworkConfigEvent;
-import org.onosproject.net.config.NetworkConfigListener;
-import org.onosproject.net.config.NetworkConfigRegistry;
-import org.onosproject.net.config.NetworkConfigService;
-import org.onosproject.net.config.basics.SubjectFactories;
-import org.onosproject.net.domain.DomainId;
-import org.onosproject.net.domain.DomainService;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.ConsistentMap;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.StorageService;
-import org.opencord.ce.api.services.channel.ConnectionService;
-import org.opencord.ce.api.services.channel.ControlChannelListenerService;
-import org.opencord.ce.api.services.channel.EndPoint;
-import org.opencord.ce.global.channel.client.ConnectionConfig;
-import org.opencord.ce.global.channel.client.HttpClientInstance;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.ws.rs.client.Client;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-
-/**
- * Southbound component that adds a listener to the
- * {@link ControlChannelListenerService}.
- *
- * Accomplishes network tasks through HTTP requests,
- * acting as a client that send {@link org.opencord.ce.api.models.CarrierEthernetForwardingConstruct}
- * requests to the underlying domains
- */
-@Component(immediate = true)
-@Service(ConnectionService.class)
-public class HttpClientComponent implements ConnectionService {
- private static final String APP_NAME = "org.opencord.ce.global.channel.http";
-
- // temporary
- private static final String TOPIC_ONE = "ecord-domains-topic-one";
- private static final String TOPIC_TWO = "ecord-domains-topic-two";
- private static final String TOPIC_THREE = "ecord-domains-topic-three";
-
- private final Logger log = LoggerFactory.getLogger(getClass());
- private ApplicationId appId;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterService clusterService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected LeadershipService leadershipService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected StorageService storageService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected NetworkConfigRegistry configRegistry;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected NetworkConfigService configService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected CoreService coreService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ControlChannelListenerService channelListenerService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected DomainService domainService;
-
- private final Set<EndPoint> endPoints = Sets.newHashSet();
-
- private ConsistentMap<EndPoint, IpAddress> endPointMasterIpMap;
-
- private final ExecutorService eventExecutor =
- Executors.newSingleThreadExecutor();
-
- private final ConfigFactory<ApplicationId, ConnectionConfig> configFactory =
- new ConfigFactory<ApplicationId, ConnectionConfig>(SubjectFactories.APP_SUBJECT_FACTORY,
- ConnectionConfig.class, "endPoints") {
- @Override
- public ConnectionConfig createConfig() {
- return new ConnectionConfig();
- }
- };
-
- private final NetworkConfigListener configListener = new InternalConfigListener();
-
- @Activate
- protected void activate() {
- log.info("Started");
-
- appId = coreService.registerApplication(APP_NAME);
- endPointMasterIpMap = storageService.<EndPoint, IpAddress>consistentMapBuilder()
- .withName("ecord-domain-endpoints")
- .withSerializer(Serializer.using(
- new KryoNamespace.Builder()
- .register(KryoNamespaces.API)
- .register(DomainId.class)
- .register(EndPoint.class)
- .build()
- )).build();
- configRegistry.registerConfigFactory(configFactory);
- configService.addListener(configListener);
- channelListenerService.addListener(HttpClientInstance.INSTANCE);
- leadershipService.runForLeadership(TOPIC_ONE);
- leadershipService.runForLeadership(TOPIC_TWO);
- leadershipService.runForLeadership(TOPIC_THREE);
- }
-
- @Deactivate
- protected void deactivate() {
- log.info("Stopped");
-
- configService.removeListener(configListener);
- configRegistry.unregisterConfigFactory(configFactory);
- channelListenerService.removeListener(HttpClientInstance.INSTANCE);
- leadershipService.withdraw(TOPIC_ONE);
- leadershipService.withdraw(TOPIC_TWO);
- leadershipService.withdraw(TOPIC_THREE);
- HttpClientInstance.INSTANCE.stopNetworkTasks();
- }
-
- @Override
- public IpAddress getLocalMasterIp(String domainId) {
- synchronized (this) {
- for (EndPoint ep : endPoints) {
- if (ep.domainId().id().equals(domainId)) {
- String topic = ep.topic();
- String masterIp;
- String leaderId = leadershipService.getLeader(topic).id();
- log.info("local leaderId: " + leaderId);
- masterIp = clusterService.getNode(NodeId.nodeId(leaderId)).ip().toString();
-
- return IpAddress.valueOf(masterIp);
- }
- }
- }
- log.info("Found no leader for domain " + domainId +
- "-- endPoints size: " + endPoints.size());
- return null;
- }
-
- @Override
- public IpAddress getRemoteMasterIp(String domainId) {
- synchronized (this) {
- for (EndPoint ep : endPoints) {
- if (ep.domainId().id().equals(domainId)) {
- return HttpClientInstance.INSTANCE.getRemoteMasterIp(ep);
- }
- }
- }
- log.info("Found no master ip for domain {}", domainId);
- return null;
- }
-
- @Override
- public Pair<Client, IpAddress> getConnectionInfo(DomainId domainId) {
- Client client = HttpClientInstance.INSTANCE
- .getConnectionInfo(domainId);
- return Pair.of(client, getRemoteMasterIp(domainId.id()));
- }
-
- private void readConfig() {
- ConnectionConfig config = configRegistry.getConfig(appId, ConnectionConfig.class);
- log.debug("Domains connections config received");
-
- synchronized (this) {
- endPoints.addAll(config.endPoints());
- }
-
- HttpClientInstance.INSTANCE.configure(clusterService, leadershipService, domainService,
- endPointMasterIpMap, config);
- }
-
- private class InternalConfigListener implements NetworkConfigListener {
-
- @Override
- public void event(NetworkConfigEvent event) {
- if (!event.configClass().equals(ConnectionConfig.class)) {
- return;
- }
- switch (event.type()) {
- case CONFIG_ADDED:
- log.info("Network configuration added");
- eventExecutor.execute(HttpClientComponent.this::readConfig);
- break;
- case CONFIG_UPDATED:
- log.info("Network configuration updated");
- eventExecutor.execute(HttpClientComponent.this::readConfig);
- break;
- default:
- break;
- }
- }
- }
-}
diff --git a/global/http-channel/src/main/java/org/opencord/ce/global/channel/client/ConnectionConfig.java b/global/http-channel/src/main/java/org/opencord/ce/global/channel/client/ConnectionConfig.java
index c9c91f7..0a3938f 100644
--- a/global/http-channel/src/main/java/org/opencord/ce/global/channel/client/ConnectionConfig.java
+++ b/global/http-channel/src/main/java/org/opencord/ce/global/channel/client/ConnectionConfig.java
@@ -22,7 +22,7 @@
import org.onosproject.core.ApplicationId;
import org.onosproject.net.config.Config;
import org.onosproject.net.domain.DomainId;
-import org.opencord.ce.api.services.channel.EndPoint;
+import org.opencord.ce.api.models.DomainEndPoint;
import java.util.ArrayList;
import java.util.Set;
@@ -40,13 +40,11 @@
private static final String DOMAINS = "domains";
private static final String DOMAIN_ID = "domainId";
- private static final String DOMAIN_IPS = "clusterIps";
+ private static final String PUBLIC_IP = "publicIp";
private static final String USERNAME = "username";
private static final String PASSWD = "password";
private static final String TOPIC = "topic";
-
-
/**
* Gets listen port from configuration.
* @return port number
@@ -71,17 +69,14 @@
* Returns set of domain end points.
* @return set of domain end points
*/
- public Set<EndPoint> endPoints() {
+ public Set<DomainEndPoint> endPoints() {
JsonNode peersNode = object.get(DOMAINS);
- Set<EndPoint> endPoints = Sets.newHashSet();
+ Set<DomainEndPoint> endPoints = Sets.newHashSet();
peersNode.forEach(jsonNode -> {
DomainId domainId = DomainId.domainId(
jsonNode.path(DOMAIN_ID).asText());
- Set<IpAddress> ipAddresses = Sets.newHashSet();
- jsonNode.path(DOMAIN_IPS).forEach(ipAddress -> ipAddresses.add(
- IpAddress.valueOf(ipAddress.asText())
- ));
+ IpAddress publicIp = IpAddress.valueOf(jsonNode.path(PUBLIC_IP).asText());
int port = jsonNode.path(PORT).asInt();
String username = jsonNode.path(USERNAME).asText();
@@ -89,7 +84,7 @@
String topic = jsonNode.path(TOPIC).asText();
- endPoints.add(new EndPoint(domainId, ipAddresses, port,
+ endPoints.add(new DomainEndPoint(domainId, publicIp, port,
username, password, topic));
});
diff --git a/global/http-channel/src/main/java/org/opencord/ce/global/channel/client/DomainMasterIpDiscoveryTask.java b/global/http-channel/src/main/java/org/opencord/ce/global/channel/client/DomainMasterIpDiscoveryTask.java
deleted file mode 100644
index 8638639..0000000
--- a/global/http-channel/src/main/java/org/opencord/ce/global/channel/client/DomainMasterIpDiscoveryTask.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.opencord.ce.global.channel.client;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.jboss.netty.util.Timeout;
-import org.jboss.netty.util.TimerTask;
-import org.onlab.packet.IpAddress;
-import org.onlab.util.Timer;
-import org.opencord.ce.api.services.channel.EndPoint;
-import org.slf4j.Logger;
-
-import javax.ws.rs.client.Client;
-import javax.ws.rs.client.WebTarget;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import java.io.IOException;
-
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.opencord.ce.api.services.channel.Symbols.BASE_URL;
-import static org.opencord.ce.api.services.channel.Symbols.COLON;
-import static org.opencord.ce.api.services.channel.Symbols.DOUBLESLASH;
-import static org.opencord.ce.api.services.channel.Symbols.HTTP;
-import static org.opencord.ce.api.services.channel.Symbols.MASTER;
-import static org.opencord.ce.api.services.channel.Symbols.MASTER_IP;
-import static org.opencord.ce.api.services.channel.Symbols.OK;
-import static org.opencord.ce.api.services.channel.Symbols.RESULT;
-import static org.slf4j.LoggerFactory.getLogger;
-
-/**
- * Executed periodically to find the leader node of a domain.
- * Stops when the ip is found
- */
-public class DomainMasterIpDiscoveryTask implements TimerTask {
-
- private final Logger log = getLogger(getClass());
-
- private Timeout timeout;
- private volatile boolean isStopped;
- private EndPoint endPoint;
- private Client client;
-
- ObjectMapper mapper;
-
- public DomainMasterIpDiscoveryTask(EndPoint endPoint, Client client,
- ObjectMapper mapper) {
- this.endPoint = endPoint;
- this.client = client;
- this.mapper = mapper;
-
- isStopped = true;
- start();
-
- }
-
- public synchronized void stop() {
- if (!isStopped) {
- isStopped = true;
- timeout.cancel();
- } else {
- log.warn("IpDiscovery stopped multiple times?");
- }
- }
-
- public synchronized void start() {
- if (isStopped) {
- isStopped = false;
- timeout = Timer.getTimer().newTimeout(this, 0, SECONDS);
- } else {
- log.warn("IpDiscovery started multiple times?");
- }
- }
-
- public synchronized boolean isStopped() {
- return isStopped || timeout.isCancelled();
- }
-
- @Override
- public void run(Timeout t) {
- if (isStopped()) {
- return;
- }
- for (IpAddress ipAddress : endPoint.ipAddresses()) {
- String url = HTTP + COLON + DOUBLESLASH + ipAddress.toString() + COLON +
- endPoint.port() + BASE_URL + MASTER;
- log.info("masterIp url: " + url);
- WebTarget wt = client.target(url);
- Response response = wt.request(MediaType.APPLICATION_JSON)
- .get();
- if (response.getStatus() != Response.Status.OK.getStatusCode()) {
- continue;
- }
- String stringBody = response.readEntity(String.class);
- log.info("getLocalMasterIp() response: " + stringBody);
- try {
- ObjectNode responseBody = (ObjectNode) mapper.readTree(stringBody);
- if (responseBody.path(RESULT).asText().equals(OK)) {
- IpAddress masterIpAdress = IpAddress.valueOf(responseBody.path(MASTER_IP).asText());
- HttpClientInstance.INSTANCE.setMasterIp(endPoint, masterIpAdress);
- this.stop();
- return;
- }
- } catch (IOException ex) {
- log.info("getLocalMasterIp() IOException, try next endpoint ip");
- }
- }
-
- if (!isStopped()) {
- timeout = Timer.getTimer().newTimeout(this, 3, SECONDS);
- }
- }
-}
-
diff --git a/global/http-channel/src/main/java/org/opencord/ce/global/channel/client/HttpClientComponent.java b/global/http-channel/src/main/java/org/opencord/ce/global/channel/client/HttpClientComponent.java
new file mode 100644
index 0000000..5bfb5d0
--- /dev/null
+++ b/global/http-channel/src/main/java/org/opencord/ce/global/channel/client/HttpClientComponent.java
@@ -0,0 +1,203 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.ce.global.channel.client;
+
+
+import com.google.common.collect.Sets;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.net.config.ConfigFactory;
+import org.onosproject.net.config.NetworkConfigEvent;
+import org.onosproject.net.config.NetworkConfigListener;
+import org.onosproject.net.config.NetworkConfigRegistry;
+import org.onosproject.net.config.NetworkConfigService;
+import org.onosproject.net.config.basics.SubjectFactories;
+import org.onosproject.net.domain.DomainService;
+import org.opencord.ce.api.services.channel.ControlChannelListenerService;
+import org.opencord.ce.api.models.DomainEndPoint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+
+/**
+ * Southbound component that adds a listener to the
+ * {@link ControlChannelListenerService}.
+ *
+ * Accomplishes network tasks through HTTP requests, acting as a client that
+ * sends {@link org.opencord.ce.api.models.CarrierEthernetForwardingConstruct}
+ * requests to the underlying domains.
+ *
+ * The end point and topic collections are guarded by {@code this}: they are
+ * filled by {@code readConfig()} on the event executor thread and read by
+ * {@code deactivate()}, which is not submitted to that executor.
+ */
+@Component(immediate = true)
+public class HttpClientComponent {
+    private static final String APP_NAME = "org.opencord.ce.global.channel.http";
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private ApplicationId appId;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected LeadershipService leadershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected NetworkConfigRegistry configRegistry;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected NetworkConfigService configService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected CoreService coreService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ControlChannelListenerService channelListenerService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected DomainService domainService;
+
+    // Per-instance flag (was static): it describes this activation only.
+    private boolean isRunningForLeadership = false;
+
+    private final Set<DomainEndPoint> endPoints = Sets.newHashSet();
+    // Topics this component has run for leadership on, without duplicates.
+    private final List<String> topics = new ArrayList<>();
+
+    private final ExecutorService eventExecutor =
+            Executors.newSingleThreadExecutor();
+
+    private final ConfigFactory<ApplicationId, ConnectionConfig> configFactory =
+            new ConfigFactory<ApplicationId, ConnectionConfig>(SubjectFactories.APP_SUBJECT_FACTORY,
+                    ConnectionConfig.class, "endPoints") {
+                @Override
+                public ConnectionConfig createConfig() {
+                    return new ConnectionConfig();
+                }
+            };
+
+    private final NetworkConfigListener configListener = new InternalConfigListener();
+
+    @Activate
+    protected void activate() {
+        log.info("Started");
+
+        appId = coreService.registerApplication(APP_NAME);
+        configRegistry.registerConfigFactory(configFactory);
+        configService.addListener(configListener);
+        channelListenerService.addListener(HttpClientInstance.INSTANCE);
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        log.info("Stopped");
+        configService.removeListener(configListener);
+        configRegistry.unregisterConfigFactory(configFactory);
+        channelListenerService.removeListener(HttpClientInstance.INSTANCE);
+        // Release the worker thread; readConfig() checks this state under the lock.
+        eventExecutor.shutdown();
+        synchronized (this) {
+            if (isRunningForLeadership) {
+                topics.forEach(leadershipService::withdraw);
+                isRunningForLeadership = false;
+            }
+        }
+    }
+
+    /**
+     * Runs for leadership on each given topic that is not registered yet.
+     * The caller must hold the lock on {@code this}.
+     *
+     * @param newTopics topics taken from the configuration
+     */
+    private void runForLeadership(Iterable<? extends String> newTopics) {
+        for (String topic : newTopics) {
+            // Skip known topics so repeated config updates do not pile up duplicates.
+            if (!topics.contains(topic)) {
+                topics.add(topic);
+                leadershipService.runForLeadership(topic);
+            }
+        }
+        isRunningForLeadership = true;
+    }
+
+    /**
+     * Loads the connection configuration, registers its end points and topics
+     * and passes the known end points to {@link HttpClientInstance}.
+     */
+    private void readConfig() {
+        ConnectionConfig config = configRegistry.getConfig(appId, ConnectionConfig.class);
+        if (config == null) {
+            // This task runs asynchronously; the config may be gone by now.
+            log.warn("Domains connections config not available");
+            return;
+        }
+        log.debug("Domains connections config received");
+
+        Set<DomainEndPoint> snapshot;
+        synchronized (this) {
+            if (eventExecutor.isShutdown()) {
+                // Deactivated while this task was queued: register nothing.
+                return;
+            }
+            endPoints.addAll(config.endPoints());
+            runForLeadership(config.topics());
+            snapshot = Sets.newHashSet(endPoints);
+        }
+        // Pass a copy so the client never iterates the guarded set.
+        HttpClientInstance.INSTANCE.configure(clusterService, leadershipService, domainService, snapshot);
+    }
+
+    private class InternalConfigListener implements NetworkConfigListener {
+
+        @Override
+        public void event(NetworkConfigEvent event) {
+            if (!event.configClass().equals(ConnectionConfig.class)) {
+                return;
+            }
+            switch (event.type()) {
+                case CONFIG_ADDED:
+                    log.info("Network configuration added");
+                    eventExecutor.execute(HttpClientComponent.this::readConfig);
+                    break;
+                case CONFIG_UPDATED:
+                    log.info("Network configuration updated");
+                    eventExecutor.execute(HttpClientComponent.this::readConfig);
+                    break;
+                default:
+                    break;
+            }
+        }
+    }
+}
diff --git a/global/http-channel/src/main/java/org/opencord/ce/global/channel/client/HttpClientInstance.java b/global/http-channel/src/main/java/org/opencord/ce/global/channel/client/HttpClientInstance.java
index cf16ab5..eb820fb 100644
--- a/global/http-channel/src/main/java/org/opencord/ce/global/channel/client/HttpClientInstance.java
+++ b/global/http-channel/src/main/java/org/opencord/ce/global/channel/client/HttpClientInstance.java
@@ -23,17 +23,15 @@
import org.onosproject.codec.JsonCodec;
import org.onosproject.net.domain.DomainId;
import org.onosproject.net.domain.DomainService;
-import org.onlab.packet.IpAddress;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.rest.AbstractWebResource;
-import org.onosproject.store.service.ConsistentMap;
import org.opencord.ce.api.models.CarrierEthernetForwardingConstruct;
import org.opencord.ce.api.models.CarrierEthernetNetworkInterface;
import org.opencord.ce.api.models.CarrierEthernetUni;
+import org.opencord.ce.api.models.DomainEndPoint;
import org.opencord.ce.api.models.EvcConnId;
import org.opencord.ce.api.services.MetroNetworkVirtualNodeService;
-import org.opencord.ce.api.services.channel.EndPoint;
import org.opencord.ce.api.services.channel.RequestCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,11 +74,7 @@
private static final int STATUS_OK = Response.Status.OK.getStatusCode();
private static final int STATUS_REQ_UNPROCESSABLE = Response.Status.NOT_ACCEPTABLE.getStatusCode();
- private final Map<DomainId, Pair<Client, EndPoint>> domainsEndPointsMap = Maps.newConcurrentMap();
-
- private ConsistentMap<EndPoint, IpAddress> endPointMasterIpMap;
-
- protected final Map<EndPoint, DomainMasterIpDiscoveryTask> ipDiscoveryTaskMap = Maps.newConcurrentMap();
+ private final Map<DomainId, Pair<Client, DomainEndPoint>> domainsEndPointsMap = Maps.newConcurrentMap();
private final AbstractWebResource codecContext = new AbstractWebResource();
@@ -95,57 +89,33 @@
// newFixedThreadPool(5, groupedThreads("opencord/ecord-http", "event-handler"));
public void configure(ClusterService clusterService, LeadershipService leadershipService,
- DomainService domainService, ConsistentMap<EndPoint,
- IpAddress> endPointMasterIpMap, ConnectionConfig connConfig) {
+ DomainService domainService, Set<DomainEndPoint> endPoints) {
if (!configured) {
this.clusterService = clusterService;
this.leadershipService = leadershipService;
this.domainService = domainService;
- this.endPointMasterIpMap = endPointMasterIpMap;
configured = true;
}
- connConfig.endPoints().forEach(siteConfig -> {
+ endPoints.forEach(siteConfig -> {
DomainId domainId = siteConfig.domainId();
synchronized (this) {
domainsEndPointsMap.putIfAbsent(domainId,
Pair.of(createClient(), siteConfig));
notify();
}
- String topic = siteConfig.topic();
- // TODO: add leadership listeners to react to changes
- if (isLeader(topic)) {
- log.info("I am the leader for domain: {} ", domainId);
- probeMasterIp(siteConfig, domainsEndPointsMap.get(domainId).getLeft());
- } else {
- log.info("I am NOT the leader for domain: {}", domainId);
-
- }
});
}
- private void probeMasterIp(EndPoint endPoint, Client client) {
- ipDiscoveryTaskMap.putIfAbsent(endPoint,
- new DomainMasterIpDiscoveryTask(endPoint, client, codecContext.mapper()));
- }
-
- protected void setMasterIp(EndPoint endPoint, IpAddress ipAddress) {
- synchronized (this) {
- endPointMasterIpMap.put(endPoint, ipAddress);
- notify();
- }
- }
-
- public IpAddress getRemoteMasterIp(EndPoint endPoint) {
- synchronized (this) {
- return endPointMasterIpMap.get(endPoint).value();
- }
- }
-
public Client getConnectionInfo(DomainId domainId) {
return domainsEndPointsMap.get(domainId).getLeft();
}
+    private boolean isLeader(String topic) {
+        // equals(null) is false: no NullPointerException if getLeader() returns null.
+        return clusterService.getLocalNode().id().equals(leadershipService.getLeader(topic));
+    }
+
private boolean checkReply(Response response) {
if (response != null) {
return checkStatusCode(response.getStatus());
@@ -164,11 +134,6 @@
}
}
- private boolean isLeader(String topic) {
- return leadershipService.getLeader(topic).id()
- .equals(clusterService.getLocalNode().id().id());
- }
-
private String getTopic(DomainId domainId) {
return domainsEndPointsMap.get(domainId).getRight().topic();
}
@@ -177,10 +142,6 @@
return ClientBuilder.newClient();
}
- public void stopNetworkTasks() {
- ipDiscoveryTaskMap.forEach((ep, task) -> task.stop());
- }
-
@Override
public void setNodeForwarding(CarrierEthernetForwardingConstruct fc, CarrierEthernetNetworkInterface srcNi,
Set<CarrierEthernetNetworkInterface> dstNiSet) {
@@ -201,35 +162,6 @@
dstNiSet.forEach(dstNi ->
dstNiJsonArrayNode.add(niCodec.encode(dstNi, codecContext)));
body.set(DST_NI_LIST, dstNiJsonArrayNode);
-/*
- String fcId = fc.id().id();
- VlanId fcTag = fc.vlanId();
- ConnectPoint ingressCp = srcNi.cp();
- CarrierEthernetNetworkInterface.Type srcType = srcNi.type();
- VlanId sTag = srcNi.sVlanId();
- VlanId ceTag = srcNi.ceVlanId();
- CarrierEthernetForwardingConstruct.Type fcType = fc.type();
- ArrayList<Pair<ConnectPoint, CarrierEthernetNetworkInterface.Type>> egressList =
- new ArrayList<>();
- dstNiSet.forEach(dstNi -> egressList.add(Pair.of(dstNi.cp(), dstNi.type())));
-
- ObjectNode jsonBody = codecContext.mapper().createObjectNode()
- .put(FC_ID, fcId)
- .put(FC_TAG, fcTag.toShort())
- .put(FC_TYPE, fcType.toString())
- .put(INGRESS_NI_TYPE, srcType.toString());
- JsonCodec<ConnectPoint> cpCodec = codecContext.codec(ConnectPoint.class);
- jsonBody.set(FC_INGRESS_CP, cpCodec.encode(ingressCp, codecContext));
- jsonBody.put(INGRESS_FC_TAG, sTag.toShort())
- .put(CUSTOMER_TAG, ceTag.toShort());
- ArrayNode egressListNode = codecContext.newArray(jsonBody, FC_EGRESS_LST);
- dstNiSet.forEach(dstNi -> {
- ObjectNode item = codecContext.mapper().createObjectNode()
- .put(FC_EGRESS_TYPE, dstNi.type().toString());
- item.set(FC_EGRESS_CP, cpCodec.encode(dstNi.cp(), codecContext));
- egressListNode.add(item);
- });
- */
String resource = "/ForwardingConstruct";
networkExecutor.execute(new NetworkTask(domainId, POST, resource, body.toString(),
new RequestCallback() {
@@ -379,25 +311,11 @@
}
}
- EndPoint endPoint = domainsEndPointsMap.get(domainId).getRight();
- IpAddress ipAddress;
- synchronized (this) {
- while (!endPointMasterIpMap.containsKey(endPoint) ||
- endPointMasterIpMap.get(endPoint).value() == null) {
- try {
- log.info("wait() master ip");
- wait();
- } catch (InterruptedException ie) {
- log.info("Interrupted exception: " + ie.getMessage());
- }
- }
- ipAddress = endPointMasterIpMap.get(endPoint).value();
- }
-
+ DomainEndPoint endPoint = domainsEndPointsMap.get(domainId).getRight();
Client client = domainsEndPointsMap.get(domainId).getLeft();
- String url = HTTP + COLON + DOUBLESLASH + ipAddress.toString() + COLON +
+ String url = HTTP + COLON + DOUBLESLASH + endPoint.publicIp().toString() + COLON +
endPoint.port() + BASE_URL + "/carrierethernet" + resource;
log.info("DEBUG {}: Sending data via http: {}", url, body);
diff --git a/global/http-channel/src/main/java/org/opencord/ce/global/channel/server/DomainMasterIpResource.java b/global/http-channel/src/main/java/org/opencord/ce/global/channel/server/DomainMasterIpResource.java
deleted file mode 100644
index 6ba8469..0000000
--- a/global/http-channel/src/main/java/org/opencord/ce/global/channel/server/DomainMasterIpResource.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.opencord.ce.global.channel.server;
-
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.onlab.packet.IpAddress;
-import org.onosproject.rest.AbstractWebResource;
-import org.opencord.ce.api.services.channel.ConnectionService;
-import org.slf4j.Logger;
-
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-
-import static org.opencord.ce.api.services.channel.Symbols.MASTER_API_NO_IP_BODY;
-import static org.opencord.ce.api.services.channel.Symbols.MASTER_IP;
-import static org.opencord.ce.api.services.channel.Symbols.OK;
-import static org.opencord.ce.api.services.channel.Symbols.RESULT;
-import static org.slf4j.LoggerFactory.getLogger;
-
-/**
- * Gets the ip of the instance leader of the communication with the specified
- * domainId.
- */
-@Path("master/{domainId}")
-public class DomainMasterIpResource extends AbstractWebResource {
- private final Logger log = getLogger(getClass());
-
- @GET
- @Produces(MediaType.APPLICATION_JSON)
- public Response getMasterIp(@PathParam("domainId") String id) {
- log.info("Domain {} asks who is the master for him", id);
-
- IpAddress ip = get(ConnectionService.class).getLocalMasterIp(id);
- if (ip != null) {
- ObjectNode body = mapper().createObjectNode();
- body.put(RESULT, OK);
- body.put(MASTER_IP, ip.toString());
-
- return ok(body.toString()).build();
- } else {
- return ok(MASTER_API_NO_IP_BODY).build();
- }
- }
-}
diff --git a/global/http-channel/src/main/java/org/opencord/ce/global/channel/server/EcordGlobalRestApp.java b/global/http-channel/src/main/java/org/opencord/ce/global/channel/server/EcordGlobalRestApp.java
index bd0691f..fd05d86 100644
--- a/global/http-channel/src/main/java/org/opencord/ce/global/channel/server/EcordGlobalRestApp.java
+++ b/global/http-channel/src/main/java/org/opencord/ce/global/channel/server/EcordGlobalRestApp.java
@@ -27,6 +27,6 @@
@Override
public Set<Class<?>> getClasses() {
- return getClasses(DeviceResource.class, DomainMasterIpResource.class);
+ return getClasses(DeviceResource.class);
}
}