added DPNs as devices
diff --git a/apps/fpcagent/fpcagent.json b/apps/fpcagent/fpcagent.json
index 6b26cbf..c614015 100644
--- a/apps/fpcagent/fpcagent.json
+++ b/apps/fpcagent/fpcagent.json
@@ -6,10 +6,7 @@
         "dpn-publisher-uri": "tcp://*:5555",
         "dpn-client-threads": 5,
         "node-id": "node0",
-        "network-id": "network1",
-        "zmq-broadcast-all": "1",
-        "zmq-broadcast-controllers": "2",
-        "zmq-broadcast-dpns": "3"
+        "network-id": "network1"
       }
     }
   }
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java
index 7929e59..ba2f989 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java
@@ -16,10 +16,13 @@
 
 package org.onosproject.fpcagent;
 
+import com.google.common.collect.Sets;
 import org.apache.felix.scr.annotations.*;
 import org.onosproject.config.DynamicConfigService;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
+import org.onosproject.fpcagent.providers.DpnProviderService;
+import org.onosproject.fpcagent.providers.DpnDeviceListener;
 import org.onosproject.fpcagent.util.ConfigHelper;
 import org.onosproject.fpcagent.workers.ZMQSBPublisherManager;
 import org.onosproject.fpcagent.workers.ZMQSBSubscriberManager;
@@ -30,6 +33,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashSet;
 import java.util.Optional;
 
 import static org.onosproject.fpcagent.util.FpcUtil.FPC_APP_ID;
@@ -68,9 +72,13 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     private CoreService coreService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private DpnProviderService dpnProviderService;
+
     /* Variables */
     private FpcConfig fpcConfig;
     private boolean started = false;
+    private HashSet<DpnDeviceListener> listeners = Sets.newHashSet();
 
     /* Config */
     private ConfigFactory<ApplicationId, FpcConfig> fpcConfigConfigFactory =
@@ -88,8 +96,7 @@
         configService.addListener(configListener);
         registry.registerConfigFactory(fpcConfigConfigFactory);
 
-
-        log.info("FPC Agent Started");
+        log.info("FPC Service Started");
     }
 
     @Deactivate
@@ -100,9 +107,10 @@
         if (started) {
             ZMQSBSubscriberManager.getInstance().close();
             ZMQSBPublisherManager.getInstance().close();
+            started = false;
         }
 
