Public IP configurable
Change-Id: I5b5f1a846701326abbdc81295646d431c1ff62bf
diff --git a/ce-api/src/main/java/org/opencord/ce/api/services/channel/EndPoint.java b/ce-api/src/main/java/org/opencord/ce/api/models/DomainEndPoint.java
similarity index 81%
rename from ce-api/src/main/java/org/opencord/ce/api/services/channel/EndPoint.java
rename to ce-api/src/main/java/org/opencord/ce/api/models/DomainEndPoint.java
index 7e8305f..f5dfc42 100644
--- a/ce-api/src/main/java/org/opencord/ce/api/services/channel/EndPoint.java
+++ b/ce-api/src/main/java/org/opencord/ce/api/models/DomainEndPoint.java
@@ -14,28 +14,26 @@
* limitations under the License.
*/
-package org.opencord.ce.api.services.channel;
+package org.opencord.ce.api.models;
import org.onlab.packet.IpAddress;
import org.onosproject.net.domain.DomainId;
-import java.util.Set;
-
/**
* ONOS cluster endpoint for the communication between local and global.
*/
-public class EndPoint {
+public class DomainEndPoint {
private DomainId domainId;
- private Set<IpAddress> ipAddresses;
+ private IpAddress publicIp;
private int port;
private String username;
private String password;
private String topic;
- public EndPoint(DomainId domainId, Set<IpAddress> ipAddresses, int port,
+ public DomainEndPoint(DomainId domainId, IpAddress publicIp, int port,
String username, String password, String topic) {
this.domainId = domainId;
- this.ipAddresses = ipAddresses;
+ this.publicIp = publicIp;
this.port = port;
this.username = username;
this.password = password;
@@ -46,8 +44,8 @@
return domainId;
}
- public Set<IpAddress> ipAddresses() {
- return ipAddresses;
+ public IpAddress publicIp() {
+ return publicIp;
}
public int port() {
diff --git a/ce-api/src/main/java/org/opencord/ce/api/services/channel/ConnectionService.java b/ce-api/src/main/java/org/opencord/ce/api/services/channel/ConnectionService.java
deleted file mode 100644
index 89df1b4..0000000
--- a/ce-api/src/main/java/org/opencord/ce/api/services/channel/ConnectionService.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.opencord.ce.api.services.channel;
-
-
-import org.apache.commons.lang3.tuple.Pair;
-import org.onlab.packet.IpAddress;
-import org.onosproject.net.domain.DomainId;
-
-import javax.ws.rs.client.Client;
-
-/**
- * Service to control local and remote domain endpoints.
- */
-public interface ConnectionService {
-
- /**
- * Gets the ip of the local (ONOS cluster) instance in charge of handling the communication
- * with the specified domain.
- *
- * @param domainId domain identifier
- * @return ip address of the local instance
- */
- IpAddress getLocalMasterIp(String domainId);
-
- /**
- * Gets the ip of the remote (ONOS cluster) instance in charge of
- * handling the communication with the local cluster.
- *
- * @param domainId domain identifier of the remote cluster
- * @return the ip address of the remote instance
- */
- IpAddress getRemoteMasterIp(String domainId);
-
- /**
- * Gets the connection info for the specified domain.
- *
- * @param domainId target domain ID
- * @return pair of http client object and destination IP address
- */
- Pair<Client, IpAddress> getConnectionInfo(DomainId domainId);
-}
diff --git a/global/config-samples/ecord-global-config.json b/global/config-samples/ecord-global-config.json
index 9d41fb0..61bbda4 100644
--- a/global/config-samples/ecord-global-config.json
+++ b/global/config-samples/ecord-global-config.json
@@ -20,9 +20,7 @@
[
{
"domainId" : "10.128.14.50",
- "clusterIps" : [
- "10.128.14.50"
- ],
+ "publicIp" : "10.128.14.50",
"port" : "8181",
"username" : "sdn",
"password" : "rocks",
@@ -30,9 +28,7 @@
},
{
"domainId" : "10.128.14.30",
- "clusterIps" : [
- "10.128.14.30"
- ],
+ "publicIp" : "10.128.14.30",
"port" : "8181",
"username" : "sdn",
"password" : "rocks",
@@ -42,5 +38,4 @@
}
}
}
-
}
diff --git a/global/http-channel/src/main/java/org/opencord/ce/global/channel/HttpClientComponent.java b/global/http-channel/src/main/java/org/opencord/ce/global/channel/HttpClientComponent.java
deleted file mode 100644
index d470f9f..0000000
--- a/global/http-channel/src/main/java/org/opencord/ce/global/channel/HttpClientComponent.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.opencord.ce.global.channel;
-
-
-import com.google.common.collect.Sets;
-
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-
-import org.onlab.packet.IpAddress;
-import org.onlab.util.KryoNamespace;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.LeadershipService;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.core.ApplicationId;
-import org.onosproject.core.CoreService;
-import org.onosproject.net.config.ConfigFactory;
-import org.onosproject.net.config.NetworkConfigEvent;
-import org.onosproject.net.config.NetworkConfigListener;
-import org.onosproject.net.config.NetworkConfigRegistry;
-import org.onosproject.net.config.NetworkConfigService;
-import org.onosproject.net.config.basics.SubjectFactories;
-import org.onosproject.net.domain.DomainId;
-import org.onosproject.net.domain.DomainService;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.ConsistentMap;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.StorageService;
-import org.opencord.ce.api.services.channel.ConnectionService;
-import org.opencord.ce.api.services.channel.ControlChannelListenerService;
-import org.opencord.ce.api.services.channel.EndPoint;
-import org.opencord.ce.global.channel.client.ConnectionConfig;
-import org.opencord.ce.global.channel.client.HttpClientInstance;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.ws.rs.client.Client;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-
-/**
- * Southbound component that adds a listener to the
- * {@link ControlChannelListenerService}.
- *
- * Accomplishes network tasks through HTTP requests,
- * acting as a client that send {@link org.opencord.ce.api.models.CarrierEthernetForwardingConstruct}
- * requests to the underlying domains
- */
-@Component(immediate = true)
-@Service(ConnectionService.class)
-public class HttpClientComponent implements ConnectionService {
- private static final String APP_NAME = "org.opencord.ce.global.channel.http";
-
- // temporary
- private static final String TOPIC_ONE = "ecord-domains-topic-one";
- private static final String TOPIC_TWO = "ecord-domains-topic-two";
- private static final String TOPIC_THREE = "ecord-domains-topic-three";
-
- private final Logger log = LoggerFactory.getLogger(getClass());
- private ApplicationId appId;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterService clusterService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected LeadershipService leadershipService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected StorageService storageService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected NetworkConfigRegistry configRegistry;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected NetworkConfigService configService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected CoreService coreService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ControlChannelListenerService channelListenerService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected DomainService domainService;
-
- private final Set<EndPoint> endPoints = Sets.newHashSet();
-
- private ConsistentMap<EndPoint, IpAddress> endPointMasterIpMap;
-
- private final ExecutorService eventExecutor =
- Executors.newSingleThreadExecutor();
-
- private final ConfigFactory<ApplicationId, ConnectionConfig> configFactory =
- new ConfigFactory<ApplicationId, ConnectionConfig>(SubjectFactories.APP_SUBJECT_FACTORY,
- ConnectionConfig.class, "endPoints") {
- @Override
- public ConnectionConfig createConfig() {
- return new ConnectionConfig();
- }
- };
-
- private final NetworkConfigListener configListener = new InternalConfigListener();
-
- @Activate
- protected void activate() {
- log.info("Started");
-
- appId = coreService.registerApplication(APP_NAME);
- endPointMasterIpMap = storageService.<EndPoint, IpAddress>consistentMapBuilder()
- .withName("ecord-domain-endpoints")
- .withSerializer(Serializer.using(
- new KryoNamespace.Builder()
- .register(KryoNamespaces.API)
- .register(DomainId.class)
- .register(EndPoint.class)
- .build()
- )).build();
- configRegistry.registerConfigFactory(configFactory);
- configService.addListener(configListener);
- channelListenerService.addListener(HttpClientInstance.INSTANCE);
- leadershipService.runForLeadership(TOPIC_ONE);
- leadershipService.runForLeadership(TOPIC_TWO);
- leadershipService.runForLeadership(TOPIC_THREE);
- }
-
- @Deactivate
- protected void deactivate() {
- log.info("Stopped");
-
- configService.removeListener(configListener);
- configRegistry.unregisterConfigFactory(configFactory);
- channelListenerService.removeListener(HttpClientInstance.INSTANCE);
- leadershipService.withdraw(TOPIC_ONE);
- leadershipService.withdraw(TOPIC_TWO);
- leadershipService.withdraw(TOPIC_THREE);
- HttpClientInstance.INSTANCE.stopNetworkTasks();
- }
-
- @Override
- public IpAddress getLocalMasterIp(String domainId) {
- synchronized (this) {
- for (EndPoint ep : endPoints) {
- if (ep.domainId().id().equals(domainId)) {
- String topic = ep.topic();
- String masterIp;
- String leaderId = leadershipService.getLeader(topic).id();
- log.info("local leaderId: " + leaderId);
- masterIp = clusterService.getNode(NodeId.nodeId(leaderId)).ip().toString();
-
- return IpAddress.valueOf(masterIp);
- }
- }
- }
- log.info("Found no leader for domain " + domainId +
- "-- endPoints size: " + endPoints.size());
- return null;
- }
-
- @Override
- public IpAddress getRemoteMasterIp(String domainId) {
- synchronized (this) {
- for (EndPoint ep : endPoints) {
- if (ep.domainId().id().equals(domainId)) {
- return HttpClientInstance.INSTANCE.getRemoteMasterIp(ep);
- }
- }
- }
- log.info("Found no master ip for domain {}", domainId);
- return null;
- }
-
- @Override
- public Pair<Client, IpAddress> getConnectionInfo(DomainId domainId) {
- Client client = HttpClientInstance.INSTANCE
- .getConnectionInfo(domainId);
- return Pair.of(client, getRemoteMasterIp(domainId.id()));
- }
-
- private void readConfig() {
- ConnectionConfig config = configRegistry.getConfig(appId, ConnectionConfig.class);
- log.debug("Domains connections config received");
-
- synchronized (this) {
- endPoints.addAll(config.endPoints());
- }
-
- HttpClientInstance.INSTANCE.configure(clusterService, leadershipService, domainService,
- endPointMasterIpMap, config);
- }
-
- private class InternalConfigListener implements NetworkConfigListener {
-
- @Override
- public void event(NetworkConfigEvent event) {
- if (!event.configClass().equals(ConnectionConfig.class)) {
- return;
- }
- switch (event.type()) {
- case CONFIG_ADDED:
- log.info("Network configuration added");
- eventExecutor.execute(HttpClientComponent.this::readConfig);
- break;
- case CONFIG_UPDATED:
- log.info("Network configuration updated");
- eventExecutor.execute(HttpClientComponent.this::readConfig);
- break;
- default:
- break;
- }
- }
- }
-}
diff --git a/global/http-channel/src/main/java/org/opencord/ce/global/channel/client/ConnectionConfig.java b/global/http-channel/src/main/java/org/opencord/ce/global/channel/client/ConnectionConfig.java
index c9c91f7..0a3938f 100644
--- a/global/http-channel/src/main/java/org/opencord/ce/global/channel/client/ConnectionConfig.java
+++ b/global/http-channel/src/main/java/org/opencord/ce/global/channel/client/ConnectionConfig.java
@@ -22,7 +22,7 @@
import org.onosproject.core.ApplicationId;
import org.onosproject.net.config.Config;
import org.onosproject.net.domain.DomainId;
-import org.opencord.ce.api.services.channel.EndPoint;
+import org.opencord.ce.api.models.DomainEndPoint;
import java.util.ArrayList;
import java.util.Set;
@@ -40,13 +40,11 @@
private static final String DOMAINS = "domains";
private static final String DOMAIN_ID = "domainId";
- private static final String DOMAIN_IPS = "clusterIps";
+ private static final String PUBLIC_IP = "publicIp";
private static final String USERNAME = "username";
private static final String PASSWD = "password";
private static final String TOPIC = "topic";
-
-
/**
* Gets listen port from configuration.
* @return port number
@@ -71,17 +69,14 @@
* Returns set of domain end points.
* @return set of domain end points
*/
- public Set<EndPoint> endPoints() {
+ public Set<DomainEndPoint> endPoints() {
JsonNode peersNode = object.get(DOMAINS);
- Set<EndPoint> endPoints = Sets.newHashSet();
+ Set<DomainEndPoint> endPoints = Sets.newHashSet();
peersNode.forEach(jsonNode -> {
DomainId domainId = DomainId.domainId(
jsonNode.path(DOMAIN_ID).asText());
- Set<IpAddress> ipAddresses = Sets.newHashSet();
- jsonNode.path(DOMAIN_IPS).forEach(ipAddress -> ipAddresses.add(
- IpAddress.valueOf(ipAddress.asText())
- ));
+ IpAddress publicIp = IpAddress.valueOf(jsonNode.path(PUBLIC_IP).asText());
int port = jsonNode.path(PORT).asInt();
String username = jsonNode.path(USERNAME).asText();
@@ -89,7 +84,7 @@
String topic = jsonNode.path(TOPIC).asText();
- endPoints.add(new EndPoint(domainId, ipAddresses, port,
+ endPoints.add(new DomainEndPoint(domainId, publicIp, port,
username, password, topic));
});
diff --git a/global/http-channel/src/main/java/org/opencord/ce/global/channel/client/DomainMasterIpDiscoveryTask.java b/global/http-channel/src/main/java/org/opencord/ce/global/channel/client/DomainMasterIpDiscoveryTask.java
deleted file mode 100644
index 8638639..0000000
--- a/global/http-channel/src/main/java/org/opencord/ce/global/channel/client/DomainMasterIpDiscoveryTask.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.opencord.ce.global.channel.client;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.jboss.netty.util.Timeout;
-import org.jboss.netty.util.TimerTask;
-import org.onlab.packet.IpAddress;
-import org.onlab.util.Timer;
-import org.opencord.ce.api.services.channel.EndPoint;
-import org.slf4j.Logger;
-
-import javax.ws.rs.client.Client;
-import javax.ws.rs.client.WebTarget;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import java.io.IOException;
-
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.opencord.ce.api.services.channel.Symbols.BASE_URL;
-import static org.opencord.ce.api.services.channel.Symbols.COLON;
-import static org.opencord.ce.api.services.channel.Symbols.DOUBLESLASH;
-import static org.opencord.ce.api.services.channel.Symbols.HTTP;
-import static org.opencord.ce.api.services.channel.Symbols.MASTER;
-import static org.opencord.ce.api.services.channel.Symbols.MASTER_IP;
-import static org.opencord.ce.api.services.channel.Symbols.OK;
-import static org.opencord.ce.api.services.channel.Symbols.RESULT;
-import static org.slf4j.LoggerFactory.getLogger;
-
-/**
- * Executed periodically to find the leader node of a domain.
- * Stops when the ip is found
- */
-public class DomainMasterIpDiscoveryTask implements TimerTask {
-
- private final Logger log = getLogger(getClass());
-
- private Timeout timeout;
- private volatile boolean isStopped;
- private EndPoint endPoint;
- private Client client;
-
- ObjectMapper mapper;
-
- public DomainMasterIpDiscoveryTask(EndPoint endPoint, Client client,
- ObjectMapper mapper) {
- this.endPoint = endPoint;
- this.client = client;
- this.mapper = mapper;
-
- isStopped = true;
- start();
-
- }
-
- public synchronized void stop() {
- if (!isStopped) {
- isStopped = true;
- timeout.cancel();
- } else {
- log.warn("IpDiscovery stopped multiple times?");
- }
- }
-
- public synchronized void start() {
- if (isStopped) {
- isStopped = false;
- timeout = Timer.getTimer().newTimeout(this, 0, SECONDS);
- } else {
- log.warn("IpDiscovery started multiple times?");
- }
- }
-
- public synchronized boolean isStopped() {
- return isStopped || timeout.isCancelled();
- }
-
- @Override
- public void run(Timeout t) {
- if (isStopped()) {
- return;
- }
- for (IpAddress ipAddress : endPoint.ipAddresses()) {
- String url = HTTP + COLON + DOUBLESLASH + ipAddress.toString() + COLON +
- endPoint.port() + BASE_URL + MASTER;
- log.info("masterIp url: " + url);
- WebTarget wt = client.target(url);
- Response response = wt.request(MediaType.APPLICATION_JSON)
- .get();
- if (response.getStatus() != Response.Status.OK.getStatusCode()) {
- continue;
- }
- String stringBody = response.readEntity(String.class);
- log.info("getLocalMasterIp() response: " + stringBody);
- try {
- ObjectNode responseBody = (ObjectNode) mapper.readTree(stringBody);
- if (responseBody.path(RESULT).asText().equals(OK)) {
- IpAddress masterIpAdress = IpAddress.valueOf(responseBody.path(MASTER_IP).asText());
- HttpClientInstance.INSTANCE.setMasterIp(endPoint, masterIpAdress);
- this.stop();
- return;
- }
- } catch (IOException ex) {
- log.info("getLocalMasterIp() IOException, try next endpoint ip");
- }
- }
-
- if (!isStopped()) {
- timeout = Timer.getTimer().newTimeout(this, 3, SECONDS);
- }
- }
-}
-
diff --git a/global/http-channel/src/main/java/org/opencord/ce/global/channel/client/HttpClientComponent.java b/global/http-channel/src/main/java/org/opencord/ce/global/channel/client/HttpClientComponent.java
new file mode 100644
index 0000000..5bfb5d0
--- /dev/null
+++ b/global/http-channel/src/main/java/org/opencord/ce/global/channel/client/HttpClientComponent.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.ce.global.channel.client;
+
+
+import com.google.common.collect.Sets;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.net.config.ConfigFactory;
+import org.onosproject.net.config.NetworkConfigEvent;
+import org.onosproject.net.config.NetworkConfigListener;
+import org.onosproject.net.config.NetworkConfigRegistry;
+import org.onosproject.net.config.NetworkConfigService;
+import org.onosproject.net.config.basics.SubjectFactories;
+import org.onosproject.net.domain.DomainService;
+import org.opencord.ce.api.services.channel.ControlChannelListenerService;
+import org.opencord.ce.api.models.DomainEndPoint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+
+/**
+ * Southbound component that adds a listener to the
+ * {@link ControlChannelListenerService}.
+ *
+ * Accomplishes network tasks through HTTP requests,
+ * acting as a client that send {@link org.opencord.ce.api.models.CarrierEthernetForwardingConstruct}
+ * requests to the underlying domains
+ */
+@Component(immediate = true)
+public class HttpClientComponent {
+ private static final String APP_NAME = "org.opencord.ce.global.channel.http";
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private ApplicationId appId;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected LeadershipService leadershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected NetworkConfigRegistry configRegistry;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected NetworkConfigService configService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ControlChannelListenerService channelListenerService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DomainService domainService;
+
+ private static boolean isRunningForLeadership = false;
+
+ private final Set<DomainEndPoint> endPoints = Sets.newHashSet();
+ private final List<String> topics = new ArrayList<>();
+
+ private final ExecutorService eventExecutor =
+ Executors.newSingleThreadExecutor();
+
+ private final ConfigFactory<ApplicationId, ConnectionConfig> configFactory =
+ new ConfigFactory<ApplicationId, ConnectionConfig>(SubjectFactories.APP_SUBJECT_FACTORY,
+ ConnectionConfig.class, "endPoints") {
+ @Override
+ public ConnectionConfig createConfig() {
+ return new ConnectionConfig();
+ }
+ };
+
+ private final NetworkConfigListener configListener = new InternalConfigListener();
+
+ @Activate
+ protected void activate() {
+ log.info("Started");
+
+ appId = coreService.registerApplication(APP_NAME);
+ configRegistry.registerConfigFactory(configFactory);
+ configService.addListener(configListener);
+ channelListenerService.addListener(HttpClientInstance.INSTANCE);
+ }
+
+ @Deactivate
+ protected void deactivate() {
+ log.info("Stopped");
+ configService.removeListener(configListener);
+ configRegistry.unregisterConfigFactory(configFactory);
+ channelListenerService.removeListener(HttpClientInstance.INSTANCE);
+ if (isRunningForLeadership) {
+ topics.forEach(topic -> leadershipService.withdraw(topic));
+ }
+ }
+
+ private void runForLeadership(List<String> topics) {
+ topics.forEach(topic ->
+ leadershipService.runForLeadership(topic));
+ isRunningForLeadership = true;
+ }
+
+
+ private void readConfig() {
+ ConnectionConfig config = configRegistry.getConfig(appId, ConnectionConfig.class);
+ log.debug("Domains connections config received");
+
+ endPoints.addAll(config.endPoints());
+ topics.addAll(config.topics());
+ runForLeadership(topics);
+ HttpClientInstance.INSTANCE.configure(clusterService, leadershipService, domainService, endPoints);
+ }
+
+ private class InternalConfigListener implements NetworkConfigListener {
+
+ @Override
+ public void event(NetworkConfigEvent event) {
+ if (!event.configClass().equals(ConnectionConfig.class)) {
+ return;
+ }
+ switch (event.type()) {
+ case CONFIG_ADDED:
+ log.info("Network configuration added");
+ eventExecutor.execute(HttpClientComponent.this::readConfig);
+ break;
+ case CONFIG_UPDATED:
+ log.info("Network configuration updated");
+ eventExecutor.execute(HttpClientComponent.this::readConfig);
+ break;
+ default:
+ break;
+ }
+ }
+ }
+}
diff --git a/global/http-channel/src/main/java/org/opencord/ce/global/channel/client/HttpClientInstance.java b/global/http-channel/src/main/java/org/opencord/ce/global/channel/client/HttpClientInstance.java
index cf16ab5..eb820fb 100644
--- a/global/http-channel/src/main/java/org/opencord/ce/global/channel/client/HttpClientInstance.java
+++ b/global/http-channel/src/main/java/org/opencord/ce/global/channel/client/HttpClientInstance.java
@@ -23,17 +23,15 @@
import org.onosproject.codec.JsonCodec;
import org.onosproject.net.domain.DomainId;
import org.onosproject.net.domain.DomainService;
-import org.onlab.packet.IpAddress;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.rest.AbstractWebResource;
-import org.onosproject.store.service.ConsistentMap;
import org.opencord.ce.api.models.CarrierEthernetForwardingConstruct;
import org.opencord.ce.api.models.CarrierEthernetNetworkInterface;
import org.opencord.ce.api.models.CarrierEthernetUni;
+import org.opencord.ce.api.models.DomainEndPoint;
import org.opencord.ce.api.models.EvcConnId;
import org.opencord.ce.api.services.MetroNetworkVirtualNodeService;
-import org.opencord.ce.api.services.channel.EndPoint;
import org.opencord.ce.api.services.channel.RequestCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,11 +74,7 @@
private static final int STATUS_OK = Response.Status.OK.getStatusCode();
private static final int STATUS_REQ_UNPROCESSABLE = Response.Status.NOT_ACCEPTABLE.getStatusCode();
- private final Map<DomainId, Pair<Client, EndPoint>> domainsEndPointsMap = Maps.newConcurrentMap();
-
- private ConsistentMap<EndPoint, IpAddress> endPointMasterIpMap;
-
- protected final Map<EndPoint, DomainMasterIpDiscoveryTask> ipDiscoveryTaskMap = Maps.newConcurrentMap();
+ private final Map<DomainId, Pair<Client, DomainEndPoint>> domainsEndPointsMap = Maps.newConcurrentMap();
private final AbstractWebResource codecContext = new AbstractWebResource();
@@ -95,57 +89,33 @@
// newFixedThreadPool(5, groupedThreads("opencord/ecord-http", "event-handler"));
public void configure(ClusterService clusterService, LeadershipService leadershipService,
- DomainService domainService, ConsistentMap<EndPoint,
- IpAddress> endPointMasterIpMap, ConnectionConfig connConfig) {
+ DomainService domainService, Set<DomainEndPoint> endPoints) {
if (!configured) {
this.clusterService = clusterService;
this.leadershipService = leadershipService;
this.domainService = domainService;
- this.endPointMasterIpMap = endPointMasterIpMap;
configured = true;
}
- connConfig.endPoints().forEach(siteConfig -> {
+ endPoints.forEach(siteConfig -> {
DomainId domainId = siteConfig.domainId();
synchronized (this) {
domainsEndPointsMap.putIfAbsent(domainId,
Pair.of(createClient(), siteConfig));
notify();
}
- String topic = siteConfig.topic();
- // TODO: add leadership listeners to react to changes
- if (isLeader(topic)) {
- log.info("I am the leader for domain: {} ", domainId);
- probeMasterIp(siteConfig, domainsEndPointsMap.get(domainId).getLeft());
- } else {
- log.info("I am NOT the leader for domain: {}", domainId);
-
- }
});
}
- private void probeMasterIp(EndPoint endPoint, Client client) {
- ipDiscoveryTaskMap.putIfAbsent(endPoint,
- new DomainMasterIpDiscoveryTask(endPoint, client, codecContext.mapper()));
- }
-
- protected void setMasterIp(EndPoint endPoint, IpAddress ipAddress) {
- synchronized (this) {
- endPointMasterIpMap.put(endPoint, ipAddress);
- notify();
- }
- }
-
- public IpAddress getRemoteMasterIp(EndPoint endPoint) {
- synchronized (this) {
- return endPointMasterIpMap.get(endPoint).value();
- }
- }
-
public Client getConnectionInfo(DomainId domainId) {
return domainsEndPointsMap.get(domainId).getLeft();
}
+ private boolean isLeader(String topic) {
+ return leadershipService.getLeader(topic).id()
+ .equals(clusterService.getLocalNode().id().id());
+ }
+
private boolean checkReply(Response response) {
if (response != null) {
return checkStatusCode(response.getStatus());
@@ -164,11 +134,6 @@
}
}
- private boolean isLeader(String topic) {
- return leadershipService.getLeader(topic).id()
- .equals(clusterService.getLocalNode().id().id());
- }
-
private String getTopic(DomainId domainId) {
return domainsEndPointsMap.get(domainId).getRight().topic();
}
@@ -177,10 +142,6 @@
return ClientBuilder.newClient();
}
- public void stopNetworkTasks() {
- ipDiscoveryTaskMap.forEach((ep, task) -> task.stop());
- }
-
@Override
public void setNodeForwarding(CarrierEthernetForwardingConstruct fc, CarrierEthernetNetworkInterface srcNi,
Set<CarrierEthernetNetworkInterface> dstNiSet) {
@@ -201,35 +162,6 @@
dstNiSet.forEach(dstNi ->
dstNiJsonArrayNode.add(niCodec.encode(dstNi, codecContext)));
body.set(DST_NI_LIST, dstNiJsonArrayNode);
-/*
- String fcId = fc.id().id();
- VlanId fcTag = fc.vlanId();
- ConnectPoint ingressCp = srcNi.cp();
- CarrierEthernetNetworkInterface.Type srcType = srcNi.type();
- VlanId sTag = srcNi.sVlanId();
- VlanId ceTag = srcNi.ceVlanId();
- CarrierEthernetForwardingConstruct.Type fcType = fc.type();
- ArrayList<Pair<ConnectPoint, CarrierEthernetNetworkInterface.Type>> egressList =
- new ArrayList<>();
- dstNiSet.forEach(dstNi -> egressList.add(Pair.of(dstNi.cp(), dstNi.type())));
-
- ObjectNode jsonBody = codecContext.mapper().createObjectNode()
- .put(FC_ID, fcId)
- .put(FC_TAG, fcTag.toShort())
- .put(FC_TYPE, fcType.toString())
- .put(INGRESS_NI_TYPE, srcType.toString());
- JsonCodec<ConnectPoint> cpCodec = codecContext.codec(ConnectPoint.class);
- jsonBody.set(FC_INGRESS_CP, cpCodec.encode(ingressCp, codecContext));
- jsonBody.put(INGRESS_FC_TAG, sTag.toShort())
- .put(CUSTOMER_TAG, ceTag.toShort());
- ArrayNode egressListNode = codecContext.newArray(jsonBody, FC_EGRESS_LST);
- dstNiSet.forEach(dstNi -> {
- ObjectNode item = codecContext.mapper().createObjectNode()
- .put(FC_EGRESS_TYPE, dstNi.type().toString());
- item.set(FC_EGRESS_CP, cpCodec.encode(dstNi.cp(), codecContext));
- egressListNode.add(item);
- });
- */
String resource = "/ForwardingConstruct";
networkExecutor.execute(new NetworkTask(domainId, POST, resource, body.toString(),
new RequestCallback() {
@@ -379,25 +311,11 @@
}
}
- EndPoint endPoint = domainsEndPointsMap.get(domainId).getRight();
- IpAddress ipAddress;
- synchronized (this) {
- while (!endPointMasterIpMap.containsKey(endPoint) ||
- endPointMasterIpMap.get(endPoint).value() == null) {
- try {
- log.info("wait() master ip");
- wait();
- } catch (InterruptedException ie) {
- log.info("Interrupted exception: " + ie.getMessage());
- }
- }
- ipAddress = endPointMasterIpMap.get(endPoint).value();
- }
-
+ DomainEndPoint endPoint = domainsEndPointsMap.get(domainId).getRight();
Client client = domainsEndPointsMap.get(domainId).getLeft();
- String url = HTTP + COLON + DOUBLESLASH + ipAddress.toString() + COLON +
+ String url = HTTP + COLON + DOUBLESLASH + endPoint.publicIp().toString() + COLON +
endPoint.port() + BASE_URL + "/carrierethernet" + resource;
log.info("DEBUG {}: Sending data via http: {}", url, body);
diff --git a/global/http-channel/src/main/java/org/opencord/ce/global/channel/server/DomainMasterIpResource.java b/global/http-channel/src/main/java/org/opencord/ce/global/channel/server/DomainMasterIpResource.java
deleted file mode 100644
index 6ba8469..0000000
--- a/global/http-channel/src/main/java/org/opencord/ce/global/channel/server/DomainMasterIpResource.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.opencord.ce.global.channel.server;
-
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.onlab.packet.IpAddress;
-import org.onosproject.rest.AbstractWebResource;
-import org.opencord.ce.api.services.channel.ConnectionService;
-import org.slf4j.Logger;
-
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-
-import static org.opencord.ce.api.services.channel.Symbols.MASTER_API_NO_IP_BODY;
-import static org.opencord.ce.api.services.channel.Symbols.MASTER_IP;
-import static org.opencord.ce.api.services.channel.Symbols.OK;
-import static org.opencord.ce.api.services.channel.Symbols.RESULT;
-import static org.slf4j.LoggerFactory.getLogger;
-
-/**
- * Gets the ip of the instance leader of the communication with the specified
- * domainId.
- */
-@Path("master/{domainId}")
-public class DomainMasterIpResource extends AbstractWebResource {
- private final Logger log = getLogger(getClass());
-
- @GET
- @Produces(MediaType.APPLICATION_JSON)
- public Response getMasterIp(@PathParam("domainId") String id) {
- log.info("Domain {} asks who is the master for him", id);
-
- IpAddress ip = get(ConnectionService.class).getLocalMasterIp(id);
- if (ip != null) {
- ObjectNode body = mapper().createObjectNode();
- body.put(RESULT, OK);
- body.put(MASTER_IP, ip.toString());
-
- return ok(body.toString()).build();
- } else {
- return ok(MASTER_API_NO_IP_BODY).build();
- }
- }
-}
diff --git a/global/http-channel/src/main/java/org/opencord/ce/global/channel/server/EcordGlobalRestApp.java b/global/http-channel/src/main/java/org/opencord/ce/global/channel/server/EcordGlobalRestApp.java
index bd0691f..fd05d86 100644
--- a/global/http-channel/src/main/java/org/opencord/ce/global/channel/server/EcordGlobalRestApp.java
+++ b/global/http-channel/src/main/java/org/opencord/ce/global/channel/server/EcordGlobalRestApp.java
@@ -27,6 +27,6 @@
@Override
public Set<Class<?>> getClasses() {
- return getClasses(DeviceResource.class, DomainMasterIpResource.class);
+ return getClasses(DeviceResource.class);
}
}
diff --git a/local/bigswitch/src/main/java/org/opencord/ce/local/bigswitch/BigSwitchManager.java b/local/bigswitch/src/main/java/org/opencord/ce/local/bigswitch/BigSwitchManager.java
index 6297ab7..ebe2179 100644
--- a/local/bigswitch/src/main/java/org/opencord/ce/local/bigswitch/BigSwitchManager.java
+++ b/local/bigswitch/src/main/java/org/opencord/ce/local/bigswitch/BigSwitchManager.java
@@ -27,6 +27,7 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.event.AbstractListenerManager;
@@ -119,7 +120,10 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected NetworkConfigService networkConfigService;
- @Property(name = DOMAIN_ID, value = DEFAULT_DOMAIN_ID, label = "Domain ID where this ONOS is running")
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ComponentConfigService componentConfigService;
+
+ @Property(name = "domainId", value = DEFAULT_DOMAIN_ID, label = "Domain ID where this ONOS is running")
private String siteId = DEFAULT_DOMAIN_ID;
private static final String PROP_ENABLED = "enabled";
@@ -165,6 +169,7 @@
@Activate
public void activate() {
appId = coreService.registerApplication(APP_NAME);
+ componentConfigService.registerProperties(getClass());
configRegistry.registerConfigFactory(configFactory);
networkConfigService.addListener(configListener);
//deviceService = opticalView(deviceService);
@@ -193,6 +198,7 @@
configRegistry.unregisterConfigFactory(configFactory);
//edgePortService.removeListener(edgeListener);
deviceService.removeListener(deviceListener);
+ componentConfigService.unregisterProperties(getClass(), false);
log.info("Stopped");
}
@@ -226,6 +232,7 @@
// TODO: manage new site ID
siteId = newSiteId;
+ log.info("siteId {}", siteId);
}
@Override
@@ -258,6 +265,7 @@
@Override
public DomainId siteId() {
+ log.info("siteId {}", siteId);
return DomainId.domainId(siteId);
}
diff --git a/local/config-samples/co1-config.json b/local/config-samples/co1-config.json
index 1a1b169..222832d 100644
--- a/local/config-samples/co1-config.json
+++ b/local/config-samples/co1-config.json
@@ -16,9 +16,7 @@
},
"org.opencord.ce.local.channel.http" : {
"global" : {
- "clusterIps" : [
- "10.128.14.1"
- ],
+ "publicIp" : "10.128.14.1",
"port" : "8181",
"username" : "sdn",
"password" : "rocks",
diff --git a/local/config-samples/co1-withEE-config.json b/local/config-samples/co1-withEE-config.json
index 2109145..6c1ef4c 100644
--- a/local/config-samples/co1-withEE-config.json
+++ b/local/config-samples/co1-withEE-config.json
@@ -28,9 +28,7 @@
},
"org.opencord.ce.local.channel.http" : {
"global" : {
- "clusterIps" : [
- "10.128.14.1"
- ],
+ "publicIp" : "10.128.14.1",
"port" : "8181",
"username" : "sdn",
"password" : "rocks",
diff --git a/local/config-samples/co2-config.json b/local/config-samples/co2-config.json
index 4567139..2b6f525 100644
--- a/local/config-samples/co2-config.json
+++ b/local/config-samples/co2-config.json
@@ -16,9 +16,7 @@
},
"org.opencord.ce.local.channel.http" : {
"global" : {
- "clusterIps" : [
- "10.128.14.1"
- ],
+ "publicIp" : "10.128.14.1",
"port" : "8181",
"username" : "sdn",
"password" : "rocks",
diff --git a/local/config-samples/co2-withEE-config.json b/local/config-samples/co2-withEE-config.json
index 8cc3fb9..1140d7c 100644
--- a/local/config-samples/co2-withEE-config.json
+++ b/local/config-samples/co2-withEE-config.json
@@ -28,9 +28,7 @@
},
"org.opencord.ce.local.channel.http" : {
"global" : {
- "clusterIps" : [
- "10.128.14.1"
- ],
+ "publicIp" : "10.128.14.1",
"port" : "8181",
"username" : "sdn",
"password" : "rocks",
diff --git a/local/config-samples/co3-config.json b/local/config-samples/co3-config.json
deleted file mode 100644
index e2bcb4f..0000000
--- a/local/config-samples/co3-config.json
+++ /dev/null
@@ -1,28 +0,0 @@
-{
- "apps" : {
- "org.opencord.ce.local.bigswitch" : {
- "mefPorts" :
- [
- {
- "mefPortType" : "UNI",
- "connectPoint" : "netconf:10.0.0.30:830/0"
- },
- {
- "mefPortType" : "GENERIC",
- "connectPoint" : "netconf:10.0.0.30:830/1"
- }
- ]
- },
- "org.opencord.ce.local.channel.http" : {
- "global" : {
- "clusterIps" : [
- "10.128.14.1"
- ],
- "port" : "8181",
- "username" : "sdn",
- "password" : "rocks",
- "topic" : "ecord-domains-topic-one"
- }
- }
- }
-}
diff --git a/local/http-channel/src/main/java/org/opencord/ce/local/channel/client/ConnectionConfig.java b/local/http-channel/src/main/java/org/opencord/ce/local/channel/client/ConnectionConfig.java
index a98365b..851c876 100644
--- a/local/http-channel/src/main/java/org/opencord/ce/local/channel/client/ConnectionConfig.java
+++ b/local/http-channel/src/main/java/org/opencord/ce/local/channel/client/ConnectionConfig.java
@@ -21,7 +21,7 @@
import org.onosproject.core.ApplicationId;
import org.onosproject.net.config.Config;
import org.onosproject.net.domain.DomainId;
-import org.opencord.ce.api.services.channel.EndPoint;
+import org.opencord.ce.api.models.DomainEndPoint;
import java.util.Set;
@@ -34,16 +34,14 @@
public class ConnectionConfig extends Config<ApplicationId> {
private static final String PORT = "port";
- private static final String DOMAIN_IPS = "clusterIps";
+ private static final String PUBLIC_IP = "publicIp";
private static final String USERNAME = "username";
private static final String PASSWD = "password";
private static final String TOPIC = "topic";
- public EndPoint global() {
+ public DomainEndPoint global() {
Set<IpAddress> ipAddresses = Sets.newHashSet();
- object.path(DOMAIN_IPS).forEach(ipAddress -> ipAddresses.add(
- IpAddress.valueOf(ipAddress.asText())
- ));
+ IpAddress publicIp = IpAddress.valueOf(object.path(PUBLIC_IP).asText());
int port = object.path(PORT).asInt();
String username = object.path(USERNAME).asText();
@@ -51,7 +49,7 @@
String topic = object.path(TOPIC).asText();
- return new EndPoint(DomainId.domainId("global"), ipAddresses, port,
+ return new DomainEndPoint(DomainId.domainId("global"), publicIp, port,
username, password, topic);
}
}
diff --git a/local/http-channel/src/main/java/org/opencord/ce/local/channel/client/DomainMasterIpDiscoveryTask.java b/local/http-channel/src/main/java/org/opencord/ce/local/channel/client/DomainMasterIpDiscoveryTask.java
index e93625a..5a7d9bc 100644
--- a/local/http-channel/src/main/java/org/opencord/ce/local/channel/client/DomainMasterIpDiscoveryTask.java
+++ b/local/http-channel/src/main/java/org/opencord/ce/local/channel/client/DomainMasterIpDiscoveryTask.java
@@ -23,7 +23,7 @@
import org.jboss.netty.util.TimerTask;
import org.onlab.packet.IpAddress;
import org.onlab.util.Timer;
-import org.opencord.ce.api.services.channel.EndPoint;
+import org.opencord.ce.api.models.DomainEndPoint;
import org.slf4j.Logger;
import javax.ws.rs.client.Client;
@@ -53,13 +53,13 @@
private Timeout timeout;
private volatile boolean isStopped;
- private EndPoint endPoint;
+ private DomainEndPoint endPoint;
private Client client;
private String localSiteId;
private ObjectMapper mapper;
- public DomainMasterIpDiscoveryTask(EndPoint endPoint, Client client,
+ public DomainMasterIpDiscoveryTask(DomainEndPoint endPoint, Client client,
ObjectMapper mapper, String localSiteId) {
this.endPoint = endPoint;
this.client = client;
@@ -98,32 +98,26 @@
if (isStopped()) {
return;
}
- for (IpAddress ipAddress : endPoint.ipAddresses()) {
- String url = HTTP + COLON + DOUBLESLASH + ipAddress.toString() + COLON +
+ String url = HTTP + COLON + DOUBLESLASH + endPoint.publicIp() + COLON +
endPoint.port() + BASE_URL + "/global" + MASTER + "/" + localSiteId;
log.info("GET: " + url);
WebTarget wt = client.target(url);
Response response = wt.request(MediaType.APPLICATION_JSON)
.get();
log.info("DEBUG response: " + response.toString());
- if (response.getStatus() != Response.Status.OK.getStatusCode()) {
-
- continue;
- }
- String stringBody = response.readEntity(String.class);
- try {
- ObjectNode responseBody = (ObjectNode) mapper.readTree(stringBody);
- if (responseBody.path(RESULT).asText().equals(OK)) {
- IpAddress masterIpAdress = IpAddress.valueOf(responseBody.path(MASTER_IP).asText());
- HttpClientInstance.INSTANCE.setMasterIp(endPoint, masterIpAdress);
- this.stop();
- return;
+ if (response.getStatus() == Response.Status.OK.getStatusCode()) {
+ String stringBody = response.readEntity(String.class);
+ try {
+ ObjectNode responseBody = (ObjectNode) mapper.readTree(stringBody);
+ if (responseBody.path(RESULT).asText().equals(OK)) {
+ IpAddress masterIpAdress = IpAddress.valueOf(responseBody.path(MASTER_IP).asText());
+ this.stop();
+ return;
+ }
+ } catch (IOException ex) {
+ log.info("getLocalMasterIp() IOException, try next endpoint ip");
}
- } catch (IOException ex) {
- log.info("getLocalMasterIp() IOException, try next endpoint ip");
}
- }
-
if (!isStopped()) {
timeout = Timer.getTimer().newTimeout(this, 3, SECONDS);
}
diff --git a/local/http-channel/src/main/java/org/opencord/ce/local/channel/client/HttpClientComponent.java b/local/http-channel/src/main/java/org/opencord/ce/local/channel/client/HttpClientComponent.java
index 99f0d3c..17cfa4e 100644
--- a/local/http-channel/src/main/java/org/opencord/ce/local/channel/client/HttpClientComponent.java
+++ b/local/http-channel/src/main/java/org/opencord/ce/local/channel/client/HttpClientComponent.java
@@ -32,7 +32,7 @@
import org.onosproject.net.config.NetworkConfigService;
import org.onosproject.net.config.basics.SubjectFactories;
import org.onosproject.net.device.PortDescription;
-import org.opencord.ce.api.services.channel.EndPoint;
+import org.opencord.ce.api.models.DomainEndPoint;
import org.opencord.ce.local.bigswitch.BigSwitchEvent;
import org.opencord.ce.local.bigswitch.BigSwitchListener;
import org.opencord.ce.local.bigswitch.BigSwitchService;
@@ -75,7 +75,7 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected BigSwitchService bigSwitchService;
- private EndPoint globalOnos;
+ private static DomainEndPoint globalOnos;
private final ExecutorService eventExecutor =
Executors.newSingleThreadExecutor();
@@ -107,12 +107,13 @@
@Deactivate
public void deactivate() {
log.info("Stopped");
-
leadershipService.withdraw(TOPIC_ONE);
bigSwitchService.removeListener(bigSwitchListener);
configService.removeListener(configListener);
configRegistry.unregisterConfigFactory(configFactory);
- HttpClientInstance.INSTANCE.stopNetworkTasks();
+ if (globalOnos != null) {
+ leadershipService.withdraw(globalOnos.topic());
+ }
}
private void readConfig() {
@@ -122,6 +123,7 @@
log.error("Configuration failure");
return;
}
+ leadershipService.runForLeadership(globalOnos.topic());
HttpClientInstance.INSTANCE.setGlobalOnos(globalOnos);
}
@@ -133,14 +135,14 @@
switch (event.type()) {
case DEVICE_CREATED:
log.info("DEBUG: DEV_CREATED event");
- HttpClientInstance.INSTANCE.notifyBigSwitch();
+ HttpClientInstance.INSTANCE.notifyBigSwitch(bigSwitchService.siteId());
case PORT_ADDED:
case PORT_UPDATED:
case PORT_REMOVED:
// the subject is port last updated / added port
// but we are not interested in it now
List<PortDescription> ports = event.allPorts();
- HttpClientInstance.INSTANCE.notifyBigSwitchPorts(ports);
+ HttpClientInstance.INSTANCE.notifyBigSwitchPorts(bigSwitchService.siteId(), ports);
break;
case DEVICE_REMOVED:
// TODO
diff --git a/local/http-channel/src/main/java/org/opencord/ce/local/channel/client/HttpClientInstance.java b/local/http-channel/src/main/java/org/opencord/ce/local/channel/client/HttpClientInstance.java
index 0d831d9..d2e1079 100644
--- a/local/http-channel/src/main/java/org/opencord/ce/local/channel/client/HttpClientInstance.java
+++ b/local/http-channel/src/main/java/org/opencord/ce/local/channel/client/HttpClientInstance.java
@@ -19,7 +19,6 @@
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.onlab.packet.ChassisId;
-import org.onlab.packet.IpAddress;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.codec.JsonCodec;
@@ -29,9 +28,10 @@
import org.onosproject.net.DeviceId;
import org.onosproject.net.Port;
import org.onosproject.net.device.PortDescription;
+import org.onosproject.net.domain.DomainId;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.rest.AbstractWebResource;
-import org.opencord.ce.api.services.channel.EndPoint;
+import org.opencord.ce.api.models.DomainEndPoint;
import org.opencord.ce.api.services.channel.RequestCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,9 +72,7 @@
private ClusterService clusterService;
private LeadershipService leadershipService;
- private EndPoint globalOnos;
- private IpAddress currentRemoteMasterIp;
- private DomainMasterIpDiscoveryTask globalMasterIpDiscoveryTask;
+ private DomainEndPoint globalOnos;
private final ExecutorService networkExecutor =
newSingleThreadExecutor();
@@ -84,25 +82,10 @@
this.leadershipService = leadershipService;
}
- protected void setGlobalOnos(EndPoint globalOnos) {
+ protected void setGlobalOnos(DomainEndPoint globalOnos) {
synchronized (MONITOR) {
this.globalOnos = globalOnos;
}
- // TODO: add leadership listeners to react to changes
- if (isLeader(globalOnos.topic())) {
- globalMasterIpDiscoveryTask =
- new DomainMasterIpDiscoveryTask(globalOnos, client, codecContext.mapper(),
- clusterService.getLocalNode().id().id());
- } else {
- log.info("I am NOT the leader for the communication with the global");
- }
- }
-
- protected void setMasterIp(EndPoint endPoint, IpAddress ipAddress) {
- synchronized (MONITOR) {
- currentRemoteMasterIp = ipAddress;
- MONITOR.notify();
- }
}
private boolean isLeader(String topic) {
@@ -128,13 +111,8 @@
}
}
- public void stopNetworkTasks() {
- globalMasterIpDiscoveryTask.stop();
- }
-
- public void notifyBigSwitch() {
- String siteId = clusterService.getLocalNode().id().id();
- String deviceId = "domain:" + siteId;
+ public void notifyBigSwitch(DomainId siteId) {
+ String deviceId = "domain:" + siteId.id();
String resource = "/" + siteId + "/" + deviceId;
ObjectNode body = codecContext.mapper().createObjectNode();
// body is empty for now
@@ -151,11 +129,10 @@
}));
}
- public void notifyBigSwitchPorts(List<PortDescription> ports) {
- String siteId = clusterService.getLocalNode().id().id();
+ public void notifyBigSwitchPorts(DomainId siteId, List<PortDescription> ports) {
JsonCodec<Port> portCodec = codecContext.codec(Port.class);
ArrayNode body = codecContext.mapper().createArrayNode();
- DeviceId deviceId = DeviceId.deviceId("domain:" + siteId);
+ DeviceId deviceId = DeviceId.deviceId("domain:" + siteId.id());
ports.forEach(portDescription ->
body.add(portCodec.encode(new DefaultPort(new DummyDevice(deviceId), portDescription.portNumber(),
portDescription.isEnabled(), portDescription.type(),
@@ -191,16 +168,19 @@
@Override
public void run() {
synchronized (MONITOR) {
- while (globalOnos == null || currentRemoteMasterIp == null) {
+ while (globalOnos == null) {
try {
- log.info("wait() global-ONOS endpoint");
+ log.info("wait() global ONOS endpoint");
MONITOR.wait();
} catch (InterruptedException ie) {
log.info("Interrupted exception: " + ie.getMessage());
}
}
}
- String url = HTTP + COLON + DOUBLESLASH + currentRemoteMasterIp + COLON +
+ if (!isLeader(globalOnos.topic())) {
+ return;
+ }
+ String url = HTTP + COLON + DOUBLESLASH + globalOnos.publicIp() + COLON +
globalOnos.port() + BASE_URL + "/global/topology" + resource;
log.info("Sending data via http: url: {}\n body: {}", url, body);
diff --git a/local/http-channel/src/main/java/org/opencord/ce/local/channel/server/DomainMasterIpResource.java b/local/http-channel/src/main/java/org/opencord/ce/local/channel/server/DomainMasterIpResource.java
deleted file mode 100644
index 4f906c6..0000000
--- a/local/http-channel/src/main/java/org/opencord/ce/local/channel/server/DomainMasterIpResource.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.opencord.ce.local.channel.server;
-
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.onlab.packet.IpAddress;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.rest.AbstractWebResource;
-import org.slf4j.Logger;
-
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-
-import static org.opencord.ce.api.services.channel.Symbols.MASTER_API_NO_IP_BODY;
-import static org.opencord.ce.api.services.channel.Symbols.MASTER_IP;
-import static org.opencord.ce.api.services.channel.Symbols.OK;
-import static org.opencord.ce.api.services.channel.Symbols.RESULT;
-import static org.slf4j.LoggerFactory.getLogger;
-
-/**
- * Gets the ip of the instance leader of the communication with the specified
- * domainId.
- */
-@Path("master")
-public class DomainMasterIpResource extends AbstractWebResource {
- private final Logger log = getLogger(getClass());
-
- @GET
- @Produces(MediaType.APPLICATION_JSON)
- public Response getMasterIp() {
- log.info("Global domain asks who is the master for him");
-
- //IpAddress ip = get(ConnectionService.class).getLocalMasterIp(id);
-
- // testing
- IpAddress ip = get(ClusterService.class).getLocalNode().ip();
- if (ip != null) {
- ObjectNode body = mapper().createObjectNode();
- body.put(RESULT, OK);
- body.put(MASTER_IP, ip.toString());
-
- return ok(body.toString()).build();
- } else {
- return ok(MASTER_API_NO_IP_BODY).build();
- }
- }
-}
diff --git a/local/http-channel/src/main/java/org/opencord/ce/local/channel/server/EcordLocalRestApp.java b/local/http-channel/src/main/java/org/opencord/ce/local/channel/server/EcordLocalRestApp.java
index 810fa2d..52fd88c 100644
--- a/local/http-channel/src/main/java/org/opencord/ce/local/channel/server/EcordLocalRestApp.java
+++ b/local/http-channel/src/main/java/org/opencord/ce/local/channel/server/EcordLocalRestApp.java
@@ -28,6 +28,6 @@
@Override
public Set<Class<?>> getClasses() {
- return getClasses(MetroNetworkResource.class, DomainMasterIpResource.class);
+ return getClasses(MetroNetworkResource.class);
}
}