VOL-3419: Replicate voltha flows in openolt agent
- Use the flow_id, symmetric_flow_id, replicate_flow and pbit_to_gemport fields
  of the Flow proto message to replicate the flow as needed
- Use the CreateQueues and RemoveQueues messages to set up and remove gem_ports
- Use the latest gRPC version, 1.31.1, which allows fine-tuning of the gRPC thread
  pools for greater performance.
- Performance measured with openolt-scale-tester is better with the latest gRPC
  and thread-pool tuning than before. Note that performance is better even though
  openolt-agent now replicates the flows itself.
- Scale tests with 512 subscribers have been successful with BAL version 3.4.7.5
- Use openolt proto version 4.0.0
- Use openolt-test (for unit tests) image version 2.0.1 (which bundles gRPC version 1.31.1)
- These changes are NOT backward compatible and openolt-agent will have a major
  version bump to 3.0.0

Change-Id: I715c804bdf342e60d08cab6c59e1c21b8c5ac1f4
diff --git a/agent/src/core_api_handler.cc b/agent/src/core_api_handler.cc
index aa45b0f..a2b0bcf 100644
--- a/agent/src/core_api_handler.cc
+++ b/agent/src/core_api_handler.cc
@@ -65,9 +65,9 @@
 static std::string firmware_version = "Openolt.2019.07.01";
 
 static bcmos_errno CreateSched(std::string direction, uint32_t access_intf_id, uint32_t onu_id, uint32_t uni_id, \
-                          uint32_t port_no, uint32_t alloc_id, tech_profile::AdditionalBW additional_bw, uint32_t weight, \
-                          uint32_t priority, tech_profile::SchedulingPolicy sched_policy,
-                          tech_profile::TrafficShapingInfo traffic_shaping_info, uint32_t tech_profile_id);
+                          uint32_t port_no, uint32_t alloc_id, ::tech_profile::AdditionalBW additional_bw, uint32_t weight, \
+                          uint32_t priority, ::tech_profile::SchedulingPolicy sched_policy,
+                          ::tech_profile::TrafficShapingInfo traffic_shaping_info, uint32_t tech_profile_id); // leading '::' names the global tech_profile namespace explicitly
 static bcmos_errno RemoveSched(int intf_id, int onu_id, int uni_id, int alloc_id, std::string direction, int tech_profile_id);
 static bcmos_errno CreateQueue(std::string direction, uint32_t access_intf_id, uint32_t onu_id, uint32_t uni_id, \
                                bcmolt_egress_qos_type qos_type, uint32_t priority, uint32_t gemport_id, uint32_t tech_profile_id);
@@ -201,7 +201,7 @@
     return Status::OK;
 }
 
