ddn
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java
index 85d3098..63bcd25 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java
@@ -16,10 +16,6 @@
 
 package org.onosproject.fpcagent;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
@@ -29,19 +25,16 @@
 import org.onosproject.config.DynamicConfigService;
 import org.onosproject.config.DynamicConfigStore;
 import org.onosproject.core.CoreService;
-import org.onosproject.core.IdGenerator;
 import org.onosproject.fpcagent.protocols.DpnCommunicationService;
 import org.onosproject.fpcagent.protocols.DpnNgicCommunicator;
 import org.onosproject.fpcagent.protocols.DpnP4Communicator;
 import org.onosproject.fpcagent.providers.CpProviderService;
 import org.onosproject.fpcagent.util.CacheManager;
 import org.onosproject.fpcagent.util.FpcUtil;
-import org.onosproject.fpcagent.workers.HTTPNotifier;
 import org.onosproject.net.device.DeviceEvent;
 import org.onosproject.net.device.DeviceListener;
 import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.device.DeviceStore;
-import org.onosproject.restconf.utils.RestconfUtils;
 import org.onosproject.yang.gen.v1.fpc.rev20150105.fpc.DefaultConnectionInfo;
 import org.onosproject.yang.gen.v1.fpc.rev20150105.fpc.P4DpnControlProtocol;
 import org.onosproject.yang.gen.v1.fpc.rev20150105.fpc.ZmqDpnControlProtocol;
@@ -95,8 +88,14 @@
 import java.math.BigInteger;
 import java.time.Duration;
 import java.time.Instant;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import static org.onosproject.fpcagent.util.Converter.convertContext;
 import static org.onosproject.fpcagent.util.FpcUtil.*;
@@ -134,13 +133,9 @@
     private DeviceService deviceService;
 
     private InternalDeviceListener listener = new InternalDeviceListener();
-    private ConcurrentMap<ClientIdentifier, DefaultRegisterClientInput> clientInfo = Maps.newConcurrentMap();
-    private ConcurrentMap<FpcIdentity, HashSet<ClientIdentifier>> tenantInfo = Maps.newConcurrentMap();
-    private HashSet<FpcDpnId> dpnInfo = Sets.newHashSet();
 
     // FIXME configurable
     private ExecutorService executorService = Executors.newFixedThreadPool(25);
-    private IdGenerator notificationIds;
 
     @Activate
     protected void activate() {
@@ -991,28 +986,6 @@
         }, executorService).join();
     }
 
-    private void sendNotification(DefaultYangAutoPrefixNotify notify, ClientIdentifier client) {
-        ResourceData dataNode = modelConverter.createDataNode(
-                DefaultModelObjectData.builder()
-                        .addModelObject(notify)
-                        .build()
-        );
-        ObjectNode jsonNodes = RestconfUtils.convertDataNodeToJson(notification, dataNode.dataNodes().get(0));
-        ObjectMapper mapper = new ObjectMapper();
-
-        try {
-            log.info("Sending HTTP notification {} to {}", notify, client);
-            HTTPNotifier.getInstance().send(
-                    new AbstractMap.SimpleEntry<>(
-                            clientInfo.get(client).endpointUri().toString(),
-                            mapper.writeValueAsString(jsonNodes)
-                    )
-            );
-        } catch (JsonProcessingException e) {
-            log.error(ExceptionUtils.getFullStackTrace(e));
-        }
-    }
-
     public class InternalDeviceListener implements DeviceListener {
 
         @Override
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/CacheManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/CacheManager.java
index efac596..2ee0e11 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/CacheManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/CacheManager.java
@@ -33,6 +33,7 @@
 
 import java.util.Optional;
 import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Stream;
 
 import static org.onosproject.fpcagent.util.FpcUtil.getTenant;
 
@@ -118,7 +119,7 @@
                                         log.debug("tenant {}", tenant);
                                         if (tenant.fpcTopology().dpns() != null) {
                                             return tenant.fpcTopology().dpns().stream()
-                                                    .filter(dpns -> s.equals(dpns.nodeId()+"/"+dpns.networkId()))
+                                                    .filter(dpns -> s.equals(dpns.nodeId() + "/" + dpns.networkId()))
                                                     .findFirst()
                                                     .map(Dpns::dpnId);
                                         }
@@ -137,4 +138,8 @@
         return cacheInfo.get(identity);
     }
 
+    public static Stream<CacheManager> getCaches() {
+        return cacheInfo.values().stream();
+    }
+
 }
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/FpcUtil.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/FpcUtil.java
index 9de92c8..98585bf 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/FpcUtil.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/FpcUtil.java
@@ -16,21 +16,36 @@
 
 package org.onosproject.fpcagent.util;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.onosproject.config.DynamicConfigService;
 import org.onosproject.config.Filter;
+import org.onosproject.core.IdGenerator;
+import org.onosproject.fpcagent.workers.HTTPNotifier;
 import org.onosproject.net.Device;
 import org.onosproject.net.device.DeviceStore;
