Merge branch 'master' of ssh://gerrit.opencord.org:29418/fpcagent
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java
index a44e12b..a5a456f 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java
@@ -30,7 +30,6 @@
 import org.onosproject.fpcagent.util.eventStream.JettyServer;
 import org.onosproject.fpcagent.util.eventStream.NBEventWorkerManager;
 import org.onosproject.fpcagent.util.eventStream.ParseStream;
-import org.onosproject.fpcagent.workers.HTTPNotifier;
 import org.onosproject.fpcagent.workers.ZMQSBPublisherManager;
 import org.onosproject.fpcagent.workers.ZMQSBSubscriberManager;
 import org.onosproject.net.config.*;
@@ -95,8 +94,6 @@
     private FpcConfig fpcConfig;
     private boolean started = false;
     private HashSet<DpnDeviceListener> listeners = Sets.newHashSet();
-    private Thread jettyServer;
-    private Thread parseStream;
 
     /* Config */
     private ConfigFactory<ApplicationId, FpcConfig> fpcConfigConfigFactory =
@@ -120,7 +117,6 @@
         JettyServer.createInstance().open();
         ParseStream.createInstance().open();
         NBEventWorkerManager.createInstance(20, restconfService).open();
-        HTTPNotifier.createInstance().open();
 
         log.info("FPC Service Started");
     }
@@ -138,7 +134,6 @@
         }
 
         NBEventWorkerManager.getInstance().close();
-        HTTPNotifier.getInstance().close();
         ParseStream.getInstance().close();
         JettyServer.getInstance().close();
 
@@ -156,7 +151,8 @@
                             helper.dpnSubscriberUri(),
                             helper.nodeId(),
                             helper.networkId(),
-                            dpnProviderService.getListener()
+                            dpnProviderService.getListener(),
+                            notificationIds
                     );
 
                     ZMQSBPublisherManager.createInstance(
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java
index dfa7c73..cef7592 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java
@@ -260,7 +260,7 @@
                 Instr3GppMob instr3GppMob = (Instr3GppMob) instructions.instrType();
                 String commands = Bits.toString(instr3GppMob.instr3GppMob().bits());
 
-                log.info("handling configure create event {} for {}", commands, dpn.dpnId());
+                log.debug("handling configure create event {} for {}", commands, dpn.dpnId());
 
                 Ip4Address s1u_enodeb_ipv4 = Ip4Address.valueOf(context.ul().tunnelLocalAddress().toString()),
                         s1u_sgw_ipv4 = Ip4Address.valueOf(context.ul().tunnelRemoteAddress().toString());
@@ -419,7 +419,7 @@
                 Instr3GppMob instr3GppMob = (Instr3GppMob) instructions.instrType();
                 String commands = Bits.toString(instr3GppMob.instr3GppMob().bits());
 
-                log.info("handling configure update event {} for {}", commands, dpn.dpnId());
+                log.debug("handling configure update event {} for {}", commands, dpn.dpnId());
 
                 Ip4Address s1u_enodeb_ipv4 = Ip4Address.valueOf(context.ul().tunnelLocalAddress().toString()),
                         s1u_sgw_ipv4 = Ip4Address.valueOf(context.ul().tunnelRemoteAddress().toString());
@@ -527,7 +527,7 @@
                 throw new RuntimeException("Context doesn't exist. Please issue create operation..");
             }
 
