extended dpn api
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcUtil.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcUtil.java
index c3a6a77..6f1ebf1 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcUtil.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcUtil.java
@@ -16,35 +16,65 @@
 
 package org.onosproject.fpcagent;
 
+import com.google.common.collect.Maps;
 import org.onosproject.restconf.utils.RestconfUtils;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.ClientIdentifier;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.OpIdentifier;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.yangautoprefixnotify.value.DefaultDownlinkDataNotification;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.yangautoprefixnotify.value.DownlinkDataNotification;
+import org.onosproject.yang.gen.v1.ietfdmmfpcbase.rev20160803.ietfdmmfpcbase.FpcDpnId;
+import org.onosproject.yang.gen.v1.ietfdmmfpcbase.rev20160803.ietfdmmfpcbase.FpcIdentity;
+import org.onosproject.yang.gen.v1.ietfdmmfpcbase.rev20160803.ietfdmmfpcbase.fpcidentity.FpcIdentityUnion;
 import org.onosproject.yang.model.DataNode;
 import org.onosproject.yang.model.DefaultResourceData;
 import org.onosproject.yang.model.ResourceData;
 import org.onosproject.yang.model.ResourceId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.math.BigInteger;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.AbstractMap;
+import java.util.Arrays;
+import java.util.Map;
+
+import static org.onosproject.fpcagent.helpers.Converter.fromIntToLong;
+import static org.onosproject.fpcagent.helpers.Converter.toBigInt;
 
 /**
  * Helper class which stores all the static variables.
  */