-Status GetDeviceInfo_(openolt::DeviceInfo* device_info) {
+Status GetDeviceInfo_(::openolt::DeviceInfo* device_info) { // fills vendor, model, hardware version and resource ranges into *device_info
     device_info->set_vendor(VENDOR_ID);
     device_info->set_model(MODEL_ID);
     device_info->set_hardware_version("");
@@ -238,90 +238,49 @@
 
     // Legacy, device-wide ranges. To be deprecated when adapter
     // is upgraded to support per-interface ranges
-    if (board_technology == "XGS-PON") {
-        device_info->set_onu_id_start(1);
-        device_info->set_onu_id_end(255);
-        device_info->set_alloc_id_start(MIN_ALLOC_ID_XGSPON);
-        device_info->set_alloc_id_end(16383);
-        device_info->set_gemport_id_start(1024);
-        device_info->set_gemport_id_end(65535);
-        device_info->set_flow_id_start(1);
-        device_info->set_flow_id_end(16383);
-    }
-    else if (board_technology == "GPON") {
-        device_info->set_onu_id_start(1);
-        device_info->set_onu_id_end(127);
-        device_info->set_alloc_id_start(MIN_ALLOC_ID_GPON);
-        device_info->set_alloc_id_end(767);
-        device_info->set_gemport_id_start(256);
-        device_info->set_gemport_id_end(4095);
-        device_info->set_flow_id_start(1);
-        device_info->set_flow_id_end(16383);
-    }
+    device_info->set_onu_id_start(ONU_ID_START);
+    device_info->set_onu_id_end(ONU_ID_END);
+    device_info->set_alloc_id_start(ALLOC_ID_START);
+    device_info->set_alloc_id_end(ALLOC_ID_END);
+    device_info->set_gemport_id_start(GEM_PORT_ID_START);
+    device_info->set_gemport_id_end(GEM_PORT_ID_END);
+    device_info->set_flow_id_start(FLOW_ID_START);
+    device_info->set_flow_id_end(FLOW_ID_END);
 
-    std::map<std::string, openolt::DeviceInfo::DeviceResourceRanges*> ranges;
+    std::map<std::string, ::openolt::DeviceInfo::DeviceResourceRanges*> ranges;
     for (uint32_t intf_id = 0; intf_id < num_of_pon_ports; ++intf_id) {
         std::string intf_technology = intf_technologies[intf_id];
-        openolt::DeviceInfo::DeviceResourceRanges *range = ranges[intf_technology];
+        ::openolt::DeviceInfo::DeviceResourceRanges *range = ranges[intf_technology];
         if(range == nullptr) {
             range = device_info->add_ranges();
             ranges[intf_technology] = range;
             range->set_technology(intf_technology);
 
-            if (intf_technology == "XGS-PON") {
-                openolt::DeviceInfo::DeviceResourceRanges::Pool* pool;
+            ::openolt::DeviceInfo::DeviceResourceRanges::Pool* pool; // pools below use the same compile-time ranges for every intf_technology
 
-                pool = range->add_pools();
-                pool->set_type(openolt::DeviceInfo::DeviceResourceRanges::Pool::ONU_ID);
-                pool->set_sharing(openolt::DeviceInfo::DeviceResourceRanges::Pool::DEDICATED_PER_INTF);
-                pool->set_start(1);
-                pool->set_end(255);
+            pool = range->add_pools();
+            pool->set_type(::openolt::DeviceInfo::DeviceResourceRanges::Pool::ONU_ID);
+            pool->set_sharing(::openolt::DeviceInfo::DeviceResourceRanges::Pool::DEDICATED_PER_INTF);
+            pool->set_start(ONU_ID_START);
+            pool->set_end(ONU_ID_END);
 
-                pool = range->add_pools();
-                pool->set_type(openolt::DeviceInfo::DeviceResourceRanges::Pool::ALLOC_ID);
-                pool->set_sharing(openolt::DeviceInfo::DeviceResourceRanges::Pool::DEDICATED_PER_INTF);
-                pool->set_start(1024);
-                pool->set_end(16383);
+            pool = range->add_pools();
+            pool->set_type(::openolt::DeviceInfo::DeviceResourceRanges::Pool::ALLOC_ID);
+            pool->set_sharing(::openolt::DeviceInfo::DeviceResourceRanges::Pool::DEDICATED_PER_INTF);
+            pool->set_start(ALLOC_ID_START);
+            pool->set_end(ALLOC_ID_END);
 
-                pool = range->add_pools();
-                pool->set_type(openolt::DeviceInfo::DeviceResourceRanges::Pool::GEMPORT_ID);
-                pool->set_sharing(openolt::DeviceInfo::DeviceResourceRanges::Pool::DEDICATED_PER_INTF);
-                pool->set_start(1024);
-                pool->set_end(65535);
+            pool = range->add_pools();
+            pool->set_type(::openolt::DeviceInfo::DeviceResourceRanges::Pool::GEMPORT_ID);
+            pool->set_sharing(::openolt::DeviceInfo::DeviceResourceRanges::Pool::DEDICATED_PER_INTF);
+            pool->set_start(GEM_PORT_ID_START);
+            pool->set_end(GEM_PORT_ID_END);
 
-                pool = range->add_pools();
-                pool->set_type(openolt::DeviceInfo::DeviceResourceRanges::Pool::FLOW_ID);
-                pool->set_sharing(openolt::DeviceInfo::DeviceResourceRanges::Pool::SHARED_BY_ALL_INTF_ALL_TECH);
-                pool->set_start(1);
-                pool->set_end(16383);
-            }
-            else if (intf_technology == "GPON") {
-                openolt::DeviceInfo::DeviceResourceRanges::Pool* pool;
-
-                pool = range->add_pools();
-                pool->set_type(openolt::DeviceInfo::DeviceResourceRanges::Pool::ONU_ID);
-                pool->set_sharing(openolt::DeviceInfo::DeviceResourceRanges::Pool::DEDICATED_PER_INTF);
-                pool->set_start(1);
-                pool->set_end(127);
-
-                pool = range->add_pools();
-                pool->set_type(openolt::DeviceInfo::DeviceResourceRanges::Pool::ALLOC_ID);
-                pool->set_sharing(openolt::DeviceInfo::DeviceResourceRanges::Pool::DEDICATED_PER_INTF);
-                pool->set_start(256);
-                pool->set_end(757);
-
-                pool = range->add_pools();
-                pool->set_type(openolt::DeviceInfo::DeviceResourceRanges::Pool::GEMPORT_ID);
-                pool->set_sharing(openolt::DeviceInfo::DeviceResourceRanges::Pool::DEDICATED_PER_INTF);
-                pool->set_start(256);
-                pool->set_end(4095);
-
-                pool = range->add_pools();
-                pool->set_type(openolt::DeviceInfo::DeviceResourceRanges::Pool::FLOW_ID);
-                pool->set_sharing(openolt::DeviceInfo::DeviceResourceRanges::Pool::SHARED_BY_ALL_INTF_ALL_TECH);
-                pool->set_start(1);
-                pool->set_end(16383);
-            }
+            pool = range->add_pools();
+            pool->set_type(::openolt::DeviceInfo::DeviceResourceRanges::Pool::FLOW_ID);
+            pool->set_sharing(::openolt::DeviceInfo::DeviceResourceRanges::Pool::SHARED_BY_ALL_INTF_ALL_TECH);
+            pool->set_start(FLOW_ID_START);
+            pool->set_end(FLOW_ID_END);
         }
 
         range->add_intf_ids(intf_id);
@@ -376,6 +335,11 @@
         }
 
         bcmos_fastlock_init(&data_lock, 0);
+        bcmos_fastlock_init(&acl_id_bitset_lock, 0);
+        bcmos_fastlock_init(&tm_sched_bitset_lock, 0);
+        bcmos_fastlock_init(&tm_qmp_bitset_lock, 0);
+        bcmos_fastlock_init(&flow_id_bitset_lock, 0);
+        bcmos_fastlock_init(&voltha_flow_to_device_flow_lock, 0); // NOTE(review): presumably guards the voltha flow -> device flow cache; confirm
         bcmos_fastlock_init(&alloc_cfg_wait_lock, 0);
         bcmos_fastlock_init(&onu_deactivate_wait_lock, 0);
         OPENOLT_LOG(INFO, openolt_log_id, "Enable OLT - %s-%s\n", VENDOR_ID, MODEL_ID);
@@ -469,8 +433,8 @@
     }
     if (failedCount == 0) {
         state.deactivate();
-        openolt::Indication ind;
-        openolt::OltIndication* olt_ind = new openolt::OltIndication;
+        ::openolt::Indication ind;
+        ::openolt::OltIndication* olt_ind = new ::openolt::OltIndication; // ownership is handed to ind by set_allocated_olt_ind() below
         olt_ind->set_oper_state("down");
         ind.set_allocated_olt_ind(olt_ind);
         BCM_LOG(INFO, openolt_log_id, "Disable OLT, add an extra indication\n");
@@ -496,8 +460,8 @@
     }
     if (failedCount == 0) {
         state.activate();
-        openolt::Indication ind;
-        openolt::OltIndication* olt_ind = new openolt::OltIndication;
+        ::openolt::Indication ind;
+        ::openolt::OltIndication* olt_ind = new ::openolt::OltIndication; // ownership is handed to ind by set_allocated_olt_ind() below
         olt_ind->set_oper_state("up");
         ind.set_allocated_olt_ind(olt_ind);
         BCM_LOG(INFO, openolt_log_id, "Reenable OLT, add an extra indication\n");
@@ -1182,7 +1146,7 @@
             return bcm_to_grpc_err(err, "Failed to configure ONU");
         }
     }
