VOL-3419: Replicate voltha flows in openolt agent
- Use the flow_id, symmetric_flow_id, replication_flag and pbit_to_gemport_map
  coming in Flow proto message to replicate the flow as needed
- Use the CreateQueues and RemoveQueues messages to setup and remove gem_ports
- Use latest gRPC version 1.31.1 which allows fine tuning of gRPC threadpools
  which allows for greater performance.
- Performance numbers when tested with openolt-scale-tester have been better with
  using latest gRPC and threadpool tuning when compared to earlier. It is to be
  noted that the performance is better even with openolt-agent replicating the flow
  now.
- Scale tests with 512 subscribers have been successful with BAL3.4.7.5 version
- Use openolt proto version 4.0.0
- Use openolt-test (for unit test) image version 2.0.1 (which bundles latest gRPC version 1.31.1)
- These changes are NOT backward compatible and openolt-agent will have a major
  version bump to 3.0.0

Change-Id: I715c804bdf342e60d08cab6c59e1c21b8c5ac1f4
diff --git a/agent/src/core_utils.cc b/agent/src/core_utils.cc
index 98ef6a2..6e1b17c 100644
--- a/agent/src/core_utils.cc
+++ b/agent/src/core_utils.cc
@@ -99,15 +99,17 @@
     sched_map_key_tuple key(pon_intf_id, onu_id, uni_id, direction, tech_profile_id);
     int sched_id = -1;
 
+    bcmos_fastlock_lock(&tm_sched_bitset_lock);
+
     std::map<sched_map_key_tuple, int>::const_iterator it = sched_map.find(key);
     if (it != sched_map.end()) {
         sched_id = it->second;
     }
     if (sched_id != -1) {
+        bcmos_fastlock_unlock(&tm_sched_bitset_lock, 0);
         return sched_id;
     }
 
-    bcmos_fastlock_lock(&data_lock);
     // Complexity of O(n). Is there better way that can avoid linear search?
     for (sched_id = 0; sched_id < MAX_TM_SCHED_ID; sched_id++) {
         if (tm_sched_bitset[sched_id] == 0) {
@@ -115,14 +117,13 @@
             break;
         }
     }
-    bcmos_fastlock_unlock(&data_lock, 0);
 
     if (sched_id < MAX_TM_SCHED_ID) {
-        bcmos_fastlock_lock(&data_lock);
         sched_map[key] = sched_id;
-        bcmos_fastlock_unlock(&data_lock, 0);
+        bcmos_fastlock_unlock(&tm_sched_bitset_lock, 0);
         return sched_id;
     } else {
+        bcmos_fastlock_unlock(&tm_sched_bitset_lock, 0);
         return -1;
     }
 }
@@ -140,13 +141,13 @@
 void free_tm_sched_id(int pon_intf_id, int onu_id, int uni_id, std::string direction, int tech_profile_id) {
     sched_map_key_tuple key(pon_intf_id, onu_id, uni_id, direction, tech_profile_id);
     std::map<sched_map_key_tuple, int>::const_iterator it;
-    bcmos_fastlock_lock(&data_lock);
+    bcmos_fastlock_lock(&tm_sched_bitset_lock);
     it = sched_map.find(key);
     if (it != sched_map.end()) {
         tm_sched_bitset[it->second] = 0;
         sched_map.erase(it);
     }
-    bcmos_fastlock_unlock(&data_lock, 0);
+    bcmos_fastlock_unlock(&tm_sched_bitset_lock, 0);
 }
 
 bool is_tm_sched_id_present(int pon_intf_id, int onu_id, int uni_id, std::string direction, int tech_profile_id) {
@@ -248,10 +249,10 @@
 */
 void update_sched_qmp_id_map(uint32_t sched_id,uint32_t pon_intf_id, uint32_t onu_id, \
                              uint32_t uni_id, int tm_qmp_id) {
-   bcmos_fastlock_lock(&data_lock);
+   bcmos_fastlock_lock(&tm_qmp_bitset_lock);
    sched_qmp_id_map_key_tuple key(sched_id, pon_intf_id, onu_id, uni_id);
    sched_qmp_id_map.insert(make_pair(key, tm_qmp_id));
-   bcmos_fastlock_unlock(&data_lock, 0);
+   bcmos_fastlock_unlock(&tm_qmp_bitset_lock, 0);
 }
 
 /**
@@ -292,7 +293,7 @@
                   std::vector<uint32_t> tmq_map_profile) {
     int tm_qmp_id;
 
-    bcmos_fastlock_lock(&data_lock);
+    bcmos_fastlock_lock(&tm_qmp_bitset_lock);
     /* Complexity of O(n). Is there better way that can avoid linear search? */
     for (tm_qmp_id = 0; tm_qmp_id < MAX_TM_QMP_ID; tm_qmp_id++) {
         if (tm_qmp_bitset[tm_qmp_id] == 0) {
@@ -300,15 +301,14 @@
             break;
         }
     }
-    bcmos_fastlock_unlock(&data_lock, 0);
 
     if (tm_qmp_id < MAX_TM_QMP_ID) {
-        bcmos_fastlock_lock(&data_lock);
         qmp_id_to_qmp_map.insert(make_pair(tm_qmp_id, tmq_map_profile));
-        bcmos_fastlock_unlock(&data_lock, 0);
+        bcmos_fastlock_unlock(&tm_qmp_bitset_lock, 0);
         update_sched_qmp_id_map(sched_id, pon_intf_id, onu_id, uni_id, tm_qmp_id);
         return tm_qmp_id;
     } else {
+        bcmos_fastlock_unlock(&tm_qmp_bitset_lock, 0);
         return -1;
     }
 }
