VOL-3419: Replicate voltha flows in openolt agent
- Use the flow_id, symmetric_flow_id, replication_flag and pbit_to_gemport_map
coming in Flow proto messge to replicate the flow as needed
- Use the CreateQueues and RemoveQueues messages to setup and remove gem_ports
- Use latest gRPC version 1.31.1 which allows fine tuning of gRPC threadpools
which allows for greating performance.
- Performance numbers when tested with openolt-scale-tester has been better with
using latest gRPC and threadpool tuning when compared to earlier. It is to be
noted that the performance is better even with openolt-agent replicating the flow
now.
- Scale tests with 512 subscribers have been successfull with BAL3.4.7.5 version
- Use openolt proto version 4.0.0
- Use openolt-test (for unit test) image version 2.0.1 (which bundles latest gRPC version 1.31.1)
- These changes are NOT backward compatible and openolt-agent will have a major
version bump to 3.0.0
Change-Id: I715c804bdf342e60d08cab6c59e1c21b8c5ac1f4
diff --git a/agent/src/core_api_handler.cc b/agent/src/core_api_handler.cc
index aa45b0f..a2b0bcf 100644
--- a/agent/src/core_api_handler.cc
+++ b/agent/src/core_api_handler.cc
@@ -65,9 +65,9 @@
static std::string firmware_version = "Openolt.2019.07.01";
static bcmos_errno CreateSched(std::string direction, uint32_t access_intf_id, uint32_t onu_id, uint32_t uni_id, \
- uint32_t port_no, uint32_t alloc_id, tech_profile::AdditionalBW additional_bw, uint32_t weight, \
- uint32_t priority, tech_profile::SchedulingPolicy sched_policy,
- tech_profile::TrafficShapingInfo traffic_shaping_info, uint32_t tech_profile_id);
+ uint32_t port_no, uint32_t alloc_id, ::tech_profile::AdditionalBW additional_bw, uint32_t weight, \
+ uint32_t priority, ::tech_profile::SchedulingPolicy sched_policy,
+ ::tech_profile::TrafficShapingInfo traffic_shaping_info, uint32_t tech_profile_id);
static bcmos_errno RemoveSched(int intf_id, int onu_id, int uni_id, int alloc_id, std::string direction, int tech_profile_id);
static bcmos_errno CreateQueue(std::string direction, uint32_t access_intf_id, uint32_t onu_id, uint32_t uni_id, \
bcmolt_egress_qos_type qos_type, uint32_t priority, uint32_t gemport_id, uint32_t tech_profile_id);
@@ -201,7 +201,7 @@
return Status::OK;
}
-Status GetDeviceInfo_(openolt::DeviceInfo* device_info) {
+Status GetDeviceInfo_(::openolt::DeviceInfo* device_info) {
device_info->set_vendor(VENDOR_ID);
device_info->set_model(MODEL_ID);
device_info->set_hardware_version("");
@@ -238,90 +238,49 @@
// Legacy, device-wide ranges. To be deprecated when adapter
// is upgraded to support per-interface ranges
- if (board_technology == "XGS-PON") {
- device_info->set_onu_id_start(1);
- device_info->set_onu_id_end(255);
- device_info->set_alloc_id_start(MIN_ALLOC_ID_XGSPON);
- device_info->set_alloc_id_end(16383);
- device_info->set_gemport_id_start(1024);
- device_info->set_gemport_id_end(65535);
- device_info->set_flow_id_start(1);
- device_info->set_flow_id_end(16383);
- }
- else if (board_technology == "GPON") {
- device_info->set_onu_id_start(1);
- device_info->set_onu_id_end(127);
- device_info->set_alloc_id_start(MIN_ALLOC_ID_GPON);
- device_info->set_alloc_id_end(767);
- device_info->set_gemport_id_start(256);
- device_info->set_gemport_id_end(4095);
- device_info->set_flow_id_start(1);
- device_info->set_flow_id_end(16383);
- }
+ device_info->set_onu_id_start(ONU_ID_START);
+ device_info->set_onu_id_end(ONU_ID_END);
+ device_info->set_alloc_id_start(ALLOC_ID_START);
+ device_info->set_alloc_id_end(ALLOC_ID_END);
+ device_info->set_gemport_id_start(GEM_PORT_ID_START);
+ device_info->set_gemport_id_end(GEM_PORT_ID_END);
+ device_info->set_flow_id_start(FLOW_ID_START);
+ device_info->set_flow_id_end(FLOW_ID_END);
- std::map<std::string, openolt::DeviceInfo::DeviceResourceRanges*> ranges;
+ std::map<std::string, ::openolt::DeviceInfo::DeviceResourceRanges*> ranges;
for (uint32_t intf_id = 0; intf_id < num_of_pon_ports; ++intf_id) {
std::string intf_technology = intf_technologies[intf_id];
- openolt::DeviceInfo::DeviceResourceRanges *range = ranges[intf_technology];
+ ::openolt::DeviceInfo::DeviceResourceRanges *range = ranges[intf_technology];
if(range == nullptr) {
range = device_info->add_ranges();
ranges[intf_technology] = range;
range->set_technology(intf_technology);
- if (intf_technology == "XGS-PON") {
- openolt::DeviceInfo::DeviceResourceRanges::Pool* pool;
+ ::openolt::DeviceInfo::DeviceResourceRanges::Pool* pool;
- pool = range->add_pools();
- pool->set_type(openolt::DeviceInfo::DeviceResourceRanges::Pool::ONU_ID);
- pool->set_sharing(openolt::DeviceInfo::DeviceResourceRanges::Pool::DEDICATED_PER_INTF);
- pool->set_start(1);
- pool->set_end(255);
+ pool = range->add_pools();
+ pool->set_type(::openolt::DeviceInfo::DeviceResourceRanges::Pool::ONU_ID);
+ pool->set_sharing(::openolt::DeviceInfo::DeviceResourceRanges::Pool::DEDICATED_PER_INTF);
+ pool->set_start(ONU_ID_START);
+ pool->set_end(ONU_ID_END);
- pool = range->add_pools();
- pool->set_type(openolt::DeviceInfo::DeviceResourceRanges::Pool::ALLOC_ID);
- pool->set_sharing(openolt::DeviceInfo::DeviceResourceRanges::Pool::DEDICATED_PER_INTF);
- pool->set_start(1024);
- pool->set_end(16383);
+ pool = range->add_pools();
+ pool->set_type(::openolt::DeviceInfo::DeviceResourceRanges::Pool::ALLOC_ID);
+ pool->set_sharing(::openolt::DeviceInfo::DeviceResourceRanges::Pool::DEDICATED_PER_INTF);
+ pool->set_start(ALLOC_ID_START);
+ pool->set_end(ALLOC_ID_START);
- pool = range->add_pools();
- pool->set_type(openolt::DeviceInfo::DeviceResourceRanges::Pool::GEMPORT_ID);
- pool->set_sharing(openolt::DeviceInfo::DeviceResourceRanges::Pool::DEDICATED_PER_INTF);
- pool->set_start(1024);
- pool->set_end(65535);
+ pool = range->add_pools();
+ pool->set_type(::openolt::DeviceInfo::DeviceResourceRanges::Pool::GEMPORT_ID);
+ pool->set_sharing(::openolt::DeviceInfo::DeviceResourceRanges::Pool::DEDICATED_PER_INTF);
+ pool->set_start(GEM_PORT_ID_START);
+ pool->set_end(GEM_PORT_ID_END);
- pool = range->add_pools();
- pool->set_type(openolt::DeviceInfo::DeviceResourceRanges::Pool::FLOW_ID);
- pool->set_sharing(openolt::DeviceInfo::DeviceResourceRanges::Pool::SHARED_BY_ALL_INTF_ALL_TECH);
- pool->set_start(1);
- pool->set_end(16383);
- }
- else if (intf_technology == "GPON") {
- openolt::DeviceInfo::DeviceResourceRanges::Pool* pool;
-
- pool = range->add_pools();
- pool->set_type(openolt::DeviceInfo::DeviceResourceRanges::Pool::ONU_ID);
- pool->set_sharing(openolt::DeviceInfo::DeviceResourceRanges::Pool::DEDICATED_PER_INTF);
- pool->set_start(1);
- pool->set_end(127);
-
- pool = range->add_pools();
- pool->set_type(openolt::DeviceInfo::DeviceResourceRanges::Pool::ALLOC_ID);
- pool->set_sharing(openolt::DeviceInfo::DeviceResourceRanges::Pool::DEDICATED_PER_INTF);
- pool->set_start(256);
- pool->set_end(757);
-
- pool = range->add_pools();
- pool->set_type(openolt::DeviceInfo::DeviceResourceRanges::Pool::GEMPORT_ID);
- pool->set_sharing(openolt::DeviceInfo::DeviceResourceRanges::Pool::DEDICATED_PER_INTF);
- pool->set_start(256);
- pool->set_end(4095);
-
- pool = range->add_pools();
- pool->set_type(openolt::DeviceInfo::DeviceResourceRanges::Pool::FLOW_ID);
- pool->set_sharing(openolt::DeviceInfo::DeviceResourceRanges::Pool::SHARED_BY_ALL_INTF_ALL_TECH);
- pool->set_start(1);
- pool->set_end(16383);
- }
+ pool = range->add_pools();
+ pool->set_type(::openolt::DeviceInfo::DeviceResourceRanges::Pool::FLOW_ID);
+ pool->set_sharing(::openolt::DeviceInfo::DeviceResourceRanges::Pool::SHARED_BY_ALL_INTF_ALL_TECH);
+ pool->set_start(FLOW_ID_START);
+ pool->set_end(FLOW_ID_END);
}
range->add_intf_ids(intf_id);
@@ -376,6 +335,11 @@
}
bcmos_fastlock_init(&data_lock, 0);
+ bcmos_fastlock_init(&acl_id_bitset_lock, 0);
+ bcmos_fastlock_init(&tm_sched_bitset_lock, 0);
+ bcmos_fastlock_init(&tm_qmp_bitset_lock, 0);
+ bcmos_fastlock_init(&flow_id_bitset_lock, 0);
+ bcmos_fastlock_init(&voltha_flow_to_device_flow_lock, 0);
bcmos_fastlock_init(&alloc_cfg_wait_lock, 0);
bcmos_fastlock_init(&onu_deactivate_wait_lock, 0);
OPENOLT_LOG(INFO, openolt_log_id, "Enable OLT - %s-%s\n", VENDOR_ID, MODEL_ID);
@@ -469,8 +433,8 @@
}
if (failedCount == 0) {
state.deactivate();
- openolt::Indication ind;
- openolt::OltIndication* olt_ind = new openolt::OltIndication;
+ ::openolt::Indication ind;
+ ::openolt::OltIndication* olt_ind = new ::openolt::OltIndication;
olt_ind->set_oper_state("down");
ind.set_allocated_olt_ind(olt_ind);
BCM_LOG(INFO, openolt_log_id, "Disable OLT, add an extra indication\n");
@@ -496,8 +460,8 @@
}
if (failedCount == 0) {
state.activate();
- openolt::Indication ind;
- openolt::OltIndication* olt_ind = new openolt::OltIndication;
+ ::openolt::Indication ind;
+ ::openolt::OltIndication* olt_ind = new ::openolt::OltIndication;
olt_ind->set_oper_state("up");
ind.set_allocated_olt_ind(olt_ind);
BCM_LOG(INFO, openolt_log_id, "Reenable OLT, add an extra indication\n");
@@ -1182,7 +1146,7 @@
return bcm_to_grpc_err(err, "Failed to configure ONU");
}
}
-
+
if (omcc_encryption_mode == true) {
// set the encryption mode for omci port id
bcmolt_itupon_gem_cfg gem_cfg;
@@ -1442,7 +1406,7 @@
bcmolt_flow_send_eth_packet oper; /* declare main API struct */
// TODO: flow_id is currently not passed in UplinkPacket message from voltha.
- bcmolt_flow_id flow_id = 0;
+ bcmolt_flow_id flow_id = INVALID_FLOW_ID;
//validate flow_id and find flow_id/flow type: upstream/ingress type: PON/egress type: NNI
if (get_flow_status(flow_id, BCMOLT_FLOW_TYPE_UPSTREAM, FLOW_TYPE) == BCMOLT_FLOW_TYPE_UPSTREAM && \
@@ -1492,6 +1456,192 @@
return Status::OK;
}
+
+Status FlowAddWrapper_(const ::openolt::Flow* request) {
+
+ int32_t access_intf_id = request->access_intf_id();
+ int32_t onu_id = request->onu_id();
+ int32_t uni_id = request->uni_id();
+ uint32_t port_no = request->port_no();
+ uint64_t voltha_flow_id = request->flow_id();
+ uint64_t symmetric_voltha_flow_id = request->symmetric_flow_id();
+ const std::string flow_type = request->flow_type();
+ int32_t alloc_id = request->alloc_id();
+ int32_t network_intf_id = request->network_intf_id();
+ int32_t gemport_id = request->gemport_id();
+ const ::openolt::Classifier& classifier = request->classifier();
+ const ::openolt::Action& action = request->action();
+ int32_t priority = request->priority();
+ uint64_t cookie = request->cookie();
+ int32_t group_id = request->group_id();
+ uint32_t tech_profile_id = request->tech_profile_id();
+ bool replicate_flow = request->replicate_flow();
+ const google::protobuf::Map<unsigned int, unsigned int> &pbit_to_gemport = request->pbit_to_gemport();
+ uint16_t flow_id;
+
+ // The intf_id variable defaults to access(PON) interface ID.
+ // For trap-from-nni flow where access interface ID is not valid , change it to NNI interface ID
+ // This intf_id identifies the pool from which we get the flow_id
+ uint32_t intf_id = access_intf_id;
+ if (onu_id < 1) {
+ onu_id = 1;
+ }
+ if (access_intf_id < 0) {
+ intf_id = network_intf_id;
+ }
+
+ OPENOLT_LOG(INFO, openolt_log_id, "received flow add. voltha_flow_id=%lu, symmetric_voltha_flow_id=%lu, replication=%d\n", voltha_flow_id, symmetric_voltha_flow_id, replicate_flow)
+ // This is the case of voltha_flow_id (not symmetric_voltha_flow_id)
+ if (is_voltha_flow_installed(voltha_flow_id)) {
+ OPENOLT_LOG(INFO, openolt_log_id, "voltha_flow_id=%lu, already installed\n", voltha_flow_id);
+ return ::Status(grpc::StatusCode::ALREADY_EXISTS, "voltha-flow-already-installed");
+ }
+
+ // Trap-to-host voltha flows need not be replicated as they are installed as ACLs, not BAL flows.
+ if (action.cmd().trap_to_host()) {
+ replicate_flow = false;
+ }
+
+ // This is the case of symmetric_voltha_flow_id
+ // If symmetric_voltha_flow_id is available and valid in the Flow message,
+ // check if it is installed, and use the corresponding device_flow_id
+ if (symmetric_voltha_flow_id > 0 && is_voltha_flow_installed(symmetric_voltha_flow_id)) { // symmetric flow found
+ OPENOLT_LOG(INFO, openolt_log_id, "symmetric flow and the symmetric flow is installed\n");
+ const device_flow_params *dev_fl_symm_params;
+ dev_fl_symm_params = get_device_flow_params(symmetric_voltha_flow_id);
+ if (dev_fl_symm_params == NULL) {
+ OPENOLT_LOG(ERROR, openolt_log_id, "symmetric flow device params not found symm-voltha-flow=%lu voltha-flow=%lu\n", symmetric_voltha_flow_id, voltha_flow_id)
+ return ::Status(grpc::StatusCode::INTERNAL, "symmetric-flow-details-not-found");
+ }
+
+ if (!replicate_flow) { // No flow replication
+ flow_id = dev_fl_symm_params[0].flow_id;
+ gemport_id = dev_fl_symm_params[0].gemport_id; // overwrite the gemport with symmetric flow gemport
+ // Should be same as what is coming in this request.
+ ::openolt::Classifier cl = ::openolt::Classifier(classifier);
+ cl.set_o_pbits(dev_fl_symm_params[0].pbit);
+ Status st = FlowAdd_(access_intf_id, onu_id, uni_id, port_no, flow_id,
+ flow_type, alloc_id, network_intf_id, gemport_id, cl,
+ action, priority, cookie, group_id, tech_profile_id);
+ if (st.error_code() == grpc::StatusCode::OK) {
+ device_flow dev_fl;
+ dev_fl.is_flow_replicated = false;
+ dev_fl.symmetric_voltha_flow_id = symmetric_voltha_flow_id;
+ dev_fl.voltha_flow_id = voltha_flow_id;
+ memcpy(dev_fl.params, dev_fl_symm_params, sizeof(device_flow_params));
+ // update voltha flow to cache
+ update_voltha_flow_to_cache(voltha_flow_id, dev_fl);
+ }
+ return st;
+ } else { // Flow to be replicated
+ OPENOLT_LOG(INFO, openolt_log_id,"symmetric flow and replication is needed\n");
+ for (uint8_t i=0; i<NUMBER_OF_REPLICATED_FLOWS; i++) {
+ ::openolt::Classifier cl = ::openolt::Classifier(classifier);
+ flow_id = dev_fl_symm_params[i].flow_id;
+ gemport_id = dev_fl_symm_params[i].gemport_id;
+ cl.set_o_pbits(dev_fl_symm_params[i].pbit);
+ Status st = FlowAdd_(access_intf_id, onu_id, uni_id, port_no, flow_id,
+ flow_type, alloc_id, network_intf_id, gemport_id, classifier,
+ action, priority, cookie, group_id, tech_profile_id);
+ if (st.error_code() != grpc::StatusCode::OK && st.error_code() != grpc::StatusCode::ALREADY_EXISTS) {
+ OPENOLT_LOG(ERROR, openolt_log_id, "failed to install device flow=%u for voltha flow=%lu. Undoing any device flows installed.", flow_id, voltha_flow_id);
+ // On failure remove any successfully replicated flows installed so far for the voltha_flow_id
+ if (i > 0) {
+ for (int8_t j = i-1; j >= 0; j--) {
+ flow_id = dev_fl_symm_params[j].flow_id;
+ FlowRemove_(flow_id, flow_type);
+ }
+ }
+ return st;
+ }
+ }
+ device_flow dev_fl;
+ dev_fl.is_flow_replicated = true;
+ dev_fl.symmetric_voltha_flow_id = symmetric_voltha_flow_id;
+ dev_fl.voltha_flow_id = voltha_flow_id;
+ memcpy(dev_fl.params, dev_fl_symm_params, sizeof(device_flow_params)*NUMBER_OF_REPLICATED_FLOWS);
+ // update voltha flow to cache
+ update_voltha_flow_to_cache(voltha_flow_id, dev_fl);
+ }
+ } else { // No symmetric flow found
+ if (!replicate_flow) { // No flow replication
+ OPENOLT_LOG(INFO, openolt_log_id, "not a symmetric flow and replication is not needed\n");
+ flow_id = get_flow_id();
+ if (flow_id == INVALID_FLOW_ID) {
+ OPENOLT_LOG(ERROR, openolt_log_id, "could not allocated flow id for voltha-flow-id=%lu\n", voltha_flow_id);
+ return ::Status(grpc::StatusCode::RESOURCE_EXHAUSTED, "flow-ids-exhausted");
+ }
+ Status st = FlowAdd_(access_intf_id, onu_id, uni_id, port_no, flow_id,
+ flow_type, alloc_id, network_intf_id, gemport_id, classifier,
+ action, priority, cookie, group_id, tech_profile_id);
+ if (st.error_code() == grpc::StatusCode::OK) {
+ device_flow dev_fl;
+ dev_fl.is_flow_replicated = false;
+ dev_fl.symmetric_voltha_flow_id = INVALID_FLOW_ID; // Invalid
+ dev_fl.voltha_flow_id = voltha_flow_id;
+ dev_fl.params[0].flow_id = flow_id;
+ dev_fl.params[0].gemport_id = gemport_id;
+ dev_fl.params[0].pbit = classifier.o_pbits();
+ // update voltha flow to cache
+ update_voltha_flow_to_cache(voltha_flow_id, dev_fl);
+ } else {
+ // Free the flow id on failure
+ free_flow_id(flow_id);
+ }
+ return st;
+ } else { // Flow to be replicated
+ OPENOLT_LOG(INFO, openolt_log_id,"not a symmetric flow and replication is needed\n");
+ if (pbit_to_gemport.size() != NUMBER_OF_PBITS) {
+ OPENOLT_LOG(ERROR, openolt_log_id, "invalid pbit-to-gemport map size=%lu", pbit_to_gemport.size())
+ return ::Status(grpc::StatusCode::OUT_OF_RANGE, "pbit-to-gemport-map-len-invalid-for-flow-replication");
+ }
+ uint16_t flow_ids[NUMBER_OF_REPLICATED_FLOWS];
+ device_flow dev_fl;
+ if (get_flow_ids(NUMBER_OF_REPLICATED_FLOWS, flow_ids)) {
+ uint8_t cnt = 0;
+ dev_fl.is_flow_replicated = true;
+ dev_fl.voltha_flow_id = voltha_flow_id;
+ dev_fl.symmetric_voltha_flow_id = INVALID_FLOW_ID; // invalid
+ for (google::protobuf::Map<unsigned int, unsigned int>::const_iterator it=pbit_to_gemport.begin(); it!=pbit_to_gemport.end(); it++) {
+ dev_fl.params[cnt].flow_id = flow_ids[cnt];
+ dev_fl.params[cnt].pbit = it->first;
+ dev_fl.params[cnt].gemport_id = it->second;
+
+ ::openolt::Classifier cl = ::openolt::Classifier(classifier);
+ flow_id = dev_fl.params[cnt].flow_id;
+ gemport_id = dev_fl.params[cnt].gemport_id;
+ cl.set_o_pbits(dev_fl.params[cnt].pbit);
+ Status st = FlowAdd_(access_intf_id, onu_id, uni_id, port_no, flow_id,
+ flow_type, alloc_id, network_intf_id, gemport_id, cl,
+ action, priority, cookie, group_id, tech_profile_id);
+ if (st.error_code() != grpc::StatusCode::OK) {
+ OPENOLT_LOG(ERROR, openolt_log_id, "failed to install device flow=%u for voltha flow=%lu. Undoing any device flows installed.", flow_id, voltha_flow_id);
+ // Remove any successfully replicated flows installed so far for the voltha_flow_id
+ if (cnt > 0) {
+ for (int8_t j = cnt-1; j >= 0; j--) {
+ flow_id = dev_fl.params[j].flow_id;
+ FlowRemove_(flow_id, flow_type);
+ }
+ }
+ // Free up all the flow IDs on failure
+ free_flow_ids(NUMBER_OF_REPLICATED_FLOWS, flow_ids);
+ return st;
+ }
+ cnt++;
+ }
+ // On successful flow replication update voltha-flow-id to device-flow map to cache
+ update_voltha_flow_to_cache(voltha_flow_id, dev_fl);
+ } else {
+ OPENOLT_LOG(ERROR, openolt_log_id, "could not allocate flow ids for replication voltha-flow-id=%lu\n", voltha_flow_id);
+ return ::Status(grpc::StatusCode::RESOURCE_EXHAUSTED, "flow-ids-exhausted");
+ }
+ }
+ }
+
+ return Status::OK;
+}
+
+
Status FlowAdd_(int32_t access_intf_id, int32_t onu_id, int32_t uni_id, uint32_t port_no,
uint32_t flow_id, const std::string flow_type,
int32_t alloc_id, int32_t network_intf_id,
@@ -1528,7 +1678,7 @@
if (action.cmd().trap_to_host()) {
Status resp = handle_acl_rule_install(onu_id, flow_id, flow_type, access_intf_id,
- network_intf_id, gemport_id, classifier);
+ network_intf_id, classifier);
return resp;
}
@@ -1569,18 +1719,7 @@
}
bcmos_fastlock_unlock(&data_lock, 0);
}
- if (gemport_id >= 0 && access_intf_id >= 0) {
- // Update the flow_to_acl_map. Note that since this is a datapath flow, acl_id is -1
- // This info is needed during flow remove where we need to retrieve the gemport_id
- // and access_intf id for the given flow id and flow direction.
- // After retrieving the ACL ID and GEM PORT ID, we decrement the corresponding
- // reference counters for those ACL ID and GEMPORT ID.
- acl_id_gem_id_intf_id ac_id_gm_id_if_id(-1, gemport_id, access_intf_id);
- flow_id_flow_direction fl_id_fl_dir(flow_id, flow_type);
- bcmos_fastlock_lock(&data_lock);
- flow_to_acl_map[fl_id_fl_dir] = ac_id_gm_id_if_id;
- bcmos_fastlock_unlock(&data_lock, 0);
- }
+
if (priority_value >= 0) {
BCMOLT_MSG_FIELD_SET(&cfg, priority, priority_value);
}
@@ -1788,11 +1927,14 @@
BCMOLT_MSG_FIELD_SET(&cfg, state, BCMOLT_FLOW_STATE_ENABLE);
+#ifndef SCALE_AND_PERF
// BAL 3.1 supports statistics only for unicast flows.
if (key.flow_type != BCMOLT_FLOW_TYPE_MULTICAST) {
BCMOLT_MSG_FIELD_SET(&cfg, statistics, BCMOLT_CONTROL_STATE_ENABLE);
}
+#endif // SCALE_AND_PERF
+#ifndef SCALE_AND_PERF
#ifdef FLOW_CHECKER
//Flow Checker, To avoid duplicate flow.
if (flow_id_counters != 0) {
@@ -1838,7 +1980,8 @@
}
}
}
-#endif
+#endif // FLOW_CHECKER
+#endif // SCALE_AND_PERF
bcmos_errno err = bcmolt_cfg_set(dev_id, &cfg.hdr);
if (err) {
@@ -1849,25 +1992,45 @@
bcmos_fastlock_lock(&data_lock);
flow_map[std::pair<int, int>(key.flow_id,key.flow_type)] = flow_map.size();
flow_id_counters = flow_map.size();
- if (gemport_id > 0 && access_intf_id >= 0) {
- gem_id_intf_id gem_intf(gemport_id, access_intf_id);
- if (gem_ref_cnt.count(gem_intf) > 0) {
- // The gem port is already installed
- // Increment the ref counter indicating number of flows referencing this gem port
- gem_ref_cnt[gem_intf]++;
- OPENOLT_LOG(DEBUG, openolt_log_id, "incremented gem_ref_cnt, gem_ref_cnt=%d\n", gem_ref_cnt[gem_intf]);
- } else {
- // Initialize the refence count for the gemport.
- gem_ref_cnt[gem_intf] = 1;
- OPENOLT_LOG(DEBUG, openolt_log_id, "initialized gem_ref_cnt\n");
- }
- } else {
- OPENOLT_LOG(DEBUG, openolt_log_id, "not incrementing gem_ref_cnt flow_id=%d gemport_id=%d access_intf_id=%d\n", flow_id, gemport_id, access_intf_id);
- }
-
bcmos_fastlock_unlock(&data_lock, 0);
+
+ }
+ return Status::OK;
+}
+
+Status FlowRemoveWrapper_(const ::openolt::Flow* request) {
+ const std::string flow_type = request->flow_type();
+ uint64_t voltha_flow_id = request->flow_id();
+ Status st;
+
+ // If Voltha flow is not installed, return fail
+ if (! is_voltha_flow_installed(voltha_flow_id)) {
+ OPENOLT_LOG(ERROR, openolt_log_id, "voltha_flow_id=%lu not found\n", voltha_flow_id);
+ return ::Status(grpc::StatusCode::NOT_FOUND, "voltha-flow-not-found");
}
+ const device_flow *dev_fl = get_device_flow(voltha_flow_id);
+ if (dev_fl == NULL) {
+ OPENOLT_LOG(ERROR, openolt_log_id, "device flow for voltha_flow_id=%lu in the cache is NULL\n", voltha_flow_id);
+ return ::Status(grpc::StatusCode::INTERNAL, "device-flow-null-in-cache");
+ }
+ if (dev_fl->is_flow_replicated) {
+ // Note: Here we are ignoring FlowRemove failures
+ for (int i=0; i<NUMBER_OF_REPLICATED_FLOWS; i++) {
+ st = FlowRemove_(dev_fl->params[i].flow_id, flow_type);
+ if (st.error_code() == grpc::StatusCode::OK) {
+ free_flow_id(dev_fl->params[i].flow_id);
+ }
+ }
+ } else {
+ // Note: Here we are ignoring FlowRemove failures
+ st = FlowRemove_(dev_fl->params[0].flow_id, flow_type);
+ if (st.error_code() == grpc::StatusCode::OK) {
+ free_flow_id(dev_fl->params[0].flow_id);
+ }
+ }
+ // remove the flow from cache on voltha flow removal
+ remove_voltha_flow_from_cache(voltha_flow_id);
return Status::OK;
}
@@ -1898,13 +2061,13 @@
int32_t intf_id = -1;
int16_t acl_id = -1;
if (flow_to_acl_map.count(fl_id_fl_dir) > 0) {
- acl_id_gem_id_intf_id ac_id_gm_id_if_id = flow_to_acl_map[fl_id_fl_dir];
- acl_id = std::get<0>(ac_id_gm_id_if_id);
- gemport_id = std::get<1>(ac_id_gm_id_if_id);
- intf_id = std::get<2>(ac_id_gm_id_if_id);
+
+ acl_id_intf_id ac_id_if_id = flow_to_acl_map[fl_id_fl_dir];
+ acl_id = std::get<0>(ac_id_if_id);
+ intf_id = std::get<1>(ac_id_if_id);
// cleanup acl only if it is a valid acl. If not valid acl, it may be datapath flow.
if (acl_id >= 0) {
- Status resp = handle_acl_rule_cleanup(acl_id, gemport_id, intf_id, flow_type);
+ Status resp = handle_acl_rule_cleanup(acl_id, intf_id, flow_type);
bcmos_fastlock_unlock(&data_lock, 0);
if (resp.ok()) {
OPENOLT_LOG(INFO, openolt_log_id, "acl removed ok for flow_id = %u with acl_id = %d\n", flow_id, acl_id);
@@ -1948,8 +2111,6 @@
}
OPENOLT_LOG(INFO, openolt_log_id, "Flow %d, %s removed\n", flow_id, flow_type.c_str());
- clear_gem_port(gemport_id, intf_id);
-
flow_to_acl_map.erase(fl_id_fl_dir);
bcmos_fastlock_unlock(&data_lock, 0);
@@ -2021,8 +2182,8 @@
}
bcmos_errno CreateSched(std::string direction, uint32_t intf_id, uint32_t onu_id, uint32_t uni_id, uint32_t port_no,
- uint32_t alloc_id, tech_profile::AdditionalBW additional_bw, uint32_t weight, uint32_t priority,
- tech_profile::SchedulingPolicy sched_policy, tech_profile::TrafficShapingInfo tf_sh_info,
+ uint32_t alloc_id, ::tech_profile::AdditionalBW additional_bw, uint32_t weight, uint32_t priority,
+ ::tech_profile::SchedulingPolicy sched_policy, ::tech_profile::TrafficShapingInfo tf_sh_info,
uint32_t tech_profile_id) {
bcmos_errno err;
@@ -2191,13 +2352,14 @@
port_no %u, alloc_id %d, err = %s\n", intf_id, onu_id,uni_id,port_no,alloc_id, bcmos_strerror(err));
return err;
}
-
+#ifndef SCALE_AND_PERF
err = wait_for_alloc_action(intf_id, alloc_id, ALLOC_OBJECT_CREATE);
if (err) {
OPENOLT_LOG(ERROR, openolt_log_id, "Failed to create upstream bandwidth allocation, intf_id %d, onu_id %d, uni_id %d,\
port_no %u, alloc_id %d, err = %s\n", intf_id, onu_id,uni_id,port_no,alloc_id, bcmos_strerror(err));
return err;
}
+#endif
OPENOLT_LOG(INFO, openolt_log_id, "create upstream bandwidth allocation success, intf_id %d, onu_id %d, uni_id %d,\
port_no %u, alloc_id %d\n", intf_id, onu_id,uni_id,port_no,alloc_id);
@@ -2207,24 +2369,24 @@
return BCM_ERR_OK;
}
-Status CreateTrafficSchedulers_(const tech_profile::TrafficSchedulers *traffic_scheds) {
+Status CreateTrafficSchedulers_(const ::tech_profile::TrafficSchedulers *traffic_scheds) {
uint32_t intf_id = traffic_scheds->intf_id();
uint32_t onu_id = traffic_scheds->onu_id();
uint32_t uni_id = traffic_scheds->uni_id();
uint32_t port_no = traffic_scheds->port_no();
std::string direction;
unsigned int alloc_id;
- tech_profile::SchedulerConfig sched_config;
- tech_profile::AdditionalBW additional_bw;
+ ::tech_profile::SchedulerConfig sched_config;
+ ::tech_profile::AdditionalBW additional_bw;
uint32_t priority;
uint32_t weight;
- tech_profile::SchedulingPolicy sched_policy;
- tech_profile::TrafficShapingInfo traffic_shaping_info;
+ ::tech_profile::SchedulingPolicy sched_policy;
+ ::tech_profile::TrafficShapingInfo traffic_shaping_info;
uint32_t tech_profile_id;
bcmos_errno err;
for (int i = 0; i < traffic_scheds->traffic_scheds_size(); i++) {
- tech_profile::TrafficScheduler traffic_sched = traffic_scheds->traffic_scheds(i);
+ ::tech_profile::TrafficScheduler traffic_sched = traffic_scheds->traffic_scheds(i);
direction = GetDirection(traffic_sched.direction());
if (direction.compare("direction-not-supported") == 0)
@@ -2273,6 +2435,7 @@
err = get_pon_interface_status((bcmolt_interface)intf_id, &state, &los_status);
if (err == BCM_ERR_OK) {
if (state == BCMOLT_INTERFACE_STATE_ACTIVE_WORKING && los_status == BCMOLT_STATUS_OFF) {
+#ifndef SCALE_AND_PERF
OPENOLT_LOG(INFO, openolt_log_id, "PON interface: %d is enabled and LoS status is OFF, waiting for alloc cfg clear response\n",
intf_id);
err = wait_for_alloc_action(intf_id, alloc_id, ALLOC_OBJECT_DELETE);
@@ -2281,6 +2444,7 @@
direction.c_str(), intf_id, alloc_id, bcmos_strerror(err));
return err;
}
+#endif
}
else if (state == BCMOLT_INTERFACE_STATE_ACTIVE_WORKING && los_status == BCMOLT_STATUS_ON) {
OPENOLT_LOG(INFO, openolt_log_id, "PON interface: %d is enabled but LoS status is ON, not waiting for alloc cfg clear response\n",
@@ -2322,7 +2486,7 @@
return BCM_ERR_OK;
}
-Status RemoveTrafficSchedulers_(const tech_profile::TrafficSchedulers *traffic_scheds) {
+Status RemoveTrafficSchedulers_(const ::tech_profile::TrafficSchedulers *traffic_scheds) {
uint32_t intf_id = traffic_scheds->intf_id();
uint32_t onu_id = traffic_scheds->onu_id();
uint32_t uni_id = traffic_scheds->uni_id();
@@ -2331,7 +2495,7 @@
bcmos_errno err;
for (int i = 0; i < traffic_scheds->traffic_scheds_size(); i++) {
- tech_profile::TrafficScheduler traffic_sched = traffic_scheds->traffic_scheds(i);
+ ::tech_profile::TrafficScheduler traffic_sched = traffic_scheds->traffic_scheds(i);
direction = GetDirection(traffic_sched.direction());
if (direction.compare("direction-not-supported") == 0)
@@ -2483,12 +2647,20 @@
return err;
}
+ if (direction.compare(upstream) == 0) {
+ Status st = install_gem_port(access_intf_id, onu_id, gemport_id);
+ if (st.error_code() != grpc::StatusCode::ALREADY_EXISTS && st.error_code() != grpc::StatusCode::OK) {
+ OPENOLT_LOG(ERROR, openolt_log_id, "failed to created gemport=%d, access_intf=%d, onu_id=%d\n", gemport_id, access_intf_id, onu_id);
+ return BCM_ERR_INTERNAL;
+ }
+ }
+
OPENOLT_LOG(INFO, openolt_log_id, "Created tm_queue, direction %s, id %d, sched_id %d, tm_q_set_id %d, \
intf_id %d, onu_id %d, uni_id %d, tech_profiled_id %d\n", direction.c_str(), key.id, key.sched_id, key.tm_q_set_id, access_intf_id, onu_id, uni_id, tech_profile_id);
return BCM_ERR_OK;
}
-Status CreateTrafficQueues_(const tech_profile::TrafficQueues *traffic_queues) {
+Status CreateTrafficQueues_(const ::tech_profile::TrafficQueues *traffic_queues) {
uint32_t intf_id = traffic_queues->intf_id();
uint32_t onu_id = traffic_queues->onu_id();
uint32_t uni_id = traffic_queues->uni_id();
@@ -2502,7 +2674,7 @@
uint32_t queues_priority_q[traffic_queues->traffic_queues_size()] = {0};
std::string queues_pbit_map[traffic_queues->traffic_queues_size()];
for (int i = 0; i < traffic_queues->traffic_queues_size(); i++) {
- tech_profile::TrafficQueue traffic_queue = traffic_queues->traffic_queues(i);
+ ::tech_profile::TrafficQueue traffic_queue = traffic_queues->traffic_queues(i);
direction = GetDirection(traffic_queue.direction());
if (direction.compare("direction-not-supported") == 0)
@@ -2532,7 +2704,7 @@
}
for (int i = 0; i < traffic_queues->traffic_queues_size(); i++) {
- tech_profile::TrafficQueue traffic_queue = traffic_queues->traffic_queues(i);
+ ::tech_profile::TrafficQueue traffic_queue = traffic_queues->traffic_queues(i);
direction = GetDirection(traffic_queue.direction());
if (direction.compare("direction-not-supported") == 0)
@@ -2556,6 +2728,11 @@
bcmos_errno err;
if (direction == downstream) {
+ // Gem ports are HSI/VoIP/VoD traffic is bidirectional. Multicast gem-port are downstream only. Since the common direction between
+ // bidirectional traffic and multicast gem-port is "downstream" direction and we need to remove the gemport for a given gemport id only once,
+ // we remove the gem port when downstream direction queue is getting removed.
+ remove_gem_port(access_intf_id, gemport_id);
+
if (is_tm_sched_id_present(access_intf_id, onu_id, uni_id, direction, tech_profile_id)) {
key.sched_id = get_tm_sched_id(access_intf_id, onu_id, uni_id, direction, tech_profile_id);
key.id = queue_id_list[priority];
@@ -2596,7 +2773,7 @@
return BCM_ERR_OK;
}
-Status RemoveTrafficQueues_(const tech_profile::TrafficQueues *traffic_queues) {
+Status RemoveTrafficQueues_(const ::tech_profile::TrafficQueues *traffic_queues) {
uint32_t intf_id = traffic_queues->intf_id();
uint32_t onu_id = traffic_queues->onu_id();
uint32_t uni_id = traffic_queues->uni_id();
@@ -2608,7 +2785,7 @@
bcmolt_egress_qos_type qos_type = get_qos_type(intf_id, onu_id, uni_id, traffic_queues->traffic_queues_size());
for (int i = 0; i < traffic_queues->traffic_queues_size(); i++) {
- tech_profile::TrafficQueue traffic_queue = traffic_queues->traffic_queues(i);
+ ::tech_profile::TrafficQueue traffic_queue = traffic_queues->traffic_queues(i);
direction = GetDirection(traffic_queue.direction());
if (direction.compare("direction-not-supported") == 0)
@@ -2638,7 +2815,7 @@
return Status::OK;
}
-Status PerformGroupOperation_(const openolt::Group *group_cfg) {
+Status PerformGroupOperation_(const ::openolt::Group *group_cfg) {
bcmos_errno err;
bcmolt_group_key key = {};
@@ -2748,17 +2925,17 @@
}
/* SET GROUP MEMBERS UPDATE COMMAND */
- openolt::Group::GroupMembersCommand command = group_cfg->command();
+ ::openolt::Group::GroupMembersCommand command = group_cfg->command();
switch(command) {
- case openolt::Group::SET_MEMBERS :
+ case ::openolt::Group::SET_MEMBERS :
grp_mem_upd_cmd = BCMOLT_MEMBERS_UPDATE_COMMAND_SET;
OPENOLT_LOG(INFO, openolt_log_id, "Setting %d members for Group %d.\n", members.len, group_id);
break;
- case openolt::Group::ADD_MEMBERS :
+ case ::openolt::Group::ADD_MEMBERS :
grp_mem_upd_cmd = BCMOLT_MEMBERS_UPDATE_COMMAND_ADD;
OPENOLT_LOG(INFO, openolt_log_id, "Adding %d members to Group %d.\n", members.len, group_id);
break;
- case openolt::Group::REMOVE_MEMBERS :
+ case ::openolt::Group::REMOVE_MEMBERS :
grp_mem_upd_cmd = BCMOLT_MEMBERS_UPDATE_COMMAND_REMOVE;
OPENOLT_LOG(INFO, openolt_log_id, "Removing %d members from Group %d.\n", members.len, group_id);
break;
@@ -2771,26 +2948,26 @@
// SET MEMBERS LIST
for (int i = 0; i < members.len; i++) {
- if (command == openolt::Group::REMOVE_MEMBERS) {
+ if (command == ::openolt::Group::REMOVE_MEMBERS) {
OPENOLT_LOG(INFO, openolt_log_id, "Removing group member %d from group %d\n",i,key.id);
} else {
OPENOLT_LOG(INFO, openolt_log_id, "Adding group member %d to group %d\n",i,key.id);
}
- openolt::GroupMember *member = (openolt::GroupMember *) &group_cfg->members()[i];
+ ::openolt::GroupMember *member = (::openolt::GroupMember *) &group_cfg->members()[i];
// Set member interface type
- openolt::GroupMember::InterfaceType if_type = member->interface_type();
+ ::openolt::GroupMember::InterfaceType if_type = member->interface_type();
switch(if_type){
- case openolt::GroupMember::PON :
+ case ::openolt::GroupMember::PON :
BCMOLT_FIELD_SET(&interface_ref, intf_ref, intf_type, BCMOLT_INTERFACE_TYPE_PON);
OPENOLT_LOG(INFO, openolt_log_id, "Interface type PON is assigned to GroupMember %d\n",i);
break;
- case openolt::GroupMember::EPON_1G_PATH :
+ case ::openolt::GroupMember::EPON_1G_PATH :
BCMOLT_FIELD_SET(&interface_ref, intf_ref, intf_type, BCMOLT_INTERFACE_TYPE_EPON_1_G);
OPENOLT_LOG(INFO, openolt_log_id, "Interface type EPON_1G is assigned to GroupMember %d\n",i);
break;
- case openolt::GroupMember::EPON_10G_PATH :
+ case ::openolt::GroupMember::EPON_10G_PATH :
BCMOLT_FIELD_SET(&interface_ref, intf_ref, intf_type, BCMOLT_INTERFACE_TYPE_EPON_10_G);
OPENOLT_LOG(INFO, openolt_log_id, "Interface type EPON_10G is assigned to GroupMember %d\n",i);
break;
@@ -2907,7 +3084,7 @@
return Status::OK;
}
-Status GetLogicalOnuDistanceZero_(uint32_t intf_id, openolt::OnuLogicalDistance* response) {
+Status GetLogicalOnuDistanceZero_(uint32_t intf_id, ::openolt::OnuLogicalDistance* response) {
bcmos_errno err = BCM_ERR_OK;
uint32_t mld = 0;
double LD0;
@@ -2925,7 +3102,7 @@
return Status::OK;
}
-Status GetLogicalOnuDistance_(uint32_t intf_id, uint32_t onu_id, openolt::OnuLogicalDistance* response) {
+Status GetLogicalOnuDistance_(uint32_t intf_id, uint32_t onu_id, ::openolt::OnuLogicalDistance* response) {
bcmos_errno err = BCM_ERR_OK;
bcmolt_itu_onu_params itu = {};
bcmolt_onu_cfg onu_cfg;