-class FpcUtil {
-    static final int MAX_EVENTS = 1000;
-    static final int MAX_BATCH_MS = 5000;
-    static final int MAX_IDLE_MS = 1000;
-    static final String TIMER = "dynamic-config-fpcagent-timer";
-    static final String UNKNOWN_EVENT = "FPC Agent listener: unknown event: {}";
-    static final String EVENT_NULL = "Event cannot be null";
-    static final String FPC_APP_ID = "org.onosproject.fpcagent";
+public class FpcUtil {
+    protected static final Logger log = LoggerFactory.getLogger(FpcUtil.class);
+
+    public static final int MAX_EVENTS = 1000;
+    public static final int MAX_BATCH_MS = 5000;
+    public static final int MAX_IDLE_MS = 1000;
+    public static final String TIMER = "dynamic-config-fpcagent-timer";
+    public static final String UNKNOWN_EVENT = "FPC Agent listener: unknown event: {}";
+    public static final String EVENT_NULL = "Event cannot be null";
+    public static final String FPC_APP_ID = "org.onosproject.fpcagent";
+
+    private static final Map<String, FpcDpnId> uplinkDpnMap = Maps.newConcurrentMap();
+    private static final Map<String, Short> nodeToTopicMap = Maps.newConcurrentMap();
+
+    private static byte DPN_HELLO = 0b0000_0001;
+    private static byte DPN_BYE = 0b0000_0010;
+    private static byte DOWNLINK_DATA_NOTIFICATION = 0b0000_0101;
+    private static byte DPN_STATUS_INDICATION = 0b0000_1100;
+    private static byte DPN_OVERLOAD_INDICATION = 0b0000_0101;
+    private static byte DPN_REPLY = 0b0000_0100;
+    private static String DOWNLINK_DATA_NOTIFICATION_STRING = "Downlink-Data-Notification";
 
     // Resource ID for Configure DPN RPC command
-    static ResourceId configureDpnResourceId;
+    public static ResourceId configureDpnResourceId;
     // Resource ID for Configure RPC command
-    static ResourceId configureResourceId;
+    public static ResourceId configureResourceId;
     // Resource ID for tenants data
-    static ResourceId tenantsResourceId;
-    static ResourceId registerClientResourceId;
-    static ResourceId deregisterClientResourceId;
+    public static ResourceId tenantsResourceId;
+    public static ResourceId registerClientResourceId;
+    public static ResourceId deregisterClientResourceId;
 
     static {
         try {
@@ -86,4 +116,131 @@
                     .build();
         }
     }
+
+    /**
+     * Ensures the session id is an unsigned 64 bit integer
+     *
+     * @param sessionId - session id received from the DPN
+     * @return unsigned session id
+     */
+    private static BigInteger checkSessionId(BigInteger sessionId) {
+        if (sessionId.compareTo(BigInteger.ZERO) < 0) {
+            sessionId = sessionId.add(BigInteger.ONE.shiftLeft(64));
+        }
+        return sessionId;
+    }
+
+    /**
+     * Decodes a DownlinkDataNotification
+     *
+     * @param buf - message buffer
+     * @param key - Concatenation of node id + / + network id
+     * @return DownlinkDataNotification or null if it could not be successfully decoded
+     */
+    private static DownlinkDataNotification processDDN(byte[] buf, String key) {
+        DownlinkDataNotification ddnB = new DefaultDownlinkDataNotification();
+        ddnB.sessionId(checkSessionId(toBigInt(buf, 2)));
+        ddnB.notificationMessageType(DOWNLINK_DATA_NOTIFICATION_STRING);
+        ddnB.clientId(ClientIdentifier.of(FpcIdentity.of(FpcIdentityUnion.of(fromIntToLong(buf, 10)))));
+        ddnB.opId(OpIdentifier.of(BigInteger.valueOf(fromIntToLong(buf, 14))));
+        ddnB.notificationDpnId(uplinkDpnMap.get(key));
+        return ddnB;
+    }
+
+    /**
+     * Decodes a DPN message.
+     *
+     * @param buf - message buffer
+     * @return - A pair with the DPN Id and decoded Object
+     */
+    public static Map.Entry<FpcDpnId, Object> decode(byte[] buf) {
+        if (buf[1] == DPN_REPLY) {
+            return null;
+        } else if (buf[1] == DOWNLINK_DATA_NOTIFICATION) {
+            short nodeIdLen = buf[18];
+            short networkIdLen = buf[19 + nodeIdLen];
+            String key = new String(Arrays.copyOfRange(buf, 19, 19 + nodeIdLen)) + "/" + new String(Arrays.copyOfRange(buf, 20 + nodeIdLen, 20 + nodeIdLen + networkIdLen));
+            return uplinkDpnMap.get(key) == null ? null : new AbstractMap.SimpleEntry<>(uplinkDpnMap.get(key), processDDN(buf, key));
+        } else if (buf[1] == DPN_STATUS_INDICATION) {
+            DPNStatusIndication.Status status = null;
+
+            short nodeIdLen = buf[8];
+            short networkIdLen = buf[9 + nodeIdLen];
+            String key = new String(Arrays.copyOfRange(buf, 9, 9 + nodeIdLen)) + "/" + new String(Arrays.copyOfRange(buf, 10 + nodeIdLen, 10 + nodeIdLen + networkIdLen));
+            if (buf[3] == DPN_OVERLOAD_INDICATION) {
+                status = DPNStatusIndication.Status.OVERLOAD_INDICATION;
+            } else if (buf[3] == DPN_HELLO) {
+                status = DPNStatusIndication.Status.HELLO;
+                log.info("Hello {} on topic {}", key, buf[2]);
+                nodeToTopicMap.put(key, (short) buf[2]);
+            } else if (buf[3] == DPN_BYE) {
+                status = DPNStatusIndication.Status.BYE;
+                log.info("Bye {}", key);
+                nodeToTopicMap.remove(key);
+            }
+            return new AbstractMap.SimpleEntry<>(uplinkDpnMap.get(key), new DPNStatusIndication(status, key));
+        }
+        return null;
+    }
+
+    /**
+     * Gets the mapping for node id / network id to ZMQ Topic
+     *
+     * @param Key - Concatenation of node id + / + network id
+     * @return - ZMQ Topic
+     */
+    public static Short getTopicFromNode(String Key) {
+        return nodeToTopicMap.get(Key);
+    }
+
+    /**
+     * Provides basic status changes,
+     */
+    public static class DPNStatusIndication {
+        private final Status status;
+        private final String key; //nodeId +"/"+ networkId
+        /**
+         * Node Reference of the DPN
+         */
+        public Short nodeRef;
+
+        /**
+         * Constructor providing the DPN and its associated Status.
+         *
+         * @param status - DPN Status
+         * @param key    - Combination of node id and network id
+         */
+        public DPNStatusIndication(Status status,
+                                   String key) {
+            this.status = status;
+            this.key = key;
+        }
+
+        /**
+         * Provides DPN Status
+         *
+         * @return Status associated to the DPN.
+         */
+        public Status getStatus() {
+            return status;
+        }
+
+        /**
+         * Provides the DPN key - nodeId +"/"+ networkId
+         *
+         * @return FpcDpnId
+         */
+        public String getKey() {
+            return this.key;
+        }
+
+        /**
+         * Basic DPN Status
+         */
+        public enum Status {
+            HELLO,
+            BYE,
+            OVERLOAD_INDICATION
+        }
+    }
 }
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/TenantManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/TenantManager.java
index 3c1fb23..8b72165 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/TenantManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/TenantManager.java
@@ -12,7 +12,8 @@
 import org.onosproject.config.DynamicConfigListener;
 import org.onosproject.config.DynamicConfigService;
 import org.onosproject.config.Filter;
-import org.onosproject.fpcagent.helpers.DpnApi;
+import org.onosproject.fpcagent.helpers.DpnCommunicationService;
+import org.onosproject.fpcagent.helpers.DpnNgicCommunicator;
 import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.IetfDmmFpcagentOpParam;
 import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.*;
 import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.configure.DefaultConfigureOutput;
@@ -68,8 +69,12 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     private DynamicConfigService dynamicConfigService;
 
+    private DpnCommunicationService dpnCommunicationService;
+
     @Activate
     protected void activate() {
+        dpnCommunicationService = new DpnNgicCommunicator();
+
         dynamicConfigService.addListener(listener);
 
         // Create the Default Tenant on activate
@@ -233,7 +238,7 @@
                         );
 
                         if (key.isPresent()) {
-                            Short dpnTopic = DpnApi.getTopicFromNode(key.get());
+                            Short dpnTopic = FpcUtil.getTopicFromNode(key.get());
 
                             if (dpnTopic != null) {
                                 if (context.ul().mobilityTunnelParameters().mobprofileParameters() instanceof ThreegppTunnel) {
@@ -250,7 +255,7 @@
 
                                 if (commands.contains("session")) {
                                     tasks.add(Executors.callable(() -> {
-                                        DpnApi.create_session(
+                                        dpnCommunicationService.create_session(
                                                 dpnTopic,
                                                 imsi,
                                                 Ip4Prefix.valueOf(context.delegatingIpPrefixes().get(0).toString()).address(),
@@ -269,11 +274,11 @@
 
                                     if (commands.contains("downlink")) {
                                         tasks.add(Executors.callable(() -> {
-                                            DpnApi.modify_bearer_dl(
+                                            dpnCommunicationService.modify_bearer_dl(
                                                     dpnTopic,
-                                                    s1u_sgw_gtpu_teid,
                                                     dlRemoteAddress,
                                                     s1u_enb_gtpu_teid,
+                                                    s1u_sgw_gtpu_teid,
                                                     cId,
                                                     opId
                                             );
@@ -287,7 +292,7 @@
                                     // TODO - Modify API for Indirect Forwarding to/from another SGW
                                 } else if (commands.contains("uplink")) {
                                     tasks.add(Executors.callable(() -> {
-                                        DpnApi.create_bearer_ul(
+                                        dpnCommunicationService.create_bearer_ul(
                                                 dpnTopic,
                                                 imsi,
                                                 lbi,
@@ -391,7 +396,7 @@
                         );
 
                         if (key.isPresent()) {
-                            Short dpnTopic = DpnApi.getTopicFromNode(key.get());
+                            Short dpnTopic = FpcUtil.getTopicFromNode(key.get());
 
                             if (dpnTopic != null) {
                                 if (context.ul().mobilityTunnelParameters().mobprofileParameters() instanceof ThreegppTunnel) {
@@ -408,19 +413,19 @@
                                 if (commands.contains("downlink")) {
                                     if (context.dl().lifetime() >= 0L) {
                                         tasks.add(Executors.callable(() ->
-                                                DpnApi.modify_bearer_dl(
+                                                dpnCommunicationService.modify_bearer_dl(
                                                         dpnTopic,
                                                         dlRemoteAddress,
                                                         s1u_enb_gtpu_teid,
                                                         dlLocalAddress,
+                                                        contextId,
                                                         cId,
-                                                        opId,
-                                                        contextId
+                                                        opId
                                                 )
                                         ));
                                     } else {
                                         tasks.add(Executors.callable(() ->
-                                                DpnApi.delete_bearer(
+                                                dpnCommunicationService.delete_bearer(
                                                         dpnTopic,
                                                         s1u_enb_gtpu_teid
                                                 )
@@ -430,7 +435,7 @@
                                 if (commands.contains("uplink")) {
                                     if (context.ul().lifetime() >= 0L) {
                                         tasks.add(Executors.callable(() ->
-                                                DpnApi.modify_bearer_ul(
+                                                dpnCommunicationService.modify_bearer_ul(
                                                         dpnTopic,
                                                         ulLocalAddress,
                                                         s1u_enb_gtpu_teid,
@@ -439,7 +444,7 @@
                                         ));
                                     } else {
                                         tasks.add(Executors.callable(() ->
-                                                DpnApi.delete_bearer(
+                                                dpnCommunicationService.delete_bearer(
                                                         dpnTopic,
                                                         s1u_sgw_gtpu_teid
                                                 )
@@ -528,7 +533,7 @@
                                     );
 
                                     if (key.isPresent()) {
-                                        Short dpnTopic = DpnApi.getTopicFromNode(key.get());
+                                        Short dpnTopic = FpcUtil.getTopicFromNode(key.get());
 
                                         if (dpnTopic != null) {
                                             Long teid;
@@ -543,7 +548,7 @@
 
                                             if (targetStr.endsWith("ul") || targetStr.endsWith("dl")) {
                                                 tasks.add(Executors.callable(() -> {
-                                                    DpnApi.delete_bearer(
+                                                    dpnCommunicationService.delete_bearer(
                                                             dpnTopic,
                                                             teid
                                                     );
@@ -553,7 +558,7 @@
                                                 }));
                                             } else {
                                                 tasks.add(Executors.callable(() -> {
-                                                    DpnApi.delete_session(
+                                                    dpnCommunicationService.delete_session(
                                                             dpnTopic,
                                                             context.lbi().uint8(),
                                                             teid,
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/helpers/DpnApi.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/helpers/DpnApi.java
deleted file mode 100644
index b9ca75c..0000000
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/helpers/DpnApi.java
+++ /dev/null
@@ -1,484 +0,0 @@
-package org.onosproject.fpcagent.helpers;
-
-
-import com.google.common.collect.Maps;
-import org.onlab.packet.Ip4Address;
-import org.onlab.packet.Ip4Prefix;
-import org.onosproject.fpcagent.workers.ZMQSBPublisherManager;
-import org.onosproject.fpcagent.workers.ZMQSBSubscriberManager;
-import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.ClientIdentifier;
-import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.OpIdentifier;
-import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.yangautoprefixnotify.value.DefaultDownlinkDataNotification;
-import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.yangautoprefixnotify.value.DownlinkDataNotification;
-import org.onosproject.yang.gen.v1.ietfdmmfpcbase.rev20160803.ietfdmmfpcbase.FpcDpnId;
-import org.onosproject.yang.gen.v1.ietfdmmfpcbase.rev20160803.ietfdmmfpcbase.FpcIdentity;
-import org.onosproject.yang.gen.v1.ietfdmmfpcbase.rev20160803.ietfdmmfpcbase.fpcidentity.FpcIdentityUnion;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-import java.util.AbstractMap;
-import java.util.Arrays;
-import java.util.Map;
-
-import static org.onosproject.fpcagent.helpers.Converter.*;
-
-/**
- * DPDK DPN API over ZeroMQ.
- */
-public class DpnApi {
-    protected static final Logger log = LoggerFactory.getLogger(DpnApi.class);
-    private static final Map<String, FpcDpnId> uplinkDpnMap;
-    private static final Map<String, Short> nodeToTopicMap;
-    /**
-     * Topic for broadcasting
-     */
-    private static byte BROADCAST_TOPIC = 0b0000_0000;
-    private static byte CREATE_SESSION_TYPE = 0b0000_0001;
-    private static byte MODIFY_DL_BEARER_TYPE = 0b0000_0010;
-    private static byte DELETE_SESSION_TYPE = 0b0000_0011;
-    private static byte MODIFY_UL_BEARER_TYPE = 0b0000_0100;
-    private static byte CREATE_UL_BEARER_TYPE = 0b0000_0101;
-    private static byte CREATE_DL_BEARER_TYPE = 0b0000_0110;
-    private static byte DELETE_BEARER_TYPE = 0b0000_0110;
-    private static byte HELLO = 0b0000_1000;
-    private static byte BYE = 0b0000_1001;
-    private static byte SEND_ADC_TYPE = 0b001_0001;
-    private static byte DDN_ACK = 0b0000_0110;
-    private static byte DPN_HELLO = 0b0000_0001;
-    private static byte DPN_BYE = 0b0000_0010;
-    private static byte DOWNLINK_DATA_NOTIFICATION = 0b0000_0101;
-    private static byte DPN_STATUS_INDICATION = 0b0000_1100;
-    private static byte DPN_OVERLOAD_INDICATION = 0b0000_0101;
-    private static byte DPN_REPLY = 0b0000_0100;
-    private static String DOWNLINK_DATA_NOTIFICATION_STRING = "Downlink-Data-Notification";
-
-    static {
-        uplinkDpnMap = Maps.newConcurrentMap();
-        nodeToTopicMap = Maps.newConcurrentMap();
-    }
-
-    /**
-     * Creates Mobility Session
-     *
-     * @param dpn               - DPN
-     * @param imsi              - IMSI
-     * @param ue_ip             - Session IP Address
-     * @param default_ebi       - Default EBI
-     * @param s1u_sgw_gtpu_ipv4 - SGW GTP-U IPv4 Address
-     * @param s1u_sgw_gtpu_teid - SGW GTP-U TEID
-     * @param clientIdentifier  - Client Identifier
-     * @param opIdentifier      - Operation Identifier
-     * @param sessionId         - Session Id
-     */
-    public static void create_session(
-            Short dpn,
-            BigInteger imsi,
-            Ip4Address ue_ip,
-            Short default_ebi,
-            Ip4Address s1u_sgw_gtpu_ipv4,
-            Long s1u_sgw_gtpu_teid,
-            Long clientIdentifier,
-            BigInteger opIdentifier,
-            Long sessionId
-    ) {
-        // TODO: check if subscriber is open.
-        ByteBuffer bb = ByteBuffer.allocate(41)
-                .put(toUint8(dpn))
-                .put(CREATE_SESSION_TYPE)
-                .put(toUint64(imsi))
-                .put(toUint8(default_ebi))
-                .put(toUint32(ue_ip.toInt()))
-                .put(toUint32(s1u_sgw_gtpu_teid))
-                .put(toUint32(s1u_sgw_gtpu_ipv4.toInt()))
-                .put(toUint64(BigInteger.valueOf(sessionId)))
-                .put(toUint8(ZMQSBSubscriberManager.getControllerTopic()))
-                .put(toUint32(clientIdentifier))
-                .put(toUint32(opIdentifier.longValue()));
-
-        log.info("create_session: {}", bb.array());
-        ZMQSBPublisherManager.getInstance().send(bb);
-    }
-
-    /**
-     * Modify Downlink Bearer
-     *
-     * @param dpn               - DPN
-     * @param s1u_sgw_gtpu_teid - SGW GTP-U TEID
-     * @param s1u_enb_gtpu_ipv4 - ENodeB GTP-U IPv4 Address
-     * @param s1u_enb_gtpu_teid - ENodeB GTP-U TEID
-     * @param clientIdentifier  - Client Identifier
-     * @param opIdentifier      - Operation Identifier
-     */
-    public static void modify_bearer_dl(
-            Short dpn,
-            Long s1u_sgw_gtpu_teid,
-            Ip4Address s1u_enb_gtpu_ipv4,
-            Long s1u_enb_gtpu_teid,
-            Long clientIdentifier,
-            BigInteger opIdentifier
-    ) {
-        ByteBuffer bb = ByteBuffer.allocate(23)
-                .put(toUint8(dpn))
-                .put(MODIFY_DL_BEARER_TYPE)
-                .put(toUint32(s1u_enb_gtpu_ipv4.toInt()))
-                .put(toUint32(s1u_enb_gtpu_teid))
-                .put(toUint32(s1u_sgw_gtpu_teid))
-                .put(toUint32(clientIdentifier))
-                .put(toUint32(opIdentifier.longValue()));
-
-        log.info("modify_bearer_dl: {}", bb.array());
-        ZMQSBPublisherManager.getInstance().send(bb);
-    }
-
-    /**
-     * DeleteOrQuery Mobility Session.
-     *
-     * @param dpn               - DPN
-     * @param del_default_ebi   - Default EBI
-     * @param s1u_sgw_gtpu_teid - SGW GTP-U TEID
-     * @param clientIdentifier  - Client Identifier
-     * @param opIdentifier      - Operation Identifier
-     * @param sessionId         - Session Id
-     */
-    public static void delete_session(
-            Short dpn,
-            Short del_default_ebi,
-            Long s1u_sgw_gtpu_teid,
-            Long clientIdentifier,
-            BigInteger opIdentifier,
-            Long sessionId
-    ) {
-        ByteBuffer bb = ByteBuffer.allocate(19)
-                .put(toUint8(dpn))
-                .put(DELETE_SESSION_TYPE)
-                .put(toUint64(BigInteger.valueOf(sessionId)))
-                .put(toUint8(ZMQSBSubscriberManager.getControllerTopic()))
-                .put(toUint32(clientIdentifier))
-                .put(toUint32(opIdentifier.longValue()));
-
-        log.info("delete_session: {}", bb.array());
-        ZMQSBPublisherManager.getInstance().send(bb);
-    }
-
-    /**
-     * Create Uplink Bearer.
-     *
-     * @param dpn               - DPN
-     * @param imsi              - IMSI
-     * @param default_ebi       - Default EBI
-     * @param dedicated_ebi     - Dedicated EBI
-     * @param s1u_sgw_gtpu_ipv4 - SGW GTP-U IPv4 Address
-     * @param s1u_sgw_gtpu_teid - SGW GTP-U TEID
-     */
-    public static void create_bearer_ul(
-            Short dpn,
-            BigInteger imsi,
-            Short default_ebi,
-            Short dedicated_ebi,
-            Ip4Address s1u_sgw_gtpu_ipv4,
-            Long s1u_sgw_gtpu_teid
-    ) {
-        ByteBuffer bb = ByteBuffer.allocate(21)
-                .put(toUint8(dpn))
-                .put(CREATE_UL_BEARER_TYPE)
-                .put(toUint64(imsi))
-                .put(toUint8(default_ebi))
-                .put(toUint8(dedicated_ebi))
-                .put(toUint32(s1u_sgw_gtpu_ipv4.toInt()))
-                .put(toUint32(s1u_sgw_gtpu_teid));
-
-        log.info("create_bearer_ul: {}", bb.array());
-        ZMQSBPublisherManager.getInstance().send(bb);
-    }
-
-    /**
-     * Create Downlink Bearer.
-     *
-     * @param dpn               - DPN
-     * @param dedicated_ebi     - Default EBI
-     * @param s1u_sgw_gtpu_teid - SGW GTP-U TEID
-     * @param s1u_enb_gtpu_ipv4 - ENodeB GTP-U IPv4 Address
-     * @param s1u_enb_gtpu_teid - ENodeB GTP-U TEID
-     */
-    public static void create_bearer_dl(
-            Short dpn,
-            Short dedicated_ebi,
-            Long s1u_sgw_gtpu_teid,
-            Ip4Address s1u_enb_gtpu_ipv4,
-            Long s1u_enb_gtpu_teid
-    ) {
-        ByteBuffer bb = ByteBuffer.allocate(16)
-                .put(toUint8(dpn))
-                .put(CREATE_DL_BEARER_TYPE)
-                .put(toUint8(dedicated_ebi))
-                .put(toUint32(s1u_sgw_gtpu_teid))
-                .put(toUint32(s1u_enb_gtpu_ipv4.toInt()))
-                .put(toUint32(s1u_enb_gtpu_teid));
-
-        log.info("create_bearer_dl: {}", bb.array());
-        ZMQSBPublisherManager.getInstance().send(bb);
-    }
-
-    /**
-     * Modify Downlink Bearer.
-     *
-     * @param dpn               - DPN
-     * @param s1u_sgw_gtpu_ipv4 - SGW GTP-U IPv4 Address
-     * @param s1u_enb_gtpu_teid - ENodeB TEID
-     * @param s1u_enb_gtpu_ipv4 - ENodeB GTP-U IPv4 Address
-     * @param clientIdentifier  - Client Identifier
-     * @param opIdentifier      - Operation Identifier
-     * @param sessionId         - Session Id
-     */
-    public static void modify_bearer_dl(
-            Short dpn,
-            Ip4Address s1u_enb_gtpu_ipv4,
-            Long s1u_enb_gtpu_teid,
-            Ip4Address s1u_sgw_gtpu_ipv4,
-            Long clientIdentifier,
-            BigInteger opIdentifier,
-            Long sessionId
-    ) {
-        ByteBuffer bb = ByteBuffer.allocate(32)
-                .put(toUint8(dpn))
-                .put(MODIFY_DL_BEARER_TYPE)
-                .put(toUint32(s1u_sgw_gtpu_ipv4.toInt()))
-                .put(toUint32(s1u_enb_gtpu_teid))
-                .put(toUint32(s1u_enb_gtpu_ipv4.toInt()))
-                .put(toUint64(BigInteger.valueOf(sessionId)))
-                .put(toUint8(ZMQSBSubscriberManager.getControllerTopic()))
-                .put(toUint32(clientIdentifier))
-                .put(toUint32(opIdentifier.longValue()));
-
-        log.info("modify_bearer_dl: {}", bb.array());
-        ZMQSBPublisherManager.getInstance().send(bb);
-    }
-
-    /**
-     * Modify Uplink Bearer.
-     *
-     * @param dpn               - DPN
-     * @param s1u_enb_gtpu_ipv4 - ENodeB GTP-U IPv4 Address
-     * @param s1u_enb_gtpu_teid - ENodeB GTP-U TEID
-     * @param s1u_sgw_gtpu_teid - SGW GTP-U TEID
-     */
-    public static void modify_bearer_ul(
-            Short dpn,
-            Ip4Address s1u_enb_gtpu_ipv4,
-            Long s1u_enb_gtpu_teid,
-            Long s1u_sgw_gtpu_teid
-    ) {
-        ByteBuffer bb = ByteBuffer.allocate(15)
-                .put(toUint8(dpn))
-                .put(MODIFY_UL_BEARER_TYPE)
-                .put(toUint32(s1u_enb_gtpu_ipv4.toInt()))
-                .put(toUint32(s1u_enb_gtpu_teid))
-                .put(toUint32(s1u_sgw_gtpu_teid));
-
-        log.info("modify_bearer_ul: {}", bb.array());
-        ZMQSBPublisherManager.getInstance().send(bb);
-    }
-
-    /**
-     * DeleteOrQuery Bearer.
-     *
-     * @param dpnTopic          - DPN
-     * @param s1u_sgw_gtpu_teid - SGW GTP-U TEID
-     */
-    public static void delete_bearer(
-            Short dpnTopic,
-            Long s1u_sgw_gtpu_teid) {
-        ByteBuffer bb = ByteBuffer.allocate(7)
-                .put(toUint8(dpnTopic))
-                .put(DELETE_BEARER_TYPE)
-                .put(toUint32(s1u_sgw_gtpu_teid));
-
-        log.info("delete_bearer: {}", bb.array());
-        ZMQSBPublisherManager.getInstance().send(bb);
-    }
-
-    /**
-     * Creates the byte buffer to send ADC rules over ZMQ
-     *
-     * @param topic        - DPN Topic
-     * @param domain_name  - domain
-     * @param ip           - ipaddress/ipprefix (i.e. 127.0.0.1/32)
-     * @param drop         - Drop if 1
-     * @param rating_group - Rating Group
-     * @param service_ID   - Service ID
-     * @param sponsor_ID   - Sponsor ID
-     */
-    public static void send_ADC_rules(Short topic,
-                                      String domain_name, String ip,
-                                      Short drop, Long rating_group,
-                                      Long service_ID, String sponsor_ID) {
-        Ip4Prefix ip_prefix = null;
-        if (ip != null) {
-            ip_prefix = Ip4Prefix.valueOf(ip);
-        }
-        Short selector_type = (short) (domain_name != null ? 0 : ip_prefix != null ? 2 : ip_prefix.address() != null ? 1 : 255);
-        if (selector_type == 255) {
-            log.warn("Domain/IP not found, failed to send rules");
-            return;
-        }
-        ByteBuffer bb = ByteBuffer.allocate(200);
-        bb.put(toUint8(topic))
-                .put(SEND_ADC_TYPE)
-                .put(toUint8(selector_type));
-        if (selector_type == 0) {
-            bb.put(toUint8((short) domain_name.length()))
-                    .put(domain_name.getBytes());
-        }
-        if ((selector_type == 1) || (selector_type == 2)) {
-            int ip_address_long = ip_prefix.address().toInt();
-            bb.put(toUint32(ip_address_long));
-        }
-        if (selector_type == 2) {
-            bb.put(toUint16(ip_prefix.prefixLength()));
-        }
-        if (drop != null)
-            bb.put(toUint8(drop));
-        if (rating_group != null)
-            bb.put(toUint32(rating_group));
-        if (service_ID != null)
-            bb.put(toUint32(service_ID));
-        if (sponsor_ID != null && (short) sponsor_ID.length() > 0) {
-            bb.put(toUint8((short) sponsor_ID.length()))
-                    .put(sponsor_ID.getBytes());
-        }
-        bb.put(toUint8(ZMQSBSubscriberManager.getControllerTopic()));
-
-        log.info("send_ADC_rules: {}", bb.array());
-        ZMQSBPublisherManager.getInstance().send(bb);
-    }
-
-    /**
-     * Ensures the session id is an unsigned 64 bit integer
-     *
-     * @param sessionId - session id received from the DPN
-     * @return unsigned session id
-     */
-    private static BigInteger checkSessionId(BigInteger sessionId) {
-        if (sessionId.compareTo(BigInteger.ZERO) < 0) {
-            sessionId = sessionId.add(BigInteger.ONE.shiftLeft(64));
-        }
-        return sessionId;
-    }
-
-    /**
-     * Decodes a DownlinkDataNotification
-     *
-     * @param buf - message buffer
-     * @param key - Concatenation of node id + / + network id
-     * @return DownlinkDataNotification or null if it could not be successfully decoded
-     */
-    private static DownlinkDataNotification processDDN(byte[] buf, String key) {
-        DownlinkDataNotification ddnB = new DefaultDownlinkDataNotification();
-        ddnB.sessionId(checkSessionId(toBigInt(buf, 2)));
-        ddnB.notificationMessageType(DOWNLINK_DATA_NOTIFICATION_STRING);
-        ddnB.clientId(ClientIdentifier.of(FpcIdentity.of(FpcIdentityUnion.of(fromIntToLong(buf, 10)))));
-        ddnB.opId(OpIdentifier.of(BigInteger.valueOf(fromIntToLong(buf, 14))));
-        ddnB.notificationDpnId(uplinkDpnMap.get(key));
-        return ddnB;
-    }
-
-    /**
-     * Decodes a DPN message.
-     *
-     * @param buf - message buffer
-     * @return - A pair with the DPN Id and decoded Object
-     */
-    public static Map.Entry<FpcDpnId, Object> decode(byte[] buf) {
-        if (buf[1] == DPN_REPLY) {
-            return null;
-        } else if (buf[1] == DOWNLINK_DATA_NOTIFICATION) {
-            short nodeIdLen = buf[18];
-            short networkIdLen = buf[19 + nodeIdLen];
-            String key = new String(Arrays.copyOfRange(buf, 19, 19 + nodeIdLen)) + "/" + new String(Arrays.copyOfRange(buf, 20 + nodeIdLen, 20 + nodeIdLen + networkIdLen));
-            return uplinkDpnMap.get(key) == null ? null : new AbstractMap.SimpleEntry<>(uplinkDpnMap.get(key), processDDN(buf, key));
-        } else if (buf[1] == DPN_STATUS_INDICATION) {
-            DPNStatusIndication.Status status = null;
-
-            short nodeIdLen = buf[8];
-            short networkIdLen = buf[9 + nodeIdLen];
-            String key = new String(Arrays.copyOfRange(buf, 9, 9 + nodeIdLen)) + "/" + new String(Arrays.copyOfRange(buf, 10 + nodeIdLen, 10 + nodeIdLen + networkIdLen));
-            if (buf[3] == DPN_OVERLOAD_INDICATION) {
-                status = DPNStatusIndication.Status.OVERLOAD_INDICATION;
-            } else if (buf[3] == DPN_HELLO) {
-                status = DPNStatusIndication.Status.HELLO;
-                log.info("Hello {} on topic {}", key, buf[2]);
-                nodeToTopicMap.put(key, (short) buf[2]);
-            } else if (buf[3] == DPN_BYE) {
-                status = DPNStatusIndication.Status.BYE;
-                log.info("Bye {}", key);
-                nodeToTopicMap.remove(key);
-            }
-            return new AbstractMap.SimpleEntry<>(uplinkDpnMap.get(key), new DPNStatusIndication(status, key));
-        }
-        return null;
-    }
-
-    /**
-     * Gets the mapping for node id / network id to ZMQ Topic
-     *
-     * @param Key - Concatenation of node id + / + network id
-     * @return - ZMQ Topic
-     */
-    public static Short getTopicFromNode(String Key) {
-        Short aShort = nodeToTopicMap.get(Key);
-        return aShort;
-    }
-
-    /**
-     * Provides basic status changes,
-     */
-    public static class DPNStatusIndication {
-        private final Status status;
-        private final String key; //nodeId +"/"+ networkId
-        /**
-         * Node Reference of the DPN
-         */
-        public Short nodeRef;
-
-        /**
-         * Constructor providing the DPN and its associated Status.
-         *
-         * @param status - DPN Status
-         * @param key    - Combination of node id and network id
-         */
-        public DPNStatusIndication(Status status,
-                                   String key) {
-            this.status = status;
-            this.key = key;
-        }
-
-        /**
-         * Provides DPN Status
-         *
-         * @return Status associated to the DPN.
-         */
-        public Status getStatus() {
-            return status;
-        }
-
-        /**
-         * Provides the DPN key - nodeId +"/"+ networkId
-         *
-         * @return FpcDpnId
-         */
-        public String getKey() {
-            return this.key;
-        }
-
-        /**
-         * Basic DPN Status
-         */
-        public enum Status {
-            HELLO,
-            BYE,
-            OVERLOAD_INDICATION
-        }
-    }
-}
\ No newline at end of file
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/helpers/DpnCommunicationService.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/helpers/DpnCommunicationService.java
new file mode 100644
index 0000000..415e475
--- /dev/null
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/helpers/DpnCommunicationService.java
@@ -0,0 +1,167 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.fpcagent.helpers;
+
+import org.onlab.packet.Ip4Address;
+
+import java.math.BigInteger;
+
+public interface DpnCommunicationService {
+    /**
+     * Creates Mobility Session
+     *
+     * @param topicId           - DPN
+     * @param imsi              - IMSI
+     * @param ue_ip             - Session IP Address
+     * @param default_ebi       - Default EBI
+     * @param s1u_sgw_gtpu_ipv4 - SGW GTP-U IPv4 Address
+     * @param s1u_sgw_gtpu_teid - SGW GTP-U TEID
+     * @param clientIdentifier  - Client Identifier
+     * @param opIdentifier      - Operation Identifier
+     * @param sessionId         - Session Id
+     */
+    void create_session(
+            Short topicId,
+            BigInteger imsi,
+            Ip4Address ue_ip,
+            Short default_ebi,
+            Ip4Address s1u_sgw_gtpu_ipv4,
+            Long s1u_sgw_gtpu_teid,
+            Long clientIdentifier,
+            BigInteger opIdentifier,
+            Long sessionId
+    );
+
+    /**
+     * DeleteOrQuery Mobility Session.
+     *
+     * @param dpn               - DPN
+     * @param del_default_ebi   - Default EBI
+     * @param s1u_sgw_gtpu_teid - SGW GTP-U TEID
+     * @param clientIdentifier  - Client Identifier
+     * @param opIdentifier      - Operation Identifier
+     * @param sessionId         - Session Id
+     */
+    void delete_session(
+            Short dpn,
+            Short del_default_ebi,
+            Long s1u_sgw_gtpu_teid,
+            Long clientIdentifier,
+            BigInteger opIdentifier,
+            Long sessionId
+    );
+
+    /**
+     * Create Uplink Bearer.
+     *
+     * @param dpn               - DPN
+     * @param imsi              - IMSI
+     * @param default_ebi       - Default EBI
+     * @param dedicated_ebi     - Dedicated EBI
+     * @param s1u_sgw_gtpu_ipv4 - SGW GTP-U IPv4 Address
+     * @param s1u_sgw_gtpu_teid - SGW GTP-U TEID
+     */
+    void create_bearer_ul(
+            Short dpn,
+            BigInteger imsi,
+            Short default_ebi,
+            Short dedicated_ebi,
+            Ip4Address s1u_sgw_gtpu_ipv4,
+            Long s1u_sgw_gtpu_teid
+    );
+
+    /**
+     * Create Downlink Bearer.
+     *
+     * @param dpn               - DPN
+     * @param dedicated_ebi     - Default EBI
+     * @param s1u_sgw_gtpu_teid - SGW GTP-U TEID
+     * @param s1u_enb_gtpu_ipv4 - ENodeB GTP-U IPv4 Address
+     * @param s1u_enb_gtpu_teid - ENodeB GTP-U TEID
+     */
+    void create_bearer_dl(
+            Short dpn,
+            Short dedicated_ebi,
+            Long s1u_sgw_gtpu_teid,
+            Ip4Address s1u_enb_gtpu_ipv4,
+            Long s1u_enb_gtpu_teid
+    );
+
+    /**
+     * Modify Downlink Bearer.
+     *  @param topicId               - DPN
+     * @param s1u_enodeb_ipv4 - ENodeB GTP-U IPv4 Address
+     * @param s1u_enodeb_teid - ENodeB TEID
+     * @param s1u_sgw_ipv4 - SGW GTP-U IPv4 Address
+     * @param sessionId         - Session Id
+     * @param clientId      - Operation Identifier
+     * @param opId         - Session Id
+     */
+    void modify_bearer_dl(
+            Short topicId,
+            Ip4Address s1u_enodeb_ipv4,
+            Long s1u_enodeb_teid,
+            Ip4Address s1u_sgw_ipv4,
+            Long sessionId,
+            Long clientId,
+            BigInteger opId
+    );
+
+    /**
+     * Modify Uplink Bearer.
+     *
+     * @param dpn               - DPN
+     * @param s1u_enb_gtpu_ipv4 - ENodeB GTP-U IPv4 Address
+     * @param s1u_enb_gtpu_teid - ENodeB GTP-U TEID
+     * @param s1u_sgw_gtpu_teid - SGW GTP-U TEID
+     */
+    void modify_bearer_ul(
+            Short dpn,
+            Ip4Address s1u_enb_gtpu_ipv4,
+            Long s1u_enb_gtpu_teid,
+            Long s1u_sgw_gtpu_teid
+    );
+
+    /**
+     * DeleteOrQuery Bearer.
+     *
+     * @param dpnTopic          - DPN
+     * @param s1u_sgw_gtpu_teid - SGW GTP-U TEID
+     */
+    void delete_bearer(
+            Short dpnTopic,
+            Long s1u_sgw_gtpu_teid
+    );
+
+    /**
+     * Creates the byte buffer to send ADC rules over ZMQ
+     *
+     * @param topic        - DPN Topic
+     * @param domain_name  - domain
+     * @param ip           - ipaddress/ipprefix (i.e. 127.0.0.1/32)
+     * @param drop         - Drop if 1
+     * @param rating_group - Rating Group
+     * @param service_ID   - Service ID
+     * @param sponsor_ID   - Sponsor ID
+     */
+    void send_ADC_rules(
+            Short topic,
+            String domain_name, String ip,
+            Short drop, Long rating_group,
+            Long service_ID, String sponsor_ID
+    );
+}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/helpers/DpnNgicCommunicator.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/helpers/DpnNgicCommunicator.java
new file mode 100644
index 0000000..b12945c
--- /dev/null
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/helpers/DpnNgicCommunicator.java
@@ -0,0 +1,281 @@
+package org.onosproject.fpcagent.helpers;
+
+
+import org.onlab.packet.Ip4Address;
+import org.onlab.packet.Ip4Prefix;
+import org.onosproject.fpcagent.workers.ZMQSBPublisherManager;
+import org.onosproject.fpcagent.workers.ZMQSBSubscriberManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+
+import static org.onosproject.fpcagent.helpers.Converter.*;
+
+/**
+ * DPDK DPN API over ZeroMQ.
+ */
+public class DpnNgicCommunicator implements DpnCommunicationService {
+    protected static final Logger log = LoggerFactory.getLogger(DpnNgicCommunicator.class);
+    /**
+     * Topic for broadcasting
+     */
+    private static byte BROADCAST_TOPIC = 0b0000_0000;
+    private static byte CREATE_SESSION_TYPE = 0b0000_0001;
+    private static byte MODIFY_DL_BEARER_TYPE = 0b0000_0010;
+    private static byte DELETE_SESSION_TYPE = 0b0000_0011;
+    private static byte MODIFY_UL_BEARER_TYPE = 0b0000_0100;
+    private static byte CREATE_UL_BEARER_TYPE = 0b0000_0101;
+    private static byte CREATE_DL_BEARER_TYPE = 0b0000_0110;
+    private static byte DELETE_BEARER_TYPE = 0b0000_0110;
+    private static byte HELLO = 0b0000_1000;
+    private static byte BYE = 0b0000_1001;
+    private static byte SEND_ADC_TYPE = 0b001_0001;
+    private static byte DDN_ACK = 0b0000_0110;
+
+    enum s11MsgType {
+        CREATE_SESSION(1),
+        MODIFY_BEARER(2),
+        DELETE_SESSION(3),
+        DPN_RESPONSE(4),
+        DDN(5),
+        ASSIGN_TOPIC(10),
+        ASSIGN_CONFLICT(11),
+        DPN_STATUS_INDICATION(12),
+        DPN_STATUS_ACK(13),
+        CONTROLLER_STATUS_INDICATION(14),
+        ADC_RULE(17),
+        PCC_RULE(18),
+        METER_RULE(19),
+        SDF_RULE(20);
+
+        private byte type;
+
+        s11MsgType(int type) {
+            this.type = (byte) type;
+        }
+
+        public byte getType() {
+            return type;
+        }
+    }
+
+    @Override
+    public void create_session(
+            Short topicId,
+            BigInteger imsi,
+            Ip4Address ue_ip,
+            Short default_ebi,
+            Ip4Address s1u_sgw_gtpu_ipv4,
+            Long s1u_sgw_gtpu_teid,
+            Long clientIdentifier,
+            BigInteger opIdentifier,
+            Long sessionId
+    ) {
+        /* NGIC Create Session expected buffer:
+            value: topic_id          bytes: 8
+            value: type              bytes: 8
+            value: imsi              bytes: 64
+            value: default_ebi       bytes: 8
+            value: ue_ipv4           bytes: 32
+            value: s1u_sgw_teid      bytes: 32
+            value: s1u_sgw_ipv4      bytes: 32
+            value: session_id        bytes: 64
+            value: controller_topic  bytes: 32
+            value: client_id         bytes: 32
+            value: op_id             bytes: 32
+         */
+        // TODO: check if subscriber is open.
+        ByteBuffer bb = ByteBuffer.allocate(41)
+                .put(toUint8(topicId))
+                .put(s11MsgType.CREATE_SESSION.getType())
+                .put(toUint64(imsi))
+                .put(toUint8(default_ebi))
+                .put(toUint32(ue_ip.toInt()))
+                .put(toUint32(s1u_sgw_gtpu_teid))
+                .put(toUint32(s1u_sgw_gtpu_ipv4.toInt()))
+                .put(toUint64(BigInteger.valueOf(sessionId)))
+                .put(toUint8(ZMQSBSubscriberManager.getControllerTopic()))
+                .put(toUint32(clientIdentifier))
+                .put(toUint32(opIdentifier.longValue()));
+
+        log.info("create_session: {}", bb.array());
+        ZMQSBPublisherManager.getInstance().send(bb);
+    }
+
+    @Override
+    public void delete_session(
+            Short dpn,
+            Short del_default_ebi,
+            Long s1u_sgw_gtpu_teid,
+            Long clientIdentifier,
+            BigInteger opIdentifier,
+            Long sessionId
+    ) {
+        ByteBuffer bb = ByteBuffer.allocate(19)
+                .put(toUint8(dpn))
+                .put(s11MsgType.DELETE_SESSION.getType())
+                .put(toUint64(BigInteger.valueOf(sessionId)))
+                .put(toUint8(ZMQSBSubscriberManager.getControllerTopic()))
+                .put(toUint32(clientIdentifier))
+                .put(toUint32(opIdentifier.longValue()));
+
+        log.info("delete_session: {}", bb.array());
+        ZMQSBPublisherManager.getInstance().send(bb);
+    }
+
+    @Override
+    public void create_bearer_ul(
+            Short dpn,
+            BigInteger imsi,
+            Short default_ebi,
+            Short dedicated_ebi,
+            Ip4Address s1u_sgw_gtpu_ipv4,
+            Long s1u_sgw_gtpu_teid
+    ) {
+        ByteBuffer bb = ByteBuffer.allocate(21)
+                .put(toUint8(dpn))
+                .put(CREATE_UL_BEARER_TYPE)
+                .put(toUint64(imsi))
+                .put(toUint8(default_ebi))
+                .put(toUint8(dedicated_ebi))
+                .put(toUint32(s1u_sgw_gtpu_ipv4.toInt()))
+                .put(toUint32(s1u_sgw_gtpu_teid));
+
+        log.info("create_bearer_ul: {}", bb.array());
+        ZMQSBPublisherManager.getInstance().send(bb);
+    }
+
+    @Override
+    public void create_bearer_dl(
+            Short dpn,
+            Short dedicated_ebi,
+            Long s1u_sgw_gtpu_teid,
+            Ip4Address s1u_enb_gtpu_ipv4,
+            Long s1u_enb_gtpu_teid
+    ) {
+        ByteBuffer bb = ByteBuffer.allocate(16)
+                .put(toUint8(dpn))
+                .put(CREATE_DL_BEARER_TYPE)
+                .put(toUint8(dedicated_ebi))
+                .put(toUint32(s1u_sgw_gtpu_teid))
+                .put(toUint32(s1u_enb_gtpu_ipv4.toInt()))
+                .put(toUint32(s1u_enb_gtpu_teid));
+
+        log.info("create_bearer_dl: {}", bb.array());
+        ZMQSBPublisherManager.getInstance().send(bb);
+    }
+
+    @Override
+    public void modify_bearer_dl(
+            Short topicId,
+            Ip4Address s1u_enodeb_ipv4,
+            Long s1u_enodeb_teid,
+            Ip4Address s1u_sgw_ipv4,
+            Long sessionId,
+            Long clientId,
+            BigInteger opId
+    ) {
+        /* NGIC Modify Session expected buffer:
+            value: topic_id          bytes: 8
+            value: type              bytes: 8
+            value: s1u_enodeb_ipv4   bytes: 32
+            value: s1u_enodeb_teid   bytes: 32
+            value: s1u_sgw_ipv4      bytes: 32
+            value: session_id        bytes: 64
+            value: controller_topic  bytes: 8
+            value: client_id         bytes: 32
+            value: op_id             bytes: 32
+         */
+        ByteBuffer bb = ByteBuffer.allocate(32)
+                .put(toUint8(topicId))
+                .put(MODIFY_DL_BEARER_TYPE)
+                .put(toUint32(s1u_sgw_ipv4.toInt()))
+                .put(toUint32(s1u_enodeb_teid))
+                .put(toUint32(s1u_enodeb_ipv4.toInt()))
+                .put(toUint64(BigInteger.valueOf(sessionId)))
+                .put(toUint8(ZMQSBSubscriberManager.getControllerTopic()))
+                .put(toUint32(clientId))
+                .put(toUint32(opId.longValue()));
+
+        log.info("modify_bearer_dl: {}", bb.array());
+        ZMQSBPublisherManager.getInstance().send(bb);
+    }
+
+    @Override
+    public void modify_bearer_ul(
+            Short dpn,
+            Ip4Address s1u_enb_gtpu_ipv4,
+            Long s1u_enb_gtpu_teid,
+            Long s1u_sgw_gtpu_teid
+    ) {
+        ByteBuffer bb = ByteBuffer.allocate(15)
+                .put(toUint8(dpn))
+                .put(MODIFY_UL_BEARER_TYPE)
+                .put(toUint32(s1u_enb_gtpu_ipv4.toInt()))
+                .put(toUint32(s1u_enb_gtpu_teid))
+                .put(toUint32(s1u_sgw_gtpu_teid));
+
+        log.info("modify_bearer_ul: {}", bb.array());
+        ZMQSBPublisherManager.getInstance().send(bb);
+    }
+
+    @Override
+    public void delete_bearer(
+            Short dpnTopic,
+            Long s1u_sgw_gtpu_teid) {
+        ByteBuffer bb = ByteBuffer.allocate(7)
+                .put(toUint8(dpnTopic))
+                .put(DELETE_BEARER_TYPE)
+                .put(toUint32(s1u_sgw_gtpu_teid));
+
+        log.info("delete_bearer: {}", bb.array());
+        ZMQSBPublisherManager.getInstance().send(bb);
+    }
+
+    @Override
+    public void send_ADC_rules(Short topic,
+                               String domain_name, String ip,
+                               Short drop, Long rating_group,
+                               Long service_ID, String sponsor_ID) {
+        Ip4Prefix ip_prefix = null;
+        if (ip != null) {
+            ip_prefix = Ip4Prefix.valueOf(ip);
+        }
+        Short selector_type = (short) (domain_name != null ? 0 : ip_prefix != null ? 2 : ip_prefix.address() != null ? 1 : 255);
+        if (selector_type == 255) {
+            log.warn("Domain/IP not found, failed to send rules");
+            return;
+        }
+        ByteBuffer bb = ByteBuffer.allocate(200);
+        bb.put(toUint8(topic))
+                .put(SEND_ADC_TYPE)
+                .put(toUint8(selector_type));
+        if (selector_type == 0) {
+            bb.put(toUint8((short) domain_name.length()))
+                    .put(domain_name.getBytes());
+        }
+        if ((selector_type == 1) || (selector_type == 2)) {
+            int ip_address_long = ip_prefix.address().toInt();
+            bb.put(toUint32(ip_address_long));
+        }
+        if (selector_type == 2) {
+            bb.put(toUint16(ip_prefix.prefixLength()));
+        }
+        if (drop != null)
+            bb.put(toUint8(drop));
+        if (rating_group != null)
+            bb.put(toUint32(rating_group));
+        if (service_ID != null)
+            bb.put(toUint32(service_ID));
+        if (sponsor_ID != null && (short) sponsor_ID.length() > 0) {
+            bb.put(toUint8((short) sponsor_ID.length()))
+                    .put(sponsor_ID.getBytes());
+        }
+        bb.put(toUint8(ZMQSBSubscriberManager.getControllerTopic()));
+
+        log.info("send_ADC_rules: {}", bb.array());
+        ZMQSBPublisherManager.getInstance().send(bb);
+    }
+}
\ No newline at end of file
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBSubscriberManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBSubscriberManager.java
index d2fbba3..f259883 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBSubscriberManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBSubscriberManager.java
@@ -1,7 +1,7 @@
 package org.onosproject.fpcagent.workers;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
-import org.onosproject.fpcagent.helpers.DpnApi;
+import org.onosproject.fpcagent.FpcUtil;
 import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.yangautoprefixnotify.value.DownlinkDataNotification;
 import org.onosproject.yang.gen.v1.ietfdmmfpcbase.rev20160803.ietfdmmfpcbase.FpcDpnId;
 import org.slf4j.Logger;
@@ -173,10 +173,10 @@
          *
          * @param dpnStatus - DPN Status Indication message received from the DPN
          */
-        protected void sendHelloReply(DpnApi.DPNStatusIndication dpnStatus) {
-            if (DpnApi.getTopicFromNode(dpnStatus.getKey()) != null) {
+        protected void sendHelloReply(FpcUtil.DPNStatusIndication dpnStatus) {
+            if (FpcUtil.getTopicFromNode(dpnStatus.getKey()) != null) {
                 ByteBuffer bb = ByteBuffer.allocate(9 + nodeId.length() + networkId.length())
-                        .put(toUint8(DpnApi.getTopicFromNode(dpnStatus.getKey())))
+                        .put(toUint8(FpcUtil.getTopicFromNode(dpnStatus.getKey())))
                         .put(HELLO_REPLY)
                         .put(toUint8(ZMQSBSubscriberManager.getControllerTopic()))
                         .put(toUint32(ZMQSBSubscriberManager.getControllerSourceId()))
@@ -210,12 +210,13 @@
                         }
                         break;
                     default:
-                        Map.Entry<FpcDpnId, Object> entry = DpnApi.decode(contents);
+                        Map.Entry<FpcDpnId, Object> entry = FpcUtil.decode(contents);
                         if (entry != null) {
                             if (entry.getValue() instanceof DownlinkDataNotification) {
-                            } else if (entry.getValue() instanceof DpnApi.DPNStatusIndication) {
-                                DpnApi.DPNStatusIndication dpnStatus = (DpnApi.DPNStatusIndication) entry.getValue();
-                                if (dpnStatus.getStatus() == DpnApi.DPNStatusIndication.Status.HELLO) {
+                                // TODO handle DL notification
+                            } else if (entry.getValue() instanceof FpcUtil.DPNStatusIndication) {
+                                FpcUtil.DPNStatusIndication dpnStatus = (FpcUtil.DPNStatusIndication) entry.getValue();
+                                if (dpnStatus.getStatus() == FpcUtil.DPNStatusIndication.Status.HELLO) {
                                     sendHelloReply(dpnStatus);
                                 }
                             }