@@ -329,11 +329,10 @@
     bool result;
     sched_qmp_id_map_key_tuple key(sched_id, pon_intf_id, onu_id, uni_id);
     std::map<sched_qmp_id_map_key_tuple, int>::const_iterator it = sched_qmp_id_map.find(key);
-    bcmos_fastlock_lock(&data_lock);
+    bcmos_fastlock_lock(&tm_qmp_bitset_lock);
     if (it != sched_qmp_id_map.end()) {
         sched_qmp_id_map.erase(it);
     }
-    bcmos_fastlock_unlock(&data_lock, 0);
 
     uint32_t tm_qmp_ref_count = 0;
     std::map<sched_qmp_id_map_key_tuple, int>::const_iterator it2 = sched_qmp_id_map.begin();
@@ -347,10 +346,8 @@
     if (tm_qmp_ref_count == 0) {
         std::map<int, std::vector < uint32_t > >::const_iterator it3 = qmp_id_to_qmp_map.find(tm_qmp_id);
         if (it3 != qmp_id_to_qmp_map.end()) {
-            bcmos_fastlock_lock(&data_lock);
             tm_qmp_bitset[tm_qmp_id] = 0;
             qmp_id_to_qmp_map.erase(it3);
-            bcmos_fastlock_unlock(&data_lock, 0);
             OPENOLT_LOG(INFO, openolt_log_id, "Reference count for tm qmp profile id %d is : %d. So clearing it\n", \
                         tm_qmp_id, tm_qmp_ref_count);
             result = true;
@@ -360,6 +357,8 @@
                     tm_qmp_id, tm_qmp_ref_count);
         result = false;
     }
+    bcmos_fastlock_unlock(&tm_qmp_bitset_lock, 0);
+
     return result;
 }
 
