VOL-3419: Replicate voltha flows in openolt agent
- Use the flow_id, symmetric_flow_id, replication_flag and pbit_to_gemport_map
coming in the Flow proto message to replicate the flow as needed
- Use the CreateQueues and RemoveQueues messages to setup and remove gem_ports
- Use the latest gRPC version 1.31.1, which allows fine tuning of the gRPC threadpools
for greater performance.
- Performance numbers when tested with openolt-scale-tester have been better with
the latest gRPC and threadpool tuning when compared to earlier. It is to be
noted that the performance is better even with openolt-agent replicating the flow
now.
- Scale tests with 512 subscribers have been successful with BAL3.4.7.5 version
- Use openolt proto version 4.0.0
- Use openolt-test (for unit test) image version 2.0.1 (which bundles latest gRPC version 1.31.1)
- These changes are NOT backward compatible and openolt-agent will have a major
version bump to 3.0.0
Change-Id: I715c804bdf342e60d08cab6c59e1c21b8c5ac1f4
diff --git a/agent/src/core_utils.cc b/agent/src/core_utils.cc
index 98ef6a2..6e1b17c 100644
--- a/agent/src/core_utils.cc
+++ b/agent/src/core_utils.cc
@@ -99,15 +99,17 @@
sched_map_key_tuple key(pon_intf_id, onu_id, uni_id, direction, tech_profile_id);
int sched_id = -1;
+ bcmos_fastlock_lock(&tm_sched_bitset_lock);
+
std::map<sched_map_key_tuple, int>::const_iterator it = sched_map.find(key);
if (it != sched_map.end()) {
sched_id = it->second;
}
if (sched_id != -1) {
+ bcmos_fastlock_unlock(&tm_sched_bitset_lock, 0);
return sched_id;
}
- bcmos_fastlock_lock(&data_lock);
// Complexity of O(n). Is there better way that can avoid linear search?
for (sched_id = 0; sched_id < MAX_TM_SCHED_ID; sched_id++) {
if (tm_sched_bitset[sched_id] == 0) {
@@ -115,14 +117,13 @@
break;
}
}
- bcmos_fastlock_unlock(&data_lock, 0);
if (sched_id < MAX_TM_SCHED_ID) {
- bcmos_fastlock_lock(&data_lock);
sched_map[key] = sched_id;
- bcmos_fastlock_unlock(&data_lock, 0);
+ bcmos_fastlock_unlock(&tm_sched_bitset_lock, 0);
return sched_id;
} else {
+ bcmos_fastlock_unlock(&tm_sched_bitset_lock, 0);
return -1;
}
}
@@ -140,13 +141,13 @@
void free_tm_sched_id(int pon_intf_id, int onu_id, int uni_id, std::string direction, int tech_profile_id) {
sched_map_key_tuple key(pon_intf_id, onu_id, uni_id, direction, tech_profile_id);
std::map<sched_map_key_tuple, int>::const_iterator it;
- bcmos_fastlock_lock(&data_lock);
+ bcmos_fastlock_lock(&tm_sched_bitset_lock);
it = sched_map.find(key);
if (it != sched_map.end()) {
tm_sched_bitset[it->second] = 0;
sched_map.erase(it);
}
- bcmos_fastlock_unlock(&data_lock, 0);
+ bcmos_fastlock_unlock(&tm_sched_bitset_lock, 0);
}
bool is_tm_sched_id_present(int pon_intf_id, int onu_id, int uni_id, std::string direction, int tech_profile_id) {
@@ -248,10 +249,10 @@
*/
void update_sched_qmp_id_map(uint32_t sched_id,uint32_t pon_intf_id, uint32_t onu_id, \
uint32_t uni_id, int tm_qmp_id) {
- bcmos_fastlock_lock(&data_lock);
+ bcmos_fastlock_lock(&tm_qmp_bitset_lock);
sched_qmp_id_map_key_tuple key(sched_id, pon_intf_id, onu_id, uni_id);
sched_qmp_id_map.insert(make_pair(key, tm_qmp_id));
- bcmos_fastlock_unlock(&data_lock, 0);
+ bcmos_fastlock_unlock(&tm_qmp_bitset_lock, 0);
}
/**
@@ -292,7 +293,7 @@
std::vector<uint32_t> tmq_map_profile) {
int tm_qmp_id;
- bcmos_fastlock_lock(&data_lock);
+ bcmos_fastlock_lock(&tm_qmp_bitset_lock);
/* Complexity of O(n). Is there better way that can avoid linear search? */
for (tm_qmp_id = 0; tm_qmp_id < MAX_TM_QMP_ID; tm_qmp_id++) {
if (tm_qmp_bitset[tm_qmp_id] == 0) {
@@ -300,15 +301,14 @@
break;
}
}
- bcmos_fastlock_unlock(&data_lock, 0);
if (tm_qmp_id < MAX_TM_QMP_ID) {
- bcmos_fastlock_lock(&data_lock);
qmp_id_to_qmp_map.insert(make_pair(tm_qmp_id, tmq_map_profile));
- bcmos_fastlock_unlock(&data_lock, 0);
+ bcmos_fastlock_unlock(&tm_qmp_bitset_lock, 0);
update_sched_qmp_id_map(sched_id, pon_intf_id, onu_id, uni_id, tm_qmp_id);
return tm_qmp_id;
} else {
+ bcmos_fastlock_unlock(&tm_qmp_bitset_lock, 0);
return -1;
}
}
@@ -329,11 +329,10 @@
bool result;
sched_qmp_id_map_key_tuple key(sched_id, pon_intf_id, onu_id, uni_id);
std::map<sched_qmp_id_map_key_tuple, int>::const_iterator it = sched_qmp_id_map.find(key);
- bcmos_fastlock_lock(&data_lock);
+ bcmos_fastlock_lock(&tm_qmp_bitset_lock);
if (it != sched_qmp_id_map.end()) {
sched_qmp_id_map.erase(it);
}
- bcmos_fastlock_unlock(&data_lock, 0);
uint32_t tm_qmp_ref_count = 0;
std::map<sched_qmp_id_map_key_tuple, int>::const_iterator it2 = sched_qmp_id_map.begin();
@@ -347,10 +346,8 @@
if (tm_qmp_ref_count == 0) {
std::map<int, std::vector < uint32_t > >::const_iterator it3 = qmp_id_to_qmp_map.find(tm_qmp_id);
if (it3 != qmp_id_to_qmp_map.end()) {
- bcmos_fastlock_lock(&data_lock);
tm_qmp_bitset[tm_qmp_id] = 0;
qmp_id_to_qmp_map.erase(it3);
- bcmos_fastlock_unlock(&data_lock, 0);
OPENOLT_LOG(INFO, openolt_log_id, "Reference count for tm qmp profile id %d is : %d. So clearing it\n", \
tm_qmp_id, tm_qmp_ref_count);
result = true;
@@ -360,6 +357,8 @@
tm_qmp_id, tm_qmp_ref_count);
result = false;
}
+ bcmos_fastlock_unlock(&tm_qmp_bitset_lock, 0);
+
return result;
}
@@ -368,6 +367,7 @@
int get_acl_id() {
int acl_id;
+ bcmos_fastlock_lock(&acl_id_bitset_lock);
/* Complexity of O(n). Is there better way that can avoid linear search? */
for (acl_id = 0; acl_id < MAX_ACL_ID; acl_id++) {
if (acl_id_bitset[acl_id] == 0) {
@@ -375,6 +375,8 @@
break;
}
}
+ bcmos_fastlock_unlock(&acl_id_bitset_lock, 0);
+
if (acl_id < MAX_ACL_ID) {
return acl_id ;
} else {
@@ -385,9 +387,90 @@
/* ACL ID is a shared resource, caller of this function has to ensure atomicity using locks
Frees up the ACL ID. */
void free_acl_id (int acl_id) {
+ bcmos_fastlock_lock(&acl_id_bitset_lock);
if (acl_id < MAX_ACL_ID) {
acl_id_bitset[acl_id] = 0;
}
+ bcmos_fastlock_unlock(&acl_id_bitset_lock, 0);
+}
+
+/* Gets a free Flow ID if available, else INVALID_FLOW_ID.
+   Scans flow_id_bitset over [FLOW_ID_START, FLOW_ID_END] while holding
+   flow_id_bitset_lock, marks the first clear entry as used and returns its index. */
+uint16_t get_flow_id() {
+    uint16_t allocated_id = INVALID_FLOW_ID;
+
+    bcmos_fastlock_lock(&flow_id_bitset_lock);
+    /* Complexity of O(n). Is there better way that can avoid linear search? */
+    // The counter is wider than uint16_t so that flow_id++ cannot wrap around and keep
+    // the loop condition true should FLOW_ID_END be the largest 16-bit value.
+    for (uint32_t flow_id = FLOW_ID_START; flow_id <= FLOW_ID_END; flow_id++) {
+        if (flow_id_bitset[flow_id] == 0) {
+            flow_id_bitset[flow_id] = 1;
+            allocated_id = static_cast<uint16_t>(flow_id);
+            break;
+        }
+    }
+    bcmos_fastlock_unlock(&flow_id_bitset_lock, 0);
+
+    // The result is tracked explicitly, so exhaustion is always reported as
+    // INVALID_FLOW_ID regardless of how FLOW_ID_END relates to any other limit.
+    return allocated_id;
+}
+
+/* Gets requested number of Flow IDs.
+   'num_of_flow_ids' is number of flow_ids requested. This cannot be more than NUMBER_OF_PBITS
+   'flow_ids' is pointer to array of size NUMBER_OF_PBITS
+   If the operation is successful, returns true else false
+   The operation is successful if we can allocate fully the number of flow_ids requested.
+   On failure every flow_id taken by this call is released again before returning, so the
+   allocation is all-or-nothing; the contents of 'flow_ids' are then not meaningful.
+ */
+bool get_flow_ids(int num_of_flow_ids, uint16_t *flow_ids) {
+    if (num_of_flow_ids > NUMBER_OF_PBITS) {
+        OPENOLT_LOG(ERROR, openolt_log_id, "requested number of flow_ids is more than 8\n");
+        return false;
+    }
+    int cnt = 0;
+
+    bcmos_fastlock_lock(&flow_id_bitset_lock);
+    /* Complexity of O(n). Is there better way that can avoid linear search? */
+    // start flow_id from FLOW_ID_START as 0 is invalid. The counter is wider than uint16_t
+    // so that flow_id++ cannot wrap around if FLOW_ID_END is the largest 16-bit value.
+    for (uint32_t flow_id = FLOW_ID_START; flow_id <= FLOW_ID_END && cnt < num_of_flow_ids; flow_id++) {
+        if (flow_id_bitset[flow_id] == 0) {
+            flow_id_bitset[flow_id] = 1;
+            flow_ids[cnt] = static_cast<uint16_t>(flow_id);
+            cnt++;
+        }
+    }
+    const bool fully_allocated = (cnt == num_of_flow_ids);
+    if (!fully_allocated) {
+        // Roll back the partial allocation inside the same critical section, so the
+        // bitset is restored before any other thread can take the lock.
+        for (int i = 0; i < cnt; i++) {
+            flow_id_bitset[flow_ids[i]] = 0;
+        }
+    }
+    bcmos_fastlock_unlock(&flow_id_bitset_lock, 0);
+
+    // Logging is done after the lock has been released.
+    if (!fully_allocated) {
+        OPENOLT_LOG(ERROR, openolt_log_id, "could not allocated the rquested number of flows ids. requested=%d, allocated=%d\n", num_of_flow_ids, cnt);
+        return false;
+    }
+    return true;
+}
+
+/* Marks a single FLOW ID as free again in flow_id_bitset.
+   IDs above MAX_FLOW_ID are ignored. The bitset is touched only while
+   flow_id_bitset_lock is held. */
+void free_flow_id (uint16_t flow_id) {
+    const bool in_range = !(flow_id > MAX_FLOW_ID);
+
+    bcmos_fastlock_lock(&flow_id_bitset_lock);
+    if (in_range) {
+        flow_id_bitset[flow_id] = 0;
+    }
+    bcmos_fastlock_unlock(&flow_id_bitset_lock, 0);
+}
+
+/* Frees up 'num_flows' FLOW IDs taken from the array 'flow_ids'.
+   Entries above MAX_FLOW_ID are skipped. The whole batch is released inside one
+   critical section on flow_id_bitset_lock instead of locking once per element. */
+void free_flow_ids(uint8_t num_flows, uint16_t *flow_ids) {
+    bcmos_fastlock_lock(&flow_id_bitset_lock);
+    for (uint8_t i = 0; i < num_flows; i++) {
+        if (flow_ids[i] <= MAX_FLOW_ID) {
+            flow_id_bitset[flow_ids[i]] = 0;
+        }
+    }
+    bcmos_fastlock_unlock(&flow_id_bitset_lock, 0);
}
/**
@@ -597,8 +680,8 @@
Status pushOltOperInd(uint32_t intf_id, const char *type, const char *state)
{
- openolt::Indication ind;
- openolt::IntfOperIndication* intf_oper_ind = new openolt::IntfOperIndication;
+ ::openolt::Indication ind;
+ ::openolt::IntfOperIndication* intf_oper_ind = new ::openolt::IntfOperIndication;
intf_oper_ind->set_type(type);
intf_oper_ind->set_intf_id(intf_id);
@@ -835,7 +918,7 @@
err = bcmolt_cfg_set(dev_id, &cfg.hdr);
if(err != BCM_ERR_OK) {
- OPENOLT_LOG(ERROR, openolt_log_id, "failed to install gem_port = %d\n", gemport_id);
+ OPENOLT_LOG(ERROR, openolt_log_id, "failed to install gem_port = %d err_text=%s\n", gemport_id, cfg.hdr.hdr.err_text);
return bcm_to_grpc_err(err, "Access_Control set ITU PON Gem port failed");
}
@@ -930,7 +1013,6 @@
bcmolt_classifier c_val = { };
// hardcode the action for now.
bcmolt_access_control_fwd_action_type action_type = BCMOLT_ACCESS_CONTROL_FWD_ACTION_TYPE_TRAP_TO_HOST;
-
int acl_id = get_acl_id();
if (acl_id < 0) {
OPENOLT_LOG(ERROR, openolt_log_id, "exhausted acl_id for eth_type = %d, ip_proto = %d, src_port = %d, dst_port = %d\n",
@@ -1051,9 +1133,9 @@
} else key->dst_port = -1;
}
-Status handle_acl_rule_install(int32_t onu_id, uint32_t flow_id,
+Status handle_acl_rule_install(int32_t onu_id, uint64_t flow_id,
const std::string flow_type, int32_t access_intf_id,
- int32_t network_intf_id, int32_t gemport_id,
+ int32_t network_intf_id,
const ::openolt::Classifier& classifier) {
int acl_id;
int32_t intf_id = flow_type.compare(upstream) == 0? access_intf_id: network_intf_id;
@@ -1064,19 +1146,19 @@
// few map keys we are going to use later.
flow_id_flow_direction fl_id_fl_dir(flow_id, flow_type);
- gem_id_intf_id gem_intf(gemport_id, access_intf_id);
+
acl_classifier_key acl_key;
formulate_acl_classifier_key(&acl_key, classifier);
const acl_classifier_key acl_key_const = {.ether_type=acl_key.ether_type, .ip_proto=acl_key.ip_proto,
.src_port=acl_key.src_port, .dst_port=acl_key.dst_port};
-
bcmos_fastlock_lock(&data_lock);
// Check if the acl is already installed
if (acl_classifier_to_acl_id_map.count(acl_key_const) > 0) {
// retreive the acl_id
acl_id = acl_classifier_to_acl_id_map[acl_key_const];
- acl_id_gem_id_intf_id ac_id_gm_id_if_id(acl_id, gemport_id, intf_id);
+
+
if (flow_to_acl_map.count(fl_id_fl_dir)) {
// coult happen if same trap flow is received again
OPENOLT_LOG(INFO, openolt_log_id, "flow and related acl already handled, nothing more to do\n");
@@ -1084,7 +1166,7 @@
return Status::OK;
}
- OPENOLT_LOG(INFO, openolt_log_id, "Acl for flow_id=%u with eth_type = %d, ip_proto = %d, src_port = %d, dst_port = %d already installed with acl id = %u\n",
+ OPENOLT_LOG(INFO, openolt_log_id, "Acl for flow_id=%lu with eth_type = %d, ip_proto = %d, src_port = %d, dst_port = %d already installed with acl id = %u\n",
flow_id, acl_key.ether_type, acl_key.ip_proto, acl_key.src_port, acl_key.dst_port, acl_id);
// The acl_ref_cnt is needed to know how many flows refer an ACL.
@@ -1101,7 +1183,7 @@
} else {
resp = install_acl(acl_key_const);
if (!resp.ok()) {
- OPENOLT_LOG(ERROR, openolt_log_id, "Acl for flow_id=%u with eth_type = %d, ip_proto = %d, src_port = %d, dst_port = %d failed\n",
+ OPENOLT_LOG(ERROR, openolt_log_id, "Acl for flow_id=%lu with eth_type = %d, ip_proto = %d, src_port = %d, dst_port = %d failed\n",
flow_id, acl_key_const.ether_type, acl_key_const.ip_proto, acl_key_const.src_port, acl_key_const.dst_port);
bcmos_fastlock_unlock(&data_lock, 0);
return resp;
@@ -1112,7 +1194,7 @@
// Initialize the acl reference count
acl_ref_cnt[acl_id] = 1;
- OPENOLT_LOG(INFO, openolt_log_id, "acl add success for flow_id=%u with acl_id=%d\n", flow_id, acl_id);
+ OPENOLT_LOG(INFO, openolt_log_id, "acl add success for flow_id=%lu with acl_id=%d\n", flow_id, acl_id);
}
// Register the interface for the given acl
@@ -1128,70 +1210,20 @@
OPENOLT_LOG(ERROR, openolt_log_id, "failed to update acl interfaces intf_id=%d, intf_type=%s, acl_id=%d", intf_id, intf_type.c_str(), acl_id);
// TODO: Ideally we should return error from hear and clean up other other stateful
// counters we creaed earlier. Will leave it out for now.
- }
+ }
intf_acl_registration_ref_cnt[ac_id_inf_id_inf_type] = 1;
}
-
- // Install the gem port if needed.
- if (gemport_id > 0 && access_intf_id >= 0) {
- if (gem_ref_cnt.count(gem_intf) > 0) {
- // The gem port is already installed
- // Increment the ref counter indicating number of flows referencing this gem port
- gem_ref_cnt[gem_intf]++;
- OPENOLT_LOG(DEBUG, openolt_log_id, "increment gem_ref_cnt in acl handler, ref_cnt=%d\n", gem_ref_cnt[gem_intf]);
-
- } else {
- // We should ideally never land here. The gem port should have been created the
- // first time ACL was installed.
- // Install the gem port
- Status resp = install_gem_port(access_intf_id, onu_id, gemport_id);
- if (!resp.ok()) {
- // TODO: We might need to reverse all previous data, but leave it out for now.
- OPENOLT_LOG(ERROR, openolt_log_id, "failed to install the gemport=%d for acl_id=%d, intf_id=%d\n", gemport_id, acl_id, access_intf_id);
- bcmos_fastlock_unlock(&data_lock, 0);
- return resp;
- }
- // Initialize the refence count for the gemport.
- gem_ref_cnt[gem_intf] = 1;
- OPENOLT_LOG(DEBUG, openolt_log_id, "intialized gem ref count in acl handler\n");
- }
- } else {
- OPENOLT_LOG(DEBUG, openolt_log_id, "not incrementing gem_ref_cnt in acl handler flow_id=%d, gemport_id=%d, intf_id=%d\n", flow_id, gemport_id, access_intf_id);
- }
-
- // Update the flow_to_acl_map
- // This info is needed during flow remove. We need to which ACL ID and GEM PORT ID
- // the flow was referring to.
- // After retrieving the ACL ID and GEM PORT ID, we decrement the corresponding
- // reference counters for those ACL ID and GEMPORT ID.
- acl_id_gem_id_intf_id ac_id_gm_id_if_id(acl_id, gemport_id, intf_id);
- flow_to_acl_map[fl_id_fl_dir] = ac_id_gm_id_if_id;
+ acl_id_intf_id ac_id_if_id(acl_id, intf_id);
+ flow_to_acl_map[fl_id_fl_dir] = ac_id_if_id;
bcmos_fastlock_unlock(&data_lock, 0);
return Status::OK;
}
-void clear_gem_port(int gemport_id, int access_intf_id) {
- gem_id_intf_id gem_intf(gemport_id, access_intf_id);
- if (gemport_id > 0 && access_intf_id >= 0 && gem_ref_cnt.count(gem_intf) > 0) {
- OPENOLT_LOG(DEBUG, openolt_log_id, "decrementing gem_ref_cnt gemport_id=%d access_intf_id=%d\n", gemport_id, access_intf_id);
- gem_ref_cnt[gem_intf]--;
- if (gem_ref_cnt[gem_intf] == 0) {
- // For datapath flow this may not be necessary (to be verified)
- remove_gem_port(access_intf_id, gemport_id);
- gem_ref_cnt.erase(gem_intf);
- OPENOLT_LOG(DEBUG, openolt_log_id, "removing gem_ref_cnt entry gemport_id=%d access_intf_id=%d\n", gemport_id, access_intf_id);
- } else {
- OPENOLT_LOG(DEBUG, openolt_log_id, "gem_ref_cnt not zero yet gemport_id=%d access_intf_id=%d\n", gemport_id, access_intf_id);
- }
- } else {
- OPENOLT_LOG(DEBUG, openolt_log_id, "not decrementing gem_ref_cnt gemport_id=%d access_intf_id=%d\n", gemport_id, access_intf_id);
- }
-}
-
-Status handle_acl_rule_cleanup(int16_t acl_id, int32_t gemport_id, int32_t intf_id, const std::string flow_type) {
+//Status handle_acl_rule_cleanup(int16_t acl_id, int32_t gemport_id, int32_t intf_id, const std::string flow_type) {
+Status handle_acl_rule_cleanup(int16_t acl_id, int32_t intf_id, const std::string flow_type) {
const std::string intf_type= flow_type.compare(upstream) == 0 ? "pon": "nni";
acl_id_intf_id_intf_type ac_id_inf_id_inf_type(acl_id, intf_id, intf_type);
intf_acl_registration_ref_cnt[ac_id_inf_id_inf_type]--;
@@ -1219,8 +1251,6 @@
}
}
- clear_gem_port(gemport_id, intf_id);
-
return Status::OK;
}
@@ -1369,3 +1399,52 @@
return mac_address;
}
+
+/* Stores (or overwrites) the device flow information cached for 'voltha_flow_id'.
+   'dev_flow' is copied into voltha_flow_to_device_flow while
+   voltha_flow_to_device_flow_lock is held. */
+void update_voltha_flow_to_cache(uint64_t voltha_flow_id, device_flow dev_flow) {
+    // Terminated with ';' like every other OPENOLT_LOG call, so the statement does not
+    // depend on how the macro happens to expand.
+    OPENOLT_LOG(DEBUG, openolt_log_id, "updating voltha flow=%lu to cache\n", voltha_flow_id);
+    bcmos_fastlock_lock(&voltha_flow_to_device_flow_lock);
+    voltha_flow_to_device_flow[voltha_flow_id] = dev_flow;
+    bcmos_fastlock_unlock(&voltha_flow_to_device_flow_lock, 0);
+}
+
+/* Drops the cache entry for 'voltha_flow_id' from voltha_flow_to_device_flow, if any.
+   Nothing happens when the flow is not cached. */
+void remove_voltha_flow_from_cache(uint64_t voltha_flow_id) {
+    bcmos_fastlock_lock(&voltha_flow_to_device_flow_lock);
+    // Erasing by key is a no-op when the key is absent.
+    voltha_flow_to_device_flow.erase(voltha_flow_id);
+    bcmos_fastlock_unlock(&voltha_flow_to_device_flow_lock, 0);
+}
+
+/* Returns true when 'voltha_flow_id' has an entry in voltha_flow_to_device_flow,
+   false otherwise. The lookup is done under voltha_flow_to_device_flow_lock. */
+bool is_voltha_flow_installed(uint64_t voltha_flow_id ) {
+    bcmos_fastlock_lock(&voltha_flow_to_device_flow_lock);
+    const bool installed = voltha_flow_to_device_flow.count(voltha_flow_id) != 0;
+    bcmos_fastlock_unlock(&voltha_flow_to_device_flow_lock, 0);
+
+    return installed;
+}
+
+/* Looks up the cached device flow for 'voltha_flow_id' and returns its 'params'
+   member, or NULL when the flow is not cached.
+   NOTE(review): the returned pointer refers to storage owned by the cache entry; it is
+   presumably only safe to use while that entry is not removed - confirm with callers. */
+const device_flow_params* get_device_flow_params(uint64_t voltha_flow_id) {
+    const device_flow_params* params = NULL;
+
+    bcmos_fastlock_lock(&voltha_flow_to_device_flow_lock);
+    std::map<uint64_t, device_flow>::const_iterator it = voltha_flow_to_device_flow.find(voltha_flow_id);
+    if (it != voltha_flow_to_device_flow.end()) {
+        // Dereference the iterator while the lock is still held; after unlock another
+        // thread may modify the map.
+        params = it->second.params;
+    }
+    bcmos_fastlock_unlock(&voltha_flow_to_device_flow_lock, 0);
+
+    return params;
+}
+
+/* Looks up the cached device flow for 'voltha_flow_id' and returns a pointer to it,
+   or NULL when the flow is not cached.
+   NOTE(review): the returned pointer refers to the map's own element; it is presumably
+   only safe to use while that entry is not removed - confirm with callers. */
+const device_flow* get_device_flow(uint64_t voltha_flow_id) {
+    const device_flow* flow = NULL;
+
+    bcmos_fastlock_lock(&voltha_flow_to_device_flow_lock);
+    std::map<uint64_t, device_flow>::const_iterator it = voltha_flow_to_device_flow.find(voltha_flow_id);
+    if (it != voltha_flow_to_device_flow.end()) {
+        // Take the element's address while the lock is still held; after unlock another
+        // thread may modify the map.
+        flow = &it->second;
+    }
+    bcmos_fastlock_unlock(&voltha_flow_to_device_flow_lock, 0);
+
+    return flow;
+}