-            log.info("handling configure-delete {}", targetStr);
+            log.debug("handling configure-delete {}", targetStr);
 
             DefaultContexts context = defaultContexts.get();
             for (Dpns dpn : context.dpns()) {
@@ -764,7 +764,6 @@
             try {
                 for (ModelObject modelObject : getModelObjects(rpcInput.data(), configure)) {
                     DefaultConfigureInput input = (DefaultConfigureInput) modelObject;
-                    log.info("configure event {}", input.clientId());
                     if (!clientInfo.containsKey(input.clientId())) {
                         throw new RuntimeException("Client Identifier is not registered.");
                     }
@@ -824,7 +823,6 @@
             try {
                 for (ModelObject modelObject : getModelObjects(rpcInput.data(), configureBundles)) {
                     DefaultConfigureBundlesInput input = (DefaultConfigureBundlesInput) modelObject;
-                    log.info("configure-bundles event {}", input.clientId());
                     if (!clientInfo.containsKey(input.clientId())) {
                         throw new RuntimeException("Client Identifier is not registered.");
                     }
@@ -919,7 +917,6 @@
             try {
                 for (ModelObject modelObject : getModelObjects(rpcInput.data(), registerClient)) {
                     DefaultRegisterClientInput input = (DefaultRegisterClientInput) modelObject;
-                    log.info("register-client event {}", input.clientId());
                     if (clientInfo.containsKey(input.clientId())) {
                         throw new RuntimeException("Client already registered.");
                     }
@@ -975,7 +972,6 @@
             try {
                 for (ModelObject modelObject : getModelObjects(rpcInput.data(), registerClient)) {
                     DefaultRegisterClientInput input = (DefaultRegisterClientInput) modelObject;
-                    log.info("deregister-client event {}", input.clientId());
                     if (!clientInfo.containsKey(input.clientId())) {
                         throw new RuntimeException("Client does not exist.");
                     }
@@ -1053,61 +1049,62 @@
                                             }
 
                                             notify.value(availability);
-                                            clients.forEach(client -> sendNotification(notify, client));
+                                            clients.forEach(client -> sendNotification(client.toString(), notify));
                                         }
                                 );
                             } catch (ExecutionException e) {
                                 log.error(ExceptionUtils.getFullStackTrace(e));
                             }
                         });
-            } else if (event.subject().manufacturer().equals("cp")) {
-                String clientId = event.subject().annotations().value("client-id");
-                if (clientInfo.keySet().contains(ClientIdentifier.fromString(clientId))) {
-                    DefaultRegisterClientInput clientInput = clientInfo.get(ClientIdentifier.fromString(clientId));
-
-                    Optional<DefaultTenant> defaultTenant = getTenant(clientInput.tenantId());
-                    if (defaultTenant.isPresent()) {
-                        DefaultTenant tenant = defaultTenant.get();
-                        if (tenant.fpcTopology().dpns() != null) {
-                            tenant.fpcTopology().dpns().forEach(
-                                    dpn -> {
-                                        DefaultYangAutoPrefixNotify notify = new DefaultYangAutoPrefixNotify();
-                                        notify.notificationId(NotificationId.of(notificationIds.getNewId()));
-
-                                        notify.timestamp(BigInteger.valueOf(System.currentTimeMillis()));
-                                        DefaultDpnAvailability availability = new DefaultDpnAvailability();
-                                        availability.availabilityMessageType("Dpn-Availability");
-                                        availability.dpnId(dpn.dpnId());
-                                        availability.dpnGroups(dpn.dpnGroups());
-                                        availability.controlProtocol(dpn.controlProtocol());
-                                        availability.networkId(dpn.networkId());
-                                        availability.nodeId(dpn.nodeId());
-                                        availability.dpnName(dpn.dpnName());
-
-                                        switch (event.type()) {
-                                            case DEVICE_UPDATED:
-                                            case DEVICE_ADDED: {
-                                                availability.dpnStatus(DpnStatusEnum.AVAILABLE);
-                                                break;
-                                            }
-                                            case DEVICE_AVAILABILITY_CHANGED:
-                                            case DEVICE_REMOVED: {
-                                                availability.dpnStatus(DpnStatusEnum.UNAVAILABLE);
-                                                break;
-                                            }
-                                            default:
-                                                throw new RuntimeException("Unknown Device case.");
-                                        }
-
-                                        notify.value(availability);
-                                        // sendNotification(notify, clientInput.clientId());
-                                    }
-                            );
-
-                        }
-                    }
-                }
             }
+//            else if (event.subject().manufacturer().equals("cp")) {
+//                String clientId = event.subject().annotations().value("client-id");
+//                if (clientInfo.keySet().contains(ClientIdentifier.fromString(clientId))) {
+//                    DefaultRegisterClientInput clientInput = clientInfo.get(ClientIdentifier.fromString(clientId));
+//
+//                    Optional<DefaultTenant> defaultTenant = getTenant(clientInput.tenantId());
+//                    if (defaultTenant.isPresent()) {
+//                        DefaultTenant tenant = defaultTenant.get();
+//                        if (tenant.fpcTopology().dpns() != null) {
+//                            tenant.fpcTopology().dpns().forEach(
+//                                    dpn -> {
+//                                        DefaultYangAutoPrefixNotify notify = new DefaultYangAutoPrefixNotify();
+//                                        notify.notificationId(NotificationId.of(notificationIds.getNewId()));
+//
+//                                        notify.timestamp(BigInteger.valueOf(System.currentTimeMillis()));
+//                                        DefaultDpnAvailability availability = new DefaultDpnAvailability();
+//                                        availability.availabilityMessageType("Dpn-Availability");
+//                                        availability.dpnId(dpn.dpnId());
+//                                        availability.dpnGroups(dpn.dpnGroups());
+//                                        availability.controlProtocol(dpn.controlProtocol());
+//                                        availability.networkId(dpn.networkId());
+//                                        availability.nodeId(dpn.nodeId());
+//                                        availability.dpnName(dpn.dpnName());
+//
+//                                        switch (event.type()) {
+//                                            case DEVICE_UPDATED:
+//                                            case DEVICE_ADDED: {
+//                                                availability.dpnStatus(DpnStatusEnum.AVAILABLE);
+//                                                break;
+//                                            }
+//                                            case DEVICE_AVAILABILITY_CHANGED:
+//                                            case DEVICE_REMOVED: {
+//                                                   availability.dpnStatus(DpnStatusEnum.UNAVAILABLE);
+//                                                break;
+//                                            }
+//                                            default:
+//                                                throw new RuntimeException("Unknown Device case.");
+//                                        }
+//
+//                                        notify.value(availability);
+//                                        sendNotification(clientInput.clientId().toString(), notify);
+//                                    }
+//                            );
+//
+//                        }
+//                    }
+//                }
+//            }
         }
     }
 }
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/FpcUtil.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/FpcUtil.java
index e711024..18a38c9 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/FpcUtil.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/FpcUtil.java
@@ -16,17 +16,20 @@
 
 package org.onosproject.fpcagent.util;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.Maps;
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.onosproject.config.DynamicConfigService;
 import org.onosproject.config.Filter;
 import org.onosproject.core.IdGenerator;