+import org.onosproject.restconf.utils.RestconfUtils;
+import org.onosproject.yang.gen.v1.fpc.rev20150105.fpc.registerclient.DefaultRegisterClientInput;
 import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.ClientIdentifier;
 import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.DefaultTenants;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.DefaultYangAutoPrefixNotify;
 import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.tenants.DefaultTenant;
 import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.tenants.TenantKeys;
+import org.onosproject.yang.gen.v1.ietfdmmfpcbase.rev20160803.ietfdmmfpcbase.FpcDpnId;
 import org.onosproject.yang.gen.v1.ietfdmmfpcbase.rev20160803.ietfdmmfpcbase.FpcIdentity;
 import org.onosproject.yang.model.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.AbstractMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.ConcurrentMap;
 
 import static org.onosproject.net.DeviceId.deviceId;
 
@@ -55,6 +70,12 @@
     public static ResourceId module;
     public static ResourceId notification;
 
+    public static IdGenerator notificationIds;
+
+    public static final ConcurrentMap<ClientIdentifier, DefaultRegisterClientInput> clientInfo = Maps.newConcurrentMap();
+    public static final ConcurrentMap<FpcIdentity, HashSet<ClientIdentifier>> tenantInfo = Maps.newConcurrentMap();
+    public static final HashSet<FpcDpnId> dpnInfo = Sets.newHashSet();
+
     /**
      * Returns resource id from model converter.
      *
@@ -275,4 +296,26 @@
                 node -> dynamicConfigService.updateNode(dataNode.resourceId(), node)
         );
     }
+
+    public static void sendNotification(DefaultYangAutoPrefixNotify notify, ClientIdentifier client) {
+        ResourceData dataNode = modelConverter.createDataNode(
+                DefaultModelObjectData.builder()
+                        .addModelObject(notify)
+                        .build()
+        );
+        ObjectNode jsonNodes = RestconfUtils.convertDataNodeToJson(module, dataNode.dataNodes().get(0));
+        ObjectMapper mapper = new ObjectMapper();
+
+        try {
+            log.info("Sending HTTP notification {} to {}", notify, client);
+            HTTPNotifier.getInstance().send(
+                    new AbstractMap.SimpleEntry<>(
+                            clientInfo.get(client).endpointUri().toString(),
+                            mapper.writeValueAsString(jsonNodes)
+                    )
+            );
+        } catch (JsonProcessingException e) {
+            log.error(ExceptionUtils.getFullStackTrace(e));
+        }
+    }
 }
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBSubscriberManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBSubscriberManager.java
index a8e93e4..14ebe91 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBSubscriberManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBSubscriberManager.java
@@ -1,22 +1,37 @@
 package org.onosproject.fpcagent.workers;
 
-import javafx.util.Pair;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.onosproject.fpcagent.providers.DpnDeviceListener;
+import org.onosproject.fpcagent.util.CacheManager;
 import org.onosproject.fpcagent.util.FpcUtil;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.ClientIdentifier;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.DefaultYangAutoPrefixNotify;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.NotificationId;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.OpIdentifier;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.yangautoprefixnotify.value.DefaultDownlinkDataNotification;
 import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.yangautoprefixnotify.value.DownlinkDataNotification;
+import org.onosproject.yang.gen.v1.ietfdmmfpcbase.rev20160803.ietfdmmfpcbase.FpcDpnId;
+import org.onosproject.yang.gen.v1.ietfdmmfpcbase.rev20160803.ietfdmmfpcbase.FpcIdentity;
+import org.onosproject.yang.gen.v1.ietfdmmfpcbase.rev20160803.ietfdmmfpcbase.fpcidentity.FpcIdentityUnion;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.zeromq.ZContext;
 import org.zeromq.ZMQ;
 
+import java.math.BigInteger;
+import java.util.AbstractMap;
 import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadLocalRandom;
 
 import static org.onosproject.fpcagent.protocols.DpnNgicCommunicator.*;
-import static org.onosproject.fpcagent.util.Converter.toInt;
+import static org.onosproject.fpcagent.util.Converter.*;
+import static org.onosproject.fpcagent.util.FpcUtil.notificationIds;
+import static org.onosproject.fpcagent.util.FpcUtil.sendNotification;
 
 public class ZMQSBSubscriberManager implements AutoCloseable {
     private static final Logger log = LoggerFactory.getLogger(ZMQSBSubscriberManager.class);
@@ -123,37 +138,46 @@
             this.ctx = new ZContext();
         }
 
-//        /**
-//         * Ensures the session id is an unsigned 64 bit integer
-//         *
-//         * @param sessionId - session id received from the DPN
-//         * @return unsigned session id
-//         */
-//        private static BigInteger checkSessionId(BigInteger sessionId) {
-//            if (sessionId.compareTo(BigInteger.ZERO) < 0) {
-//                sessionId = sessionId.add(BigInteger.ONE.shiftLeft(64));
-//            }
-//            return sessionId;
-//        }
-//
-//        /**
-//         * Decodes a DownlinkDataNotification
-//         *
-//         * @param buf - message buffer
-//         * @param key - Concatenation of node id + / + network id
-//         * @return DownlinkDataNotification or null if it could not be successfully decoded
-//         */
-//        private static DownlinkDataNotification processDDN(byte[] buf, String key) {
-//            DownlinkDataNotification ddnB = new DefaultDownlinkDataNotification();
-//            ddnB.sessionId(checkSessionId(toBigInt(buf, 2)));
-//            ddnB.notificationMessageType(DOWNLINK_DATA_NOTIFICATION_STRING);
-//            ddnB.clientId(ClientIdentifier.of(FpcIdentity.of(FpcIdentityUnion.of(fromIntToLong(buf, 10)))));
-//            ddnB.opId(OpIdentifier.of(BigInteger.valueOf(fromIntToLong(buf, 14))));
-//            ddnB.notificationDpnId(uplinkDpnMap.get(key));
-//            return ddnB;
-//        }
+        /**
+         * Ensures the session id is an unsigned 64 bit integer
+         *
+         * @param sessionId - session id received from the DPN
+         * @return unsigned session id
+         */
+        private BigInteger checkSessionId(BigInteger sessionId) {
+            if (sessionId.compareTo(BigInteger.ZERO) < 0) {
+                sessionId = sessionId.add(BigInteger.ONE.shiftLeft(64));
+            }
+            return sessionId;
+        }
 
