VOL-3419: Replicate voltha flows in openolt agent
- Use the flow_id, symmetric_flow_id, replication_flag and pbit_to_gemport_map
coming in Flow proto messge to replicate the flow as needed
- Use the CreateQueues and RemoveQueues messages to setup and remove gem_ports
- Use latest gRPC version 1.31.1 which allows fine tuning of gRPC threadpools
which allows for greating performance.
- Performance numbers when tested with openolt-scale-tester has been better with
using latest gRPC and threadpool tuning when compared to earlier. It is to be
noted that the performance is better even with openolt-agent replicating the flow
now.
- Scale tests with 512 subscribers have been successfull with BAL3.4.7.5 version
- Use openolt proto version 4.0.0
- Use openolt-test (for unit test) image version 2.0.1 (which bundles latest gRPC version 1.31.1)
- These changes are NOT backward compatible and openolt-agent will have a major
version bump to 3.0.0
Change-Id: I715c804bdf342e60d08cab6c59e1c21b8c5ac1f4
diff --git a/agent/src/core_api_handler.cc b/agent/src/core_api_handler.cc
index aa45b0f..a2b0bcf 100644
--- a/agent/src/core_api_handler.cc
+++ b/agent/src/core_api_handler.cc
@@ -65,9 +65,9 @@
static std::string firmware_version = "Openolt.2019.07.01";
static bcmos_errno CreateSched(std::string direction, uint32_t access_intf_id, uint32_t onu_id, uint32_t uni_id, \
- uint32_t port_no, uint32_t alloc_id, tech_profile::AdditionalBW additional_bw, uint32_t weight, \
- uint32_t priority, tech_profile::SchedulingPolicy sched_policy,
- tech_profile::TrafficShapingInfo traffic_shaping_info, uint32_t tech_profile_id);
+ uint32_t port_no, uint32_t alloc_id, ::tech_profile::AdditionalBW additional_bw, uint32_t weight, \
+ uint32_t priority, ::tech_profile::SchedulingPolicy sched_policy,
+ ::tech_profile::TrafficShapingInfo traffic_shaping_info, uint32_t tech_profile_id);
static bcmos_errno RemoveSched(int intf_id, int onu_id, int uni_id, int alloc_id, std::string direction, int tech_profile_id);
static bcmos_errno CreateQueue(std::string direction, uint32_t access_intf_id, uint32_t onu_id, uint32_t uni_id, \
bcmolt_egress_qos_type qos_type, uint32_t priority, uint32_t gemport_id, uint32_t tech_profile_id);
@@ -201,7 +201,7 @@
return Status::OK;
}
-Status GetDeviceInfo_(openolt::DeviceInfo* device_info) {
+Status GetDeviceInfo_(::openolt::DeviceInfo* device_info) {
device_info->set_vendor(VENDOR_ID);
device_info->set_model(MODEL_ID);
device_info->set_hardware_version("");
@@ -238,90 +238,49 @@
// Legacy, device-wide ranges. To be deprecated when adapter
// is upgraded to support per-interface ranges
- if (board_technology == "XGS-PON") {
- device_info->set_onu_id_start(1);
- device_info->set_onu_id_end(255);
- device_info->set_alloc_id_start(MIN_ALLOC_ID_XGSPON);
- device_info->set_alloc_id_end(16383);
- device_info->set_gemport_id_start(1024);
- device_info->set_gemport_id_end(65535);
- device_info->set_flow_id_start(1);
- device_info->set_flow_id_end(16383);
- }
- else if (board_technology == "GPON") {
- device_info->set_onu_id_start(1);
- device_info->set_onu_id_end(127);
- device_info->set_alloc_id_start(MIN_ALLOC_ID_GPON);
- device_info->set_alloc_id_end(767);
- device_info->set_gemport_id_start(256);
- device_info->set_gemport_id_end(4095);
- device_info->set_flow_id_start(1);
- device_info->set_flow_id_end(16383);
- }
+ device_info->set_onu_id_start(ONU_ID_START);
+ device_info->set_onu_id_end(ONU_ID_END);
+ device_info->set_alloc_id_start(ALLOC_ID_START);
+ device_info->set_alloc_id_end(ALLOC_ID_END);
+ device_info->set_gemport_id_start(GEM_PORT_ID_START);
+ device_info->set_gemport_id_end(GEM_PORT_ID_END);
+ device_info->set_flow_id_start(FLOW_ID_START);
+ device_info->set_flow_id_end(FLOW_ID_END);
- std::map<std::string, openolt::DeviceInfo::DeviceResourceRanges*> ranges;
+ std::map<std::string, ::openolt::DeviceInfo::DeviceResourceRanges*> ranges;
for (uint32_t intf_id = 0; intf_id < num_of_pon_ports; ++intf_id) {
std::string intf_technology = intf_technologies[intf_id];
- openolt::DeviceInfo::DeviceResourceRanges *range = ranges[intf_technology];
+ ::openolt::DeviceInfo::DeviceResourceRanges *range = ranges[intf_technology];
if(range == nullptr) {
range = device_info->add_ranges();
ranges[intf_technology] = range;
range->set_technology(intf_technology);
- if (intf_technology == "XGS-PON") {
- openolt::DeviceInfo::DeviceResourceRanges::Pool* pool;
+ ::openolt::DeviceInfo::DeviceResourceRanges::Pool* pool;
- pool = range->add_pools();
- pool->set_type(openolt::DeviceInfo::DeviceResourceRanges::Pool::ONU_ID);
- pool->set_sharing(openolt::DeviceInfo::DeviceResourceRanges::Pool::DEDICATED_PER_INTF);
- pool->set_start(1);
- pool->set_end(255);
+ pool = range->add_pools();
+ pool->set_type(::openolt::DeviceInfo::DeviceResourceRanges::Pool::ONU_ID);
+ pool->set_sharing(::openolt::DeviceInfo::DeviceResourceRanges::Pool::DEDICATED_PER_INTF);
+ pool->set_start(ONU_ID_START);
+ pool->set_end(ONU_ID_END);
- pool = range->add_pools();
- pool->set_type(openolt::DeviceInfo::DeviceResourceRanges::Pool::ALLOC_ID);
- pool->set_sharing(openolt::DeviceInfo::DeviceResourceRanges::Pool::DEDICATED_PER_INTF);
- pool->set_start(1024);
- pool->set_end(16383);
+ pool = range->add_pools();
+ pool->set_type(::openolt::DeviceInfo::DeviceResourceRanges::Pool::ALLOC_ID);
+ pool->set_sharing(::openolt::DeviceInfo::DeviceResourceRanges::Pool::DEDICATED_PER_INTF);
+ pool->set_start(ALLOC_ID_START);
+ pool->set_end(ALLOC_ID_START);
- pool = range->add_pools();
- pool->set_type(openolt::DeviceInfo::DeviceResourceRanges::Pool::GEMPORT_ID);
- pool->set_sharing(openolt::DeviceInfo::DeviceResourceRanges::Pool::DEDICATED_PER_INTF);
- pool->set_start(1024);
- pool->set_end(65535);
+ pool = range->add_pools();
+ pool->set_type(::openolt::DeviceInfo::DeviceResourceRanges::Pool::GEMPORT_ID);
+ pool->set_sharing(::openolt::DeviceInfo::DeviceResourceRanges::Pool::DEDICATED_PER_INTF);
+ pool->set_start(GEM_PORT_ID_START);
+ pool->set_end(GEM_PORT_ID_END);
- pool = range->add_pools();
- pool->set_type(openolt::DeviceInfo::DeviceResourceRanges::Pool::FLOW_ID);
- pool->set_sharing(openolt::DeviceInfo::DeviceResourceRanges::Pool::SHARED_BY_ALL_INTF_ALL_TECH);
- pool->set_start(1);
- pool->set_end(16383);
- }
- else if (intf_technology == "GPON") {
- openolt::DeviceInfo::DeviceResourceRanges::Pool* pool;
-
- pool = range->add_pools();
- pool->set_type(openolt::DeviceInfo::DeviceResourceRanges::Pool::ONU_ID);
- pool->set_sharing(openolt::DeviceInfo::DeviceResourceRanges::Pool::DEDICATED_PER_INTF);
- pool->set_start(1);
- pool->set_end(127);
-
- pool = range->add_pools();
- pool->set_type(openolt::DeviceInfo::DeviceResourceRanges::Pool::ALLOC_ID);
- pool->set_sharing(openolt::DeviceInfo::DeviceResourceRanges::Pool::DEDICATED_PER_INTF);
- pool->set_start(256);
- pool->set_end(757);
-
- pool = range->add_pools();
- pool->set_type(openolt::DeviceInfo::DeviceResourceRanges::Pool::GEMPORT_ID);
- pool->set_sharing(openolt::DeviceInfo::DeviceResourceRanges::Pool::DEDICATED_PER_INTF);
- pool->set_start(256);
- pool->set_end(4095);
-
- pool = range->add_pools();
- pool->set_type(openolt::DeviceInfo::DeviceResourceRanges::Pool::FLOW_ID);
- pool->set_sharing(openolt::DeviceInfo::DeviceResourceRanges::Pool::SHARED_BY_ALL_INTF_ALL_TECH);
- pool->set_start(1);
- pool->set_end(16383);
- }
+ pool = range->add_pools();
+ pool->set_type(::openolt::DeviceInfo::DeviceResourceRanges::Pool::FLOW_ID);
+ pool->set_sharing(::openolt::DeviceInfo::DeviceResourceRanges::Pool::SHARED_BY_ALL_INTF_ALL_TECH);
+ pool->set_start(FLOW_ID_START);
+ pool->set_end(FLOW_ID_END);
}
range->add_intf_ids(intf_id);
@@ -376,6 +335,11 @@
}
bcmos_fastlock_init(&data_lock, 0);
+ bcmos_fastlock_init(&acl_id_bitset_lock, 0);
+ bcmos_fastlock_init(&tm_sched_bitset_lock, 0);
+ bcmos_fastlock_init(&tm_qmp_bitset_lock, 0);
+ bcmos_fastlock_init(&flow_id_bitset_lock, 0);
+ bcmos_fastlock_init(&voltha_flow_to_device_flow_lock, 0);
bcmos_fastlock_init(&alloc_cfg_wait_lock, 0);
bcmos_fastlock_init(&onu_deactivate_wait_lock, 0);
OPENOLT_LOG(INFO, openolt_log_id, "Enable OLT - %s-%s\n", VENDOR_ID, MODEL_ID);
@@ -469,8 +433,8 @@
}
if (failedCount == 0) {
state.deactivate();
- openolt::Indication ind;
- openolt::OltIndication* olt_ind = new openolt::OltIndication;
+ ::openolt::Indication ind;
+ ::openolt::OltIndication* olt_ind = new ::openolt::OltIndication;
olt_ind->set_oper_state("down");
ind.set_allocated_olt_ind(olt_ind);
BCM_LOG(INFO, openolt_log_id, "Disable OLT, add an extra indication\n");
@@ -496,8 +460,8 @@
}
if (failedCount == 0) {
state.activate();
- openolt::Indication ind;
- openolt::OltIndication* olt_ind = new openolt::OltIndication;
+ ::openolt::Indication ind;
+ ::openolt::OltIndication* olt_ind = new ::openolt::OltIndication;
olt_ind->set_oper_state("up");
ind.set_allocated_olt_ind(olt_ind);
BCM_LOG(INFO, openolt_log_id, "Reenable OLT, add an extra indication\n");
@@ -1182,7 +1146,7 @@
return bcm_to_grpc_err(err, "Failed to configure ONU");
}
}
-
+
if (omcc_encryption_mode == true) {
// set the encryption mode for omci port id
bcmolt_itupon_gem_cfg gem_cfg;
@@ -1442,7 +1406,7 @@
bcmolt_flow_send_eth_packet oper; /* declare main API struct */
// TODO: flow_id is currently not passed in UplinkPacket message from voltha.
- bcmolt_flow_id flow_id = 0;
+ bcmolt_flow_id flow_id = INVALID_FLOW_ID;
//validate flow_id and find flow_id/flow type: upstream/ingress type: PON/egress type: NNI
if (get_flow_status(flow_id, BCMOLT_FLOW_TYPE_UPSTREAM, FLOW_TYPE) == BCMOLT_FLOW_TYPE_UPSTREAM && \
@@ -1492,6 +1456,192 @@
return Status::OK;
}
+
+Status FlowAddWrapper_(const ::openolt::Flow* request) {
+
+ int32_t access_intf_id = request->access_intf_id();
+ int32_t onu_id = request->onu_id();
+ int32_t uni_id = request->uni_id();
+ uint32_t port_no = request->port_no();
+ uint64_t voltha_flow_id = request->flow_id();
+ uint64_t symmetric_voltha_flow_id = request->symmetric_flow_id();
+ const std::string flow_type = request->flow_type();
+ int32_t alloc_id = request->alloc_id();
+ int32_t network_intf_id = request->network_intf_id();
+ int32_t gemport_id = request->gemport_id();
+ const ::openolt::Classifier& classifier = request->classifier();
+ const ::openolt::Action& action = request->action();
+ int32_t priority = request->priority();
+ uint64_t cookie = request->cookie();
+ int32_t group_id = request->group_id();
+ uint32_t tech_profile_id = request->tech_profile_id();
+ bool replicate_flow = request->replicate_flow();
+ const google::protobuf::Map<unsigned int, unsigned int> &pbit_to_gemport = request->pbit_to_gemport();
+ uint16_t flow_id;
+
+ // The intf_id variable defaults to access(PON) interface ID.
+ // For trap-from-nni flow where access interface ID is not valid , change it to NNI interface ID
+ // This intf_id identifies the pool from which we get the flow_id
+ uint32_t intf_id = access_intf_id;
+ if (onu_id < 1) {
+ onu_id = 1;
+ }
+ if (access_intf_id < 0) {
+ intf_id = network_intf_id;
+ }
+
+ OPENOLT_LOG(INFO, openolt_log_id, "received flow add. voltha_flow_id=%lu, symmetric_voltha_flow_id=%lu, replication=%d\n", voltha_flow_id, symmetric_voltha_flow_id, replicate_flow)
+ // This is the case of voltha_flow_id (not symmetric_voltha_flow_id)
+ if (is_voltha_flow_installed(voltha_flow_id)) {
+ OPENOLT_LOG(INFO, openolt_log_id, "voltha_flow_id=%lu, already installed\n", voltha_flow_id);
+ return ::Status(grpc::StatusCode::ALREADY_EXISTS, "voltha-flow-already-installed");
+ }
+
+ // Trap-to-host voltha flows need not be replicated as they are installed as ACLs, not BAL flows.
+ if (action.cmd().trap_to_host()) {
+ replicate_flow = false;
+ }
+
+ // This is the case of symmetric_voltha_flow_id
+ // If symmetric_voltha_flow_id is available and valid in the Flow message,
+ // check if it is installed, and use the corresponding device_flow_id
+ if (symmetric_voltha_flow_id > 0 && is_voltha_flow_installed(symmetric_voltha_flow_id)) { // symmetric flow found
+ OPENOLT_LOG(INFO, openolt_log_id, "symmetric flow and the symmetric flow is installed\n");
+ const device_flow_params *dev_fl_symm_params;
+ dev_fl_symm_params = get_device_flow_params(symmetric_voltha_flow_id);
+ if (dev_fl_symm_params == NULL) {
+ OPENOLT_LOG(ERROR, openolt_log_id, "symmetric flow device params not found symm-voltha-flow=%lu voltha-flow=%lu\n", symmetric_voltha_flow_id, voltha_flow_id)
+ return ::Status(grpc::StatusCode::INTERNAL, "symmetric-flow-details-not-found");
+ }
+
+ if (!replicate_flow) { // No flow replication
+ flow_id = dev_fl_symm_params[0].flow_id;
+ gemport_id = dev_fl_symm_params[0].gemport_id; // overwrite the gemport with symmetric flow gemport
+ // Should be same as what is coming in this request.
+ ::openolt::Classifier cl = ::openolt::Classifier(classifier);
+ cl.set_o_pbits(dev_fl_symm_params[0].pbit);
+ Status st = FlowAdd_(access_intf_id, onu_id, uni_id, port_no, flow_id,
+ flow_type, alloc_id, network_intf_id, gemport_id, cl,
+ action, priority, cookie, group_id, tech_profile_id);
+ if (st.error_code() == grpc::StatusCode::OK) {
+ device_flow dev_fl;
+ dev_fl.is_flow_replicated = false;
+ dev_fl.symmetric_voltha_flow_id = symmetric_voltha_flow_id;
+ dev_fl.voltha_flow_id = voltha_flow_id;
+ memcpy(dev_fl.params, dev_fl_symm_params, sizeof(device_flow_params));
+ // update voltha flow to cache
+ update_voltha_flow_to_cache(voltha_flow_id, dev_fl);
+ }
+ return st;
+ } else { // Flow to be replicated
+ OPENOLT_LOG(INFO, openolt_log_id,"symmetric flow and replication is needed\n");
+ for (uint8_t i=0; i<NUMBER_OF_REPLICATED_FLOWS; i++) {
+ ::openolt::Classifier cl = ::openolt::Classifier(classifier);
+ flow_id = dev_fl_symm_params[i].flow_id;
+ gemport_id = dev_fl_symm_params[i].gemport_id;
+ cl.set_o_pbits(dev_fl_symm_params[i].pbit);
+ Status st = FlowAdd_(access_intf_id, onu_id, uni_id, port_no, flow_id,
+ flow_type, alloc_id, network_intf_id, gemport_id, classifier,
+ action, priority, cookie, group_id, tech_profile_id);
+ if (st.error_code() != grpc::StatusCode::OK && st.error_code() != grpc::StatusCode::ALREADY_EXISTS) {
+ OPENOLT_LOG(ERROR, openolt_log_id, "failed to install device flow=%u for voltha flow=%lu. Undoing any device flows installed.", flow_id, voltha_flow_id);
+ // On failure remove any successfully replicated flows installed so far for the voltha_flow_id
+ if (i > 0) {
+ for (int8_t j = i-1; j >= 0; j--) {
+ flow_id = dev_fl_symm_params[j].flow_id;
+ FlowRemove_(flow_id, flow_type);
+ }
+ }
+ return st;
+ }
+ }
+ device_flow dev_fl;
+ dev_fl.is_flow_replicated = true;
+ dev_fl.symmetric_voltha_flow_id = symmetric_voltha_flow_id;
+ dev_fl.voltha_flow_id = voltha_flow_id;
+ memcpy(dev_fl.params, dev_fl_symm_params, sizeof(device_flow_params)*NUMBER_OF_REPLICATED_FLOWS);
+ // update voltha flow to cache
+ update_voltha_flow_to_cache(voltha_flow_id, dev_fl);
+ }
+ } else { // No symmetric flow found
+ if (!replicate_flow) { // No flow replication
+ OPENOLT_LOG(INFO, openolt_log_id, "not a symmetric flow and replication is not needed\n");
+ flow_id = get_flow_id();
+ if (flow_id == INVALID_FLOW_ID) {
+ OPENOLT_LOG(ERROR, openolt_log_id, "could not allocated flow id for voltha-flow-id=%lu\n", voltha_flow_id);
+ return ::Status(grpc::StatusCode::RESOURCE_EXHAUSTED, "flow-ids-exhausted");
+ }
+ Status st = FlowAdd_(access_intf_id, onu_id, uni_id, port_no, flow_id,
+ flow_type, alloc_id, network_intf_id, gemport_id, classifier,
+ action, priority, cookie, group_id, tech_profile_id);
+ if (st.error_code() == grpc::StatusCode::OK) {
+ device_flow dev_fl;
+ dev_fl.is_flow_replicated = false;
+ dev_fl.symmetric_voltha_flow_id = INVALID_FLOW_ID; // Invalid
+ dev_fl.voltha_flow_id = voltha_flow_id;
+ dev_fl.params[0].flow_id = flow_id;
+ dev_fl.params[0].gemport_id = gemport_id;
+ dev_fl.params[0].pbit = classifier.o_pbits();
+ // update voltha flow to cache
+ update_voltha_flow_to_cache(voltha_flow_id, dev_fl);
+ } else {
+ // Free the flow id on failure
+ free_flow_id(flow_id);
+ }
+ return st;
+ } else { // Flow to be replicated
+ OPENOLT_LOG(INFO, openolt_log_id,"not a symmetric flow and replication is needed\n");
+ if (pbit_to_gemport.size() != NUMBER_OF_PBITS) {
+ OPENOLT_LOG(ERROR, openolt_log_id, "invalid pbit-to-gemport map size=%lu", pbit_to_gemport.size())
+ return ::Status(grpc::StatusCode::OUT_OF_RANGE, "pbit-to-gemport-map-len-invalid-for-flow-replication");
+ }
+ uint16_t flow_ids[NUMBER_OF_REPLICATED_FLOWS];
+ device_flow dev_fl;
+ if (get_flow_ids(NUMBER_OF_REPLICATED_FLOWS, flow_ids)) {
+ uint8_t cnt = 0;
+ dev_fl.is_flow_replicated = true;
+ dev_fl.voltha_flow_id = voltha_flow_id;
+ dev_fl.symmetric_voltha_flow_id = INVALID_FLOW_ID; // invalid
+ for (google::protobuf::Map<unsigned int, unsigned int>::const_iterator it=pbit_to_gemport.begin(); it!=pbit_to_gemport.end(); it++) {
+ dev_fl.params[cnt].flow_id = flow_ids[cnt];
+ dev_fl.params[cnt].pbit = it->first;
+ dev_fl.params[cnt].gemport_id = it->second;
+
+ ::openolt::Classifier cl = ::openolt::Classifier(classifier);
+ flow_id = dev_fl.params[cnt].flow_id;
+ gemport_id = dev_fl.params[cnt].gemport_id;
+ cl.set_o_pbits(dev_fl.params[cnt].pbit);
+ Status st = FlowAdd_(access_intf_id, onu_id, uni_id, port_no, flow_id,
+ flow_type, alloc_id, network_intf_id, gemport_id, cl,
+ action, priority, cookie, group_id, tech_profile_id);
+ if (st.error_code() != grpc::StatusCode::OK) {
+ OPENOLT_LOG(ERROR, openolt_log_id, "failed to install device flow=%u for voltha flow=%lu. Undoing any device flows installed.", flow_id, voltha_flow_id);
+ // Remove any successfully replicated flows installed so far for the voltha_flow_id
+ if (cnt > 0) {
+ for (int8_t j = cnt-1; j >= 0; j--) {
+ flow_id = dev_fl.params[j].flow_id;
+ FlowRemove_(flow_id, flow_type);
+ }
+ }
+ // Free up all the flow IDs on failure
+ free_flow_ids(NUMBER_OF_REPLICATED_FLOWS, flow_ids);
+ return st;
+ }
+ cnt++;
+ }
+ // On successful flow replication update voltha-flow-id to device-flow map to cache
+ update_voltha_flow_to_cache(voltha_flow_id, dev_fl);
+ } else {
+ OPENOLT_LOG(ERROR, openolt_log_id, "could not allocate flow ids for replication voltha-flow-id=%lu\n", voltha_flow_id);
+ return ::Status(grpc::StatusCode::RESOURCE_EXHAUSTED, "flow-ids-exhausted");
+ }
+ }
+ }
+
+ return Status::OK;
+}
+
+
Status FlowAdd_(int32_t access_intf_id, int32_t onu_id, int32_t uni_id, uint32_t port_no,
uint32_t flow_id, const std::string flow_type,
int32_t alloc_id, int32_t network_intf_id,
@@ -1528,7 +1678,7 @@
if (action.cmd().trap_to_host()) {
Status resp = handle_acl_rule_install(onu_id, flow_id, flow_type, access_intf_id,
- network_intf_id, gemport_id, classifier);
+ network_intf_id, classifier);
return resp;
}
@@ -1569,18 +1719,7 @@
}
bcmos_fastlock_unlock(&data_lock, 0);
}
- if (gemport_id >= 0 && access_intf_id >= 0) {
- // Update the flow_to_acl_map. Note that since this is a datapath flow, acl_id is -1
- // This info is needed during flow remove where we need to retrieve the gemport_id
- // and access_intf id for the given flow id and flow direction.
- // After retrieving the ACL ID and GEM PORT ID, we decrement the corresponding
- // reference counters for those ACL ID and GEMPORT ID.
- acl_id_gem_id_intf_id ac_id_gm_id_if_id(-1, gemport_id, access_intf_id);
- flow_id_flow_direction fl_id_fl_dir(flow_id, flow_type);
- bcmos_fastlock_lock(&data_lock);
- flow_to_acl_map[fl_id_fl_dir] = ac_id_gm_id_if_id;
- bcmos_fastlock_unlock(&data_lock, 0);
- }
+
if (priority_value >= 0) {
BCMOLT_MSG_FIELD_SET(&cfg, priority, priority_value);
}
@@ -1788,11 +1927,14 @@
BCMOLT_MSG_FIELD_SET(&cfg, state, BCMOLT_FLOW_STATE_ENABLE);
+#ifndef SCALE_AND_PERF
// BAL 3.1 supports statistics only for unicast flows.
if (key.flow_type != BCMOLT_FLOW_TYPE_MULTICAST) {
BCMOLT_MSG_FIELD_SET(&cfg, statistics, BCMOLT_CONTROL_STATE_ENABLE);
}
+#endif // SCALE_AND_PERF
+#ifndef SCALE_AND_PERF
#ifdef FLOW_CHECKER
//Flow Checker, To avoid duplicate flow.
if (flow_id_counters != 0) {
@@ -1838,7 +1980,8 @@
}
}
}
-#endif
+#endif // FLOW_CHECKER
+#endif // SCALE_AND_PERF
bcmos_errno err = bcmolt_cfg_set(dev_id, &cfg.hdr);
if (err) {
@@ -1849,25 +1992,45 @@
bcmos_fastlock_lock(&data_lock);
flow_map[std::pair<int, int>(key.flow_id,key.flow_type)] = flow_map.size();
flow_id_counters = flow_map.size();
- if (gemport_id > 0 && access_intf_id >= 0) {
- gem_id_intf_id gem_intf(gemport_id, access_intf_id);
- if (gem_ref_cnt.count(gem_intf) > 0) {
- // The gem port is already installed
- // Increment the ref counter indicating number of flows referencing this gem port
- gem_ref_cnt[gem_intf]++;
- OPENOLT_LOG(DEBUG, openolt_log_id, "incremented gem_ref_cnt, gem_ref_cnt=%d\n", gem_ref_cnt[gem_intf]);
- } else {
- // Initialize the refence count for the gemport.
- gem_ref_cnt[gem_intf] = 1;
- OPENOLT_LOG(DEBUG, openolt_log_id, "initialized gem_ref_cnt\n");
- }
- } else {
- OPENOLT_LOG(DEBUG, openolt_log_id, "not incrementing gem_ref_cnt flow_id=%d gemport_id=%d access_intf_id=%d\n", flow_id, gemport_id, access_intf_id);
- }
-
bcmos_fastlock_unlock(&data_lock, 0);
+
+ }
+ return Status::OK;
+}
+
+Status FlowRemoveWrapper_(const ::openolt::Flow* request) {
+ const std::string flow_type = request->flow_type();
+ uint64_t voltha_flow_id = request->flow_id();
+ Status st;
+
+ // If Voltha flow is not installed, return fail
+ if (! is_voltha_flow_installed(voltha_flow_id)) {
+ OPENOLT_LOG(ERROR, openolt_log_id, "voltha_flow_id=%lu not found\n", voltha_flow_id);
+ return ::Status(grpc::StatusCode::NOT_FOUND, "voltha-flow-not-found");
}
+ const device_flow *dev_fl = get_device_flow(voltha_flow_id);
+ if (dev_fl == NULL) {
+ OPENOLT_LOG(ERROR, openolt_log_id, "device flow for voltha_flow_id=%lu in the cache is NULL\n", voltha_flow_id);
+ return ::Status(grpc::StatusCode::INTERNAL, "device-flow-null-in-cache");
+ }
+ if (dev_fl->is_flow_replicated) {
+ // Note: Here we are ignoring FlowRemove failures
+ for (int i=0; i<NUMBER_OF_REPLICATED_FLOWS; i++) {
+ st = FlowRemove_(dev_fl->params[i].flow_id, flow_type);
+ if (st.error_code() == grpc::StatusCode::OK) {
+ free_flow_id(dev_fl->params[i].flow_id);
+ }
+ }
+ } else {
+ // Note: Here we are ignoring FlowRemove failures
+ st = FlowRemove_(dev_fl->params[0].flow_id, flow_type);
+ if (st.error_code() == grpc::StatusCode::OK) {
+ free_flow_id(dev_fl->params[0].flow_id);
+ }
+ }
+ // remove the flow from cache on voltha flow removal
+ remove_voltha_flow_from_cache(voltha_flow_id);
return Status::OK;
}
@@ -1898,13 +2061,13 @@
int32_t intf_id = -1;
int16_t acl_id = -1;
if (flow_to_acl_map.count(fl_id_fl_dir) > 0) {
- acl_id_gem_id_intf_id ac_id_gm_id_if_id = flow_to_acl_map[fl_id_fl_dir];
- acl_id = std::get<0>(ac_id_gm_id_if_id);
- gemport_id = std::get<1>(ac_id_gm_id_if_id);
- intf_id = std::get<2>(ac_id_gm_id_if_id);
+
+ acl_id_intf_id ac_id_if_id = flow_to_acl_map[fl_id_fl_dir];
+ acl_id = std::get<0>(ac_id_if_id);
+ intf_id = std::get<1>(ac_id_if_id);
// cleanup acl only if it is a valid acl. If not valid acl, it may be datapath flow.
if (acl_id >= 0) {
- Status resp = handle_acl_rule_cleanup(acl_id, gemport_id, intf_id, flow_type);
+ Status resp = handle_acl_rule_cleanup(acl_id, intf_id, flow_type);
bcmos_fastlock_unlock(&data_lock, 0);
if (resp.ok()) {
OPENOLT_LOG(INFO, openolt_log_id, "acl removed ok for flow_id = %u with acl_id = %d\n", flow_id, acl_id);
@@ -1948,8 +2111,6 @@
}
OPENOLT_LOG(INFO, openolt_log_id, "Flow %d, %s removed\n", flow_id, flow_type.c_str());
- clear_gem_port(gemport_id, intf_id);
-
flow_to_acl_map.erase(fl_id_fl_dir);
bcmos_fastlock_unlock(&data_lock, 0);
@@ -2021,8 +2182,8 @@
}
bcmos_errno CreateSched(std::string direction, uint32_t intf_id, uint32_t onu_id, uint32_t uni_id, uint32_t port_no,
- uint32_t alloc_id, tech_profile::AdditionalBW additional_bw, uint32_t weight, uint32_t priority,
- tech_profile::SchedulingPolicy sched_policy, tech_profile::TrafficShapingInfo tf_sh_info,
+ uint32_t alloc_id, ::tech_profile::AdditionalBW additional_bw, uint32_t weight, uint32_t priority,
+ ::tech_profile::SchedulingPolicy sched_policy, ::tech_profile::TrafficShapingInfo tf_sh_info,
uint32_t tech_profile_id) {
bcmos_errno err;
@@ -2191,13 +2352,14 @@
port_no %u, alloc_id %d, err = %s\n", intf_id, onu_id,uni_id,port_no,alloc_id, bcmos_strerror(err));
return err;
}
-
+#ifndef SCALE_AND_PERF
err = wait_for_alloc_action(intf_id, alloc_id, ALLOC_OBJECT_CREATE);
if (err) {
OPENOLT_LOG(ERROR, openolt_log_id, "Failed to create upstream bandwidth allocation, intf_id %d, onu_id %d, uni_id %d,\
port_no %u, alloc_id %d, err = %s\n", intf_id, onu_id,uni_id,port_no,alloc_id, bcmos_strerror(err));
return err;
}
+#endif
OPENOLT_LOG(INFO, openolt_log_id, "create upstream bandwidth allocation success, intf_id %d, onu_id %d, uni_id %d,\
port_no %u, alloc_id %d\n", intf_id, onu_id,uni_id,port_no,alloc_id);
@@ -2207,24 +2369,24 @@
return BCM_ERR_OK;
}
-Status CreateTrafficSchedulers_(const tech_profile::TrafficSchedulers *traffic_scheds) {
+Status CreateTrafficSchedulers_(const ::tech_profile::TrafficSchedulers *traffic_scheds) {
uint32_t intf_id = traffic_scheds->intf_id();
uint32_t onu_id = traffic_scheds->onu_id();
uint32_t uni_id = traffic_scheds->uni_id();
uint32_t port_no = traffic_scheds->port_no();
std::string direction;
unsigned int alloc_id;
- tech_profile::SchedulerConfig sched_config;
- tech_profile::AdditionalBW additional_bw;
+ ::tech_profile::SchedulerConfig sched_config;
+ ::tech_profile::AdditionalBW additional_bw;
uint32_t priority;
uint32_t weight;
- tech_profile::SchedulingPolicy sched_policy;
- tech_profile::TrafficShapingInfo traffic_shaping_info;
+ ::tech_profile::SchedulingPolicy sched_policy;
+ ::tech_profile::TrafficShapingInfo traffic_shaping_info;
uint32_t tech_profile_id;
bcmos_errno err;
for (int i = 0; i < traffic_scheds->traffic_scheds_size(); i++) {
- tech_profile::TrafficScheduler traffic_sched = traffic_scheds->traffic_scheds(i);
+ ::tech_profile::TrafficScheduler traffic_sched = traffic_scheds->traffic_scheds(i);
direction = GetDirection(traffic_sched.direction());
if (direction.compare("direction-not-supported") == 0)
@@ -2273,6 +2435,7 @@
err = get_pon_interface_status((bcmolt_interface)intf_id, &state, &los_status);
if (err == BCM_ERR_OK) {
if (state == BCMOLT_INTERFACE_STATE_ACTIVE_WORKING && los_status == BCMOLT_STATUS_OFF) {
+#ifndef SCALE_AND_PERF
OPENOLT_LOG(INFO, openolt_log_id, "PON interface: %d is enabled and LoS status is OFF, waiting for alloc cfg clear response\n",
intf_id);
err = wait_for_alloc_action(intf_id, alloc_id, ALLOC_OBJECT_DELETE);
@@ -2281,6 +2444,7 @@
direction.c_str(), intf_id, alloc_id, bcmos_strerror(err));
return err;
}
+#endif
}
else if (state == BCMOLT_INTERFACE_STATE_ACTIVE_WORKING && los_status == BCMOLT_STATUS_ON) {
OPENOLT_LOG(INFO, openolt_log_id, "PON interface: %d is enabled but LoS status is ON, not waiting for alloc cfg clear response\n",
@@ -2322,7 +2486,7 @@
return BCM_ERR_OK;
}
-Status RemoveTrafficSchedulers_(const tech_profile::TrafficSchedulers *traffic_scheds) {
+Status RemoveTrafficSchedulers_(const ::tech_profile::TrafficSchedulers *traffic_scheds) {
uint32_t intf_id = traffic_scheds->intf_id();
uint32_t onu_id = traffic_scheds->onu_id();
uint32_t uni_id = traffic_scheds->uni_id();
@@ -2331,7 +2495,7 @@
bcmos_errno err;
for (int i = 0; i < traffic_scheds->traffic_scheds_size(); i++) {
- tech_profile::TrafficScheduler traffic_sched = traffic_scheds->traffic_scheds(i);
+ ::tech_profile::TrafficScheduler traffic_sched = traffic_scheds->traffic_scheds(i);
direction = GetDirection(traffic_sched.direction());
if (direction.compare("direction-not-supported") == 0)
@@ -2483,12 +2647,20 @@
return err;
}
+ if (direction.compare(upstream) == 0) {
+ Status st = install_gem_port(access_intf_id, onu_id, gemport_id);
+ if (st.error_code() != grpc::StatusCode::ALREADY_EXISTS && st.error_code() != grpc::StatusCode::OK) {
+ OPENOLT_LOG(ERROR, openolt_log_id, "failed to created gemport=%d, access_intf=%d, onu_id=%d\n", gemport_id, access_intf_id, onu_id);
+ return BCM_ERR_INTERNAL;
+ }
+ }
+
OPENOLT_LOG(INFO, openolt_log_id, "Created tm_queue, direction %s, id %d, sched_id %d, tm_q_set_id %d, \
intf_id %d, onu_id %d, uni_id %d, tech_profiled_id %d\n", direction.c_str(), key.id, key.sched_id, key.tm_q_set_id, access_intf_id, onu_id, uni_id, tech_profile_id);
return BCM_ERR_OK;
}
-Status CreateTrafficQueues_(const tech_profile::TrafficQueues *traffic_queues) {
+Status CreateTrafficQueues_(const ::tech_profile::TrafficQueues *traffic_queues) {
uint32_t intf_id = traffic_queues->intf_id();
uint32_t onu_id = traffic_queues->onu_id();
uint32_t uni_id = traffic_queues->uni_id();
@@ -2502,7 +2674,7 @@
uint32_t queues_priority_q[traffic_queues->traffic_queues_size()] = {0};
std::string queues_pbit_map[traffic_queues->traffic_queues_size()];
for (int i = 0; i < traffic_queues->traffic_queues_size(); i++) {
- tech_profile::TrafficQueue traffic_queue = traffic_queues->traffic_queues(i);
+ ::tech_profile::TrafficQueue traffic_queue = traffic_queues->traffic_queues(i);
direction = GetDirection(traffic_queue.direction());
if (direction.compare("direction-not-supported") == 0)
@@ -2532,7 +2704,7 @@
}
for (int i = 0; i < traffic_queues->traffic_queues_size(); i++) {
- tech_profile::TrafficQueue traffic_queue = traffic_queues->traffic_queues(i);
+ ::tech_profile::TrafficQueue traffic_queue = traffic_queues->traffic_queues(i);
direction = GetDirection(traffic_queue.direction());
if (direction.compare("direction-not-supported") == 0)
@@ -2556,6 +2728,11 @@
bcmos_errno err;
if (direction == downstream) {
+ // Gem ports are HSI/VoIP/VoD traffic is bidirectional. Multicast gem-port are downstream only. Since the common direction between
+ // bidirectional traffic and multicast gem-port is "downstream" direction and we need to remove the gemport for a given gemport id only once,
+ // we remove the gem port when downstream direction queue is getting removed.
+ remove_gem_port(access_intf_id, gemport_id);
+
if (is_tm_sched_id_present(access_intf_id, onu_id, uni_id, direction, tech_profile_id)) {
key.sched_id = get_tm_sched_id(access_intf_id, onu_id, uni_id, direction, tech_profile_id);
key.id = queue_id_list[priority];
@@ -2596,7 +2773,7 @@
return BCM_ERR_OK;
}
-Status RemoveTrafficQueues_(const tech_profile::TrafficQueues *traffic_queues) {
+Status RemoveTrafficQueues_(const ::tech_profile::TrafficQueues *traffic_queues) {
uint32_t intf_id = traffic_queues->intf_id();
uint32_t onu_id = traffic_queues->onu_id();
uint32_t uni_id = traffic_queues->uni_id();
@@ -2608,7 +2785,7 @@
bcmolt_egress_qos_type qos_type = get_qos_type(intf_id, onu_id, uni_id, traffic_queues->traffic_queues_size());
for (int i = 0; i < traffic_queues->traffic_queues_size(); i++) {
- tech_profile::TrafficQueue traffic_queue = traffic_queues->traffic_queues(i);
+ ::tech_profile::TrafficQueue traffic_queue = traffic_queues->traffic_queues(i);
direction = GetDirection(traffic_queue.direction());
if (direction.compare("direction-not-supported") == 0)
@@ -2638,7 +2815,7 @@
return Status::OK;
}
-Status PerformGroupOperation_(const openolt::Group *group_cfg) {
+Status PerformGroupOperation_(const ::openolt::Group *group_cfg) {
bcmos_errno err;
bcmolt_group_key key = {};
@@ -2748,17 +2925,17 @@
}
/* SET GROUP MEMBERS UPDATE COMMAND */
- openolt::Group::GroupMembersCommand command = group_cfg->command();
+ ::openolt::Group::GroupMembersCommand command = group_cfg->command();
switch(command) {
- case openolt::Group::SET_MEMBERS :
+ case ::openolt::Group::SET_MEMBERS :
grp_mem_upd_cmd = BCMOLT_MEMBERS_UPDATE_COMMAND_SET;
OPENOLT_LOG(INFO, openolt_log_id, "Setting %d members for Group %d.\n", members.len, group_id);
break;
- case openolt::Group::ADD_MEMBERS :
+ case ::openolt::Group::ADD_MEMBERS :
grp_mem_upd_cmd = BCMOLT_MEMBERS_UPDATE_COMMAND_ADD;
OPENOLT_LOG(INFO, openolt_log_id, "Adding %d members to Group %d.\n", members.len, group_id);
break;
- case openolt::Group::REMOVE_MEMBERS :
+ case ::openolt::Group::REMOVE_MEMBERS :
grp_mem_upd_cmd = BCMOLT_MEMBERS_UPDATE_COMMAND_REMOVE;
OPENOLT_LOG(INFO, openolt_log_id, "Removing %d members from Group %d.\n", members.len, group_id);
break;
@@ -2771,26 +2948,26 @@
// SET MEMBERS LIST
for (int i = 0; i < members.len; i++) {
- if (command == openolt::Group::REMOVE_MEMBERS) {
+ if (command == ::openolt::Group::REMOVE_MEMBERS) {
OPENOLT_LOG(INFO, openolt_log_id, "Removing group member %d from group %d\n",i,key.id);
} else {
OPENOLT_LOG(INFO, openolt_log_id, "Adding group member %d to group %d\n",i,key.id);
}
- openolt::GroupMember *member = (openolt::GroupMember *) &group_cfg->members()[i];
+ ::openolt::GroupMember *member = (::openolt::GroupMember *) &group_cfg->members()[i];
// Set member interface type
- openolt::GroupMember::InterfaceType if_type = member->interface_type();
+ ::openolt::GroupMember::InterfaceType if_type = member->interface_type();
switch(if_type){
- case openolt::GroupMember::PON :
+ case ::openolt::GroupMember::PON :
BCMOLT_FIELD_SET(&interface_ref, intf_ref, intf_type, BCMOLT_INTERFACE_TYPE_PON);
OPENOLT_LOG(INFO, openolt_log_id, "Interface type PON is assigned to GroupMember %d\n",i);
break;
- case openolt::GroupMember::EPON_1G_PATH :
+ case ::openolt::GroupMember::EPON_1G_PATH :
BCMOLT_FIELD_SET(&interface_ref, intf_ref, intf_type, BCMOLT_INTERFACE_TYPE_EPON_1_G);
OPENOLT_LOG(INFO, openolt_log_id, "Interface type EPON_1G is assigned to GroupMember %d\n",i);
break;
- case openolt::GroupMember::EPON_10G_PATH :
+ case ::openolt::GroupMember::EPON_10G_PATH :
BCMOLT_FIELD_SET(&interface_ref, intf_ref, intf_type, BCMOLT_INTERFACE_TYPE_EPON_10_G);
OPENOLT_LOG(INFO, openolt_log_id, "Interface type EPON_10G is assigned to GroupMember %d\n",i);
break;
@@ -2907,7 +3084,7 @@
return Status::OK;
}
-Status GetLogicalOnuDistanceZero_(uint32_t intf_id, openolt::OnuLogicalDistance* response) {
+Status GetLogicalOnuDistanceZero_(uint32_t intf_id, ::openolt::OnuLogicalDistance* response) {
bcmos_errno err = BCM_ERR_OK;
uint32_t mld = 0;
double LD0;
@@ -2925,7 +3102,7 @@
return Status::OK;
}
-Status GetLogicalOnuDistance_(uint32_t intf_id, uint32_t onu_id, openolt::OnuLogicalDistance* response) {
+Status GetLogicalOnuDistance_(uint32_t intf_id, uint32_t onu_id, ::openolt::OnuLogicalDistance* response) {
bcmos_errno err = BCM_ERR_OK;
bcmolt_itu_onu_params itu = {};
bcmolt_onu_cfg onu_cfg;
diff --git a/agent/src/core_data.cc b/agent/src/core_data.cc
index 4f2be6d..1836263 100644
--- a/agent/src/core_data.cc
+++ b/agent/src/core_data.cc
@@ -146,9 +146,10 @@
(a2.ether_type + 2*a2.ip_proto + 3*a2.src_port + 4*a2.dst_port));
}
-typedef std::tuple<uint16_t, std::string> flow_id_flow_direction;
-typedef std::tuple<int16_t, uint16_t, int32_t> acl_id_gem_id_intf_id;
-std::map<flow_id_flow_direction, acl_id_gem_id_intf_id> flow_to_acl_map;
+typedef std::tuple<uint64_t, std::string> flow_id_flow_direction;
+
+typedef std::tuple<int16_t, int32_t> acl_id_intf_id;
+std::map<flow_id_flow_direction, acl_id_intf_id> flow_to_acl_map;
// Keeps a reference count of how many flows are referencing a given ACL ID.
// Key represents the ACL-ID and value is number of flows referencing the given ACL-ID.
@@ -156,12 +157,6 @@
// When there are no flows referencing the ACL-ID, the ACL should be removed.
std::map<uint16_t, uint16_t> acl_ref_cnt;
-typedef std::tuple<uint16_t, uint16_t> gem_id_intf_id; // key to gem_ref_cnt
-// Keeps a reference count of how many ACL related flows are referencing a given (gem-id, pon_intf_id).
-// When there is at least on flow, we should install the gem. When there are no flows
-// the gem should be removed.
-std::map<gem_id_intf_id, uint16_t> gem_ref_cnt;
-
// Needed to keep track of how many flows for a given acl_id, intf_id and intf_type are
// installed. When there is at least on flow for this key, we should have interface registered
// for the given ACL-ID. When there are no flows, the intf should unregister itself from
@@ -170,13 +165,29 @@
std::map<acl_id_intf_id_intf_type, uint16_t> intf_acl_registration_ref_cnt;
std::bitset<MAX_ACL_ID> acl_id_bitset;
+bcmos_fastlock acl_id_bitset_lock;
/*** ACL Handling related data end ***/
+// Used to manage a pool of Scheduler IDs
std::bitset<MAX_TM_SCHED_ID> tm_sched_bitset;
-std::bitset<MAX_TM_QMP_ID> tm_qmp_bitset;
+bcmos_fastlock tm_sched_bitset_lock;
-// Lock used to gaurd critical section during various API handling at the core_api_handler
+// Used to manage a pool of TM_QMP IDs
+std::bitset<MAX_TM_QMP_ID> tm_qmp_bitset;
+bcmos_fastlock tm_qmp_bitset_lock;
+
+// Used to manage a pool of Flow IDs
+std::bitset<MAX_FLOW_ID> flow_id_bitset;
+bcmos_fastlock flow_id_bitset_lock;
+
+// Maps voltha flow-id to device flow
+std::map<uint64_t, device_flow> voltha_flow_to_device_flow;
+bcmos_fastlock voltha_flow_to_device_flow_lock;
+
+
+// General purpose lock used to gaurd critical section during various API handling at the core_api_handler
bcmos_fastlock data_lock;
+
char* grpc_server_interface_name = NULL;
diff --git a/agent/src/core_data.h b/agent/src/core_data.h
index d0761c5..322c834 100644
--- a/agent/src/core_data.h
+++ b/agent/src/core_data.h
@@ -20,6 +20,7 @@
#include "core.h"
#include "Queue.h"
+#include "device.h"
extern "C"
{
@@ -43,8 +44,6 @@
#define ONU_DEACTIVATE_COMPLETE_WAIT_TIMEOUT 5000 // in milli-seconds
-#define MIN_ALLOC_ID_GPON 256
-#define MIN_ALLOC_ID_XGSPON 1024
#define MAX_ACL_ID 33
@@ -132,6 +131,23 @@
// above elements.
} acl_classifier_key;
+typedef struct device_flow_params {
+ uint8_t pbit; // If pbit classifier is not present in flow, use 0xff to invalidate
+ uint32_t gemport_id;
+ uint16_t flow_id;
+} device_flow_params;
+
+typedef struct device_flow {
+ bool is_flow_replicated; // If true number of replicated flows is to the NUMBER_OF_PBITS, else 1
+ device_flow_params params[NUMBER_OF_REPLICATED_FLOWS]; // A voltha-flow cannot be replicated more than the number of pbits
+ uint64_t voltha_flow_id; // This is corresponding voltha flow-id.
+ uint64_t symmetric_voltha_flow_id; // Is not applicable for trap-to-controller voltha flows.
+ // 0 value means invalid or not-applicable
+ // Applicable for bi-directional data path flows (one flow per direction)
+ // Symmetric flows should share the same device_flow_id.
+
+} device_flow;
+
// *******************************************************//
// Extern Variable/Constant declarations used by the core //
// *******************************************************//
@@ -215,9 +231,9 @@
extern std::map<acl_classifier_key, uint16_t> acl_classifier_to_acl_id_map;
extern bool operator<(const acl_classifier_key& a1, const acl_classifier_key& a2);
-typedef std::tuple<uint16_t, std::string> flow_id_flow_direction;
-typedef std::tuple<int16_t, uint16_t, int32_t> acl_id_gem_id_intf_id;
-extern std::map<flow_id_flow_direction, acl_id_gem_id_intf_id> flow_to_acl_map;
+typedef std::tuple<uint64_t, std::string> flow_id_flow_direction;
+typedef std::tuple<int16_t, int32_t> acl_id_intf_id;
+extern std::map<flow_id_flow_direction, acl_id_intf_id> flow_to_acl_map;
// Keeps a reference count of how many flows are referencing a given ACL ID.
// Key represents the ACL-ID and value is number of flows referencing the given ACL-ID.
@@ -225,12 +241,6 @@
// When there are no flows referencing the ACL-ID, the ACL should be removed.
extern std::map<uint16_t, uint16_t> acl_ref_cnt;
-typedef std::tuple<uint16_t, uint16_t> gem_id_intf_id; // key to gem_ref_cnt
-// Keeps a reference count of how many ACL related flows are referencing a given (gem-id, pon_intf_id).
-// When there is at least on flow, we should install the gem. When there are no flows
-// the gem should be removed.
-extern std::map<gem_id_intf_id, uint16_t> gem_ref_cnt;
-
// Needed to keep track of how many flows for a given acl_id, intf_id and intf_type are
// installed. When there is at least on flow for this key, we should have interface registered
// for the given ACL-ID. When there are no flows, the intf should unregister itself from
@@ -239,14 +249,26 @@
extern std::map<acl_id_intf_id_intf_type, uint16_t> intf_acl_registration_ref_cnt;
extern std::bitset<MAX_ACL_ID> acl_id_bitset;
+extern bcmos_fastlock acl_id_bitset_lock;
/*** ACL Handling related data end ***/
extern std::bitset<MAX_TM_SCHED_ID> tm_sched_bitset;
+extern bcmos_fastlock tm_sched_bitset_lock;
+
extern std::bitset<MAX_TM_QMP_ID> tm_qmp_bitset;
+extern bcmos_fastlock tm_qmp_bitset_lock;
+
+extern std::bitset<MAX_FLOW_ID> flow_id_bitset;
+extern bcmos_fastlock flow_id_bitset_lock;
+
+extern std::map<uint64_t, device_flow> voltha_flow_to_device_flow;
+extern bcmos_fastlock voltha_flow_to_device_flow_lock;
extern Queue<openolt::Indication> oltIndQ;
+/*** ACL Handling related data end ***/
+
extern bcmos_fastlock data_lock;
// Interface name on which grpc server is running on
diff --git a/agent/src/core_utils.cc b/agent/src/core_utils.cc
index 98ef6a2..6e1b17c 100644
--- a/agent/src/core_utils.cc
+++ b/agent/src/core_utils.cc
@@ -99,15 +99,17 @@
sched_map_key_tuple key(pon_intf_id, onu_id, uni_id, direction, tech_profile_id);
int sched_id = -1;
+ bcmos_fastlock_lock(&tm_sched_bitset_lock);
+
std::map<sched_map_key_tuple, int>::const_iterator it = sched_map.find(key);
if (it != sched_map.end()) {
sched_id = it->second;
}
if (sched_id != -1) {
+ bcmos_fastlock_unlock(&tm_sched_bitset_lock, 0);
return sched_id;
}
- bcmos_fastlock_lock(&data_lock);
// Complexity of O(n). Is there better way that can avoid linear search?
for (sched_id = 0; sched_id < MAX_TM_SCHED_ID; sched_id++) {
if (tm_sched_bitset[sched_id] == 0) {
@@ -115,14 +117,13 @@
break;
}
}
- bcmos_fastlock_unlock(&data_lock, 0);
if (sched_id < MAX_TM_SCHED_ID) {
- bcmos_fastlock_lock(&data_lock);
sched_map[key] = sched_id;
- bcmos_fastlock_unlock(&data_lock, 0);
+ bcmos_fastlock_unlock(&tm_sched_bitset_lock, 0);
return sched_id;
} else {
+ bcmos_fastlock_unlock(&tm_sched_bitset_lock, 0);
return -1;
}
}
@@ -140,13 +141,13 @@
void free_tm_sched_id(int pon_intf_id, int onu_id, int uni_id, std::string direction, int tech_profile_id) {
sched_map_key_tuple key(pon_intf_id, onu_id, uni_id, direction, tech_profile_id);
std::map<sched_map_key_tuple, int>::const_iterator it;
- bcmos_fastlock_lock(&data_lock);
+ bcmos_fastlock_lock(&tm_sched_bitset_lock);
it = sched_map.find(key);
if (it != sched_map.end()) {
tm_sched_bitset[it->second] = 0;
sched_map.erase(it);
}
- bcmos_fastlock_unlock(&data_lock, 0);
+ bcmos_fastlock_unlock(&tm_sched_bitset_lock, 0);
}
bool is_tm_sched_id_present(int pon_intf_id, int onu_id, int uni_id, std::string direction, int tech_profile_id) {
@@ -248,10 +249,10 @@
*/
void update_sched_qmp_id_map(uint32_t sched_id,uint32_t pon_intf_id, uint32_t onu_id, \
uint32_t uni_id, int tm_qmp_id) {
- bcmos_fastlock_lock(&data_lock);
+ bcmos_fastlock_lock(&tm_qmp_bitset_lock);
sched_qmp_id_map_key_tuple key(sched_id, pon_intf_id, onu_id, uni_id);
sched_qmp_id_map.insert(make_pair(key, tm_qmp_id));
- bcmos_fastlock_unlock(&data_lock, 0);
+ bcmos_fastlock_unlock(&tm_qmp_bitset_lock, 0);
}
/**
@@ -292,7 +293,7 @@
std::vector<uint32_t> tmq_map_profile) {
int tm_qmp_id;
- bcmos_fastlock_lock(&data_lock);
+ bcmos_fastlock_lock(&tm_qmp_bitset_lock);
/* Complexity of O(n). Is there better way that can avoid linear search? */
for (tm_qmp_id = 0; tm_qmp_id < MAX_TM_QMP_ID; tm_qmp_id++) {
if (tm_qmp_bitset[tm_qmp_id] == 0) {
@@ -300,15 +301,14 @@
break;
}
}
- bcmos_fastlock_unlock(&data_lock, 0);
if (tm_qmp_id < MAX_TM_QMP_ID) {
- bcmos_fastlock_lock(&data_lock);
qmp_id_to_qmp_map.insert(make_pair(tm_qmp_id, tmq_map_profile));
- bcmos_fastlock_unlock(&data_lock, 0);
+ bcmos_fastlock_unlock(&tm_qmp_bitset_lock, 0);
update_sched_qmp_id_map(sched_id, pon_intf_id, onu_id, uni_id, tm_qmp_id);
return tm_qmp_id;
} else {
+ bcmos_fastlock_unlock(&tm_qmp_bitset_lock, 0);
return -1;
}
}
@@ -329,11 +329,10 @@
bool result;
sched_qmp_id_map_key_tuple key(sched_id, pon_intf_id, onu_id, uni_id);
std::map<sched_qmp_id_map_key_tuple, int>::const_iterator it = sched_qmp_id_map.find(key);
- bcmos_fastlock_lock(&data_lock);
+ bcmos_fastlock_lock(&tm_qmp_bitset_lock);
if (it != sched_qmp_id_map.end()) {
sched_qmp_id_map.erase(it);
}
- bcmos_fastlock_unlock(&data_lock, 0);
uint32_t tm_qmp_ref_count = 0;
std::map<sched_qmp_id_map_key_tuple, int>::const_iterator it2 = sched_qmp_id_map.begin();
@@ -347,10 +346,8 @@
if (tm_qmp_ref_count == 0) {
std::map<int, std::vector < uint32_t > >::const_iterator it3 = qmp_id_to_qmp_map.find(tm_qmp_id);
if (it3 != qmp_id_to_qmp_map.end()) {
- bcmos_fastlock_lock(&data_lock);
tm_qmp_bitset[tm_qmp_id] = 0;
qmp_id_to_qmp_map.erase(it3);
- bcmos_fastlock_unlock(&data_lock, 0);
OPENOLT_LOG(INFO, openolt_log_id, "Reference count for tm qmp profile id %d is : %d. So clearing it\n", \
tm_qmp_id, tm_qmp_ref_count);
result = true;
@@ -360,6 +357,8 @@
tm_qmp_id, tm_qmp_ref_count);
result = false;
}
+ bcmos_fastlock_unlock(&tm_qmp_bitset_lock, 0);
+
return result;
}
@@ -368,6 +367,7 @@
int get_acl_id() {
int acl_id;
+ bcmos_fastlock_lock(&acl_id_bitset_lock);
/* Complexity of O(n). Is there better way that can avoid linear search? */
for (acl_id = 0; acl_id < MAX_ACL_ID; acl_id++) {
if (acl_id_bitset[acl_id] == 0) {
@@ -375,6 +375,8 @@
break;
}
}
+ bcmos_fastlock_unlock(&acl_id_bitset_lock, 0);
+
if (acl_id < MAX_ACL_ID) {
return acl_id ;
} else {
@@ -385,9 +387,90 @@
/* ACL ID is a shared resource, caller of this function has to ensure atomicity using locks
Frees up the ACL ID. */
void free_acl_id (int acl_id) {
+ bcmos_fastlock_lock(&acl_id_bitset_lock);
if (acl_id < MAX_ACL_ID) {
acl_id_bitset[acl_id] = 0;
}
+ bcmos_fastlock_unlock(&acl_id_bitset_lock, 0);
+}
+
+/* Gets a free Flow ID if available, else INVALID_FLOW_ID */
+uint16_t get_flow_id() {
+ uint16_t flow_id;
+
+ bcmos_fastlock_lock(&flow_id_bitset_lock);
+ /* Complexity of O(n). Is there better way that can avoid linear search? */
+ // start flow_id from 1 as 0 is invalid
+ for (flow_id = FLOW_ID_START; flow_id <= FLOW_ID_END; flow_id++) {
+ if (flow_id_bitset[flow_id] == 0) {
+ flow_id_bitset[flow_id] = 1;
+ break;
+ }
+ }
+ bcmos_fastlock_unlock(&flow_id_bitset_lock, 0);
+
+ if (flow_id <= MAX_FLOW_ID) {
+ return flow_id ;
+ } else {
+ return INVALID_FLOW_ID;
+ }
+}
+
+/* Gets requested number of Flow IDs.
+ 'num_of_flow_ids' is number of flow_ids requested. This cannot be more than NUMBER_OF_PBITS
+ 'flow_ids' is pointer to array of size NUMBER_OF_PBITS
+ If the operation is successful, returns true else false
+ The operation is successful if we can allocate fully the number of flow_ids requested.
+ */
+bool get_flow_ids(int num_of_flow_ids, uint16_t *flow_ids) {
+ if (num_of_flow_ids > NUMBER_OF_PBITS) {
+ OPENOLT_LOG(ERROR, openolt_log_id, "requested number of flow_ids is more than 8\n");
+ return false;
+ }
+ int cnt = 0;
+
+ bcmos_fastlock_lock(&flow_id_bitset_lock);
+ /* Complexity of O(n). Is there better way that can avoid linear search? */
+ // start flow_id from 1 as 0 is invalid
+ for (uint16_t flow_id = FLOW_ID_START; flow_id <= FLOW_ID_END && cnt < num_of_flow_ids; flow_id++) {
+ if (flow_id_bitset[flow_id] == 0) {
+ flow_id_bitset[flow_id] = 1;
+ flow_ids[cnt] = flow_id;
+ cnt++;
+ }
+ }
+ bcmos_fastlock_unlock(&flow_id_bitset_lock, 0);
+ // If we could not allocate the requested number of flow_ids free the allocated flow_ids
+ // and return false
+ if (cnt != num_of_flow_ids) {
+ OPENOLT_LOG(ERROR, openolt_log_id, "could not allocated the rquested number of flows ids. requested=%d, allocated=%d", num_of_flow_ids, cnt);
+ if (cnt > 0) {
+ for(int i=0; i < cnt; i++) {
+ free_flow_id(flow_ids[i]);
+ }
+ }
+ return false;
+ }
+ return true;
+}
+
+/* Frees up the FLOW ID. */
+void free_flow_id (uint16_t flow_id) {
+ bcmos_fastlock_lock(&flow_id_bitset_lock);
+ if (flow_id <= MAX_FLOW_ID) {
+ flow_id_bitset[flow_id] = 0;
+ }
+ bcmos_fastlock_unlock(&flow_id_bitset_lock, 0);
+}
+
+void free_flow_ids(uint8_t num_flows, uint16_t *flow_ids) {
+ for (uint8_t i = 0; i < num_flows; i++) {
+ bcmos_fastlock_lock(&flow_id_bitset_lock);
+ if (flow_ids[i] <= MAX_FLOW_ID) {
+ flow_id_bitset[flow_ids[i]] = 0;
+ }
+ bcmos_fastlock_unlock(&flow_id_bitset_lock, 0);
+ }
}
/**
@@ -597,8 +680,8 @@
Status pushOltOperInd(uint32_t intf_id, const char *type, const char *state)
{
- openolt::Indication ind;
- openolt::IntfOperIndication* intf_oper_ind = new openolt::IntfOperIndication;
+ ::openolt::Indication ind;
+ ::openolt::IntfOperIndication* intf_oper_ind = new ::openolt::IntfOperIndication;
intf_oper_ind->set_type(type);
intf_oper_ind->set_intf_id(intf_id);
@@ -835,7 +918,7 @@
err = bcmolt_cfg_set(dev_id, &cfg.hdr);
if(err != BCM_ERR_OK) {
- OPENOLT_LOG(ERROR, openolt_log_id, "failed to install gem_port = %d\n", gemport_id);
+ OPENOLT_LOG(ERROR, openolt_log_id, "failed to install gem_port = %d err_text=%s\n", gemport_id, cfg.hdr.hdr.err_text);
return bcm_to_grpc_err(err, "Access_Control set ITU PON Gem port failed");
}
@@ -930,7 +1013,6 @@
bcmolt_classifier c_val = { };
// hardcode the action for now.
bcmolt_access_control_fwd_action_type action_type = BCMOLT_ACCESS_CONTROL_FWD_ACTION_TYPE_TRAP_TO_HOST;
-
int acl_id = get_acl_id();
if (acl_id < 0) {
OPENOLT_LOG(ERROR, openolt_log_id, "exhausted acl_id for eth_type = %d, ip_proto = %d, src_port = %d, dst_port = %d\n",
@@ -1051,9 +1133,9 @@
} else key->dst_port = -1;
}
-Status handle_acl_rule_install(int32_t onu_id, uint32_t flow_id,
+Status handle_acl_rule_install(int32_t onu_id, uint64_t flow_id,
const std::string flow_type, int32_t access_intf_id,
- int32_t network_intf_id, int32_t gemport_id,
+ int32_t network_intf_id,
const ::openolt::Classifier& classifier) {
int acl_id;
int32_t intf_id = flow_type.compare(upstream) == 0? access_intf_id: network_intf_id;
@@ -1064,19 +1146,19 @@
// few map keys we are going to use later.
flow_id_flow_direction fl_id_fl_dir(flow_id, flow_type);
- gem_id_intf_id gem_intf(gemport_id, access_intf_id);
+
acl_classifier_key acl_key;
formulate_acl_classifier_key(&acl_key, classifier);
const acl_classifier_key acl_key_const = {.ether_type=acl_key.ether_type, .ip_proto=acl_key.ip_proto,
.src_port=acl_key.src_port, .dst_port=acl_key.dst_port};
-
bcmos_fastlock_lock(&data_lock);
// Check if the acl is already installed
if (acl_classifier_to_acl_id_map.count(acl_key_const) > 0) {
// retreive the acl_id
acl_id = acl_classifier_to_acl_id_map[acl_key_const];
- acl_id_gem_id_intf_id ac_id_gm_id_if_id(acl_id, gemport_id, intf_id);
+
+
if (flow_to_acl_map.count(fl_id_fl_dir)) {
// coult happen if same trap flow is received again
OPENOLT_LOG(INFO, openolt_log_id, "flow and related acl already handled, nothing more to do\n");
@@ -1084,7 +1166,7 @@
return Status::OK;
}
- OPENOLT_LOG(INFO, openolt_log_id, "Acl for flow_id=%u with eth_type = %d, ip_proto = %d, src_port = %d, dst_port = %d already installed with acl id = %u\n",
+ OPENOLT_LOG(INFO, openolt_log_id, "Acl for flow_id=%lu with eth_type = %d, ip_proto = %d, src_port = %d, dst_port = %d already installed with acl id = %u\n",
flow_id, acl_key.ether_type, acl_key.ip_proto, acl_key.src_port, acl_key.dst_port, acl_id);
// The acl_ref_cnt is needed to know how many flows refer an ACL.
@@ -1101,7 +1183,7 @@
} else {
resp = install_acl(acl_key_const);
if (!resp.ok()) {
- OPENOLT_LOG(ERROR, openolt_log_id, "Acl for flow_id=%u with eth_type = %d, ip_proto = %d, src_port = %d, dst_port = %d failed\n",
+ OPENOLT_LOG(ERROR, openolt_log_id, "Acl for flow_id=%lu with eth_type = %d, ip_proto = %d, src_port = %d, dst_port = %d failed\n",
flow_id, acl_key_const.ether_type, acl_key_const.ip_proto, acl_key_const.src_port, acl_key_const.dst_port);
bcmos_fastlock_unlock(&data_lock, 0);
return resp;
@@ -1112,7 +1194,7 @@
// Initialize the acl reference count
acl_ref_cnt[acl_id] = 1;
- OPENOLT_LOG(INFO, openolt_log_id, "acl add success for flow_id=%u with acl_id=%d\n", flow_id, acl_id);
+ OPENOLT_LOG(INFO, openolt_log_id, "acl add success for flow_id=%lu with acl_id=%d\n", flow_id, acl_id);
}
// Register the interface for the given acl
@@ -1128,70 +1210,20 @@
OPENOLT_LOG(ERROR, openolt_log_id, "failed to update acl interfaces intf_id=%d, intf_type=%s, acl_id=%d", intf_id, intf_type.c_str(), acl_id);
// TODO: Ideally we should return error from hear and clean up other other stateful
// counters we creaed earlier. Will leave it out for now.
- }
+ }
intf_acl_registration_ref_cnt[ac_id_inf_id_inf_type] = 1;
}
-
- // Install the gem port if needed.
- if (gemport_id > 0 && access_intf_id >= 0) {
- if (gem_ref_cnt.count(gem_intf) > 0) {
- // The gem port is already installed
- // Increment the ref counter indicating number of flows referencing this gem port
- gem_ref_cnt[gem_intf]++;
- OPENOLT_LOG(DEBUG, openolt_log_id, "increment gem_ref_cnt in acl handler, ref_cnt=%d\n", gem_ref_cnt[gem_intf]);
-
- } else {
- // We should ideally never land here. The gem port should have been created the
- // first time ACL was installed.
- // Install the gem port
- Status resp = install_gem_port(access_intf_id, onu_id, gemport_id);
- if (!resp.ok()) {
- // TODO: We might need to reverse all previous data, but leave it out for now.
- OPENOLT_LOG(ERROR, openolt_log_id, "failed to install the gemport=%d for acl_id=%d, intf_id=%d\n", gemport_id, acl_id, access_intf_id);
- bcmos_fastlock_unlock(&data_lock, 0);
- return resp;
- }
- // Initialize the refence count for the gemport.
- gem_ref_cnt[gem_intf] = 1;
- OPENOLT_LOG(DEBUG, openolt_log_id, "intialized gem ref count in acl handler\n");
- }
- } else {
- OPENOLT_LOG(DEBUG, openolt_log_id, "not incrementing gem_ref_cnt in acl handler flow_id=%d, gemport_id=%d, intf_id=%d\n", flow_id, gemport_id, access_intf_id);
- }
-
- // Update the flow_to_acl_map
- // This info is needed during flow remove. We need to which ACL ID and GEM PORT ID
- // the flow was referring to.
- // After retrieving the ACL ID and GEM PORT ID, we decrement the corresponding
- // reference counters for those ACL ID and GEMPORT ID.
- acl_id_gem_id_intf_id ac_id_gm_id_if_id(acl_id, gemport_id, intf_id);
- flow_to_acl_map[fl_id_fl_dir] = ac_id_gm_id_if_id;
+ acl_id_intf_id ac_id_if_id(acl_id, intf_id);
+ flow_to_acl_map[fl_id_fl_dir] = ac_id_if_id;
bcmos_fastlock_unlock(&data_lock, 0);
return Status::OK;
}
-void clear_gem_port(int gemport_id, int access_intf_id) {
- gem_id_intf_id gem_intf(gemport_id, access_intf_id);
- if (gemport_id > 0 && access_intf_id >= 0 && gem_ref_cnt.count(gem_intf) > 0) {
- OPENOLT_LOG(DEBUG, openolt_log_id, "decrementing gem_ref_cnt gemport_id=%d access_intf_id=%d\n", gemport_id, access_intf_id);
- gem_ref_cnt[gem_intf]--;
- if (gem_ref_cnt[gem_intf] == 0) {
- // For datapath flow this may not be necessary (to be verified)
- remove_gem_port(access_intf_id, gemport_id);
- gem_ref_cnt.erase(gem_intf);
- OPENOLT_LOG(DEBUG, openolt_log_id, "removing gem_ref_cnt entry gemport_id=%d access_intf_id=%d\n", gemport_id, access_intf_id);
- } else {
- OPENOLT_LOG(DEBUG, openolt_log_id, "gem_ref_cnt not zero yet gemport_id=%d access_intf_id=%d\n", gemport_id, access_intf_id);
- }
- } else {
- OPENOLT_LOG(DEBUG, openolt_log_id, "not decrementing gem_ref_cnt gemport_id=%d access_intf_id=%d\n", gemport_id, access_intf_id);
- }
-}
-
-Status handle_acl_rule_cleanup(int16_t acl_id, int32_t gemport_id, int32_t intf_id, const std::string flow_type) {
+//Status handle_acl_rule_cleanup(int16_t acl_id, int32_t gemport_id, int32_t intf_id, const std::string flow_type) {
+Status handle_acl_rule_cleanup(int16_t acl_id, int32_t intf_id, const std::string flow_type) {
const std::string intf_type= flow_type.compare(upstream) == 0 ? "pon": "nni";
acl_id_intf_id_intf_type ac_id_inf_id_inf_type(acl_id, intf_id, intf_type);
intf_acl_registration_ref_cnt[ac_id_inf_id_inf_type]--;
@@ -1219,8 +1251,6 @@
}
}
- clear_gem_port(gemport_id, intf_id);
-
return Status::OK;
}
@@ -1369,3 +1399,52 @@
return mac_address;
}
+
+void update_voltha_flow_to_cache(uint64_t voltha_flow_id, device_flow dev_flow) {
+ OPENOLT_LOG(DEBUG, openolt_log_id, "updating voltha flow=%lu to cache\n", voltha_flow_id)
+ bcmos_fastlock_lock(&voltha_flow_to_device_flow_lock);
+ voltha_flow_to_device_flow[voltha_flow_id] = dev_flow;
+ bcmos_fastlock_unlock(&voltha_flow_to_device_flow_lock, 0);
+}
+
+void remove_voltha_flow_from_cache(uint64_t voltha_flow_id) {
+ bcmos_fastlock_lock(&voltha_flow_to_device_flow_lock);
+ std::map<uint64_t, device_flow>::const_iterator it = voltha_flow_to_device_flow.find(voltha_flow_id);
+ if (it != voltha_flow_to_device_flow.end()) {
+ voltha_flow_to_device_flow.erase(it);
+ }
+ bcmos_fastlock_unlock(&voltha_flow_to_device_flow_lock, 0);
+}
+
+bool is_voltha_flow_installed(uint64_t voltha_flow_id ) {
+ int count;
+ bcmos_fastlock_lock(&voltha_flow_to_device_flow_lock);
+ count = voltha_flow_to_device_flow.count(voltha_flow_id);
+ bcmos_fastlock_unlock(&voltha_flow_to_device_flow_lock, 0);
+
+ return count > 0 ? true : false;
+}
+
+const device_flow_params* get_device_flow_params(uint64_t voltha_flow_id) {
+ bcmos_fastlock_lock(&voltha_flow_to_device_flow_lock);
+ std::map<uint64_t, device_flow>::const_iterator it = voltha_flow_to_device_flow.find(voltha_flow_id);
+ if (it != voltha_flow_to_device_flow.end()) {
+ bcmos_fastlock_unlock(&voltha_flow_to_device_flow_lock, 0);
+ return it->second.params;
+ }
+ bcmos_fastlock_unlock(&voltha_flow_to_device_flow_lock, 0);
+
+ return NULL;
+}
+
+const device_flow* get_device_flow(uint64_t voltha_flow_id) {
+ bcmos_fastlock_lock(&voltha_flow_to_device_flow_lock);
+ std::map<uint64_t, device_flow>::const_iterator it = voltha_flow_to_device_flow.find(voltha_flow_id);
+ if (it != voltha_flow_to_device_flow.end()) {
+ bcmos_fastlock_unlock(&voltha_flow_to_device_flow_lock, 0);
+ return &it->second;
+ }
+ bcmos_fastlock_unlock(&voltha_flow_to_device_flow_lock, 0);
+
+ return NULL;
+}
diff --git a/agent/src/core_utils.h b/agent/src/core_utils.h
index 4a4d73f..e4eec7f 100644
--- a/agent/src/core_utils.h
+++ b/agent/src/core_utils.h
@@ -67,6 +67,10 @@
uint32_t uni_id, int tm_qmp_id);
int get_acl_id();
void free_acl_id (int acl_id);
+uint16_t get_flow_id();
+bool get_flow_ids(int num_of_flow_ids, uint16_t *flow_ids);
+void free_flow_id (uint16_t flow_id);
+void free_flow_ids(uint8_t num_flows, uint16_t *flow_ids);
std::string get_qos_type_as_string(bcmolt_egress_qos_type qos_type);
bcmolt_egress_qos_type get_qos_type(uint32_t pon_intf_id, uint32_t onu_id, uint32_t uni_id, uint32_t queue_size=0);
void clear_qos_type(uint32_t pon_intf_id, uint32_t onu_id, uint32_t uni_id);
@@ -94,15 +98,20 @@
Status install_acl(const acl_classifier_key acl_key);
Status remove_acl(int acl_id);
void formulate_acl_classifier_key(acl_classifier_key *key, const ::openolt::Classifier& classifier);
-Status handle_acl_rule_install(int32_t onu_id, uint32_t flow_id,
+Status handle_acl_rule_install(int32_t onu_id, uint64_t flow_id,
const std::string flow_type, int32_t access_intf_id,
- int32_t network_intf_id, int32_t gemport_id,
+ int32_t network_intf_id,
const ::openolt::Classifier& classifier);
void clear_gem_port(int gemport_id, int access_intf_id);
-Status handle_acl_rule_cleanup(int16_t acl_id, int32_t gemport_id, int32_t intf_id, const std::string flow_type);
+Status handle_acl_rule_cleanup(int16_t acl_id, int32_t intf_id, const std::string flow_type);
Status check_bal_ready();
Status check_connection();
std::string get_ip_address(const char* nw_intf);
bcmos_errno getOnuMaxLogicalDistance(uint32_t intf_id, uint32_t *mld);
char* get_intf_mac(const char* intf_name, char* mac_address, unsigned int max_size_of_mac_address);
+void update_voltha_flow_to_cache(uint64_t voltha_flow_id, device_flow dev_flow);
+void remove_voltha_flow_from_cache(uint64_t voltha_flow_id);
+bool is_voltha_flow_installed(uint64_t voltha_flow_id );
+const device_flow* get_device_flow(uint64_t voltha_flow_id);
+const device_flow_params* get_device_flow_params(uint64_t voltha_flow_id);
#endif // OPENOLT_CORE_UTILS_H_
diff --git a/agent/src/indications.cc b/agent/src/indications.cc
index 1939e60..03f81fe 100644
--- a/agent/src/indications.cc
+++ b/agent/src/indications.cc
@@ -1292,6 +1292,7 @@
if(rc != BCM_ERR_OK)
return Status(grpc::StatusCode::INTERNAL, "Packet indication subscribe failed");
+#ifndef SCALE_AND_PERF
rx_cfg.obj_type = BCMOLT_OBJ_ID_ITUPON_ALLOC;
rx_cfg.rx_cb = ItuPonAllocConfigCompletedInd;
rx_cfg.flags = BCMOLT_AUTO_FLAGS_NONE;
@@ -1299,6 +1300,7 @@
rc = bcmolt_ind_subscribe(current_device, &rx_cfg);
if(rc != BCM_ERR_OK)
return Status(grpc::StatusCode::INTERNAL, "ITU PON Alloc Configuration Complete Indication subscribe failed");
+#endif
rx_cfg.obj_type = BCMOLT_OBJ_ID_GROUP;
rx_cfg.rx_cb = GroupIndication;
diff --git a/agent/src/stats_collection.cc b/agent/src/stats_collection.cc
index 4979366..5b7063a 100644
--- a/agent/src/stats_collection.cc
+++ b/agent/src/stats_collection.cc
@@ -245,7 +245,7 @@
openolt::PortStatistics* port_stats =
collectPortStatistics(intf_ref);
- openolt::Indication ind;
+ ::openolt::Indication ind;
ind.set_allocated_port_stats(port_stats);
oltIndQ.push(ind);
}
@@ -258,7 +258,7 @@
openolt::PortStatistics* port_stats =
collectPortStatistics(intf_ref);
- openolt::Indication ind;
+ ::openolt::Indication ind;
ind.set_allocated_port_stats(port_stats);
oltIndQ.push(ind);
}