VOL-4077: Improve storage usage
- send onu, uni port number information in packet indications
  to VOLTHA
- adjust the resource id limits to be realistic enough for
  the voltha-2.8 requirements

Change-Id: Ic2d5dcdcf567d44ae98851db4e97cf1a662a37a3
diff --git a/agent/src/core_api_handler.cc b/agent/src/core_api_handler.cc
index 0e195d9..1579a4f 100644
--- a/agent/src/core_api_handler.cc
+++ b/agent/src/core_api_handler.cc
@@ -270,7 +270,7 @@
             pool->set_type(::openolt::DeviceInfo::DeviceResourceRanges::Pool::ALLOC_ID);
             pool->set_sharing(::openolt::DeviceInfo::DeviceResourceRanges::Pool::DEDICATED_PER_INTF);
             pool->set_start(ALLOC_ID_START);
-            pool->set_end(ALLOC_ID_START);
+            pool->set_end(ALLOC_ID_END);
 
             pool = range->add_pools();
             pool->set_type(::openolt::DeviceInfo::DeviceResourceRanges::Pool::GEMPORT_ID);
@@ -365,6 +365,9 @@
         bcmos_fastlock_init(&gem_cfg_wait_lock, 0);
         bcmos_fastlock_init(&onu_deactivate_wait_lock, 0);
         bcmos_fastlock_init(&acl_packet_trap_handler_lock, 0);
+        bcmos_fastlock_init(&symmetric_datapath_flow_id_lock, 0);
+        bcmos_fastlock_init(&pon_gem_to_onu_uni_map_lock, 0);
+
 
         OPENOLT_LOG(INFO, openolt_log_id, "Enable OLT - %s-%s\n", VENDOR_ID, MODEL_ID);
 