-        public Pair<Object, Object> decode(byte[] buf) {
+        /**
+         * Decodes a DownlinkDataNotification
+         *
+         * @param buf - message buffer
+         * @param key - Concatenation of node id + / + network id
+         * @return DownlinkDataNotification or null if it could not be successfully decoded
+         */
+        private DownlinkDataNotification processDDN(byte[] buf, String key) {
+            DownlinkDataNotification ddnB = new DefaultDownlinkDataNotification();
+            ddnB.sessionId(checkSessionId(toBigInt(buf, 2)));
+            ddnB.notificationMessageType("Downlink-Data-Notification");
+            ddnB.clientId(ClientIdentifier.of(FpcIdentity.of(FpcIdentityUnion.of(fromIntToLong(buf, 10)))));
+            ddnB.opId(OpIdentifier.of(BigInteger.valueOf(fromIntToLong(buf, 14))));
+            CacheManager.getCaches().forEach(
+                    cacheManager -> {
+                        try {
+                            Optional<FpcDpnId> fpcDpnId = cacheManager.nodeNetworkCache.get(key);
+                            fpcDpnId.ifPresent(ddnB::notificationDpnId);
+                        } catch (ExecutionException e) {
+                            log.error(ExceptionUtils.getFullStackTrace(e));
+                        }
+                    }
+            );
+            return ddnB;
+        }
+
+        public Map.Entry<Object, Object> decode(byte[] buf) {
             s11MsgType type;
             type = s11MsgType.getEnum(buf[1]);
             if (type.equals(s11MsgType.DDN)) {
@@ -161,7 +185,7 @@
                 short networkIdLen = buf[19 + nodeIdLen];
                 String key = new String(Arrays.copyOfRange(buf, 19, 19 + nodeIdLen)) + "/" + new String(Arrays.copyOfRange(buf, 20 + nodeIdLen, 20 + nodeIdLen + networkIdLen));
 
-//                return uplinkDpnMap.get(key) == null ? null : new AbstractMap.SimpleEntry<>(uplinkDpnMap.get(key), processDDN(buf, key));
+                return new AbstractMap.SimpleEntry<>(processDDN(buf, key), null);
             } else if (type.equals(s11MsgType.DPN_STATUS_INDICATION)) {
                 DpnStatusIndication status;
 
@@ -177,7 +201,7 @@
                     log.info("Bye {}", deviceId);
                     dpnDeviceListener.deviceRemoved(deviceId);
                 }
-                return new Pair<>(status, deviceId);
+                return new AbstractMap.SimpleEntry<>(status, deviceId);
             }
             return null;
         }
@@ -204,11 +228,15 @@
                         }
                         break;
                     default:
-                        Pair msg = decode(contents);
+                        Map.Entry msg = decode(contents);
                         if (msg != null) {
                             Object key = msg.getKey();
                             if (key instanceof DownlinkDataNotification) {
-                                // TODO handle DL notification
+                                DefaultYangAutoPrefixNotify notify = new DefaultYangAutoPrefixNotify();
+                                notify.notificationId(NotificationId.of(notificationIds.getNewId()));
+                                notify.timestamp(BigInteger.valueOf(System.currentTimeMillis()));
+                                notify.value((DownlinkDataNotification) key);
+                                sendNotification(notify , ((DownlinkDataNotification) key).clientId());
                             } else if (key instanceof DpnStatusIndication) {
                                 if (key.equals(DpnStatusIndication.HELLO)) {
                                     byte dpnTopic = FpcUtil.getTopicFromNode(msg.getValue().toString());