diff --git a/app/src/main/java/org/opencord/aaa/impl/AaaManager.java b/app/src/main/java/org/opencord/aaa/impl/AaaManager.java
index 8e1f5c1..f14cf66 100644
--- a/app/src/main/java/org/opencord/aaa/impl/AaaManager.java
+++ b/app/src/main/java/org/opencord/aaa/impl/AaaManager.java
@@ -67,6 +67,9 @@
 import org.onosproject.net.packet.PacketProcessor;
 import org.onosproject.net.packet.PacketService;
 import org.opencord.aaa.AaaConfig;
+import org.opencord.aaa.AaaMachineStatisticsEvent;
+import org.opencord.aaa.AaaMachineStatisticsService;
+import org.opencord.aaa.AaaSupplicantMachineStats;
 import org.opencord.aaa.AuthenticationEvent;
 import org.opencord.aaa.AuthenticationEventListener;
 import org.opencord.aaa.AuthenticationService;
@@ -128,6 +131,9 @@
     protected AuthenticationStatisticsService aaaStatisticsManager;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected AaaMachineStatisticsService aaaSupplicantStatsManager;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ComponentConfigService cfgService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -475,9 +481,18 @@
             aaaStatisticsManager.getAaaStats().countDroppedResponsesRx();
             return;
         }
+
+        //instance of StateMachine using the sessionId for updating machine stats
+        StateMachine machineStats = StateMachine.lookupStateMachineBySessionId(stateMachine.sessionId());
+
         EAP eapPayload;
         Ethernet eth;
         checkReceivedPacketForValidValidator(radiusPacket, stateMachine.requestAuthenticator());
+
+        //increasing packets and octets received from server
+        machineStats.incrementTotalPacketsReceived();
+        machineStats.incrementTotalOctetReceived(radiusPacket.decapsulateMessage().getLength());
+
         if (outPacketSet.contains(radiusPacket.getIdentifier())) {
             aaaStatisticsManager.getAaaStats().increaseOrDecreasePendingRequests(false);
             outPacketSet.remove(new Byte(radiusPacket.getIdentifier()));
@@ -502,6 +517,9 @@
                 aaaStatisticsManager.getAaaStats().increaseChallengeResponsesRx();
                 outPacketSupp.add(eapPayload.getIdentifier());
                 aaaStatisticsManager.getAaaStats().incrementPendingResSupp();
+                //increasing packets sent to server
+                machineStats.incrementTotalPacketsSent();
+                machineStats.incrementTotalOctetSent(eapPayload.getLength());
                 break;
             case RADIUS.RADIUS_CODE_ACCESS_ACCEPT:
                 log.debug("RADIUS packet: RADIUS_CODE_ACCESS_ACCEPT");
@@ -521,6 +539,9 @@
 
                 stateMachine.authorizeAccess();
                 aaaStatisticsManager.getAaaStats().increaseAcceptResponsesRx();
+                //increasing packets sent to server
+                machineStats.incrementTotalPacketsSent();
+                machineStats.incrementTotalOctetSent(eapPayload.getLength());
                 break;
             case RADIUS.RADIUS_CODE_ACCESS_REJECT:
                 log.debug("RADIUS packet: RADIUS_CODE_ACCESS_REJECT");
@@ -548,10 +569,20 @@
 
                 stateMachine.denyAccess();
                 aaaStatisticsManager.getAaaStats().increaseRejectResponsesRx();
+                //increasing packets sent to server
+                machineStats.incrementTotalPacketsSent();
+                machineStats.incrementTotalOctetSent(eapPayload.getLength());
+                //pushing machine stats to kafka
+                AaaSupplicantMachineStats machineObj = aaaSupplicantStatsManager.getSupplicantStats(machineStats);
+                aaaSupplicantStatsManager.getMachineStatsDelegate()
+                        .notify(new AaaMachineStatisticsEvent(AaaMachineStatisticsEvent.Type.STATS_UPDATE, machineObj));
                 break;
             default:
                 log.warn("Unknown RADIUS message received with code: {}", radiusPacket.getCode());
                 aaaStatisticsManager.getAaaStats().increaseUnknownTypeRx();
+                //increasing packets received to server
+                machineStats.incrementTotalPacketsReceived();
+                machineStats.incrementTotalOctetReceived(radiusPacket.decapsulateMessage().getLength());
         }
         aaaStatisticsManager.getAaaStats().countDroppedResponsesRx();
     }
@@ -684,6 +715,7 @@
                 stateMachine = new StateMachine(sessionId);
             } else {
                 log.debug("Using existing state-machine for sessionId: {}", sessionId);
+                stateMachine.setEapolTypeVal(eapol.getEapolType());
             }
 
             switch (eapol.getEapolType()) {
@@ -713,6 +745,15 @@
                     break;
                 case EAPOL.EAPOL_LOGOFF:
                     log.debug("EAP packet: EAPOL_LOGOFF");
+                    //posting the machine stat data for current supplicant device.
+                    if (stateMachine.getSessionTerminateReason() == null ||
+                            stateMachine.getSessionTerminateReason().equals("")) {
+                        stateMachine.setSessionTerminateReason(
+                                StateMachine.SessionTerminationReasons.SUPPLICANT_LOGOFF.getReason());
+                    }
+                    AaaSupplicantMachineStats obj = aaaSupplicantStatsManager.getSupplicantStats(stateMachine);
+                    aaaSupplicantStatsManager.getMachineStatsDelegate()
+                            .notify(new AaaMachineStatisticsEvent(AaaMachineStatisticsEvent.Type.STATS_UPDATE, obj));
                     if (stateMachine.state() == StateMachine.STATE_AUTHORIZED) {
                         stateMachine.logoff();
                         aaaStatisticsManager.getAaaStats().incrementEapolLogoffRx();
@@ -750,6 +791,8 @@
                             // change the state to "PENDING"
                             if (stateMachine.state() == StateMachine.STATE_PENDING) {
                                 aaaStatisticsManager.getAaaStats().increaseRequestReTx();
+                                stateMachine.incrementTotalPacketsSent();
+                                stateMachine.incrementTotalOctetSent(eapol.getPacketLength());
                             }
                             stateMachine.requestAccess();
                             break;
@@ -917,6 +960,16 @@
                     PortNumber portNumber = event.port().number();
                     String sessionId = devId.toString() + portNumber.toString();
 
+                    StateMachine stateMachine = StateMachine.lookupStateMachineBySessionId(sessionId);
+                    // No session may exist for the removed port; only tag and publish stats when one does.
+                    if (stateMachine != null) {
+                        stateMachine.setSessionTerminateReason(
+                                StateMachine.SessionTerminationReasons.PORT_REMOVED.getReason());
+                        AaaSupplicantMachineStats obj = aaaSupplicantStatsManager.getSupplicantStats(stateMachine);
+                        aaaSupplicantStatsManager.getMachineStatsDelegate().notify(
+                                new AaaMachineStatisticsEvent(AaaMachineStatisticsEvent.Type.STATS_UPDATE, obj));
+                    }
+
                     Map<String, StateMachine> sessionIdMap = StateMachine.sessionIdMap();
                     StateMachine removed = sessionIdMap.remove(sessionId);
                     if (removed != null) {
