added DPNs as devices
diff --git a/apps/fpcagent/fpcagent.json b/apps/fpcagent/fpcagent.json
index 6b26cbf..c614015 100644
--- a/apps/fpcagent/fpcagent.json
+++ b/apps/fpcagent/fpcagent.json
@@ -6,10 +6,7 @@
"dpn-publisher-uri": "tcp://*:5555",
"dpn-client-threads": 5,
"node-id": "node0",
- "network-id": "network1",
- "zmq-broadcast-all": "1",
- "zmq-broadcast-controllers": "2",
- "zmq-broadcast-dpns": "3"
+ "network-id": "network1"
}
}
}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java
index 7929e59..ba2f989 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java
@@ -16,10 +16,13 @@
package org.onosproject.fpcagent;
+import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.*;
import org.onosproject.config.DynamicConfigService;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
+import org.onosproject.fpcagent.providers.DpnProviderService;
+import org.onosproject.fpcagent.providers.DpnDeviceListener;
import org.onosproject.fpcagent.util.ConfigHelper;
import org.onosproject.fpcagent.workers.ZMQSBPublisherManager;
import org.onosproject.fpcagent.workers.ZMQSBSubscriberManager;
@@ -30,6 +33,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashSet;
import java.util.Optional;
import static org.onosproject.fpcagent.util.FpcUtil.FPC_APP_ID;
@@ -68,9 +72,13 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private CoreService coreService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ private DpnProviderService dpnProviderService;
+
/* Variables */
private FpcConfig fpcConfig;
private boolean started = false;
+ private HashSet<DpnDeviceListener> listeners = Sets.newHashSet();
/* Config */
private ConfigFactory<ApplicationId, FpcConfig> fpcConfigConfigFactory =
@@ -88,8 +96,7 @@
configService.addListener(configListener);
registry.registerConfigFactory(fpcConfigConfigFactory);
-
- log.info("FPC Agent Started");
+ log.info("FPC Service Started");
}
@Deactivate
@@ -100,9 +107,10 @@
if (started) {
ZMQSBSubscriberManager.getInstance().close();
ZMQSBPublisherManager.getInstance().close();
+ started = false;
}
- log.info("FPC Agent Stopped");
+ log.info("FPC Servicea Stopped");
}
/**
@@ -114,11 +122,9 @@
started = true;
ZMQSBSubscriberManager.createInstance(
helper.dpnSubscriberUri(),
- helper.zmqBroadcastAll(),
- helper.zmqBroadcastControllers(),
- helper.zmqBroadcastDpns(),
helper.nodeId(),
- helper.networkId()
+ helper.networkId(),
+ dpnProviderService.getListener()
);
ZMQSBPublisherManager.createInstance(
@@ -133,6 +139,16 @@
}
@Override
+ public void addListener(DpnDeviceListener listener) {
+ listeners.add(listener);
+ }
+
+ @Override
+ public void removeListener(DpnDeviceListener listener) {
+ listeners.remove(listener);
+ }
+
+ @Override
public Optional<ConfigHelper> getConfig() {
return fpcConfig != null ? fpcConfig.getConfig() : Optional.empty();
}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java
index fd10302..61aa4bd 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java
@@ -16,7 +16,6 @@
package org.onosproject.fpcagent;
-import com.google.common.base.Stopwatch;
import com.google.common.collect.Maps;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
@@ -28,7 +27,9 @@
import org.onosproject.fpcagent.protocols.DpnCommunicationService;
import org.onosproject.fpcagent.protocols.DpnNgicCommunicator;
import org.onosproject.fpcagent.protocols.DpnP4Communicator;
-import org.onosproject.fpcagent.util.*;
+import org.onosproject.fpcagent.util.CacheManager;
+import org.onosproject.fpcagent.util.FpcUtil;
+import org.onosproject.net.device.DeviceStore;
import org.onosproject.yang.gen.v1.fpc.rev20150105.fpc.DefaultConnectionInfo;
import org.onosproject.yang.gen.v1.fpc.rev20150105.fpc.P4DpnControlProtocol;
import org.onosproject.yang.gen.v1.fpc.rev20150105.fpc.ZmqDpnControlProtocol;
@@ -78,6 +79,8 @@
import org.slf4j.LoggerFactory;
import java.math.BigInteger;
+import java.time.Duration;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Optional;
@@ -104,24 +107,30 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private RpcRegistry registry;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ private DeviceStore deviceStore;
+
private ConcurrentMap<ClientIdentifier, DefaultRegisterClientInput> clientInfo = Maps.newConcurrentMap();
+ // FIXME configurable
+ private ExecutorService executorService = Executors.newFixedThreadPool(25);
@Activate
protected void activate() {
init();
registry.registerRpcService(this);
- log.info("Tenant Service Started");
+ log.info("FPC RPC Service Started");
}
@Deactivate
protected void deactivate() {
registry.unregisterRpcService(this);
- log.info("Tenant Service Stopped");
+ log.info("FPC RPC Service Stopped");
}
private void init() {
FpcUtil.modelConverter = modelConverter;
FpcUtil.dynamicConfigService = dynamicConfigService;
+ FpcUtil.deviceStore = deviceStore;
getResourceId();
// Create the Default Tenant and added to the Tenants structure.
@@ -202,8 +211,8 @@
}
// get DPN Topic from Node/Network pair
- Short topic_id = getTopicFromNode(key.get());
- if (topic_id == null) {
+ byte topic_id = getTopicFromNode(key.get());
+ if (topic_id == -1) {
throw new RuntimeException("Could not find Topic ID");
}
@@ -361,8 +370,8 @@
}
// get DPN Topic from Node/Network pair
- Short topic_id = getTopicFromNode(key.get());
- if (topic_id == null) {
+ byte topic_id = getTopicFromNode(key.get());
+ if (topic_id == -1) {
throw new RuntimeException("Could not find Topic ID");
}
@@ -504,8 +513,8 @@
}
// find DPN Topic from Node/Network ID pair.
- Short topic_id = getTopicFromNode(key.get());
- if (topic_id == null) {
+ byte topic_id = getTopicFromNode(key.get());
+ if (topic_id == -1) {
throw new RuntimeException("Could not find Topic ID");
}
@@ -659,265 +668,282 @@
@Override
public RpcOutput configureDpn(RpcInput rpcInput) {
- Stopwatch timer = Stopwatch.createStarted();
- DefaultConfigureDpnOutput configureDpnOutput = new DefaultConfigureDpnOutput();
- configureDpnOutput.result(Result.of(ResultEnum.OK));
- configureDpnOutput.resultType(new DefaultEmptyCase());
- RpcOutput.Status status = RpcOutput.Status.RPC_SUCCESS;
- try {
- for (ModelObject modelObject : getModelObjects(rpcInput.data(), configureDpn)) {
- DefaultConfigureDpnInput input = (DefaultConfigureDpnInput) modelObject;
- switch (input.operation().enumeration()) {
- case ADD:
- configureDpnOutput = configureDpnAdd(input);
- break;
- case REMOVE:
- configureDpnOutput = configureDpnRemove(input);
- break;
+ return CompletableFuture.supplyAsync(() -> {
+ Instant start = Instant.now();
+ DefaultConfigureDpnOutput configureDpnOutput = new DefaultConfigureDpnOutput();
+ configureDpnOutput.result(Result.of(ResultEnum.OK));
+ configureDpnOutput.resultType(new DefaultEmptyCase());
+ RpcOutput.Status status = RpcOutput.Status.RPC_SUCCESS;
+ try {
+ for (ModelObject modelObject : getModelObjects(rpcInput.data(), configureDpn)) {
+ DefaultConfigureDpnInput input = (DefaultConfigureDpnInput) modelObject;
+ switch (input.operation().enumeration()) {
+ case ADD:
+ configureDpnOutput = configureDpnAdd(input);
+ break;
+ case REMOVE:
+ configureDpnOutput = configureDpnRemove(input);
+ break;
+ }
}
+ } catch (Exception e) {
+ status = RpcOutput.Status.RPC_FAILURE;
+ org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.resultbodydpn.resulttype.DefaultErr defaultErr = new org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.resultbodydpn.resulttype.DefaultErr();
+ defaultErr.errorInfo(ExceptionUtils.getFullStackTrace(e));
+ defaultErr.errorTypeId(ErrorTypeId.of(0));
+ configureDpnOutput.resultType(defaultErr);
+ configureDpnOutput.result(Result.of(ResultEnum.ERR));
+ log.error(ExceptionUtils.getFullStackTrace(e));
}
- } catch (Exception e) {
- status = RpcOutput.Status.RPC_FAILURE;
- org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.resultbodydpn.resulttype.DefaultErr defaultErr = new org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.resultbodydpn.resulttype.DefaultErr();
- defaultErr.errorInfo(ExceptionUtils.getFullStackTrace(e));
- defaultErr.errorTypeId(ErrorTypeId.of(0));
- configureDpnOutput.resultType(defaultErr);
- configureDpnOutput.result(Result.of(ResultEnum.ERR));
- log.error(ExceptionUtils.getFullStackTrace(e));
- }
- ResourceData dataNode = modelConverter.createDataNode(
- DefaultModelObjectData.builder()
- .addModelObject(configureDpnOutput)
- .build()
- );
- log.debug("Time Elapsed {} ms", timer.stop().elapsed(TimeUnit.MILLISECONDS));
- return new RpcOutput(status, dataNode.dataNodes().get(0));
+ ResourceData dataNode = modelConverter.createDataNode(
+ DefaultModelObjectData.builder()
+ .addModelObject(configureDpnOutput)
+ .build()
+ );
+ log.info("Time Elapsed {} ms", Duration.between(start, Instant.now()).toMillis());
+ return new RpcOutput(status, dataNode.dataNodes().get(0));
+ }, executorService).join();
}
@Override
public RpcOutput configure(RpcInput rpcInput) {
- Stopwatch timer = Stopwatch.createStarted();
- DefaultConfigureOutput configureOutput = new DefaultConfigureOutput();
- RpcOutput.Status status = RpcOutput.Status.RPC_SUCCESS;
- try {
- for (ModelObject modelObject : getModelObjects(rpcInput.data(), configure)) {
- DefaultConfigureInput input = (DefaultConfigureInput) modelObject;
- if (!clientInfo.containsKey(input.clientId())) {
- throw new RuntimeException("Client Identifier is not registered.");
- }
- switch (input.opType()) {
- case CREATE:
- configureOutput = configureCreate(
- (CreateOrUpdate) input.opBody(),
- clientInfo.get(input.clientId()),
- input.opId()
- );
- break;
- case UPDATE:
- configureOutput = configureUpdate(
- (CreateOrUpdate) input.opBody(),
- clientInfo.get(input.clientId()),
- input.opId()
- );
- break;
- case QUERY:
- break;
- case DELETE:
- configureOutput = configureDelete(
- (DeleteOrQuery) input.opBody(),
- clientInfo.get(input.clientId()),
- input.opId()
- );
- break;
- }
- configureOutput.opId(input.opId());
- }
- } catch (Exception e) {
- // if there is an exception respond with an error.
- status = RpcOutput.Status.RPC_FAILURE;
- DefaultErr defaultErr = new DefaultErr();
- defaultErr.errorInfo(ExceptionUtils.getFullStackTrace(e));
- defaultErr.errorTypeId(ErrorTypeId.of(0));
- configureOutput.resultType(defaultErr);
- configureOutput.result(Result.of(ResultEnum.ERR));
- log.error(ExceptionUtils.getFullStackTrace(e));
- }
- ResourceData dataNode = modelConverter.createDataNode(
- DefaultModelObjectData.builder()
- .addModelObject(configureOutput)
- .build()
- );
- log.info("Time Elapsed {} ms", timer.stop().elapsed(TimeUnit.MILLISECONDS));
- return new RpcOutput(status, dataNode.dataNodes().get(0));
- }
-
- @Override
- public RpcOutput configureBundles(RpcInput rpcInput) {
- Stopwatch timer = Stopwatch.createStarted();
- DefaultConfigureBundlesOutput configureBundlesOutput = new DefaultConfigureBundlesOutput();
- RpcOutput.Status status = RpcOutput.Status.RPC_SUCCESS;
- try {
- for (ModelObject modelObject : getModelObjects(rpcInput.data(), configureBundles)) {
- DefaultConfigureBundlesInput input = (DefaultConfigureBundlesInput) modelObject;
- if (!clientInfo.containsKey(input.clientId())) {
- throw new RuntimeException("Client Identifier is not registered.");
- }
- for (org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.configurebundles.configurebundlesinput.Bundles bundle : input.bundles()) {
- DefaultConfigureOutput configureOutput = new DefaultConfigureOutput();
- switch (bundle.opType()) {
+ return CompletableFuture.supplyAsync(() -> {
+ Instant start = Instant.now();
+ DefaultConfigureOutput configureOutput = new DefaultConfigureOutput();
+ RpcOutput.Status status = RpcOutput.Status.RPC_SUCCESS;
+ try {
+ for (ModelObject modelObject : getModelObjects(rpcInput.data(), configure)) {
+ DefaultConfigureInput input = (DefaultConfigureInput) modelObject;
+ if (!clientInfo.containsKey(input.clientId())) {
+ throw new RuntimeException("Client Identifier is not registered.");
+ }
+ switch (input.opType()) {
case CREATE:
configureOutput = configureCreate(
- (CreateOrUpdate) bundle.opBody(),
+ (CreateOrUpdate) input.opBody(),
clientInfo.get(input.clientId()),
- bundle.opId()
+ input.opId()
);
break;
case UPDATE:
configureOutput = configureUpdate(
- (CreateOrUpdate) bundle.opBody(),
+ (CreateOrUpdate) input.opBody(),
clientInfo.get(input.clientId()),
- bundle.opId()
+ input.opId()
);
break;
case QUERY:
break;
case DELETE:
configureOutput = configureDelete(
- (DeleteOrQuery) bundle.opBody(),
+ (DeleteOrQuery) input.opBody(),
clientInfo.get(input.clientId()),
- bundle.opId()
+ input.opId()
);
break;
}
- Bundles result = new DefaultBundles();
- result.opId(bundle.opId());
- result.result(configureOutput.result());
- result.resultType(configureOutput.resultType());
- configureBundlesOutput.addToBundles(result);
+ configureOutput.opId(input.opId());
}
+ } catch (Exception e) {
+ // if there is an exception respond with an error.
+ status = RpcOutput.Status.RPC_FAILURE;
+ DefaultErr defaultErr = new DefaultErr();
+ defaultErr.errorInfo(ExceptionUtils.getFullStackTrace(e));
+ defaultErr.errorTypeId(ErrorTypeId.of(0));
+ configureOutput.resultType(defaultErr);
+ configureOutput.result(Result.of(ResultEnum.ERR));
+ log.error(ExceptionUtils.getFullStackTrace(e));
}
- } catch (Exception e) {
- // if there is an exception respond with an error.
- status = RpcOutput.Status.RPC_FAILURE;
- log.error(ExceptionUtils.getFullStackTrace(e));
- }
- ResourceData dataNode = modelConverter.createDataNode(
- DefaultModelObjectData.builder()
- .addModelObject(configureBundlesOutput)
- .build()
- );
- log.debug("Time Elapsed {} ms", timer.stop().elapsed(TimeUnit.MILLISECONDS));
- return new RpcOutput(status, dataNode.dataNodes().get(0));
+ ResourceData dataNode = modelConverter.createDataNode(
+ DefaultModelObjectData.builder()
+ .addModelObject(configureOutput)
+ .build()
+ );
+ log.info("Time Elapsed {} ms", Duration.between(start, Instant.now()).toMillis());
+ return new RpcOutput(status, dataNode.dataNodes().get(0));
+ }, executorService).join();
+ }
+
+ @Override
+ public RpcOutput configureBundles(RpcInput rpcInput) {
+ return CompletableFuture.supplyAsync(() -> {
+ Instant start = Instant.now();
+ DefaultConfigureBundlesOutput configureBundlesOutput = new DefaultConfigureBundlesOutput();
+ RpcOutput.Status status = RpcOutput.Status.RPC_SUCCESS;
+ try {
+ for (ModelObject modelObject : getModelObjects(rpcInput.data(), configureBundles)) {
+ DefaultConfigureBundlesInput input = (DefaultConfigureBundlesInput) modelObject;
+ if (!clientInfo.containsKey(input.clientId())) {
+ throw new RuntimeException("Client Identifier is not registered.");
+ }
+ for (org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.configurebundles.configurebundlesinput.Bundles bundle : input.bundles()) {
+ DefaultConfigureOutput configureOutput = new DefaultConfigureOutput();
+ switch (bundle.opType()) {
+ case CREATE:
+ configureOutput = configureCreate(
+ (CreateOrUpdate) bundle.opBody(),
+ clientInfo.get(input.clientId()),
+ bundle.opId()
+ );
+ break;
+ case UPDATE:
+ configureOutput = configureUpdate(
+ (CreateOrUpdate) bundle.opBody(),
+ clientInfo.get(input.clientId()),
+ bundle.opId()
+ );
+ break;
+ case QUERY:
+ break;
+ case DELETE:
+ configureOutput = configureDelete(
+ (DeleteOrQuery) bundle.opBody(),
+ clientInfo.get(input.clientId()),
+ bundle.opId()
+ );
+ break;
+ }
+ Bundles result = new DefaultBundles();
+ result.opId(bundle.opId());
+ result.result(configureOutput.result());
+ result.resultType(configureOutput.resultType());
+ configureBundlesOutput.addToBundles(result);
+ }
+ }
+ } catch (Exception e) {
+ // if there is an exception respond with an error.
+ status = RpcOutput.Status.RPC_FAILURE;
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+ ResourceData dataNode = modelConverter.createDataNode(
+ DefaultModelObjectData.builder()
+ .addModelObject(configureBundlesOutput)
+ .build()
+ );
+ log.info("Time Elapsed {} ms", Duration.between(start, Instant.now()).toMillis());
+ return new RpcOutput(status, dataNode.dataNodes().get(0));
+ }, executorService).join();
}
@Override
public RpcOutput eventRegister(RpcInput rpcInput) {
- Stopwatch timer = Stopwatch.createStarted();
- // TODO implement
- log.debug("Time Elapsed {} ms", timer.stop().elapsed(TimeUnit.MILLISECONDS));
+ Instant start = Instant.now();
+ CompletableFuture.supplyAsync(() -> {
+ // TODO implement
+ return null;
+ }, executorService);
+ log.info("Time Elapsed {} ms", Duration.between(start, Instant.now()).toMillis());
return null;
}
@Override
public RpcOutput eventDeregister(RpcInput rpcInput) {
- Stopwatch timer = Stopwatch.createStarted();
- // TODO implement
- log.debug("Time Elapsed {} ms", timer.stop().elapsed(TimeUnit.MILLISECONDS));
+ Instant start = Instant.now();
+ CompletableFuture.supplyAsync(() -> {
+ // TODO implement
+ return null;
+ }, executorService);
+ log.info("Time Elapsed {} ms", Duration.between(start, Instant.now()).toMillis());
return null;
}
@Override
public RpcOutput probe(RpcInput rpcInput) {
- Stopwatch timer = Stopwatch.createStarted();
- // TODO implement
- log.debug("Time Elapsed {} ms", timer.stop().elapsed(TimeUnit.MILLISECONDS));
+ Instant start = Instant.now();
+ CompletableFuture.supplyAsync(() -> {
+ // TODO implement
+ return null;
+ }, executorService);
+ log.info("Time Elapsed {} ms", Duration.between(start, Instant.now()).toMillis());
return null;
}
@Override
public RpcOutput registerClient(RpcInput rpcInput) {
- Stopwatch timer = Stopwatch.createStarted();
- DefaultRegisterClientOutput registerClientOutput = new DefaultRegisterClientOutput();
- RpcOutput.Status status = RpcOutput.Status.RPC_SUCCESS;
- try {
- for (ModelObject modelObject : getModelObjects(rpcInput.data(), registerClient)) {
- DefaultRegisterClientInput input = (DefaultRegisterClientInput) modelObject;
- if (clientInfo.containsKey(input.clientId())) {
- throw new RuntimeException("Client already registered.");
+ return CompletableFuture.supplyAsync(() -> {
+ Instant start = Instant.now();
+ DefaultRegisterClientOutput registerClientOutput = new DefaultRegisterClientOutput();
+ RpcOutput.Status status = RpcOutput.Status.RPC_SUCCESS;
+ try {
+ for (ModelObject modelObject : getModelObjects(rpcInput.data(), registerClient)) {
+ DefaultRegisterClientInput input = (DefaultRegisterClientInput) modelObject;
+ if (clientInfo.containsKey(input.clientId())) {
+ throw new RuntimeException("Client already registered.");
+ }
+ clientInfo.put(input.clientId(), input);
+ registerClientOutput.clientId(input.clientId());
+ registerClientOutput.supportedFeatures(input.supportedFeatures());
+ registerClientOutput.endpointUri(input.endpointUri());
+ registerClientOutput.supportsAckModel(input.supportsAckModel());
+ registerClientOutput.tenantId(input.tenantId());
+
+ DefaultConnections defaultConnections = new DefaultConnections();
+ defaultConnections.clientId(input.clientId().toString());
+
+ ModelObjectId modelObjectId = ModelObjectId.builder()
+ .addChild(DefaultConnectionInfo.class)
+ .build();
+ createNode(defaultConnections, modelObjectId);
}
- clientInfo.put(input.clientId(), input);
- registerClientOutput.clientId(input.clientId());
- registerClientOutput.supportedFeatures(input.supportedFeatures());
- registerClientOutput.endpointUri(input.endpointUri());
- registerClientOutput.supportsAckModel(input.supportsAckModel());
- registerClientOutput.tenantId(input.tenantId());
-
- DefaultConnections defaultConnections = new DefaultConnections();
- defaultConnections.clientId(input.clientId().toString());
-
- ModelObjectId modelObjectId = ModelObjectId.builder()
- .addChild(DefaultConnectionInfo.class)
- .build();
- createNode(defaultConnections, modelObjectId);
+ } catch (Exception e) {
+ // if there is an exception respond with an error.
+ status = RpcOutput.Status.RPC_FAILURE;
+ log.error(ExceptionUtils.getFullStackTrace(e));
}
- } catch (Exception e) {
- // if there is an exception respond with an error.
- status = RpcOutput.Status.RPC_FAILURE;
- log.error(ExceptionUtils.getFullStackTrace(e));
- }
- ResourceData dataNode = modelConverter.createDataNode(
- DefaultModelObjectData.builder()
- .addModelObject(registerClientOutput)
- .build()
- );
-
- log.debug("Time Elapsed {} ms", timer.stop().elapsed(TimeUnit.MILLISECONDS));
- return new RpcOutput(status, dataNode.dataNodes().get(0));
+ ResourceData dataNode = modelConverter.createDataNode(
+ DefaultModelObjectData.builder()
+ .addModelObject(registerClientOutput)
+ .build()
+ );
+ log.info("Time Elapsed {} ms", Duration.between(start, Instant.now()).toMillis());
+ return new RpcOutput(status, dataNode.dataNodes().get(0));
+ }).join();
}
@Override
public RpcOutput deregisterClient(RpcInput rpcInput) {
- Stopwatch timer = Stopwatch.createStarted();
- DefaultDeregisterClientOutput deregisterClientOutput = new DefaultDeregisterClientOutput();
- RpcOutput.Status status = RpcOutput.Status.RPC_SUCCESS;
+ return CompletableFuture.supplyAsync(() -> {
+ Instant start = Instant.now();
+ DefaultDeregisterClientOutput deregisterClientOutput = new DefaultDeregisterClientOutput();
+ RpcOutput.Status status = RpcOutput.Status.RPC_SUCCESS;
- try {
- for (ModelObject modelObject : getModelObjects(rpcInput.data(), registerClient)) {
- DefaultRegisterClientInput input = (DefaultRegisterClientInput) modelObject;
- if (!clientInfo.containsKey(input.clientId())) {
- throw new RuntimeException("Client does not exist.");
+ try {
+ for (ModelObject modelObject : getModelObjects(rpcInput.data(), registerClient)) {
+ DefaultRegisterClientInput input = (DefaultRegisterClientInput) modelObject;
+ if (!clientInfo.containsKey(input.clientId())) {
+ throw new RuntimeException("Client does not exist.");
+ }
+ clientInfo.remove(input.clientId());
+ deregisterClientOutput.clientId(input.clientId());
+
+ DefaultConnections defaultConnections = new DefaultConnections();
+ defaultConnections.clientId(input.clientId().toString());
+
+ ConnectionsKeys connectionsKeys = new ConnectionsKeys();
+ connectionsKeys.clientId(input.clientId().toString());
+
+ ResourceId resourceVal = getResourceVal(ModelObjectId.builder()
+ .addChild(DefaultConnectionInfo.class)
+ .addChild(DefaultConnections.class, connectionsKeys)
+ .build());
+
+ dynamicConfigService.deleteNode(resourceVal);
}
- clientInfo.remove(input.clientId());
- deregisterClientOutput.clientId(input.clientId());
-
- DefaultConnections defaultConnections = new DefaultConnections();
- defaultConnections.clientId(input.clientId().toString());
-
- ConnectionsKeys connectionsKeys = new ConnectionsKeys();
- connectionsKeys.clientId(input.clientId().toString());
-
- ResourceId resourceVal = getResourceVal(ModelObjectId.builder()
- .addChild(DefaultConnectionInfo.class)
- .addChild(DefaultConnections.class, connectionsKeys)
- .build());
-
- dynamicConfigService.deleteNode(resourceVal);
+ } catch (Exception e) {
+ // if there is an exception respond with an error.
+ status = RpcOutput.Status.RPC_FAILURE;
+ log.error(ExceptionUtils.getFullStackTrace(e));
}
- } catch (Exception e) {
- // if there is an exception respond with an error.
- status = RpcOutput.Status.RPC_FAILURE;
- log.error(ExceptionUtils.getFullStackTrace(e));
- }
- ResourceData dataNode = modelConverter.createDataNode(
- DefaultModelObjectData.builder()
- .addModelObject(deregisterClientOutput)
- .build()
- );
-
- log.debug("Time Elapsed {} ms", timer.stop().elapsed(TimeUnit.MILLISECONDS));
- return new RpcOutput(status, dataNode.dataNodes().get(0));
+ ResourceData dataNode = modelConverter.createDataNode(
+ DefaultModelObjectData.builder()
+ .addModelObject(deregisterClientOutput)
+ .build()
+ );
+ log.info("Time Elapsed {} ms", Duration.between(start, Instant.now()).toMillis());
+ return new RpcOutput(status, dataNode.dataNodes().get(0));
+ }, executorService).join();
}
}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcService.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcService.java
index 61aa6cf..6adc0bc 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcService.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcService.java
@@ -1,10 +1,15 @@
package org.onosproject.fpcagent;
+import org.onosproject.fpcagent.providers.DpnDeviceListener;
import org.onosproject.fpcagent.util.ConfigHelper;
import java.util.Optional;
public interface FpcService {
+ void addListener(DpnDeviceListener listener);
+
+ void removeListener(DpnDeviceListener listener);
+
Optional<ConfigHelper> getConfig();
}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnCommunicationService.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnCommunicationService.java
index 7405169..17df670 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnCommunicationService.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnCommunicationService.java
@@ -27,8 +27,7 @@
/**
* Creates Mobility Session.
- *
- * @param topic_id - DPN Topic ID
+ * @param topic_id - DPN Topic ID
* @param imsi - IMSI identifier
* @param default_ebi - EBI
* @param ue_ipv4 - UE IPv4 Address
@@ -39,7 +38,7 @@
* @param op_id - Operation Identifier
*/
void create_session(
- Short topic_id,
+ byte topic_id,
BigInteger imsi,
Short default_ebi,
Ip4Address ue_ipv4,
@@ -52,8 +51,7 @@
/**
* Modifies Bearer.
- *
- * @param topic_id - DPN Topic ID
+ * @param topic_id - DPN Topic ID
* @param s1u_sgw_ipv4 - SGW IPv4 Address
* @param s1u_enodeb_teid - ENodeB Tunnel Identifier
* @param s1u_enodeb_ipv4 - ENodeB IPv4 Address
@@ -62,7 +60,7 @@
* @param op_id - Operation Identifier
*/
void modify_bearer(
- Short topic_id,
+ byte topic_id,
Ip4Address s1u_sgw_ipv4,
Long s1u_enodeb_teid,
Ip4Address s1u_enodeb_ipv4,
@@ -73,14 +71,13 @@
/**
* Deletes Mobility Session.
- *
- * @param topic_id - DPN Topic ID
+ * @param topic_id - DPN Topic ID
* @param session_id - Context Identifier
* @param client_id - Client Identifier
* @param op_id - Operation Identifier
*/
void delete_session(
- Short topic_id,
+ byte topic_id,
Long session_id,
Long client_id,
BigInteger op_id
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnNgicCommunicator.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnNgicCommunicator.java
index 7740ec6..78f1171 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnNgicCommunicator.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnNgicCommunicator.java
@@ -26,6 +26,8 @@
import java.math.BigInteger;
import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Optional;
import static org.onosproject.fpcagent.util.Converter.*;
@@ -35,9 +37,92 @@
public class DpnNgicCommunicator extends ZmqDpnControlProtocol implements DpnCommunicationService {
protected static final Logger log = LoggerFactory.getLogger(DpnNgicCommunicator.class);
+ /**
+ * Broadcasts the GOODBYE message to all the DPNs
+ */
+ public static void send_goodbye_dpns(String nodeId, String networkId) {
+ ByteBuffer bb = ByteBuffer.allocate(10 + nodeId.length() + networkId.length());
+ bb.put(ReservedTopics.BROADCAST_DPNS.getType())
+ .put(s11MsgType.CONTROLLER_STATUS_INDICATION.getType())
+ .put(ZMQSBSubscriberManager.getInstance().getControllerTopic())
+ .put(ControllerStatusIndication.GOODBYE.getType())
+ .put(toUint32(ZMQSBSubscriberManager.getInstance().getControllerSourceId()))
+ .put(toUint8((short) nodeId.length()))
+ .put(nodeId.getBytes())
+ .put(toUint8((short) networkId.length()))
+ .put(networkId.getBytes());
+
+ log.info("send_goodbye_dpns: {}", bb.array());
+ ZMQSBPublisherManager.getInstance().send(bb);
+ }
+
+ /**
+ * Broadcasts the HELLO message to all the DPNs
+ */
+ public static void send_hello_dpns(String nodeId, String networkId) {
+ ByteBuffer bb = ByteBuffer.allocate(10 + nodeId.length() + networkId.length());
+ bb.put(ReservedTopics.BROADCAST_DPNS.getType())
+ .put(s11MsgType.CONTROLLER_STATUS_INDICATION.getType())
+ .put(ZMQSBSubscriberManager.getInstance().getControllerTopic())
+ .put(ControllerStatusIndication.HELLO.getType())
+ .put(toUint32(ZMQSBSubscriberManager.getInstance().getControllerSourceId()))
+ .put(toUint8((short) nodeId.length()))
+ .put(nodeId.getBytes())
+ .put(toUint8((short) networkId.length()))
+ .put(networkId.getBytes());
+
+ log.info("send_hello_dpns: {}", bb.array());
+ ZMQSBPublisherManager.getInstance().send(bb);
+ }
+
+ public static void send_assign_conflict(String nodeId, String networkId) {
+ ByteBuffer bb = ByteBuffer.allocate(9 + nodeId.length() + networkId.length());
+ bb.put(ReservedTopics.BROADCAST_ALL.getType())
+ .put(s11MsgType.ASSIGN_CONFLICT.getType())
+ .put(ZMQSBSubscriberManager.getInstance().getControllerTopic())
+ .put(toUint32(ZMQSBSubscriberManager.getInstance().getControllerSourceId()))
+ .put(toUint8((short) nodeId.length()))
+ .put(nodeId.getBytes())
+ .put(toUint8((short) networkId.length()))
+ .put(networkId.getBytes());
+
+ log.info("send_assign_conflict: {}", bb.array());
+ ZMQSBPublisherManager.getInstance().send(bb);
+ }
+
+ public static void send_assign_topic(String nodeId, String networkId, byte topic) {
+ ByteBuffer bb = ByteBuffer.allocate(9 + nodeId.length() + networkId.length());
+ bb.put(ReservedTopics.BROADCAST_ALL.getType())
+ .put(s11MsgType.ASSIGN_TOPIC.getType())
+ .put(topic)
+ .put(toUint32(ZMQSBSubscriberManager.getInstance().getControllerSourceId()))
+ .put(toUint8((short) nodeId.length()))
+ .put(nodeId.getBytes())
+ .put(toUint8((short) networkId.length()))
+ .put(networkId.getBytes());
+
+ log.info("send_assign_topic: {}", bb.array());
+ ZMQSBPublisherManager.getInstance().send(bb);
+ }
+
+ public static void send_status_ack(String nodeId, String networkId, byte topic) {
+ ByteBuffer bb = ByteBuffer.allocate(9 + nodeId.length() + networkId.length())
+ .put(topic)
+ .put(s11MsgType.DPN_STATUS_ACK.getType())
+ .put(ZMQSBSubscriberManager.getInstance().getControllerTopic())
+ .put(toUint32(ZMQSBSubscriberManager.getInstance().getControllerSourceId()))
+ .put(toUint8((short) nodeId.length()))
+ .put(nodeId.getBytes())
+ .put(toUint8((short) networkId.length()))
+ .put(networkId.getBytes());
+
+ log.info("send_status_ack: {}", bb.array());
+ ZMQSBPublisherManager.getInstance().send(bb);
+ }
+
@Override
public void create_session(
- Short topic_id,
+ byte topic_id,
BigInteger imsi,
Short default_ebi,
Ip4Address ue_ipv4,
@@ -64,7 +149,7 @@
*/
// TODO: check if subscriber is open.
ByteBuffer bb = ByteBuffer.allocate(41)
- .put(toUint8(topic_id))
+ .put(topic_id)
.put(s11MsgType.CREATE_SESSION.getType())
.put(toUint64(imsi))
.put(toUint8(default_ebi))
@@ -72,7 +157,7 @@
.put(toUint32(s1u_sgw_teid))
.put(toUint32(s1u_sgw_ipv4.toInt()))
.put(toUint64(BigInteger.valueOf(session_id)))
- .put(toUint8(ZMQSBSubscriberManager.getControllerTopic()))
+ .put(ZMQSBSubscriberManager.getInstance().getControllerTopic())
.put(toUint32(client_id))
.put(toUint32(op_id.longValue()));
@@ -82,7 +167,7 @@
@Override
public void modify_bearer(
- Short topic_id,
+ byte topic_id,
Ip4Address s1u_sgw_ipv4,
Long s1u_enodeb_teid,
Ip4Address s1u_enodeb_ipv4,
@@ -105,13 +190,13 @@
} modify_bearer_msg;
*/
ByteBuffer bb = ByteBuffer.allocate(32)
- .put(toUint8(topic_id))
- .put(s11MsgType.MODIFY_BEARER.getType())
+ .put(topic_id)
+ .put(s11MsgType.UPDATE_MODIFY_BEARER.getType())
.put(toUint32(s1u_sgw_ipv4.toInt()))
.put(toUint32(s1u_enodeb_teid))
.put(toUint32(s1u_enodeb_ipv4.toInt()))
.put(toUint64(BigInteger.valueOf(session_id)))
- .put(toUint8(ZMQSBSubscriberManager.getControllerTopic()))
+ .put(ZMQSBSubscriberManager.getInstance().getControllerTopic())
.put(toUint32(client_id))
.put(toUint32(op_id.longValue()));
@@ -121,7 +206,7 @@
@Override
public void delete_session(
- Short topic_id,
+ byte topic_id,
Long session_id,
Long client_id,
BigInteger op_id
@@ -138,10 +223,10 @@
} delete_session_msg;
*/
ByteBuffer bb = ByteBuffer.allocate(19)
- .put(toUint8(topic_id))
+ .put(topic_id)
.put(s11MsgType.DELETE_SESSION.getType())
.put(toUint64(BigInteger.valueOf(session_id)))
- .put(toUint8(ZMQSBSubscriberManager.getControllerTopic()))
+ .put(ZMQSBSubscriberManager.getInstance().getControllerTopic())
.put(toUint32(client_id))
.put(toUint32(op_id.longValue()));
@@ -193,32 +278,133 @@
bb.put(toUint8((short) sponsor_ID.length()))
.put(sponsor_ID.getBytes());
}
- bb.put(toUint8(ZMQSBSubscriberManager.getControllerTopic()));
+ bb.put(ZMQSBSubscriberManager.getInstance().getControllerTopic());
log.info("send_ADC_rules: {}", bb.array());
ZMQSBPublisherManager.getInstance().send(bb);
}
- /**
- * Following the NGIC message types.
- *
- * This type structure is defined in NGIC at interface/zmq/zmqsub.h:51
- */
- enum s11MsgType {
- CREATE_SESSION(1),
- MODIFY_BEARER(2),
- DELETE_SESSION(3),
- DPN_RESPONSE(4),
- DDN(5),
- ASSIGN_TOPIC(10),
- ASSIGN_CONFLICT(11),
- DPN_STATUS_INDICATION(12),
- DPN_STATUS_ACK(13),
- CONTROLLER_STATUS_INDICATION(14),
- ADC_RULE(17),
- PCC_RULE(18),
- METER_RULE(19),
- SDF_RULE(20);
+ public enum s11MsgType {
+ CREATE_SESSION(1) {
+ @Override
+ public String toString() {
+ return "CREATE_SESSION";
+ }
+ },
+ UPDATE_MODIFY_BEARER(2) {
+ @Override
+ public String toString() {
+ return "UPDATE_MODIFY_BEARER";
+ }
+ },
+ DELETE_SESSION(3) {
+ @Override
+ public String toString() {
+ return "DELETE_SESSION";
+ }
+ },
+ DPN_RESPONSE(4) {
+ @Override
+ public String toString() {
+ return "DPN_RESPONSE";
+ }
+ },
+ DDN(5) {
+ @Override
+ public String toString() {
+ return "DDN";
+ }
+ },
+ DDN_ACK(5) {
+ @Override
+ public String toString() {
+ return "DDN_ACK";
+ }
+ },
+ RESERVED_1(7) {
+ @Override
+ public String toString() {
+ return "RESERVED_1";
+ }
+ },
+ RESERVED_2(8) {
+ @Override
+ public String toString() {
+ return "RESERVED_2";
+ }
+ },
+ RESERVED_3(9) {
+ @Override
+ public String toString() {
+ return "RESERVED_3";
+ }
+ },
+ ASSIGN_TOPIC(10) {
+ @Override
+ public String toString() {
+ return "ASSIGN_TOPIC";
+ }
+ },
+ ASSIGN_CONFLICT(11) {
+ @Override
+ public String toString() {
+ return "ASSIGN_CONFLICT";
+ }
+ },
+ DPN_STATUS_INDICATION(12) {
+ @Override
+ public String toString() {
+ return "DPN_STATUS_INDICATION";
+ }
+ },
+ DPN_STATUS_ACK(13) {
+ @Override
+ public String toString() {
+ return "DPN_STATUS_ACK";
+ }
+ },
+ CONTROLLER_STATUS_INDICATION(14) {
+ @Override
+ public String toString() {
+ return "CONTROLLER_STATUS_INDICATION";
+ }
+ },
+ GENERATE_CDR(15) {
+ @Override
+ public String toString() {
+ return "GENERATE_CDR";
+ }
+ },
+ GENERATE_CDR_ACK(16) {
+ @Override
+ public String toString() {
+ return "GENERATE_CDR_ACK";
+ }
+ },
+ ADC_RULE(17) {
+ @Override
+ public String toString() {
+ return "ADC_RULE";
+ }
+ },
+ PCC_RULE(18) {
+ @Override
+ public String toString() {
+ return "PCC_RULE";
+ }
+ },
+ METER_RULE(19) {
+ @Override
+ public String toString() {
+ return "METER_RULE";
+ }
+ },
+ SDF_RULE(20) {
+ @Override
+ public String toString() {
+ return "SDF_RULE";
+ }
+ };
private byte type;
@@ -226,8 +412,140 @@
this.type = (byte) type;
}
+ public static s11MsgType getEnum(byte name) {
+ Optional<s11MsgType> any = Arrays.stream(s11MsgType.values())
+ .filter(typeStr -> typeStr.type == name)
+ .findAny();
+ if (any.isPresent()) {
+ return any.get();
+ }
+ throw new IllegalArgumentException("No enum defined for string: " + name);
+ }
+
public byte getType() {
return type;
}
}
+
+ public enum DpnStatusIndication {
+ HELLO(1) {
+ @Override
+ public String toString() {
+ return "HELLO";
+ }
+ },
+ GOODBYE(2) {
+ @Override
+ public String toString() {
+ return "GOODBYE";
+ }
+ },
+ OVERLOAD_START(3) {
+ @Override
+ public String toString() {
+ return "OVERLOAD_START";
+ }
+ },
+ OVERLOAD_STOP(4) {
+ @Override
+ public String toString() {
+ return "OVERLOAD_STOP";
+ }
+ },
+ MATERIAL_CHANGE(5) {
+ @Override
+ public String toString() {
+ return "MATERIAL_CHANGE";
+ }
+ },
+ RESTART(6) {
+ @Override
+ public String toString() {
+ return "RESTART";
+ }
+ },
+ OUT_OF_SERVICE(7) {
+ @Override
+ public String toString() {
+ return "OUT_OF_SERVICE";
+ }
+ };
+
+ private byte type;
+
+ DpnStatusIndication(int type) {
+ this.type = (byte) type;
+ }
+
+ public static DpnStatusIndication getEnum(byte name) {
+ Optional<DpnStatusIndication> any = Arrays.stream(DpnStatusIndication.values())
+ .filter(typeStr -> typeStr.type == name)
+ .findAny();
+ if (any.isPresent()) {
+ return any.get();
+ }
+ throw new IllegalArgumentException("No enum defined for string: " + name);
+ }
+
+ public byte getType() {
+ return type;
+ }
+ }
+
+ public enum ControllerStatusIndication {
+ HELLO(1) {
+ @Override
+ public String toString() {
+ return "HELLO";
+ }
+ },
+ GOODBYE(2) {
+ @Override
+ public String toString() {
+ return "GOODBYE";
+ }
+ };
+
+ private byte type;
+
+ ControllerStatusIndication(int type) {
+ this.type = (byte) type;
+ }
+
+ public byte getType() {
+ return type;
+ }
+ }
+
+ public enum ReservedTopics {
+ BROADCAST_ALL(1) {
+ @Override
+ public String toString() {
+ return "BROADCAST_ALL";
+ }
+ },
+ BROADCAST_CONTROLLERS(2) {
+ @Override
+ public String toString() {
+ return "BROADCAST_CONTROLLERS";
+ }
+ },
+ BROADCAST_DPNS(3) {
+ @Override
+ public String toString() {
+ return "BROADCAST_DPNS";
+ }
+ };
+
+ private byte type;
+
+ ReservedTopics(int type) {
+ this.type = (byte) type;
+ }
+
+ public byte getType() {
+ return type;
+ }
+ }
+
}
\ No newline at end of file
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnP4Communicator.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnP4Communicator.java
index 5b12e05..af706e6 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnP4Communicator.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnP4Communicator.java
@@ -23,17 +23,17 @@
public class DpnP4Communicator extends P4DpnControlProtocol implements DpnCommunicationService {
@Override
- public void create_session(Short topic_id, BigInteger imsi, Short default_ebi, Ip4Address ue_ipv4, Long s1u_sgw_teid, Ip4Address s1u_sgw_ipv4, Long session_id, Long client_id, BigInteger op_id) {
+ public void create_session(byte topic_id, BigInteger imsi, Short default_ebi, Ip4Address ue_ipv4, Long s1u_sgw_teid, Ip4Address s1u_sgw_ipv4, Long session_id, Long client_id, BigInteger op_id) {
}
@Override
- public void modify_bearer(Short topic_id, Ip4Address s1u_sgw_ipv4, Long s1u_enodeb_teid, Ip4Address s1u_enodeb_ipv4, Long session_id, Long client_id, BigInteger op_id) {
+ public void modify_bearer(byte topic_id, Ip4Address s1u_sgw_ipv4, Long s1u_enodeb_teid, Ip4Address s1u_enodeb_ipv4, Long session_id, Long client_id, BigInteger op_id) {
}
@Override
- public void delete_session(Short topic_id, Long session_id, Long client_id, BigInteger op_id) {
+ public void delete_session(byte topic_id, Long session_id, Long client_id, BigInteger op_id) {
}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/package-info.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/package-info.java
new file mode 100644
index 0000000..21c1413
--- /dev/null
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/package-info.java
@@ -0,0 +1,17 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.fpcagent.protocols;
\ No newline at end of file
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/DpnDeviceListener.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/DpnDeviceListener.java
new file mode 100644
index 0000000..8d099f0
--- /dev/null
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/DpnDeviceListener.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.fpcagent.providers;
+
+public interface DpnDeviceListener {
+ void deviceAdded(String id, byte topic);
+
+ void deviceRemoved(String id);
+}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/DpnProvider.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/DpnProvider.java
new file mode 100644
index 0000000..d72991e
--- /dev/null
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/DpnProvider.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.fpcagent.providers;
+
+import org.apache.felix.scr.annotations.*;
+import org.onlab.packet.ChassisId;
+import org.onosproject.net.*;
+import org.onosproject.net.device.DefaultDeviceDescription;
+import org.onosproject.net.device.DeviceDescription;
+import org.onosproject.net.device.DeviceProviderRegistry;
+import org.onosproject.net.device.DeviceProviderService;
+import org.onosproject.net.provider.AbstractProvider;
+import org.onosproject.net.provider.ProviderId;
+import org.slf4j.Logger;
+
+import static org.onosproject.net.DeviceId.deviceId;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ *
+ */
+@Component(immediate = true)
+@Service
+public class DpnProvider extends AbstractProvider implements DpnProviderService {
+
+ private static final Logger log = getLogger(DpnProvider.class);
+ private final InternalDeviceListener listener = new InternalDeviceListener();
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DeviceProviderRegistry providerRegistry;
+
+ private DeviceProviderService providerService;
+
+ public DpnProvider() {
+ super(new ProviderId("fpc", "org.onosproject.providers.dpn"));
+ }
+
+ @Activate
+ public void activate() {
+ providerService = providerRegistry.register(this);
+ log.info("FPC Device Provider Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ providerRegistry.unregister(this);
+ providerService = null;
+ log.info("FPC Device Provider Stopped");
+ }
+
+ public InternalDeviceListener getListener() {
+ return listener;
+ }
+
+ @Override
+ public void triggerProbe(DeviceId deviceId) {
+
+ }
+
+ @Override
+ public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
+
+ }
+
+ @Override
+ public boolean isReachable(DeviceId deviceId) {
+ return true;
+ }
+
+ @Override
+ public void changePortState(DeviceId deviceId, PortNumber portNumber, boolean enable) {
+
+ }
+
+ public class InternalDeviceListener implements DpnDeviceListener {
+
+ @Override
+ public void deviceAdded(String id, byte topic) {
+ DeviceId deviceId = deviceId("fpc:" + id);
+ ChassisId chassisId = new ChassisId(deviceId.hashCode());
+
+ Device.Type type = Device.Type.OTHER;
+ SparseAnnotations annotations = DefaultAnnotations.builder()
+ .set(AnnotationKeys.NAME, id)
+ .set(AnnotationKeys.PROTOCOL, "ZMQ")
+ .set(AnnotationKeys.MANAGEMENT_ADDRESS, "127.0.0.1")
+ .set("topic", String.valueOf(topic))
+ .build();
+ DeviceDescription descriptionBase = new DefaultDeviceDescription(deviceId.uri(), type, "fpc", "0.1", "0.1", id, chassisId),
+ description = new DefaultDeviceDescription(descriptionBase, annotations);
+
+ providerService.deviceConnected(deviceId, description);
+ }
+
+ @Override
+ public void deviceRemoved(String id) {
+ providerService.deviceDisconnected(deviceId("fpc:" + id));
+ }
+ }
+}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/DpnProviderService.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/DpnProviderService.java
new file mode 100644
index 0000000..515730a
--- /dev/null
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/DpnProviderService.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.fpcagent.providers;
+
+import org.onosproject.net.device.DeviceProvider;
+
+public interface DpnProviderService extends DeviceProvider {
+
+ DpnDeviceListener getListener();
+
+}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/package-info.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/package-info.java
new file mode 100644
index 0000000..7f99f9d
--- /dev/null
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/package-info.java
@@ -0,0 +1,17 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.fpcagent.providers;
\ No newline at end of file
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/FpcUtil.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/FpcUtil.java
index 0141b36..c3fa25a 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/FpcUtil.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/FpcUtil.java
@@ -16,58 +16,38 @@
package org.onosproject.fpcagent.util;
-import com.google.common.collect.Maps;
import org.onosproject.config.DynamicConfigService;
import org.onosproject.config.Filter;
+import org.onosproject.net.Device;
+import org.onosproject.net.device.DeviceStore;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.ClientIdentifier;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.DefaultTenants;
-import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.OpIdentifier;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.tenants.DefaultTenant;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.tenants.TenantKeys;
-import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.yangautoprefixnotify.value.DefaultDownlinkDataNotification;
-import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.yangautoprefixnotify.value.DownlinkDataNotification;
-import org.onosproject.yang.gen.v1.ietfdmmfpcbase.rev20160803.ietfdmmfpcbase.FpcDpnId;
import org.onosproject.yang.gen.v1.ietfdmmfpcbase.rev20160803.ietfdmmfpcbase.FpcIdentity;
-import org.onosproject.yang.gen.v1.ietfdmmfpcbase.rev20160803.ietfdmmfpcbase.fpcidentity.FpcIdentityUnion;
import org.onosproject.yang.model.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.math.BigInteger;
-import java.util.*;
+import java.util.List;
+import java.util.Optional;
-import static org.onosproject.fpcagent.util.Converter.fromIntToLong;
-import static org.onosproject.fpcagent.util.Converter.toBigInt;
+import static org.onosproject.net.DeviceId.deviceId;
/**
* Helper class which stores all the static variables.
*/
public class FpcUtil {
- public static final int MAX_EVENTS = 1000;
- public static final int MAX_BATCH_MS = 5000;
- public static final int MAX_IDLE_MS = 1000;
- public static final String TIMER = "dynamic-config-fpcagent-timer";
- public static final String UNKNOWN_EVENT = "FPC Agent listener: unknown event: {}";
- public static final String EVENT_NULL = "Event cannot be null";
public static final String FPC_APP_ID = "org.onosproject.fpcagent";
public static final FpcIdentity defaultIdentity = FpcIdentity.fromString("default");
private static final Logger log = LoggerFactory.getLogger(FpcUtil.class);
- private static final Map<String, FpcDpnId> uplinkDpnMap = Maps.newConcurrentMap();
- private static final Map<String, Short> nodeToTopicMap = Maps.newConcurrentMap();
- private static final byte DPN_HELLO = 0b0000_0001;
- private static final byte DPN_BYE = 0b0000_0010;
- private static final byte DOWNLINK_DATA_NOTIFICATION = 0b0000_0101;
- private static final byte DPN_STATUS_INDICATION = 0b0000_1100;
- private static final byte DPN_OVERLOAD_INDICATION = 0b0000_0101;
- private static final byte DPN_REPLY = 0b0000_0100;
- private static final String DOWNLINK_DATA_NOTIFICATION_STRING = "Downlink-Data-Notification";
+
public static DynamicConfigService dynamicConfigService = null;
public static ModelConverter modelConverter = null;
- // Resource ID for Configure DPN RPC command
+ public static DeviceStore deviceStore = null;
+
public static ResourceId configureDpn;
- // Resource ID for Configure RPC command
public static ResourceId configure;
- // Resource ID for tenants data
public static ResourceId tenants;
public static ResourceId configureBundles;
public static ResourceId registerClient;
@@ -188,79 +168,19 @@
}
/**
- * Ensures the session id is an unsigned 64 bit integer
- *
- * @param sessionId - session id received from the DPN
- * @return unsigned session id
- */
- private static BigInteger checkSessionId(BigInteger sessionId) {
- if (sessionId.compareTo(BigInteger.ZERO) < 0) {
- sessionId = sessionId.add(BigInteger.ONE.shiftLeft(64));
- }
- return sessionId;
- }
-
- /**
- * Decodes a DownlinkDataNotification
- *
- * @param buf - message buffer
- * @param key - Concatenation of node id + / + network id
- * @return DownlinkDataNotification or null if it could not be successfully decoded
- */
- private static DownlinkDataNotification processDDN(byte[] buf, String key) {
- DownlinkDataNotification ddnB = new DefaultDownlinkDataNotification();
- ddnB.sessionId(checkSessionId(toBigInt(buf, 2)));
- ddnB.notificationMessageType(DOWNLINK_DATA_NOTIFICATION_STRING);
- ddnB.clientId(ClientIdentifier.of(FpcIdentity.of(FpcIdentityUnion.of(fromIntToLong(buf, 10)))));
- ddnB.opId(OpIdentifier.of(BigInteger.valueOf(fromIntToLong(buf, 14))));
- ddnB.notificationDpnId(uplinkDpnMap.get(key));
- return ddnB;
- }
-
- /**
- * Decodes a DPN message.
- *
- * @param buf - message buffer
- * @return - A pair with the DPN Id and decoded Object
- */
- public static Map.Entry<FpcDpnId, Object> decode(byte[] buf) {
- if (buf[1] == DPN_REPLY) {
- return null;
- } else if (buf[1] == DOWNLINK_DATA_NOTIFICATION) {
- short nodeIdLen = buf[18];
- short networkIdLen = buf[19 + nodeIdLen];
- String key = new String(Arrays.copyOfRange(buf, 19, 19 + nodeIdLen)) + "/" + new String(Arrays.copyOfRange(buf, 20 + nodeIdLen, 20 + nodeIdLen + networkIdLen));
- return uplinkDpnMap.get(key) == null ? null : new AbstractMap.SimpleEntry<>(uplinkDpnMap.get(key), processDDN(buf, key));
- } else if (buf[1] == DPN_STATUS_INDICATION) {
- DPNStatusIndication.Status status = null;
-
- short nodeIdLen = buf[8];
- short networkIdLen = buf[9 + nodeIdLen];
- String key = new String(Arrays.copyOfRange(buf, 9, 9 + nodeIdLen)) + "/" + new String(Arrays.copyOfRange(buf, 10 + nodeIdLen, 10 + nodeIdLen + networkIdLen));
- if (buf[3] == DPN_OVERLOAD_INDICATION) {
- status = DPNStatusIndication.Status.OVERLOAD_INDICATION;
- } else if (buf[3] == DPN_HELLO) {
- status = DPNStatusIndication.Status.HELLO;
- log.info("Hello {} on topic {}", key, buf[2]);
- nodeToTopicMap.put(key, (short) buf[2]);
- } else if (buf[3] == DPN_BYE) {
- status = DPNStatusIndication.Status.BYE;
- log.info("Bye {}", key);
- nodeToTopicMap.remove(key);
- }
- return new AbstractMap.SimpleEntry<>(uplinkDpnMap.get(key), new DPNStatusIndication(status, key));
- }
- return null;
- }
-
- /**
* Gets the mapping for node id / network id to ZMQ Topic
*
- * @param Key - Concatenation of node id + / + network id
+ * @param key - Concatenation of node id + / + network id
* @return - ZMQ Topic
*/
- public static Short getTopicFromNode(String Key) {
- return nodeToTopicMap.get(Key);
+ public static byte getTopicFromNode(String key) {
+ // TODO add cache
+ Device device = deviceStore.getDevice(deviceId("fpc:" + key));
+ if (device != null) {
+ String topic = device.annotations().value("topic");
+ return Byte.parseByte(topic);
+ }
+ return -1;
}
/**
@@ -349,55 +269,4 @@
node -> dynamicConfigService.updateNode(dataNode.resourceId(), node)
);
}
-
- /**
- * Provides basic status changes,
- */
- public static class DPNStatusIndication {
- private final Status status;
- private final String key; //nodeId +"/"+ networkId
- /**
- * Node Reference of the DPN
- */
- public Short nodeRef;
-
- /**
- * Constructor providing the DPN and its associated Status.
- *
- * @param status - DPN Status
- * @param key - Combination of node id and network id
- */
- public DPNStatusIndication(Status status,
- String key) {
- this.status = status;
- this.key = key;
- }
-
- /**
- * Provides DPN Status
- *
- * @return Status associated to the DPN.
- */
- public Status getStatus() {
- return status;
- }
-
- /**
- * Provides the DPN key - nodeId +"/"+ networkId
- *
- * @return FpcDpnId
- */
- public String getKey() {
- return this.key;
- }
-
- /**
- * Basic DPN Status
- */
- public enum Status {
- HELLO,
- BYE,
- OVERLOAD_INDICATION
- }
- }
}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/package-info.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/package-info.java
new file mode 100644
index 0000000..b4b4877
--- /dev/null
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/package-info.java
@@ -0,0 +1,17 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.fpcagent.util;
\ No newline at end of file
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBSubscriberManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBSubscriberManager.java
index f137693..5cd5028 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBSubscriberManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBSubscriberManager.java
@@ -1,70 +1,56 @@
package org.onosproject.fpcagent.workers;
+import javafx.util.Pair;
import org.apache.commons.lang.exception.ExceptionUtils;
+import org.onosproject.fpcagent.providers.DpnDeviceListener;
import org.onosproject.fpcagent.util.FpcUtil;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.yangautoprefixnotify.value.DownlinkDataNotification;
-import org.onosproject.yang.gen.v1.ietfdmmfpcbase.rev20160803.ietfdmmfpcbase.FpcDpnId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
-import java.nio.ByteBuffer;
import java.util.Arrays;
-import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
-import static org.onosproject.fpcagent.util.Converter.*;
+import static org.onosproject.fpcagent.protocols.DpnNgicCommunicator.*;
+import static org.onosproject.fpcagent.util.Converter.toInt;
public class ZMQSBSubscriberManager implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(ZMQSBSubscriberManager.class);
private static int MIN_TOPIC_VAL = 4;
private static int MAX_TOPIC_VAL = 255;
- private static byte ASSIGN_ID = 0b0000_1010;
- private static byte ASSIGN_CONFLICT = 0b0000_1011;
- private static byte HELLO_REPLY = 0b0000_1101;
- private static byte CONTROLLER_STATUS_INDICATION = 0b0000_1110;
- private static byte HELLO = 0b0000_0001;
- private static byte GOODBYE = 0b0000_0010;
private static ZMQSBSubscriberManager _instance = null;
- private static Long controllerSourceId;
- private static Short subscriberId;
private final String address;
- private final Short broadcastAllId;
- private final Short broadcastControllersId;
- private final Short broadcastDpnsId;
private final String nodeId;
private final String networkId;
+ private Long controllerSourceId;
+ private byte controllerTopic;
private boolean run;
private boolean conflictingTopic;
+
private Future<?> broadcastAllWorker;
private Future<?> broadcastControllersWorker;
private Future<?> broadcastTopicWorker;
private Future<?> generalWorker;
- protected ZMQSBSubscriberManager(String address, String broadcastAllId, String broadcastControllersId,
- String broadcastDpnsId, String nodeId, String networkId) {
+ private DpnDeviceListener dpnDeviceListener;
+
+ protected ZMQSBSubscriberManager(String address, String nodeId, String networkId, DpnDeviceListener dpnDeviceListener) {
this.address = address;
this.run = true;
-
this.nodeId = nodeId;
this.networkId = networkId;
- this.broadcastAllId = Short.parseShort(broadcastAllId);
- this.broadcastControllersId = Short.parseShort(broadcastControllersId);
- this.broadcastDpnsId = Short.parseShort(broadcastDpnsId);
-
this.conflictingTopic = false;
- controllerSourceId = (long) ThreadLocalRandom.current().nextInt(0, 65535);
+ this.controllerSourceId = (long) ThreadLocalRandom.current().nextInt(0, 65535);
+ this.dpnDeviceListener = dpnDeviceListener;
}
- public static ZMQSBSubscriberManager createInstance(String address, String broadcastAllId,
- String broadcastControllersId, String broadcastDpnsId,
- String nodeId, String networkId) {
+ public static ZMQSBSubscriberManager createInstance(String address, String nodeId, String networkId, DpnDeviceListener providerService) {
if (_instance == null) {
- _instance = new ZMQSBSubscriberManager(address, broadcastAllId, broadcastControllersId,
- broadcastDpnsId, nodeId, networkId);
+ _instance = new ZMQSBSubscriberManager(address, nodeId, networkId, providerService);
}
return _instance;
}
@@ -73,29 +59,28 @@
return _instance;
}
- public static Short getControllerTopic() {
- return subscriberId;
+ public byte getControllerTopic() {
+ return controllerTopic;
}
- public static Long getControllerSourceId() {
+ public Long getControllerSourceId() {
return controllerSourceId;
}
public void open() {
- short subscriberId = (short) ThreadLocalRandom.current().nextInt(MIN_TOPIC_VAL, MAX_TOPIC_VAL + 1);
-
broadcastAllWorker = Executors.newSingleThreadExecutor()
- .submit(new ZMQSubscriberWorker(broadcastAllId));
+ .submit(new ZMQSubscriberWorker(ReservedTopics.BROADCAST_ALL.getType()));
broadcastControllersWorker = Executors.newSingleThreadExecutor()
- .submit(new ZMQSubscriberWorker(broadcastControllersId));
+ .submit(new ZMQSubscriberWorker(ReservedTopics.BROADCAST_CONTROLLERS.getType()));
broadcastTopicWorker = Executors.newSingleThreadExecutor()
- .submit(new BroadcastTopic(subscriberId));
+ .submit(new AssignTopic());
}
@Override
public void close() {
+ send_goodbye_dpns(nodeId, networkId);
run = false;
}
@@ -105,33 +90,14 @@
* @param conflict - Flag to indicate conflict
* @param subId - Topic Id that caused the conflict
*/
- protected void BroadcastAllSubIdCallBack(boolean conflict, Short subId) {
- if (conflict && subscriberId.equals(subId)) {
+ protected void BroadcastAllSubIdCallBack(boolean conflict, byte subId) {
+ if (conflict && controllerTopic == subId) {
this.conflictingTopic = true;
broadcastTopicWorker.cancel(true);
}
}
/**
- * Broadcasts the GOODBYE message to all the DPNs
- */
- public void sendGoodbyeToDpns() {
- ByteBuffer bb = ByteBuffer.allocate(10 + nodeId.length() + networkId.length());
- bb.put(toUint8(broadcastDpnsId))
- .put(CONTROLLER_STATUS_INDICATION)
- .put(toUint8(subscriberId))
- .put(GOODBYE)
- .put(toUint32(controllerSourceId))
- .put(toUint8((short) nodeId.length()))
- .put(nodeId.getBytes())
- .put(toUint8((short) networkId.length()))
- .put(networkId.getBytes());
-
- log.info("sendGoodbyeToDpns: {}", bb.array());
- ZMQSBPublisherManager.getInstance().send(bb);
- }
-
- /**
* Broadcasts an Assign Conflict message
*
* @param contents - byte array received over the southbound.
@@ -143,81 +109,112 @@
String node_id = new String(Arrays.copyOfRange(contents, 8, 8 + nodeIdLen));
String network_id = new String(Arrays.copyOfRange(contents, 9 + nodeIdLen, 9 + nodeIdLen + networkIdLen));
- if (toUint8(subscriberId) == topic || (nodeId.equals(node_id) && networkId.equals(network_id))) {
- ByteBuffer bb = ByteBuffer.allocate(9 + nodeId.length() + networkId.length());
- bb.put(toUint8(broadcastAllId))
- .put(ASSIGN_CONFLICT)
- .put(topic)
- .put(toUint32(controllerSourceId))
- .put(toUint8((short) nodeId.length()))
- .put(nodeId.getBytes())
- .put(toUint8((short) networkId.length()))
- .put(networkId.getBytes());
-
- log.info("SendAssignConflictMessage: {}", bb.array());
- ZMQSBPublisherManager.getInstance().send(bb);
+ if (controllerTopic == topic || (nodeId.equals(node_id) && networkId.equals(network_id))) {
+ send_assign_conflict(nodeId, networkId);
}
}
protected class ZMQSubscriberWorker implements Runnable {
- private final Short subscriberId;
+ private final byte subscribedTopic;
private ZContext ctx;
- ZMQSubscriberWorker(Short subscriberId) {
- this.subscriberId = subscriberId;
+ ZMQSubscriberWorker(byte subscribedTopic) {
+ this.subscribedTopic = subscribedTopic;
this.ctx = new ZContext();
}
- /**
- * Sends a reply to a DPN Hello
- *
- * @param dpnStatus - DPN Status Indication message received from the DPN
- */
- protected void sendHelloReply(FpcUtil.DPNStatusIndication dpnStatus) {
- if (FpcUtil.getTopicFromNode(dpnStatus.getKey()) != null) {
- ByteBuffer bb = ByteBuffer.allocate(9 + nodeId.length() + networkId.length())
- .put(toUint8(FpcUtil.getTopicFromNode(dpnStatus.getKey())))
- .put(HELLO_REPLY)
- .put(toUint8(ZMQSBSubscriberManager.getControllerTopic()))
- .put(toUint32(ZMQSBSubscriberManager.getControllerSourceId()))
- .put(toUint8((short) nodeId.length()))
- .put(nodeId.getBytes())
- .put(toUint8((short) networkId.length()))
- .put(networkId.getBytes());
+// /**
+// * Ensures the session id is an unsigned 64 bit integer
+// *
+// * @param sessionId - session id received from the DPN
+// * @return unsigned session id
+// */
+// private static BigInteger checkSessionId(BigInteger sessionId) {
+// if (sessionId.compareTo(BigInteger.ZERO) < 0) {
+// sessionId = sessionId.add(BigInteger.ONE.shiftLeft(64));
+// }
+// return sessionId;
+// }
+//
+// /**
+// * Decodes a DownlinkDataNotification
+// *
+// * @param buf - message buffer
+// * @param key - Concatenation of node id + / + network id
+// * @return DownlinkDataNotification or null if it could not be successfully decoded
+// */
+// private static DownlinkDataNotification processDDN(byte[] buf, String key) {
+// DownlinkDataNotification ddnB = new DefaultDownlinkDataNotification();
+// ddnB.sessionId(checkSessionId(toBigInt(buf, 2)));
+// ddnB.notificationMessageType(DOWNLINK_DATA_NOTIFICATION_STRING);
+// ddnB.clientId(ClientIdentifier.of(FpcIdentity.of(FpcIdentityUnion.of(fromIntToLong(buf, 10)))));
+// ddnB.opId(OpIdentifier.of(BigInteger.valueOf(fromIntToLong(buf, 14))));
+// ddnB.notificationDpnId(uplinkDpnMap.get(key));
+// return ddnB;
+// }
- log.info("sendHelloReply: {}", bb.array());
- ZMQSBPublisherManager.getInstance().send(bb);
+ public Pair<Object, Object> decode(byte[] buf) {
+ s11MsgType type;
+ type = s11MsgType.getEnum(buf[1]);
+ if (type.equals(s11MsgType.DDN)) {
+ short nodeIdLen = buf[18];
+ short networkIdLen = buf[19 + nodeIdLen];
+ String key = new String(Arrays.copyOfRange(buf, 19, 19 + nodeIdLen)) + "/" + new String(Arrays.copyOfRange(buf, 20 + nodeIdLen, 20 + nodeIdLen + networkIdLen));
+// return uplinkDpnMap.get(key) == null ? null : new AbstractMap.SimpleEntry<>(uplinkDpnMap.get(key), processDDN(buf, key));
+ } else if (type.equals(s11MsgType.DPN_STATUS_INDICATION)) {
+ DpnStatusIndication status;
+
+ short nodeIdLen = buf[8];
+ short networkIdLen = buf[9 + nodeIdLen];
+ String deviceId = new String(Arrays.copyOfRange(buf, 9, 9 + nodeIdLen)) + "/" + new String(Arrays.copyOfRange(buf, 10 + nodeIdLen, 10 + nodeIdLen + networkIdLen));
+
+ status = DpnStatusIndication.getEnum(buf[3]);
+ if (status.equals(DpnStatusIndication.HELLO)) {
+ log.info("Hello {} on topic {}", deviceId, buf[2]);
+
+ dpnDeviceListener.deviceAdded(deviceId, buf[2]);
+ } else if (status.equals(DpnStatusIndication.GOODBYE)) {
+ log.info("Bye {}", deviceId);
+ dpnDeviceListener.deviceRemoved(deviceId);
+ }
+ return new Pair<>(status, deviceId);
}
+ return null;
}
@Override
public void run() {
ZMQ.Socket subscriber = this.ctx.createSocket(ZMQ.SUB);
subscriber.connect(address);
- subscriber.subscribe(new byte[]{toUint8(subscriberId)});
- log.debug("Subscriber at {} / {}", address, subscriberId);
+ subscriber.subscribe(new byte[]{subscribedTopic});
+ log.debug("Subscriber at {} / {}", address, subscribedTopic);
while ((!Thread.currentThread().isInterrupted()) && run) {
byte[] contents = subscriber.recv();
byte topic = contents[0];
- byte messageType = contents[1];
- log.debug("Received {}", contents);
+ s11MsgType messageType = s11MsgType.getEnum(contents[1]);
+ log.info("Received {}", messageType);
switch (topic) {
case 1:
- if (messageType == ASSIGN_CONFLICT && toInt(contents, 3) != controllerSourceId) {
- BroadcastAllSubIdCallBack(true, (short) contents[2]);
- } else if (messageType == ASSIGN_ID && toInt(contents, 3) != controllerSourceId) {
+ if (messageType.equals(s11MsgType.ASSIGN_CONFLICT) &&
+ toInt(contents, 3) != controllerSourceId) {
+ BroadcastAllSubIdCallBack(true, contents[2]);
+ } else if (messageType.equals(s11MsgType.ASSIGN_TOPIC) &&
+ toInt(contents, 3) != controllerSourceId) {
SendAssignConflictMessage(contents);
}
break;
default:
- Map.Entry<FpcDpnId, Object> entry = FpcUtil.decode(contents);
- if (entry != null) {
- if (entry.getValue() instanceof DownlinkDataNotification) {
+ Pair msg = decode(contents);
+ if (msg != null) {
+ Object key = msg.getKey();
+ if (key instanceof DownlinkDataNotification) {
// TODO handle DL notification
- } else if (entry.getValue() instanceof FpcUtil.DPNStatusIndication) {
- FpcUtil.DPNStatusIndication dpnStatus = (FpcUtil.DPNStatusIndication) entry.getValue();
- if (dpnStatus.getStatus() == FpcUtil.DPNStatusIndication.Status.HELLO) {
- sendHelloReply(dpnStatus);
+ } else if (key instanceof DpnStatusIndication) {
+ if (key.equals(DpnStatusIndication.HELLO)) {
+ byte dpnTopic = FpcUtil.getTopicFromNode(msg.getValue().toString());
+ if (dpnTopic != -1) {
+ send_status_ack(nodeId, networkId, dpnTopic);
+ }
}
}
}
@@ -237,75 +234,31 @@
/**
* Class to broadcast a topic for the controller
*/
- protected class BroadcastTopic implements Runnable {
- private Short topic;
+ protected class AssignTopic implements Runnable {
+ private byte topic;
- /**
- * Constructor
- *
- * @param topic - Topic to broadcast
- */
- public BroadcastTopic(Short topic) {
- this.topic = topic;
- }
-
- /**
- * Broadcasts the topic
- */
- private void broadcastTopic() {
- ByteBuffer bb = ByteBuffer.allocate(9 + nodeId.length() + networkId.length());
- bb.put(toUint8(broadcastAllId))
- .put(ASSIGN_ID)
- .put(toUint8(this.topic))
- .put(toUint32(controllerSourceId))
- .put(toUint8((short) nodeId.length()))
- .put(nodeId.getBytes())
- .put(toUint8((short) networkId.length()))
- .put(networkId.getBytes());
-
- log.info("broadcastTopic: {}", bb.array());
- ZMQSBPublisherManager.getInstance().send(bb);
- }
-
- /**
- * Broadcasts the HELLO message to all the DPNs
- */
- private void sendHelloToDpns() {
- ByteBuffer bb = ByteBuffer.allocate(10 + nodeId.length() + networkId.length());
- bb.put(toUint8(broadcastDpnsId))
- .put(CONTROLLER_STATUS_INDICATION)
- .put(toUint8(subscriberId))
- .put(HELLO)
- .put(toUint32(controllerSourceId))
- .put(toUint8((short) nodeId.length()))
- .put(nodeId.getBytes())
- .put(toUint8((short) networkId.length()))
- .put(networkId.getBytes());
-
-
- log.info("sendHelloToDpns: {}", bb.array());
- ZMQSBPublisherManager.getInstance().send(bb);
+ public AssignTopic() {
+ this.topic = (byte) ThreadLocalRandom.current().nextInt(MIN_TOPIC_VAL, MAX_TOPIC_VAL + 1);
}
@Override
public void run() {
try {
- this.broadcastTopic();
+ send_assign_topic(nodeId, networkId, this.topic);
log.debug("Thread sleeping: " + Thread.currentThread().getName());
- Thread.sleep(2000);
+ Thread.sleep(2000); // wait 10 sec before assigning topic
} catch (InterruptedException e) {
if (conflictingTopic) {
conflictingTopic = false;
- this.topic = (short) ThreadLocalRandom.current().nextInt(MIN_TOPIC_VAL, MAX_TOPIC_VAL + 1);
- subscriberId = this.topic;
+ this.topic = (byte) ThreadLocalRandom.current().nextInt(MIN_TOPIC_VAL, MAX_TOPIC_VAL + 1);
+ controllerTopic = this.topic;
this.run();
return;
} else {
log.error(ExceptionUtils.getFullStackTrace(e));
}
}
- subscriberId = this.topic;
- log.info("Topic Id: " + this.topic);
+ controllerTopic = this.topic;
generalWorker = Executors.newSingleThreadExecutor().submit(new ZMQSubscriberWorker(this.topic));
try {
@@ -313,7 +266,8 @@
} catch (InterruptedException e) {
log.error(ExceptionUtils.getFullStackTrace(e));
}
- sendHelloToDpns();
+
+ send_hello_dpns(nodeId, networkId);
}
}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/package-info.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/package-info.java
new file mode 100644
index 0000000..f6a5857
--- /dev/null
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/package-info.java
@@ -0,0 +1,17 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.fpcagent.workers;
\ No newline at end of file
diff --git a/scripts/benchmark.sh b/scripts/benchmark.sh
index 0e2bbc5..a5772f5 100755
--- a/scripts/benchmark.sh
+++ b/scripts/benchmark.sh
@@ -8,9 +8,6 @@
echo "Running 100 configure create.."
for (( i=1; i<=100; i++)); do
./configure.sh create $i 1 &> /dev/null &
- if ! (($i % 10)); then
- wait
- fi
done
wait
@@ -18,11 +15,10 @@
echo "Running 100 configure delete.."
for (( i=1; i<=100; i++)); do
./configure.sh delete $i &> /dev/null &
- if ! (($i % 10)); then
- wait
- fi
done
+wait
+
echo "Delete DPN.."
./deleteDPN.sh 1 &> /dev/null
echo "Deregister Client.."