@@ -1523,12 +1526,13 @@
 
 Status FlowAddWrapper_(const ::openolt::Flow* request) {
 
+    Status st = Status::OK;
     int32_t access_intf_id = request->access_intf_id();
     int32_t onu_id = request->onu_id();
     int32_t uni_id = request->uni_id();
     uint32_t port_no = request->port_no();
     uint64_t voltha_flow_id = request->flow_id();
-    uint64_t symmetric_voltha_flow_id = request->symmetric_flow_id();
+    uint64_t symmetric_voltha_flow_id = 0;
     const std::string flow_type = request->flow_type();
     int32_t alloc_id = request->alloc_id();
     int32_t network_intf_id = request->network_intf_id();
@@ -1544,6 +1548,25 @@
     const google::protobuf::Map<unsigned int, bool> &gemport_to_aes = request->gemport_to_aes();
     uint16_t flow_id;
     bool enable_encryption;
+    // When following conditions are ALL met, it qualifies as datapath flow.
+    // 1. valid access_intf_id, onu_id, uni_id
+    // 2. Valid tech_profile_id
+    // 3. flow_type that is not MULTICAST
+    // 4. Not a trap-to-host flow.
+    bool datapathFlow = access_intf_id >= 0 && onu_id >= 0 && uni_id >= 0 && tech_profile_id > 0
+                        && flow_type != multicast && !action.cmd().trap_to_host();
+
+    if (datapathFlow) {
+        const std::string inverse_flow_type = flow_type.compare(upstream) == 0 ? downstream : upstream;
+        symmetric_datapath_flow_id_map_key key(access_intf_id, onu_id, uni_id, tech_profile_id, inverse_flow_type);
+        // Look up the voltha flow id cached for the inverse-direction (symmetric) flow, if any
+        bcmos_fastlock_lock(&symmetric_datapath_flow_id_lock);
+        auto it = symmetric_datapath_flow_id_map.find(key);
+        bcmos_fastlock_unlock(&symmetric_datapath_flow_id_lock, 0);
+        if (it != symmetric_datapath_flow_id_map.end()) {
+            symmetric_voltha_flow_id = it->second;
+        }
+    }
 
     // The intf_id variable defaults to access(PON) interface ID.
     // For trap-from-nni flow where access interface ID is not valid , change it to NNI interface ID
@@ -1576,25 +1599,27 @@
         }
 
         if (!replicate_flow) {  // No flow replication
-                flow_id = dev_fl_symm_params[0].flow_id;
-                gemport_id = dev_fl_symm_params[0].gemport_id; // overwrite the gemport with symmetric flow gemport
-                                                               // Should be same as what is coming in this request.
-                enable_encryption = get_aes_flag_for_gem_port(gemport_to_aes, gemport_id);
-                ::openolt::Classifier cl = ::openolt::Classifier(classifier);
-                cl.set_o_pbits(dev_fl_symm_params[0].pbit);
-                Status st = FlowAdd_(access_intf_id, onu_id, uni_id, port_no, flow_id,
-                                    flow_type, alloc_id, network_intf_id, gemport_id, cl,
-                                    action, priority, cookie, group_id, tech_profile_id, enable_encryption);
-                if (st.error_code() == grpc::StatusCode::OK) {
-                    device_flow dev_fl;
-                    dev_fl.is_flow_replicated = false;
-                    dev_fl.symmetric_voltha_flow_id = symmetric_voltha_flow_id;
-                    dev_fl.voltha_flow_id = voltha_flow_id;
-                    memcpy(dev_fl.params, dev_fl_symm_params, sizeof(device_flow_params));
-                    // update voltha flow to cache
-                    update_voltha_flow_to_cache(voltha_flow_id, dev_fl);
-                }
+            flow_id = dev_fl_symm_params[0].flow_id;
+            gemport_id = dev_fl_symm_params[0].gemport_id; // overwrite the gemport with symmetric flow gemport
+                                                           // Should be same as what is coming in this request.
+            enable_encryption = get_aes_flag_for_gem_port(gemport_to_aes, gemport_id);
+            ::openolt::Classifier cl = ::openolt::Classifier(classifier);
+            cl.set_o_pbits(dev_fl_symm_params[0].pbit);
+            st = FlowAdd_(access_intf_id, onu_id, uni_id, port_no, flow_id,
+                                flow_type, alloc_id, network_intf_id, gemport_id, cl,
+                                action, priority, cookie, group_id, tech_profile_id, enable_encryption);
+            if (st.error_code() != grpc::StatusCode::OK && st.error_code() != grpc::StatusCode::ALREADY_EXISTS) {
+                OPENOLT_LOG(ERROR, openolt_log_id, "failed to install device flow=%u for voltha flow=%lu", flow_id, voltha_flow_id);
                 return st;
+            }
+
+            device_flow dev_fl;
+            dev_fl.is_flow_replicated = false;
+            dev_fl.symmetric_voltha_flow_id = symmetric_voltha_flow_id;
+            dev_fl.voltha_flow_id = voltha_flow_id;
+            memcpy(dev_fl.params, dev_fl_symm_params, sizeof(device_flow_params));
+            // update voltha flow to cache
+            update_voltha_flow_to_cache(voltha_flow_id, dev_fl);
         } else { // Flow to be replicated
             OPENOLT_LOG(INFO, openolt_log_id,"symmetric flow and replication is needed\n");
             for (uint8_t i=0; i<NUMBER_OF_REPLICATED_FLOWS; i++) {
@@ -1603,7 +1628,7 @@
                 gemport_id = dev_fl_symm_params[i].gemport_id;
                 enable_encryption = get_aes_flag_for_gem_port(gemport_to_aes, gemport_id);
                 cl.set_o_pbits(dev_fl_symm_params[i].pbit);
-                Status st = FlowAdd_(access_intf_id, onu_id, uni_id, port_no, flow_id,
+                st = FlowAdd_(access_intf_id, onu_id, uni_id, port_no, flow_id,
                                     flow_type, alloc_id, network_intf_id, gemport_id, cl,
                                     action, priority, cookie, group_id, tech_profile_id, enable_encryption);
                 if (st.error_code() != grpc::StatusCode::OK && st.error_code() != grpc::StatusCode::ALREADY_EXISTS) {
@@ -1635,7 +1660,7 @@
                 return ::Status(grpc::StatusCode::RESOURCE_EXHAUSTED, "flow-ids-exhausted");
             }
             enable_encryption = get_aes_flag_for_gem_port(gemport_to_aes, gemport_id);
-            Status st = FlowAdd_(access_intf_id, onu_id, uni_id, port_no, flow_id,
+            st = FlowAdd_(access_intf_id, onu_id, uni_id, port_no, flow_id,
                                 flow_type, alloc_id, network_intf_id, gemport_id, classifier,
                                 action, priority, cookie, group_id, tech_profile_id, enable_encryption);
             if (st.error_code() == grpc::StatusCode::OK) {
@@ -1651,8 +1676,8 @@
             } else {
                 // Free the flow id on failure
                 free_flow_id(flow_id);
+                return st;
             }
-            return st;
         } else { // Flow to be replicated
             OPENOLT_LOG(INFO, openolt_log_id,"not a symmetric flow and replication is needed\n");
             if (pbit_to_gemport.size() != NUMBER_OF_PBITS) {
@@ -1676,7 +1701,7 @@
                     gemport_id = dev_fl.params[cnt].gemport_id;
                     enable_encryption = get_aes_flag_for_gem_port(gemport_to_aes, gemport_id);
                     cl.set_o_pbits(dev_fl.params[cnt].pbit);
-                    Status st = FlowAdd_(access_intf_id, onu_id, uni_id, port_no, flow_id,
+                    st = FlowAdd_(access_intf_id, onu_id, uni_id, port_no, flow_id,
                                         flow_type, alloc_id, network_intf_id, gemport_id, cl,
                                         action, priority, cookie, group_id, tech_profile_id, enable_encryption);
                     if (st.error_code() != grpc::StatusCode::OK) {
@@ -1703,7 +1728,15 @@
         }
     }
 
-    return Status::OK;
+    if (datapathFlow) {
+        // Record this flow's voltha flow id, keyed by its own direction, in the symmetric datapath flow id map
+        symmetric_datapath_flow_id_map_key key(access_intf_id, onu_id, uni_id, tech_profile_id, flow_type);
+        bcmos_fastlock_lock(&symmetric_datapath_flow_id_lock);
+        symmetric_datapath_flow_id_map[key] = voltha_flow_id;
+        bcmos_fastlock_unlock(&symmetric_datapath_flow_id_lock, 0);
+    }
+
+    return st;
 }
 
 
@@ -2084,6 +2117,10 @@
 }
 
 Status FlowRemoveWrapper_(const ::openolt::Flow* request) {
+    int32_t access_intf_id = request->access_intf_id();
+    int32_t onu_id = request->onu_id();
+    int32_t uni_id = request->uni_id();
+    uint32_t tech_profile_id = request->tech_profile_id();
     const std::string flow_type = request->flow_type();
     uint64_t voltha_flow_id = request->flow_id();
     Status st;
@@ -2116,6 +2153,13 @@
     }
     // remove the flow from cache on voltha flow removal
     remove_voltha_flow_from_cache(voltha_flow_id);
+
+    symmetric_datapath_flow_id_map_key key(access_intf_id, onu_id, uni_id, tech_profile_id, flow_type);
+    // Remove this flow's entry from the symmetric datapath flow id map
+    bcmos_fastlock_lock(&symmetric_datapath_flow_id_lock);
+    symmetric_datapath_flow_id_map.erase(key);
+    bcmos_fastlock_unlock(&symmetric_datapath_flow_id_lock, 0);
+
     return Status::OK;
 }
 
@@ -2769,6 +2813,14 @@
             OPENOLT_LOG(ERROR, openolt_log_id, "failed to created gemport=%d, access_intf=%d, onu_id=%d\n", gemport_id, access_intf_id, onu_id);
             return BCM_ERR_INTERNAL;
         }
+        if (direction == upstream) {
+            // Create the pon-gem to onu-uni mapping
+            pon_gem pg(access_intf_id, gemport_id);
+            onu_uni ou(onu_id, uni_id);
+            bcmos_fastlock_lock(&pon_gem_to_onu_uni_map_lock);
+            pon_gem_to_onu_uni_map[pg] = ou;
+            bcmos_fastlock_unlock(&pon_gem_to_onu_uni_map_lock, 0);
+        }
     }
 
     OPENOLT_LOG(INFO, openolt_log_id, "Created tm_queue, direction %s, id %d, sched_id %d, tm_q_set_id %d, \
@@ -2852,6 +2904,13 @@
             OPENOLT_LOG(ERROR, openolt_log_id, "failed to remove gemport=%d, access_intf=%d, onu_id=%d\n", gemport_id, access_intf_id, onu_id);
             return BCM_ERR_INTERNAL;
         }
+        if (direction == upstream) {
+            // Remove the pon-gem to onu-uni mapping
+            pon_gem pg(access_intf_id, gemport_id);
+            bcmos_fastlock_lock(&pon_gem_to_onu_uni_map_lock);
+            pon_gem_to_onu_uni_map.erase(pg);
+            bcmos_fastlock_unlock(&pon_gem_to_onu_uni_map_lock, 0);
+        }
     }
 
     if (direction == downstream) {