-    
+
     if (omcc_encryption_mode == true) {
         // set the encryption mode for omci port id
         bcmolt_itupon_gem_cfg gem_cfg;
@@ -1442,7 +1406,7 @@
     bcmolt_flow_send_eth_packet oper; /* declare main API struct */
 
     // TODO: flow_id is currently not passed in UplinkPacket message from voltha.
-    bcmolt_flow_id flow_id = 0;
+    bcmolt_flow_id flow_id = INVALID_FLOW_ID; // placeholder until UplinkPacket carries a flow_id (see TODO above)
 
     //validate flow_id and find flow_id/flow type: upstream/ingress type: PON/egress type: NNI
     if (get_flow_status(flow_id, BCMOLT_FLOW_TYPE_UPSTREAM, FLOW_TYPE) == BCMOLT_FLOW_TYPE_UPSTREAM && \
@@ -1492,6 +1456,202 @@
 
     return Status::OK;
 }
+
+/**
+ * Install a voltha flow as one or more BAL device flows (via FlowAdd_).
+ *
+ * - If the symmetric voltha flow is already installed, its device flow ID(s),
+ *   gemport(s) and o_pbits are reused, so both directions share device flow IDs.
+ * - Otherwise new device flow ID(s) are allocated: one for a plain flow, or
+ *   NUMBER_OF_REPLICATED_FLOWS, one per pbit_to_gemport entry, when replicate_flow
+ *   is set (NOTE(review): assumes NUMBER_OF_PBITS == NUMBER_OF_REPLICATED_FLOWS).
+ * Trap-to-host flows are never replicated. On success the voltha flow to device
+ * flow mapping is stored in the cache.
+ *
+ * Returns ALREADY_EXISTS if the voltha flow is already installed, INTERNAL if the
+ * symmetric flow's cached params are missing, OUT_OF_RANGE for a bad pbit_to_gemport
+ * map, RESOURCE_EXHAUSTED if flow IDs cannot be allocated, else the FlowAdd_ result.
+ */
+Status FlowAddWrapper_(const ::openolt::Flow* request) {
+
+    int32_t access_intf_id = request->access_intf_id();
+    int32_t onu_id = request->onu_id();
+    int32_t uni_id = request->uni_id();
+    uint32_t port_no = request->port_no();
+    uint64_t voltha_flow_id = request->flow_id();
+    uint64_t symmetric_voltha_flow_id = request->symmetric_flow_id();
+    const std::string flow_type = request->flow_type();
+    int32_t alloc_id = request->alloc_id();
+    int32_t network_intf_id = request->network_intf_id();
+    int32_t gemport_id = request->gemport_id();
+    const ::openolt::Classifier& classifier = request->classifier();
+    const ::openolt::Action& action = request->action();
+    int32_t priority = request->priority();
+    uint64_t cookie = request->cookie();
+    int32_t group_id = request->group_id();
+    uint32_t tech_profile_id = request->tech_profile_id();
+    bool replicate_flow = request->replicate_flow();
+    const google::protobuf::Map<unsigned int, unsigned int> &pbit_to_gemport = request->pbit_to_gemport();
+    uint16_t flow_id;
+
+    // NOTE(review): onu_id values below 1 are coerced to 1 before FlowAdd_; confirm intent.
+    if (onu_id < 1) {
+        onu_id = 1;
+    }
+
+    OPENOLT_LOG(INFO, openolt_log_id, "received flow add. voltha_flow_id=%lu, symmetric_voltha_flow_id=%lu, replication=%d\n", voltha_flow_id, symmetric_voltha_flow_id, replicate_flow);
+    // This is the case of voltha_flow_id (not symmetric_voltha_flow_id)
+    if (is_voltha_flow_installed(voltha_flow_id)) {
+        OPENOLT_LOG(INFO, openolt_log_id, "voltha_flow_id=%lu, already installed\n", voltha_flow_id);
+        return ::Status(grpc::StatusCode::ALREADY_EXISTS, "voltha-flow-already-installed");
+    }
+
+    // Trap-to-host voltha flows need not be replicated as they are installed as ACLs, not BAL flows.
+    if (action.cmd().trap_to_host()) {
+        replicate_flow = false;
+    }
+
+    // This is the case of symmetric_voltha_flow_id
+    // If symmetric_voltha_flow_id is available and valid in the Flow message,
+    // check if it is installed, and use the corresponding device_flow_id
+    if (symmetric_voltha_flow_id > 0 && is_voltha_flow_installed(symmetric_voltha_flow_id)) { // symmetric flow found
+        OPENOLT_LOG(INFO, openolt_log_id, "symmetric flow and the symmetric flow is installed\n");
+        const device_flow_params *dev_fl_symm_params;
+        dev_fl_symm_params = get_device_flow_params(symmetric_voltha_flow_id);
+        if (dev_fl_symm_params == NULL) {
+            OPENOLT_LOG(ERROR, openolt_log_id, "symmetric flow device params not found symm-voltha-flow=%lu voltha-flow=%lu\n", symmetric_voltha_flow_id, voltha_flow_id);
+            return ::Status(grpc::StatusCode::INTERNAL, "symmetric-flow-details-not-found");
+        }
+
+        if (!replicate_flow) {  // No flow replication
+            flow_id = dev_fl_symm_params[0].flow_id;
+            gemport_id = dev_fl_symm_params[0].gemport_id; // overwrite the gemport with symmetric flow gemport
+                                                           // Should be same as what is coming in this request.
+            ::openolt::Classifier cl = ::openolt::Classifier(classifier);
+            cl.set_o_pbits(dev_fl_symm_params[0].pbit);
+            Status st = FlowAdd_(access_intf_id, onu_id, uni_id, port_no, flow_id,
+                                flow_type, alloc_id, network_intf_id, gemport_id, cl,
+                                action, priority, cookie, group_id, tech_profile_id);
+            if (st.error_code() == grpc::StatusCode::OK) {
+                device_flow dev_fl;
+                dev_fl.is_flow_replicated = false;
+                dev_fl.symmetric_voltha_flow_id = symmetric_voltha_flow_id;
+                dev_fl.voltha_flow_id = voltha_flow_id;
+                memcpy(dev_fl.params, dev_fl_symm_params, sizeof(device_flow_params));
+                // update voltha flow to cache
+                update_voltha_flow_to_cache(voltha_flow_id, dev_fl);
+            }
+            return st;
+        } else { // Flow to be replicated
+            OPENOLT_LOG(INFO, openolt_log_id,"symmetric flow and replication is needed\n");
+            for (uint8_t i=0; i<NUMBER_OF_REPLICATED_FLOWS; i++) {
+                ::openolt::Classifier cl = ::openolt::Classifier(classifier);
+                flow_id = dev_fl_symm_params[i].flow_id;
+                gemport_id = dev_fl_symm_params[i].gemport_id;
+                cl.set_o_pbits(dev_fl_symm_params[i].pbit);
+                // Pass the per-pbit classifier (cl), not the original request classifier.
+                Status st = FlowAdd_(access_intf_id, onu_id, uni_id, port_no, flow_id,
+                                    flow_type, alloc_id, network_intf_id, gemport_id, cl,
+                                    action, priority, cookie, group_id, tech_profile_id);
+                if (st.error_code() != grpc::StatusCode::OK && st.error_code() != grpc::StatusCode::ALREADY_EXISTS) {
+                    OPENOLT_LOG(ERROR, openolt_log_id, "failed to install device flow=%u for voltha flow=%lu. Undoing any device flows installed.\n", flow_id, voltha_flow_id);
+                    // On failure remove any successfully replicated flows installed so far for the voltha_flow_id
+                    if (i > 0) {
+                        for (int8_t j = i-1; j >= 0; j--) {
+                            flow_id = dev_fl_symm_params[j].flow_id;
+                            FlowRemove_(flow_id, flow_type);
+                        }
+                    }
+                    return st;
+                }
+            }
+            device_flow dev_fl;
+            dev_fl.is_flow_replicated = true;
+            dev_fl.symmetric_voltha_flow_id = symmetric_voltha_flow_id;
+            dev_fl.voltha_flow_id = voltha_flow_id;
+            memcpy(dev_fl.params, dev_fl_symm_params, sizeof(device_flow_params)*NUMBER_OF_REPLICATED_FLOWS);
+            // update voltha flow to cache
+            update_voltha_flow_to_cache(voltha_flow_id, dev_fl);
+        }
+    } else { // No symmetric flow found
+        if (!replicate_flow) { // No flow replication
+            OPENOLT_LOG(INFO, openolt_log_id, "not a symmetric flow and replication is not needed\n");
+            flow_id = get_flow_id();
+            if (flow_id == INVALID_FLOW_ID) {
+                OPENOLT_LOG(ERROR, openolt_log_id, "could not allocate flow id for voltha-flow-id=%lu\n", voltha_flow_id);
+                return ::Status(grpc::StatusCode::RESOURCE_EXHAUSTED, "flow-ids-exhausted");
+            }
+            Status st = FlowAdd_(access_intf_id, onu_id, uni_id, port_no, flow_id,
+                                flow_type, alloc_id, network_intf_id, gemport_id, classifier,
+                                action, priority, cookie, group_id, tech_profile_id);
+            if (st.error_code() == grpc::StatusCode::OK) {
+                device_flow dev_fl;
+                dev_fl.is_flow_replicated = false;
+                dev_fl.symmetric_voltha_flow_id = INVALID_FLOW_ID; // Invalid
+                dev_fl.voltha_flow_id = voltha_flow_id;
+                dev_fl.params[0].flow_id = flow_id;
+                dev_fl.params[0].gemport_id = gemport_id;
+                dev_fl.params[0].pbit = classifier.o_pbits();
+                // update voltha flow to cache
+                update_voltha_flow_to_cache(voltha_flow_id, dev_fl);
+            } else {
+                // Free the flow id on failure
+                free_flow_id(flow_id);
+            }
+            return st;
+        } else { // Flow to be replicated
+            OPENOLT_LOG(INFO, openolt_log_id,"not a symmetric flow and replication is needed\n");
+            if (pbit_to_gemport.size() != NUMBER_OF_PBITS) {
+                OPENOLT_LOG(ERROR, openolt_log_id, "invalid pbit-to-gemport map size=%lu\n", pbit_to_gemport.size());
+                return ::Status(grpc::StatusCode::OUT_OF_RANGE, "pbit-to-gemport-map-len-invalid-for-flow-replication");
+            }
+            uint16_t flow_ids[NUMBER_OF_REPLICATED_FLOWS];
+            device_flow dev_fl;
+            if (get_flow_ids(NUMBER_OF_REPLICATED_FLOWS, flow_ids)) {
+                uint8_t cnt = 0;
+                dev_fl.is_flow_replicated = true;
+                dev_fl.voltha_flow_id = voltha_flow_id;
+                dev_fl.symmetric_voltha_flow_id = INVALID_FLOW_ID; // invalid
+                for (google::protobuf::Map<unsigned int, unsigned int>::const_iterator it=pbit_to_gemport.begin(); it!=pbit_to_gemport.end(); it++) {
+                    dev_fl.params[cnt].flow_id = flow_ids[cnt];
+                    dev_fl.params[cnt].pbit = it->first;
+                    dev_fl.params[cnt].gemport_id = it->second;
+
+                    ::openolt::Classifier cl = ::openolt::Classifier(classifier);
+                    flow_id = dev_fl.params[cnt].flow_id;
+                    gemport_id = dev_fl.params[cnt].gemport_id;
+                    cl.set_o_pbits(dev_fl.params[cnt].pbit);
+                    Status st = FlowAdd_(access_intf_id, onu_id, uni_id, port_no, flow_id,
+                                        flow_type, alloc_id, network_intf_id, gemport_id, cl,
+                                        action, priority, cookie, group_id, tech_profile_id);
+                    if (st.error_code() != grpc::StatusCode::OK) {
+                        OPENOLT_LOG(ERROR, openolt_log_id, "failed to install device flow=%u for voltha flow=%lu. Undoing any device flows installed.\n", flow_id, voltha_flow_id);
+                        // Remove any successfully replicated flows installed so far for the voltha_flow_id
+                        if (cnt > 0) {
+                            for (int8_t j = cnt-1; j >= 0; j--) {
+                                flow_id = dev_fl.params[j].flow_id;
+                                FlowRemove_(flow_id, flow_type);
+                            }
+                        }
+                        // Free up all the flow IDs on failure
+                        free_flow_ids(NUMBER_OF_REPLICATED_FLOWS, flow_ids);
+                        return st;
+                    }
+                    cnt++;
+                }
+                // On successful flow replication update voltha-flow-id to device-flow map to cache
+                update_voltha_flow_to_cache(voltha_flow_id, dev_fl);
+            } else {
+                OPENOLT_LOG(ERROR, openolt_log_id, "could not allocate flow ids for replication voltha-flow-id=%lu\n", voltha_flow_id);
+                return ::Status(grpc::StatusCode::RESOURCE_EXHAUSTED, "flow-ids-exhausted");
+            }
+        }
+    }
+
+    return Status::OK;
+}
+
+
 Status FlowAdd_(int32_t access_intf_id, int32_t onu_id, int32_t uni_id, uint32_t port_no,
                 uint32_t flow_id, const std::string flow_type,
                 int32_t alloc_id, int32_t network_intf_id,
@@ -1528,7 +1678,7 @@
 
     if (action.cmd().trap_to_host()) {
         Status resp = handle_acl_rule_install(onu_id, flow_id, flow_type, access_intf_id,
-                                              network_intf_id, gemport_id, classifier);
+                                              network_intf_id, classifier); // gemport_id is no longer passed for trap-to-host ACL rules
         return resp;
     }
 
@@ -1569,18 +1719,7 @@
             }
             bcmos_fastlock_unlock(&data_lock, 0);
         }
