fixed checkstyle, added CP notifications for P4
diff --git a/apps/fpcagent/BUCK b/apps/fpcagent/BUCK
index ef4f0b0..5ab31d7 100644
--- a/apps/fpcagent/BUCK
+++ b/apps/fpcagent/BUCK
@@ -13,12 +13,12 @@
'//apps/restconf/api:onos-apps-restconf-api',
'//models/fpcagent:onos-models-fpcagent',
'//models/common:onos-models-common',
- '//lib:jetty-servlet',
- '//lib:jetty-server',
- '//lib:jetty-io',
- '//lib:jetty-http',
- '//lib:jetty-util',
- '//lib:javax.servlet-api',
+ ':jetty-servlet-custom',
+ ':jetty-server-custom',
+ ':jetty-io-custom',
+ ':jetty-http-custom',
+ ':jetty-util-custom',
+ ':javax.servlet-api-custom',
':zeromq',
':json',
]
@@ -31,12 +31,12 @@
'//lib:httpclient-osgi',
'//lib:httpcore-osgi',
'//lib:org.apache.httpcomponents.httpasyncclient-osgi',
- '//lib:jetty-servlet',
- '//lib:jetty-server',
- '//lib:jetty-io',
- '//lib:jetty-http',
- '//lib:jetty-util',
- '//lib:javax.servlet-api',
+ ':jetty-servlet-custom',
+ ':jetty-server-custom',
+ ':jetty-io-custom',
+ ':jetty-http-custom',
+ ':jetty-util-custom',
+ ':javax.servlet-api-custom',
]
EXCLUDED_BUNDLES = [
@@ -87,3 +87,56 @@
visibility = [ 'PUBLIC' ],
)
+remote_jar (
+ name = 'javax.servlet-api-custom',
+ out = 'javax.servlet-api-3.0.1.jar',
+ url = 'mvn:javax.servlet:javax.servlet-api:jar:3.0.1',
+ sha1 = '6bf0ebb7efd993e222fc1112377b5e92a13b38dd',
+ maven_coords = 'javax.servlet:javax.servlet-api:3.0.1',
+ visibility = [ 'PUBLIC' ],
+)
+
+remote_jar (
+ name = 'jetty-util-custom',
+ out = 'jetty-util-8.1.19.v20160209.jar',
+ url = 'mvn:org.eclipse.jetty:jetty-util:jar:8.1.19.v20160209',
+ sha1 = 'c88071e72998e6355d719d35ff001d9c327a971a',
+ maven_coords = 'org.eclipse.jetty:jetty-util:8.1.19.v20160209',
+ visibility = [ 'PUBLIC' ],
+)
+
+remote_jar (
+ name = 'jetty-server-custom',
+ out = 'jetty-server-8.1.19.v20160209.jar',
+ url = 'mvn:org.eclipse.jetty:jetty-server:jar:8.1.19.v20160209',
+ sha1 = 'e4100696c994e26148fd0b62c8a866a606be1540',
+ maven_coords = 'org.eclipse.jetty:jetty-server:8.1.19.v20160209',
+ visibility = [ 'PUBLIC' ],
+)
+
+remote_jar (
+ name = 'jetty-servlet-custom',
+ out = 'jetty-servlet-8.1.19.v20160209.jar',
+ url = 'mvn:org.eclipse.jetty:jetty-servlet:jar:8.1.19.v20160209',
+ sha1 = '6872c3fc289de8f26a43b101741b33af36590cb4',
+ maven_coords = 'org.eclipse.jetty:jetty-servlet:8.1.19.v20160209',
+ visibility = [ 'PUBLIC' ],
+)
+
+remote_jar (
+ name = 'jetty-http-custom',
+ out = 'jetty-http-8.1.19.v20160209.jar',
+ url = 'mvn:org.eclipse.jetty:jetty-http:jar:8.1.19.v20160209',
+ sha1 = 'f08c189eab9a45810644ec35440f1a0c8aa0c4e0',
+ maven_coords = 'org.eclipse.jetty:jetty-http:8.1.19.v20160209',
+ visibility = [ 'PUBLIC' ],
+)
+
+remote_jar (
+ name = 'jetty-io-custom',
+ out = 'jetty-io-8.1.19.v20160209.jar',
+ url = 'mvn:org.eclipse.jetty:jetty-io:jar:8.1.19.v20160209',
+ sha1 = 'c35e77e419169b4c8ad5fa3429865f2c8541e972',
+ maven_coords = 'org.eclipse.jetty:jetty-io:8.1.19.v20160209',
+ visibility = [ 'PUBLIC' ],
+)
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcConfig.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcConfig.java
index a47d4a9..1478421 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcConfig.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcConfig.java
@@ -1,3 +1,19 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.onosproject.fpcagent;
import com.fasterxml.jackson.core.JsonProcessingException;
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java
index 98aa2ad..f576930 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java
@@ -17,7 +17,12 @@
package org.onosproject.fpcagent;
import com.google.common.collect.Sets;
-import org.apache.felix.scr.annotations.*;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
import org.onosproject.config.DynamicConfigEvent;
import org.onosproject.config.DynamicConfigListener;
import org.onosproject.config.DynamicConfigService;
@@ -26,15 +31,19 @@
import org.onosproject.core.IdGenerator;
import org.onosproject.fpcagent.protocols.DpnNgicCommunicator;
import org.onosproject.fpcagent.protocols.DpnP4Communicator;
-import org.onosproject.fpcagent.providers.DpnDeviceListener;
-import org.onosproject.fpcagent.providers.DpnProviderService;
+import org.onosproject.fpcagent.providers.ZmqDpnDeviceListener;
+import org.onosproject.fpcagent.providers.ZmqDpnProviderService;
import org.onosproject.fpcagent.util.ConfigHelper;
-import org.onosproject.fpcagent.util.eventStream.JettyServer;
-import org.onosproject.fpcagent.util.eventStream.NBEventWorkerManager;
-import org.onosproject.fpcagent.util.eventStream.ParseStream;
-import org.onosproject.fpcagent.workers.ZMQSBPublisherManager;
-import org.onosproject.fpcagent.workers.ZMQSBSubscriberManager;
-import org.onosproject.net.config.*;
+import org.onosproject.fpcagent.util.eventstream.JettyServer;
+import org.onosproject.fpcagent.util.eventstream.NbEventWorkerManager;
+import org.onosproject.fpcagent.util.eventstream.ParseStream;
+import org.onosproject.fpcagent.workers.ZmqSbPublisherManager;
+import org.onosproject.fpcagent.workers.ZmqSbSubscriberManager;
+import org.onosproject.net.config.ConfigFactory;
+import org.onosproject.net.config.NetworkConfigEvent;
+import org.onosproject.net.config.NetworkConfigListener;
+import org.onosproject.net.config.NetworkConfigRegistry;
+import org.onosproject.net.config.NetworkConfigService;
import org.onosproject.net.config.basics.SubjectFactories;
import org.onosproject.net.flow.FlowRuleService;
import org.onosproject.restconf.api.RestconfService;
@@ -87,7 +96,7 @@
private CoreService coreService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- private DpnProviderService dpnProviderService;
+ private ZmqDpnProviderService zmqDpnProviderService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private RestconfService restconfService;
@@ -99,7 +108,7 @@
private IdGenerator notificationIds;
private FpcConfig fpcConfig;
private boolean started = false;
- private HashSet<DpnDeviceListener> listeners = Sets.newHashSet();
+ private HashSet<ZmqDpnDeviceListener> listeners = Sets.newHashSet();
/* Config */
private ConfigFactory<ApplicationId, FpcConfig> fpcConfigConfigFactory =
@@ -122,7 +131,7 @@
JettyServer.createInstance().open();
ParseStream.createInstance().open();
- NBEventWorkerManager.createInstance(20, restconfService).open();
+ NbEventWorkerManager.createInstance(20, restconfService).open();
DpnNgicCommunicator.createInstance();
DpnP4Communicator.createInstance(applicationId, flowRuleService);
@@ -136,12 +145,12 @@
registry.unregisterConfigFactory(fpcConfigConfigFactory);
if (started) {
- ZMQSBSubscriberManager.getInstance().close();
- ZMQSBPublisherManager.getInstance().close();
+ ZmqSbSubscriberManager.getInstance().close();
+ ZmqSbPublisherManager.getInstance().close();
started = false;
}
- NBEventWorkerManager.getInstance().close();
+ NbEventWorkerManager.getInstance().close();
ParseStream.getInstance().close();
JettyServer.getInstance().close();
@@ -155,32 +164,32 @@
fpcConfig.getConfig().ifPresent(
helper -> {
started = true;
- ZMQSBSubscriberManager.createInstance(
+ ZmqSbSubscriberManager.createInstance(
helper.dpnSubscriberUri(),
helper.nodeId(),
helper.networkId(),
- dpnProviderService.getListener(),
+ zmqDpnProviderService.getListener(),
notificationIds
);
- ZMQSBPublisherManager.createInstance(
+ ZmqSbPublisherManager.createInstance(
helper.dpnPublisherUri(),
helper.dpnClientThreads()
);
- ZMQSBPublisherManager.getInstance().open();
- ZMQSBSubscriberManager.getInstance().open();
+ ZmqSbPublisherManager.getInstance().open();
+ ZmqSbSubscriberManager.getInstance().open();
}
);
}
@Override
- public void addListener(DpnDeviceListener listener) {
+ public void addListener(ZmqDpnDeviceListener listener) {
listeners.add(listener);
}
@Override
- public void removeListener(DpnDeviceListener listener) {
+ public void removeListener(ZmqDpnDeviceListener listener) {
listeners.remove(listener);
}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java
index 2eb0b6a..f8b80fd 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java
@@ -19,7 +19,12 @@
import com.google.common.collect.Sets;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.felix.scr.annotations.*;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
import org.onlab.packet.Ip4Address;
import org.onlab.packet.Ip4Prefix;
import org.onosproject.config.DynamicConfigService;
@@ -45,7 +50,14 @@
import org.onosproject.yang.gen.v1.fpc.rev20150105.fpc.registerclient.DefaultRegisterClientInput;
import org.onosproject.yang.gen.v1.fpc.rev20150105.fpc.registerclient.DefaultRegisterClientOutput;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.IetfDmmFpcagentService;
-import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.*;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.ClientIdentifier;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.DefaultFpcAgentInfo;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.DefaultTenants;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.DefaultYangAutoPrefixNotify;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.ErrorTypeId;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.NotificationId;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.OpIdentifier;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.Result;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.configure.DefaultConfigureInput;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.configure.DefaultConfigureOutput;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.configurebundles.DefaultConfigureBundlesInput;
@@ -82,7 +94,15 @@
import org.onosproject.yang.gen.v1.ietfdmmfpcbase.rev20160803.ietfdmmfpcbase.mobilityinfo.mobprofileparameters.ThreegppTunnel;
import org.onosproject.yang.gen.v1.ietfdmmfpcbase.rev20160803.ietfdmmfpcbase.targetsvalue.Targets;
import org.onosproject.yang.gen.v1.ietfdmmthreegpp.rev20160803.ietfdmmthreegpp.threegppinstr.Bits;
-import org.onosproject.yang.model.*;
+import org.onosproject.yang.model.DefaultModelObjectData;
+import org.onosproject.yang.model.ModelConverter;
+import org.onosproject.yang.model.ModelObject;
+import org.onosproject.yang.model.ModelObjectId;
+import org.onosproject.yang.model.ResourceData;
+import org.onosproject.yang.model.ResourceId;
+import org.onosproject.yang.model.RpcInput;
+import org.onosproject.yang.model.RpcOutput;
+import org.onosproject.yang.model.RpcRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -93,16 +113,20 @@
import java.util.Collection;
import java.util.HashSet;
import java.util.Optional;
-import java.util.concurrent.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import static org.onosproject.fpcagent.util.Converter.convertContext;
import static org.onosproject.fpcagent.util.FpcUtil.*;
-//import org.onosproject.fpcagent.workers.HTTPNotifier;
-
@Component(immediate = true)
@Service
-public class FpcRpcManager implements FpcRpcService, IetfDmmFpcagentService, org.onosproject.yang.gen.v1.fpc.rev20150105.FpcService {
+public class FpcRpcManager implements FpcRpcService,
+ IetfDmmFpcagentService,
+ org.onosproject.yang.gen.v1.fpc.rev20150105.FpcService {
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -147,8 +171,8 @@
protected void deactivate() {
deviceService.removeListener(listener);
rpcRegistry.unregisterRpcService(this);
- clientInfo.clear();
- tenantInfo.clear();
+ CLIENT_INFO.clear();
+ TENANT_INFO.clear();
log.info("FPC RPC Service Stopped");
}
@@ -162,7 +186,7 @@
// Create the Default Tenant and added to the Tenants structure.
final DefaultTenants tenants = new DefaultTenants();
final DefaultTenant tenant = new DefaultTenant();
- tenant.tenantId(defaultIdentity);
+ tenant.tenantId(DEFAULT_IDENTITY);
tenant.fpcTopology(new DefaultFpcTopology());
tenant.fpcPolicy(new DefaultFpcPolicy());
tenant.fpcMobility(new DefaultFpcMobility());
@@ -226,8 +250,12 @@
if (controlProtocol.isAssignableFrom(ZmqDpnControlProtocol.class)) {
dpnCommunicationService = DpnNgicCommunicator.getInstance();
} else if (controlProtocol.isAssignableFrom(P4DpnControlProtocol.class)) {
- dpnCommunicationService = DpnP4Communicator.getInstance();
- DpnP4Communicator.getInstance().setDeviceId(DeviceId.deviceId(optionalDpn.get().dpnId().toString()));
+ DpnP4Communicator instance = DpnP4Communicator.getInstance();
+ dpnCommunicationService = instance;
+ instance.setDeviceId(
+ DeviceId.deviceId(optionalDpn.get().dpnId().toString())
+ );
+ instance.setClientId(clientInfo.clientId());
} else {
throw new RuntimeException("Control Protocol is not supported.");
}
@@ -244,8 +272,8 @@
}
// get DPN Topic from Node/Network pair
- byte topic_id = getTopicFromNode(key.get());
- if (topic_id == -1 && dpnCommunicationService instanceof ZmqDpnControlProtocol) {
+ byte topicId = getTopicFromNode(key.get());
+ if (topicId == -1 && dpnCommunicationService instanceof ZmqDpnControlProtocol) {
throw new RuntimeException("Could not find Topic ID");
}
@@ -264,35 +292,35 @@
log.debug("handling configure create event {} for {}", commands, dpn.dpnId());
- Ip4Address s1u_enodeb_ipv4 = Ip4Address.valueOf(context.ul().tunnelLocalAddress().toString()),
- s1u_sgw_ipv4 = Ip4Address.valueOf(context.ul().tunnelRemoteAddress().toString());
+ Ip4Address s1UEnodebIpv4 = Ip4Address.valueOf(context.ul().tunnelLocalAddress().toString()),
+ s1USgwIpv4 = Ip4Address.valueOf(context.ul().tunnelRemoteAddress().toString());
- long client_id = clientInfo.clientId().fpcIdentity().union().int64(),
- session_id = context.contextId().fpcIdentity().union().int64(),
- s1u_sgw_gtpu_teid = ((ThreegppTunnel) context.ul().mobilityTunnelParameters()
+ long clientId = clientInfo.clientId().fpcIdentity().union().int64(),
+ sessionId = context.contextId().fpcIdentity().union().int64(),
+ s1USgwGtpuTeid = ((ThreegppTunnel) context.ul().mobilityTunnelParameters()
.mobprofileParameters()).tunnelIdentifier(),
- s1u_enb_gtpu_teid = ((ThreegppTunnel) context.ul().mobilityTunnelParameters()
+ s1UEnbGtpuTeid = ((ThreegppTunnel) context.ul().mobilityTunnelParameters()
.mobprofileParameters()).tunnelIdentifier();
- BigInteger op_id = operationId.uint64(),
+ BigInteger opId = operationId.uint64(),
imsi = context.imsi().uint64();
- short default_ebi = context.ebi().uint8();
+ short defaultEbi = context.ebi().uint8();
// TODO try to make sense out of this...
if (commands.contains("session")) {
DefaultContexts convertContext = convertContext(context);
tasks.add(Executors.callable(() -> {
- dpnCommunicationService.create_session(
- topic_id,
+ dpnCommunicationService.createSession(
+ topicId,
imsi,
- default_ebi,
+ defaultEbi,
Ip4Prefix.valueOf(context.delegatingIpPrefixes().get(0).toString()).address(),
- s1u_sgw_gtpu_teid,
- s1u_sgw_ipv4,
- session_id,
- client_id,
- op_id
+ s1USgwGtpuTeid,
+ s1USgwIpv4,
+ sessionId,
+ clientId,
+ opId
);
ModelObjectId modelObjectId = tenantBuilder(tenantId)
@@ -305,14 +333,14 @@
// FIXME why downlink is in session while uplink is not?
if (commands.contains("downlink")) {
tasks.add(Executors.callable(() -> {
- dpnCommunicationService.modify_bearer(
- topic_id,
- s1u_sgw_ipv4,
- s1u_enb_gtpu_teid,
- s1u_enodeb_ipv4,
- session_id,
- client_id,
- op_id
+ dpnCommunicationService.modifyBearer(
+ topicId,
+ s1USgwIpv4,
+ s1UEnbGtpuTeid,
+ s1UEnodebIpv4,
+ sessionId,
+ clientId,
+ opId
);
ModelObjectId modelObjectId = tenantBuilder(tenantId)
@@ -331,8 +359,7 @@
}
// execute all tasks.
- ExecutorService executor = Executors.newWorkStealingPool();
- executor.invokeAll(tasks).forEach(
+ Executors.newWorkStealingPool().invokeAll(tasks).forEach(
future -> {
try {
future.get();
@@ -391,8 +418,12 @@
if (controlProtocol.isAssignableFrom(ZmqDpnControlProtocol.class)) {
dpnCommunicationService = DpnNgicCommunicator.getInstance();
} else if (controlProtocol.isAssignableFrom(P4DpnControlProtocol.class)) {
- dpnCommunicationService = DpnP4Communicator.getInstance();
- DpnP4Communicator.getInstance().setDeviceId(DeviceId.deviceId(optionalDpn.get().dpnId().toString()));
+ DpnP4Communicator instance = DpnP4Communicator.getInstance();
+ dpnCommunicationService = instance;
+ instance.setDeviceId(
+ DeviceId.deviceId(optionalDpn.get().dpnId().toString())
+ );
+ instance.setClientId(clientInfo.clientId());
} else {
throw new RuntimeException("Control Protocol is not supported.");
}
@@ -410,8 +441,8 @@
}
// get DPN Topic from Node/Network pair
- byte topic_id = getTopicFromNode(key.get());
- if (topic_id == -1 && dpnCommunicationService instanceof ZmqDpnControlProtocol) {
+ byte topicId = getTopicFromNode(key.get());
+ if (topicId == -1 && dpnCommunicationService instanceof ZmqDpnControlProtocol) {
throw new RuntimeException("Could not find Topic ID");
}
@@ -424,10 +455,11 @@
log.debug("handling configure update event {} for {}", commands, dpn.dpnId());
- Ip4Address s1u_enodeb_ipv4 = Ip4Address.valueOf(context.ul().tunnelLocalAddress().toString()),
- s1u_sgw_ipv4 = Ip4Address.valueOf(context.ul().tunnelRemoteAddress().toString());
+ Ip4Address s1UEnodebIpv4 = Ip4Address.valueOf(context.ul().tunnelLocalAddress().toString()),
+ s1USgwIpv4 = Ip4Address.valueOf(context.ul().tunnelRemoteAddress().toString());
- long s1u_enb_gtpu_teid = ((ThreegppTunnel) context.dl().mobilityTunnelParameters().mobprofileParameters()).tunnelIdentifier(),
+ long s1UEnbGtpuTeid = ((ThreegppTunnel) context.dl().mobilityTunnelParameters()
+ .mobprofileParameters()).tunnelIdentifier(),
cId = clientInfo.clientId().fpcIdentity().union().int64(),
contextId = context.contextId().fpcIdentity().union().int64();
@@ -437,11 +469,11 @@
if (commands.contains("downlink")) {
if (context.dl().lifetime() >= 0L) {
tasks.add(Executors.callable(() -> {
- dpnCommunicationService.modify_bearer(
- topic_id,
- s1u_sgw_ipv4,
- s1u_enb_gtpu_teid,
- s1u_enodeb_ipv4,
+ dpnCommunicationService.modifyBearer(
+ topicId,
+ s1USgwIpv4,
+ s1UEnbGtpuTeid,
+ s1UEnodebIpv4,
contextId,
cId,
opId
@@ -461,11 +493,11 @@
if (commands.contains("uplink")) {
if (context.ul().lifetime() >= 0L) {
tasks.add(Executors.callable(() -> {
- dpnCommunicationService.modify_bearer(
- topic_id,
- s1u_sgw_ipv4,
- s1u_enb_gtpu_teid,
- s1u_enodeb_ipv4,
+ dpnCommunicationService.modifyBearer(
+ topicId,
+ s1USgwIpv4,
+ s1UEnbGtpuTeid,
+ s1UEnodebIpv4,
contextId,
cId,
opId
@@ -486,8 +518,7 @@
}
// execute all tasks.
- ExecutorService executor = Executors.newWorkStealingPool();
- executor.invokeAll(tasks).forEach(
+ Executors.newWorkStealingPool().invokeAll(tasks).forEach(
future -> {
try {
future.get();
@@ -525,7 +556,8 @@
// find context that this target is about.
FpcContextId fpcContextId = FpcContextId.of(FpcIdentity.fromString(trgt));
- Optional<DefaultContexts> defaultContexts = CacheManager.getInstance(tenantId).contextsCache.get(fpcContextId);
+ Optional<DefaultContexts> defaultContexts = CacheManager.getInstance(tenantId)
+ .contextsCache.get(fpcContextId);
if (!defaultContexts.isPresent()) {
throw new RuntimeException("Context doesn't exist. Please issue create operation..");
}
@@ -546,8 +578,12 @@
if (controlProtocol.isAssignableFrom(ZmqDpnControlProtocol.class)) {
dpnCommunicationService = DpnNgicCommunicator.getInstance();
} else if (controlProtocol.isAssignableFrom(P4DpnControlProtocol.class)) {
- dpnCommunicationService = DpnP4Communicator.getInstance();
- DpnP4Communicator.getInstance().setDeviceId(DeviceId.deviceId(optionalDpn.get().dpnId().toString()));
+ DpnP4Communicator instance = DpnP4Communicator.getInstance();
+ dpnCommunicationService = instance;
+ instance.setDeviceId(
+ DeviceId.deviceId(optionalDpn.get().dpnId().toString())
+ );
+ instance.setClientId(clientInfo.clientId());
} else {
throw new RuntimeException("Control Protocol is not supported.");
}
@@ -560,8 +596,8 @@
}
// find DPN Topic from Node/Network ID pair.
- byte topic_id = getTopicFromNode(key.get());
- if (topic_id == -1 && dpnCommunicationService instanceof ZmqDpnControlProtocol) {
+ byte topicId = getTopicFromNode(key.get());
+ if (topicId == -1 && dpnCommunicationService instanceof ZmqDpnControlProtocol) {
throw new RuntimeException("Could not find Topic ID");
}
@@ -569,20 +605,21 @@
throw new RuntimeException("mobprofileParameters are not instance of ThreegppTunnel");
}
- Long teid = ((ThreegppTunnel) context.ul().mobilityTunnelParameters().mobprofileParameters()).tunnelIdentifier();
- long client_id = clientInfo.clientId().fpcIdentity().union().int64();
- BigInteger op_id = operationId.uint64();
+ Long teid = ((ThreegppTunnel) context.ul().mobilityTunnelParameters()
+ .mobprofileParameters()).tunnelIdentifier();
+ long clientId = clientInfo.clientId().fpcIdentity().union().int64();
+ BigInteger opId = operationId.uint64();
// TODO figure out what is going on.
if (targetStr.endsWith("ul") || targetStr.endsWith("dl")) {
// TODO delete bearer
} else {
tasks.add(Executors.callable(() -> {
- dpnCommunicationService.delete_session(
- topic_id,
+ dpnCommunicationService.deleteSession(
+ topicId,
context.contextId().fpcIdentity().union().int64(),
- client_id,
- op_id
+ clientId,
+ opId
);
ContextsKeys contextsKeys = new ContextsKeys();
@@ -601,8 +638,7 @@
}
// execute all tasks
- ExecutorService executor = Executors.newWorkStealingPool();
- executor.invokeAll(tasks).forEach(
+ Executors.newWorkStealingPool().invokeAll(tasks).forEach(
future -> {
try {
future.get();
@@ -738,11 +774,17 @@
case REMOVE:
configureDpnOutput = configureDpnRemove(input);
break;
+ default:
+ break;
}
}
} catch (Exception e) {
status = RpcOutput.Status.RPC_FAILURE;
- org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.resultbodydpn.resulttype.DefaultErr defaultErr = new org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.resultbodydpn.resulttype.DefaultErr();
+ org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent
+ .resultbodydpn.resulttype.DefaultErr defaultErr =
+ new org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent
+ .resultbodydpn.resulttype.DefaultErr();
+
defaultErr.errorInfo(ExceptionUtils.getFullStackTrace(e));
defaultErr.errorTypeId(ErrorTypeId.of(0));
configureDpnOutput.resultType(defaultErr);
@@ -768,21 +810,21 @@
try {
for (ModelObject modelObject : getModelObjects(rpcInput.data(), configure)) {
DefaultConfigureInput input = (DefaultConfigureInput) modelObject;
- if (!clientInfo.containsKey(input.clientId())) {
+ if (!CLIENT_INFO.containsKey(input.clientId())) {
throw new RuntimeException("Client Identifier is not registered.");
}
switch (input.opType()) {
case CREATE:
configureOutput = configureCreate(
(CreateOrUpdate) input.opBody(),
- clientInfo.get(input.clientId()),
+ CLIENT_INFO.get(input.clientId()),
input.opId()
);
break;
case UPDATE:
configureOutput = configureUpdate(
(CreateOrUpdate) input.opBody(),
- clientInfo.get(input.clientId()),
+ CLIENT_INFO.get(input.clientId()),
input.opId()
);
break;
@@ -791,10 +833,12 @@
case DELETE:
configureOutput = configureDelete(
(DeleteOrQuery) input.opBody(),
- clientInfo.get(input.clientId()),
+ CLIENT_INFO.get(input.clientId()),
input.opId()
);
break;
+ default:
+ break;
}
configureOutput.opId(input.opId());
}
@@ -827,23 +871,24 @@
try {
for (ModelObject modelObject : getModelObjects(rpcInput.data(), configureBundles)) {
DefaultConfigureBundlesInput input = (DefaultConfigureBundlesInput) modelObject;
- if (!clientInfo.containsKey(input.clientId())) {
+ if (!CLIENT_INFO.containsKey(input.clientId())) {
throw new RuntimeException("Client Identifier is not registered.");
}
- for (org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.configurebundles.configurebundlesinput.Bundles bundle : input.bundles()) {
+ for (org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent
+ .configurebundles.configurebundlesinput.Bundles bundle : input.bundles()) {
DefaultConfigureOutput configureOutput = new DefaultConfigureOutput();
switch (bundle.opType()) {
case CREATE:
configureOutput = configureCreate(
(CreateOrUpdate) bundle.opBody(),
- clientInfo.get(input.clientId()),
+ CLIENT_INFO.get(input.clientId()),
bundle.opId()
);
break;
case UPDATE:
configureOutput = configureUpdate(
(CreateOrUpdate) bundle.opBody(),
- clientInfo.get(input.clientId()),
+ CLIENT_INFO.get(input.clientId()),
bundle.opId()
);
break;
@@ -852,10 +897,12 @@
case DELETE:
configureOutput = configureDelete(
(DeleteOrQuery) bundle.opBody(),
- clientInfo.get(input.clientId()),
+ CLIENT_INFO.get(input.clientId()),
bundle.opId()
);
break;
+ default:
+ break;
}
Bundles result = new DefaultBundles();
result.opId(bundle.opId());
@@ -921,15 +968,15 @@
try {
for (ModelObject modelObject : getModelObjects(rpcInput.data(), registerClient)) {
DefaultRegisterClientInput input = (DefaultRegisterClientInput) modelObject;
- if (clientInfo.containsKey(input.clientId())) {
+ if (CLIENT_INFO.containsKey(input.clientId())) {
throw new RuntimeException("Client already registered.");
}
// keep information for each client. this can be moved to the DC Store and use Cache.
- clientInfo.put(input.clientId(), input);
+ CLIENT_INFO.put(input.clientId(), input);
// keep clients for each tenant
- HashSet<ClientIdentifier> hashSet = tenantInfo.getOrDefault(input.tenantId(), Sets.newHashSet());
+ HashSet<ClientIdentifier> hashSet = TENANT_INFO.getOrDefault(input.tenantId(), Sets.newHashSet());
hashSet.add(input.clientId());
- tenantInfo.put(input.tenantId(), hashSet);
+ TENANT_INFO.put(input.tenantId(), hashSet);
registerClientOutput.clientId(input.clientId());
registerClientOutput.supportedFeatures(input.supportedFeatures());
registerClientOutput.endpointUri(input.endpointUri());
@@ -976,10 +1023,10 @@
try {
for (ModelObject modelObject : getModelObjects(rpcInput.data(), registerClient)) {
DefaultRegisterClientInput input = (DefaultRegisterClientInput) modelObject;
- if (!clientInfo.containsKey(input.clientId())) {
+ if (!CLIENT_INFO.containsKey(input.clientId())) {
throw new RuntimeException("Client does not exist.");
}
- clientInfo.remove(input.clientId());
+ CLIENT_INFO.remove(input.clientId());
deregisterClientOutput.clientId(input.clientId());
cpProviderService.getListener().deviceRemoved(input.clientId().toString());
@@ -1018,7 +1065,7 @@
@Override
public void event(DeviceEvent event) {
if (event.subject().manufacturer().equals("fpc")) {
- tenantInfo.forEach(
+ TENANT_INFO.forEach(
(tenantId, clients) -> {
String nodeNetwork = event.subject().annotations().value("node/network");
try {
@@ -1063,8 +1110,8 @@
}
// else if (event.subject().manufacturer().equals("cp")) {
// String clientId = event.subject().annotations().value("client-id");
-// if (clientInfo.keySet().contains(ClientIdentifier.fromString(clientId))) {
-// DefaultRegisterClientInput clientInput = clientInfo.get(ClientIdentifier.fromString(clientId));
+// if (CLIENT_INFO.keySet().contains(ClientIdentifier.fromString(clientId))) {
+// DefaultRegisterClientInput clientInput = CLIENT_INFO.get(ClientIdentifier.fromString(clientId));
//
// Optional<DefaultTenant> defaultTenant = getTenant(clientInput.tenantId());
// if (defaultTenant.isPresent()) {
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcService.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcService.java
index 4a29287..fc237b1 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcService.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcService.java
@@ -1,3 +1,19 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.onosproject.fpcagent;
import com.google.common.annotations.Beta;
@@ -8,8 +24,6 @@
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.configuredpn.DefaultConfigureDpnOutput;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.opinput.opbody.CreateOrUpdate;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.opinput.opbody.DeleteOrQuery;
-import org.onosproject.yang.model.RpcInput;
-import org.onosproject.yang.model.RpcOutput;
/**
* Main service that handles RPC events and DC Store modifications.
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcService.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcService.java
index 6adc0bc..7bb120d 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcService.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcService.java
@@ -1,15 +1,31 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.onosproject.fpcagent;
-import org.onosproject.fpcagent.providers.DpnDeviceListener;
+import org.onosproject.fpcagent.providers.ZmqDpnDeviceListener;
import org.onosproject.fpcagent.util.ConfigHelper;
import java.util.Optional;
public interface FpcService {
- void addListener(DpnDeviceListener listener);
+ void addListener(ZmqDpnDeviceListener listener);
- void removeListener(DpnDeviceListener listener);
+ void removeListener(ZmqDpnDeviceListener listener);
Optional<ConfigHelper> getConfig();
}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnCommunicationService.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnCommunicationService.java
index c44a18b..88eab75 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnCommunicationService.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnCommunicationService.java
@@ -28,82 +28,82 @@
/**
* Creates Mobility Session.
*
- * @param topic_id - DPN Topic ID
+ * @param topicId - DPN Topic ID
* @param imsi - IMSI identifier
- * @param default_ebi - EBI
- * @param ue_ipv4 - UE IPv4 Address
- * @param s1u_sgw_teid - SGW Tunnel Identifier
- * @param s1u_sgw_ipv4 - SGW IPv4
- * @param session_id - Context Identifier
- * @param client_id - Client Identifier
- * @param op_id - Operation Identifier
+ * @param defaultEbi - EBI
+ * @param ueIpv4 - UE IPv4 Address
+ * @param s1USgwTeid - SGW Tunnel Identifier
+ * @param s1USgwIpv4 - SGW IPv4
+ * @param sessionId - Context Identifier
+ * @param clientId - Client Identifier
+ * @param opId - Operation Identifier
*/
- void create_session(
- byte topic_id,
+ void createSession(
+ byte topicId,
BigInteger imsi,
- Short default_ebi,
- Ip4Address ue_ipv4,
- Long s1u_sgw_teid,
- Ip4Address s1u_sgw_ipv4,
- Long session_id,
- Long client_id,
- BigInteger op_id
+ Short defaultEbi,
+ Ip4Address ueIpv4,
+ Long s1USgwTeid,
+ Ip4Address s1USgwIpv4,
+ Long sessionId,
+ Long clientId,
+ BigInteger opId
);
/**
* Modifies Bearer.
*
- * @param topic_id - DPN Topic ID
- * @param s1u_sgw_ipv4 - SGW IPv4 Address
- * @param s1u_enodeb_teid - ENodeB Tunnel Identifier
- * @param s1u_enodeb_ipv4 - ENodeB IPv4 Address
- * @param session_id - Context Identifier
- * @param client_id - Client Identifier
- * @param op_id - Operation Identifier
+ * @param topicId - DPN Topic ID
+ * @param s1USgwIpv4 - SGW IPv4 Address
+ * @param s1UEnodebTeid - ENodeB Tunnel Identifier
+ * @param s1UEnodebIpv4 - ENodeB IPv4 Address
+ * @param sessionId - Context Identifier
+ * @param clientId - Client Identifier
+ * @param opId - Operation Identifier
*/
- void modify_bearer(
- byte topic_id,
- Ip4Address s1u_sgw_ipv4,
- Long s1u_enodeb_teid,
- Ip4Address s1u_enodeb_ipv4,
- Long session_id,
- Long client_id,
- BigInteger op_id
+ void modifyBearer(
+ byte topicId,
+ Ip4Address s1USgwIpv4,
+ Long s1UEnodebTeid,
+ Ip4Address s1UEnodebIpv4,
+ Long sessionId,
+ Long clientId,
+ BigInteger opId
);
/**
* Deletes Mobility Session.
*
- * @param topic_id - DPN Topic ID
- * @param session_id - Context Identifier
- * @param client_id - Client Identifier
- * @param op_id - Operation Identifier
+ * @param topicId - DPN Topic ID
+ * @param sessionId - Context Identifier
+ * @param clientId - Client Identifier
+ * @param opId - Operation Identifier
*/
- void delete_session(
- byte topic_id,
- Long session_id,
- Long client_id,
- BigInteger op_id
+ void deleteSession(
+ byte topicId,
+ Long sessionId,
+ Long clientId,
+ BigInteger opId
);
/**
- * Creates the byte buffer to send ADC rules over ZMQ
+ * Creates the byte buffer to send ADC rules over ZMQ.
*
* @param topic - DPN Topic ID
- * @param domain_name - domain
+ * @param domainName - domain
* @param ip - ipaddress/ipprefix (i.e. 127.0.0.1/32)
* @param drop - Drop if 1
- * @param rating_group - Rating Group
- * @param service_ID - Service ID
- * @param sponsor_ID - Sponsor ID
+ * @param ratingGroup - Rating Group
+ * @param serviceId - Service ID
+ * @param sponsorId - Sponsor ID
*/
- void send_ADC_rules(
+ void sendAdcRules(
Short topic,
- String domain_name,
+ String domainName,
String ip,
Short drop,
- Long rating_group,
- Long service_ID,
- String sponsor_ID
+ Long ratingGroup,
+ Long serviceId,
+ String sponsorId
);
}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnNgicCommunicator.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnNgicCommunicator.java
index ba697ed..d35b65f 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnNgicCommunicator.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnNgicCommunicator.java
@@ -21,8 +21,8 @@
import org.onlab.packet.Ip4Address;
import org.onlab.packet.Ip4Prefix;
import org.onosproject.fpcagent.util.CacheManager;
-import org.onosproject.fpcagent.workers.ZMQSBPublisherManager;
-import org.onosproject.fpcagent.workers.ZMQSBSubscriberManager;
+import org.onosproject.fpcagent.workers.ZmqSbPublisherManager;
+import org.onosproject.fpcagent.workers.ZmqSbSubscriberManager;
import org.onosproject.yang.gen.v1.fpc.rev20150105.fpc.ZmqDpnControlProtocol;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.tenants.tenant.fpctopology.DefaultDpns;
import org.onosproject.yang.gen.v1.ietfdmmfpcbase.rev20160803.ietfdmmfpcbase.FpcDpnId;
@@ -40,7 +40,7 @@
/**
* DPDK DPN API over ZeroMQ for NGIC.
*/
-public class DpnNgicCommunicator extends ZmqDpnControlProtocol implements DpnCommunicationService {
+public final class DpnNgicCommunicator extends ZmqDpnControlProtocol implements DpnCommunicationService {
protected static final Logger log = LoggerFactory.getLogger(DpnNgicCommunicator.class);
private static DpnNgicCommunicator _instance;
@@ -49,7 +49,7 @@
}
public static DpnNgicCommunicator createInstance() {
- if(_instance == null) {
+ if (_instance == null) {
_instance = new DpnNgicCommunicator();
}
return _instance;
@@ -60,94 +60,120 @@
}
/**
- * Broadcasts the GOODBYE message to all the DPNs
+ * Broadcasts the GOODBYE message to all the DPNs.
+ *
+ * @param nodeId node identifier
+ * @param networkId network identifier
*/
- public static void send_goodbye_dpns(String nodeId, String networkId) {
+ public static void sendGoodbyeDpns(String nodeId, String networkId) {
ByteBuffer bb = ByteBuffer.allocate(10 + nodeId.length() + networkId.length());
bb.put(ReservedTopics.BROADCAST_DPNS.getType())
- .put(s11MsgType.CONTROLLER_STATUS_INDICATION.getType())
- .put(ZMQSBSubscriberManager.getInstance().getControllerTopic())
+ .put(S11MsgType.CONTROLLER_STATUS_INDICATION.getType())
+ .put(ZmqSbSubscriberManager.getInstance().getControllerTopic())
.put(ControllerStatusIndication.GOODBYE.getType())
- .put(toUint32(ZMQSBSubscriberManager.getInstance().getControllerSourceId()))
+ .put(toUint32(ZmqSbSubscriberManager.getInstance().getControllerSourceId()))
.put(toUint8((short) nodeId.length()))
.put(nodeId.getBytes())
.put(toUint8((short) networkId.length()))
.put(networkId.getBytes());
- log.info("send_goodbye_dpns: {}", bb.array());
- ZMQSBPublisherManager.getInstance().send(bb);
+ log.info("sendGoodbyeDpns: {}", bb.array());
+ ZmqSbPublisherManager.getInstance().send(bb);
}
/**
- * Broadcasts the HELLO message to all the DPNs
+ * Broadcasts the HELLO message to all the DPNs.
+ *
+ * @param nodeId node identifier
+ * @param networkId network identifier
*/
- public static void send_hello_dpns(String nodeId, String networkId) {
+ public static void sendHelloDpns(String nodeId, String networkId) {
ByteBuffer bb = ByteBuffer.allocate(10 + nodeId.length() + networkId.length());
bb.put(ReservedTopics.BROADCAST_DPNS.getType())
- .put(s11MsgType.CONTROLLER_STATUS_INDICATION.getType())
- .put(ZMQSBSubscriberManager.getInstance().getControllerTopic())
+ .put(S11MsgType.CONTROLLER_STATUS_INDICATION.getType())
+ .put(ZmqSbSubscriberManager.getInstance().getControllerTopic())
.put(ControllerStatusIndication.HELLO.getType())
- .put(toUint32(ZMQSBSubscriberManager.getInstance().getControllerSourceId()))
+ .put(toUint32(ZmqSbSubscriberManager.getInstance().getControllerSourceId()))
.put(toUint8((short) nodeId.length()))
.put(nodeId.getBytes())
.put(toUint8((short) networkId.length()))
.put(networkId.getBytes());
- log.info("send_hello_dpns: {}", bb.array());
- ZMQSBPublisherManager.getInstance().send(bb);
+ log.info("sendHelloDpns: {}", bb.array());
+ ZmqSbPublisherManager.getInstance().send(bb);
}
- public static void send_assign_conflict(String nodeId, String networkId) {
+ /**
+ * Broadcasts a conflict on node and network identifiers.
+ *
+ * @param nodeId node identifier
+ * @param networkId network identifier
+ */
+ public static void sendAssignConflict(String nodeId, String networkId) {
ByteBuffer bb = ByteBuffer.allocate(9 + nodeId.length() + networkId.length());
bb.put(ReservedTopics.BROADCAST_ALL.getType())
- .put(s11MsgType.ASSIGN_CONFLICT.getType())
- .put(ZMQSBSubscriberManager.getInstance().getControllerTopic())
- .put(toUint32(ZMQSBSubscriberManager.getInstance().getControllerSourceId()))
+ .put(S11MsgType.ASSIGN_CONFLICT.getType())
+ .put(ZmqSbSubscriberManager.getInstance().getControllerTopic())
+ .put(toUint32(ZmqSbSubscriberManager.getInstance().getControllerSourceId()))
.put(toUint8((short) nodeId.length()))
.put(nodeId.getBytes())
.put(toUint8((short) networkId.length()))
.put(networkId.getBytes());
- log.info("send_assign_conflict: {}", bb.array());
- ZMQSBPublisherManager.getInstance().send(bb);
+ log.info("sendAssignConflict: {}", bb.array());
+ ZmqSbPublisherManager.getInstance().send(bb);
}
- public static void send_assign_topic(String nodeId, String networkId, byte topic) {
+ /**
+ * Broadcasts a topic to be assigned for the specific node and network.
+ *
+ * @param nodeId node identifier
+ * @param networkId network identifier
+ * @param topic assigned topic
+ */
+ public static void sendAssignTopic(String nodeId, String networkId, byte topic) {
ByteBuffer bb = ByteBuffer.allocate(9 + nodeId.length() + networkId.length());
bb.put(ReservedTopics.BROADCAST_ALL.getType())
- .put(s11MsgType.ASSIGN_TOPIC.getType())
+ .put(S11MsgType.ASSIGN_TOPIC.getType())
.put(topic)
- .put(toUint32(ZMQSBSubscriberManager.getInstance().getControllerSourceId()))
+ .put(toUint32(ZmqSbSubscriberManager.getInstance().getControllerSourceId()))
.put(toUint8((short) nodeId.length()))
.put(nodeId.getBytes())
.put(toUint8((short) networkId.length()))
.put(networkId.getBytes());
- log.info("send_assign_topic: {}", bb.array());
- ZMQSBPublisherManager.getInstance().send(bb);
+ log.info("sendAssignTopic: {}", bb.array());
+ ZmqSbPublisherManager.getInstance().send(bb);
}
- public static void send_status_ack(String nodeId, String networkId, byte topic) {
+ /**
+ * Sends acknowledgment for the DPN status.
+ *
+ * @param nodeId node identifier
+ * @param networkId network identifier
+ * @param topic node's topic
+ */
+ public static void sendStatusAck(String nodeId, String networkId, byte topic) {
ByteBuffer bb = ByteBuffer.allocate(9 + nodeId.length() + networkId.length())
.put(topic)
- .put(s11MsgType.DPN_STATUS_ACK.getType())
- .put(ZMQSBSubscriberManager.getInstance().getControllerTopic())
- .put(toUint32(ZMQSBSubscriberManager.getInstance().getControllerSourceId()))
+ .put(S11MsgType.DPN_STATUS_ACK.getType())
+ .put(ZmqSbSubscriberManager.getInstance().getControllerTopic())
+ .put(toUint32(ZmqSbSubscriberManager.getInstance().getControllerSourceId()))
.put(toUint8((short) nodeId.length()))
.put(nodeId.getBytes())
.put(toUint8((short) networkId.length()))
.put(networkId.getBytes());
- log.info("send_status_ack: {}", bb.array());
- ZMQSBPublisherManager.getInstance().send(bb);
+ log.info("sendStatusAck: {}", bb.array());
+ ZmqSbPublisherManager.getInstance().send(bb);
}
/**
- * Parses the JSON returned from the Control Plane and sends the ACK to the DPN
+ * Parses the JSON returned from the Control Plane and sends the ACK to the DPN.
*
* @param body - The JSON body returned by the Control Plane in the DDN ACK
*/
- public static void send_ddn_ack(JSONObject body) {
+ public static void sendDdnAck(JSONObject body) {
try {
ByteBuffer bb = ByteBuffer.allocate(14);
byte topic = -1;
@@ -167,7 +193,7 @@
throw new RuntimeException("DPN specified in DDN not found.");
}
- bb.put(topic).put(s11MsgType.DDN_ACK.getType());
+ bb.put(topic).put(S11MsgType.DDN_ACK.getType());
if (body.has("dl-buffering-duration")) {
bb.put(toUint8((short) body.getInt("dl-buffering-duration")));
@@ -175,11 +201,11 @@
if (body.has("dl-buffering-suggested-count")) {
bb.put(toUint16((int) body.getInt("dl-buffering-suggested-count")));
}
- bb.put(ZMQSBSubscriberManager.getInstance().getControllerTopic())
+ bb.put(ZmqSbSubscriberManager.getInstance().getControllerTopic())
.put(toUint32(Long.parseLong(body.getString("client-id"))))
.put(toUint32(body.getLong("op-id")));
- ZMQSBPublisherManager.getInstance().send(bb);
+ ZmqSbPublisherManager.getInstance().send(bb);
} catch (Exception e) {
log.error("{}", ExceptionUtils.getFullStackTrace(e));
}
@@ -187,20 +213,21 @@
}
@Override
- public void create_session(
- byte topic_id,
+ public void createSession(
+ byte topicId,
BigInteger imsi,
- Short default_ebi,
- Ip4Address ue_ipv4,
- Long s1u_sgw_teid,
- Ip4Address s1u_sgw_ipv4,
- Long session_id,
- Long client_id,
- BigInteger op_id
+ Short defaultEbi,
+ Ip4Address ueIpv4,
+ Long s1USgwTeid,
+ Ip4Address s1USgwIpv4,
+ Long sessionId,
+ Long clientId,
+ BigInteger opId
) {
- /* NGIC Create Session expected buffer:
+ /*
+ NGIC Create Session expected buffer:
uint8_t topic_id;
- uint8_t type;
+ uint8_t type;
struct create_session_t {
uint64_t imsi;
uint8_t default_ebi;
@@ -214,155 +241,159 @@
} create_session_msg;
*/
ByteBuffer bb = ByteBuffer.allocate(41)
- .put(topic_id)
- .put(s11MsgType.CREATE_SESSION.getType())
+ .put(topicId)
+ .put(S11MsgType.CREATE_SESSION.getType())
.put(toUint64(imsi))
- .put(toUint8(default_ebi))
- .put(toUint32(ue_ipv4.toInt()))
- .put(toUint32(s1u_sgw_teid))
- .put(toUint32(s1u_sgw_ipv4.toInt()))
- .put(toUint64(BigInteger.valueOf(session_id)))
- .put(ZMQSBSubscriberManager.getInstance().getControllerTopic())
- .put(toUint32(client_id))
- .put(toUint32(op_id.longValue()));
+ .put(toUint8(defaultEbi))
+ .put(toUint32(ueIpv4.toInt()))
+ .put(toUint32(s1USgwTeid))
+ .put(toUint32(s1USgwIpv4.toInt()))
+ .put(toUint64(BigInteger.valueOf(sessionId)))
+ .put(ZmqSbSubscriberManager.getInstance().getControllerTopic())
+ .put(toUint32(clientId))
+ .put(toUint32(opId.longValue()));
- log.info("create_session(topic_id={}, imsi={}, default_ebi={}, " +
+ log.info("createSession(topic_id={}, imsi={}, default_ebi={}, " +
"ue_ipv4={}, s1u_sgw_teid={}, s1u_sgw_ipv4={}, " +
"session_id={}, client_id={}, op_id={})",
- topic_id, imsi, default_ebi, ue_ipv4, s1u_sgw_teid,
- s1u_sgw_ipv4, session_id, client_id, op_id);
+ topicId, imsi, defaultEbi, ueIpv4, s1USgwTeid,
+ s1USgwIpv4, sessionId, clientId, opId);
- ZMQSBPublisherManager.getInstance().send(bb);
+ ZmqSbPublisherManager.getInstance().send(bb);
}
@Override
- public void modify_bearer(
- byte topic_id,
- Ip4Address s1u_sgw_ipv4,
- Long s1u_enodeb_teid,
- Ip4Address s1u_enodeb_ipv4,
- Long session_id,
- Long client_id,
- BigInteger op_id
+ public void modifyBearer(
+ byte topicId,
+ Ip4Address s1USgwIpv4,
+ Long s1UEnodebTeid,
+ Ip4Address s1UEnodebIpv4,
+ Long sessionId,
+ Long clientId,
+ BigInteger opId
) {
/*
- NGIC Modify Session expected buffer:
- uint8_t topic_id;
- uint8_t type;
- struct modify_bearer_t {
- uint32_t s1u_sgw_ipv4;
- uint32_t s1u_enodeb_teid;
- uint32_t s1u_enodeb_ipv4;
- uint64_t session_id;
- uint8_t controller_topic;
- uint32_t client_id;
- uint32_t op_id;
- } modify_bearer_msg;
+ NGIC Modify Session expected buffer:
+ uint8_t topic_id;
+ uint8_t type;
+ struct modify_bearer_t {
+ uint32_t s1u_sgw_ipv4;
+ uint32_t s1u_enodeb_teid;
+ uint32_t s1u_enodeb_ipv4;
+ uint64_t session_id;
+ uint8_t controller_topic;
+ uint32_t client_id;
+ uint32_t op_id;
+ } modify_bearer_msg;
*/
ByteBuffer bb = ByteBuffer.allocate(32)
- .put(topic_id)
- .put(s11MsgType.UPDATE_MODIFY_BEARER.getType())
- .put(toUint32(s1u_sgw_ipv4.toInt()))
- .put(toUint32(s1u_enodeb_teid))
- .put(toUint32(s1u_enodeb_ipv4.toInt()))
- .put(toUint64(BigInteger.valueOf(session_id)))
- .put(ZMQSBSubscriberManager.getInstance().getControllerTopic())
- .put(toUint32(client_id))
- .put(toUint32(op_id.longValue()));
+ .put(topicId)
+ .put(S11MsgType.UPDATE_MODIFY_BEARER.getType())
+ .put(toUint32(s1USgwIpv4.toInt()))
+ .put(toUint32(s1UEnodebTeid))
+ .put(toUint32(s1UEnodebIpv4.toInt()))
+ .put(toUint64(BigInteger.valueOf(sessionId)))
+ .put(ZmqSbSubscriberManager.getInstance().getControllerTopic())
+ .put(toUint32(clientId))
+ .put(toUint32(opId.longValue()));
- log.info("modify_bearer(topic_id={}, s1u_sgw_ipv4={}, " +
+ log.info("modifyBearer(topic_id={}, s1u_sgw_ipv4={}, " +
"s1u_enodeb_teid={}, s1u_enodeb_ipv4={}, " +
"session_id={}, client_id={}, op_id={})",
- topic_id, s1u_sgw_ipv4, s1u_enodeb_teid, s1u_enodeb_ipv4,
- session_id, client_id, op_id);
+ topicId, s1USgwIpv4, s1UEnodebTeid, s1UEnodebIpv4,
+ sessionId, clientId, opId);
- ZMQSBPublisherManager.getInstance().send(bb);
+ ZmqSbPublisherManager.getInstance().send(bb);
}
@Override
- public void delete_session(
- byte topic_id,
- Long session_id,
- Long client_id,
- BigInteger op_id
+ public void deleteSession(
+ byte topicId,
+ Long sessionId,
+ Long clientId,
+ BigInteger opId
) {
/*
NGIC Delete Session expected buffer:
uint8_t topic_id;
- uint8_t type;
+ uint8_t type;
struct delete_session_t {
- uint64_t session_id;
- uint8_t controller_topic;
- uint32_t client_id;
- uint32_t op_id;
- } delete_session_msg;
+ uint64_t session_id;
+ uint8_t controller_topic;
+ uint32_t client_id;
+ uint32_t op_id;
+ } delete_session_msg;
*/
ByteBuffer bb = ByteBuffer.allocate(19)
- .put(topic_id)
- .put(s11MsgType.DELETE_SESSION.getType())
- .put(toUint64(BigInteger.valueOf(session_id)))
- .put(ZMQSBSubscriberManager.getInstance().getControllerTopic())
- .put(toUint32(client_id))
- .put(toUint32(op_id.longValue()));
+ .put(topicId)
+ .put(S11MsgType.DELETE_SESSION.getType())
+ .put(toUint64(BigInteger.valueOf(sessionId)))
+ .put(ZmqSbSubscriberManager.getInstance().getControllerTopic())
+ .put(toUint32(clientId))
+ .put(toUint32(opId.longValue()));
- log.info("delete_session(topic_id={}, session_id={}, client_id={}, " +
+ log.info("deleteSession(topic_id={}, session_id={}, client_id={}, " +
"op_id={})",
- topic_id, session_id, client_id, op_id);
-
- ZMQSBPublisherManager.getInstance().send(bb);
+ topicId, sessionId, clientId, opId);
+
+ ZmqSbPublisherManager.getInstance().send(bb);
}
@Override
- public void send_ADC_rules(
+ public void sendAdcRules(
Short topic,
- String domain_name,
+ String domainName,
String ip,
Short drop,
- Long rating_group,
- Long service_ID, String sponsor_ID
+ Long ratingGroup,
+ Long serviceId, String sponsorId
) {
// TODO take a look for this function. Not tested.
- Ip4Prefix ip_prefix = null;
+ Ip4Prefix ip4Prefix = null;
if (ip != null) {
- ip_prefix = Ip4Prefix.valueOf(ip);
+ ip4Prefix = Ip4Prefix.valueOf(ip);
}
- Short selector_type = (short) (domain_name != null ? 0 : ip_prefix != null ? 2 : ip_prefix.address() != null ? 1 : 255);
- if (selector_type == 255) {
+ Short selectorType = (short) (domainName != null ? 0 : ip4Prefix != null ?
+ 2 : ip4Prefix.address() != null ? 1 : 255);
+ if (selectorType == 255) {
log.warn("Domain/IP not found, failed to send rules");
return;
}
ByteBuffer bb = ByteBuffer.allocate(200);
bb.put(toUint8(topic))
- .put(s11MsgType.ADC_RULE.getType())
- .put(toUint8(selector_type));
- if (selector_type == 0) {
- bb.put(toUint8((short) domain_name.length()))
- .put(domain_name.getBytes());
+ .put(S11MsgType.ADC_RULE.getType())
+ .put(toUint8(selectorType));
+ if (selectorType == 0) {
+ bb.put(toUint8((short) domainName.length()))
+ .put(domainName.getBytes());
}
- if ((selector_type == 1) || (selector_type == 2)) {
- int ip_address_long = ip_prefix.address().toInt();
- bb.put(toUint32(ip_address_long));
+ if ((selectorType == 1) || (selectorType == 2)) {
+ int ipAddressLong = ip4Prefix.address().toInt();
+ bb.put(toUint32(ipAddressLong));
}
- if (selector_type == 2) {
- bb.put(toUint16(ip_prefix.prefixLength()));
+ if (selectorType == 2) {
+ bb.put(toUint16(ip4Prefix.prefixLength()));
}
- if (drop != null)
+ if (drop != null) {
bb.put(toUint8(drop));
- if (rating_group != null)
- bb.put(toUint32(rating_group));
- if (service_ID != null)
- bb.put(toUint32(service_ID));
- if (sponsor_ID != null && (short) sponsor_ID.length() > 0) {
- bb.put(toUint8((short) sponsor_ID.length()))
- .put(sponsor_ID.getBytes());
}
- bb.put(ZMQSBSubscriberManager.getInstance().getControllerTopic());
+ if (ratingGroup != null) {
+ bb.put(toUint32(ratingGroup));
+ }
+ if (serviceId != null) {
+ bb.put(toUint32(serviceId));
+ }
+ if (sponsorId != null && (short) sponsorId.length() > 0) {
+ bb.put(toUint8((short) sponsorId.length()))
+ .put(sponsorId.getBytes());
+ }
+ bb.put(ZmqSbSubscriberManager.getInstance().getControllerTopic());
- log.info("send_ADC_rules: {}", bb.array());
- ZMQSBPublisherManager.getInstance().send(bb);
+ log.info("sendAdcRules: {}", bb.array());
+ ZmqSbPublisherManager.getInstance().send(bb);
}
- public enum s11MsgType {
+ public enum S11MsgType {
CREATE_SESSION(1) {
@Override
public String toString() {
@@ -486,12 +517,12 @@
private byte type;
- s11MsgType(int type) {
+ S11MsgType(int type) {
this.type = (byte) type;
}
- public static s11MsgType getEnum(byte name) {
- Optional<s11MsgType> any = Arrays.stream(s11MsgType.values())
+ public static S11MsgType getEnum(byte name) {
+ Optional<S11MsgType> any = Arrays.stream(S11MsgType.values())
.filter(typeStr -> typeStr.type == name)
.findAny();
if (any.isPresent()) {
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnP4Communicator.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnP4Communicator.java
index 6a26e60..83afb22 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnP4Communicator.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnP4Communicator.java
@@ -35,6 +35,8 @@
import org.onosproject.net.pi.runtime.PiAction;
import org.onosproject.net.pi.runtime.PiActionParam;
import org.onosproject.yang.gen.v1.fpc.rev20150105.fpc.P4DpnControlProtocol;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.ClientIdentifier;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.OpIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,55 +50,58 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onlab.util.ImmutableByteSequence.copyFrom;
+import static org.onosproject.fpcagent.util.FpcUtil.createConfigResultNotification;
+import static org.onosproject.fpcagent.util.FpcUtil.sendNotification;
-public class DpnP4Communicator
+public final class DpnP4Communicator
extends P4DpnControlProtocol
implements DpnCommunicationService {
private final Logger log = LoggerFactory.getLogger(getClass());
- private static PiTableId TBL_ID_UE_FILTER = PiTableId
+ private static final PiTableId TBL_ID_UE_FILTER = PiTableId
.of("spgw_ingress.ue_filter_table");
- private static PiTableId TBL_ID_S1U_FILTER = PiTableId
+ private static final PiTableId TBL_ID_S1U_FILTER = PiTableId
.of("spgw_ingress.s1u_filter_table");
- private static PiTableId TBL_ID_DL_SESS_LOOKUP = PiTableId
+ private static final PiTableId TBL_ID_DL_SESS_LOOKUP = PiTableId
.of("spgw_ingress.dl_sess_lookup");
- private static PiMatchFieldId MF_ID_IPV4_DST = PiMatchFieldId
+ private static final PiMatchFieldId MF_ID_IPV4_DST = PiMatchFieldId
.of("ipv4.dst_addr");
- private static PiMatchFieldId MF_ID_GTPU_IPV4_DST = PiMatchFieldId
+ private static final PiMatchFieldId MF_ID_GTPU_IPV4_DST = PiMatchFieldId
.of("gtpu_ipv4.dst_addr");
- private static PiActionId ACT_ID_SET_DL_SESS_INFO = PiActionId
+ private static final PiActionId ACT_ID_SET_DL_SESS_INFO = PiActionId
.of("spgw_ingress.set_dl_sess_info");
- private static PiActionParamId ACT_PARAM_ID_DL_TEID = PiActionParamId
+ private static final PiActionParamId ACT_PARAM_ID_DL_TEID = PiActionParamId
.of("dl_sess_teid");
- private static PiActionParamId ACT_PARAM_ID_DL_ENB_ADDR = PiActionParamId
+ private static final PiActionParamId ACT_PARAM_ID_DL_ENB_ADDR = PiActionParamId
.of("dl_sess_enb_addr");
- private static PiActionParamId ACT_PARAM_ID_DL_S1U_ADDR = PiActionParamId
+ private static final PiActionParamId ACT_PARAM_ID_DL_S1U_ADDR = PiActionParamId
.of("dl_sess_s1u_addr");
- private static PiAction NO_ACTION = PiAction.builder()
+ private static final PiAction NO_ACTION = PiAction.builder()
.withId(PiActionId.of("NoAction"))
.build();
- private static byte DIR_UPLINK = (byte) 0;
- private static byte DIR_DOWNLINK = (byte) 1;
+ private static final byte DIR_UPLINK = (byte) 0;
+ private static final byte DIR_DOWNLINK = (byte) 1;
- private static ConcurrentMap<Long, Ip4Address> SESS_ID_TO_UE_ADDR =
+ private static final ConcurrentMap<Long, Ip4Address> SESS_ID_TO_UE_ADDR =
Maps.newConcurrentMap();
- private static ConcurrentMap<Long, Ip4Address> SESS_ID_TO_S1U_ADDR =
+ private static final ConcurrentMap<Long, Ip4Address> SESS_ID_TO_S1U_ADDR =
Maps.newConcurrentMap();
- private static ConcurrentMap<Long, Set<FlowRule>> SESS_ID_TO_FLOWS =
+ private static final ConcurrentMap<Long, Set<FlowRule>> SESS_ID_TO_FLOWS =
Maps.newConcurrentMap();
// FIXME: should use a cache with timeout
- private static Map<Long, Lock> SESS_LOCKS = Maps.newConcurrentMap();
+ private static final Map<Long, Lock> SESS_LOCKS = Maps.newConcurrentMap();
private static DpnP4Communicator _instance;
private ApplicationId appId;
private DeviceId deviceId;
+ private ClientIdentifier clientId;
private FlowRuleService flowRuleService;
private DpnP4Communicator(ApplicationId appId, FlowRuleService flowRuleService) {
@@ -120,6 +125,10 @@
this.deviceId = checkNotNull(deviceId);
}
+ public void setClientId(ClientIdentifier clientId) {
+ this.clientId = clientId;
+ }
+
private boolean isNotInit() {
if (deviceId == null) {
log.error("Not initialized. Cannot perform operations.");
@@ -130,132 +139,147 @@
}
@Override
- public void create_session(byte topic_id, BigInteger imsi, Short default_ebi,
- Ip4Address ue_ipv4, Long s1u_sgw_teid,
- Ip4Address s1u_sgw_ipv4, Long session_id,
- Long client_id, BigInteger op_id) {
+ public void createSession(byte topicId, BigInteger imsi, Short defaultEbi,
+ Ip4Address ueIpv4, Long s1USgwTeid,
+ Ip4Address s1USgwIpv4, Long sessionId,
+ Long clientId, BigInteger opId) {
- log.info("create_session(topic_id={}, imsi={}, default_ebi={}, " +
+ log.info("createSession(topic_id={}, imsi={}, default_ebi={}, " +
"ue_ipv4={}, s1u_sgw_teid={}, s1u_sgw_ipv4={}, " +
"session_id={}, client_id={}, op_id={})",
- topic_id, imsi, default_ebi, ue_ipv4, s1u_sgw_teid,
- s1u_sgw_ipv4, session_id, client_id, op_id);
+ topicId, imsi, defaultEbi, ueIpv4, s1USgwTeid,
+ s1USgwIpv4, sessionId, clientId, opId);
if (isNotInit()) {
return;
}
- SESS_LOCKS.putIfAbsent(session_id, new ReentrantLock());
- SESS_LOCKS.get(session_id).lock();
+ SESS_LOCKS.putIfAbsent(sessionId, new ReentrantLock());
+ SESS_LOCKS.get(sessionId).lock();
try {
- if (SESS_ID_TO_FLOWS.containsKey(session_id)
- && !SESS_ID_TO_FLOWS.get(session_id).isEmpty()) {
+ if (SESS_ID_TO_FLOWS.containsKey(sessionId)
+ && !SESS_ID_TO_FLOWS.get(sessionId).isEmpty()) {
log.warn("Creating session {}, but {} rules already exists for such session.",
- session_id, SESS_ID_TO_FLOWS.get(session_id).size());
- SESS_ID_TO_FLOWS.get(session_id).forEach(f -> log.debug("{}", f));
+ sessionId, SESS_ID_TO_FLOWS.get(sessionId).size());
+ SESS_ID_TO_FLOWS.get(sessionId).forEach(f -> log.debug("{}", f));
}
// If old session is there. We keep the old rules.
- SESS_ID_TO_FLOWS.putIfAbsent(session_id, Sets.newHashSet());
- SESS_ID_TO_UE_ADDR.put(session_id, ue_ipv4);
- SESS_ID_TO_S1U_ADDR.put(session_id, s1u_sgw_ipv4);
+ SESS_ID_TO_FLOWS.putIfAbsent(sessionId, Sets.newHashSet());
+ SESS_ID_TO_UE_ADDR.put(sessionId, ueIpv4);
+ SESS_ID_TO_S1U_ADDR.put(sessionId, s1USgwIpv4);
- SESS_ID_TO_FLOWS.get(session_id).add(ueFilterRule(ue_ipv4));
- SESS_ID_TO_FLOWS.get(session_id).add(s1uFilterRule(s1u_sgw_ipv4));
+ SESS_ID_TO_FLOWS.get(sessionId).add(ueFilterRule(ueIpv4));
+ SESS_ID_TO_FLOWS.get(sessionId).add(s1uFilterRule(s1USgwIpv4));
- applySessionRules(session_id);
+ applySessionRules(sessionId);
+
+ sendNotification(
+ clientId.toString(),
+ createConfigResultNotification(OpIdentifier.of(opId), (short) 16)
+ );
} finally {
- SESS_LOCKS.get(session_id).unlock();
+ SESS_LOCKS.get(sessionId).unlock();
}
}
@Override
- public void modify_bearer(byte topic_id, Ip4Address s1u_sgw_ipv4,
- Long s1u_enodeb_teid, Ip4Address s1u_enodeb_ipv4,
- Long session_id, Long client_id, BigInteger op_id) {
+ public void modifyBearer(byte topicId, Ip4Address s1USgwIpv4,
+ Long s1UEnodebTeid, Ip4Address s1UEnodebIpv4,
+ Long sessionId, Long clientId, BigInteger opId) {
- log.info("modify_bearer(topic_id={}, s1u_sgw_ipv4={}, " +
+ log.info("modifyBearer(topic_id={}, s1u_sgw_ipv4={}, " +
"s1u_enodeb_teid={}, s1u_enodeb_ipv4={}, " +
"session_id={}, client_id={}, op_id={})",
- topic_id, s1u_sgw_ipv4, s1u_enodeb_teid, s1u_enodeb_ipv4,
- session_id, client_id, op_id);
+ topicId, s1USgwIpv4, s1UEnodebTeid, s1UEnodebIpv4,
+ sessionId, clientId, opId);
if (isNotInit()) {
return;
}
- SESS_LOCKS.putIfAbsent(session_id, new ReentrantLock());
- SESS_LOCKS.get(session_id).lock();
+ SESS_LOCKS.putIfAbsent(sessionId, new ReentrantLock());
+ SESS_LOCKS.get(sessionId).lock();
try {
- if (!SESS_ID_TO_UE_ADDR.containsKey(session_id)) {
+ if (!SESS_ID_TO_UE_ADDR.containsKey(sessionId)) {
log.error("Missing sess ID in SESS_ID_TO_UE_ADDR map: {}",
- session_id);
+ sessionId);
return;
}
- if (!SESS_ID_TO_S1U_ADDR.containsKey(session_id)) {
+ if (!SESS_ID_TO_S1U_ADDR.containsKey(sessionId)) {
log.error("Missing sess ID in SESS_ID_TO_S1U_ADDR map: {}",
- session_id);
+ sessionId);
return;
}
- final Ip4Address ueAddr = SESS_ID_TO_UE_ADDR.get(session_id);
- final Ip4Address s1uAddr = SESS_ID_TO_S1U_ADDR.get(session_id);
+ final Ip4Address ueAddr = SESS_ID_TO_UE_ADDR.get(sessionId);
+ final Ip4Address s1uAddr = SESS_ID_TO_S1U_ADDR.get(sessionId);
- SESS_ID_TO_FLOWS.get(session_id)
- .add(dlSessLookupRule(ueAddr, s1u_enodeb_teid,
- s1u_enodeb_ipv4, s1uAddr));
+ SESS_ID_TO_FLOWS.get(sessionId)
+ .add(dlSessLookupRule(ueAddr, s1UEnodebTeid,
+ s1UEnodebIpv4, s1uAddr));
- applySessionRules(session_id);
+ applySessionRules(sessionId);
+
+ sendNotification(
+ clientId.toString(),
+ createConfigResultNotification(OpIdentifier.of(opId), (short) 16)
+ );
} finally {
- SESS_LOCKS.get(session_id).unlock();
+ SESS_LOCKS.get(sessionId).unlock();
}
}
@Override
- public void delete_session(byte topic_id, Long session_id, Long client_id,
- BigInteger op_id) {
+ public void deleteSession(byte topicId, Long sessionId, Long clientId,
+ BigInteger opId) {
- log.info("delete_session(topic_id={}, session_id={}, client_id={}, " +
+ log.info("deleteSession(topic_id={}, session_id={}, client_id={}, " +
"op_id={})",
- topic_id, session_id, client_id, op_id);
+ topicId, sessionId, clientId, opId);
if (isNotInit()) {
return;
}
- SESS_LOCKS.putIfAbsent(session_id, new ReentrantLock());
- SESS_LOCKS.get(session_id).lock();
+ SESS_LOCKS.putIfAbsent(sessionId, new ReentrantLock());
+ SESS_LOCKS.get(sessionId).lock();
try {
- SESS_ID_TO_S1U_ADDR.remove(session_id);
- SESS_ID_TO_UE_ADDR.remove(session_id);
+ SESS_ID_TO_S1U_ADDR.remove(sessionId);
+ SESS_ID_TO_UE_ADDR.remove(sessionId);
- if (!SESS_ID_TO_FLOWS.containsKey(session_id)
- || SESS_ID_TO_FLOWS.get(session_id).isEmpty()) {
+ if (!SESS_ID_TO_FLOWS.containsKey(sessionId)
+ || SESS_ID_TO_FLOWS.get(sessionId).isEmpty()) {
log.warn("Deleting session {}, but no rules exist for this session",
- session_id);
+ sessionId);
} else {
- removeSessionRules(session_id);
+ removeSessionRules(sessionId);
+
+ sendNotification(
+ clientId.toString(),
+ createConfigResultNotification(OpIdentifier.of(opId), (short) 16)
+ );
}
} finally {
- SESS_LOCKS.get(session_id).unlock();
+ SESS_LOCKS.get(sessionId).unlock();
}
}
@Override
- public void send_ADC_rules(Short topic, String domain_name, String ip,
- Short drop, Long rating_group, Long service_ID,
- String sponsor_ID) {
+ public void sendAdcRules(Short topic, String domainName, String ip,
+ Short drop, Long ratingGroup, Long serviceId,
+ String sponsorId) {
- log.info("send_ADC_rules(topic={}, domain_name={}, ip={}, drop={}, " +
+ log.info("sendAdcRules(topic={}, domain_name={}, ip={}, drop={}, " +
"rating_group={}, service_ID={}, sponsor_ID={})",
- topic, domain_name, ip, drop, rating_group, service_ID,
- sponsor_ID);
+ topic, domainName, ip, drop, ratingGroup, serviceId,
+ sponsorId);
if (isNotInit()) {
return;
}
- log.warn("send_ADC_rules() UNIMPLEMENTED!");
+ log.warn("sendAdcRules() UNIMPLEMENTED!");
}
private FlowRule ueFilterRule(Ip4Address ueAddr) {
@@ -353,5 +377,7 @@
final FlowRuleOperations.Builder opsBuilder = FlowRuleOperations.builder();
rules.forEach(opsBuilder::remove);
flowRuleService.apply(opsBuilder.build());
+
+
}
}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/package-info.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/package-info.java
index 21c1413..d6b5429 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/package-info.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/package-info.java
@@ -14,4 +14,7 @@
* limitations under the License.
*/
+/**
+ * Supported protocols for Southbound Interface.
+ */
package org.onosproject.fpcagent.protocols;
\ No newline at end of file
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/CpProvider.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/CpProvider.java
index 0d77d14..23618c0 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/CpProvider.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/CpProvider.java
@@ -16,9 +16,20 @@
package org.onosproject.fpcagent.providers;
-import org.apache.felix.scr.annotations.*;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
import org.onlab.packet.ChassisId;
-import org.onosproject.net.*;
+import org.onosproject.net.AnnotationKeys;
+import org.onosproject.net.DefaultAnnotations;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.MastershipRole;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.SparseAnnotations;
import org.onosproject.net.device.DefaultDeviceDescription;
import org.onosproject.net.device.DeviceDescription;
import org.onosproject.net.device.DeviceProviderRegistry;
@@ -100,7 +111,8 @@
.set(AnnotationKeys.MANAGEMENT_ADDRESS, address)
.set("client-id", id)
.build();
- DeviceDescription descriptionBase = new DefaultDeviceDescription(deviceId.uri(), type, "cp", "0.1", "0.1", id, chassisId),
+ DeviceDescription descriptionBase = new DefaultDeviceDescription(
+ deviceId.uri(), type, "cp", "0.1", "0.1", id, chassisId),
description = new DefaultDeviceDescription(descriptionBase, annotations);
providerService.deviceConnected(deviceId, description);
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/DpnDeviceListener.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/ZmqDpnDeviceListener.java
similarity index 94%
rename from apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/DpnDeviceListener.java
rename to apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/ZmqDpnDeviceListener.java
index 8d099f0..c13bf06 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/DpnDeviceListener.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/ZmqDpnDeviceListener.java
@@ -16,7 +16,7 @@
package org.onosproject.fpcagent.providers;
-public interface DpnDeviceListener {
+public interface ZmqDpnDeviceListener {
void deviceAdded(String id, byte topic);
void deviceRemoved(String id);
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/DpnProvider.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/ZmqDpnProvider.java
similarity index 74%
rename from apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/DpnProvider.java
rename to apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/ZmqDpnProvider.java
index 69bfba6..c7f2706 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/DpnProvider.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/ZmqDpnProvider.java
@@ -16,9 +16,20 @@
package org.onosproject.fpcagent.providers;
-import org.apache.felix.scr.annotations.*;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
import org.onlab.packet.ChassisId;
-import org.onosproject.net.*;
+import org.onosproject.net.AnnotationKeys;
+import org.onosproject.net.DefaultAnnotations;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.MastershipRole;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.SparseAnnotations;
import org.onosproject.net.device.DefaultDeviceDescription;
import org.onosproject.net.device.DeviceDescription;
import org.onosproject.net.device.DeviceProviderRegistry;
@@ -35,17 +46,17 @@
*/
@Component(immediate = true)
@Service
-public class DpnProvider extends AbstractProvider implements DpnProviderService {
+public class ZmqDpnProvider extends AbstractProvider implements ZmqDpnProviderService {
- private static final Logger log = getLogger(DpnProvider.class);
- private final InternalDeviceListener listener = new InternalDeviceListener();
+ private static final Logger log = getLogger(ZmqDpnProvider.class);
+ private final InternalDeviceListenerZmq listener = new InternalDeviceListenerZmq();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceProviderRegistry providerRegistry;
private DeviceProviderService providerService;
- public DpnProvider() {
+ public ZmqDpnProvider() {
super(new ProviderId("fpc", "org.onosproject.providers.dpn"));
}
@@ -62,7 +73,7 @@
log.info("FPC Device Provider Stopped");
}
- public InternalDeviceListener getListener() {
+ public InternalDeviceListenerZmq getListener() {
return listener;
}
@@ -86,7 +97,7 @@
}
- public class InternalDeviceListener implements DpnDeviceListener {
+ public class InternalDeviceListenerZmq implements ZmqDpnDeviceListener {
@Override
public void deviceAdded(String id, byte topic) {
@@ -101,7 +112,8 @@
.set("topic", String.valueOf(topic))
.set("node/network", id)
.build();
- DeviceDescription descriptionBase = new DefaultDeviceDescription(deviceId.uri(), type, "fpc", "0.1", "0.1", id, chassisId),
+ DeviceDescription descriptionBase = new DefaultDeviceDescription(
+ deviceId.uri(), type, "fpc", "0.1", "0.1", id, chassisId),
description = new DefaultDeviceDescription(descriptionBase, annotations);
providerService.deviceConnected(deviceId, description);
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/DpnProviderService.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/ZmqDpnProviderService.java
similarity index 87%
rename from apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/DpnProviderService.java
rename to apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/ZmqDpnProviderService.java
index 515730a..cc78c6c 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/DpnProviderService.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/ZmqDpnProviderService.java
@@ -18,8 +18,8 @@
import org.onosproject.net.device.DeviceProvider;
-public interface DpnProviderService extends DeviceProvider {
+public interface ZmqDpnProviderService extends DeviceProvider {
- DpnDeviceListener getListener();
+ ZmqDpnDeviceListener getListener();
}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/package-info.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/package-info.java
index 7f99f9d..650e2f3 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/package-info.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/package-info.java
@@ -14,4 +14,7 @@
* limitations under the License.
*/
+/**
+ * Device providers for NGIC CP and DP.
+ */
package org.onosproject.fpcagent.providers;
\ No newline at end of file
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/CacheManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/CacheManager.java
index 047f15e..9d2baac 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/CacheManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/CacheManager.java
@@ -39,7 +39,7 @@
/**
* Cache Manager.
*/
-public class CacheManager {
+public final class CacheManager {
// Cacher per different Tenant
public static ConcurrentMap<FpcIdentity, CacheManager> cacheInfo = Maps.newConcurrentMap();
@@ -121,8 +121,9 @@
if (tenant.fpcTopology().dpns() != null) {
return tenant.fpcTopology().dpns().stream()
- .filter(dpns -> nodeNetwork.equals(dpns.nodeId() + "/" + dpns.networkId()))
- .findFirst();
+ .filter(dpns -> nodeNetwork.equals(
+ dpns.nodeId() + "/" + dpns.networkId())
+ ).findFirst();
}
}
return Optional.empty();
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/ConfigHelper.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/ConfigHelper.java
index 20868af..6bae6b9 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/ConfigHelper.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/ConfigHelper.java
@@ -1,6 +1,27 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.onosproject.fpcagent.util;
-import com.fasterxml.jackson.annotation.*;
+import com.fasterxml.jackson.annotation.JsonAnyGetter;
+import com.fasterxml.jackson.annotation.JsonAnySetter;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import java.util.HashMap;
import java.util.Map;
@@ -79,7 +100,7 @@
private Map<String, Object> additionalProperties = new HashMap<>();
/**
- * No args constructor for use in serialization
+ * No args constructor for use in serialization.
*/
public ConfigHelper() {
}
@@ -259,8 +280,8 @@
}
- public void setNodeId(String node_id) {
- this.nodeId = node_id;
+ public void setNodeId(String nodeId) {
+ this.nodeId = nodeId;
}
@@ -269,8 +290,8 @@
}
- public void setNetworkId(String network_id) {
- this.networkId = network_id;
+ public void setNetworkId(String networkId) {
+ this.networkId = networkId;
}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/Converter.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/Converter.java
index fd4ba4a..c5434ab 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/Converter.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/Converter.java
@@ -1,3 +1,19 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.onosproject.fpcagent.util;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.tenants.tenant.fpcmobility.DefaultContexts;
@@ -17,11 +33,14 @@
import java.util.Arrays;
import java.util.List;
-public class Converter {
+public final class Converter {
private static final Logger log = LoggerFactory.getLogger(Converter.class);
+ private Converter() {
+ }
+
/**
- * Short to Byte
+ * Short to Byte.
*
* @param value - Short
* @return byte value
@@ -31,7 +50,7 @@
}
/**
- * Short to byte array
+ * Short to byte array.
*
* @param value - Short
* @return byte array
@@ -41,7 +60,7 @@
}
/**
- * Lower two bytes of an integer to byte array
+ * Lower two bytes of an integer to byte array.
*
* @param value - integer value
* @return byte array
@@ -67,13 +86,16 @@
* @return byte array
*/
public static byte[] toUint64(BigInteger value) {
- return new byte[]{value.shiftRight(56).byteValue(), value.shiftRight(48).byteValue(), value.shiftRight(40).byteValue(),
- value.shiftRight(32).byteValue(), value.shiftRight(24).byteValue(), value.shiftRight(16).byteValue(),
- value.shiftRight(8).byteValue(), value.and(BigInteger.valueOf(0xFF)).byteValue()};
+ return new byte[]{
+ value.shiftRight(56).byteValue(), value.shiftRight(48).byteValue(),
+ value.shiftRight(40).byteValue(), value.shiftRight(32).byteValue(),
+ value.shiftRight(24).byteValue(), value.shiftRight(16).byteValue(),
+ value.shiftRight(8).byteValue(), value.and(BigInteger.valueOf(0xFF)).byteValue()
+ };
}
/**
- * Decodes a 32 bit value
+ * Decodes a 32 bit value.
*
* @param source - byte array
* @param offset - offset in the array where the 8 bytes begins
@@ -84,7 +106,7 @@
}
/**
- * Converts a byte array to BigInteger
+ * Converts a byte array to BigInteger.
*
* @param source - byte array
* @param offset - offset in the array where the 8 bytes begins
@@ -95,7 +117,7 @@
}
/**
- * Converts an integer to a long (used for larger unsigned integers)
+ * Converts an integer to a long (used for larger unsigned integers).
*
* @param source - message buffer (byte array)
* @param offset - offset in the array where the 4 bytes begins
@@ -109,7 +131,8 @@
return value;
}
- public static DefaultContexts convertContext(org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.payload.Contexts contexts) {
+ public static DefaultContexts convertContext(org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803
+ .ietfdmmfpcagent.payload.Contexts contexts) {
DefaultContexts ctx = new DefaultContexts();
FpcContextId fpcContextId = contexts.contextId();
List<IpPrefix> ipPrefixes = contexts.delegatingIpPrefixes();
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/FpcUtil.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/FpcUtil.java
index 18a38c9..3180aa7 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/FpcUtil.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/FpcUtil.java
@@ -23,20 +23,39 @@
import org.onosproject.config.DynamicConfigService;
import org.onosproject.config.Filter;
import org.onosproject.core.IdGenerator;
-import org.onosproject.fpcagent.util.eventStream.ConfigureService;
+import org.onosproject.fpcagent.util.eventstream.ConfigureService;
import org.onosproject.net.Device;
import org.onosproject.net.device.DeviceStore;
import org.onosproject.restconf.utils.RestconfUtils;
import org.onosproject.yang.gen.v1.fpc.rev20150105.fpc.registerclient.DefaultRegisterClientInput;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.ClientIdentifier;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.DefaultConfigResultNotification;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.DefaultTenants;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.NotificationId;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.OpIdentifier;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.Result;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.configresultnotification
+ .value.DefaultConfigResult;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.opstatusvalue.OpStatusEnum;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.result.ResultEnum;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.resultbody.resulttype.DefaultEmptyCase;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.tenants.DefaultTenant;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.tenants.TenantKeys;
import org.onosproject.yang.gen.v1.ietfdmmfpcbase.rev20160803.ietfdmmfpcbase.FpcIdentity;
-import org.onosproject.yang.model.*;
+import org.onosproject.yang.model.DataNode;
+import org.onosproject.yang.model.DefaultModelObjectData;
+import org.onosproject.yang.model.DefaultResourceData;
+import org.onosproject.yang.model.InnerModelObject;
+import org.onosproject.yang.model.ModelConverter;
+import org.onosproject.yang.model.ModelObject;
+import org.onosproject.yang.model.ModelObjectData;
+import org.onosproject.yang.model.ModelObjectId;
+import org.onosproject.yang.model.ResourceData;
+import org.onosproject.yang.model.ResourceId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.math.BigInteger;
import java.util.AbstractMap;
import java.util.HashSet;
import java.util.List;
@@ -48,16 +67,16 @@
/**
* Helper class which stores all the static variables.
*/
-public class FpcUtil {
+public final class FpcUtil {
public static final String FPC_APP_ID = "org.onosproject.fpcagent";
- public static final FpcIdentity defaultIdentity = FpcIdentity.fromString("default");
- public static final ConcurrentMap<ClientIdentifier, DefaultRegisterClientInput> clientInfo = Maps.newConcurrentMap();
- public static final ConcurrentMap<FpcIdentity, HashSet<ClientIdentifier>> tenantInfo = Maps.newConcurrentMap();
+ public static final FpcIdentity DEFAULT_IDENTITY;
+ public static final ConcurrentMap<ClientIdentifier, DefaultRegisterClientInput> CLIENT_INFO;
+ public static final ConcurrentMap<FpcIdentity, HashSet<ClientIdentifier>> TENANT_INFO;
private static final Logger log = LoggerFactory.getLogger(FpcUtil.class);
// Services
- public static DynamicConfigService dynamicConfigService = null;
- public static ModelConverter modelConverter = null;
- public static DeviceStore deviceStore = null;
+ public static DynamicConfigService dynamicConfigService;
+ public static ModelConverter modelConverter;
+ public static DeviceStore deviceStore;
// Resource IDs
public static ResourceId configureDpn;
public static ResourceId configure;
@@ -70,6 +89,15 @@
// Notification IDs generator
public static IdGenerator notificationIds;
+ static {
+ DEFAULT_IDENTITY = FpcIdentity.fromString("default");
+ CLIENT_INFO = Maps.newConcurrentMap();
+ TENANT_INFO = Maps.newConcurrentMap();
+ }
+
+ private FpcUtil() {
+ }
+
/**
* Returns resource id from model converter.
*
@@ -84,8 +112,7 @@
}
/**
- * Returns the resource id, after constructing model object id and
- * converting it.
+ * Returns the resource id, after constructing model object id and converting it.
*/
public static void getResourceId() {
ModelObjectId moduleId = ModelObjectId.builder().build();
@@ -148,20 +175,20 @@
}
}
- /**
- * Returns the resource ID of the parent data node pointed by {@code path}.
- *
- * @param path resource ID of the given data node
- * @return resource ID of the parent data node
- */
- public static ResourceId parentOf(ResourceId path) throws Exception {
- try {
- return path.copyBuilder().removeLastKey().build();
- } catch (CloneNotSupportedException e) {
- log.error("Could not copy {}", path, e);
- throw new RuntimeException("Could not copy " + path, e);
- }
- }
+// /**
+// * Returns the resource ID of the parent data node pointed by {@code path}.
+// *
+// * @param path resource ID of the given data node
+// * @return resource ID of the parent data node
+// */
+// public static ResourceId parentOf(ResourceId path) throws Exception {
+// try {
+// return path.copyBuilder().removeLastKey().build();
+// } catch (CloneNotSupportedException e) {
+// log.error("Could not copy {}", path, e);
+// throw new RuntimeException("Could not copy " + path, e);
+// }
+// }
/**
* Get model object for specific tenant.
@@ -195,7 +222,7 @@
}
/**
- * Gets the mapping for node id / network id to ZMQ Topic
+ * Gets the mapping for node id / network id to ZMQ Topic.
*
* @param key - Concatenation of node id + / + network id
* @return ZMQ Topic
@@ -297,6 +324,35 @@
);
}
+ /**
+ * Creates a Config Result Notification for SPGW-C.
+ *
+ * @param opId operation identifier
+ * @param cause cause id
+ * @return config result notification model
+ */
+ public static ModelObject createConfigResultNotification(OpIdentifier opId, short cause) {
+ DefaultConfigResultNotification notify = new DefaultConfigResultNotification();
+
+ notify.notificationId(NotificationId.of(notificationIds.getNewId()));
+ notify.timestamp(BigInteger.valueOf(System.currentTimeMillis()));
+ DefaultConfigResult result = new DefaultConfigResult();
+ result.causeValue(cause);
+ result.opId(opId);
+ result.opStatus(OpStatusEnum.OK);
+ result.result(Result.of(ResultEnum.OK));
+ result.resultType(new DefaultEmptyCase());
+ notify.value(result);
+
+ return notify;
+ }
+
+ /**
+ * Sends to specified CP the notification object.
+ *
+ * @param clientId client identifier (CP)
+ * @param notify notification object
+ */
public static void sendNotification(String clientId, ModelObject notify) {
try {
ResourceData dataNode = modelConverter.createDataNode(
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ConfigureService.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventstream/ConfigureService.java
similarity index 85%
rename from apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ConfigureService.java
rename to apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventstream/ConfigureService.java
index b9aa3a5..70f3b46 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ConfigureService.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventstream/ConfigureService.java
@@ -1,11 +1,20 @@
/*
- * Copyright © 2016 - 2017 Copyright (c) Sprint, Inc. and others. All rights reserved.
+ * Copyright 2017-present Open Networking Foundation
*
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
-package org.onosproject.fpcagent.util.eventStream;
+
+package org.onosproject.fpcagent.util.eventstream;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.slf4j.Logger;
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/JettyServer.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventstream/JettyServer.java
similarity index 74%
rename from apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/JettyServer.java
rename to apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventstream/JettyServer.java
index 5b243f6..a06dc47 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/JettyServer.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventstream/JettyServer.java
@@ -1,11 +1,20 @@
/*
- * Copyright © 2016 - 2017 Copyright (c) Sprint, Inc. and others. All rights reserved.
+ * Copyright 2017-present Open Networking Foundation
*
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
-package org.onosproject.fpcagent.util.eventStream;
+
+package org.onosproject.fpcagent.util.eventstream;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.eclipse.jetty.server.Server;
@@ -18,7 +27,7 @@
import java.util.concurrent.Executors;
/**
- * SSE Server implementation
+ * SSE Server implementation.
*/
public class JettyServer implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(JettyServer.class);
@@ -37,7 +46,7 @@
}
/**
- * Method used to initialize and start the SSE server
+ * Method used to initialize and start the SSE server.
*/
public void open() {
ExecutorService executorService = Executors.newSingleThreadExecutor();
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NBEventWorkerManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventstream/NbEventWorkerManager.java
similarity index 66%
rename from apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NBEventWorkerManager.java
rename to apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventstream/NbEventWorkerManager.java
index 879f989..2fbc782 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NBEventWorkerManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventstream/NbEventWorkerManager.java
@@ -1,11 +1,20 @@
/*
- * Copyright © 2016 - 2017 Copyright (c) Sprint, Inc. and others. All rights reserved.
+ * Copyright 2017-present Open Networking Foundation
*
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
-package org.onosproject.fpcagent.util.eventStream;
+
+package org.onosproject.fpcagent.util.eventstream;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -21,36 +30,45 @@
import java.util.AbstractMap;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.concurrent.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
/**
- * Implements a Worker to process event-data pairs sent by the FPC Client
+ * Implements a Worker to process event-data pairs sent by the FPC Client.
*/
-public class NBEventWorkerManager implements AutoCloseable {
- private static final Logger log = LoggerFactory.getLogger(NBEventWorkerManager.class);
- private static NBEventWorkerManager _instance = null;
+public class NbEventWorkerManager implements AutoCloseable {
+ private static final Logger log = LoggerFactory.getLogger(NbEventWorkerManager.class);
+ private static NbEventWorkerManager _instance = null;
- public static Map<Integer, String> clientIdToUri = new ConcurrentHashMap<Integer, String>();
+ public static final Map<Integer, String> CLIENT_ID_TO_URI;
private final BlockingQueue<Entry<String, Entry<String, String>>> blockingQueue;
private final int poolSize;
private boolean run;
private final RestconfService restconfService;
- protected NBEventWorkerManager(int poolSize, RestconfService restconfService) {
+ static {
+ CLIENT_ID_TO_URI = new ConcurrentHashMap<>();
+ }
+
+ protected NbEventWorkerManager(int poolSize, RestconfService restconfService) {
this.poolSize = poolSize;
this.restconfService = restconfService;
this.blockingQueue = new LinkedBlockingQueue<>();
}
- public static NBEventWorkerManager createInstance(int poolSize, RestconfService restconfService) {
+ public static NbEventWorkerManager createInstance(int poolSize, RestconfService restconfService) {
if (_instance == null) {
- _instance = new NBEventWorkerManager(poolSize, restconfService);
+ _instance = new NbEventWorkerManager(poolSize, restconfService);
}
return _instance;
}
- public static NBEventWorkerManager getInstance() {
+ public static NbEventWorkerManager getInstance() {
return _instance;
}
@@ -81,11 +99,13 @@
node,
contents.getKey()
);
- String replace = output.get().output().toString().replace("ietf-dmm-fpcagent:output", "fpc:output");
+ String replace = output.get().output().toString()
+ .replace("ietf-dmm-fpcagent:output", "fpc:output");
ConfigureService.responseQueue.add(
new AbstractMap.SimpleEntry(
contents.getKey(),
- "event:application/json;/onos/restconf/operations/ietf-dmm-fpcagent:configure\ndata:" + replace + "\r\n"
+ "event:application/json;/onos/restconf/" +
+ "operations/ietf-dmm-fpcagent:configure\ndata:" + replace + "\r\n"
)
);
} else if (contents.getValue().getKey().contains("fpc:register-client")) {
@@ -98,7 +118,8 @@
ConfigureService.responseQueue.add(
new AbstractMap.SimpleEntry(
contents.getKey(),
- "event:application/json;/onos/restconf/operations/fpc:register-client\ndata:" + output.get().output().toString() + "\r\n"
+ "event:application/json;/onos/restconf/operations/fpc:register-client\ndata:" +
+ output.get().output().toString() + "\r\n"
)
);
} else if (contents.getValue().getKey().contains("fpc:deregister-client")) {
@@ -118,31 +139,35 @@
}
/**
- * Add a FPC client URI to FPC client id mapping
+ * Add a FPC client URI to FPC client id mapping.
*
* @param registerClientOutput - Output of the register_cleint rpc
* @param uri - FPC Client URI
*/
private void addClientIdToUriMapping(String registerClientOutput, String uri) {
try {
- JSONObject registerJSON = new JSONObject(registerClientOutput);
-// log.info("registerJSON: {}", registerJSON);
- clientIdToUri.put(Integer.parseInt(registerJSON.getJSONObject("fpc:output").getString("client-id")), uri);
+ JSONObject registerjson = new JSONObject(registerClientOutput);
+// log.info("registerjson: {}", registerjson);
+ CLIENT_ID_TO_URI.put(
+ Integer.parseInt(registerjson.getJSONObject("fpc:output").getString("client-id")), uri
+ );
} catch (JSONException e) {
log.error(ExceptionUtils.getFullStackTrace(e));
}
}
/**
- * Remove a FPC client URI to FPCClient Id mapping
+ * Remove a FPC client URI to FPCClient Id mapping.
*
* @param deregisterClientInput - Input String of deregister_client rpc
*/
private void removeClientIdToUriMapping(String deregisterClientInput) {
try {
- JSONObject deregisterJSON = new JSONObject(deregisterClientInput);
-// log.info("deregisterJSON: {}", deregisterJSON);
- clientIdToUri.remove(Integer.parseInt(deregisterJSON.getJSONObject("input").getString("client-id")));
+ JSONObject deregisterjson = new JSONObject(deregisterClientInput);
+// log.info("deregisterjson: {}", deregisterjson);
+ CLIENT_ID_TO_URI.remove(
+ Integer.parseInt(deregisterjson.getJSONObject("input").getString("client-id"))
+ );
} catch (JSONException e) {
log.error(ExceptionUtils.getFullStackTrace(e));
}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NotificationServer.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventstream/NotificationServer.java
similarity index 76%
rename from apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NotificationServer.java
rename to apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventstream/NotificationServer.java
index 28ba759..80151dd 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NotificationServer.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventstream/NotificationServer.java
@@ -1,11 +1,20 @@
/*
- * Copyright © 2016 - 2017 Copyright (c) Sprint, Inc. and others. All rights reserved.
+ * Copyright 2017-present Open Networking Foundation
*
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
-package org.onosproject.fpcagent.util.eventStream;
+
+package org.onosproject.fpcagent.util.eventstream;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.eclipse.jetty.servlet.DefaultServlet;
@@ -25,7 +34,7 @@
import java.util.concurrent.ConcurrentHashMap;
/**
- * A HTTP servlet that handles requests for notification streams
+ * A HTTP servlet that handles requests for notification streams.
*/
public class NotificationServer extends DefaultServlet {
private static final Logger log = LoggerFactory.getLogger(ResponseServer.class);
@@ -64,7 +73,8 @@
log.error(ExceptionUtils.getFullStackTrace(e));
}
ServletContext servletContext = request.getServletContext();
- Map<String, AsyncContext> notificationStreams = (ConcurrentHashMap<String, AsyncContext>) servletContext.getAttribute("notificationStreams");
+ Map<String, AsyncContext> notificationStreams =
+ (ConcurrentHashMap<String, AsyncContext>) servletContext.getAttribute("notificationStreams");
notificationStreams.put(clientId, asyncContext);
log.info("Client Id received in the notification stream request: " + clientId);
}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ParseStream.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventstream/ParseStream.java
similarity index 83%
rename from apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ParseStream.java
rename to apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventstream/ParseStream.java
index 52eeb07..18634e0 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ParseStream.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventstream/ParseStream.java
@@ -1,11 +1,20 @@
/*
- * Copyright © 2016 - 2017 Copyright (c) Sprint, Inc. and others. All rights reserved.
+ * Copyright 2017-present Open Networking Foundation
*
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
-package org.onosproject.fpcagent.util.eventStream;
+
+package org.onosproject.fpcagent.util.eventstream;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.slf4j.Logger;
@@ -20,9 +29,9 @@
import java.util.concurrent.LinkedBlockingQueue;
/**
- * A class that parses the request stream and enqueues creates event-data pairs for processing
+ * A class that parses the request stream and enqueues creates event-data pairs for processing.
*/
-public class ParseStream implements AutoCloseable {
+public final class ParseStream implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(ParseStream.class);
private BlockingQueue<Map.Entry<String, CharBuffer>> blockingQueue;
private CharBuffer charBuf;
@@ -108,7 +117,7 @@
} else if (line.startsWith("data")) {
data = line.split(":", 2)[1];
if (event != null && data != null) {
- NBEventWorkerManager.getInstance()
+ NbEventWorkerManager.getInstance()
.send(
new AbstractMap.SimpleEntry<>(
entry.getKey(),
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/RequestServer.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventstream/RequestServer.java
similarity index 68%
rename from apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/RequestServer.java
rename to apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventstream/RequestServer.java
index 9715053..ecf045e 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/RequestServer.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventstream/RequestServer.java
@@ -1,11 +1,20 @@
/*
- * Copyright © 2016 - 2017 Copyright (c) Sprint, Inc. and others. All rights reserved.
+ * Copyright 2017-present Open Networking Foundation
*
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
-package org.onosproject.fpcagent.util.eventStream;
+
+package org.onosproject.fpcagent.util.eventstream;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.http.HttpResponse;
@@ -23,38 +32,38 @@
import java.util.AbstractMap;
/**
- * A HTTP client that sends a request to a FPC Client to initiate the request stream.
+ * A HTTP ASYNC_CLIENT that sends a request to a FPC Client to initiate the request stream.
*/
public class RequestServer {
private static final Logger log = LoggerFactory.getLogger(RequestServer.class);
- private static final CloseableHttpAsyncClient client = HttpAsyncClients.createDefault();
+ private static final CloseableHttpAsyncClient ASYNC_CLIENT = HttpAsyncClients.createDefault();
protected String clientUri;
/**
- * Send HttpRequest to Client
+ * Send HttpRequest to Client.
*
* @param uri - FPC Client Uri
*/
public void connectToClient(String uri) {
this.clientUri = uri;
try {
- client.start();
+ ASYNC_CLIENT.start();
HttpAsyncRequestProducer get = HttpAsyncMethods.createGet(this.clientUri);
- client.execute(get, new RequestStreamConsumer(this.clientUri), null);
+ ASYNC_CLIENT.execute(get, new RequestStreamConsumer(this.clientUri), null);
} catch (Exception e) {
log.error(ExceptionUtils.getFullStackTrace(e));
}
}
/**
- * A character consumer to read incoming characters on the request stream
+ * A character consumer to read incoming characters on the request stream.
*/
static class RequestStreamConsumer extends AsyncCharConsumer<Boolean> {
private String clientUri;
/**
- * Constructor
+ * Constructor.
*
* @param clientUri - URI of the FPC Client
*/
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ResponseServer.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventstream/ResponseServer.java
similarity index 75%
rename from apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ResponseServer.java
rename to apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventstream/ResponseServer.java
index 2218f1b..6490f9b 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ResponseServer.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventstream/ResponseServer.java
@@ -1,11 +1,20 @@
/*
- * Copyright © 2016 - 2017 Copyright (c) Sprint, Inc. and others. All rights reserved.
+ * Copyright 2017-present Open Networking Foundation
*
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
-package org.onosproject.fpcagent.util.eventStream;
+
+package org.onosproject.fpcagent.util.eventstream;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.eclipse.jetty.servlet.DefaultServlet;
@@ -26,14 +35,14 @@
import java.util.concurrent.ConcurrentHashMap;
/**
- * A Http Servlet to server response streams
+ * A Http Servlet to server response streams.
*/
public class ResponseServer extends DefaultServlet {
private static final Logger log = LoggerFactory.getLogger(ResponseServer.class);
- private static ArrayList<String> clientUriList = new ArrayList<>();
+ private static final ArrayList<String> CLIENT_URI_LIST = new ArrayList<>();
/**
- * Method for stream initialization
+ * Method for stream initialization.
*
* @param clientUri - Client Uri
* @param request - The servlet request object
@@ -58,7 +67,8 @@
log.error(ExceptionUtils.getFullStackTrace(e));
}
ServletContext servletContext = request.getServletContext();
- Map<String, AsyncContext> responseStreams = (ConcurrentHashMap<String, AsyncContext>) servletContext.getAttribute("responseStreams");
+ Map<String, AsyncContext> responseStreams =
+ (ConcurrentHashMap<String, AsyncContext>) servletContext.getAttribute("responseStreams");
responseStreams.put(clientUri, asyncContext);
} catch (Exception e) {
log.error(ExceptionUtils.getFullStackTrace(e));
@@ -84,7 +94,7 @@
if (jsonStringBuilder.length() > 0) {
JSONObject jsonObj = new JSONObject(jsonStringBuilder.toString());
clientUri = jsonObj.getString("client-uri");
- if (!clientUriList.contains(clientUri)) {
+ if (!CLIENT_URI_LIST.contains(clientUri)) {
init(clientUri, request, response);
}
jsonStringBuilder.setLength(0);
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/DpnDeviceListener.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventstream/package-info.java
similarity index 78%
copy from apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/DpnDeviceListener.java
copy to apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventstream/package-info.java
index 8d099f0..d7a9648 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/DpnDeviceListener.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventstream/package-info.java
@@ -14,10 +14,7 @@
* limitations under the License.
*/
-package org.onosproject.fpcagent.providers;
-
-public interface DpnDeviceListener {
- void deviceAdded(String id, byte topic);
-
- void deviceRemoved(String id);
-}
+/**
+ * SSE implementation.
+ */
+package org.onosproject.fpcagent.util.eventstream;
\ No newline at end of file
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/package-info.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/package-info.java
index b4b4877..7389ff3 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/package-info.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/package-info.java
@@ -14,4 +14,7 @@
* limitations under the License.
*/
+/**
+ * Utility classes.
+ */
package org.onosproject.fpcagent.util;
\ No newline at end of file
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBPublisherManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBPublisherManager.java
deleted file mode 100644
index 541d7f5..0000000
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBPublisherManager.java
+++ /dev/null
@@ -1,69 +0,0 @@
-package org.onosproject.fpcagent.workers;
-
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.zeromq.ZContext;
-import org.zeromq.ZMQ;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.*;
-
-public class ZMQSBPublisherManager implements AutoCloseable {
- private static final Logger log = LoggerFactory.getLogger(ZMQSBPublisherManager.class);
- private static ZMQSBPublisherManager _instance = null;
- private final ZContext ctx;
- private final String address;
- private final BlockingQueue<ByteBuffer> blockingQueue;
- private final int poolSize;
- private boolean run;
-
- protected ZMQSBPublisherManager(String address, int poolSize) {
- this.ctx = new ZContext();
- this.address = address;
- this.run = true;
- this.blockingQueue = new LinkedBlockingQueue<>();
- this.poolSize = poolSize;
- }
-
- public static ZMQSBPublisherManager createInstance(String address, int poolSize) {
- if (_instance == null) {
- _instance = new ZMQSBPublisherManager(address, poolSize);
- }
- return _instance;
- }
-
- public static ZMQSBPublisherManager getInstance() {
- return _instance;
- }
-
- public void send(ByteBuffer buf) {
- try {
- blockingQueue.put(buf);
- } catch (InterruptedException e) {
- log.error(ExceptionUtils.getFullStackTrace(e));
- }
- }
-
- public void open() {
- ExecutorService executorService = Executors.newFixedThreadPool(this.poolSize);
- executorService.submit(() -> {
- ZMQ.Socket socket = ctx.createSocket(ZMQ.PUB);
- socket.connect(address);
- log.debug("Publisher at {}", address);
- while ((!Thread.currentThread().isInterrupted()) && run) {
- try {
- byte[] array = blockingQueue.take().array();
- socket.send(array);
- } catch (InterruptedException e) {
- log.error(ExceptionUtils.getFullStackTrace(e));
- }
- }
- });
- }
-
- @Override
- public void close() {
- run = false;
- }
-}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZmqSbPublisherManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZmqSbPublisherManager.java
new file mode 100644
index 0000000..a50d665
--- /dev/null
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZmqSbPublisherManager.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.fpcagent.workers;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeromq.ZContext;
+import org.zeromq.ZMQ;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class ZmqSbPublisherManager implements AutoCloseable {
+ private static final Logger log = LoggerFactory.getLogger(ZmqSbPublisherManager.class);
+ private static ZmqSbPublisherManager _instance = null;
+ private final ZContext ctx;
+ private final String address;
+ private final BlockingQueue<ByteBuffer> blockingQueue;
+ private final int poolSize;
+ private boolean run;
+
+ protected ZmqSbPublisherManager(String address, int poolSize) {
+ this.ctx = new ZContext();
+ this.address = address;
+ this.run = true;
+ this.blockingQueue = new LinkedBlockingQueue<>();
+ this.poolSize = poolSize;
+ }
+
+ public static ZmqSbPublisherManager createInstance(String address, int poolSize) {
+ if (_instance == null) {
+ _instance = new ZmqSbPublisherManager(address, poolSize);
+ }
+ return _instance;
+ }
+
+ public static ZmqSbPublisherManager getInstance() {
+ return _instance;
+ }
+
+ public void send(ByteBuffer buf) {
+ try {
+ blockingQueue.put(buf);
+ } catch (InterruptedException e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+ }
+
+ public void open() {
+ Executors.newFixedThreadPool(this.poolSize).submit(() -> {
+ ZMQ.Socket socket = ctx.createSocket(ZMQ.PUB);
+ socket.connect(address);
+ log.debug("Publisher at {}", address);
+ while ((!Thread.currentThread().isInterrupted()) && run) {
+ try {
+ byte[] array = blockingQueue.take().array();
+ socket.send(array);
+ } catch (InterruptedException e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+ }
+ });
+ }
+
+ @Override
+ public void close() {
+ run = false;
+ }
+}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBSubscriberManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZmqSbSubscriberManager.java
similarity index 67%
rename from apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBSubscriberManager.java
rename to apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZmqSbSubscriberManager.java
index 265046c..71ab3ed 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBSubscriberManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZmqSbSubscriberManager.java
@@ -1,19 +1,37 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.onosproject.fpcagent.workers;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.onosproject.core.IdGenerator;
-import org.onosproject.fpcagent.providers.DpnDeviceListener;
+import org.onosproject.fpcagent.providers.ZmqDpnDeviceListener;
import org.onosproject.fpcagent.util.CacheManager;
import org.onosproject.fpcagent.util.FpcUtil;
-import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.*;
-import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.configresultnotification.value.DefaultConfigResult;
-import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.opstatusvalue.OpStatusEnum;
-import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.result.ResultEnum;
-import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.resultbody.resulttype.DefaultEmptyCase;
-import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.yangautoprefixnotify.value.DefaultDownlinkDataNotification;
-import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.yangautoprefixnotify.value.DownlinkDataNotification;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.ClientIdentifier;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.DefaultYangAutoPrefixNotify;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.NotificationId;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.OpIdentifier;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.yangautoprefixnotify
+ .value.DefaultDownlinkDataNotification;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.yangautoprefixnotify
+ .value.DownlinkDataNotification;
import org.onosproject.yang.gen.v1.ietfdmmfpcbase.rev20160803.ietfdmmfpcbase.FpcIdentity;
import org.onosproject.yang.gen.v1.ietfdmmfpcbase.rev20160803.ietfdmmfpcbase.fpcidentity.FpcIdentityUnion;
+import org.onosproject.yang.model.ModelObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZContext;
@@ -30,13 +48,14 @@
import static org.onosproject.fpcagent.protocols.DpnNgicCommunicator.*;
import static org.onosproject.fpcagent.util.Converter.*;
+import static org.onosproject.fpcagent.util.FpcUtil.createConfigResultNotification;
import static org.onosproject.fpcagent.util.FpcUtil.sendNotification;
-public class ZMQSBSubscriberManager implements AutoCloseable {
- private static final Logger log = LoggerFactory.getLogger(ZMQSBSubscriberManager.class);
- private static int MIN_TOPIC_VAL = 4;
- private static int MAX_TOPIC_VAL = 255;
- private static ZMQSBSubscriberManager _instance = null;
+public class ZmqSbSubscriberManager implements AutoCloseable {
+ private static final Logger log = LoggerFactory.getLogger(ZmqSbSubscriberManager.class);
+ private static final int MIN_TOPIC_VAL = 4;
+ private static final int MAX_TOPIC_VAL = 255;
+ private static ZmqSbSubscriberManager _instance = null;
private final String address;
private final String nodeId;
private final String networkId;
@@ -51,27 +70,30 @@
private Future<?> broadcastTopicWorker;
private Future<?> generalWorker;
- private DpnDeviceListener dpnDeviceListener;
+ private ZmqDpnDeviceListener zmqDpnDeviceListener;
- protected ZMQSBSubscriberManager(String address, String nodeId, String networkId, DpnDeviceListener dpnDeviceListener, IdGenerator notificationIds) {
+ protected ZmqSbSubscriberManager(String address, String nodeId, String networkId,
+ ZmqDpnDeviceListener zmqDpnDeviceListener, IdGenerator notificationIds) {
this.address = address;
this.run = true;
this.nodeId = nodeId;
this.networkId = networkId;
this.conflictingTopic = false;
this.controllerSourceId = (long) ThreadLocalRandom.current().nextInt(0, 65535);
- this.dpnDeviceListener = dpnDeviceListener;
+ this.zmqDpnDeviceListener = zmqDpnDeviceListener;
this.notificationIds = notificationIds;
}
- public static ZMQSBSubscriberManager createInstance(String address, String nodeId, String networkId, DpnDeviceListener providerService, IdGenerator notificationIds) {
+ public static ZmqSbSubscriberManager createInstance(String address, String nodeId, String networkId,
+ ZmqDpnDeviceListener providerService,
+ IdGenerator notificationIds) {
if (_instance == null) {
- _instance = new ZMQSBSubscriberManager(address, nodeId, networkId, providerService, notificationIds);
+ _instance = new ZmqSbSubscriberManager(address, nodeId, networkId, providerService, notificationIds);
}
return _instance;
}
- public static ZMQSBSubscriberManager getInstance() {
+ public static ZmqSbSubscriberManager getInstance() {
return _instance;
}
@@ -85,10 +107,10 @@
public void open() {
broadcastAllWorker = Executors.newSingleThreadExecutor()
- .submit(new ZMQSubscriberWorker(ReservedTopics.BROADCAST_ALL.getType()));
+ .submit(new ZmqSubscriberWorker(ReservedTopics.BROADCAST_ALL.getType()));
broadcastControllersWorker = Executors.newSingleThreadExecutor()
- .submit(new ZMQSubscriberWorker(ReservedTopics.BROADCAST_CONTROLLERS.getType()));
+ .submit(new ZmqSubscriberWorker(ReservedTopics.BROADCAST_CONTROLLERS.getType()));
broadcastTopicWorker = Executors.newSingleThreadExecutor()
.submit(new AssignTopic());
@@ -96,17 +118,17 @@
@Override
public void close() {
- send_goodbye_dpns(nodeId, networkId);
+ sendGoodbyeDpns(nodeId, networkId);
run = false;
}
/**
- * Interrupts the BroadcastTopicworker if there is an Assign topic Conflict
+ * Interrupts the BroadcastTopicworker if there is an Assign topic Conflict.
*
* @param conflict - Flag to indicate conflict
* @param subId - Topic Id that caused the conflict
*/
- protected void BroadcastAllSubIdCallBack(boolean conflict, byte subId) {
+ protected void broadcastAllSubIdCallBack(boolean conflict, byte subId) {
if (conflict && controllerTopic == subId) {
this.conflictingTopic = true;
broadcastTopicWorker.cancel(true);
@@ -114,33 +136,35 @@
}
/**
- * Broadcasts an Assign Conflict message
+ * Broadcasts an Assign Conflict message.
*
* @param contents - byte array received over the southbound.
*/
- protected void SendAssignConflictMessage(byte[] contents) {
+ protected void sendAssignConflictMessage(byte[] contents) {
byte topic = contents[2];
short nodeIdLen = contents[7];
short networkIdLen = contents[8 + nodeIdLen];
- String node_id = new String(Arrays.copyOfRange(contents, 8, 8 + nodeIdLen));
- String network_id = new String(Arrays.copyOfRange(contents, 9 + nodeIdLen, 9 + nodeIdLen + networkIdLen));
+ String nodeId = new String(Arrays.copyOfRange(contents, 8, 8 + nodeIdLen));
+ String networkId = new String(
+ Arrays.copyOfRange(contents, 9 + nodeIdLen, 9 + nodeIdLen + networkIdLen)
+ );
- if (controllerTopic == topic || (nodeId.equals(node_id) && networkId.equals(network_id))) {
- send_assign_conflict(nodeId, networkId);
+ if (controllerTopic == topic || (this.nodeId.equals(nodeId) && this.networkId.equals(networkId))) {
+ sendAssignConflict(this.nodeId, this.networkId);
}
}
- protected class ZMQSubscriberWorker implements Runnable {
+ protected class ZmqSubscriberWorker implements Runnable {
private final byte subscribedTopic;
private ZContext ctx;
- ZMQSubscriberWorker(byte subscribedTopic) {
+ ZmqSubscriberWorker(byte subscribedTopic) {
this.subscribedTopic = subscribedTopic;
this.ctx = new ZContext();
}
/**
- * Ensures the session id is an unsigned 64 bit integer
+ * Ensures the session id is an unsigned 64 bit integer.
*
* @param sessionId - session id received from the DPN
* @return unsigned session id
@@ -153,13 +177,13 @@
}
/**
- * Decodes a DownlinkDataNotification
+ * Decodes a DownlinkDataNotification.
*
* @param buf - message buffer
* @param key - Concatenation of node id + / + network id
* @return DownlinkDataNotification or null if it could not be successfully decoded
*/
- private DownlinkDataNotification processDDN(byte[] buf, String key) {
+ private DownlinkDataNotification processDdn(byte[] buf, String key) {
DownlinkDataNotification ddn = new DefaultDownlinkDataNotification();
ddn.sessionId(checkSessionId(toBigInt(buf, 2)));
ddn.notificationMessageType("Downlink-Data-Notification");
@@ -181,18 +205,21 @@
public Map.Entry<Object, Object> decode(byte[] buf) {
// uint8_t topic_id;
// uint8_t type;
- s11MsgType type;
- type = s11MsgType.getEnum(buf[1]);
- if (type.equals(s11MsgType.DDN)) {
+ S11MsgType type;
+ type = S11MsgType.getEnum(buf[1]);
+ if (type.equals(S11MsgType.DDN)) {
// uint64_t session_id;
// uint32_t client_id;
// uint32_t op_id;
// uint8_t node_network_id_buffer[NN_ID_BUF_LEN];
short nodeIdLen = buf[18];
short networkIdLen = buf[19 + nodeIdLen];
- String key = new String(Arrays.copyOfRange(buf, 19, 19 + nodeIdLen)) + "/" + new String(Arrays.copyOfRange(buf, 20 + nodeIdLen, 20 + nodeIdLen + networkIdLen));
- return new AbstractMap.SimpleEntry<>(processDDN(buf, key), null);
- } else if (type.equals(s11MsgType.DPN_STATUS_INDICATION)) {
+ String key = new String(Arrays.copyOfRange(buf, 19, 19 + nodeIdLen))
+ + "/" + new String(
+ Arrays.copyOfRange(buf, 20 + nodeIdLen, 20 + nodeIdLen + networkIdLen)
+ );
+ return new AbstractMap.SimpleEntry<>(processDdn(buf, key), null);
+ } else if (type.equals(S11MsgType.DPN_STATUS_INDICATION)) {
// uint8_t source_topic_id;
// uint8_t status;
// uint32_t source;
@@ -200,36 +227,30 @@
DpnStatusIndication status;
short nodeIdLen = buf[8];
short networkIdLen = buf[9 + nodeIdLen];
- String deviceId = new String(Arrays.copyOfRange(buf, 9, 9 + nodeIdLen)) + "/" + new String(Arrays.copyOfRange(buf, 10 + nodeIdLen, 10 + nodeIdLen + networkIdLen));
+ String deviceId = new String(Arrays.copyOfRange(buf, 9, 9 + nodeIdLen)) + "/"
+ + new String(Arrays.copyOfRange(buf, 10 + nodeIdLen, 10 + nodeIdLen + networkIdLen));
status = DpnStatusIndication.getEnum(buf[3]);
- log.info("topic {}, status {}, source {}, node/net {}", (short) buf[2], status, fromIntToLong(buf, 4), deviceId);
+ log.info("topic {}, status {}, source {}, node/net {}", (short) buf[2], status,
+ fromIntToLong(buf, 4), deviceId);
if (status.equals(DpnStatusIndication.HELLO)) {
log.info("Hello {} on topic {}", deviceId, buf[2]);
- dpnDeviceListener.deviceAdded(deviceId, buf[2]);
+ zmqDpnDeviceListener.deviceAdded(deviceId, buf[2]);
} else if (status.equals(DpnStatusIndication.GOODBYE)) {
log.info("Bye {}", deviceId);
- dpnDeviceListener.deviceRemoved(deviceId);
+ zmqDpnDeviceListener.deviceRemoved(deviceId);
}
return new AbstractMap.SimpleEntry<>(status, deviceId);
- } else if (type.equals(s11MsgType.DPN_RESPONSE)) {
+ } else if (type.equals(S11MsgType.DPN_RESPONSE)) {
// uint8_t cause;
// uint32_t client_id;
// uint32_t op_id;
- ClientIdentifier clientId = ClientIdentifier.of(FpcIdentity.of(FpcIdentityUnion.of(fromIntToLong(buf, 3))));
+ ClientIdentifier clientId = ClientIdentifier.of(
+ FpcIdentity.of(FpcIdentityUnion.of(fromIntToLong(buf, 3)))
+ );
OpIdentifier opId = OpIdentifier.of(BigInteger.valueOf(fromIntToLong(buf, 7)));
- log.info("cause {}, clientId {}, opId {}", (short) buf[2], clientId.toString(), opId.toString());
- DefaultConfigResultNotification notify = new DefaultConfigResultNotification();
- notify.notificationId(NotificationId.of(notificationIds.getNewId()));
- notify.timestamp(BigInteger.valueOf(System.currentTimeMillis()));
- DefaultConfigResult result = new DefaultConfigResult();
- result.causeValue(buf[2]);
- result.opId(opId);
- result.opStatus(OpStatusEnum.OK);
- result.result(Result.of(ResultEnum.OK));
- result.resultType(new DefaultEmptyCase());
- notify.value(result);
-
- sendNotification(clientId.toString(), notify);
+ log.info("clientId {}, opId {}, cause {}", clientId.toString(), opId.toString(), (short) buf[2]);
+ ModelObject notification = createConfigResultNotification(opId, buf[2]);
+ sendNotification(clientId.toString(), notification);
}
return null;
}
@@ -243,16 +264,16 @@
while ((!Thread.currentThread().isInterrupted()) && run) {
byte[] contents = subscriber.recv();
byte topic = contents[0];
- s11MsgType messageType = s11MsgType.getEnum(contents[1]);
+ S11MsgType messageType = S11MsgType.getEnum(contents[1]);
log.info("Received {}", messageType);
switch (topic) {
case 1:
- if (messageType.equals(s11MsgType.ASSIGN_CONFLICT) &&
+ if (messageType.equals(S11MsgType.ASSIGN_CONFLICT) &&
toInt(contents, 3) != controllerSourceId) {
- BroadcastAllSubIdCallBack(true, contents[2]);
- } else if (messageType.equals(s11MsgType.ASSIGN_TOPIC) &&
+ broadcastAllSubIdCallBack(true, contents[2]);
+ } else if (messageType.equals(S11MsgType.ASSIGN_TOPIC) &&
toInt(contents, 3) != controllerSourceId) {
- SendAssignConflictMessage(contents);
+ sendAssignConflictMessage(contents);
}
break;
default:
@@ -270,7 +291,7 @@
if (key.equals(DpnStatusIndication.HELLO)) {
byte dpnTopic = FpcUtil.getTopicFromNode(msg.getValue().toString());
if (dpnTopic != -1) {
- send_status_ack(nodeId, networkId, dpnTopic);
+ sendStatusAck(nodeId, networkId, dpnTopic);
}
}
}
@@ -289,7 +310,7 @@
}
/**
- * Class to broadcast a topic for the controller
+ * Class to broadcast a topic for the controller.
*/
protected class AssignTopic implements Runnable {
private byte topic;
@@ -301,7 +322,7 @@
@Override
public void run() {
try {
- send_assign_topic(nodeId, networkId, this.topic);
+ sendAssignTopic(nodeId, networkId, this.topic);
log.debug("Thread sleeping: " + Thread.currentThread().getName());
Thread.sleep(2000); // wait 10 sec before assigning topic
} catch (InterruptedException e) {
@@ -316,7 +337,7 @@
}
}
controllerTopic = this.topic;
- generalWorker = Executors.newSingleThreadExecutor().submit(new ZMQSubscriberWorker(this.topic));
+ generalWorker = Executors.newSingleThreadExecutor().submit(new ZmqSubscriberWorker(this.topic));
try {
Thread.sleep(2000);
@@ -324,7 +345,7 @@
log.error(ExceptionUtils.getFullStackTrace(e));
}
- send_hello_dpns(nodeId, networkId);
+ sendHelloDpns(nodeId, networkId);
}
}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/package-info.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/package-info.java
index f6a5857..e310b23 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/package-info.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/package-info.java
@@ -14,4 +14,7 @@
* limitations under the License.
*/
+/**
+ * ZMQ Publisher and Subscriber workers.
+ */
package org.onosproject.fpcagent.workers;
\ No newline at end of file