@@ -368,6 +367,7 @@
 int get_acl_id() {
     int acl_id;
 
+    bcmos_fastlock_lock(&acl_id_bitset_lock);
     /* Complexity of O(n). Is there better way that can avoid linear search? */
     for (acl_id = 0; acl_id < MAX_ACL_ID; acl_id++) {
         if (acl_id_bitset[acl_id] == 0) {
@@ -375,6 +375,8 @@
             break;
         }
     }
+    bcmos_fastlock_unlock(&acl_id_bitset_lock, 0);
+
     if (acl_id < MAX_ACL_ID) {
         return acl_id ;
     } else {
@@ -385,9 +387,90 @@
 /* ACL ID is a shared resource, caller of this function has to ensure atomicity using locks
    Frees up the ACL ID. */
 void free_acl_id (int acl_id) {
+    bcmos_fastlock_lock(&acl_id_bitset_lock);
     if (acl_id < MAX_ACL_ID) {
         acl_id_bitset[acl_id] = 0;
     }
+    bcmos_fastlock_unlock(&acl_id_bitset_lock, 0);
+}
+
+/*  Gets a free Flow ID if available, else INVALID_FLOW_ID */
+uint16_t get_flow_id() {
+    uint16_t flow_id;
+
+    bcmos_fastlock_lock(&flow_id_bitset_lock);
+    /* Complexity of O(n). Is there better way that can avoid linear search? */
+    // start flow_id from 1 as 0 is invalid
+    for (flow_id = FLOW_ID_START; flow_id <= FLOW_ID_END; flow_id++) {
+        if (flow_id_bitset[flow_id] == 0) {
+            flow_id_bitset[flow_id] = 1;
+            break;
+        }
+    }
+    bcmos_fastlock_unlock(&flow_id_bitset_lock, 0);
+
+    if (flow_id <= MAX_FLOW_ID) {
+        return flow_id ;
+    } else {
+        return INVALID_FLOW_ID;
+    }
+}
+
+/*  Gets requested number of Flow IDs.
+    'num_of_flow_ids' is number of flow_ids requested. This cannot be more than NUMBER_OF_PBITS
+    'flow_ids' is pointer to array of size NUMBER_OF_PBITS
+    If the operation is successful, returns true else false
+    The operation is successful if we can allocate fully the number of flow_ids requested.
+ */
+bool get_flow_ids(int num_of_flow_ids, uint16_t *flow_ids) {
+    if (num_of_flow_ids > NUMBER_OF_PBITS) {
+        OPENOLT_LOG(ERROR, openolt_log_id, "requested number of flow_ids is more than 8\n");
+        return false;
+    }
+    int cnt = 0;
+
+    bcmos_fastlock_lock(&flow_id_bitset_lock);
+    /* Complexity of O(n). Is there better way that can avoid linear search? */
+    // start flow_id from 1 as 0 is invalid
+    for (uint16_t flow_id = FLOW_ID_START; flow_id <= FLOW_ID_END && cnt < num_of_flow_ids; flow_id++) {
+        if (flow_id_bitset[flow_id] == 0) {
+            flow_id_bitset[flow_id] = 1;
+            flow_ids[cnt] = flow_id;
+            cnt++;
+        }
+    }
+    bcmos_fastlock_unlock(&flow_id_bitset_lock, 0);
+    // If we could not allocate the requested number of flow_ids free the allocated flow_ids
+    // and return false
+    if (cnt != num_of_flow_ids) {
+        OPENOLT_LOG(ERROR, openolt_log_id, "could not allocated the rquested number of flows ids. requested=%d, allocated=%d", num_of_flow_ids, cnt);
+        if (cnt > 0) {
+            for(int i=0; i < cnt; i++) {
+                free_flow_id(flow_ids[i]);
+            }
+        }
+        return false;
+    }
+    return true;
+}
+
+/*  Frees up the FLOW ID. */
+void free_flow_id (uint16_t flow_id) {
+    bcmos_fastlock_lock(&flow_id_bitset_lock);
+    if (flow_id <= MAX_FLOW_ID) {
+        flow_id_bitset[flow_id] = 0;
+    }
+    bcmos_fastlock_unlock(&flow_id_bitset_lock, 0);
+}
+
+void free_flow_ids(uint8_t num_flows, uint16_t *flow_ids) {
+    for (uint8_t i = 0; i < num_flows; i++) {
+        bcmos_fastlock_lock(&flow_id_bitset_lock);
+        if (flow_ids[i] <= MAX_FLOW_ID) {
+            flow_id_bitset[flow_ids[i]] = 0;
+        }
+        bcmos_fastlock_unlock(&flow_id_bitset_lock, 0);
+    }
 }
 
 /**
@@ -597,8 +680,8 @@
 
 Status pushOltOperInd(uint32_t intf_id, const char *type, const char *state)
 {
-    openolt::Indication ind;
-    openolt::IntfOperIndication* intf_oper_ind = new openolt::IntfOperIndication;
+    ::openolt::Indication ind;
+    ::openolt::IntfOperIndication* intf_oper_ind = new ::openolt::IntfOperIndication;
 
     intf_oper_ind->set_type(type);
     intf_oper_ind->set_intf_id(intf_id);
@@ -835,7 +918,7 @@
 
     err = bcmolt_cfg_set(dev_id, &cfg.hdr);
     if(err != BCM_ERR_OK) {
-        OPENOLT_LOG(ERROR, openolt_log_id, "failed to install gem_port = %d\n", gemport_id);
+        OPENOLT_LOG(ERROR, openolt_log_id, "failed to install gem_port = %d err_text=%s\n", gemport_id, cfg.hdr.hdr.err_text);
         return bcm_to_grpc_err(err, "Access_Control set ITU PON Gem port failed");
     }
 
@@ -930,7 +1013,6 @@
     bcmolt_classifier c_val = { };
     // hardcode the action for now.
     bcmolt_access_control_fwd_action_type action_type = BCMOLT_ACCESS_CONTROL_FWD_ACTION_TYPE_TRAP_TO_HOST;
-
     int acl_id = get_acl_id();
     if (acl_id < 0) {
         OPENOLT_LOG(ERROR, openolt_log_id, "exhausted acl_id for eth_type = %d, ip_proto = %d, src_port = %d, dst_port = %d\n",
@@ -1051,9 +1133,9 @@
         } else key->dst_port = -1;
 }
 
-Status handle_acl_rule_install(int32_t onu_id, uint32_t flow_id,
+Status handle_acl_rule_install(int32_t onu_id, uint64_t flow_id,
                                const std::string flow_type, int32_t access_intf_id,
-                               int32_t network_intf_id, int32_t gemport_id,
+                               int32_t network_intf_id,
                                const ::openolt::Classifier& classifier) {
     int acl_id;
     int32_t intf_id = flow_type.compare(upstream) == 0? access_intf_id: network_intf_id;
@@ -1064,19 +1146,19 @@
 
     // few map keys we are going to use later.
     flow_id_flow_direction fl_id_fl_dir(flow_id, flow_type);
-    gem_id_intf_id gem_intf(gemport_id, access_intf_id);
+
     acl_classifier_key acl_key;
     formulate_acl_classifier_key(&acl_key, classifier);
     const acl_classifier_key acl_key_const = {.ether_type=acl_key.ether_type, .ip_proto=acl_key.ip_proto,
         .src_port=acl_key.src_port, .dst_port=acl_key.dst_port};
-
     bcmos_fastlock_lock(&data_lock);
 
     // Check if the acl is already installed
     if (acl_classifier_to_acl_id_map.count(acl_key_const) > 0) {
         // retreive the acl_id
         acl_id = acl_classifier_to_acl_id_map[acl_key_const];
-        acl_id_gem_id_intf_id ac_id_gm_id_if_id(acl_id, gemport_id, intf_id);
+
+
         if (flow_to_acl_map.count(fl_id_fl_dir)) {
             // coult happen if same trap flow is received again
             OPENOLT_LOG(INFO, openolt_log_id, "flow and related acl already handled, nothing more to do\n");
@@ -1084,7 +1166,7 @@
             return Status::OK;
         }
 
-        OPENOLT_LOG(INFO, openolt_log_id, "Acl for flow_id=%u with eth_type = %d, ip_proto = %d, src_port = %d, dst_port = %d already installed with acl id = %u\n",
+        OPENOLT_LOG(INFO, openolt_log_id, "Acl for flow_id=%lu with eth_type = %d, ip_proto = %d, src_port = %d, dst_port = %d already installed with acl id = %u\n",
                 flow_id, acl_key.ether_type, acl_key.ip_proto, acl_key.src_port, acl_key.dst_port, acl_id);
 
         // The acl_ref_cnt is needed to know how many flows refer an ACL.
@@ -1101,7 +1183,7 @@
     } else {
         resp = install_acl(acl_key_const);
         if (!resp.ok()) {
-            OPENOLT_LOG(ERROR, openolt_log_id, "Acl for flow_id=%u with eth_type = %d, ip_proto = %d, src_port = %d, dst_port = %d failed\n",
+            OPENOLT_LOG(ERROR, openolt_log_id, "Acl for flow_id=%lu with eth_type = %d, ip_proto = %d, src_port = %d, dst_port = %d failed\n",
                     flow_id, acl_key_const.ether_type, acl_key_const.ip_proto, acl_key_const.src_port, acl_key_const.dst_port);
             bcmos_fastlock_unlock(&data_lock, 0);
             return resp;
@@ -1112,7 +1194,7 @@
         // Initialize the acl reference count
         acl_ref_cnt[acl_id] = 1;
 
-        OPENOLT_LOG(INFO, openolt_log_id, "acl add success for flow_id=%u with acl_id=%d\n", flow_id, acl_id);
+        OPENOLT_LOG(INFO, openolt_log_id, "acl add success for flow_id=%lu with acl_id=%d\n", flow_id, acl_id);
     }
 
     // Register the interface for the given acl
@@ -1128,70 +1210,20 @@
             OPENOLT_LOG(ERROR, openolt_log_id, "failed to update acl interfaces intf_id=%d, intf_type=%s, acl_id=%d", intf_id, intf_type.c_str(), acl_id);
             // TODO: Ideally we should return error from hear and clean up other other stateful
             // counters we creaed earlier. Will leave it out for now.
-        } 
+        }
         intf_acl_registration_ref_cnt[ac_id_inf_id_inf_type] = 1;
     }
 
-
-    // Install the gem port if needed.
-    if (gemport_id > 0 && access_intf_id >= 0) {
-        if (gem_ref_cnt.count(gem_intf) > 0) {
-            // The gem port is already installed
-            // Increment the ref counter indicating number of flows referencing this gem port
-            gem_ref_cnt[gem_intf]++;
-            OPENOLT_LOG(DEBUG, openolt_log_id, "increment gem_ref_cnt in acl handler, ref_cnt=%d\n", gem_ref_cnt[gem_intf]);
-
-        } else {
-            // We should ideally never land here. The gem port should have been created the
-            // first time ACL was installed.
-            // Install the gem port
-            Status resp = install_gem_port(access_intf_id, onu_id, gemport_id);
-            if (!resp.ok()) {
-                // TODO: We might need to reverse all previous data, but leave it out for now.
-                OPENOLT_LOG(ERROR, openolt_log_id, "failed to install the gemport=%d for acl_id=%d, intf_id=%d\n", gemport_id, acl_id, access_intf_id);
-                bcmos_fastlock_unlock(&data_lock, 0);
-                return resp;
-            }
-            // Initialize the refence count for the gemport.
-            gem_ref_cnt[gem_intf] = 1;
-            OPENOLT_LOG(DEBUG, openolt_log_id, "intialized gem ref count in acl handler\n");
-        }
-    } else {
-        OPENOLT_LOG(DEBUG, openolt_log_id, "not incrementing gem_ref_cnt in acl handler flow_id=%d, gemport_id=%d, intf_id=%d\n", flow_id, gemport_id, access_intf_id);
-    }
-
-    // Update the flow_to_acl_map
-    // This info is needed during flow remove. We need to which ACL ID and GEM PORT ID
-    // the flow was referring to.
-    // After retrieving the ACL ID and GEM PORT ID, we decrement the corresponding
-    // reference counters for those ACL ID and GEMPORT ID.
-    acl_id_gem_id_intf_id ac_id_gm_id_if_id(acl_id, gemport_id, intf_id);
-    flow_to_acl_map[fl_id_fl_dir] = ac_id_gm_id_if_id;
+    acl_id_intf_id ac_id_if_id(acl_id, intf_id);
+    flow_to_acl_map[fl_id_fl_dir] = ac_id_if_id;
 
     bcmos_fastlock_unlock(&data_lock, 0);
 
     return Status::OK;
 }
 
-void clear_gem_port(int gemport_id, int access_intf_id) {
-    gem_id_intf_id gem_intf(gemport_id, access_intf_id);
-    if (gemport_id > 0 && access_intf_id >= 0 && gem_ref_cnt.count(gem_intf) > 0) {
-        OPENOLT_LOG(DEBUG, openolt_log_id, "decrementing gem_ref_cnt gemport_id=%d access_intf_id=%d\n", gemport_id, access_intf_id);
-        gem_ref_cnt[gem_intf]--;
-        if (gem_ref_cnt[gem_intf] == 0) {
-            // For datapath flow this may not be necessary (to be verified)
-            remove_gem_port(access_intf_id, gemport_id);
-            gem_ref_cnt.erase(gem_intf);
-            OPENOLT_LOG(DEBUG, openolt_log_id, "removing gem_ref_cnt entry gemport_id=%d access_intf_id=%d\n", gemport_id, access_intf_id);
-        } else {
-            OPENOLT_LOG(DEBUG, openolt_log_id, "gem_ref_cnt  not zero yet gemport_id=%d access_intf_id=%d\n", gemport_id, access_intf_id);
-        }
-    } else {
-        OPENOLT_LOG(DEBUG, openolt_log_id, "not decrementing gem_ref_cnt gemport_id=%d access_intf_id=%d\n", gemport_id, access_intf_id);
-    }
-}
-
-Status handle_acl_rule_cleanup(int16_t acl_id, int32_t gemport_id, int32_t intf_id, const std::string flow_type) {
+//Status handle_acl_rule_cleanup(int16_t acl_id, int32_t gemport_id, int32_t intf_id, const std::string flow_type) {
+Status handle_acl_rule_cleanup(int16_t acl_id, int32_t intf_id, const std::string flow_type) {
     const std::string intf_type= flow_type.compare(upstream) == 0 ? "pon": "nni";
     acl_id_intf_id_intf_type ac_id_inf_id_inf_type(acl_id, intf_id, intf_type);
     intf_acl_registration_ref_cnt[ac_id_inf_id_inf_type]--;
@@ -1219,8 +1251,6 @@
         }
     }
 
-    clear_gem_port(gemport_id, intf_id);
-
     return Status::OK;
 }
 
@@ -1369,3 +1399,52 @@
 
     return mac_address;
 }
+
+void update_voltha_flow_to_cache(uint64_t voltha_flow_id, device_flow dev_flow) {
+    OPENOLT_LOG(DEBUG, openolt_log_id, "updating voltha flow=%lu to cache\n", voltha_flow_id)
+    bcmos_fastlock_lock(&voltha_flow_to_device_flow_lock);
+    voltha_flow_to_device_flow[voltha_flow_id] = dev_flow;
+    bcmos_fastlock_unlock(&voltha_flow_to_device_flow_lock, 0);
+}
+
+void remove_voltha_flow_from_cache(uint64_t voltha_flow_id) {
+    bcmos_fastlock_lock(&voltha_flow_to_device_flow_lock);
+    std::map<uint64_t, device_flow>::const_iterator it = voltha_flow_to_device_flow.find(voltha_flow_id);
+    if (it != voltha_flow_to_device_flow.end()) {
+        voltha_flow_to_device_flow.erase(it);
+    }
+    bcmos_fastlock_unlock(&voltha_flow_to_device_flow_lock, 0);
+}
+
+bool is_voltha_flow_installed(uint64_t voltha_flow_id ) {
+    int count;
+    bcmos_fastlock_lock(&voltha_flow_to_device_flow_lock);
+    count = voltha_flow_to_device_flow.count(voltha_flow_id);
+    bcmos_fastlock_unlock(&voltha_flow_to_device_flow_lock, 0);
+
+    return count > 0 ? true : false;
+}
+
+const device_flow_params* get_device_flow_params(uint64_t voltha_flow_id) {
+    bcmos_fastlock_lock(&voltha_flow_to_device_flow_lock);
+    std::map<uint64_t, device_flow>::const_iterator it = voltha_flow_to_device_flow.find(voltha_flow_id);
+    if (it != voltha_flow_to_device_flow.end()) {
+        bcmos_fastlock_unlock(&voltha_flow_to_device_flow_lock, 0);
+        return it->second.params;
+    }
+    bcmos_fastlock_unlock(&voltha_flow_to_device_flow_lock, 0);
+
+    return NULL;
+}
+
+const device_flow* get_device_flow(uint64_t voltha_flow_id) {
+    bcmos_fastlock_lock(&voltha_flow_to_device_flow_lock);
+    std::map<uint64_t, device_flow>::const_iterator it = voltha_flow_to_device_flow.find(voltha_flow_id);
+    if (it != voltha_flow_to_device_flow.end()) {
+        bcmos_fastlock_unlock(&voltha_flow_to_device_flow_lock, 0);
+        return &it->second;
+    }
+    bcmos_fastlock_unlock(&voltha_flow_to_device_flow_lock, 0);
+
+    return NULL;
+}