fixed topic search
diff --git a/apps/fpcagent/fpcagent.json b/apps/fpcagent/fpcagent.json
index 5b820df..a54290a 100644
--- a/apps/fpcagent/fpcagent.json
+++ b/apps/fpcagent/fpcagent.json
@@ -2,23 +2,9 @@
"apps": {
"org.onosproject.fpcagent": {
"fpcagent": {
- "monitor-threads": 4,
- "scheduled-monitors-poolsize": 4,
- "dpn-listener-uri": "tcp://127.0.0.1:5569",
- "dpn-listener-id": 1,
- "dpn-client-uri": "tcp://127.0.0.1:5559",
+ "dpn-subscriber-uri": "tcp://*:5560",
+ "dpn-publisher-uri": "tcp://*:5559",
"dpn-client-threads": 5,
- "metricsupdate-ms": 10000,
- "mobilityupdate-ms": 30000,
- "activation-threads": 5,
- "target-read-limit": 10,
- "http-notifier-clients": 3,
- "zmq-nbi-server-poolsize": 1,
- "zmq-nbi-server-uri": "tcp://127.0.0.1:5570",
- "zmq-nbi-inproc-uri": "inproc://backend",
- "zmq-nbi-handler-poolsize": 10,
- "http-nio2-nb-poolsize": 80,
- "http-nio2-nb-port": 9292,
"node-id": "node0",
"network-id": "network1",
"zmq-broadcast-all": "1",
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java
index 1896cc4..9b4b915 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java
@@ -36,7 +36,6 @@
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.configure.DefaultConfigureOutput;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.configuredpn.DefaultConfigureDpnInput;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.configuredpn.DefaultConfigureDpnOutput;
-import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.opheader.OpTypeEnum;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.opinput.opbody.CreateOrUpdate;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.opinput.opbody.DeleteOrQuery;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.payload.Contexts;
@@ -132,7 +131,7 @@
fpcConfig.getConfig().ifPresent(
helper -> {
ZMQSBSubscriberManager.createInstance(
- helper.dpnListenerUri(),
+ helper.dpnSubscriberUri(),
helper.zmqBroadcastAll(),
helper.zmqBroadcastControllers(),
helper.zmqBroadcastDpns(),
@@ -141,7 +140,7 @@
);
ZMQSBPublisherManager.createInstance(
- helper.dpnClientUri(),
+ helper.dpnPublisherUri(),
helper.dpnClientThreads()
);
@@ -197,23 +196,18 @@
DefaultConfigureInput input = (DefaultConfigureInput) modelObject;
switch (input.opType()) {
case CREATE:
+ configureOutput = tenantService.configureCreate(
+ (CreateOrUpdate) input.opBody(),
+ input.clientId(),
+ input.opId()
+ );
+ break;
case UPDATE:
- if (input.opBody() instanceof CreateOrUpdate) {
- CreateOrUpdate createOrUpdate = (CreateOrUpdate) input.opBody();
- if (input.opType().equals(OpTypeEnum.CREATE)) {
- configureOutput = tenantService.configureCreate(
- createOrUpdate,
- input.clientId(),
- input.opId()
- );
- } else {
- configureOutput = tenantService.configureUpdate(
- createOrUpdate,
- input.clientId(),
- input.opId()
- );
- }
- }
+ configureOutput = tenantService.configureUpdate(
+ (CreateOrUpdate) input.opBody(),
+ input.clientId(),
+ input.opId()
+ );
break;
case QUERY:
break;
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/TenantManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/TenantManager.java
index f197bdb..6acf2f8 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/TenantManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/TenantManager.java
@@ -197,10 +197,10 @@
}
for (Dpns dpn : context.dpns()) {
- if (!getDefaultTenant().map(
- tenant -> tenant.fpcTopology().dpns() != null &&
+ if (getDefaultTenant().map(
+ tenant -> tenant.fpcTopology().dpns() == null ||
tenant.fpcTopology().dpns().stream()
- .anyMatch(dpns -> dpns.dpnId().equals(dpn.dpnId()))
+ .noneMatch(dpns -> dpns.dpnId().equals(dpn.dpnId()))
).orElse(false)) {
throw new IllegalStateException("DPN ID is not registered to the topology.");
}
@@ -222,54 +222,70 @@
short ebi = context.ebi().uint8(),
lbi = context.lbi().uint8();
- Short dpnTopic = DpnApi.getTopicFromNode(dpn.dpnId().toString());
+ Optional<String> key = getDefaultTenant().flatMap(tenant ->
+ tenant.fpcTopology()
+ .dpns()
+ .stream()
+ .filter(dpns -> dpns.dpnId().equals(dpn.dpnId()))
+ .findFirst().map(node -> node.nodeId() + "/" + node.networkId())
+ );
- if (context.ul().mobilityTunnelParameters().mobprofileParameters() instanceof ThreegppTunnel) {
- s1u_sgw_gtpu_teid = ((ThreegppTunnel) context.ul().mobilityTunnelParameters().mobprofileParameters()).tunnelIdentifier();
- } else {
- throw new IllegalArgumentException("mobprofileParameters are not instance of ThreegppTunnel");
- }
- if (context.dl().mobilityTunnelParameters().mobprofileParameters() instanceof ThreegppTunnel) {
- s1u_enb_gtpu_teid = ((ThreegppTunnel) context.dl().mobilityTunnelParameters().mobprofileParameters()).tunnelIdentifier();
- } else {
- throw new IllegalArgumentException("mobprofileParameters are not instance of ThreegppTunnel");
- }
+ if (key.isPresent()) {
+ Short dpnTopic = DpnApi.getTopicFromNode(key.get());
+
+ if (dpnTopic != null) {
+ if (context.ul().mobilityTunnelParameters().mobprofileParameters() instanceof ThreegppTunnel) {
+ s1u_sgw_gtpu_teid = ((ThreegppTunnel) context.ul().mobilityTunnelParameters().mobprofileParameters()).tunnelIdentifier();
+ } else {
+ throw new IllegalArgumentException("mobprofileParameters are not instance of ThreegppTunnel");
+ }
+ if (context.dl().mobilityTunnelParameters().mobprofileParameters() instanceof ThreegppTunnel) {
+ s1u_enb_gtpu_teid = ((ThreegppTunnel) context.dl().mobilityTunnelParameters().mobprofileParameters()).tunnelIdentifier();
+ } else {
+ throw new IllegalArgumentException("mobprofileParameters are not instance of ThreegppTunnel");
+ }
- if (commands.contains("session")) {
- tasks.add(Executors.callable(() -> DpnApi.create_session(
- dpnTopic,
- imsi,
- Ip4Prefix.valueOf(context.delegatingIpPrefixes().get(0).toString()).address(),
- ebi,
- ulLocalAddress,
- s1u_sgw_gtpu_teid,
- cId,
- opId,
- contextId
- )));
+ if (commands.contains("session")) {
+ tasks.add(Executors.callable(() -> DpnApi.create_session(
+ dpnTopic,
+ imsi,
+ Ip4Prefix.valueOf(context.delegatingIpPrefixes().get(0).toString()).address(),
+ ebi,
+ ulLocalAddress,
+ s1u_sgw_gtpu_teid,
+ cId,
+ opId,
+ contextId
+ )));
- if (commands.contains("downlink")) {
- tasks.add(Executors.callable(() -> DpnApi.modify_bearer_dl(
- dpnTopic,
- s1u_sgw_gtpu_teid,
- dlRemoteAddress,
- s1u_enb_gtpu_teid,
- cId,
- opId
- )));
+ if (commands.contains("downlink")) {
+ tasks.add(Executors.callable(() -> DpnApi.modify_bearer_dl(
+ dpnTopic,
+ s1u_sgw_gtpu_teid,
+ dlRemoteAddress,
+ s1u_enb_gtpu_teid,
+ cId,
+ opId
+ )));
+ }
+ } else if (commands.contains("indirect-forward")) {
+ // TODO - Modify API for Indirect Forwarding to/from another SGW
+ } else if (commands.contains("uplink")) {
+ tasks.add(Executors.callable(() -> DpnApi.create_bearer_ul(
+ dpnTopic,
+ imsi,
+ lbi,
+ ebi,
+ ulLocalAddress,
+ s1u_sgw_gtpu_teid
+ )));
+ }
+ } else {
+ throw new IllegalArgumentException("Could not find Topic ID");
}
- } else if (commands.contains("indirect-forward")) {
- // TODO - Modify API for Indirect Forwarding to/from another SGW
- } else if (commands.contains("uplink")) {
- tasks.add(Executors.callable(() -> DpnApi.create_bearer_ul(
- dpnTopic,
- imsi,
- lbi,
- ebi,
- ulLocalAddress,
- s1u_sgw_gtpu_teid
- )));
+ } else {
+ throw new IllegalArgumentException("DPN does not have node and network ID defined.");
}
}
}
@@ -295,6 +311,7 @@
)
);
} catch (Exception e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
DefaultErr defaultErr = new DefaultErr();
configureOutput.resultType(defaultErr);
defaultErr.errorInfo(ExceptionUtils.getFullStackTrace(e));
@@ -318,19 +335,21 @@
defaultCommonSuccess.addToContexts(context);
if (getDefaultTenant().map(
- tenant -> tenant.fpcMobility()
- .contexts()
- .parallelStream()
- .anyMatch(contexts -> contexts.contextId().equals(context.contextId()))
+ tenant -> tenant.fpcMobility().contexts() == null ||
+ tenant.fpcMobility()
+ .contexts()
+ .parallelStream()
+ .noneMatch(contexts -> contexts.contextId().equals(context.contextId()))
).orElse(false)) {
- throw new IllegalStateException("Context already exists.");
+ throw new IllegalStateException("Context doesn't exist.");
}
for (Dpns dpn : context.dpns()) {
- if (!getDefaultTenant().map(
- tenant -> tenant.fpcTopology().dpns()
- .stream()
- .anyMatch(dpns -> dpns.dpnId().equals(dpn.dpnId()))
+ if (getDefaultTenant().map(
+ tenant -> tenant.fpcTopology().dpns() == null ||
+ tenant.fpcTopology().dpns()
+ .stream()
+ .noneMatch(dpns -> dpns.dpnId().equals(dpn.dpnId()))
).orElse(false)) {
throw new IllegalStateException("DPN ID is not registered to the topology.");
}
@@ -349,61 +368,75 @@
BigInteger opId = operationId.uint64();
+ Optional<String> key = getDefaultTenant().flatMap(tenant ->
+ tenant.fpcTopology()
+ .dpns()
+ .stream()
+ .filter(dpns -> dpns.dpnId().equals(dpn.dpnId()))
+ .findFirst().map(node -> node.nodeId() + "/" + node.networkId())
+ );
- Short dpnTopic = DpnApi.getTopicFromNode(dpn.dpnId().toString());
+ if (key.isPresent()) {
+ Short dpnTopic = DpnApi.getTopicFromNode(key.get());
- if (context.ul().mobilityTunnelParameters().mobprofileParameters() instanceof ThreegppTunnel) {
- s1u_sgw_gtpu_teid = ((ThreegppTunnel) context.ul().mobilityTunnelParameters().mobprofileParameters()).tunnelIdentifier();
- } else {
- throw new IllegalArgumentException("mobprofileParameters are not instance of ThreegppTunnel");
- }
- if (context.dl().mobilityTunnelParameters().mobprofileParameters() instanceof ThreegppTunnel) {
- s1u_enb_gtpu_teid = ((ThreegppTunnel) context.dl().mobilityTunnelParameters().mobprofileParameters()).tunnelIdentifier();
- } else {
- throw new IllegalArgumentException("mobprofileParameters are not instance of ThreegppTunnel");
- }
+ if (dpnTopic != null) {
+ if (context.ul().mobilityTunnelParameters().mobprofileParameters() instanceof ThreegppTunnel) {
+ s1u_sgw_gtpu_teid = ((ThreegppTunnel) context.ul().mobilityTunnelParameters().mobprofileParameters()).tunnelIdentifier();
+ } else {
+ throw new IllegalArgumentException("mobprofileParameters are not instance of ThreegppTunnel");
+ }
+ if (context.dl().mobilityTunnelParameters().mobprofileParameters() instanceof ThreegppTunnel) {
+ s1u_enb_gtpu_teid = ((ThreegppTunnel) context.dl().mobilityTunnelParameters().mobprofileParameters()).tunnelIdentifier();
+ } else {
+ throw new IllegalArgumentException("mobprofileParameters are not instance of ThreegppTunnel");
+ }
-
- if (commands.contains("downlink")) {
- if (context.dl().lifetime() >= 0L) {
- tasks.add(Executors.callable(() ->
- DpnApi.modify_bearer_dl(
- dpnTopic,
- dlRemoteAddress,
- s1u_enb_gtpu_teid,
- dlLocalAddress,
- cId,
- opId,
- contextId
- )
- ));
+ if (commands.contains("downlink")) {
+ if (context.dl().lifetime() >= 0L) {
+ tasks.add(Executors.callable(() ->
+ DpnApi.modify_bearer_dl(
+ dpnTopic,
+ dlRemoteAddress,
+ s1u_enb_gtpu_teid,
+ dlLocalAddress,
+ cId,
+ opId,
+ contextId
+ )
+ ));
+ } else {
+ tasks.add(Executors.callable(() ->
+ DpnApi.delete_bearer(
+ dpnTopic,
+ s1u_enb_gtpu_teid
+ )
+ ));
+ }
+ }
+ if (commands.contains("uplink")) {
+ if (context.ul().lifetime() >= 0L) {
+ tasks.add(Executors.callable(() ->
+ DpnApi.modify_bearer_ul(
+ dpnTopic,
+ ulLocalAddress,
+ s1u_enb_gtpu_teid,
+ s1u_sgw_gtpu_teid
+ )
+ ));
+ } else {
+ tasks.add(Executors.callable(() ->
+ DpnApi.delete_bearer(
+ dpnTopic,
+ s1u_sgw_gtpu_teid
+ )
+ ));
+ }
+ }
} else {
- tasks.add(Executors.callable(() ->
- DpnApi.delete_bearer(
- dpnTopic,
- s1u_enb_gtpu_teid
- )
- ));
+ throw new IllegalArgumentException("Could not find Topic ID");
}
- }
- if (commands.contains("uplink")) {
- if (context.ul().lifetime() >= 0L) {
- tasks.add(Executors.callable(() ->
- DpnApi.modify_bearer_ul(
- dpnTopic,
- ulLocalAddress,
- s1u_enb_gtpu_teid,
- s1u_sgw_gtpu_teid
- )
- ));
- } else {
- tasks.add(Executors.callable(() ->
- DpnApi.delete_bearer(
- dpnTopic,
- s1u_sgw_gtpu_teid
- )
- ));
- }
+ } else {
+ throw new IllegalArgumentException("DPN does not have node and network ID defined.");
}
}
}
@@ -429,6 +462,7 @@
)
);
} catch (Exception e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
DefaultErr defaultErr = new DefaultErr();
defaultErr.errorInfo(ExceptionUtils.getFullStackTrace(e));
defaultErr.errorTypeId(ErrorTypeId.of(0));
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/helpers/ConfigHelper.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/helpers/ConfigHelper.java
index bee7cce..5a91d52 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/helpers/ConfigHelper.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/helpers/ConfigHelper.java
@@ -9,9 +9,9 @@
@JsonPropertyOrder({
"monitor-threads",
"scheduled-monitors-poolsize",
- "dpn-listener-uri",
+ "dpn-subscriber-uri",
"dpn-listener-id",
- "dpn-client-uri",
+ "dpn-publisher-uri",
"dpn-client-threads",
"metricsupdate-ms",
"mobilityupdate-ms",
@@ -31,17 +31,16 @@
"zmq-broadcast-all"
})
public class ConfigHelper {
-
@JsonProperty("monitor-threads")
private Integer monitorThreads;
@JsonProperty("scheduled-monitors-poolsize")
private Integer scheduledMonitorsPoolsize;
- @JsonProperty("dpn-listener-uri")
- private String dpnListenerUri;
+ @JsonProperty("dpn-subscriber-uri")
+ private String dpnSubscriberUri;
@JsonProperty("dpn-listener-id")
private Integer dpnListenerId;
- @JsonProperty("dpn-client-uri")
- private String dpnClientUri;
+ @JsonProperty("dpn-publisher-uri")
+ private String dpnPublisherUri;
@JsonProperty("dpn-client-threads")
private Integer dpnClientThreads;
@JsonProperty("metricsupdate-ms")
@@ -85,222 +84,220 @@
public ConfigHelper() {
}
- @JsonProperty("monitor-threads")
+
public Integer monitorThreads() {
return monitorThreads;
}
- @JsonProperty("monitor-threads")
+
public void setMonitorThreads(Integer monitorThreads) {
this.monitorThreads = monitorThreads;
}
- @JsonProperty("scheduled-monitors-poolsize")
+
public Integer scheduledMonitorsPoolsize() {
return scheduledMonitorsPoolsize;
}
- @JsonProperty("scheduled-monitors-poolsize")
+
public void setScheduledMonitorsPoolsize(Integer scheduledMonitorsPoolsize) {
this.scheduledMonitorsPoolsize = scheduledMonitorsPoolsize;
}
- @JsonProperty("dpn-listener-uri")
- public String dpnListenerUri() {
- return dpnListenerUri;
+
+ public String dpnSubscriberUri() {
+ return dpnSubscriberUri;
}
- @JsonProperty("dpn-listener-uri")
- public void setDpnListenerUri(String dpnListenerUri) {
- this.dpnListenerUri = dpnListenerUri;
+
+ public void setDpnSubscriberUri(String dpnSubscriberUri) {
+ this.dpnSubscriberUri = dpnSubscriberUri;
}
- @JsonProperty("dpn-listener-id")
+
public Integer dpnListenerId() {
return dpnListenerId;
}
- @JsonProperty("dpn-listener-id")
+
public void setDpnListenerId(Integer dpnListenerId) {
this.dpnListenerId = dpnListenerId;
}
- @JsonProperty("dpn-client-uri")
- public String dpnClientUri() {
- return dpnClientUri;
+
+ public String dpnPublisherUri() {
+ return dpnPublisherUri;
}
- @JsonProperty("dpn-client-uri")
- public void setDpnClientUri(String dpnClientUri) {
- this.dpnClientUri = dpnClientUri;
+ public void setDpnPublisherUri(String dpnPublisherUri) {
+ this.dpnPublisherUri = dpnPublisherUri;
}
- @JsonProperty("dpn-client-threads")
+
public Integer dpnClientThreads() {
return dpnClientThreads;
}
- @JsonProperty("dpn-client-threads")
+
public void setDpnClientThreads(Integer dpnClientThreads) {
this.dpnClientThreads = dpnClientThreads;
}
- @JsonProperty("metricsupdate-ms")
+
public Integer metricUpdateMs() {
return metricsupdateMs;
}
- @JsonProperty("metricsupdate-ms")
+
public void setMetricsupdateMs(Integer metricsupdateMs) {
this.metricsupdateMs = metricsupdateMs;
}
- @JsonProperty("mobilityupdate-ms")
+
public Integer mobilityUpdateMs() {
return mobilityupdateMs;
}
- @JsonProperty("mobilityupdate-ms")
+
public void setMobilityupdateMs(Integer mobilityupdateMs) {
this.mobilityupdateMs = mobilityupdateMs;
}
- @JsonProperty("activation-threads")
+
public Integer activationThreads() {
return activationThreads;
}
- @JsonProperty("activation-threads")
+
public void setActivationThreads(Integer activationThreads) {
this.activationThreads = activationThreads;
}
- @JsonProperty("target-read-limit")
+
public Integer targetReadLimit() {
return targetReadLimit;
}
- @JsonProperty("target-read-limit")
+
public void setTargetReadLimit(Integer targetReadLimit) {
this.targetReadLimit = targetReadLimit;
}
- @JsonProperty("http-notifier-clients")
+
public Integer httpNotifierClients() {
return httpNotifierClients;
}
- @JsonProperty("http-notifier-clients")
+
public void setHttpNotifierClients(Integer httpNotifierClients) {
this.httpNotifierClients = httpNotifierClients;
}
- @JsonProperty("zmq-nbi-server-poolsize")
+
public Integer zmqServerPoolsize() {
return zmqNbiServerPoolsize;
}
- @JsonProperty("zmq-nbi-server-poolsize")
+
public void setZmqNbiServerPoolsize(Integer zmqNbiServerPoolsize) {
this.zmqNbiServerPoolsize = zmqNbiServerPoolsize;
}
- @JsonProperty("zmq-nbi-server-uri")
+
public String zmqServerUri() {
return zmqNbiServerUri;
}
- @JsonProperty("zmq-nbi-server-uri")
+
public void setZmqNbiServerUri(String zmqNbiServerUri) {
this.zmqNbiServerUri = zmqNbiServerUri;
}
- @JsonProperty("zmq-nbi-inproc-uri")
+
public String zmqInprocUri() {
return zmqNbiInprocUri;
}
- @JsonProperty("zmq-nbi-inproc-uri")
+
public void setZmqNbiInprocUri(String zmqNbiInprocUri) {
this.zmqNbiInprocUri = zmqNbiInprocUri;
}
- @JsonProperty("zmq-nbi-handler-poolsize")
+
public Integer zmqHandlerPoolsize() {
return zmqNbiHandlerPoolsize;
}
- @JsonProperty("zmq-nbi-handler-poolsize")
+
public void setZmqNbiHandlerPoolsize(Integer zmqNbiHandlerPoolsize) {
this.zmqNbiHandlerPoolsize = zmqNbiHandlerPoolsize;
}
- @JsonProperty("http-nio2-nb-poolsize")
+
public Integer httpNio2Poolsize() {
return httpNio2NbPoolsize;
}
- @JsonProperty("http-nio2-nb-poolsize")
+
public void setHttpNio2NbPoolsize(Integer httpNio2NbPoolsize) {
this.httpNio2NbPoolsize = httpNio2NbPoolsize;
}
- @JsonProperty("http-nio2-nb-port")
+
public Integer httpNio2Port() {
return httpNio2NbPort;
}
- @JsonProperty("http-nio2-nb-port")
+
public void setHttpNio2NbPort(Integer httpNio2NbPort) {
this.httpNio2NbPort = httpNio2NbPort;
}
- @JsonProperty("node-id")
+
public String nodeId() {
return nodeId;
}
- @JsonProperty("node-id")
+
public void setNodeId(String node_id) {
this.nodeId = node_id;
}
- @JsonProperty("network-id")
+
public String networkId() {
return networkId;
}
- @JsonProperty("network-id")
+
public void setNetworkId(String network_id) {
this.networkId = network_id;
}
- @JsonProperty("zmq-broadcast-controllers")
+
public String zmqBroadcastControllers() {
return zmqBroadcastControllers;
}
- @JsonProperty("zmq-broadcast-controllers")
+
public void setZmqBroadcastControllers(String zmqBroadcastControllers) {
this.zmqBroadcastControllers = zmqBroadcastControllers;
}
- @JsonProperty("zmq-broadcast-dpns")
public String zmqBroadcastDpns() {
return zmqBroadcastDpns;
}
- @JsonProperty("zmq-broadcast-dpns")
+
public void setZmqBroadcastDpns(String zmqBroadcastDpns) {
this.zmqBroadcastDpns = zmqBroadcastDpns;
}
- @JsonProperty("zmq-broadcast-all")
+
public String zmqBroadcastAll() {
return zmqBroadcastAll;
}
- @JsonProperty("zmq-broadcast-all")
+
public void setZmqBroadcastAll(String zmqBroadcastAll) {
this.zmqBroadcastAll = zmqBroadcastAll;
}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/helpers/DpnApi.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/helpers/DpnApi.java
index 43d4759..b9ca75c 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/helpers/DpnApi.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/helpers/DpnApi.java
@@ -30,7 +30,7 @@
public class DpnApi {
protected static final Logger log = LoggerFactory.getLogger(DpnApi.class);
private static final Map<String, FpcDpnId> uplinkDpnMap;
- private static final Map<String, Short> topicToNodeMap;
+ private static final Map<String, Short> nodeToTopicMap;
/**
* Topic for broadcasting
*/
@@ -56,7 +56,7 @@
static {
uplinkDpnMap = Maps.newConcurrentMap();
- topicToNodeMap = Maps.newConcurrentMap();
+ nodeToTopicMap = Maps.newConcurrentMap();
}
/**
@@ -409,11 +409,11 @@
} else if (buf[3] == DPN_HELLO) {
status = DPNStatusIndication.Status.HELLO;
log.info("Hello {} on topic {}", key, buf[2]);
- topicToNodeMap.put(key, (short) buf[2]);
+ nodeToTopicMap.put(key, (short) buf[2]);
} else if (buf[3] == DPN_BYE) {
status = DPNStatusIndication.Status.BYE;
log.info("Bye {}", key);
- topicToNodeMap.remove(key);
+ nodeToTopicMap.remove(key);
}
return new AbstractMap.SimpleEntry<>(uplinkDpnMap.get(key), new DPNStatusIndication(status, key));
}
@@ -427,9 +427,8 @@
* @return - ZMQ Topic
*/
public static Short getTopicFromNode(String Key) {
- if (Key == null) return 1;
- Short aShort = topicToNodeMap.get(Key);
- return aShort != null ? aShort : (short) 1;
+ Short aShort = nodeToTopicMap.get(Key);
+ return aShort;
}
/**
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBPublisherManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBPublisherManager.java
index ec7321e..541d7f5 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBPublisherManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBPublisherManager.java
@@ -50,6 +50,7 @@
executorService.submit(() -> {
ZMQ.Socket socket = ctx.createSocket(ZMQ.PUB);
socket.connect(address);
+ log.debug("Publisher at {}", address);
while ((!Thread.currentThread().isInterrupted()) && run) {
try {
byte[] array = blockingQueue.take().array();
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBSubscriberManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBSubscriberManager.java
index 09617c0..3b3e491 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBSubscriberManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBSubscriberManager.java
@@ -84,10 +84,14 @@
public void open() {
short subscriberId = (short) ThreadLocalRandom.current().nextInt(MIN_TOPIC_VAL, MAX_TOPIC_VAL + 1);
- broadcastAllWorker = Executors.newSingleThreadExecutor().submit(new ZMQSubscriberWorker(broadcastAllId));
+ broadcastAllWorker = Executors.newSingleThreadExecutor()
+ .submit(new ZMQSubscriberWorker(broadcastAllId));
+
broadcastControllersWorker = Executors.newSingleThreadExecutor()
.submit(new ZMQSubscriberWorker(broadcastControllersId));
- broadcastTopicWorker = Executors.newSingleThreadExecutor().submit(new BroadcastTopic(subscriberId));
+
+ broadcastTopicWorker = Executors.newSingleThreadExecutor()
+ .submit(new BroadcastTopic(subscriberId));
}
@Override
@@ -191,11 +195,12 @@
ZMQ.Socket subscriber = this.ctx.createSocket(ZMQ.SUB);
subscriber.connect(address);
subscriber.subscribe(new byte[]{toUint8(subscriberId)});
+ log.debug("Subscriber at {} / {}", address, subscriberId);
while ((!Thread.currentThread().isInterrupted()) && run) {
byte[] contents = subscriber.recv();
byte topic = contents[0];
byte messageType = contents[1];
- log.info("Received {}", contents);
+ log.debug("Received {}", contents);
switch (topic) {
case 1:
if (messageType == ASSIGN_CONFLICT && toInt(contents, 3) != controllerSourceId) {
@@ -276,6 +281,7 @@
.put(toUint8((short) networkId.length()))
.put(networkId.getBytes());
+
log.info("sendHelloToDpns: {}", bb.array());
ZMQSBPublisherManager.getInstance().send(bb);
}
diff --git a/scripts/addDPN.sh b/scripts/addDPN.sh
index 9a07776..9c77d37 100755
--- a/scripts/addDPN.sh
+++ b/scripts/addDPN.sh
@@ -1,3 +1,4 @@
+#!/bin/bash
curl -i --header "Content-type: application/json" --request POST -u onos:rocks --data '{
"dpns": [
{
@@ -11,3 +12,4 @@
}
]
}' 'http://localhost:8181/onos/restconf/data/ietf-dmm-fpcagent:tenants/tenant=default/fpc-topology'
+./getTenants.sh
\ No newline at end of file
diff --git a/scripts/createRPC.sh b/scripts/createRPC.sh
index e920070..621f6df 100755
--- a/scripts/createRPC.sh
+++ b/scripts/createRPC.sh
@@ -5,44 +5,6 @@
"client-id": "1",
"contexts": [
{
- "context-id": 202374886,
- "delegating-ip-prefixes": [
- "192.168.1.5/32"
- ],
- "dl": {
- "dpn-parameters": {},
- "mobility-tunnel-parameters": {
- "tunnel-identifier": "2222",
- "tunnel-type": "gtpv1"
- },
- "tunnel-local-address": "192.168.1.1",
- "tunnel-remote-address": "10.1.1.1"
- },
- "dpn-group": "foo",
- "dpns": [
- {
- "direction": "uplink",
- "dpn-id": "1",
- "dpn-parameters": {}
- }
- ],
- "ebi": "5",
- "imsi": "9135551234",
- "instructions": {
- "instr-3gpp-mob": "session uplink"
- },
- "lbi": "5",
- "ul": {
- "dpn-parameters": {},
- "mobility-tunnel-parameters": {
- "tunnel-identifier": "1111",
- "tunnel-type": "gtpv1"
- },
- "tunnel-local-address": "192.168.1.1",
- "tunnel-remote-address": "10.1.1.1"
- }
- },
- {
"context-id": 202374887,
"delegating-ip-prefixes": [
"192.168.1.5/32"
diff --git a/scripts/forwarder_subscriber_with_ACK.py b/scripts/forwarder_subscriber_with_ACK.py
index e3826a5..fbc5088 100755
--- a/scripts/forwarder_subscriber_with_ACK.py
+++ b/scripts/forwarder_subscriber_with_ACK.py
@@ -32,15 +32,7 @@
def signal_handler(signal, frame):
print "\nExiting... Sending DPN Status Indication message with Status = GOODBYE"
pub_socket.send("%s" % (struct.pack("!BBBBIB",2,12,topicId,2,source,len(nodeId)) + nodeId + struct.pack('!B',len(networkId)) + networkId))
- count = 0
- while True:
- time.sleep(1)
- sys.stdout.write("\r"+str(5-count)+" ")
- sys.stdout.flush()
- count += 1
- if(count > 5):
- print "\n"
- sys.exit(0)
+ sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
@@ -70,18 +62,19 @@
print "Ready to receive messages. Press Ctrl+C when ready to exit."
-rec_port = "5560"
-send_port = "5559"
+subscriber_uri = "tcp://localhost:5560"
+publisher_uri = "tcp://localhost:5559"
# Socket to talk to server
context = zmq.Context()
-socket = context.socket(zmq.SUB)
+sub_socket = context.socket(zmq.SUB)
pub_socket = context.socket(zmq.PUB)
-socket.connect ("tcp://localhost:%s" % rec_port)
-pub_socket.connect("tcp://localhost:%s" % send_port)
+sub_socket.connect(subscriber_uri)
+pub_socket.connect(publisher_uri)
topicfilter = ""
controller_topic= 252
-socket.setsockopt(zmq.SUBSCRIBE, topicfilter)
-print "Listening to port ", rec_port
+sub_socket.setsockopt(zmq.SUBSCRIBE, topicfilter)
+print "Subscriber to ", subscriber_uri
+print "Publisher to ", publisher_uri
print "DPN Lifecycle start up . . . Please wait."
async_result = pool.apply_async(sendAssignId,(pub_socket,))
@@ -94,12 +87,14 @@
msgnum5count = 0
msgnum6count = 0
for update_nbr in range(900000):
- string = socket.recv()
+ string = sub_socket.recv()
ts = time.time()
st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S')
topic,msgnum = struct.unpack('!BB',string[:2])
+ print 'received %s %s' % (topic, msgnum)
+
if topic == 1 and msgnum == 10: #Assign_Id
top,msg,topId,src,nodeIdLen = struct.unpack('!BBBIB',string[:8])
top,msg,topId,src,nodeIdLen,nodeId1,networkIdLen = struct.unpack('!BBBIB'+str(nodeIdLen)+'sB',string[:8+nodeIdLen+1])
@@ -250,4 +245,4 @@
print '================'
print 'Total = ', count, 'msgnum1 count', msgnum1count, 'msgnum2 count', msgnum2count, 'msgnum3 count', msgnum3count, 'msgnum4 count', msgnum4count,'msgnum5 count', msgnum5count, 'msgnum6 count', msgnum6count
-socket.close()
+sub_socket.close()
diff --git a/scripts/updateRPC.sh b/scripts/updateRPC.sh
new file mode 100755
index 0000000..de934bf
--- /dev/null
+++ b/scripts/updateRPC.sh
@@ -0,0 +1,51 @@
+#!/bin/bash
+curl -X POST --header 'Content-Type: application/json' -u onos:rocks --header 'Accept: application/json' -d '{
+ "input": {
+ "admin-state": "enabled",
+ "client-id": "1",
+ "contexts": [
+ {
+ "context-id": 202374887,
+ "delegating-ip-prefixes": [
+ "192.168.1.5/32"
+ ],
+ "dl": {
+ "dpn-parameters": {},
+ "mobility-tunnel-parameters": {
+ "tunnel-identifier": "2222",
+ "tunnel-type": "gtpv1"
+ },
+ "tunnel-local-address": "192.168.1.1",
+ "tunnel-remote-address": "10.1.1.1"
+ },
+ "dpn-group": "foo",
+ "dpns": [
+ {
+ "direction": "uplink",
+ "dpn-id": "1",
+ "dpn-parameters": {}
+ }
+ ],
+ "ebi": "5",
+ "imsi": "9135551234",
+ "instructions": {
+ "instr-3gpp-mob": "session uplink"
+ },
+ "lbi": "5",
+ "ul": {
+ "dpn-parameters": {},
+ "mobility-tunnel-parameters": {
+ "tunnel-identifier": "1111",
+ "tunnel-type": "gtpv1"
+ },
+ "tunnel-local-address": "192.168.1.1",
+ "tunnel-remote-address": "10.1.1.1"
+ }
+ }
+ ],
+ "op-id": "1",
+ "op-ref-scope": "op",
+ "op-type": "update",
+ "session-state": "complete"
+ }
+}' 'http://localhost:8181/onos/restconf/operations/ietf-dmm-fpcagent:configure' | python -m json.tool