-        if (gemport_id >= 0 && access_intf_id >= 0) {
-            // Update the flow_to_acl_map. Note that since this is a datapath flow, acl_id is -1
-            // This info is needed during flow remove where we need to retrieve the gemport_id
-            // and access_intf id for the given flow id and flow direction.
-            // After retrieving the ACL ID and GEM PORT ID, we decrement the corresponding
-            // reference counters for those ACL ID and GEMPORT ID.
-            acl_id_gem_id_intf_id ac_id_gm_id_if_id(-1, gemport_id, access_intf_id);
-            flow_id_flow_direction fl_id_fl_dir(flow_id, flow_type);
-            bcmos_fastlock_lock(&data_lock);
-            flow_to_acl_map[fl_id_fl_dir] = ac_id_gm_id_if_id;
-            bcmos_fastlock_unlock(&data_lock, 0);
-        }
+
         if (priority_value >= 0) {
             BCMOLT_MSG_FIELD_SET(&cfg, priority, priority_value);
         }
@@ -1788,11 +1927,14 @@
 
     BCMOLT_MSG_FIELD_SET(&cfg, state, BCMOLT_FLOW_STATE_ENABLE);
 
+#ifndef SCALE_AND_PERF // per-flow statistics are not enabled in SCALE_AND_PERF builds
     // BAL 3.1 supports statistics only for unicast flows.
     if (key.flow_type != BCMOLT_FLOW_TYPE_MULTICAST) {
         BCMOLT_MSG_FIELD_SET(&cfg, statistics, BCMOLT_CONTROL_STATE_ENABLE);
     }