-import org.onosproject.fpcagent.workers.HTTPNotifier;
+import org.onosproject.fpcagent.util.eventStream.ConfigureService;
 import org.onosproject.net.Device;
 import org.onosproject.net.device.DeviceStore;
+import org.onosproject.restconf.utils.RestconfUtils;
 import org.onosproject.yang.gen.v1.fpc.rev20150105.fpc.registerclient.DefaultRegisterClientInput;
 import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.ClientIdentifier;
 import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.DefaultTenants;
-import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.DefaultYangAutoPrefixNotify;
 import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.tenants.DefaultTenant;
 import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.tenants.TenantKeys;
 import org.onosproject.yang.gen.v1.ietfdmmfpcbase.rev20160803.ietfdmmfpcbase.FpcIdentity;
@@ -294,21 +297,26 @@
         );
     }
 
-    /**
-     * Sends a HTTP Post request to specified client.
-     *
-     * @param notify message
-     * @param client HTTP client to receive message
-     */
-    public static void sendNotification(DefaultYangAutoPrefixNotify notify, ClientIdentifier client) {
-        if (clientInfo.keySet().contains(client)) {
-            log.info("Sending HTTP notification {} to {}", notify, client);
-            HTTPNotifier.getInstance().send(
-                    new AbstractMap.SimpleEntry<>(
-                            clientInfo.get(client).endpointUri().toString(),
-                            notify
+    public static void sendNotification(String clientId, ModelObject notify) {
+        try {
+            ResourceData dataNode = modelConverter.createDataNode(
+                    DefaultModelObjectData.builder()
+                            .addModelObject(notify)
+                            .build()
+            );
+            ObjectNode jsonNodes = RestconfUtils.convertDataNodeToJson(module, dataNode.dataNodes().get(0));
+            ObjectMapper mapper = new ObjectMapper();
+
+            String value = mapper.writeValueAsString(jsonNodes);
+
+            ConfigureService.notificationQueue.add(
+                    new AbstractMap.SimpleEntry(
+                            clientId,
+                            "event:application/json;/notification\ndata:" + value + "\r\n"
                     )
             );
+        } catch (Exception e) {
+            log.error(ExceptionUtils.getFullStackTrace(e));
         }
     }
 }
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ConfigureService.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ConfigureService.java
index 2332bb3..b9aa3a5 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ConfigureService.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ConfigureService.java
@@ -29,23 +29,28 @@
 
     private static final Logger log = LoggerFactory.getLogger(ConfigureService.class);
     //key = client-id, value = configure-output
-    public static BlockingQueue<Map.Entry<String, String>> configureQueue = new LinkedBlockingQueue<>();
-    public static BlockingQueue<Map.Entry<String, String>> registerClientQueue = new LinkedBlockingQueue<>();
+    public static BlockingQueue<Map.Entry<String, String>> responseQueue = new LinkedBlockingQueue<>();
+    //key = client id, value = config-result-notification
+    public static BlockingQueue<Map.Entry<String, String>> notificationQueue = new LinkedBlockingQueue<>();
 
     public void contextInitialized(ServletContextEvent sce) {
         Map<String, AsyncContext> responseStreams = new ConcurrentHashMap<>();
+        Map<String, AsyncContext> notificationStreams = new ConcurrentHashMap<>();
+
+        sce.getServletContext().setAttribute("notificationStreams", notificationStreams);
         sce.getServletContext().setAttribute("responseStreams", responseStreams);
+
         Executors.newSingleThreadExecutor().submit(() -> {
             String clientUri = null;
             AsyncContext asyncContext = null;
             while (true) {
                 try {
-                    Map.Entry<String, String> entry = configureQueue.take();
+                    Map.Entry<String, String> entry = responseQueue.take();
                     clientUri = entry.getKey();
                     String notify = entry.getValue();
-
                     asyncContext = responseStreams.get(clientUri);
                     if (responseStreams.get(clientUri) != null) {
+                        log.info("Sending on /response {}", notify);
                         ServletOutputStream out = asyncContext.getResponse().getOutputStream();
                         out.write(notify.getBytes());
                         out.flush();
@@ -61,23 +66,26 @@
         });
 
         Executors.newSingleThreadExecutor().submit(() -> {
-            String clientUri = null;
+            String clientId = null;
             AsyncContext asyncContext = null;
             while (true) {
                 try {
-                    Map.Entry<String, String> entry = registerClientQueue.take();
-                    clientUri = entry.getKey();
-                    asyncContext = responseStreams.get(clientUri);
-                    if (responseStreams.get(entry.getKey()) != null) {
+                    Map.Entry<String, String> entry = notificationQueue.take();
+                    clientId = entry.getKey();
+                    String notify = entry.getValue();
+                    asyncContext = notificationStreams.get(clientId);
+                    log.info("clientId {}, streams {}", clientId, notificationStreams.keySet());
+                    if (notificationStreams.get(clientId) != null) {
+                        log.info("Sending on /notification {}", notify);
                         ServletOutputStream out = asyncContext.getResponse().getOutputStream();
-                        out.write(entry.getValue().getBytes());
+                        out.write(notify.getBytes());
                         out.flush();
                         asyncContext.getResponse().flushBuffer();
                     }
                 } catch (Exception e) {
                     log.error(ExceptionUtils.getFullStackTrace(e));
                     asyncContext.complete();
-                    responseStreams.remove(clientUri);
+                    notificationStreams.remove(clientId);
                     break;
                 }
             }
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/JettyServer.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/JettyServer.java
index e6d11d1..5b243f6 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/JettyServer.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/JettyServer.java
@@ -53,7 +53,6 @@
                 server.setHandler(context);
 
                 context.addEventListener(new ConfigureService());
-                context.addEventListener(new NotificationService());
 
                 try {
                     server.start();
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NBEventWorkerManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NBEventWorkerManager.java
index 72f283c..879f989 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NBEventWorkerManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NBEventWorkerManager.java
@@ -73,7 +73,7 @@
 
                     // call RESTCONF service
                     ObjectMapper mapper = new ObjectMapper();
-                    log.info("NB Event Contents: {}", contents.getValue().getValue());
+                    log.debug("NB Event Contents: {}", contents.getValue().getValue());
                     ObjectNode node = (ObjectNode) mapper.readTree(contents.getValue().getValue());
                     if (contents.getValue().getKey().contains("ietf-dmm-fpcagent:configure")) {
                         CompletableFuture<RestconfRpcOutput> output = restconfService.runRpc(
@@ -82,7 +82,7 @@
                                 contents.getKey()
                         );
                         String replace = output.get().output().toString().replace("ietf-dmm-fpcagent:output", "fpc:output");
-                        ConfigureService.configureQueue.add(
+                        ConfigureService.responseQueue.add(
                                 new AbstractMap.SimpleEntry(
                                         contents.getKey(),
                                         "event:application/json;/onos/restconf/operations/ietf-dmm-fpcagent:configure\ndata:" + replace + "\r\n"
@@ -95,7 +95,7 @@
                                 contents.getKey()
                         );
                         addClientIdToUriMapping(output.get().output().toString(), contents.getKey());
-                        ConfigureService.registerClientQueue.add(
+                        ConfigureService.responseQueue.add(
                                 new AbstractMap.SimpleEntry(
                                         contents.getKey(),
                                         "event:application/json;/onos/restconf/operations/fpc:register-client\ndata:" + output.get().output().toString() + "\r\n"
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NotificationServer.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NotificationServer.java
index e448324..28ba759 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NotificationServer.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NotificationServer.java
@@ -31,18 +31,6 @@
     private static final Logger log = LoggerFactory.getLogger(ResponseServer.class);
 
     @Override
-    protected void doGet(HttpServletRequest request, HttpServletResponse response) {
-        try {
-            response.setContentType("text/html");
-            response.setStatus(HttpServletResponse.SC_OK);
-            response.getWriter().println("<h1>Hello World</h1>");
-            response.getWriter().println("session=" + request.getSession(true).getId());
-        } catch (IOException e) {
-            log.error(ExceptionUtils.getFullStackTrace(e));
-        }
-    }
-
-    @Override
     protected void doPost(HttpServletRequest request, HttpServletResponse response) {
         log.info("Notification stream Inititated");
         String clientId = null;
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NotificationService.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NotificationService.java
deleted file mode 100644
index 13edc1e..0000000
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NotificationService.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright © 2016 - 2017 Copyright (c) Sprint, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.onosproject.fpcagent.util.eventStream;
-
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.servlet.AsyncContext;
-import javax.servlet.ServletContextEvent;
-import javax.servlet.ServletContextListener;
-import javax.servlet.ServletOutputStream;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-
-
-/**
- * A servlet context listener that sends event-data pairs for config result notifications
- */
-public class NotificationService implements ServletContextListener {
-    private static final Logger log = LoggerFactory.getLogger(NotificationService.class);
-
-    //key = client id, value = config-result-notification
-    public static BlockingQueue<Map.Entry<String, String>> blockingQueue = new LinkedBlockingQueue<Map.Entry<String, String>>();
-
-    public void contextInitialized(ServletContextEvent sce) {
-        Map<String, AsyncContext> notificationStreams = new ConcurrentHashMap<String, AsyncContext>();
-        sce.getServletContext().setAttribute("notificationStreams", notificationStreams);
-        Thread thread = new Thread(new Runnable() {
-
-            @Override
-            public void run() {
-                String clientId = null;
-                AsyncContext asyncContext = null;
-                while (true) {
-                    try {
-                        Map.Entry<String, String> entry = blockingQueue.take();
-                        clientId = entry.getKey();
-                        asyncContext = notificationStreams.get(clientId);
-                        if (notificationStreams.get(entry.getKey()) != null) {
-                            ServletOutputStream out = asyncContext.getResponse().getOutputStream();
-                            out.write(entry.getValue().getBytes());
-                            out.flush();
-                            asyncContext.getResponse().flushBuffer();
-                        }
-                    } catch (Exception e) {
-                        log.error(ExceptionUtils.getFullStackTrace(e));
-                        asyncContext.complete();
-                        notificationStreams.remove(clientId);
-                        break;
-                    }
-                }
-            }
-
-        });
-        thread.start();
-
-    }
-
-    public void contextDestroyed(ServletContextEvent sce) {
-    }
-}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ResponseService.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/RequestServer.java
similarity index 97%
rename from apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ResponseService.java
rename to apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/RequestServer.java
index 7736d51..9715053 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ResponseService.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/RequestServer.java
@@ -25,8 +25,8 @@
 /**
  * A HTTP client that sends a request to a FPC Client to initiate the request stream.
  */
-public class ResponseService {
-    private static final Logger log = LoggerFactory.getLogger(ResponseService.class);
+public class RequestServer {
+    private static final Logger log = LoggerFactory.getLogger(RequestServer.class);
 
     private static final CloseableHttpAsyncClient client = HttpAsyncClients.createDefault();
     protected String clientUri;
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ResponseServer.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ResponseServer.java
index 6189501..2218f1b 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ResponseServer.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ResponseServer.java
@@ -44,7 +44,7 @@
         try {
             HttpSession session = request.getSession();
             session.setMaxInactiveInterval(72 * 60 * 60);
-            ResponseService client = new ResponseService();
+            RequestServer client = new RequestServer();
             client.connectToClient(clientUri);
             response.setHeader("Content-Type", "text/event-stream");
             response.setHeader("Cache-Control", "no-cache, no-store");
@@ -66,18 +66,6 @@
     }
 
     @Override
-    protected void doGet(HttpServletRequest request, HttpServletResponse response) {
-        try {
-            response.setContentType("text/html");
-            response.setStatus(HttpServletResponse.SC_OK);
-            response.getWriter().println("<h1>Hello World</h1>");
-            response.getWriter().println("session=" + request.getSession(true).getId());
-        } catch (IOException e) {
-            log.error(ExceptionUtils.getFullStackTrace(e));
-        }
-    }
-
-    @Override
     protected void doPost(HttpServletRequest request, HttpServletResponse response) {
         try {
             String clientUri = null;
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/HTTPNotifier.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/HTTPNotifier.java
deleted file mode 100644
index d3755ff..0000000
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/HTTPNotifier.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.fpcagent.workers;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.http.HttpResponse;
-import org.apache.http.HttpVersion;
-import org.apache.http.client.ResponseHandler;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.BasicResponseHandler;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.json.JSONObject;
-import org.onosproject.fpcagent.protocols.DpnNgicCommunicator;
-import org.onosproject.restconf.utils.RestconfUtils;
-import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.DefaultYangAutoPrefixNotify;
-import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.yangautoprefixnotify.value.DownlinkDataNotification;
-import org.onosproject.yang.model.DefaultModelObjectData;
-import org.onosproject.yang.model.ResourceData;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import static org.onosproject.fpcagent.util.FpcUtil.modelConverter;
-import static org.onosproject.fpcagent.util.FpcUtil.module;
-
-public class HTTPNotifier implements AutoCloseable {
-    private static final Logger log = LoggerFactory.getLogger(HTTPNotifier.class);
-    private static HTTPNotifier _instance = null;
-    private final BlockingQueue<Map.Entry<String, DefaultYangAutoPrefixNotify>> blockingQueue;
-    private boolean run;
-    private ResponseHandler<String> handler = new BasicResponseHandler();
-
-    protected HTTPNotifier() {
-        this.run = true;
-        this.blockingQueue = new LinkedBlockingQueue<>();
-    }
-
-    public static HTTPNotifier createInstance() {
-        if (_instance == null) {
-            _instance = new HTTPNotifier();
-        }
-        return _instance;
-    }
-
-    public static HTTPNotifier getInstance() {
-        return _instance;
-    }
-
-    public void send(Map.Entry<String, DefaultYangAutoPrefixNotify> buf) {
-        try {
-            blockingQueue.put(buf);
-        } catch (InterruptedException e) {
-            log.error(ExceptionUtils.getFullStackTrace(e));
-        }
-    }
-
-
-    public void open() {
-        ExecutorService executorService = Executors.newSingleThreadExecutor();
-        executorService.submit(() -> {
-            while ((!Thread.currentThread().isInterrupted()) && run) {
-                try {
-                    Map.Entry<String, DefaultYangAutoPrefixNotify> entry = blockingQueue.take();
-                    DefaultYangAutoPrefixNotify notify = entry.getValue();
-
-                    CloseableHttpClient client = HttpClients.createDefault();
-                    HttpPost httpPost = new HttpPost(entry.getKey());
-                    httpPost.addHeader("User-Agent", "ONOS Notification Agent");
-                    httpPost.addHeader("Charset", "utf-8");
-                    httpPost.addHeader("Content-type", "application/json");
-                    httpPost.setProtocolVersion(HttpVersion.HTTP_1_1);
-
-                    ResourceData dataNode = modelConverter.createDataNode(
-                            DefaultModelObjectData.builder()
-                                    .addModelObject(notify)
-                                    .build()
-                    );
-                    ObjectNode jsonNodes = RestconfUtils.convertDataNodeToJson(module, dataNode.dataNodes().get(0));
-                    ObjectMapper mapper = new ObjectMapper();
-
-                    StringEntity params = new StringEntity(mapper.writeValueAsString(jsonNodes));
-                    httpPost.setEntity(params);
-
-                    log.info("Sending HTTP POST {}", httpPost);
-                    HttpResponse response = client.execute(httpPost);
-
-                    if (notify.value() instanceof DownlinkDataNotification) {
-                        String msg = handler.handleResponse(response);
-                        JSONObject json_body = new JSONObject(msg);
-                        DpnNgicCommunicator.send_ddn_ack(json_body);
-                        log.info("Response {}", response);
-                    }
-                } catch (Exception e) {
-                    log.error(ExceptionUtils.getFullStackTrace(e));
-                }
-            }
-        });
-    }
-
-    @Override
-    public void close() {
-        run = false;
-    }
-}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBSubscriberManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBSubscriberManager.java
index 9ab50aa..8207205 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBSubscriberManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBSubscriberManager.java
@@ -1,13 +1,15 @@
 package org.onosproject.fpcagent.workers;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
+import org.onosproject.core.IdGenerator;
 import org.onosproject.fpcagent.providers.DpnDeviceListener;
 import org.onosproject.fpcagent.util.CacheManager;
 import org.onosproject.fpcagent.util.FpcUtil;
-import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.ClientIdentifier;
-import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.DefaultYangAutoPrefixNotify;
-import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.NotificationId;
-import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.OpIdentifier;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.*;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.configresultnotification.value.DefaultConfigResult;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.opstatusvalue.OpStatusEnum;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.result.ResultEnum;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.resultbody.resulttype.DefaultEmptyCase;
 import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.yangautoprefixnotify.value.DefaultDownlinkDataNotification;
 import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.yangautoprefixnotify.value.DownlinkDataNotification;
 import org.onosproject.yang.gen.v1.ietfdmmfpcbase.rev20160803.ietfdmmfpcbase.FpcIdentity;
@@ -28,7 +30,6 @@
 
 import static org.onosproject.fpcagent.protocols.DpnNgicCommunicator.*;
 import static org.onosproject.fpcagent.util.Converter.*;
-import static org.onosproject.fpcagent.util.FpcUtil.notificationIds;
 import static org.onosproject.fpcagent.util.FpcUtil.sendNotification;
 
 public class ZMQSBSubscriberManager implements AutoCloseable {
@@ -43,6 +44,7 @@
     private byte controllerTopic;
     private boolean run;
     private boolean conflictingTopic;
+    private IdGenerator notificationIds;
 
     private Future<?> broadcastAllWorker;
     private Future<?> broadcastControllersWorker;
@@ -51,7 +53,7 @@
 
     private DpnDeviceListener dpnDeviceListener;
 
-    protected ZMQSBSubscriberManager(String address, String nodeId, String networkId, DpnDeviceListener dpnDeviceListener) {
+    protected ZMQSBSubscriberManager(String address, String nodeId, String networkId, DpnDeviceListener dpnDeviceListener, IdGenerator notificationIds) {
         this.address = address;
         this.run = true;
         this.nodeId = nodeId;
@@ -59,11 +61,12 @@
         this.conflictingTopic = false;
         this.controllerSourceId = (long) ThreadLocalRandom.current().nextInt(0, 65535);
         this.dpnDeviceListener = dpnDeviceListener;
+        this.notificationIds = notificationIds;
     }
 
-    public static ZMQSBSubscriberManager createInstance(String address, String nodeId, String networkId, DpnDeviceListener providerService) {
+    public static ZMQSBSubscriberManager createInstance(String address, String nodeId, String networkId, DpnDeviceListener providerService, IdGenerator notificationIds) {
         if (_instance == null) {
-            _instance = new ZMQSBSubscriberManager(address, nodeId, networkId, providerService);
+            _instance = new ZMQSBSubscriberManager(address, nodeId, networkId, providerService, notificationIds);
         }
         return _instance;
     }
@@ -161,7 +164,7 @@
             ddn.sessionId(checkSessionId(toBigInt(buf, 2)));
             ddn.notificationMessageType("Downlink-Data-Notification");
             ddn.clientId(ClientIdentifier.of(FpcIdentity.of(FpcIdentityUnion.of(fromIntToLong(buf, 10)))));
-            ddn.opId(OpIdentifier.of(BigInteger.valueOf(fromIntToLong(buf, 14))));
+            ddn.opId(OpIdentifier.of(BigInteger.valueOf(fromIntToLong(buf, 13))));
             CacheManager.getCaches().forEach(
                     cacheManager -> {
                         try {
@@ -176,22 +179,30 @@
         }
 
         public Map.Entry<Object, Object> decode(byte[] buf) {
+//            uint8_t topic_id;
+//            uint8_t type;
             s11MsgType type;
             type = s11MsgType.getEnum(buf[1]);
             if (type.equals(s11MsgType.DDN)) {
+//                uint64_t session_id;
+//                uint32_t client_id;
+//                uint32_t op_id;
+//                uint8_t  node_network_id_buffer[NN_ID_BUF_LEN];
                 short nodeIdLen = buf[18];
                 short networkIdLen = buf[19 + nodeIdLen];
                 String key = new String(Arrays.copyOfRange(buf, 19, 19 + nodeIdLen)) + "/" + new String(Arrays.copyOfRange(buf, 20 + nodeIdLen, 20 + nodeIdLen + networkIdLen));
-
                 return new AbstractMap.SimpleEntry<>(processDDN(buf, key), null);
             } else if (type.equals(s11MsgType.DPN_STATUS_INDICATION)) {
+//                uint8_t  source_topic_id;
+//                uint8_t  status;
+//                uint32_t source;
+//                uint8_t  node_network_id_buffer[NN_ID_BUF_LEN];
                 DpnStatusIndication status;
-
                 short nodeIdLen = buf[8];
                 short networkIdLen = buf[9 + nodeIdLen];
                 String deviceId = new String(Arrays.copyOfRange(buf, 9, 9 + nodeIdLen)) + "/" + new String(Arrays.copyOfRange(buf, 10 + nodeIdLen, 10 + nodeIdLen + networkIdLen));
-
                 status = DpnStatusIndication.getEnum(buf[3]);
+                log.info("topic {}, status {}, source {}, node/net {}", (short) buf[2], status, toInt(buf, 4), deviceId);
                 if (status.equals(DpnStatusIndication.HELLO)) {
                     log.info("Hello {} on topic {}", deviceId, buf[2]);
                     dpnDeviceListener.deviceAdded(deviceId, buf[2]);
@@ -200,6 +211,25 @@
                     dpnDeviceListener.deviceRemoved(deviceId);
                 }
                 return new AbstractMap.SimpleEntry<>(status, deviceId);
+            } else if (type.equals(s11MsgType.DPN_RESPONSE)) {
+//                uint8_t  cause;
+//                uint32_t client_id;
+//                uint32_t op_id;
+                ClientIdentifier clientId = ClientIdentifier.of(FpcIdentity.of(FpcIdentityUnion.of(fromIntToLong(buf, 3))));
+                OpIdentifier opId = OpIdentifier.of(BigInteger.valueOf(fromIntToLong(buf, 7)));
+                log.info("cause {}, clientId {}, opId {}", (short) buf[2], clientId.toString(), opId.toString());
+                DefaultConfigResultNotification notify = new DefaultConfigResultNotification();
+                notify.notificationId(NotificationId.of(notificationIds.getNewId()));
+                notify.timestamp(BigInteger.valueOf(System.currentTimeMillis()));
+                DefaultConfigResult result = new DefaultConfigResult();
+                result.causeValue(buf[2]);
+                result.opId(opId);
+                result.opStatus(OpStatusEnum.OK);
+                result.result(Result.of(ResultEnum.OK));
+                result.resultType(new DefaultEmptyCase());
+                notify.value(result);
+
+                sendNotification(clientId.toString(), notify);
             }
             return null;
         }
@@ -234,7 +264,8 @@
                                 notify.notificationId(NotificationId.of(notificationIds.getNewId()));
                                 notify.timestamp(BigInteger.valueOf(System.currentTimeMillis()));
                                 notify.value((DownlinkDataNotification) key);
-                                sendNotification(notify, ((DownlinkDataNotification) key).clientId());
+
+                                sendNotification(((DownlinkDataNotification) key).clientId().toString(), notify);
                             } else if (key instanceof DpnStatusIndication) {
                                 if (key.equals(DpnStatusIndication.HELLO)) {
                                     byte dpnTopic = FpcUtil.getTopicFromNode(msg.getValue().toString());