VOL-4063: Downstream packets dropped when AES encryption enabled with RSYS3200G

Change-Id: I16d2965466bb4427ca06afcb1d530c03c9b17d61
diff --git a/agent/src/core_api_handler.cc b/agent/src/core_api_handler.cc
index babd9ce..600b57f 100644
--- a/agent/src/core_api_handler.cc
+++ b/agent/src/core_api_handler.cc
@@ -362,6 +362,7 @@
         bcmos_fastlock_init(&flow_id_bitset_lock, 0);
         bcmos_fastlock_init(&voltha_flow_to_device_flow_lock, 0);
         bcmos_fastlock_init(&alloc_cfg_wait_lock, 0);
+        bcmos_fastlock_init(&gem_cfg_wait_lock, 0);
         bcmos_fastlock_init(&onu_deactivate_wait_lock, 0);
         bcmos_fastlock_init(&acl_packet_trap_handler_lock, 0);
 
@@ -2762,8 +2763,8 @@
         return err;
     }
 
-    if (direction.compare(upstream) == 0) {
-        Status st = install_gem_port(access_intf_id, onu_id, gemport_id);
+    if (direction == upstream || direction == downstream) {
+        Status st = install_gem_port(access_intf_id, onu_id, uni_id, gemport_id);
         if (st.error_code() != grpc::StatusCode::ALREADY_EXISTS && st.error_code() != grpc::StatusCode::OK) {
             OPENOLT_LOG(ERROR, openolt_log_id, "failed to created gemport=%d, access_intf=%d, onu_id=%d\n", gemport_id, access_intf_id, onu_id);
             return BCM_ERR_INTERNAL;
@@ -2842,6 +2843,17 @@
     bcmolt_tm_queue_key key = { };
     bcmos_errno err;
 
+    // Gemports are bi-directional (except in the multicast case). We create the gem port when we create the
+    // upstream/downstream queue (see CreateQueue function), so it makes sense to delete it when we remove the queues.
+    // For the multicast case we do not manage the install/remove of the gem port in the agent application; BAL manages it.
+    if (direction == upstream || direction == downstream) {
+        Status st = remove_gem_port(access_intf_id, onu_id, uni_id, gemport_id);
+        if (st.error_code() != grpc::StatusCode::OK) {
+            OPENOLT_LOG(ERROR, openolt_log_id, "failed to remove gemport=%d, access_intf=%d, onu_id=%d\n", gemport_id, access_intf_id, onu_id);
+            return BCM_ERR_INTERNAL;
+        }
+    }
+
     if (direction == downstream) {
         if (is_tm_sched_id_present(access_intf_id, onu_id, uni_id, direction, tech_profile_id)) {
             key.sched_id = get_tm_sched_id(access_intf_id, onu_id, uni_id, direction, tech_profile_id);
@@ -2851,15 +2863,6 @@
             return BCM_ERR_OK;
         }
     } else {
-        // Gemports are bi-directional (except in multicast case). We create the gem port when we create the
-        // upstream queue (see CreateQueue function) and it makes sense to delete them when remove the upstream queues.
-        // For multicast case we do not manage the install/remove of gem port in agent application. It is managed by BAL.
-        // Moreover it also makes sense to remove when upstream queue is getting removed because the upstream queue MUST exist always.
-        // It is possible that the downstream queues are not created for a subscriber (for ex: upstream EAPoL trap flow only exists
-        // but no other flow, and in this case only upstream scheduler and queues exist. We do not have a scenario where only downstream
-        // subscriber flows exist but no upstream )
-        remove_gem_port(access_intf_id, gemport_id);
-
         /* In the upstream we use pre-created queues on the NNI scheduler that are used by all subscribers.
         They should not be removed. So, lets return OK. */
         return BCM_ERR_OK;
diff --git a/agent/src/core_data.cc b/agent/src/core_data.cc
index a53ca0e..65fc62c 100644
--- a/agent/src/core_data.cc
+++ b/agent/src/core_data.cc
@@ -129,8 +129,21 @@
 // Lock to protect critical section data structure used for handling AllocObject configuration response.
 bcmos_fastlock alloc_cfg_wait_lock;
 
+// Map used to track response from BAL for ITU PON Gem Configuration.
+// The key is gem_cfg_compltd_key and value is a concurrent thread-safe queue which is
+// used for pushing (from BAL) and popping (at application) the results.
+std::map<gem_cfg_compltd_key,  Queue<gem_cfg_complete_result> *> gem_cfg_compltd_map;
+// Lock to protect critical section data structure used for handling GemObject configuration response.
+bcmos_fastlock gem_cfg_wait_lock;
+
+/* This represents the Key to 'gemport_status_map' map.
+ Represents (pon_intf_id, onu_id, uni_id, gemport_id) */
+typedef std::tuple<uint32_t, uint32_t, uint32_t, uint32_t> gemport_status_map_key_tuple;
+/* 'gemport_status_map' maps gemport_status_map_key_tuple to a boolean that is set to true once the gem port is installed */
+std::map<gemport_status_map_key_tuple, bool> gemport_status_map;
+
 // Map used to track response from BAL for Onu Deactivation Completed Indication
-// The key is alloc_cfg_compltd_key and value is a concurrent thread-safe queue which is
+// The key is onu_deact_compltd_key and value is a concurrent thread-safe queue which is
 // used for pushing (from BAL) and popping (at application) the results.
 std::map<onu_deact_compltd_key,  Queue<onu_deactivate_complete_result> *> onu_deact_compltd_map;
 // Lock to protect critical section data structure used for handling Onu Deactivation Completed Indication
diff --git a/agent/src/core_data.h b/agent/src/core_data.h
index 12edb9e..57b2fe2 100644
--- a/agent/src/core_data.h
+++ b/agent/src/core_data.h
@@ -51,6 +51,10 @@
 }
 
 #define ALLOC_CFG_COMPLETE_WAIT_TIMEOUT 5000 // in milli-seconds
+#define GEM_CFG_COMPLETE_WAIT_TIMEOUT 5000 // in milli-seconds
+
+// max retry count to find gem config key in gem_cfg_compltd_map
+#define MAX_GEM_CFG_KEY_CHECK 5
 
 #define ONU_DEACTIVATE_COMPLETE_WAIT_TIMEOUT 5000 // in milli-seconds
 
@@ -118,6 +122,31 @@
     AllocCfgStatus status;
 } alloc_cfg_complete_result;
 
+enum GemCfgAction { // Action whose completion wait_for_gem_action() is asked to confirm
+    GEM_OBJECT_CREATE,  // passed by install_gem_port after the gem port cfg set
+    GEM_OBJECT_DELETE,  // passed by remove_gem_port after the gem port cfg clear
+    GEM_OBJECT_ENCRYPT  // passed after encryption has been set on a gem port
+};
+
+enum GemObjectState { // GEM object state carried in a configuration-completed result
+    GEM_OBJECT_STATE_NOT_CONFIGURED, // state wait_for_gem_action() requires after GEM_OBJECT_DELETE
+    GEM_OBJECT_STATE_INACTIVE,
+    GEM_OBJECT_STATE_PROCESSING,
+    GEM_OBJECT_STATE_ACTIVE // state wait_for_gem_action() requires after GEM_OBJECT_CREATE / GEM_OBJECT_ENCRYPT
+};
+
+enum GemCfgStatus { // Outcome of a GEM configuration request as carried in the result
+    GEM_CFG_STATUS_SUCCESS, // the indication reported BCMOLT_RESULT_SUCCESS
+    GEM_CFG_STATUS_FAIL // the indication reported any other status
+};
+
+typedef struct { // One GEM configuration result, pushed to the queue of the thread waiting for it
+    uint32_t pon_intf_id; // PON interface taken from the indication key
+    uint32_t gem_port_id; // GEM port taken from the indication key
+    GemObjectState state; // new state reported for the GEM object
+    GemCfgStatus status; // whether the request succeeded
+} gem_cfg_complete_result;
+
 typedef struct {
     uint32_t pon_intf_id;
     uint32_t onu_id;
@@ -128,6 +157,9 @@
 // key for map used for tracking ITU PON Alloc Configuration results from BAL
 typedef std::tuple<uint32_t, uint32_t> alloc_cfg_compltd_key;
 
+// key for map used for tracking ITU PON Gem Configuration results from BAL
+typedef std::tuple<uint32_t, uint32_t> gem_cfg_compltd_key;
+
 // key for map used for tracking Onu Deactivation Completed Indication
 typedef std::tuple<uint32_t, uint32_t> onu_deact_compltd_key;
 
@@ -240,13 +272,27 @@
 // The key is alloc_cfg_compltd_key and value is a concurrent thread-safe queue which is
 // used for pushing (from BAL) and popping (at application) the results.
 extern std::map<alloc_cfg_compltd_key,  Queue<alloc_cfg_complete_result> *> alloc_cfg_compltd_map;
+
+// Map used to track response from BAL for ITU PON Gem Configuration.
+// The key is gem_cfg_compltd_key and value is a concurrent thread-safe queue which is
+// used for pushing (from BAL) and popping (at application) the results.
+extern std::map<gem_cfg_compltd_key,  Queue<gem_cfg_complete_result> *> gem_cfg_compltd_map;
+
+/* This represents the Key to 'gemport_status_map' map.
+ Represents (pon_intf_id, onu_id, uni_id, gemport_id) */
+typedef std::tuple<uint32_t, uint32_t, uint32_t, uint32_t> gemport_status_map_key_tuple;
+/* 'gemport_status_map' maps gemport_status_map_key_tuple to a boolean that is set to true once the gem port is installed */
+extern std::map<gemport_status_map_key_tuple, bool> gemport_status_map;
+
 // Map used to track response from BAL for Onu Deactivation Completed Indication
-// The key is alloc_cfg_compltd_key and value is a concurrent thread-safe queue which is
+// The key is onu_deact_compltd_key and value is a concurrent thread-safe queue which is
 // used for pushing (from BAL) and popping (at application) the results.
 extern std::map<onu_deact_compltd_key,  Queue<onu_deactivate_complete_result> *> onu_deact_compltd_map;
 
 // Lock to protect critical section data structure used for handling AllocObject configuration response.
 extern bcmos_fastlock alloc_cfg_wait_lock;
+// Lock to protect critical section data structure used for handling GemObject configuration response.
+extern bcmos_fastlock gem_cfg_wait_lock;
 // Lock to protect critical section data structure used for handling Onu deactivation completed Indication
 extern bcmos_fastlock onu_deactivate_wait_lock;
 
diff --git a/agent/src/core_utils.cc b/agent/src/core_utils.cc
index e8060aa..6c027d5 100644
--- a/agent/src/core_utils.cc
+++ b/agent/src/core_utils.cc
@@ -573,7 +573,9 @@
 bcmos_errno wait_for_alloc_action(uint32_t intf_id, uint32_t alloc_id, AllocCfgAction action) {
     Queue<alloc_cfg_complete_result> cfg_result;
     alloc_cfg_compltd_key k(intf_id, alloc_id);
+    bcmos_fastlock_lock(&alloc_cfg_wait_lock);
     alloc_cfg_compltd_map[k] =  &cfg_result;
+    bcmos_fastlock_unlock(&alloc_cfg_wait_lock, 0);
     bcmos_errno err = BCM_ERR_OK;
 
     // Try to pop the result from BAL with a timeout of ALLOC_CFG_COMPLETE_WAIT_TIMEOUT ms
@@ -622,6 +624,71 @@
     return err;
 }
 
+// Waits for the GemObject configuration result that BAL reports for (intf_id, gem_port_id).
+// Returns BCM_ERR_INTERNAL on timeout, on a failed request, or when the reported state does not suit the action.
+bcmos_errno wait_for_gem_action(uint32_t intf_id, uint32_t gem_port_id, GemCfgAction action) {
+    bcmos_errno err = BCM_ERR_OK;
+    Queue<gem_cfg_complete_result> cfg_result;
+    gem_cfg_compltd_key k(intf_id, gem_port_id);
+
+    // Register the local queue under the key so that a result can be pushed to it.
+    bcmos_fastlock_lock(&gem_cfg_wait_lock);
+    gem_cfg_compltd_map[k] = &cfg_result;
+    bcmos_fastlock_unlock(&gem_cfg_wait_lock, 0);
+
+    // Block for at most GEM_CFG_COMPLETE_WAIT_TIMEOUT ms until a result shows up in the queue.
+    std::pair<gem_cfg_complete_result, bool> result = cfg_result.pop(GEM_CFG_COMPLETE_WAIT_TIMEOUT);
+    if (!result.second) {
+        OPENOLT_LOG(ERROR, openolt_log_id, "timeout waiting for gem cfg complete indication intf_id %d, gem_port_id %d\n",
+                    intf_id, gem_port_id);
+        // Replace the queue pointer with NULL so that a late result has nothing to be pushed to.
+        bcmos_fastlock_lock(&gem_cfg_wait_lock);
+        gem_cfg_compltd_map[k] = NULL;
+        bcmos_fastlock_unlock(&gem_cfg_wait_lock, 0);
+        err = BCM_ERR_INTERNAL;
+    } else if (result.first.status == GEM_CFG_STATUS_FAIL) {
+        OPENOLT_LOG(ERROR, openolt_log_id, "error processing gem cfg request intf_id %d, gem_port_id %d\n", intf_id, gem_port_id);
+        err = BCM_ERR_INTERNAL;
+    } else {
+        // The request itself succeeded; check that the object ended up in the state the action calls for.
+        switch (action) {
+            case GEM_OBJECT_CREATE:
+                if (result.first.state == GEM_OBJECT_STATE_ACTIVE) {
+                    OPENOLT_LOG(INFO, openolt_log_id, "Create itupon gem object success, intf_id %d, gem_port_id %d\n", intf_id, gem_port_id);
+                } else {
+                    OPENOLT_LOG(ERROR, openolt_log_id, "gem object not in active state intf_id %d, gem_port_id %d gem_obj_state %d\n",
+                                intf_id, gem_port_id, result.first.state);
+                    err = BCM_ERR_INTERNAL;
+                }
+                break;
+            case GEM_OBJECT_ENCRYPT:
+                if (result.first.state == GEM_OBJECT_STATE_ACTIVE) {
+                    OPENOLT_LOG(INFO, openolt_log_id, "Enable itupon gem object encryption success, intf_id %d, gem_port_id %d\n", intf_id, gem_port_id);
+                } else {
+                    OPENOLT_LOG(ERROR, openolt_log_id, "gem object not in active state intf_id %d, gem_port_id %d gem_obj_state %d\n",
+                                intf_id, gem_port_id, result.first.state);
+                    err = BCM_ERR_INTERNAL;
+                }
+                break;
+            default: // GEM_OBJECT_DELETE
+                if (result.first.state == GEM_OBJECT_STATE_NOT_CONFIGURED) {
+                    OPENOLT_LOG(INFO, openolt_log_id, "Remove itupon gem object success, intf_id %d, gem_port_id %d\n", intf_id, gem_port_id);
+                } else {
+                    OPENOLT_LOG(ERROR, openolt_log_id, "gem object is not reset intf_id %d, gem_port_id %d gem_obj_state %d\n",
+                                intf_id, gem_port_id, result.first.state);
+                    err = BCM_ERR_INTERNAL;
+                }
+                break;
+        }
+    }
+
+    // Drop the map entry before cfg_result goes out of scope.
+    bcmos_fastlock_lock(&gem_cfg_wait_lock);
+    gem_cfg_compltd_map.erase(k);
+    bcmos_fastlock_unlock(&gem_cfg_wait_lock, 0);
+    return err;
+}
+
 // This method handles waiting for OnuDeactivate Completed Indication
 bcmos_errno wait_for_onu_deactivate_complete(uint32_t intf_id, uint32_t onu_id) {
     Queue<onu_deactivate_complete_result> deact_result;
@@ -886,7 +953,20 @@
     return err;
 }
 
-Status install_gem_port(int32_t intf_id, int32_t onu_id, int32_t gemport_id) {
+Status install_gem_port(int32_t intf_id, int32_t onu_id, int32_t uni_id, int32_t gemport_id) {
+    gemport_status_map_key_tuple gem_status_key(intf_id, onu_id, uni_id, gemport_id);
+
+    bcmos_fastlock_lock(&data_lock);
+    std::map<gemport_status_map_key_tuple, bool>::const_iterator it = gemport_status_map.find(gem_status_key);
+    if (it != gemport_status_map.end()) {
+        if (it->second) {
+            bcmos_fastlock_unlock(&data_lock, 0);
+            OPENOLT_LOG(INFO, openolt_log_id, "gem port already installed = %d\n", gemport_id);
+            return Status::OK;
+        }
+    }
+    bcmos_fastlock_unlock(&data_lock, 0);
+
     bcmos_errno err;
     bcmolt_itupon_gem_cfg cfg; /* declare main API struct */
     bcmolt_itupon_gem_key key = {}; /* declare key */
@@ -927,18 +1007,43 @@
         return bcm_to_grpc_err(err, "Access_Control set ITU PON Gem port failed");
     }
 
+#ifndef SCALE_AND_PERF
+    err = wait_for_gem_action(intf_id, gemport_id, GEM_OBJECT_CREATE);
+    if (err) {
+        OPENOLT_LOG(ERROR, openolt_log_id, "failed to install gem_port = %d err = %s\n", gemport_id, bcmos_strerror(err));
+        return bcm_to_grpc_err(err, "Access_Control set ITU PON Gem port failed");
+    }
+#endif
+
     OPENOLT_LOG(INFO, openolt_log_id, "gem port installed successfully = %d\n", gemport_id);
 
+    bcmos_fastlock_lock(&data_lock);
+    gemport_status_map[gem_status_key] = true;
+    bcmos_fastlock_unlock(&data_lock, 0);
+
     return Status::OK;
 }
 
-Status remove_gem_port(int32_t intf_id, int32_t gemport_id) {
+Status remove_gem_port(int32_t intf_id, int32_t onu_id, int32_t uni_id, int32_t gemport_id) {
+    gemport_status_map_key_tuple gem_status_key(intf_id, onu_id, uni_id, gemport_id);
+
+    bcmos_fastlock_lock(&data_lock);
+    std::map<gemport_status_map_key_tuple, bool>::const_iterator it = gemport_status_map.find(gem_status_key);
+    if (it == gemport_status_map.end()) {
+        bcmos_fastlock_unlock(&data_lock, 0);
+        OPENOLT_LOG(INFO, openolt_log_id, "gem port already removed = %d\n", gemport_id);
+        return Status::OK;
+    }
+    bcmos_fastlock_unlock(&data_lock, 0);
+
     bcmolt_itupon_gem_cfg gem_cfg;
     bcmolt_itupon_gem_key key = {
         .pon_ni = (bcmolt_interface)intf_id,
         .gem_port_id = (bcmolt_gem_port_id)gemport_id
     };
     bcmos_errno err;
+    bcmolt_interface_state state;
+    bcmolt_status los_status;
 
     BCMOLT_CFG_INIT(&gem_cfg, itupon_gem, key);
     err = bcmolt_cfg_clear(dev_id, &gem_cfg.hdr);
@@ -948,8 +1053,42 @@
         return bcm_to_grpc_err(err, "Access_Control clear ITU PON Gem port failed");
     }
 
+    err = get_pon_interface_status((bcmolt_interface)intf_id, &state, &los_status);
+    if (err == BCM_ERR_OK) {
+        if (state == BCMOLT_INTERFACE_STATE_ACTIVE_WORKING && los_status == BCMOLT_STATUS_OFF) {
+#ifndef SCALE_AND_PERF
+            OPENOLT_LOG(INFO, openolt_log_id, "PON interface: %d is enabled and LoS status is OFF, waiting for gem cfg clear response\n",
+                intf_id);
+            err = wait_for_gem_action(intf_id, gemport_id, GEM_OBJECT_DELETE);
+            if (err) {
+                OPENOLT_LOG(ERROR, openolt_log_id, "failed to remove gem_port = %d err = %s\n", gemport_id, bcmos_strerror(err));
+                return bcm_to_grpc_err(err, "Access_Control clear ITU PON Gem port failed");
+            }
+#endif
+        }
+        else if (state == BCMOLT_INTERFACE_STATE_ACTIVE_WORKING && los_status == BCMOLT_STATUS_ON) {
+            OPENOLT_LOG(INFO, openolt_log_id, "PON interface: %d is enabled but LoS status is ON, not waiting for gem cfg clear response\n",
+                intf_id);
+        }
+        else if (state == BCMOLT_INTERFACE_STATE_INACTIVE) {
+            OPENOLT_LOG(INFO, openolt_log_id, "PON interface: %d is disabled, not waiting for gem cfg clear response\n",
+                intf_id);
+        }
+    } else {
+        OPENOLT_LOG(ERROR, openolt_log_id, "Failed to fetch PON interface status, intf_id = %d, err = %s\n",
+            intf_id, bcmos_strerror(err));
+        return bcm_to_grpc_err(err, "Access_Control clear ITU PON Gem port failed");
+    }
+
     OPENOLT_LOG(INFO, openolt_log_id, "gem port removed successfully = %d\n", gemport_id);
 
+    bcmos_fastlock_lock(&data_lock);
+    it = gemport_status_map.find(gem_status_key);
+    if (it != gemport_status_map.end()) {
+        gemport_status_map.erase(it);
+    }
+    bcmos_fastlock_unlock(&data_lock, 0);
+
     return Status::OK;
 }
 
@@ -974,6 +1113,14 @@
         return bcm_to_grpc_err(err, "Failed to set encryption on GEM port");;
     }
 
+#ifndef SCALE_AND_PERF
+    err = wait_for_gem_action(intf_id, gemport_id, GEM_OBJECT_ENCRYPT);
+    if (err) {
+        OPENOLT_LOG(ERROR, openolt_log_id, "failed to enable gemport encryption, gem_port = %d err = %s\n", gemport_id, bcmos_strerror(err));
+        return bcm_to_grpc_err(err, "Access_Control ITU PON Gem port encryption failed");
+    }
+#endif
+
     OPENOLT_LOG(INFO, openolt_log_id, "encryption set successfully on pon = %d gem_port = %d\n", intf_id, gemport_id);
 
     return Status::OK;
diff --git a/agent/src/core_utils.h b/agent/src/core_utils.h
index c860644..a7a2295 100644
--- a/agent/src/core_utils.h
+++ b/agent/src/core_utils.h
@@ -77,6 +77,7 @@
 void clear_qos_type(uint32_t pon_intf_id, uint32_t onu_id, uint32_t uni_id);
 std::string GetDirection(int direction);
 bcmos_errno wait_for_alloc_action(uint32_t intf_id, uint32_t alloc_id, AllocCfgAction action);
+bcmos_errno wait_for_gem_action(uint32_t intf_id, uint32_t gem_port_id, GemCfgAction action);
 bcmos_errno wait_for_onu_deactivate_complete(uint32_t intf_id, uint32_t onu_id);
 char* openolt_read_sysinfo(const char* field_name, char* field_val);
 Status pushOltOperInd(uint32_t intf_id, const char *type, const char *state);
@@ -92,8 +93,8 @@
 unsigned NumNniIf_();
 unsigned NumPonIf_();
 bcmos_errno get_nni_interface_status(bcmolt_interface id, bcmolt_interface_state *state);
-Status install_gem_port(int32_t intf_id, int32_t onu_id, int32_t gemport_id);
-Status remove_gem_port(int32_t intf_id, int32_t gemport_id);
+Status install_gem_port(int32_t intf_id, int32_t onu_id, int32_t uni_id, int32_t gemport_id);
+Status remove_gem_port(int32_t intf_id, int32_t onu_id, int32_t uni_id, int32_t gemport_id);
 Status enable_encryption_for_gem_port(int32_t intf_id, int32_t gemport_id);
 Status update_acl_interface(int32_t intf_id, bcmolt_interface_type intf_type, uint32_t access_control_id,
                 bcmolt_members_update_command acl_cmd);
diff --git a/agent/src/indications.cc b/agent/src/indications.cc
index 304cb45..0407260 100644
--- a/agent/src/indications.cc
+++ b/agent/src/indications.cc
@@ -35,10 +35,6 @@
 
 using grpc::Status;
 
-extern Queue<openolt::Indication> oltIndQ;
-extern std::map<alloc_cfg_compltd_key,  Queue<alloc_cfg_complete_result> *> alloc_cfg_compltd_map;
-extern bcmos_fastlock alloc_cfg_wait_lock;
-
 bool subscribed = false;
 uint32_t nni_intf_id = 0;
 #define current_device 0
@@ -596,6 +592,84 @@
     bcmolt_msg_free(msg);
 }
 
+// BAL indication callback for ITU PON GEM "configuration completed" messages.
+// Converts the indication to a gem_cfg_complete_result and pushes it to the queue registered in
+// gem_cfg_compltd_map for (pon_ni, gem_port_id). msg is freed exactly once, on the way out.
+static void ItuPonGemConfigCompletedInd(bcmolt_devid olt, bcmolt_msg *msg) {
+    switch (msg->obj_type) {
+        case BCMOLT_OBJ_ID_ITUPON_GEM:
+            switch (msg->subgroup) {
+                case BCMOLT_ITUPON_GEM_AUTO_SUBGROUP_CONFIGURATION_COMPLETED:
+                {
+                    bcmolt_itupon_gem_configuration_completed *pkt =
+                        (bcmolt_itupon_gem_configuration_completed*)msg;
+                    bcmolt_itupon_gem_configuration_completed_data *pkt_data = &pkt->data;
+
+                    gem_cfg_compltd_key key((uint32_t)pkt->key.pon_ni, (uint32_t) pkt->key.gem_port_id);
+                    gem_cfg_complete_result res;
+                    res.pon_intf_id = pkt->key.pon_ni;
+                    res.gem_port_id = pkt->key.gem_port_id;
+                    res.status = (pkt_data->status == BCMOLT_RESULT_SUCCESS) ? GEM_CFG_STATUS_SUCCESS : GEM_CFG_STATUS_FAIL;
+
+                    switch (pkt_data->new_state) {
+                        case BCMOLT_ACTIVATION_STATE_NOT_CONFIGURED:
+                            res.state = GEM_OBJECT_STATE_NOT_CONFIGURED;
+                            break;
+                        case BCMOLT_ACTIVATION_STATE_INACTIVE:
+                            res.state = GEM_OBJECT_STATE_INACTIVE;
+                            break;
+                        case BCMOLT_ACTIVATION_STATE_PROCESSING:
+                            res.state = GEM_OBJECT_STATE_PROCESSING;
+                            break;
+                        case BCMOLT_ACTIVATION_STATE_ACTIVE:
+                            res.state = GEM_OBJECT_STATE_ACTIVE;
+                            break;
+                        default:
+                            OPENOLT_LOG(ERROR, openolt_log_id, "invalid itu pon gem activation new_state, pon_intf %u, gem_port_id %u, new_state %d\n",
+                                    pkt->key.pon_ni, pkt->key.gem_port_id, pkt_data->new_state);
+                            res.state = GEM_OBJECT_STATE_NOT_CONFIGURED;
+                            break;
+                    }
+                    OPENOLT_LOG(INFO, openolt_log_id, "received itu pon gem cfg complete ind, pon intf %u, gem_port_id %u, status %u, new_state %u\n",
+                            pkt->key.pon_ni, pkt->key.gem_port_id, pkt_data->status, pkt_data->new_state);
+
+                    // Find the waiter and push the result within ONE critical section. Releasing the lock between
+                    // the lookup and the push would let the waiter time out and erase its entry, leaving a stale
+                    // iterator and a pointer to a queue that no longer exists.
+                    bool key_found = false;
+                    for (uint32_t attempt = 1; attempt <= MAX_GEM_CFG_KEY_CHECK; attempt++) {
+                        bcmos_fastlock_lock(&gem_cfg_wait_lock);
+                        std::map<gem_cfg_compltd_key,  Queue<gem_cfg_complete_result> *>::iterator it =
+                            gem_cfg_compltd_map.find(key);
+                        if (it != gem_cfg_compltd_map.end()) {
+                            key_found = true;
+                            if (it->second) { // NULL means the waiter already timed out; nothing to deliver
+                                it->second->push(res);
+                            }
+                        }
+                        bcmos_fastlock_unlock(&gem_cfg_wait_lock, 0);
+                        if (key_found || attempt == MAX_GEM_CFG_KEY_CHECK) {
+                            break;
+                        }
+                        /* During removal of a gemport the indication from BAL may arrive even before the application has
+                        registered its queue under gem_cfg_compltd_key. To handle this, wait 6ms and look the key up again. */
+                        bcmos_usleep(6000);
+                    }
+
+                    if (!key_found) {
+                        // Could be a spurious async response, OR the application timed out waiting for the response and
+                        // cleared the key. Log while msg is still alive: pkt points into msg, which is freed below.
+                        OPENOLT_LOG(ERROR, openolt_log_id, "gem config key not found for gem_port_id = %u, pon_intf = %u\n", pkt->key.gem_port_id, pkt->key.pon_ni);
+                    }
+                    break;
+                }
+            }
+            break;
+    }
+
+    bcmolt_msg_free(msg);
+}
+
 static void FlowOperIndication(bcmolt_devid olt, bcmolt_msg *msg) {
     openolt::Indication ind;
     OPENOLT_LOG(DEBUG, openolt_log_id, "flow oper state indication\n");
@@ -1371,6 +1445,14 @@
     rc = bcmolt_ind_subscribe(current_device, &rx_cfg);
     if(rc != BCM_ERR_OK)
         return Status(grpc::StatusCode::INTERNAL, "ITU PON Alloc Configuration Complete Indication subscribe failed");
+
+    rx_cfg.obj_type = BCMOLT_OBJ_ID_ITUPON_GEM;
+    rx_cfg.rx_cb = ItuPonGemConfigCompletedInd;
+    rx_cfg.flags = BCMOLT_AUTO_FLAGS_NONE;
+    rx_cfg.subgroup = bcmolt_itupon_gem_auto_subgroup_configuration_completed;
+    rc = bcmolt_ind_subscribe(current_device, &rx_cfg);
+    if(rc != BCM_ERR_OK)
+        return Status(grpc::StatusCode::INTERNAL, "ITU PON Gem Configuration Complete Indication subscribe failed");
 #endif
 
     rx_cfg.obj_type = BCMOLT_OBJ_ID_GROUP;
diff --git a/agent/test/src/test_core.cc b/agent/test/src/test_core.cc
index 650b497..6df510b 100644
--- a/agent/test/src/test_core.cc
+++ b/agent/test/src/test_core.cc
@@ -25,10 +25,6 @@
 using namespace testing;
 using namespace std;
 
-extern std::map<alloc_cfg_compltd_key,  Queue<alloc_cfg_complete_result> *> alloc_cfg_compltd_map;
-extern dev_log_id openolt_log_id;
-extern bcmos_fastlock alloc_cfg_wait_lock;
-
 class TestOltEnable : public Test {
  protected:
   virtual void SetUp() {
@@ -1431,6 +1427,47 @@
 
         virtual void TearDown() {
         }
+
+    public:
+        // Test helper that plays the role of the BAL indication: pushes a mocked gem cfg result for
+        // (pon_intf 0, gem_port_id) to the queue registered in gem_cfg_compltd_map. Always returns 0.
+        static int PushGemCfgResult(GemObjectState state, GemCfgStatus status, uint32_t gem_port_id) {
+            gem_cfg_compltd_key k(0, gem_port_id);
+            gem_cfg_complete_result res;
+            res.pon_intf_id = 0;
+            res.gem_port_id = gem_port_id;
+            res.state = state;
+            res.status = status;
+
+            // The lookup and the push share one critical section. Releasing the lock in between would
+            // let the waiter erase its map entry, leaving a stale iterator and a pointer to a queue
+            // that no longer exists.
+            for (uint32_t attempt = 1; attempt <= MAX_GEM_CFG_KEY_CHECK; attempt++) {
+                bcmos_fastlock_lock(&gem_cfg_wait_lock);
+                std::map<gem_cfg_compltd_key,  Queue<gem_cfg_complete_result> *>::iterator it =
+                    gem_cfg_compltd_map.find(k);
+                if (it != gem_cfg_compltd_map.end()) {
+                    // A NULL queue pointer means the waiter already timed out; nothing to deliver.
+                    if (it->second) {
+                        it->second->push(res);
+                        OPENOLT_LOG(INFO, openolt_log_id, "Pushed mocked gem cfg result\n");
+                    }
+                    bcmos_fastlock_unlock(&gem_cfg_wait_lock, 0);
+                    return 0;
+                }
+                bcmos_fastlock_unlock(&gem_cfg_wait_lock, 0);
+
+                if (attempt < MAX_GEM_CFG_KEY_CHECK) {
+                    // We need to wait for some time to allow the Gem Cfg Request to be triggered
+                    // before we push the result.
+                    bcmos_usleep(6000);
+                }
+            }
+
+            // The key never appeared within MAX_GEM_CFG_KEY_CHECK lookups.
+            OPENOLT_LOG(ERROR, openolt_log_id, "gem config key not found for gem_port_id = %u, pon_intf = %u\n", gem_port_id, 0);
+            return 0;
+        }
 };
 
 // Test 1 - FlowAdd - success case(HSIA-upstream FixedQueue)
@@ -1574,9 +1611,16 @@
     EXPECT_GLOBAL_CALL(bcmolt_cfg_get__flow_stub, bcmolt_cfg_get__flow_stub(_, _))
                      .WillRepeatedly(DoAll(SetArg1ToBcmOltFlowCfg(flow_cfg), Return(flow_cfg_get_stub_res)));
     ON_CALL(balMock, bcmolt_cfg_set(_, _)).WillByDefault(Return(olt_cfg_set_res));
-    CreateTrafficQueues_(traffic_queues);
 
-    Status status = FlowAdd_(access_intf_id, onu_id, uni_id, port_no, flow_id, flow_type, alloc_id, network_intf_id,
+    future<Status> future_res = async(launch::async, CreateTrafficQueues_, traffic_queues);
+    future<int> push_gem_cfg_complt = \
+                async(launch::async, TestFlowAdd::PushGemCfgResult, GEM_OBJECT_STATE_ACTIVE, GEM_CFG_STATUS_SUCCESS, 1024);
+    push_gem_cfg_complt = \
+                async(launch::async, TestFlowAdd::PushGemCfgResult, GEM_OBJECT_STATE_ACTIVE, GEM_CFG_STATUS_SUCCESS, 1025);
+    Status status = future_res.get();
+    int res = push_gem_cfg_complt.get();
+
+    status = FlowAdd_(access_intf_id, onu_id, uni_id, port_no, flow_id, flow_type, alloc_id, network_intf_id,
         gemport_id, *classifier, *action, priority_value, cookie, group_id, tech_profile_id);
     ASSERT_TRUE( status.error_message() == Status::OK.error_message() );
 }
@@ -1626,8 +1670,13 @@
                      .WillRepeatedly(DoAll(SetArg1ToBcmOltFlowCfg(flow_cfg), Return(flow_cfg_get_stub_res)));
     ON_CALL(balMock, bcmolt_cfg_set(_, _)).WillByDefault(Return(olt_cfg_set_res));
 
-    Status status = FlowAdd_(access_intf_id, onu_id, uni_id, port_no, flow_id, flow_type, alloc_id, network_intf_id,
+    future<Status> future_res = async(launch::async, FlowAdd_, access_intf_id, onu_id, uni_id, port_no, flow_id, flow_type, alloc_id, network_intf_id,
         gemport_id, *classifier, *action, priority_value, cookie, group_id, tech_profile_id, enable_encryption);
+
+    future<int> push_gem_cfg_complt = \
+                async(launch::async, TestFlowAdd::PushGemCfgResult, GEM_OBJECT_STATE_ACTIVE, GEM_CFG_STATUS_SUCCESS, 1024);
+    Status status = future_res.get();
+    int res = push_gem_cfg_complt.get();
     ASSERT_TRUE( status.error_message() == Status::OK.error_message() );
 }
 
@@ -2531,11 +2580,51 @@
 
         virtual void TearDown() {
         }
+
+    public:
+        // Test helper that plays the role of the BAL indication: pushes a mocked gem cfg result for
+        // (pon_intf 0, gem_port_id) to the queue registered in gem_cfg_compltd_map. Always returns 0.
+        static int PushGemCfgResult(GemObjectState state, GemCfgStatus status, uint32_t gem_port_id) {
+            gem_cfg_compltd_key k(0, gem_port_id);
+            gem_cfg_complete_result res;
+            res.pon_intf_id = 0;
+            res.gem_port_id = gem_port_id;
+            res.state = state;
+            res.status = status;
+
+            // The lookup and the push share one critical section. Releasing the lock in between would
+            // let the waiter erase its map entry, leaving a stale iterator and a pointer to a queue
+            // that no longer exists.
+            for (uint32_t attempt = 1; attempt <= MAX_GEM_CFG_KEY_CHECK; attempt++) {
+                bcmos_fastlock_lock(&gem_cfg_wait_lock);
+                std::map<gem_cfg_compltd_key,  Queue<gem_cfg_complete_result> *>::iterator it =
+                    gem_cfg_compltd_map.find(k);
+                if (it != gem_cfg_compltd_map.end()) {
+                    // A NULL queue pointer means the waiter already timed out; nothing to deliver.
+                    if (it->second) {
+                        it->second->push(res);
+                        OPENOLT_LOG(INFO, openolt_log_id, "Pushed mocked gem cfg result\n");
+                    }
+                    bcmos_fastlock_unlock(&gem_cfg_wait_lock, 0);
+                    return 0;
+                }
+                bcmos_fastlock_unlock(&gem_cfg_wait_lock, 0);
+
+                if (attempt < MAX_GEM_CFG_KEY_CHECK) {
+                    // We need to wait for some time to allow the Gem Cfg Request to be triggered
+                    // before we push the result.
+                    bcmos_usleep(6000);
+                }
+            }
+
+            // The key never appeared within MAX_GEM_CFG_KEY_CHECK lookups.
+            OPENOLT_LOG(ERROR, openolt_log_id, "gem config key not found for gem_port_id = %u, pon_intf = %u\n", gem_port_id, 0);
+            return 0;
+        }
 };
 
 // Test 1 - CreateTrafficQueues-Upstream/Downstream FIXED_QUEUE success case
 TEST_F(TestCreateTrafficQueues, CreateUpstreamDownstreamFixedQueueSuccess) {
-    Status status;
     traffic_queues->set_uni_id(0);
     traffic_queues->set_port_no(16);
     traffic_queue_1->set_direction(tech_profile::Direction::UPSTREAM);
@@ -2543,7 +2632,11 @@
     bcmos_errno olt_cfg_set_res = BCM_ERR_OK;
     ON_CALL(balMock, bcmolt_cfg_set(_, _)).WillByDefault(Return(olt_cfg_set_res));
 
-    status = CreateTrafficQueues_(traffic_queues);
+    future<Status> future_res = async(launch::async, CreateTrafficQueues_, traffic_queues);
+    future<int> push_gem_cfg_complt = \
+                async(launch::async, TestCreateTrafficQueues::PushGemCfgResult, GEM_OBJECT_STATE_ACTIVE, GEM_CFG_STATUS_SUCCESS, 1024);
+    Status status = future_res.get();
+    int res = push_gem_cfg_complt.get();
     ASSERT_TRUE( status.error_message() == Status::OK.error_message() );
 
     traffic_queue_1->set_direction(tech_profile::Direction::DOWNSTREAM);
@@ -2576,7 +2669,14 @@
     bcmos_errno olt_cfg_set_res = BCM_ERR_OK;
     ON_CALL(balMock, bcmolt_cfg_set(_, _)).WillByDefault(Return(olt_cfg_set_res));
 
-    Status status = CreateTrafficQueues_(traffic_queues);
+    future<Status> future_res = async(launch::async, CreateTrafficQueues_, traffic_queues);
+    future<int> push_gem_cfg_complt = \
+                async(launch::async, TestCreateTrafficQueues::PushGemCfgResult, GEM_OBJECT_STATE_ACTIVE, GEM_CFG_STATUS_SUCCESS, 1024);
+    std::this_thread::sleep_for(std::chrono::milliseconds(100));
+    push_gem_cfg_complt = \
+                async(launch::async, TestCreateTrafficQueues::PushGemCfgResult, GEM_OBJECT_STATE_ACTIVE, GEM_CFG_STATUS_SUCCESS, 1025);
+    Status status = future_res.get();
+    int res = push_gem_cfg_complt.get();
     ASSERT_TRUE( status.error_message() == Status::OK.error_message() );
 }
 
@@ -2634,12 +2734,19 @@
     bcmos_errno olt_cfg_set_res = BCM_ERR_OK;
     ON_CALL(balMock, bcmolt_cfg_set(_, _)).WillByDefault(Return(olt_cfg_set_res));
 
-    Status status = CreateTrafficQueues_(traffic_queues);
+    future<Status> future_res = async(launch::async, CreateTrafficQueues_, traffic_queues);
+    future<int> push_gem_cfg_complt = \
+                async(launch::async, TestCreateTrafficQueues::PushGemCfgResult, GEM_OBJECT_STATE_ACTIVE, GEM_CFG_STATUS_SUCCESS, 1024);
+    std::this_thread::sleep_for(std::chrono::milliseconds(100));
+    push_gem_cfg_complt = \
+                async(launch::async, TestCreateTrafficQueues::PushGemCfgResult, GEM_OBJECT_STATE_ACTIVE, GEM_CFG_STATUS_SUCCESS, 1025);
+    Status status = future_res.get();
+    int res = push_gem_cfg_complt.get();
     ASSERT_TRUE( status.error_message() == Status::OK.error_message() );
 }
 
-// Test 5 - CreateTrafficQueues-Upstream PRIORITY_TO_QUEUE TM QMP Max count reached case
-TEST_F(TestCreateTrafficQueues, CreateUpstreamPriorityQueueReachedMaxTMQMPCount) {
+// Test 5 - CreateTrafficQueues-Downstream PRIORITY_TO_QUEUE TM QMP Max count reached case
+TEST_F(TestCreateTrafficQueues, CreateDownstreamPriorityQueueReachedMaxTMQMPCount) {
     int uni_ids[17] = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17};
     int port_nos[17] = {16, 32, 48, 64, 80, 96, 112, 128, 144, 160, 176, 192, 208, 224, 240, 256, 272};
     std::string pbit_maps[17] = {"0b00001010", "0b10001010", "0b00000001", "0b00000010", "0b00000100", "0b00001000", "0b00010000", "0b00100000", "0b01000000", "0b10000000", "0b10000001", "0b10000010", "0b10000100", "0b10001000", "0b10010000", "0b10100000", "0b11000000"};
@@ -2648,7 +2755,7 @@
     for(int i=0; i<sizeof(uni_ids)/sizeof(uni_ids[0]); i++) {
         traffic_queues->set_uni_id(uni_ids[i]);
         traffic_queues->set_port_no(port_nos[i]);
-        traffic_queue_1->set_direction(tech_profile::Direction::UPSTREAM);
+        traffic_queue_1->set_direction(tech_profile::Direction::DOWNSTREAM);
 
         traffic_queue_2->set_gemport_id(1025);
         traffic_queue_2->set_pbit_map(pbit_maps[i]);
@@ -2663,12 +2770,20 @@
         tail_drop_discard_config_2->set_queue_size(8);
         discard_config_2->set_allocated_tail_drop_discard_config(tail_drop_discard_config_2);
         traffic_queue_2->set_allocated_discard_config(discard_config_2);
-        traffic_queue_2->set_direction(tech_profile::Direction::UPSTREAM);
+        traffic_queue_2->set_direction(tech_profile::Direction::DOWNSTREAM);
 
         bcmos_errno olt_cfg_set_res = BCM_ERR_OK;
         ON_CALL(balMock, bcmolt_cfg_set(_, _)).WillByDefault(Return(olt_cfg_set_res));
 
-        Status status = CreateTrafficQueues_(traffic_queues);
+        future<Status> future_res = async(launch::async, CreateTrafficQueues_, traffic_queues);
+        future<int> push_gem_cfg_complt = \
+                    async(launch::async, TestCreateTrafficQueues::PushGemCfgResult, GEM_OBJECT_STATE_ACTIVE, GEM_CFG_STATUS_SUCCESS, 1024);
+        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+        push_gem_cfg_complt = \
+                    async(launch::async, TestCreateTrafficQueues::PushGemCfgResult, GEM_OBJECT_STATE_ACTIVE, GEM_CFG_STATUS_SUCCESS, 1025);
+        Status status = future_res.get();
+        int res = push_gem_cfg_complt.get();
+
         if(i==16)
             ASSERT_TRUE( status.error_message() != Status::OK.error_message() );
         else
@@ -2699,6 +2814,7 @@
         tech_profile::TrafficQueues* traffic_queues;
         tech_profile::TrafficQueue* traffic_queue_1;
         tech_profile::TrafficQueue* traffic_queue_2;
+        uint32_t pon_id = 0;
 
         virtual void SetUp() {
             traffic_queues = new tech_profile::TrafficQueues;
@@ -2711,11 +2827,51 @@
 
         virtual void TearDown() {
         }
+
+    public:
+        // Test helper: delivers a mocked gem cfg result to the waiter registered for gem_port_id.
+        // Polls gem_cfg_compltd_map for key (0, gem_port_id), sleeping 6 ms between lookups and
+        // giving up after MAX_GEM_CFG_KEY_CHECK lookups. The result is pushed during the same
+        // gem_cfg_wait_lock hold that found the map entry (no unlock/re-lock gap in between).
+        //   state, status - copied into the mocked gem_cfg_complete_result.
+        //   gem_port_id   - gem port the result is for (pon interface is fixed to 0).
+        // Returns 0 in every case, whether or not the result could be pushed.
+        static int PushGemCfgResult(GemObjectState state, GemCfgStatus status, uint32_t gem_port_id) {
+            gem_cfg_compltd_key k(0, gem_port_id);
+            gem_cfg_complete_result res;
+            res.pon_intf_id = 0;
+            res.gem_port_id = gem_port_id;
+            res.state = state;
+            res.status = status;
+
+            for (uint32_t attempt = 1; ; attempt++) {
+                bcmos_fastlock_lock(&gem_cfg_wait_lock);
+                auto it = gem_cfg_compltd_map.find(k);
+                if (it != gem_cfg_compltd_map.end()) {
+                    // Push before unlocking: the entry was looked up under this lock hold.
+                    if (it->second) {
+                        it->second->push(res);
+                        OPENOLT_LOG(INFO, openolt_log_id, "Pushed mocked gem cfg result\n");
+                    }
+                    bcmos_fastlock_unlock(&gem_cfg_wait_lock, 0);
+                    return 0;
+                }
+                if (attempt >= MAX_GEM_CFG_KEY_CHECK) {
+                    OPENOLT_LOG(ERROR, openolt_log_id, "gem config key not found for gem_port_id = %u, pon_intf = %u\n", gem_port_id, 0);
+                    bcmos_fastlock_unlock(&gem_cfg_wait_lock, 0);
+                    return 0;
+                }
+                bcmos_fastlock_unlock(&gem_cfg_wait_lock, 0);
+
+                // We need to wait for some time to allow the Gem Cfg Request to be triggered
+                // before we push the result.
+                bcmos_usleep(6000);
+            }
+        }
 };
 
 // Test 1 - RemoveTrafficQueues-Upstream/Downstream FIXED_QUEUE success case
 TEST_F(TestRemoveTrafficQueues, RemoveUpstreamDownstreamFixedQueueSuccess) {
-    Status status;
     traffic_queues->set_uni_id(0);
     traffic_queues->set_port_no(16);
     traffic_queue_1->set_direction(tech_profile::Direction::UPSTREAM);
@@ -2723,7 +2879,21 @@
     bcmos_errno olt_cfg_clear_res = BCM_ERR_OK;
     ON_CALL(balMock, bcmolt_cfg_clear(_, _)).WillByDefault(Return(olt_cfg_clear_res));
 
-    status = RemoveTrafficQueues_(traffic_queues);
+    bcmolt_pon_interface_key pon_key;
+    bcmolt_pon_interface_cfg pon_cfg;
+    pon_key.pon_ni = pon_id;
+    BCMOLT_CFG_INIT(&pon_cfg, pon_interface, pon_key);
+    pon_cfg.data.state = BCMOLT_INTERFACE_STATE_ACTIVE_WORKING;
+    bcmos_errno olt_cfg_get_pon_stub_res = BCM_ERR_OK;
+    EXPECT_GLOBAL_CALL(bcmolt_cfg_get__pon_intf_stub, bcmolt_cfg_get__pon_intf_stub(_, _))
+                     .WillOnce(DoAll(SetArg1ToBcmOltPonCfg(pon_cfg), Return(olt_cfg_get_pon_stub_res)));
+
+    future<Status> future_res = async(launch::async, RemoveTrafficQueues_, traffic_queues);
+    future<int> push_gem_cfg_complt = \
+    async(launch::async, TestRemoveTrafficQueues::PushGemCfgResult, GEM_OBJECT_STATE_NOT_CONFIGURED, GEM_CFG_STATUS_SUCCESS, 1024);
+
+    Status status = future_res.get();
+    int res = push_gem_cfg_complt.get();
     ASSERT_TRUE( status.error_message() == Status::OK.error_message() );
 
     traffic_queue_1->set_direction(tech_profile::Direction::DOWNSTREAM);
@@ -2733,7 +2903,6 @@
 
 // Test 2 - RemoveTrafficQueues-Downstream FIXED_QUEUE failure case
 TEST_F(TestRemoveTrafficQueues, RemoveUpstreamDownstreamFixedQueueFailure) {
-    Status status;
     traffic_queues->set_uni_id(0);
     traffic_queues->set_port_no(16);
     traffic_queue_1->set_direction(tech_profile::Direction::DOWNSTREAM);
@@ -2741,7 +2910,7 @@
     bcmos_errno olt_cfg_clear_res = BCM_ERR_INTERNAL;
     ON_CALL(balMock, bcmolt_cfg_clear(_, _)).WillByDefault(Return(olt_cfg_clear_res));
 
-    status = RemoveTrafficQueues_(traffic_queues);
+    Status status = RemoveTrafficQueues_(traffic_queues);
     ASSERT_TRUE( status.error_message() != Status::OK.error_message() );
 }
 
@@ -2782,41 +2951,100 @@
     traffic_queue_2->set_priority(1);
     traffic_queue_2->set_direction(tech_profile::Direction::UPSTREAM);
 
-    Status status = RemoveTrafficQueues_(traffic_queues);
+    bcmolt_pon_interface_key pon_key;
+    bcmolt_pon_interface_cfg pon_cfg;
+    pon_key.pon_ni = pon_id;
+    BCMOLT_CFG_INIT(&pon_cfg, pon_interface, pon_key);
+    pon_cfg.data.state = BCMOLT_INTERFACE_STATE_ACTIVE_WORKING;
+    bcmos_errno olt_cfg_get_pon_stub_res = BCM_ERR_OK;
+    EXPECT_GLOBAL_CALL(bcmolt_cfg_get__pon_intf_stub, bcmolt_cfg_get__pon_intf_stub(_, _))
+                     .WillRepeatedly(DoAll(SetArg1ToBcmOltPonCfg(pon_cfg), Return(olt_cfg_get_pon_stub_res)));
+
+    future<Status> future_res = async(launch::async, RemoveTrafficQueues_, traffic_queues);
+    future<int> push_gem_cfg_complt = \
+    async(launch::async, TestRemoveTrafficQueues::PushGemCfgResult, GEM_OBJECT_STATE_NOT_CONFIGURED, GEM_CFG_STATUS_SUCCESS, 1024);
+    std::this_thread::sleep_for(std::chrono::milliseconds(100));
+    push_gem_cfg_complt = \
+    async(launch::async, TestRemoveTrafficQueues::PushGemCfgResult, GEM_OBJECT_STATE_NOT_CONFIGURED, GEM_CFG_STATUS_SUCCESS, 1025);
+
+    Status status = future_res.get();
+    int res = push_gem_cfg_complt.get();
     ASSERT_TRUE( status.error_message() == Status::OK.error_message() );
 }
 
-/* Test 5 - RemoveTrafficQueues-Upstream PRIORITY_TO_QUEUE, removing TM QMP as it
+/* Test 5 - RemoveTrafficQueues-Downstream PRIORITY_TO_QUEUE, removing TM QMP as it
 is not getting referred by any other queues case */
-TEST_F(TestRemoveTrafficQueues, RemoveUpstreamPriorityQueueRemovingTMQMP) {
-    traffic_queues->set_uni_id(1);
-    traffic_queues->set_port_no(32);
-    traffic_queue_1->set_direction(tech_profile::Direction::UPSTREAM);
+TEST_F(TestRemoveTrafficQueues, RemoveDownstreamPriorityQueueRemovingTMQMP) {
+    traffic_queues->set_uni_id(5);
+    traffic_queues->set_port_no(80);
+    traffic_queue_1->set_direction(tech_profile::Direction::DOWNSTREAM);
     traffic_queue_2 = traffic_queues->add_traffic_queues();
     traffic_queue_2->set_gemport_id(1025);
     traffic_queue_2->set_priority(1);
+    traffic_queue_2->set_direction(tech_profile::Direction::DOWNSTREAM);
+    // Stub the pon-interface cfg get: every call fills in an ACTIVE_WORKING pon cfg and returns BCM_ERR_OK.
+    bcmolt_pon_interface_key pon_key;
+    bcmolt_pon_interface_cfg pon_cfg;
+    pon_key.pon_ni = pon_id;
+    BCMOLT_CFG_INIT(&pon_cfg, pon_interface, pon_key);
+    pon_cfg.data.state = BCMOLT_INTERFACE_STATE_ACTIVE_WORKING;
+    bcmos_errno olt_cfg_get_pon_stub_res = BCM_ERR_OK;
+    EXPECT_GLOBAL_CALL(bcmolt_cfg_get__pon_intf_stub, bcmolt_cfg_get__pon_intf_stub(_, _))
+                     .WillRepeatedly(DoAll(SetArg1ToBcmOltPonCfg(pon_cfg), Return(olt_cfg_get_pon_stub_res)));
 
     bcmos_errno olt_cfg_clear_res = BCM_ERR_OK;
     ON_CALL(balMock, bcmolt_cfg_clear(_, _)).WillByDefault(Return(olt_cfg_clear_res));
 
-    Status status = RemoveTrafficQueues_(traffic_queues);
+    future<Status> future_res = async(launch::async, RemoveTrafficQueues_, traffic_queues);  // call under test runs on its own thread
+    future<int> push_gem_cfg_complt = \
+    async(launch::async, TestRemoveTrafficQueues::PushGemCfgResult, GEM_OBJECT_STATE_NOT_CONFIGURED, GEM_CFG_STATUS_SUCCESS, 1024);  // mocked result for gem port 1024
+    std::this_thread::sleep_for(std::chrono::milliseconds(100));  // NOTE(review): presumably gives the 1024 result time to be consumed first; timing-based, confirm
+    push_gem_cfg_complt = \
+    async(launch::async, TestRemoveTrafficQueues::PushGemCfgResult, GEM_OBJECT_STATE_NOT_CONFIGURED, GEM_CFG_STATUS_SUCCESS, 1025);  // mocked result for gem port 1025; overwriting a std::async future waits for the 1024 pusher to finish
+    // Collect the result of the call under test, then join the remaining pusher (its int result is not checked).
+    Status status = future_res.get();
+    int res = push_gem_cfg_complt.get();
     ASSERT_TRUE( status.error_message() == Status::OK.error_message() );
 }
 
-/* Test 6 - RemoveTrafficQueues-Upstream PRIORITY_TO_QUEUE, error while removing TM QMP
+/* Test 6 - RemoveTrafficQueues-Downstream PRIORITY_TO_QUEUE, error while removing TM QMP
 having no reference to any other queues case */
-TEST_F(TestRemoveTrafficQueues, RemoveUpstreamPriorityQueueErrorRemovingTMQMP) {
+TEST_F(TestRemoveTrafficQueues, RemoveDownstreamPriorityQueueErrorRemovingTMQMP) {
     traffic_queues->set_uni_id(4);
     traffic_queues->set_port_no(64);
-    traffic_queue_1->set_direction(tech_profile::Direction::UPSTREAM);
+    traffic_queue_1->set_direction(tech_profile::Direction::DOWNSTREAM);
     traffic_queue_2 = traffic_queues->add_traffic_queues();
     traffic_queue_2->set_gemport_id(1025);
     traffic_queue_2->set_priority(1);
+    traffic_queue_2->set_direction(tech_profile::Direction::DOWNSTREAM);
 
-    bcmos_errno olt_cfg_clear_res = BCM_ERR_INTERNAL;
-    ON_CALL(balMock, bcmolt_cfg_clear(_, _)).WillByDefault(Return(olt_cfg_clear_res));
+    bcmolt_pon_interface_key pon_key;
+    bcmolt_pon_interface_cfg pon_cfg;
+    pon_key.pon_ni = pon_id;
+    BCMOLT_CFG_INIT(&pon_cfg, pon_interface, pon_key);
+    pon_cfg.data.state = BCMOLT_INTERFACE_STATE_ACTIVE_WORKING;  // state handed back by the stubbed pon-interface cfg get below
+    bcmos_errno olt_cfg_get_pon_stub_res = BCM_ERR_OK;
+    EXPECT_GLOBAL_CALL(bcmolt_cfg_get__pon_intf_stub, bcmolt_cfg_get__pon_intf_stub(_, _))
+                     .WillRepeatedly(DoAll(SetArg1ToBcmOltPonCfg(pon_cfg), Return(olt_cfg_get_pon_stub_res)));
 
-    Status status = RemoveTrafficQueues_(traffic_queues);
+    bcmos_errno olt_cfg_clear_res_success = BCM_ERR_OK;
+    bcmos_errno olt_cfg_clear_res_failure = BCM_ERR_INTERNAL;
+    EXPECT_CALL(balMock, bcmolt_cfg_clear(_, _))  // first four clears succeed, every later one returns BCM_ERR_INTERNAL
+                         .WillOnce(Return(olt_cfg_clear_res_success))
+                         .WillOnce(Return(olt_cfg_clear_res_success))
+                         .WillOnce(Return(olt_cfg_clear_res_success))
+                         .WillOnce(Return(olt_cfg_clear_res_success))
+                         .WillRepeatedly(Return(olt_cfg_clear_res_failure));  // NOTE(review): presumably the fifth clear is the TM QMP removal this test targets; confirm
+    // Run the call under test on its own thread while mocked gem cfg results are pushed for ports 1024 and 1025.
+    future<Status> future_res = async(launch::async, RemoveTrafficQueues_, traffic_queues);
+    future<int> push_gem_cfg_complt = \
+    async(launch::async, TestRemoveTrafficQueues::PushGemCfgResult, GEM_OBJECT_STATE_NOT_CONFIGURED, GEM_CFG_STATUS_SUCCESS, 1024);
+    std::this_thread::sleep_for(std::chrono::milliseconds(100));  // NOTE(review): presumably gives the 1024 result time to be consumed first; timing-based, confirm
+    push_gem_cfg_complt = \
+    async(launch::async, TestRemoveTrafficQueues::PushGemCfgResult, GEM_OBJECT_STATE_NOT_CONFIGURED, GEM_CFG_STATUS_SUCCESS, 1025);  // overwriting a std::async future waits for the 1024 pusher to finish
+    // Collect the result of the call under test, then join the remaining pusher (its int result is not checked).
+    Status status = future_res.get();
+    int res = push_gem_cfg_complt.get();
     ASSERT_TRUE( status.error_message() != Status::OK.error_message() );
 }