+#endif // SCALE_AND_PERF
 
+#ifndef SCALE_AND_PERF // the FLOW_CHECKER duplicate-flow check is also compiled out in SCALE_AND_PERF builds
 #ifdef FLOW_CHECKER
     //Flow Checker, To avoid duplicate flow.
     if (flow_id_counters != 0) {
@@ -1838,7 +1980,8 @@
             }
         }
     }
-#endif
+#endif // FLOW_CHECKER
+#endif // SCALE_AND_PERF
 
     bcmos_errno err = bcmolt_cfg_set(dev_id, &cfg.hdr);
     if (err) {
@@ -1849,25 +1992,63 @@
         bcmos_fastlock_lock(&data_lock);
         flow_map[std::pair<int, int>(key.flow_id,key.flow_type)] = flow_map.size();
         flow_id_counters = flow_map.size();
-        if (gemport_id > 0 && access_intf_id >= 0) {
-            gem_id_intf_id gem_intf(gemport_id, access_intf_id);
-            if (gem_ref_cnt.count(gem_intf) > 0) {
-                // The gem port is already installed
-                // Increment the ref counter indicating number of flows referencing this gem port
-                gem_ref_cnt[gem_intf]++;
-                OPENOLT_LOG(DEBUG, openolt_log_id, "incremented gem_ref_cnt, gem_ref_cnt=%d\n", gem_ref_cnt[gem_intf]);
-            } else {
-                // Initialize the refence count for the gemport.
-                gem_ref_cnt[gem_intf] = 1;
-                OPENOLT_LOG(DEBUG, openolt_log_id, "initialized gem_ref_cnt\n");
-            }
-        } else {
-            OPENOLT_LOG(DEBUG, openolt_log_id, "not incrementing gem_ref_cnt flow_id=%d gemport_id=%d access_intf_id=%d\n", flow_id, gemport_id, access_intf_id);
-        }
-
         bcmos_fastlock_unlock(&data_lock, 0);
+
+    }
+    return Status::OK;
+}
+
+/**
+ * Remove the BAL device flow(s) backing a voltha flow and drop it from the cache.
+ *
+ * FlowRemove_ failures are ignored. A device flow ID is returned to the pool only
+ * when its FlowRemove_ succeeded and the ID is not shared with a symmetric voltha
+ * flow that is still installed (FlowAddWrapper_ reuses the symmetric flow's IDs).
+ * Returns NOT_FOUND if the voltha flow is not installed, INTERNAL if its cache entry
+ * is missing, otherwise OK.
+ */
+Status FlowRemoveWrapper_(const ::openolt::Flow* request) {
+    const std::string flow_type = request->flow_type();
+    uint64_t voltha_flow_id = request->flow_id();
+    uint64_t symmetric_voltha_flow_id = request->symmetric_flow_id();
+    Status st;
+
+    // If Voltha flow is not installed, return fail
+    if (! is_voltha_flow_installed(voltha_flow_id)) {
+        OPENOLT_LOG(ERROR, openolt_log_id, "voltha_flow_id=%lu not found\n", voltha_flow_id);
+        return ::Status(grpc::StatusCode::NOT_FOUND, "voltha-flow-not-found");
     }
 
+    // Symmetric voltha flows share device flow IDs, so the IDs must not be freed while
+    // the other flow of the pair is still installed. First check whether our installed
+    // symmetric flow (if the request names one) reused this flow's device flow IDs.
+    bool flow_ids_shared = false;
+    if (symmetric_voltha_flow_id > 0 && is_voltha_flow_installed(symmetric_voltha_flow_id)) {
+        const device_flow *symm_dev_fl = get_device_flow(symmetric_voltha_flow_id);
+        flow_ids_shared = (symm_dev_fl != NULL && symm_dev_fl->symmetric_voltha_flow_id == voltha_flow_id);
+    }
+
+    const device_flow *dev_fl = get_device_flow(voltha_flow_id);
+    if (dev_fl == NULL) {
+        OPENOLT_LOG(ERROR, openolt_log_id, "device flow for voltha_flow_id=%lu in the cache is NULL\n", voltha_flow_id);
+        return ::Status(grpc::StatusCode::INTERNAL, "device-flow-null-in-cache");
+    }
+    // Then check whether this flow reused the IDs of its still-installed symmetric flow.
+    if (dev_fl->symmetric_voltha_flow_id != INVALID_FLOW_ID &&
+        is_voltha_flow_installed(dev_fl->symmetric_voltha_flow_id)) {
+        flow_ids_shared = true;
+    }
+
+    // Note: Here we are ignoring FlowRemove failures
+    int num_device_flows = dev_fl->is_flow_replicated ? NUMBER_OF_REPLICATED_FLOWS : 1;
+    for (int i = 0; i < num_device_flows; i++) {
+        st = FlowRemove_(dev_fl->params[i].flow_id, flow_type);
+        if (st.error_code() == grpc::StatusCode::OK && !flow_ids_shared) {
+            free_flow_id(dev_fl->params[i].flow_id);
+        }
+    }
+    // remove the flow from cache on voltha flow removal
+    remove_voltha_flow_from_cache(voltha_flow_id);
     return Status::OK;
 }
 