-        log.info("FPC Agent Stopped");
+        log.info("FPC Servicea Stopped");
     }
 
     /**
@@ -114,11 +122,9 @@
                     started = true;
                     ZMQSBSubscriberManager.createInstance(
                             helper.dpnSubscriberUri(),
-                            helper.zmqBroadcastAll(),
-                            helper.zmqBroadcastControllers(),
-                            helper.zmqBroadcastDpns(),
                             helper.nodeId(),
-                            helper.networkId()
+                            helper.networkId(),
+                            dpnProviderService.getListener()
                     );
 
                     ZMQSBPublisherManager.createInstance(
@@ -133,6 +139,16 @@
     }
 
     @Override
+    public void addListener(DpnDeviceListener listener) {
+        listeners.add(listener);
+    }
+
+    @Override
+    public void removeListener(DpnDeviceListener listener) {
+        listeners.remove(listener);
+    }
+
+    @Override
     public Optional<ConfigHelper> getConfig() {
         return fpcConfig != null ? fpcConfig.getConfig() : Optional.empty();
     }
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java
index fd10302..61aa4bd 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java
@@ -16,7 +16,6 @@
 
 package org.onosproject.fpcagent;
 
-import com.google.common.base.Stopwatch;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
@@ -28,7 +27,9 @@
 import org.onosproject.fpcagent.protocols.DpnCommunicationService;
 import org.onosproject.fpcagent.protocols.DpnNgicCommunicator;
 import org.onosproject.fpcagent.protocols.DpnP4Communicator;
-import org.onosproject.fpcagent.util.*;
+import org.onosproject.fpcagent.util.CacheManager;
+import org.onosproject.fpcagent.util.FpcUtil;
+import org.onosproject.net.device.DeviceStore;
 import org.onosproject.yang.gen.v1.fpc.rev20150105.fpc.DefaultConnectionInfo;
 import org.onosproject.yang.gen.v1.fpc.rev20150105.fpc.P4DpnControlProtocol;
 import org.onosproject.yang.gen.v1.fpc.rev20150105.fpc.ZmqDpnControlProtocol;
@@ -78,6 +79,8 @@
 import org.slf4j.LoggerFactory;
 
 import java.math.BigInteger;
+import java.time.Duration;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Optional;
@@ -104,24 +107,30 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     private RpcRegistry registry;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private DeviceStore deviceStore;
+
     private ConcurrentMap<ClientIdentifier, DefaultRegisterClientInput> clientInfo = Maps.newConcurrentMap();
+    // FIXME configurable
+    private ExecutorService executorService = Executors.newFixedThreadPool(25);
 
     @Activate
     protected void activate() {
         init();
         registry.registerRpcService(this);
-        log.info("Tenant Service Started");
+        log.info("FPC RPC Service Started");
     }
 
     @Deactivate
     protected void deactivate() {
         registry.unregisterRpcService(this);
-        log.info("Tenant Service Stopped");
+        log.info("FPC RPC Service Stopped");
     }
 
     private void init() {
         FpcUtil.modelConverter = modelConverter;
         FpcUtil.dynamicConfigService = dynamicConfigService;
+        FpcUtil.deviceStore = deviceStore;
         getResourceId();
 
         // Create the Default Tenant and added to the Tenants structure.
@@ -202,8 +211,8 @@
                 }
 
                 // get DPN Topic from Node/Network pair
-                Short topic_id = getTopicFromNode(key.get());
-                if (topic_id == null) {
+                byte topic_id = getTopicFromNode(key.get());
+                if (topic_id == -1) {
                     throw new RuntimeException("Could not find Topic ID");
                 }
 
@@ -361,8 +370,8 @@
                 }
 
                 // get DPN Topic from Node/Network pair
-                Short topic_id = getTopicFromNode(key.get());
-                if (topic_id == null) {
+                byte topic_id = getTopicFromNode(key.get());
+                if (topic_id == -1) {
                     throw new RuntimeException("Could not find Topic ID");
                 }
 
@@ -504,8 +513,8 @@
                 }
 
                 // find DPN Topic from Node/Network ID pair.
-                Short topic_id = getTopicFromNode(key.get());
-                if (topic_id == null) {
+                byte topic_id = getTopicFromNode(key.get());
+                if (topic_id == -1) {
                     throw new RuntimeException("Could not find Topic ID");
                 }
 
@@ -659,265 +668,282 @@
 
     @Override
     public RpcOutput configureDpn(RpcInput rpcInput) {
-        Stopwatch timer = Stopwatch.createStarted();
-        DefaultConfigureDpnOutput configureDpnOutput = new DefaultConfigureDpnOutput();
-        configureDpnOutput.result(Result.of(ResultEnum.OK));
-        configureDpnOutput.resultType(new DefaultEmptyCase());
-        RpcOutput.Status status = RpcOutput.Status.RPC_SUCCESS;
-        try {
-            for (ModelObject modelObject : getModelObjects(rpcInput.data(), configureDpn)) {
-                DefaultConfigureDpnInput input = (DefaultConfigureDpnInput) modelObject;
-                switch (input.operation().enumeration()) {
-                    case ADD:
-                        configureDpnOutput = configureDpnAdd(input);
-                        break;
-                    case REMOVE:
-                        configureDpnOutput = configureDpnRemove(input);
-                        break;
+        return CompletableFuture.supplyAsync(() -> {
+            Instant start = Instant.now();
+            DefaultConfigureDpnOutput configureDpnOutput = new DefaultConfigureDpnOutput();
+            configureDpnOutput.result(Result.of(ResultEnum.OK));
+            configureDpnOutput.resultType(new DefaultEmptyCase());
+            RpcOutput.Status status = RpcOutput.Status.RPC_SUCCESS;
+            try {
+                for (ModelObject modelObject : getModelObjects(rpcInput.data(), configureDpn)) {
+                    DefaultConfigureDpnInput input = (DefaultConfigureDpnInput) modelObject;
+                    switch (input.operation().enumeration()) {
+                        case ADD:
+                            configureDpnOutput = configureDpnAdd(input);
+                            break;
+                        case REMOVE:
+                            configureDpnOutput = configureDpnRemove(input);
+                            break;
+                    }
                 }
+            } catch (Exception e) {
+                status = RpcOutput.Status.RPC_FAILURE;
+                org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.resultbodydpn.resulttype.DefaultErr defaultErr = new org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.resultbodydpn.resulttype.DefaultErr();
+                defaultErr.errorInfo(ExceptionUtils.getFullStackTrace(e));
+                defaultErr.errorTypeId(ErrorTypeId.of(0));
+                configureDpnOutput.resultType(defaultErr);
+                configureDpnOutput.result(Result.of(ResultEnum.ERR));
+                log.error(ExceptionUtils.getFullStackTrace(e));
             }
-        } catch (Exception e) {
-            status = RpcOutput.Status.RPC_FAILURE;
-            org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.resultbodydpn.resulttype.DefaultErr defaultErr = new org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.resultbodydpn.resulttype.DefaultErr();
-            defaultErr.errorInfo(ExceptionUtils.getFullStackTrace(e));
-            defaultErr.errorTypeId(ErrorTypeId.of(0));
-            configureDpnOutput.resultType(defaultErr);
-            configureDpnOutput.result(Result.of(ResultEnum.ERR));
-            log.error(ExceptionUtils.getFullStackTrace(e));
-        }
-        ResourceData dataNode = modelConverter.createDataNode(
-                DefaultModelObjectData.builder()
-                        .addModelObject(configureDpnOutput)
-                        .build()
-        );
-        log.debug("Time Elapsed {} ms", timer.stop().elapsed(TimeUnit.MILLISECONDS));
-        return new RpcOutput(status, dataNode.dataNodes().get(0));
+            ResourceData dataNode = modelConverter.createDataNode(
+                    DefaultModelObjectData.builder()
+                            .addModelObject(configureDpnOutput)
+                            .build()
+            );
+            log.info("Time Elapsed {} ms", Duration.between(start, Instant.now()).toMillis());
+            return new RpcOutput(status, dataNode.dataNodes().get(0));
+        }, executorService).join();
     }
 
     @Override
     public RpcOutput configure(RpcInput rpcInput) {
-        Stopwatch timer = Stopwatch.createStarted();
-        DefaultConfigureOutput configureOutput = new DefaultConfigureOutput();
-        RpcOutput.Status status = RpcOutput.Status.RPC_SUCCESS;
-        try {
-            for (ModelObject modelObject : getModelObjects(rpcInput.data(), configure)) {
-                DefaultConfigureInput input = (DefaultConfigureInput) modelObject;
-                if (!clientInfo.containsKey(input.clientId())) {
-                    throw new RuntimeException("Client Identifier is not registered.");
-                }
-                switch (input.opType()) {
-                    case CREATE:
-                        configureOutput = configureCreate(
-                                (CreateOrUpdate) input.opBody(),
-                                clientInfo.get(input.clientId()),
-                                input.opId()
-                        );
-                        break;
-                    case UPDATE:
-                        configureOutput = configureUpdate(
-                                (CreateOrUpdate) input.opBody(),
-                                clientInfo.get(input.clientId()),
-                                input.opId()
-                        );
-                        break;
-                    case QUERY:
-                        break;
-                    case DELETE:
-                        configureOutput = configureDelete(
-                                (DeleteOrQuery) input.opBody(),
-                                clientInfo.get(input.clientId()),
-                                input.opId()
-                        );
-                        break;
-                }
-                configureOutput.opId(input.opId());
-            }
-        } catch (Exception e) {
-            // if there is an exception respond with an error.
-            status = RpcOutput.Status.RPC_FAILURE;
-            DefaultErr defaultErr = new DefaultErr();
-            defaultErr.errorInfo(ExceptionUtils.getFullStackTrace(e));
-            defaultErr.errorTypeId(ErrorTypeId.of(0));
-            configureOutput.resultType(defaultErr);
-            configureOutput.result(Result.of(ResultEnum.ERR));
-            log.error(ExceptionUtils.getFullStackTrace(e));
-        }
-        ResourceData dataNode = modelConverter.createDataNode(
-                DefaultModelObjectData.builder()
-                        .addModelObject(configureOutput)
-                        .build()
-        );
-        log.info("Time Elapsed {} ms", timer.stop().elapsed(TimeUnit.MILLISECONDS));
-        return new RpcOutput(status, dataNode.dataNodes().get(0));
-    }
-
-    @Override
-    public RpcOutput configureBundles(RpcInput rpcInput) {
-        Stopwatch timer = Stopwatch.createStarted();
-        DefaultConfigureBundlesOutput configureBundlesOutput = new DefaultConfigureBundlesOutput();
-        RpcOutput.Status status = RpcOutput.Status.RPC_SUCCESS;
-        try {
-            for (ModelObject modelObject : getModelObjects(rpcInput.data(), configureBundles)) {
-                DefaultConfigureBundlesInput input = (DefaultConfigureBundlesInput) modelObject;
-                if (!clientInfo.containsKey(input.clientId())) {
-                    throw new RuntimeException("Client Identifier is not registered.");
-                }
-                for (org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.configurebundles.configurebundlesinput.Bundles bundle : input.bundles()) {
-                    DefaultConfigureOutput configureOutput = new DefaultConfigureOutput();
-                    switch (bundle.opType()) {
+        return CompletableFuture.supplyAsync(() -> {
+            Instant start = Instant.now();
+            DefaultConfigureOutput configureOutput = new DefaultConfigureOutput();
+            RpcOutput.Status status = RpcOutput.Status.RPC_SUCCESS;
+            try {
+                for (ModelObject modelObject : getModelObjects(rpcInput.data(), configure)) {
+                    DefaultConfigureInput input = (DefaultConfigureInput) modelObject;
+                    if (!clientInfo.containsKey(input.clientId())) {
+                        throw new RuntimeException("Client Identifier is not registered.");
+                    }
+                    switch (input.opType()) {
                         case CREATE:
                             configureOutput = configureCreate(
-                                    (CreateOrUpdate) bundle.opBody(),
+                                    (CreateOrUpdate) input.opBody(),
                                     clientInfo.get(input.clientId()),
-                                    bundle.opId()
+                                    input.opId()
                             );
                             break;
                         case UPDATE:
                             configureOutput = configureUpdate(
-                                    (CreateOrUpdate) bundle.opBody(),
+                                    (CreateOrUpdate) input.opBody(),
                                     clientInfo.get(input.clientId()),
-                                    bundle.opId()
+                                    input.opId()
                             );
                             break;
                         case QUERY:
                             break;
                         case DELETE:
                             configureOutput = configureDelete(
-                                    (DeleteOrQuery) bundle.opBody(),
+                                    (DeleteOrQuery) input.opBody(),
                                     clientInfo.get(input.clientId()),
-                                    bundle.opId()
+                                    input.opId()
                             );
                             break;
                     }
-                    Bundles result = new DefaultBundles();
-                    result.opId(bundle.opId());
-                    result.result(configureOutput.result());
-                    result.resultType(configureOutput.resultType());
-                    configureBundlesOutput.addToBundles(result);
+                    configureOutput.opId(input.opId());
                 }
+            } catch (Exception e) {
+                // if there is an exception respond with an error.
+                status = RpcOutput.Status.RPC_FAILURE;
+                DefaultErr defaultErr = new DefaultErr();
+                defaultErr.errorInfo(ExceptionUtils.getFullStackTrace(e));
+                defaultErr.errorTypeId(ErrorTypeId.of(0));
+                configureOutput.resultType(defaultErr);
+                configureOutput.result(Result.of(ResultEnum.ERR));
+                log.error(ExceptionUtils.getFullStackTrace(e));
             }
-        } catch (Exception e) {
-            // if there is an exception respond with an error.
-            status = RpcOutput.Status.RPC_FAILURE;
-            log.error(ExceptionUtils.getFullStackTrace(e));
-        }
-        ResourceData dataNode = modelConverter.createDataNode(
-                DefaultModelObjectData.builder()
-                        .addModelObject(configureBundlesOutput)
-                        .build()
-        );
-        log.debug("Time Elapsed {} ms", timer.stop().elapsed(TimeUnit.MILLISECONDS));
-        return new RpcOutput(status, dataNode.dataNodes().get(0));
+            ResourceData dataNode = modelConverter.createDataNode(
+                    DefaultModelObjectData.builder()
+                            .addModelObject(configureOutput)
+                            .build()
+            );
+            log.info("Time Elapsed {} ms", Duration.between(start, Instant.now()).toMillis());
+            return new RpcOutput(status, dataNode.dataNodes().get(0));
+        }, executorService).join();
+    }
+
+    @Override
+    public RpcOutput configureBundles(RpcInput rpcInput) {
+        return CompletableFuture.supplyAsync(() -> {
+            Instant start = Instant.now();
+            DefaultConfigureBundlesOutput configureBundlesOutput = new DefaultConfigureBundlesOutput();
+            RpcOutput.Status status = RpcOutput.Status.RPC_SUCCESS;
+            try {
+                for (ModelObject modelObject : getModelObjects(rpcInput.data(), configureBundles)) {
+                    DefaultConfigureBundlesInput input = (DefaultConfigureBundlesInput) modelObject;
+                    if (!clientInfo.containsKey(input.clientId())) {
+                        throw new RuntimeException("Client Identifier is not registered.");
+                    }
+                    for (org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.configurebundles.configurebundlesinput.Bundles bundle : input.bundles()) {
+                        DefaultConfigureOutput configureOutput = new DefaultConfigureOutput();
+                        switch (bundle.opType()) {
+                            case CREATE:
+                                configureOutput = configureCreate(
+                                        (CreateOrUpdate) bundle.opBody(),
+                                        clientInfo.get(input.clientId()),
+                                        bundle.opId()
+                                );
+                                break;
+                            case UPDATE:
+                                configureOutput = configureUpdate(
+                                        (CreateOrUpdate) bundle.opBody(),
+                                        clientInfo.get(input.clientId()),
+                                        bundle.opId()
+                                );
+                                break;
+                            case QUERY:
+                                break;
+                            case DELETE:
+                                configureOutput = configureDelete(
+                                        (DeleteOrQuery) bundle.opBody(),
+                                        clientInfo.get(input.clientId()),
+                                        bundle.opId()
+                                );
+                                break;
+                        }
+                        Bundles result = new DefaultBundles();
+                        result.opId(bundle.opId());
+                        result.result(configureOutput.result());
+                        result.resultType(configureOutput.resultType());
+                        configureBundlesOutput.addToBundles(result);
+                    }
+                }
+            } catch (Exception e) {
+                // if there is an exception respond with an error.
+                status = RpcOutput.Status.RPC_FAILURE;
+                log.error(ExceptionUtils.getFullStackTrace(e));
+            }
+            ResourceData dataNode = modelConverter.createDataNode(
+                    DefaultModelObjectData.builder()
+                            .addModelObject(configureBundlesOutput)
+                            .build()
+            );
+            log.info("Time Elapsed {} ms", Duration.between(start, Instant.now()).toMillis());
+            return new RpcOutput(status, dataNode.dataNodes().get(0));
+        }, executorService).join();
     }
 
     @Override
     public RpcOutput eventRegister(RpcInput rpcInput) {
-        Stopwatch timer = Stopwatch.createStarted();
-        // TODO implement
-        log.debug("Time Elapsed {} ms", timer.stop().elapsed(TimeUnit.MILLISECONDS));
+        Instant start = Instant.now();
+        CompletableFuture.supplyAsync(() -> {
+            // TODO implement
+            return null;
+        }, executorService);
+        log.info("Time Elapsed {} ms", Duration.between(start, Instant.now()).toMillis());
         return null;
     }
 
     @Override
     public RpcOutput eventDeregister(RpcInput rpcInput) {
-        Stopwatch timer = Stopwatch.createStarted();
-        // TODO implement
-        log.debug("Time Elapsed {} ms", timer.stop().elapsed(TimeUnit.MILLISECONDS));
+        Instant start = Instant.now();
+        CompletableFuture.supplyAsync(() -> {
+            // TODO implement
+            return null;
+        }, executorService);
+        log.info("Time Elapsed {} ms", Duration.between(start, Instant.now()).toMillis());
         return null;
     }
 
     @Override
     public RpcOutput probe(RpcInput rpcInput) {
-        Stopwatch timer = Stopwatch.createStarted();
-        // TODO implement
-        log.debug("Time Elapsed {} ms", timer.stop().elapsed(TimeUnit.MILLISECONDS));
+        Instant start = Instant.now();
+        CompletableFuture.supplyAsync(() -> {
+            // TODO implement
+            return null;
+        }, executorService);
+        log.info("Time Elapsed {} ms", Duration.between(start, Instant.now()).toMillis());
         return null;
     }
 
     @Override
     public RpcOutput registerClient(RpcInput rpcInput) {
-        Stopwatch timer = Stopwatch.createStarted();
-        DefaultRegisterClientOutput registerClientOutput = new DefaultRegisterClientOutput();
-        RpcOutput.Status status = RpcOutput.Status.RPC_SUCCESS;
-        try {
-            for (ModelObject modelObject : getModelObjects(rpcInput.data(), registerClient)) {
-                DefaultRegisterClientInput input = (DefaultRegisterClientInput) modelObject;
-                if (clientInfo.containsKey(input.clientId())) {
-                    throw new RuntimeException("Client already registered.");
+        return CompletableFuture.supplyAsync(() -> {
+            Instant start = Instant.now();
+            DefaultRegisterClientOutput registerClientOutput = new DefaultRegisterClientOutput();
+            RpcOutput.Status status = RpcOutput.Status.RPC_SUCCESS;
+            try {
+                for (ModelObject modelObject : getModelObjects(rpcInput.data(), registerClient)) {
+                    DefaultRegisterClientInput input = (DefaultRegisterClientInput) modelObject;
+                    if (clientInfo.containsKey(input.clientId())) {
+                        throw new RuntimeException("Client already registered.");
+                    }
+                    clientInfo.put(input.clientId(), input);
+                    registerClientOutput.clientId(input.clientId());
+                    registerClientOutput.supportedFeatures(input.supportedFeatures());
+                    registerClientOutput.endpointUri(input.endpointUri());
+                    registerClientOutput.supportsAckModel(input.supportsAckModel());
+                    registerClientOutput.tenantId(input.tenantId());
+
+                    DefaultConnections defaultConnections = new DefaultConnections();
+                    defaultConnections.clientId(input.clientId().toString());
+
+                    ModelObjectId modelObjectId = ModelObjectId.builder()
+                            .addChild(DefaultConnectionInfo.class)
+                            .build();
+                    createNode(defaultConnections, modelObjectId);
                 }
-                clientInfo.put(input.clientId(), input);
-                registerClientOutput.clientId(input.clientId());
-                registerClientOutput.supportedFeatures(input.supportedFeatures());
-                registerClientOutput.endpointUri(input.endpointUri());
-                registerClientOutput.supportsAckModel(input.supportsAckModel());
-                registerClientOutput.tenantId(input.tenantId());
-
-                DefaultConnections defaultConnections = new DefaultConnections();
-                defaultConnections.clientId(input.clientId().toString());
-
-                ModelObjectId modelObjectId = ModelObjectId.builder()
-                        .addChild(DefaultConnectionInfo.class)
-                        .build();
-                createNode(defaultConnections, modelObjectId);
+            } catch (Exception e) {
+                // if there is an exception respond with an error.
+                status = RpcOutput.Status.RPC_FAILURE;
+                log.error(ExceptionUtils.getFullStackTrace(e));
             }
-        } catch (Exception e) {
-            // if there is an exception respond with an error.
-            status = RpcOutput.Status.RPC_FAILURE;
-            log.error(ExceptionUtils.getFullStackTrace(e));
-        }
 
-        ResourceData dataNode = modelConverter.createDataNode(
-                DefaultModelObjectData.builder()
-                        .addModelObject(registerClientOutput)
-                        .build()
-        );
-
-        log.debug("Time Elapsed {} ms", timer.stop().elapsed(TimeUnit.MILLISECONDS));
-        return new RpcOutput(status, dataNode.dataNodes().get(0));
+            ResourceData dataNode = modelConverter.createDataNode(
+                    DefaultModelObjectData.builder()
+                            .addModelObject(registerClientOutput)
+                            .build()
+            );
+            log.info("Time Elapsed {} ms", Duration.between(start, Instant.now()).toMillis());
+            return new RpcOutput(status, dataNode.dataNodes().get(0));
+        }).join();
     }
 
     @Override
     public RpcOutput deregisterClient(RpcInput rpcInput) {
-        Stopwatch timer = Stopwatch.createStarted();
-        DefaultDeregisterClientOutput deregisterClientOutput = new DefaultDeregisterClientOutput();
-        RpcOutput.Status status = RpcOutput.Status.RPC_SUCCESS;
+        return CompletableFuture.supplyAsync(() -> {
+            Instant start = Instant.now();
+            DefaultDeregisterClientOutput deregisterClientOutput = new DefaultDeregisterClientOutput();
+            RpcOutput.Status status = RpcOutput.Status.RPC_SUCCESS;
 
-        try {
-            for (ModelObject modelObject : getModelObjects(rpcInput.data(), registerClient)) {
-                DefaultRegisterClientInput input = (DefaultRegisterClientInput) modelObject;
-                if (!clientInfo.containsKey(input.clientId())) {
-                    throw new RuntimeException("Client does not exist.");
+            try {
+                for (ModelObject modelObject : getModelObjects(rpcInput.data(), registerClient)) {
+                    DefaultRegisterClientInput input = (DefaultRegisterClientInput) modelObject;
+                    if (!clientInfo.containsKey(input.clientId())) {
+                        throw new RuntimeException("Client does not exist.");
+                    }
+                    clientInfo.remove(input.clientId());
+                    deregisterClientOutput.clientId(input.clientId());
+
+                    DefaultConnections defaultConnections = new DefaultConnections();
+                    defaultConnections.clientId(input.clientId().toString());
+
+                    ConnectionsKeys connectionsKeys = new ConnectionsKeys();
+                    connectionsKeys.clientId(input.clientId().toString());
+
+                    ResourceId resourceVal = getResourceVal(ModelObjectId.builder()
+                            .addChild(DefaultConnectionInfo.class)
+                            .addChild(DefaultConnections.class, connectionsKeys)
+                            .build());
+
+                    dynamicConfigService.deleteNode(resourceVal);
                 }
-                clientInfo.remove(input.clientId());
-                deregisterClientOutput.clientId(input.clientId());
-
-                DefaultConnections defaultConnections = new DefaultConnections();
-                defaultConnections.clientId(input.clientId().toString());
-
-                ConnectionsKeys connectionsKeys = new ConnectionsKeys();
-                connectionsKeys.clientId(input.clientId().toString());
-
-                ResourceId resourceVal = getResourceVal(ModelObjectId.builder()
-                        .addChild(DefaultConnectionInfo.class)
-                        .addChild(DefaultConnections.class, connectionsKeys)
-                        .build());
-
-                dynamicConfigService.deleteNode(resourceVal);
+            } catch (Exception e) {
+                // if there is an exception respond with an error.
+                status = RpcOutput.Status.RPC_FAILURE;
+                log.error(ExceptionUtils.getFullStackTrace(e));
             }
-        } catch (Exception e) {
-            // if there is an exception respond with an error.
-            status = RpcOutput.Status.RPC_FAILURE;
-            log.error(ExceptionUtils.getFullStackTrace(e));
-        }
 
-        ResourceData dataNode = modelConverter.createDataNode(
-                DefaultModelObjectData.builder()
-                        .addModelObject(deregisterClientOutput)
-                        .build()
-        );
-
-        log.debug("Time Elapsed {} ms", timer.stop().elapsed(TimeUnit.MILLISECONDS));
-        return new RpcOutput(status, dataNode.dataNodes().get(0));
+            ResourceData dataNode = modelConverter.createDataNode(
+                    DefaultModelObjectData.builder()
+                            .addModelObject(deregisterClientOutput)
+                            .build()
+            );
+            log.info("Time Elapsed {} ms", Duration.between(start, Instant.now()).toMillis());
+            return new RpcOutput(status, dataNode.dataNodes().get(0));
+        }, executorService).join();
     }
 
 }
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcService.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcService.java
index 61aa6cf..6adc0bc 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcService.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcService.java
@@ -1,10 +1,15 @@
 package org.onosproject.fpcagent;
 
+import org.onosproject.fpcagent.providers.DpnDeviceListener;
 import org.onosproject.fpcagent.util.ConfigHelper;
 
 import java.util.Optional;
 
 public interface FpcService {
 
+    void addListener(DpnDeviceListener listener);
+
+    void removeListener(DpnDeviceListener listener);
+
     Optional<ConfigHelper> getConfig();
 }
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnCommunicationService.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnCommunicationService.java
index 7405169..17df670 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnCommunicationService.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnCommunicationService.java
@@ -27,8 +27,7 @@
 
     /**
      * Creates Mobility Session.
-     *
-     * @param topic_id     - DPN Topic ID
+     *  @param topic_id     - DPN Topic ID
      * @param imsi         - IMSI identifier
      * @param default_ebi  - EBI
      * @param ue_ipv4      - UE IPv4 Address
@@ -39,7 +38,7 @@
      * @param op_id        - Operation Identifier
      */
     void create_session(
-            Short topic_id,
+            byte topic_id,
             BigInteger imsi,
             Short default_ebi,
             Ip4Address ue_ipv4,
@@ -52,8 +51,7 @@
 
     /**
      * Modifies Bearer.
-     *
-     * @param topic_id        - DPN Topic ID
+     *  @param topic_id        - DPN Topic ID
      * @param s1u_sgw_ipv4    - SGW IPv4 Address
      * @param s1u_enodeb_teid - ENodeB Tunnel Identifier
      * @param s1u_enodeb_ipv4 - ENodeB IPv4 Address
@@ -62,7 +60,7 @@
      * @param op_id           - Operation Identifier
      */
     void modify_bearer(
-            Short topic_id,
+            byte topic_id,
             Ip4Address s1u_sgw_ipv4,
             Long s1u_enodeb_teid,
             Ip4Address s1u_enodeb_ipv4,
@@ -73,14 +71,13 @@
 
     /**
      * Deletes Mobility Session.
-     *
-     * @param topic_id   - DPN Topic ID
+     *  @param topic_id   - DPN Topic ID
      * @param session_id - Context Identifier
      * @param client_id  - Client Identifier
      * @param op_id      - Operation Identifier
      */
     void delete_session(
-            Short topic_id,
+            byte topic_id,
             Long session_id,
             Long client_id,
             BigInteger op_id
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnNgicCommunicator.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnNgicCommunicator.java
index 7740ec6..78f1171 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnNgicCommunicator.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnNgicCommunicator.java
@@ -26,6 +26,8 @@
 
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Optional;
 
 import static org.onosproject.fpcagent.util.Converter.*;
 
@@ -35,9 +37,92 @@
 public class DpnNgicCommunicator extends ZmqDpnControlProtocol implements DpnCommunicationService {
     protected static final Logger log = LoggerFactory.getLogger(DpnNgicCommunicator.class);
 
+    /**
+     * Broadcasts the GOODBYE message to all the DPNs
+     */
+    public static void send_goodbye_dpns(String nodeId, String networkId) {
+        ByteBuffer bb = ByteBuffer.allocate(10 + nodeId.length() + networkId.length());
+        bb.put(ReservedTopics.BROADCAST_DPNS.getType())
+                .put(s11MsgType.CONTROLLER_STATUS_INDICATION.getType())
+                .put(ZMQSBSubscriberManager.getInstance().getControllerTopic())
+                .put(ControllerStatusIndication.GOODBYE.getType())
+                .put(toUint32(ZMQSBSubscriberManager.getInstance().getControllerSourceId()))
+                .put(toUint8((short) nodeId.length()))
+                .put(nodeId.getBytes())
+                .put(toUint8((short) networkId.length()))
+                .put(networkId.getBytes());
+
+        log.info("send_goodbye_dpns: {}", bb.array());
+        ZMQSBPublisherManager.getInstance().send(bb);
+    }
+
+    /**
+     * Broadcasts the HELLO message to all the DPNs
+     */
+    public static void send_hello_dpns(String nodeId, String networkId) {
+        ByteBuffer bb = ByteBuffer.allocate(10 + nodeId.length() + networkId.length());
+        bb.put(ReservedTopics.BROADCAST_DPNS.getType())
+                .put(s11MsgType.CONTROLLER_STATUS_INDICATION.getType())
+                .put(ZMQSBSubscriberManager.getInstance().getControllerTopic())
+                .put(ControllerStatusIndication.HELLO.getType())
+                .put(toUint32(ZMQSBSubscriberManager.getInstance().getControllerSourceId()))
+                .put(toUint8((short) nodeId.length()))
+                .put(nodeId.getBytes())
+                .put(toUint8((short) networkId.length()))
+                .put(networkId.getBytes());
+
+        log.info("send_hello_dpns: {}", bb.array());
+        ZMQSBPublisherManager.getInstance().send(bb);
+    }
+
+    public static void send_assign_conflict(String nodeId, String networkId) {
+        ByteBuffer bb = ByteBuffer.allocate(9 + nodeId.length() + networkId.length());
+        bb.put(ReservedTopics.BROADCAST_ALL.getType())
+                .put(s11MsgType.ASSIGN_CONFLICT.getType())
+                .put(ZMQSBSubscriberManager.getInstance().getControllerTopic())
+                .put(toUint32(ZMQSBSubscriberManager.getInstance().getControllerSourceId()))
+                .put(toUint8((short) nodeId.length()))
+                .put(nodeId.getBytes())
+                .put(toUint8((short) networkId.length()))
+                .put(networkId.getBytes());
+
+        log.info("send_assign_conflict: {}", bb.array());
+        ZMQSBPublisherManager.getInstance().send(bb);
+    }
+
+    public static void send_assign_topic(String nodeId, String networkId, byte topic) {
+        ByteBuffer bb = ByteBuffer.allocate(9 + nodeId.length() + networkId.length());
+        bb.put(ReservedTopics.BROADCAST_ALL.getType())
+                .put(s11MsgType.ASSIGN_TOPIC.getType())
+                .put(topic)
+                .put(toUint32(ZMQSBSubscriberManager.getInstance().getControllerSourceId()))
+                .put(toUint8((short) nodeId.length()))
+                .put(nodeId.getBytes())
+                .put(toUint8((short) networkId.length()))
+                .put(networkId.getBytes());
+
+        log.info("send_assign_topic: {}", bb.array());
+        ZMQSBPublisherManager.getInstance().send(bb);
+    }
+
+    public static void send_status_ack(String nodeId, String networkId, byte topic) {
+        ByteBuffer bb = ByteBuffer.allocate(9 + nodeId.length() + networkId.length())
+                .put(topic)
+                .put(s11MsgType.DPN_STATUS_ACK.getType())
+                .put(ZMQSBSubscriberManager.getInstance().getControllerTopic())
+                .put(toUint32(ZMQSBSubscriberManager.getInstance().getControllerSourceId()))
+                .put(toUint8((short) nodeId.length()))
+                .put(nodeId.getBytes())
+                .put(toUint8((short) networkId.length()))
+                .put(networkId.getBytes());
+
+        log.info("send_status_ack: {}", bb.array());
+        ZMQSBPublisherManager.getInstance().send(bb);
+    }
+
     @Override
     public void create_session(
-            Short topic_id,
+            byte topic_id,
             BigInteger imsi,
             Short default_ebi,
             Ip4Address ue_ipv4,
@@ -64,7 +149,7 @@
          */
         // TODO: check if subscriber is open.
         ByteBuffer bb = ByteBuffer.allocate(41)
-                .put(toUint8(topic_id))
+                .put(topic_id)
                 .put(s11MsgType.CREATE_SESSION.getType())
                 .put(toUint64(imsi))
                 .put(toUint8(default_ebi))
@@ -72,7 +157,7 @@
                 .put(toUint32(s1u_sgw_teid))
                 .put(toUint32(s1u_sgw_ipv4.toInt()))
                 .put(toUint64(BigInteger.valueOf(session_id)))
-                .put(toUint8(ZMQSBSubscriberManager.getControllerTopic()))
+                .put(ZMQSBSubscriberManager.getInstance().getControllerTopic())
                 .put(toUint32(client_id))
                 .put(toUint32(op_id.longValue()));
 
@@ -82,7 +167,7 @@
 
     @Override
     public void modify_bearer(
-            Short topic_id,
+            byte topic_id,
             Ip4Address s1u_sgw_ipv4,
             Long s1u_enodeb_teid,
             Ip4Address s1u_enodeb_ipv4,
@@ -105,13 +190,13 @@
 		    } modify_bearer_msg;
          */
         ByteBuffer bb = ByteBuffer.allocate(32)
-                .put(toUint8(topic_id))
-                .put(s11MsgType.MODIFY_BEARER.getType())
+                .put(topic_id)
+                .put(s11MsgType.UPDATE_MODIFY_BEARER.getType())
                 .put(toUint32(s1u_sgw_ipv4.toInt()))
                 .put(toUint32(s1u_enodeb_teid))
                 .put(toUint32(s1u_enodeb_ipv4.toInt()))
                 .put(toUint64(BigInteger.valueOf(session_id)))
-                .put(toUint8(ZMQSBSubscriberManager.getControllerTopic()))
+                .put(ZMQSBSubscriberManager.getInstance().getControllerTopic())
                 .put(toUint32(client_id))
                 .put(toUint32(op_id.longValue()));
 
@@ -121,7 +206,7 @@
 
     @Override
     public void delete_session(
-            Short topic_id,
+            byte topic_id,
             Long session_id,
             Long client_id,
             BigInteger op_id
@@ -138,10 +223,10 @@
 		    } delete_session_msg;
          */
         ByteBuffer bb = ByteBuffer.allocate(19)
-                .put(toUint8(topic_id))
+                .put(topic_id)
                 .put(s11MsgType.DELETE_SESSION.getType())
                 .put(toUint64(BigInteger.valueOf(session_id)))
-                .put(toUint8(ZMQSBSubscriberManager.getControllerTopic()))
+                .put(ZMQSBSubscriberManager.getInstance().getControllerTopic())
                 .put(toUint32(client_id))
                 .put(toUint32(op_id.longValue()));
 
@@ -193,32 +278,133 @@
             bb.put(toUint8((short) sponsor_ID.length()))
                     .put(sponsor_ID.getBytes());
         }
-        bb.put(toUint8(ZMQSBSubscriberManager.getControllerTopic()));
+        bb.put(ZMQSBSubscriberManager.getInstance().getControllerTopic());
 
         log.info("send_ADC_rules: {}", bb.array());
         ZMQSBPublisherManager.getInstance().send(bb);
     }
 
-    /**
-     * Following the NGIC message types.
-     *
-     * This type structure is defined in NGIC at interface/zmq/zmqsub.h:51
-     */
-    enum s11MsgType {
-        CREATE_SESSION(1),
-        MODIFY_BEARER(2),
-        DELETE_SESSION(3),
-        DPN_RESPONSE(4),
-        DDN(5),
-        ASSIGN_TOPIC(10),
-        ASSIGN_CONFLICT(11),
-        DPN_STATUS_INDICATION(12),
-        DPN_STATUS_ACK(13),
-        CONTROLLER_STATUS_INDICATION(14),
-        ADC_RULE(17),
-        PCC_RULE(18),
-        METER_RULE(19),
-        SDF_RULE(20);
+    public enum s11MsgType {
+        CREATE_SESSION(1) {
+            @Override
+            public String toString() {
+                return "CREATE_SESSION";
+            }
+        },
+        UPDATE_MODIFY_BEARER(2) {
+            @Override
+            public String toString() {
+                return "UPDATE_MODIFY_BEARER";
+            }
+        },
+        DELETE_SESSION(3) {
+            @Override
+            public String toString() {
+                return "DELETE_SESSION";
+            }
+        },
+        DPN_RESPONSE(4) {
+            @Override
+            public String toString() {
+                return "DPN_RESPONSE";
+            }
+        },
+        DDN(5) {
+            @Override
+            public String toString() {
+                return "DDN";
+            }
+        },
+        DDN_ACK(5) {
+            @Override
+            public String toString() {
+                return "DDN_ACK";
+            }
+        },
+        RESERVED_1(7) {
+            @Override
+            public String toString() {
+                return "RESERVED_1";
+            }
+        },
+        RESERVED_2(8) {
+            @Override
+            public String toString() {
+                return "RESERVED_2";
+            }
+        },
+        RESERVED_3(9) {
+            @Override
+            public String toString() {
+                return "RESERVED_3";
+            }
+        },
+        ASSIGN_TOPIC(10) {
+            @Override
+            public String toString() {
+                return "ASSIGN_TOPIC";
+            }
+        },
+        ASSIGN_CONFLICT(11) {
+            @Override
+            public String toString() {
+                return "ASSIGN_CONFLICT";
+            }
+        },
+        DPN_STATUS_INDICATION(12) {
+            @Override
+            public String toString() {
+                return "DPN_STATUS_INDICATION";
+            }
+        },
+        DPN_STATUS_ACK(13) {
+            @Override
+            public String toString() {
+                return "DPN_STATUS_ACK";
+            }
+        },
+        CONTROLLER_STATUS_INDICATION(14) {
+            @Override
+            public String toString() {
+                return "CONTROLLER_STATUS_INDICATION";
+            }
+        },
+        GENERATE_CDR(15) {
+            @Override
+            public String toString() {
+                return "GENERATE_CDR";
+            }
+        },
+        GENERATE_CDR_ACK(16) {
+            @Override
+            public String toString() {
+                return "GENERATE_CDR_ACK";
+            }
+        },
+        ADC_RULE(17) {
+            @Override
+            public String toString() {
+                return "ADC_RULE";
+            }
+        },
+        PCC_RULE(18) {
+            @Override
+            public String toString() {
+                return "PCC_RULE";
+            }
+        },
+        METER_RULE(19) {
+            @Override
+            public String toString() {
+                return "METER_RULE";
+            }
+        },
+        SDF_RULE(20) {
+            @Override
+            public String toString() {
+                return "SDF_RULE";
+            }
+        };
 
         private byte type;
 
@@ -226,8 +412,140 @@
             this.type = (byte) type;
         }
 
+        public static s11MsgType getEnum(byte name) {
+            Optional<s11MsgType> any = Arrays.stream(s11MsgType.values())
+                    .filter(typeStr -> typeStr.type == name)
+                    .findAny();
+            if (any.isPresent()) {
+                return any.get();
+            }
+            throw new IllegalArgumentException("No enum defined for string: " + name);
+        }
+
         public byte getType() {
             return type;
         }
     }
+
+    public enum DpnStatusIndication {
+        HELLO(1) {
+            @Override
+            public String toString() {
+                return "HELLO";
+            }
+        },
+        GOODBYE(2) {
+            @Override
+            public String toString() {
+                return "GOODBYE";
+            }
+        },
+        OVERLOAD_START(3) {
+            @Override
+            public String toString() {
+                return "OVERLOAD_START";
+            }
+        },
+        OVERLOAD_STOP(4) {
+            @Override
+            public String toString() {
+                return "OVERLOAD_STOP";
+            }
+        },
+        MATERIAL_CHANGE(5) {
+            @Override
+            public String toString() {
+                return "MATERIAL_CHANGE";
+            }
+        },
+        RESTART(6) {
+            @Override
+            public String toString() {
+                return "RESTART";
+            }
+        },
+        OUT_OF_SERVICE(7) {
+            @Override
+            public String toString() {
+                return "OUT_OF_SERVICE";
+            }
+        };
+
+        private byte type;
+
+        DpnStatusIndication(int type) {
+            this.type = (byte) type;
+        }
+
+        public static DpnStatusIndication getEnum(byte name) {
+            Optional<DpnStatusIndication> any = Arrays.stream(DpnStatusIndication.values())
+                    .filter(typeStr -> typeStr.type == name)
+                    .findAny();
+            if (any.isPresent()) {
+                return any.get();
+            }
+            throw new IllegalArgumentException("No enum defined for string: " + name);
+        }
+
+        public byte getType() {
+            return type;
+        }
+    }
+
+    public enum ControllerStatusIndication {
+        HELLO(1) {
+            @Override
+            public String toString() {
+                return "HELLO";
+            }
+        },
+        GOODBYE(2) {
+            @Override
+            public String toString() {
+                return "GOODBYE";
+            }
+        };
+
+        private byte type;
+
+        ControllerStatusIndication(int type) {
+            this.type = (byte) type;
+        }
+
+        public byte getType() {
+            return type;
+        }
+    }
+
+    public enum ReservedTopics {
+        BROADCAST_ALL(1) {
+            @Override
+            public String toString() {
+                return "BROADCAST_ALL";
+            }
+        },
+        BROADCAST_CONTROLLERS(2) {
+            @Override
+            public String toString() {
+                return "BROADCAST_CONTROLLERS";
+            }
+        },
+        BROADCAST_DPNS(3) {
+            @Override
+            public String toString() {
+                return "BROADCAST_DPNS";
+            }
+        };
+
+        private byte type;
+
+        ReservedTopics(int type) {
+            this.type = (byte) type;
+        }
+
+        public byte getType() {
+            return type;
+        }
+    }
+
 }
\ No newline at end of file
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnP4Communicator.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnP4Communicator.java
index 5b12e05..af706e6 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnP4Communicator.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnP4Communicator.java
@@ -23,17 +23,17 @@
 
 public class DpnP4Communicator extends P4DpnControlProtocol implements DpnCommunicationService {
     @Override
-    public void create_session(Short topic_id, BigInteger imsi, Short default_ebi, Ip4Address ue_ipv4, Long s1u_sgw_teid, Ip4Address s1u_sgw_ipv4, Long session_id, Long client_id, BigInteger op_id) {
+    public void create_session(byte topic_id, BigInteger imsi, Short default_ebi, Ip4Address ue_ipv4, Long s1u_sgw_teid, Ip4Address s1u_sgw_ipv4, Long session_id, Long client_id, BigInteger op_id) {
 
     }
 
     @Override
-    public void modify_bearer(Short topic_id, Ip4Address s1u_sgw_ipv4, Long s1u_enodeb_teid, Ip4Address s1u_enodeb_ipv4, Long session_id, Long client_id, BigInteger op_id) {
+    public void modify_bearer(byte topic_id, Ip4Address s1u_sgw_ipv4, Long s1u_enodeb_teid, Ip4Address s1u_enodeb_ipv4, Long session_id, Long client_id, BigInteger op_id) {
 
     }
 
     @Override
-    public void delete_session(Short topic_id, Long session_id, Long client_id, BigInteger op_id) {
+    public void delete_session(byte topic_id, Long session_id, Long client_id, BigInteger op_id) {
 
     }
 
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/package-info.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/package-info.java
new file mode 100644
index 0000000..21c1413
--- /dev/null
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/package-info.java
@@ -0,0 +1,17 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.fpcagent.protocols;
\ No newline at end of file
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/DpnDeviceListener.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/DpnDeviceListener.java
new file mode 100644
index 0000000..8d099f0
--- /dev/null
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/DpnDeviceListener.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.fpcagent.providers;
+
+public interface DpnDeviceListener {
+    void deviceAdded(String id, byte topic);
+
+    void deviceRemoved(String id);
+}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/DpnProvider.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/DpnProvider.java
new file mode 100644
index 0000000..d72991e
--- /dev/null
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/DpnProvider.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.fpcagent.providers;
+
+import org.apache.felix.scr.annotations.*;
+import org.onlab.packet.ChassisId;
+import org.onosproject.net.*;
+import org.onosproject.net.device.DefaultDeviceDescription;
+import org.onosproject.net.device.DeviceDescription;
+import org.onosproject.net.device.DeviceProviderRegistry;
+import org.onosproject.net.device.DeviceProviderService;
+import org.onosproject.net.provider.AbstractProvider;
+import org.onosproject.net.provider.ProviderId;
+import org.slf4j.Logger;
+
+import static org.onosproject.net.DeviceId.deviceId;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ *
+ */
+@Component(immediate = true)
+@Service
+public class DpnProvider extends AbstractProvider implements DpnProviderService {
+
+    private static final Logger log = getLogger(DpnProvider.class);
+    private final InternalDeviceListener listener = new InternalDeviceListener();
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected DeviceProviderRegistry providerRegistry;
+
+    private DeviceProviderService providerService;
+
+    public DpnProvider() {
+        super(new ProviderId("fpc", "org.onosproject.providers.dpn"));
+    }
+
+    @Activate
+    public void activate() {
+        providerService = providerRegistry.register(this);
+        log.info("FPC Device Provider Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        providerRegistry.unregister(this);
+        providerService = null;
+        log.info("FPC Device Provider Stopped");
+    }
+
+    public InternalDeviceListener getListener() {
+        return listener;
+    }
+
+    @Override
+    public void triggerProbe(DeviceId deviceId) {
+
+    }
+
+    @Override
+    public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
+
+    }
+
+    @Override
+    public boolean isReachable(DeviceId deviceId) {
+        return true;
+    }
+
+    @Override
+    public void changePortState(DeviceId deviceId, PortNumber portNumber, boolean enable) {
+
+    }
+
+    public class InternalDeviceListener implements DpnDeviceListener {
+
+        @Override
+        public void deviceAdded(String id, byte topic) {
+            DeviceId deviceId = deviceId("fpc:" + id);
+            ChassisId chassisId = new ChassisId(deviceId.hashCode());
+
+            Device.Type type = Device.Type.OTHER;
+            SparseAnnotations annotations = DefaultAnnotations.builder()
+                    .set(AnnotationKeys.NAME, id)
+                    .set(AnnotationKeys.PROTOCOL, "ZMQ")
+                    .set(AnnotationKeys.MANAGEMENT_ADDRESS, "127.0.0.1")
+                    .set("topic", String.valueOf(topic))
+                    .build();
+            DeviceDescription descriptionBase = new DefaultDeviceDescription(deviceId.uri(), type, "fpc", "0.1", "0.1", id, chassisId),
+                    description = new DefaultDeviceDescription(descriptionBase, annotations);
+
+            providerService.deviceConnected(deviceId, description);
+        }
+
+        @Override
+        public void deviceRemoved(String id) {
+            providerService.deviceDisconnected(deviceId("fpc:" + id));
+        }
+    }
+}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/DpnProviderService.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/DpnProviderService.java
new file mode 100644
index 0000000..515730a
--- /dev/null
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/DpnProviderService.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.fpcagent.providers;
+
+import org.onosproject.net.device.DeviceProvider;
+
+public interface DpnProviderService extends DeviceProvider {
+
+    DpnDeviceListener getListener();
+
+}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/package-info.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/package-info.java
new file mode 100644
index 0000000..7f99f9d
--- /dev/null
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/package-info.java
@@ -0,0 +1,17 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.fpcagent.providers;
\ No newline at end of file
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/FpcUtil.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/FpcUtil.java
index 0141b36..c3fa25a 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/FpcUtil.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/FpcUtil.java
@@ -16,58 +16,38 @@
 
 package org.onosproject.fpcagent.util;
 
-import com.google.common.collect.Maps;
 import org.onosproject.config.DynamicConfigService;
 import org.onosproject.config.Filter;
+import org.onosproject.net.Device;
+import org.onosproject.net.device.DeviceStore;
 import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.ClientIdentifier;
 import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.DefaultTenants;
-import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.OpIdentifier;
 import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.tenants.DefaultTenant;
 import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.tenants.TenantKeys;
-import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.yangautoprefixnotify.value.DefaultDownlinkDataNotification;
-import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.yangautoprefixnotify.value.DownlinkDataNotification;
-import org.onosproject.yang.gen.v1.ietfdmmfpcbase.rev20160803.ietfdmmfpcbase.FpcDpnId;
 import org.onosproject.yang.gen.v1.ietfdmmfpcbase.rev20160803.ietfdmmfpcbase.FpcIdentity;
-import org.onosproject.yang.gen.v1.ietfdmmfpcbase.rev20160803.ietfdmmfpcbase.fpcidentity.FpcIdentityUnion;
 import org.onosproject.yang.model.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.math.BigInteger;
-import java.util.*;
+import java.util.List;
+import java.util.Optional;
 
-import static org.onosproject.fpcagent.util.Converter.fromIntToLong;
-import static org.onosproject.fpcagent.util.Converter.toBigInt;
+import static org.onosproject.net.DeviceId.deviceId;
 
 /**
  * Helper class which stores all the static variables.
  */
 public class FpcUtil {
-    public static final int MAX_EVENTS = 1000;
-    public static final int MAX_BATCH_MS = 5000;
-    public static final int MAX_IDLE_MS = 1000;
-    public static final String TIMER = "dynamic-config-fpcagent-timer";
-    public static final String UNKNOWN_EVENT = "FPC Agent listener: unknown event: {}";
-    public static final String EVENT_NULL = "Event cannot be null";
     public static final String FPC_APP_ID = "org.onosproject.fpcagent";
     public static final FpcIdentity defaultIdentity = FpcIdentity.fromString("default");
     private static final Logger log = LoggerFactory.getLogger(FpcUtil.class);
-    private static final Map<String, FpcDpnId> uplinkDpnMap = Maps.newConcurrentMap();
-    private static final Map<String, Short> nodeToTopicMap = Maps.newConcurrentMap();
-    private static final byte DPN_HELLO = 0b0000_0001;
-    private static final byte DPN_BYE = 0b0000_0010;
-    private static final byte DOWNLINK_DATA_NOTIFICATION = 0b0000_0101;
-    private static final byte DPN_STATUS_INDICATION = 0b0000_1100;
-    private static final byte DPN_OVERLOAD_INDICATION = 0b0000_0101;
-    private static final byte DPN_REPLY = 0b0000_0100;
-    private static final String DOWNLINK_DATA_NOTIFICATION_STRING = "Downlink-Data-Notification";
+
     public static DynamicConfigService dynamicConfigService = null;
     public static ModelConverter modelConverter = null;
-    // Resource ID for Configure DPN RPC command
+    public static DeviceStore deviceStore = null;
+
     public static ResourceId configureDpn;
-    // Resource ID for Configure RPC command
     public static ResourceId configure;
-    // Resource ID for tenants data
     public static ResourceId tenants;
     public static ResourceId configureBundles;
     public static ResourceId registerClient;
@@ -188,79 +168,19 @@
     }
 
     /**
-     * Ensures the session id is an unsigned 64 bit integer
-     *
-     * @param sessionId - session id received from the DPN
-     * @return unsigned session id
-     */
-    private static BigInteger checkSessionId(BigInteger sessionId) {
-        if (sessionId.compareTo(BigInteger.ZERO) < 0) {
-            sessionId = sessionId.add(BigInteger.ONE.shiftLeft(64));
-        }
-        return sessionId;
-    }
-
-    /**
-     * Decodes a DownlinkDataNotification
-     *
-     * @param buf - message buffer
-     * @param key - Concatenation of node id + / + network id
-     * @return DownlinkDataNotification or null if it could not be successfully decoded
-     */
-    private static DownlinkDataNotification processDDN(byte[] buf, String key) {
-        DownlinkDataNotification ddnB = new DefaultDownlinkDataNotification();
-        ddnB.sessionId(checkSessionId(toBigInt(buf, 2)));
-        ddnB.notificationMessageType(DOWNLINK_DATA_NOTIFICATION_STRING);
-        ddnB.clientId(ClientIdentifier.of(FpcIdentity.of(FpcIdentityUnion.of(fromIntToLong(buf, 10)))));
-        ddnB.opId(OpIdentifier.of(BigInteger.valueOf(fromIntToLong(buf, 14))));
-        ddnB.notificationDpnId(uplinkDpnMap.get(key));
-        return ddnB;
-    }
-
-    /**
-     * Decodes a DPN message.
-     *
-     * @param buf - message buffer
-     * @return - A pair with the DPN Id and decoded Object
-     */
-    public static Map.Entry<FpcDpnId, Object> decode(byte[] buf) {
-        if (buf[1] == DPN_REPLY) {
-            return null;
-        } else if (buf[1] == DOWNLINK_DATA_NOTIFICATION) {
-            short nodeIdLen = buf[18];
-            short networkIdLen = buf[19 + nodeIdLen];
-            String key = new String(Arrays.copyOfRange(buf, 19, 19 + nodeIdLen)) + "/" + new String(Arrays.copyOfRange(buf, 20 + nodeIdLen, 20 + nodeIdLen + networkIdLen));
-            return uplinkDpnMap.get(key) == null ? null : new AbstractMap.SimpleEntry<>(uplinkDpnMap.get(key), processDDN(buf, key));
-        } else if (buf[1] == DPN_STATUS_INDICATION) {
-            DPNStatusIndication.Status status = null;
-
-            short nodeIdLen = buf[8];
-            short networkIdLen = buf[9 + nodeIdLen];
-            String key = new String(Arrays.copyOfRange(buf, 9, 9 + nodeIdLen)) + "/" + new String(Arrays.copyOfRange(buf, 10 + nodeIdLen, 10 + nodeIdLen + networkIdLen));
-            if (buf[3] == DPN_OVERLOAD_INDICATION) {
-                status = DPNStatusIndication.Status.OVERLOAD_INDICATION;
-            } else if (buf[3] == DPN_HELLO) {
-                status = DPNStatusIndication.Status.HELLO;
-                log.info("Hello {} on topic {}", key, buf[2]);
-                nodeToTopicMap.put(key, (short) buf[2]);
-            } else if (buf[3] == DPN_BYE) {
-                status = DPNStatusIndication.Status.BYE;
-                log.info("Bye {}", key);
-                nodeToTopicMap.remove(key);
-            }
-            return new AbstractMap.SimpleEntry<>(uplinkDpnMap.get(key), new DPNStatusIndication(status, key));
-        }
-        return null;
-    }
-
-    /**
      * Gets the mapping for node id / network id to ZMQ Topic
      *
-     * @param Key - Concatenation of node id + / + network id
+     * @param key - Concatenation of node id + / + network id
      * @return - ZMQ Topic
      */
-    public static Short getTopicFromNode(String Key) {
-        return nodeToTopicMap.get(Key);
+    public static byte getTopicFromNode(String key) {
+        // TODO add cache
+        Device device = deviceStore.getDevice(deviceId("fpc:" + key));
+        if (device != null) {
+            String topic = device.annotations().value("topic");
+            return Byte.parseByte(topic);
+        }
+        return -1;
     }
 
     /**
@@ -349,55 +269,4 @@
                 node -> dynamicConfigService.updateNode(dataNode.resourceId(), node)
         );
     }
-
-    /**
-     * Provides basic status changes,
-     */
-    public static class DPNStatusIndication {
-        private final Status status;
-        private final String key; //nodeId +"/"+ networkId
-        /**
-         * Node Reference of the DPN
-         */
-        public Short nodeRef;
-
-        /**
-         * Constructor providing the DPN and its associated Status.
-         *
-         * @param status - DPN Status
-         * @param key    - Combination of node id and network id
-         */
-        public DPNStatusIndication(Status status,
-                                   String key) {
-            this.status = status;
-            this.key = key;
-        }
-
-        /**
-         * Provides DPN Status
-         *
-         * @return Status associated to the DPN.
-         */
-        public Status getStatus() {
-            return status;
-        }
-
-        /**
-         * Provides the DPN key - nodeId +"/"+ networkId
-         *
-         * @return FpcDpnId
-         */
-        public String getKey() {
-            return this.key;
-        }
-
-        /**
-         * Basic DPN Status
-         */
-        public enum Status {
-            HELLO,
-            BYE,
-            OVERLOAD_INDICATION
-        }
-    }
 }
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/package-info.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/package-info.java
new file mode 100644
index 0000000..b4b4877
--- /dev/null
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/package-info.java
@@ -0,0 +1,17 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.fpcagent.util;
\ No newline at end of file
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBSubscriberManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBSubscriberManager.java
index f137693..5cd5028 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBSubscriberManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBSubscriberManager.java
@@ -1,70 +1,56 @@
 package org.onosproject.fpcagent.workers;
 
+import javafx.util.Pair;
 import org.apache.commons.lang.exception.ExceptionUtils;
+import org.onosproject.fpcagent.providers.DpnDeviceListener;
 import org.onosproject.fpcagent.util.FpcUtil;
 import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.yangautoprefixnotify.value.DownlinkDataNotification;
-import org.onosproject.yang.gen.v1.ietfdmmfpcbase.rev20160803.ietfdmmfpcbase.FpcDpnId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.zeromq.ZContext;
 import org.zeromq.ZMQ;
 
-import java.nio.ByteBuffer;
 import java.util.Arrays;
-import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadLocalRandom;
 
-import static org.onosproject.fpcagent.util.Converter.*;
+import static org.onosproject.fpcagent.protocols.DpnNgicCommunicator.*;
+import static org.onosproject.fpcagent.util.Converter.toInt;
 
 public class ZMQSBSubscriberManager implements AutoCloseable {
     private static final Logger log = LoggerFactory.getLogger(ZMQSBSubscriberManager.class);
     private static int MIN_TOPIC_VAL = 4;
     private static int MAX_TOPIC_VAL = 255;
-    private static byte ASSIGN_ID = 0b0000_1010;
-    private static byte ASSIGN_CONFLICT = 0b0000_1011;
-    private static byte HELLO_REPLY = 0b0000_1101;
-    private static byte CONTROLLER_STATUS_INDICATION = 0b0000_1110;
-    private static byte HELLO = 0b0000_0001;
-    private static byte GOODBYE = 0b0000_0010;
     private static ZMQSBSubscriberManager _instance = null;
-    private static Long controllerSourceId;
-    private static Short subscriberId;
     private final String address;
-    private final Short broadcastAllId;
-    private final Short broadcastControllersId;
-    private final Short broadcastDpnsId;
     private final String nodeId;
     private final String networkId;
+    private Long controllerSourceId;
+    private byte controllerTopic;
     private boolean run;
     private boolean conflictingTopic;
+
     private Future<?> broadcastAllWorker;
     private Future<?> broadcastControllersWorker;
     private Future<?> broadcastTopicWorker;
     private Future<?> generalWorker;
 
-    protected ZMQSBSubscriberManager(String address, String broadcastAllId, String broadcastControllersId,
-                                     String broadcastDpnsId, String nodeId, String networkId) {
+    private DpnDeviceListener dpnDeviceListener;
+
+    protected ZMQSBSubscriberManager(String address, String nodeId, String networkId, DpnDeviceListener dpnDeviceListener) {
         this.address = address;
         this.run = true;
-
         this.nodeId = nodeId;
         this.networkId = networkId;
-        this.broadcastAllId = Short.parseShort(broadcastAllId);
-        this.broadcastControllersId = Short.parseShort(broadcastControllersId);
-        this.broadcastDpnsId = Short.parseShort(broadcastDpnsId);
-
         this.conflictingTopic = false;
-        controllerSourceId = (long) ThreadLocalRandom.current().nextInt(0, 65535);
+        this.controllerSourceId = (long) ThreadLocalRandom.current().nextInt(0, 65535);
+        this.dpnDeviceListener = dpnDeviceListener;
     }
 
-    public static ZMQSBSubscriberManager createInstance(String address, String broadcastAllId,
-                                                        String broadcastControllersId, String broadcastDpnsId,
-                                                        String nodeId, String networkId) {
+    public static ZMQSBSubscriberManager createInstance(String address, String nodeId, String networkId, DpnDeviceListener providerService) {
         if (_instance == null) {
-            _instance = new ZMQSBSubscriberManager(address, broadcastAllId, broadcastControllersId,
-                    broadcastDpnsId, nodeId, networkId);
+            _instance = new ZMQSBSubscriberManager(address, nodeId, networkId, providerService);
         }
         return _instance;
     }
@@ -73,29 +59,28 @@
         return _instance;
     }
 
-    public static Short getControllerTopic() {
-        return subscriberId;
+    public byte getControllerTopic() {
+        return controllerTopic;
     }
 
-    public static Long getControllerSourceId() {
+    public Long getControllerSourceId() {
         return controllerSourceId;
     }
 
     public void open() {
-        short subscriberId = (short) ThreadLocalRandom.current().nextInt(MIN_TOPIC_VAL, MAX_TOPIC_VAL + 1);
-
         broadcastAllWorker = Executors.newSingleThreadExecutor()
-                .submit(new ZMQSubscriberWorker(broadcastAllId));
+                .submit(new ZMQSubscriberWorker(ReservedTopics.BROADCAST_ALL.getType()));
 
         broadcastControllersWorker = Executors.newSingleThreadExecutor()
-                .submit(new ZMQSubscriberWorker(broadcastControllersId));
+                .submit(new ZMQSubscriberWorker(ReservedTopics.BROADCAST_CONTROLLERS.getType()));
 
         broadcastTopicWorker = Executors.newSingleThreadExecutor()
-                .submit(new BroadcastTopic(subscriberId));
+                .submit(new AssignTopic());
     }
 
     @Override
     public void close() {
+        send_goodbye_dpns(nodeId, networkId);
         run = false;
     }
 
@@ -105,33 +90,14 @@
      * @param conflict - Flag to indicate conflict
      * @param subId    - Topic Id that caused the conflict
      */
-    protected void BroadcastAllSubIdCallBack(boolean conflict, Short subId) {
-        if (conflict && subscriberId.equals(subId)) {
+    protected void BroadcastAllSubIdCallBack(boolean conflict, byte subId) {
+        if (conflict && controllerTopic == subId) {
             this.conflictingTopic = true;
             broadcastTopicWorker.cancel(true);
         }
     }
 
     /**
-     * Broadcasts the GOODBYE message to all the DPNs
-     */
-    public void sendGoodbyeToDpns() {
-        ByteBuffer bb = ByteBuffer.allocate(10 + nodeId.length() + networkId.length());
-        bb.put(toUint8(broadcastDpnsId))
-                .put(CONTROLLER_STATUS_INDICATION)
-                .put(toUint8(subscriberId))
-                .put(GOODBYE)
-                .put(toUint32(controllerSourceId))
-                .put(toUint8((short) nodeId.length()))
-                .put(nodeId.getBytes())
-                .put(toUint8((short) networkId.length()))
-                .put(networkId.getBytes());
-
-        log.info("sendGoodbyeToDpns: {}", bb.array());
-        ZMQSBPublisherManager.getInstance().send(bb);
-    }
-
-    /**
      * Broadcasts an Assign Conflict message
      *
      * @param contents - byte array received over the southbound.
@@ -143,81 +109,112 @@
         String node_id = new String(Arrays.copyOfRange(contents, 8, 8 + nodeIdLen));
         String network_id = new String(Arrays.copyOfRange(contents, 9 + nodeIdLen, 9 + nodeIdLen + networkIdLen));
 
-        if (toUint8(subscriberId) == topic || (nodeId.equals(node_id) && networkId.equals(network_id))) {
-            ByteBuffer bb = ByteBuffer.allocate(9 + nodeId.length() + networkId.length());
-            bb.put(toUint8(broadcastAllId))
-                    .put(ASSIGN_CONFLICT)
-                    .put(topic)
-                    .put(toUint32(controllerSourceId))
-                    .put(toUint8((short) nodeId.length()))
-                    .put(nodeId.getBytes())
-                    .put(toUint8((short) networkId.length()))
-                    .put(networkId.getBytes());
-
-            log.info("SendAssignConflictMessage: {}", bb.array());
-            ZMQSBPublisherManager.getInstance().send(bb);
+        if (controllerTopic == topic || (nodeId.equals(node_id) && networkId.equals(network_id))) {
+            send_assign_conflict(nodeId, networkId);
         }
     }
 
     protected class ZMQSubscriberWorker implements Runnable {
-        private final Short subscriberId;
+        private final byte subscribedTopic;
         private ZContext ctx;
 
-        ZMQSubscriberWorker(Short subscriberId) {
-            this.subscriberId = subscriberId;
+        ZMQSubscriberWorker(byte subscribedTopic) {
+            this.subscribedTopic = subscribedTopic;
             this.ctx = new ZContext();
         }
 
-        /**
-         * Sends a reply to a DPN Hello
-         *
-         * @param dpnStatus - DPN Status Indication message received from the DPN
-         */
-        protected void sendHelloReply(FpcUtil.DPNStatusIndication dpnStatus) {
-            if (FpcUtil.getTopicFromNode(dpnStatus.getKey()) != null) {
-                ByteBuffer bb = ByteBuffer.allocate(9 + nodeId.length() + networkId.length())
-                        .put(toUint8(FpcUtil.getTopicFromNode(dpnStatus.getKey())))
-                        .put(HELLO_REPLY)
-                        .put(toUint8(ZMQSBSubscriberManager.getControllerTopic()))
-                        .put(toUint32(ZMQSBSubscriberManager.getControllerSourceId()))
-                        .put(toUint8((short) nodeId.length()))
-                        .put(nodeId.getBytes())
-                        .put(toUint8((short) networkId.length()))
-                        .put(networkId.getBytes());
+//        /**
+//         * Ensures the session id is an unsigned 64 bit integer
+//         *
+//         * @param sessionId - session id received from the DPN
+//         * @return unsigned session id
+//         */
+//        private static BigInteger checkSessionId(BigInteger sessionId) {
+//            if (sessionId.compareTo(BigInteger.ZERO) < 0) {
+//                sessionId = sessionId.add(BigInteger.ONE.shiftLeft(64));
+//            }
+//            return sessionId;
+//        }
+//
+//        /**
+//         * Decodes a DownlinkDataNotification
+//         *
+//         * @param buf - message buffer
+//         * @param key - Concatenation of node id + / + network id
+//         * @return DownlinkDataNotification or null if it could not be successfully decoded
+//         */
+//        private static DownlinkDataNotification processDDN(byte[] buf, String key) {
+//            DownlinkDataNotification ddnB = new DefaultDownlinkDataNotification();
+//            ddnB.sessionId(checkSessionId(toBigInt(buf, 2)));
+//            ddnB.notificationMessageType(DOWNLINK_DATA_NOTIFICATION_STRING);
+//            ddnB.clientId(ClientIdentifier.of(FpcIdentity.of(FpcIdentityUnion.of(fromIntToLong(buf, 10)))));
+//            ddnB.opId(OpIdentifier.of(BigInteger.valueOf(fromIntToLong(buf, 14))));
+//            ddnB.notificationDpnId(uplinkDpnMap.get(key));
+//            return ddnB;
+//        }
 
-                log.info("sendHelloReply: {}", bb.array());
-                ZMQSBPublisherManager.getInstance().send(bb);
+        public Pair<Object, Object> decode(byte[] buf) {
+            s11MsgType type;
+            type = s11MsgType.getEnum(buf[1]);
+            if (type.equals(s11MsgType.DDN)) {
+                short nodeIdLen = buf[18];
+                short networkIdLen = buf[19 + nodeIdLen];
+                String key = new String(Arrays.copyOfRange(buf, 19, 19 + nodeIdLen)) + "/" + new String(Arrays.copyOfRange(buf, 20 + nodeIdLen, 20 + nodeIdLen + networkIdLen));
+//                return uplinkDpnMap.get(key) == null ? null : new AbstractMap.SimpleEntry<>(uplinkDpnMap.get(key), processDDN(buf, key));
+            } else if (type.equals(s11MsgType.DPN_STATUS_INDICATION)) {
+                DpnStatusIndication status;
+
+                short nodeIdLen = buf[8];
+                short networkIdLen = buf[9 + nodeIdLen];
+                String deviceId = new String(Arrays.copyOfRange(buf, 9, 9 + nodeIdLen)) + "/" + new String(Arrays.copyOfRange(buf, 10 + nodeIdLen, 10 + nodeIdLen + networkIdLen));
+
+                status = DpnStatusIndication.getEnum(buf[3]);
+                if (status.equals(DpnStatusIndication.HELLO)) {
+                    log.info("Hello {} on topic {}", deviceId, buf[2]);
+
+                    dpnDeviceListener.deviceAdded(deviceId, buf[2]);
+                } else if (status.equals(DpnStatusIndication.GOODBYE)) {
+                    log.info("Bye {}", deviceId);
+                    dpnDeviceListener.deviceRemoved(deviceId);
+                }
+                return new Pair<>(status, deviceId);
             }
+            return null;
         }
 
         @Override
         public void run() {
             ZMQ.Socket subscriber = this.ctx.createSocket(ZMQ.SUB);
             subscriber.connect(address);
-            subscriber.subscribe(new byte[]{toUint8(subscriberId)});
-            log.debug("Subscriber at {} / {}", address, subscriberId);
+            subscriber.subscribe(new byte[]{subscribedTopic});
+            log.debug("Subscriber at {} / {}", address, subscribedTopic);
             while ((!Thread.currentThread().isInterrupted()) && run) {
                 byte[] contents = subscriber.recv();
                 byte topic = contents[0];
-                byte messageType = contents[1];
-                log.debug("Received {}", contents);
+                s11MsgType messageType = s11MsgType.getEnum(contents[1]);
+                log.info("Received {}", messageType);
                 switch (topic) {
                     case 1:
-                        if (messageType == ASSIGN_CONFLICT && toInt(contents, 3) != controllerSourceId) {
-                            BroadcastAllSubIdCallBack(true, (short) contents[2]);
-                        } else if (messageType == ASSIGN_ID && toInt(contents, 3) != controllerSourceId) {
+                        if (messageType.equals(s11MsgType.ASSIGN_CONFLICT) &&
+                                toInt(contents, 3) != controllerSourceId) {
+                            BroadcastAllSubIdCallBack(true, contents[2]);
+                        } else if (messageType.equals(s11MsgType.ASSIGN_TOPIC) &&
+                                toInt(contents, 3) != controllerSourceId) {
                             SendAssignConflictMessage(contents);
                         }
                         break;
                     default:
-                        Map.Entry<FpcDpnId, Object> entry = FpcUtil.decode(contents);
-                        if (entry != null) {
-                            if (entry.getValue() instanceof DownlinkDataNotification) {
+                        Pair msg = decode(contents);
+                        if (msg != null) {
+                            Object key = msg.getKey();
+                            if (key instanceof DownlinkDataNotification) {
                                 // TODO handle DL notification
-                            } else if (entry.getValue() instanceof FpcUtil.DPNStatusIndication) {
-                                FpcUtil.DPNStatusIndication dpnStatus = (FpcUtil.DPNStatusIndication) entry.getValue();
-                                if (dpnStatus.getStatus() == FpcUtil.DPNStatusIndication.Status.HELLO) {
-                                    sendHelloReply(dpnStatus);
+                            } else if (key instanceof DpnStatusIndication) {
+                                if (key.equals(DpnStatusIndication.HELLO)) {
+                                    byte dpnTopic = FpcUtil.getTopicFromNode(msg.getValue().toString());
+                                    if (dpnTopic != -1) {
+                                        send_status_ack(nodeId, networkId, dpnTopic);
+                                    }
                                 }
                             }
                         }
@@ -237,75 +234,31 @@
     /**
      * Class to broadcast a topic for the controller
      */
-    protected class BroadcastTopic implements Runnable {
-        private Short topic;
+    protected class AssignTopic implements Runnable {
+        private byte topic;
 
-        /**
-         * Constructor
-         *
-         * @param topic - Topic to broadcast
-         */
-        public BroadcastTopic(Short topic) {
-            this.topic = topic;
-        }
-
-        /**
-         * Broadcasts the topic
-         */
-        private void broadcastTopic() {
-            ByteBuffer bb = ByteBuffer.allocate(9 + nodeId.length() + networkId.length());
-            bb.put(toUint8(broadcastAllId))
-                    .put(ASSIGN_ID)
-                    .put(toUint8(this.topic))
-                    .put(toUint32(controllerSourceId))
-                    .put(toUint8((short) nodeId.length()))
-                    .put(nodeId.getBytes())
-                    .put(toUint8((short) networkId.length()))
-                    .put(networkId.getBytes());
-
-            log.info("broadcastTopic: {}", bb.array());
-            ZMQSBPublisherManager.getInstance().send(bb);
-        }
-
-        /**
-         * Broadcasts the HELLO message to all the DPNs
-         */
-        private void sendHelloToDpns() {
-            ByteBuffer bb = ByteBuffer.allocate(10 + nodeId.length() + networkId.length());
-            bb.put(toUint8(broadcastDpnsId))
-                    .put(CONTROLLER_STATUS_INDICATION)
-                    .put(toUint8(subscriberId))
-                    .put(HELLO)
-                    .put(toUint32(controllerSourceId))
-                    .put(toUint8((short) nodeId.length()))
-                    .put(nodeId.getBytes())
-                    .put(toUint8((short) networkId.length()))
-                    .put(networkId.getBytes());
-
-
-            log.info("sendHelloToDpns: {}", bb.array());
-            ZMQSBPublisherManager.getInstance().send(bb);
+        public AssignTopic() {
+            this.topic = (byte) ThreadLocalRandom.current().nextInt(MIN_TOPIC_VAL, MAX_TOPIC_VAL + 1);
         }
 
         @Override
         public void run() {
             try {
-                this.broadcastTopic();
+                send_assign_topic(nodeId, networkId, this.topic);
                 log.debug("Thread sleeping: " + Thread.currentThread().getName());
-                Thread.sleep(2000);
+                Thread.sleep(2000); // wait 10 sec before assigning topic
             } catch (InterruptedException e) {
                 if (conflictingTopic) {
                     conflictingTopic = false;
-                    this.topic = (short) ThreadLocalRandom.current().nextInt(MIN_TOPIC_VAL, MAX_TOPIC_VAL + 1);
-                    subscriberId = this.topic;
+                    this.topic = (byte) ThreadLocalRandom.current().nextInt(MIN_TOPIC_VAL, MAX_TOPIC_VAL + 1);
+                    controllerTopic = this.topic;
                     this.run();
                     return;
                 } else {
                     log.error(ExceptionUtils.getFullStackTrace(e));
                 }
             }
-            subscriberId = this.topic;
-            log.info("Topic Id: " + this.topic);
+            controllerTopic = this.topic;
             generalWorker = Executors.newSingleThreadExecutor().submit(new ZMQSubscriberWorker(this.topic));
 
             try {
@@ -313,7 +266,8 @@
             } catch (InterruptedException e) {
                 log.error(ExceptionUtils.getFullStackTrace(e));
             }
-            sendHelloToDpns();
+
+            send_hello_dpns(nodeId, networkId);
         }
 
     }
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/package-info.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/package-info.java
new file mode 100644
index 0000000..f6a5857
--- /dev/null
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/package-info.java
@@ -0,0 +1,17 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.fpcagent.workers;
\ No newline at end of file
diff --git a/scripts/benchmark.sh b/scripts/benchmark.sh
index 0e2bbc5..a5772f5 100755
--- a/scripts/benchmark.sh
+++ b/scripts/benchmark.sh
@@ -8,9 +8,6 @@
 echo "Running 100 configure create.."
 for (( i=1; i<=100; i++)); do
 	./configure.sh create $i 1 &> /dev/null &
-	if ! (($i % 10)); then
-		wait
-	fi
 done
 
 wait
@@ -18,11 +15,10 @@
 echo "Running 100 configure delete.."
 for (( i=1; i<=100; i++)); do
 	./configure.sh delete $i &> /dev/null &
-	if ! (($i % 10)); then
-		wait
-	fi
 done
 
+wait 
+
 echo "Delete DPN.."
 ./deleteDPN.sh 1 &> /dev/null
 echo "Deregister Client.."