@@ -1898,13 +2061,13 @@
     int32_t intf_id = -1;
     int16_t acl_id = -1;
     if (flow_to_acl_map.count(fl_id_fl_dir) > 0) {
-        acl_id_gem_id_intf_id ac_id_gm_id_if_id = flow_to_acl_map[fl_id_fl_dir];
-        acl_id = std::get<0>(ac_id_gm_id_if_id);
-        gemport_id = std::get<1>(ac_id_gm_id_if_id);
-        intf_id = std::get<2>(ac_id_gm_id_if_id);
+
+        acl_id_intf_id ac_id_if_id = flow_to_acl_map[fl_id_fl_dir]; // (acl_id, intf_id); gemport_id is no longer tracked per flow
+        acl_id = std::get<0>(ac_id_if_id);
+        intf_id = std::get<1>(ac_id_if_id);
         // cleanup acl only if it is a valid acl. If not valid acl, it may be datapath flow.
         if (acl_id >= 0) {
-            Status resp = handle_acl_rule_cleanup(acl_id, gemport_id, intf_id, flow_type);
+            Status resp = handle_acl_rule_cleanup(acl_id, intf_id, flow_type);
             bcmos_fastlock_unlock(&data_lock, 0);
             if (resp.ok()) {
                 OPENOLT_LOG(INFO, openolt_log_id, "acl removed ok for flow_id = %u with acl_id = %d\n", flow_id, acl_id);
@@ -1948,8 +2111,6 @@
     }
     OPENOLT_LOG(INFO, openolt_log_id, "Flow %d, %s removed\n", flow_id, flow_type.c_str());
 
-    clear_gem_port(gemport_id, intf_id);
-
     flow_to_acl_map.erase(fl_id_fl_dir);
 
     bcmos_fastlock_unlock(&data_lock, 0);
@@ -2021,8 +2182,8 @@
 }
 
 bcmos_errno CreateSched(std::string direction, uint32_t intf_id, uint32_t onu_id, uint32_t uni_id, uint32_t port_no,
-                 uint32_t alloc_id, tech_profile::AdditionalBW additional_bw, uint32_t weight, uint32_t priority,
-                 tech_profile::SchedulingPolicy sched_policy, tech_profile::TrafficShapingInfo tf_sh_info,
+                 uint32_t alloc_id, ::tech_profile::AdditionalBW additional_bw, uint32_t weight, uint32_t priority,
+                 ::tech_profile::SchedulingPolicy sched_policy, ::tech_profile::TrafficShapingInfo tf_sh_info,
                  uint32_t tech_profile_id) {
 
     bcmos_errno err;
@@ -2191,13 +2352,14 @@
 port_no %u, alloc_id %d, err = %s\n", intf_id, onu_id,uni_id,port_no,alloc_id, bcmos_strerror(err));
             return err;
         }
-
+#ifndef SCALE_AND_PERF // SCALE_AND_PERF builds skip wait_for_alloc_action(ALLOC_OBJECT_CREATE)
         err = wait_for_alloc_action(intf_id, alloc_id, ALLOC_OBJECT_CREATE);
         if (err) {
             OPENOLT_LOG(ERROR, openolt_log_id, "Failed to create upstream bandwidth allocation, intf_id %d, onu_id %d, uni_id %d,\
 port_no %u, alloc_id %d, err = %s\n", intf_id, onu_id,uni_id,port_no,alloc_id, bcmos_strerror(err));
             return err;
         }
+#endif // SCALE_AND_PERF
 
         OPENOLT_LOG(INFO, openolt_log_id, "create upstream bandwidth allocation success, intf_id %d, onu_id %d, uni_id %d,\
 port_no %u, alloc_id %d\n", intf_id, onu_id,uni_id,port_no,alloc_id);
@@ -2207,24 +2369,24 @@
     return BCM_ERR_OK;
 }
 
-Status CreateTrafficSchedulers_(const tech_profile::TrafficSchedulers *traffic_scheds) {
+Status CreateTrafficSchedulers_(const ::tech_profile::TrafficSchedulers *traffic_scheds) {
     uint32_t intf_id = traffic_scheds->intf_id();
     uint32_t onu_id = traffic_scheds->onu_id();
     uint32_t uni_id = traffic_scheds->uni_id();
     uint32_t port_no = traffic_scheds->port_no();
     std::string direction;
     unsigned int alloc_id;
-    tech_profile::SchedulerConfig sched_config;
-    tech_profile::AdditionalBW additional_bw;
+    ::tech_profile::SchedulerConfig sched_config;
+    ::tech_profile::AdditionalBW additional_bw;
     uint32_t priority;
     uint32_t weight;
-    tech_profile::SchedulingPolicy sched_policy;
-    tech_profile::TrafficShapingInfo traffic_shaping_info;
+    ::tech_profile::SchedulingPolicy sched_policy;
+    ::tech_profile::TrafficShapingInfo traffic_shaping_info;
     uint32_t tech_profile_id;
     bcmos_errno err;
 
     for (int i = 0; i < traffic_scheds->traffic_scheds_size(); i++) {
-        tech_profile::TrafficScheduler traffic_sched = traffic_scheds->traffic_scheds(i);
+        ::tech_profile::TrafficScheduler traffic_sched = traffic_scheds->traffic_scheds(i);
 
         direction = GetDirection(traffic_sched.direction());
         if (direction.compare("direction-not-supported") == 0)
@@ -2273,6 +2435,7 @@
         err = get_pon_interface_status((bcmolt_interface)intf_id, &state, &los_status);
         if (err == BCM_ERR_OK) {
             if (state == BCMOLT_INTERFACE_STATE_ACTIVE_WORKING && los_status == BCMOLT_STATUS_OFF) {
+#ifndef SCALE_AND_PERF // SCALE_AND_PERF builds skip wait_for_alloc_action(ALLOC_OBJECT_DELETE)
                 OPENOLT_LOG(INFO, openolt_log_id, "PON interface: %d is enabled and LoS status is OFF, waiting for alloc cfg clear response\n",
                     intf_id);
                 err = wait_for_alloc_action(intf_id, alloc_id, ALLOC_OBJECT_DELETE);
@@ -2281,6 +2444,7 @@
                         direction.c_str(), intf_id, alloc_id, bcmos_strerror(err));
                     return err;
                 }
+#endif // SCALE_AND_PERF
             }
             else if (state == BCMOLT_INTERFACE_STATE_ACTIVE_WORKING && los_status == BCMOLT_STATUS_ON) {
                 OPENOLT_LOG(INFO, openolt_log_id, "PON interface: %d is enabled but LoS status is ON, not waiting for alloc cfg clear response\n",
@@ -2322,7 +2486,7 @@
     return BCM_ERR_OK;
 }
 
-Status RemoveTrafficSchedulers_(const tech_profile::TrafficSchedulers *traffic_scheds) {
+Status RemoveTrafficSchedulers_(const ::tech_profile::TrafficSchedulers *traffic_scheds) {
     uint32_t intf_id = traffic_scheds->intf_id();
     uint32_t onu_id = traffic_scheds->onu_id();
     uint32_t uni_id = traffic_scheds->uni_id();
@@ -2331,7 +2495,7 @@
     bcmos_errno err;
 
     for (int i = 0; i < traffic_scheds->traffic_scheds_size(); i++) {
-        tech_profile::TrafficScheduler traffic_sched = traffic_scheds->traffic_scheds(i);
+        ::tech_profile::TrafficScheduler traffic_sched = traffic_scheds->traffic_scheds(i);
 
         direction = GetDirection(traffic_sched.direction());
         if (direction.compare("direction-not-supported") == 0)
@@ -2483,12 +2647,20 @@
         return err;
     }
 
+    if (direction.compare(upstream) == 0) {
+        Status st = install_gem_port(access_intf_id, onu_id, gemport_id);
+        if (st.error_code() != grpc::StatusCode::ALREADY_EXISTS && st.error_code() != grpc::StatusCode::OK) {
+            OPENOLT_LOG(ERROR, openolt_log_id, "failed to created gemport=%d, access_intf=%d, onu_id=%d\n", gemport_id, access_intf_id, onu_id);
+            return BCM_ERR_INTERNAL;
+        }
+    }
+
     OPENOLT_LOG(INFO, openolt_log_id, "Created tm_queue, direction %s, id %d, sched_id %d, tm_q_set_id %d, \
 intf_id %d, onu_id %d, uni_id %d, tech_profiled_id %d\n", direction.c_str(), key.id, key.sched_id, key.tm_q_set_id, access_intf_id, onu_id, uni_id, tech_profile_id);
     return BCM_ERR_OK;
 }
 
-Status CreateTrafficQueues_(const tech_profile::TrafficQueues *traffic_queues) {
+Status CreateTrafficQueues_(const ::tech_profile::TrafficQueues *traffic_queues) {
     uint32_t intf_id = traffic_queues->intf_id();
     uint32_t onu_id = traffic_queues->onu_id();
     uint32_t uni_id = traffic_queues->uni_id();
@@ -2502,7 +2674,7 @@
         uint32_t queues_priority_q[traffic_queues->traffic_queues_size()] = {0};
         std::string queues_pbit_map[traffic_queues->traffic_queues_size()];
         for (int i = 0; i < traffic_queues->traffic_queues_size(); i++) {
-            tech_profile::TrafficQueue traffic_queue = traffic_queues->traffic_queues(i);
+            ::tech_profile::TrafficQueue traffic_queue = traffic_queues->traffic_queues(i);
 
             direction = GetDirection(traffic_queue.direction());
             if (direction.compare("direction-not-supported") == 0)
@@ -2532,7 +2704,7 @@
     }
 
     for (int i = 0; i < traffic_queues->traffic_queues_size(); i++) {
-        tech_profile::TrafficQueue traffic_queue = traffic_queues->traffic_queues(i);
+        ::tech_profile::TrafficQueue traffic_queue = traffic_queues->traffic_queues(i);
 
         direction = GetDirection(traffic_queue.direction());
         if (direction.compare("direction-not-supported") == 0)
@@ -2556,6 +2728,11 @@
     bcmos_errno err;
 
     if (direction == downstream) {
+        // HSI/VoIP/VoD gem ports carry bidirectional traffic, whereas multicast gem ports are downstream only. Since "downstream" is the
+        // direction common to both kinds, and the gem port for a given gemport id must be removed exactly once, we remove the gem port
+        // when the downstream-direction queue is removed.
+        remove_gem_port(access_intf_id, gemport_id);
+
         if (is_tm_sched_id_present(access_intf_id, onu_id, uni_id, direction, tech_profile_id)) {
             key.sched_id = get_tm_sched_id(access_intf_id, onu_id, uni_id, direction, tech_profile_id);
             key.id = queue_id_list[priority];
@@ -2596,7 +2773,7 @@
     return BCM_ERR_OK;
 }
 
-Status RemoveTrafficQueues_(const tech_profile::TrafficQueues *traffic_queues) {
+Status RemoveTrafficQueues_(const ::tech_profile::TrafficQueues *traffic_queues) {
     uint32_t intf_id = traffic_queues->intf_id();
     uint32_t onu_id = traffic_queues->onu_id();
     uint32_t uni_id = traffic_queues->uni_id();
@@ -2608,7 +2785,7 @@
     bcmolt_egress_qos_type qos_type = get_qos_type(intf_id, onu_id, uni_id, traffic_queues->traffic_queues_size());
 
     for (int i = 0; i < traffic_queues->traffic_queues_size(); i++) {
-        tech_profile::TrafficQueue traffic_queue = traffic_queues->traffic_queues(i);
+        ::tech_profile::TrafficQueue traffic_queue = traffic_queues->traffic_queues(i);
 
         direction = GetDirection(traffic_queue.direction());
         if (direction.compare("direction-not-supported") == 0)
@@ -2638,7 +2815,7 @@
     return Status::OK;
 }
 
-Status PerformGroupOperation_(const openolt::Group *group_cfg) {
+Status PerformGroupOperation_(const ::openolt::Group *group_cfg) {
 
     bcmos_errno err;
     bcmolt_group_key key = {};
@@ -2748,17 +2925,17 @@
     }
 
     /* SET GROUP MEMBERS UPDATE COMMAND */
-    openolt::Group::GroupMembersCommand command = group_cfg->command();
+    ::openolt::Group::GroupMembersCommand command = group_cfg->command();
     switch(command) {
-        case openolt::Group::SET_MEMBERS :
+        case ::openolt::Group::SET_MEMBERS :
             grp_mem_upd_cmd = BCMOLT_MEMBERS_UPDATE_COMMAND_SET;
             OPENOLT_LOG(INFO, openolt_log_id, "Setting %d members for Group %d.\n", members.len, group_id);
             break;
-        case openolt::Group::ADD_MEMBERS :
+        case ::openolt::Group::ADD_MEMBERS :
             grp_mem_upd_cmd = BCMOLT_MEMBERS_UPDATE_COMMAND_ADD;
             OPENOLT_LOG(INFO, openolt_log_id, "Adding %d members to Group %d.\n", members.len, group_id);
             break;
-        case openolt::Group::REMOVE_MEMBERS :
+        case ::openolt::Group::REMOVE_MEMBERS :
             grp_mem_upd_cmd = BCMOLT_MEMBERS_UPDATE_COMMAND_REMOVE;
             OPENOLT_LOG(INFO, openolt_log_id, "Removing %d members from Group %d.\n", members.len, group_id);
             break;
@@ -2771,26 +2948,26 @@
     // SET MEMBERS LIST
     for (int i = 0; i < members.len; i++) {
 
-        if (command ==  openolt::Group::REMOVE_MEMBERS) {
+        if (command ==  ::openolt::Group::REMOVE_MEMBERS) {
             OPENOLT_LOG(INFO, openolt_log_id, "Removing group member %d from group %d\n",i,key.id);
         } else {
             OPENOLT_LOG(INFO, openolt_log_id, "Adding group member %d to group %d\n",i,key.id);
         }
 
-        openolt::GroupMember *member = (openolt::GroupMember *) &group_cfg->members()[i];
+        ::openolt::GroupMember *member = (::openolt::GroupMember *) &group_cfg->members()[i];
 
         // Set member interface type
-        openolt::GroupMember::InterfaceType if_type = member->interface_type();
+        ::openolt::GroupMember::InterfaceType if_type = member->interface_type();
         switch(if_type){
-            case openolt::GroupMember::PON :
+            case ::openolt::GroupMember::PON :
                 BCMOLT_FIELD_SET(&interface_ref, intf_ref, intf_type, BCMOLT_INTERFACE_TYPE_PON);
                 OPENOLT_LOG(INFO, openolt_log_id, "Interface type PON is assigned to GroupMember %d\n",i);
                 break;
-            case openolt::GroupMember::EPON_1G_PATH :
+            case ::openolt::GroupMember::EPON_1G_PATH :
                 BCMOLT_FIELD_SET(&interface_ref, intf_ref, intf_type, BCMOLT_INTERFACE_TYPE_EPON_1_G);
                 OPENOLT_LOG(INFO, openolt_log_id, "Interface type EPON_1G is assigned to GroupMember %d\n",i);
                 break;
-            case openolt::GroupMember::EPON_10G_PATH :
+            case ::openolt::GroupMember::EPON_10G_PATH :
                 BCMOLT_FIELD_SET(&interface_ref, intf_ref, intf_type, BCMOLT_INTERFACE_TYPE_EPON_10_G);
                 OPENOLT_LOG(INFO, openolt_log_id, "Interface type EPON_10G is assigned to GroupMember %d\n",i);
                 break;
@@ -2907,7 +3084,7 @@
     return Status::OK;
 }
 
-Status GetLogicalOnuDistanceZero_(uint32_t intf_id, openolt::OnuLogicalDistance* response) {
+Status GetLogicalOnuDistanceZero_(uint32_t intf_id, ::openolt::OnuLogicalDistance* response) {
     bcmos_errno err = BCM_ERR_OK;
     uint32_t mld = 0;
     double LD0;
@@ -2925,7 +3102,7 @@
     return Status::OK;
 }
 
-Status GetLogicalOnuDistance_(uint32_t intf_id, uint32_t onu_id, openolt::OnuLogicalDistance* response) {
+Status GetLogicalOnuDistance_(uint32_t intf_id, uint32_t onu_id, ::openolt::OnuLogicalDistance* response) {
     bcmos_errno err = BCM_ERR_OK;
     bcmolt_itu_onu_params itu = {};
     bcmolt_onu_cfg onu_cfg;