VOL-1447: Changes in OpenOLT driver for creating the Traffic Schedulers and
Queues as per the TechProfile configuration

Change-Id: I3a51ce53c8f9bd369b89b5f1f55f74f73893d65e
diff --git a/agent/src/core.cc b/agent/src/core.cc
index c9781ff..2912932 100644
--- a/agent/src/core.cc
+++ b/agent/src/core.cc
@@ -24,6 +24,7 @@
 #include <sstream>
 #include <chrono>
 #include <thread>
+#include <bitset>
 
 #include "device.h"
 #include "core.h"
@@ -55,6 +56,9 @@
 
 #define MAX_SUPPORTED_INTF 16
 #define BAL_RSC_MANAGER_BASE_TM_SCHED_ID 16384
+#define MAX_TM_QUEUE_ID 8192
+#define MAX_TM_SCHED_ID 16384
+#define EAP_ETHER_TYPE 34958
 
 static unsigned int num_of_nni_ports = 0;
 static unsigned int num_of_pon_ports = 0;
@@ -67,7 +71,10 @@
 
 const uint32_t tm_upstream_sched_id_start = 18432;
 const uint32_t tm_downstream_sched_id_start = 16384;
-const uint32_t tm_queue_id_start = 4; //0 to 3 are default queues. Lets not use them.
+// Queue IDs 0 to 3 are default queues; let's not use them.
+const uint32_t tm_queue_id_start = 4;
+// Up to 8 fixed upstream queues. Queue IDs 0 to 3 are pre-created; let's not use them.
+const uint32_t us_fixed_queue_id_list[8] = {4, 5, 6, 7, 8, 9, 10, 11};
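+// In the upstream, CreateQueue() below maps the traffic-class priority p (0..7) to the
+// fixed queue ID us_fixed_queue_id_list[p] on the default NNI scheduler; these eight
+// queues are shared by all subscribers (see the FIXME in CreateQueue about creating
+// them only once).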
 const std::string upstream = "upstream";
 const std::string downstream = "downstream";
 
@@ -76,18 +83,51 @@
 static std::map<uint32_t, uint32_t> flowid_to_port; // For mapping upstream flows to logical ports
 static std::map<uint32_t, uint32_t> flowid_to_gemport; // For mapping downstream flows into gemports
 static std::map<uint32_t, std::set<uint32_t> > port_to_flows; // For mapping logical ports to downstream flows
-static std::map<uint32_t, uint32_t> port_to_alloc;
-static bcmos_fastlock flow_lock;
+
+// Key to the 'queue_map' map.
+// Represents (pon_intf_id, onu_id, uni_id, gemport_id, direction)
+typedef std::tuple<uint32_t, uint32_t, uint32_t, uint32_t, std::string> queue_map_key_tuple;
+// 'queue_map' maps a queue_map_key_tuple to the TM queue ID allocated for it
+// (a downstream queue on the subscriber scheduler, or one of the fixed upstream
+// queues on the NNI scheduler).
+static std::map<queue_map_key_tuple, int> queue_map;
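+// e.g. (illustrative values) the downstream queue for GEM port 1024 of ONU 1, UNI 0
+// on PON 0 is looked up with queue_map_key_tuple(0, 1, 0, 1024, downstream).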
+// Key to the 'sched_map' map.
+// Represents (pon_intf_id, onu_id, uni_id, direction)
+typedef std::tuple<uint32_t, uint32_t, uint32_t, std::string> sched_map_key_tuple;
+// 'sched_map' maps a sched_map_key_tuple to the DBA (upstream) or
+// subscriber (downstream) scheduler ID
+static std::map<sched_map_key_tuple, int> sched_map;
+
+
+std::bitset<MAX_TM_QUEUE_ID> tm_queue_bitset;
+std::bitset<MAX_TM_SCHED_ID> tm_sched_bitset;
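+// The two bitsets act as simple free-ID pools: bit i set means ID i is in use.
+// get_tm_queue_id()/get_tm_sched_id() below scan for the first clear bit under
+// data_lock and mark it used; free_tm_queue_id()/free_tm_sched_id() clear it again.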
+
+static bcmos_fastlock data_lock;
 
 #define MIN_ALLOC_ID_GPON 256
 #define MIN_ALLOC_ID_XGSPON 1024
 
-Status SchedAdd_(std::string direction, uint32_t access_intf_id, uint32_t onu_id, uint32_t uni_id, uint32_t port_no, 
-                 uint32_t alloc_id, openolt::AdditionalBW additional_bw, uint32_t weight, uint32_t priority,
-                 openolt::SchedulingPolicy sched_policy, openolt::TrafficShapingInfo traffic_shaping_info);
-static Status SchedRemove_(std::string direction, int intf_id, int onu_id, int uni_id, uint32_t port_no, int alloc_id);
+static bcmos_errno CreateSched(std::string direction, uint32_t access_intf_id, uint32_t onu_id, uint32_t uni_id, \
+                          uint32_t port_no, uint32_t alloc_id, tech_profile::AdditionalBW additional_bw, uint32_t weight, \
+                          uint32_t priority, tech_profile::SchedulingPolicy sched_policy,
+                          tech_profile::TrafficShapingInfo traffic_shaping_info);
+static bcmos_errno RemoveSched(int intf_id, int onu_id, int uni_id, std::string direction);
+static bcmos_errno CreateQueue(std::string direction, uint32_t access_intf_id, uint32_t onu_id, uint32_t uni_id, \
+                               uint32_t priority, uint32_t gemport_id);
+static bcmos_errno RemoveQueue(std::string direction, uint32_t access_intf_id, uint32_t onu_id, uint32_t uni_id, \
+                               uint32_t priority, uint32_t gemport_id);
 
-static inline int mk_sched_id(int intf_id, int onu_id, std::string direction) {
+/**
+* Returns the default NNI (upstream direction) or PON (downstream direction) scheduler ID.
+* Every NNI port and PON port has a default scheduler.
+* The NNI0 default scheduler ID is 18432, NNI1's is 18433, and so on.
+* Similarly, the PON0 default scheduler ID is 16384, PON1's is 16385, and so on.
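+* For example, get_default_tm_sched_id(1, upstream) returns 18433 and
+* get_default_tm_sched_id(0, downstream) returns 16384.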
+*
+* @param intf_id NNI or PON interface ID
+* @param direction "upstream" or "downstream"
+*
+* @return default scheduler ID for the given interface.
+*/
+static inline int get_default_tm_sched_id(int intf_id, std::string direction) {
     if (direction.compare(upstream) == 0) {
         return tm_upstream_sched_id_start + intf_id;
     } else if (direction.compare(downstream) == 0) {
@@ -99,17 +139,164 @@
     }
 }
 
-static inline int mk_queue_id(int pon_intf_id, int onu_id, int uni_id, uint32_t port_no, uint32_t alloc_id) {
-    (void) uni_id; // unused
-    if(port_no == 0) return tm_queue_id_start + pon_intf_id * 32 + onu_id; // old style
+/**
+* Gets a unique tm_queue_id for a given intf_id, onu_id, uni_id, gemport_id, direction
+* The tm_queue_id is locally cached in a map, so that it can be retrieved when necessary.
+* VOLTHA replays the whole configuration on OLT reboot, so caching locally is not a problem.
+*
+* @param intf_id NNI or PON intf ID
+* @param onu_id ONU ID
+* @param uni_id UNI ID
+* @param gemport_id GEM Port ID
+* @param direction Upstream or downstream
+*
+* @return tm_queue_id
+*/
+int get_tm_queue_id(int intf_id, int onu_id, int uni_id, int gemport_id, std::string direction) {
+    queue_map_key_tuple key(intf_id, onu_id, uni_id, gemport_id, direction);
+    int queue_id = -1;
 
-    int start = intf_technologies[pon_intf_id] == "gpon" ? MIN_ALLOC_ID_GPON : MIN_ALLOC_ID_XGSPON; // offset built into all alloc Ids
-    return tm_queue_id_start + alloc_id - start; // use alloc_id as a unique queue id. It is unique per UNI, and in the range of IDs supported by BAL
+    std::map<queue_map_key_tuple, int>::const_iterator it = queue_map.find(key);
+    if (it != queue_map.end()) {
+        queue_id = it->second;
+    }
+    if (queue_id != -1) {
+        return queue_id;
+    }
+
+    bcmos_fastlock_lock(&data_lock);
+    // Complexity of O(n). Is there a better way to avoid the linear search?
+    for (queue_id = 0; queue_id < MAX_TM_QUEUE_ID; queue_id++) {
+        if (tm_queue_bitset[queue_id] == 0) {
+            tm_queue_bitset[queue_id] = 1;
+            break;
+        }
+    }
+    bcmos_fastlock_unlock(&data_lock, 0);
+
+    if (queue_id < MAX_TM_QUEUE_ID) {
+        bcmos_fastlock_lock(&data_lock);
+        queue_map[key] = queue_id;
+        bcmos_fastlock_unlock(&data_lock, 0);
+        return queue_id;
+    } else {
+        return -1;
+    }
 }
 
-static inline int mk_agg_port_id(int intf_id, int onu_id) {
-    if (board_technology == "gpon") return 511 + intf_id * 32 + onu_id;
-    return 1023 + intf_id * 32 + onu_id;
+/**
+* Update tm_queue_id for a given intf_id, onu_id, uni_id, gemport_id, direction
+*
+* @param intf_id NNI or PON intf ID
+* @param onu_id ONU ID
+* @param uni_id UNI ID
+* @param gemport_id GEM Port ID
+* @param direction Upstream or downstream
+* @param queue_id tm_queue_id to be cached
+*/
+void update_tm_queue_id(int pon_intf_id, int onu_id, int uni_id, int gemport_id, std::string direction,
+                                  uint32_t queue_id) {
+    queue_map_key_tuple key(pon_intf_id, onu_id, uni_id, gemport_id, direction);
+    bcmos_fastlock_lock(&data_lock);
+    queue_map[key] = queue_id;
+    bcmos_fastlock_unlock(&data_lock, 0);
+}
+
+/**
+* Free tm_queue_id for a given intf_id, onu_id, uni_id, gemport_id, direction
+*
+* @param intf_id NNI or PON intf ID
+* @param onu_id ONU ID
+* @param uni_id UNI ID
+* @param gemport_id GEM Port ID
+* @param direction Upstream or downstream
+*/
+void free_tm_queue_id(int pon_intf_id, int onu_id, int uni_id, int gemport_id, std::string direction) {
+    queue_map_key_tuple key(pon_intf_id, onu_id, uni_id, gemport_id, direction);
+    std::map<queue_map_key_tuple, int>::const_iterator it;
+    bcmos_fastlock_lock(&data_lock);
+    it = queue_map.find(key);
+    if (it != queue_map.end()) {
+        tm_queue_bitset[it->second] = 0;
+        queue_map.erase(it);
+    }
+    bcmos_fastlock_unlock(&data_lock, 0);
+}
+
+/**
+* Gets a unique tm_sched_id for a given intf_id, onu_id, uni_id, direction
+* The tm_sched_id is locally cached in a map, so that it can be retrieved when necessary.
+* VOLTHA replays the whole configuration on OLT reboot, so caching locally is not a problem.
+*
+* @param intf_id NNI or PON intf ID
+* @param onu_id ONU ID
+* @param uni_id UNI ID
+* @param direction Upstream or downstream
+*
+* @return tm_sched_id
+*/
+uint32_t get_tm_sched_id(int pon_intf_id, int onu_id, int uni_id, std::string direction) {
+    sched_map_key_tuple key(pon_intf_id, onu_id, uni_id, direction);
+    int sched_id = -1;
+
+    std::map<sched_map_key_tuple, int>::const_iterator it = sched_map.find(key);
+    if (it != sched_map.end()) {
+        sched_id = it->second;
+    }
+    if (sched_id != -1) {
+        return sched_id;
+    }
+
+    bcmos_fastlock_lock(&data_lock);
+    // Complexity of O(n). Is there a better way to avoid the linear search?
+    for (sched_id = 0; sched_id < MAX_TM_SCHED_ID; sched_id++) {
+        if (tm_sched_bitset[sched_id] == 0) {
+            tm_sched_bitset[sched_id] = 1;
+            break;
+        }
+    }
+    bcmos_fastlock_unlock(&data_lock, 0);
+
+    if (sched_id < MAX_TM_SCHED_ID) {
+        bcmos_fastlock_lock(&data_lock);
+        sched_map[key] = sched_id;
+        bcmos_fastlock_unlock(&data_lock, 0);
+        return sched_id;
+    } else {
+        return -1;
+    }
+}
+
+/**
+* Free tm_sched_id for a given intf_id, onu_id, uni_id, direction
+*
+* @param intf_id NNI or PON intf ID
+* @param onu_id ONU ID
+* @param uni_id UNI ID
+* @param direction Upstream or downstream
+*/
+void free_tm_sched_id(int pon_intf_id, int onu_id, int uni_id, std::string direction) {
+    sched_map_key_tuple key(pon_intf_id, onu_id, uni_id, direction);
+    std::map<sched_map_key_tuple, int>::const_iterator it;
+    bcmos_fastlock_lock(&data_lock);
+    it = sched_map.find(key);
+    if (it != sched_map.end()) {
+        tm_sched_bitset[it->second] = 0;
+        sched_map.erase(it);
+    }
+    bcmos_fastlock_unlock(&data_lock, 0);
+}
+
+bool is_tm_sched_id_present(int pon_intf_id, int onu_id, int uni_id, std::string direction) {
+    sched_map_key_tuple key(pon_intf_id, onu_id, uni_id, direction);
+    return sched_map.count(key) > 0;
+}
+
+bool is_tm_queue_id_present(int pon_intf_id, int onu_id, int uni_id, int gemport_id, std::string direction) {
+    queue_map_key_tuple key(pon_intf_id, onu_id, uni_id, gemport_id, direction);
+    return queue_map.count(key) > 0;
 }
 
 char* openolt_read_sysinfo(char* field_name, char* field_val)
@@ -260,7 +447,7 @@
 
         vendor_init();
         bcmbal_init(argc, argv, NULL);
-        bcmos_fastlock_init(&flow_lock, 0);
+        bcmos_fastlock_init(&data_lock, 0);
 
         BCM_LOG(INFO, openolt_log_id, "Enable OLT - %s-%s\n", VENDOR_ID, MODEL_ID);
 
@@ -309,7 +496,7 @@
     //This fails with Operation Not Supported, bug ???
 
     //TEMPORARY WORK AROUND
-    Status status = DisableUplinkIf_(0);
+    Status status = DisableUplinkIf_(nni_intf_id);
     if (status.ok()) {
         state.deactivate();
         openolt::Indication ind;
@@ -622,8 +809,8 @@
 
     if (0 == key.sub_term_id)
     {
-            BCM_LOG(INFO, openolt_log_id,"Invalid Key to handle subscriber terminal clear subscriber_terminal_id %d, Interface ID %d\n",
-                onu_id, intf_id);
+            BCM_LOG(INFO, openolt_log_id,"Invalid Key to handle subscriber terminal clear subscriber_terminal_id %d, \
+                    Interface ID %d\n", onu_id, intf_id);
             return Status(grpc::StatusCode::INTERNAL, "Failed to delete ONU");
     }
 
@@ -692,33 +879,35 @@
     return Status::OK;
 }
 
-Status OnuPacketOut_(uint32_t intf_id, uint32_t onu_id, uint32_t port_no, const std::string pkt) {
+Status OnuPacketOut_(uint32_t intf_id, uint32_t onu_id, uint32_t port_no, uint32_t gemport_id, const std::string pkt) {
     bcmos_errno err = BCM_ERR_OK;
     bcmbal_dest proxy_pkt_dest;
     bcmbal_u8_list_u32_max_2048 buf;
 
     if (port_no > 0) {
         bool found = false;
-        uint32_t gemport_id;
-
-        bcmos_fastlock_lock(&flow_lock);
-        // Map the port_no to one of the flows that owns it to find a gemport_id for that flow.
-        // Pick any flow that is mapped with the same port_no.
-        std::map<uint32_t, std::set<uint32_t> >::const_iterator it = port_to_flows.find(port_no);
-        if (it != port_to_flows.end() && !it->second.empty()) {
-            uint32_t flow_id = *(it->second.begin()); // Pick any flow_id out of the bag set
-            std::map<uint32_t, uint32_t>::const_iterator fit = flowid_to_gemport.find(flow_id);
-            if (fit != flowid_to_gemport.end()) {
-                found = true;
-                gemport_id = fit->second;
+        if (gemport_id == 0) {
+            bcmos_fastlock_lock(&data_lock);
+            // Map the port_no to one of the flows that owns it to find a gemport_id for that flow.
+            // Pick any flow that is mapped with the same port_no.
+            std::map<uint32_t, std::set<uint32_t> >::const_iterator it = port_to_flows.find(port_no);
+            if (it != port_to_flows.end() && !it->second.empty()) {
+                uint32_t flow_id = *(it->second.begin()); // Pick any flow_id out of the bag set
+                std::map<uint32_t, uint32_t>::const_iterator fit = flowid_to_gemport.find(flow_id);
+                if (fit != flowid_to_gemport.end()) {
+                    found = true;
+                    gemport_id = fit->second;
+                }
             }
-        }
-        bcmos_fastlock_unlock(&flow_lock, 0);
+            bcmos_fastlock_unlock(&data_lock, 0);
 
-        if (!found) {
-            BCM_LOG(ERROR, openolt_log_id, "Packet out failed to find destination for ONU %d port_no %u on PON %d\n",
-                onu_id, port_no, intf_id);
-            return grpc::Status(grpc::StatusCode::NOT_FOUND, "no flow for port_no");
+            if (!found) {
+                BCM_LOG(ERROR, openolt_log_id, "Packet out failed to find destination for ONU %d port_no %u on PON %d\n",
+                        onu_id, port_no, intf_id);
+                return grpc::Status(grpc::StatusCode::NOT_FOUND, "no flow for port_no");
+            }
+            BCM_LOG(INFO, openolt_log_id, "Gem port %u found for ONU %d port_no %u on PON %d\n",
+                    gemport_id, onu_id, port_no, intf_id);
         }
 
         proxy_pkt_dest.type = BCMBAL_DEST_TYPE_SVC_PORT;
@@ -770,13 +959,13 @@
 
 uint32_t GetPortNum_(uint32_t flow_id)
 {
-    bcmos_fastlock_lock(&flow_lock);
+    bcmos_fastlock_lock(&data_lock);
     uint32_t port_no = 0;
     std::map<uint32_t, uint32_t >::const_iterator it = flowid_to_port.find(flow_id);
     if (it != flowid_to_port.end()) {
         port_no = it->second;
     }
-    bcmos_fastlock_unlock(&flow_lock, 0);
+    bcmos_fastlock_unlock(&data_lock, 0);
     return port_no;
 }
 
@@ -788,14 +977,18 @@
     bcmos_errno err;
     bcmbal_flow_cfg cfg;
     bcmbal_flow_key key = { };
+    int32_t o_vid = -1;
+    bool single_tag = false;
+    uint32_t ether_type = 0;
 
-    BCM_LOG(INFO, openolt_log_id, "flow add - intf_id %d, onu_id %d, uni_id %d, port_no %u, flow_id %d, flow_type %s, gemport_id %d, network_intf_id %d, cookie %llu\n",
-        access_intf_id, onu_id, uni_id, port_no, flow_id, flow_type.c_str(), gemport_id, network_intf_id, cookie);
+    BCM_LOG(INFO, openolt_log_id, "flow add - intf_id %d, onu_id %d, uni_id %d, port_no %u, flow_id %d, flow_type %s, \
+            gemport_id %d, network_intf_id %d, cookie %llu\n", \
+            access_intf_id, onu_id, uni_id, port_no, flow_id, flow_type.c_str(), gemport_id, network_intf_id, cookie);
 
     key.flow_id = flow_id;
-    if (flow_type.compare("upstream") == 0 ) {
+    if (flow_type.compare(upstream) == 0 ) {
         key.flow_type = BCMBAL_FLOW_TYPE_UPSTREAM;
-    } else if (flow_type.compare("downstream") == 0) {
+    } else if (flow_type.compare(downstream) == 0) {
         key.flow_type = BCMBAL_FLOW_TYPE_DOWNSTREAM;
     } else {
         BCM_LOG(WARNING, openolt_log_id, "Invalid flow type %s\n", flow_type.c_str());
@@ -820,7 +1013,7 @@
         BCMBAL_CFG_PROP_SET(&cfg, flow, svc_port_id, gemport_id);
     }
     if (gemport_id >= 0 && port_no != 0) {
-        bcmos_fastlock_lock(&flow_lock);
+        bcmos_fastlock_lock(&data_lock);
         if (key.flow_type == BCMBAL_FLOW_TYPE_DOWNSTREAM) {
             port_to_flows[port_no].insert(key.flow_id);
             flowid_to_gemport[key.flow_id] = gemport_id;
@@ -829,7 +1022,7 @@
         {
             flowid_to_port[key.flow_id] = port_no;
         }
-        bcmos_fastlock_unlock(&flow_lock, 0);
+        bcmos_fastlock_unlock(&data_lock, 0);
     }
     if (priority_value >= 0) {
         BCMBAL_CFG_PROP_SET(&cfg, flow, priority, priority_value);
@@ -858,17 +1051,8 @@
             BCMBAL_ATTRIBUTE_PROP_SET(&val, classifier, i_vid, classifier.i_vid());
         }
 
-        if (classifier.o_pbits()) {
-            BCM_LOG(DEBUG, openolt_log_id, "classify o_pbits 0x%x\n", classifier.o_pbits());
-            BCMBAL_ATTRIBUTE_PROP_SET(&val, classifier, o_pbits, classifier.o_pbits());
-        }
-
-        if (classifier.i_pbits()) {
-            BCM_LOG(DEBUG, openolt_log_id, "classify i_pbits 0x%x\n", classifier.i_pbits());
-            BCMBAL_ATTRIBUTE_PROP_SET(&val, classifier, i_pbits, classifier.i_pbits());
-        }
-
         if (classifier.eth_type()) {
+            ether_type = classifier.eth_type();
             BCM_LOG(DEBUG, openolt_log_id, "classify ether_type 0x%04x\n", classifier.eth_type());
             BCMBAL_ATTRIBUTE_PROP_SET(&val, classifier, ether_type, classifier.eth_type());
         }
@@ -914,8 +1098,15 @@
                 BCMBAL_ATTRIBUTE_PROP_SET(&val, classifier, pkt_tag_type, BCMBAL_PKT_TAG_TYPE_UNTAGGED);
             } else if (classifier.pkt_tag_type().compare("single_tag") == 0) {
                 BCMBAL_ATTRIBUTE_PROP_SET(&val, classifier, pkt_tag_type, BCMBAL_PKT_TAG_TYPE_SINGLE_TAG);
+                single_tag = true;
+
+                BCM_LOG(DEBUG, openolt_log_id, "classify o_pbits 0x%x\n", classifier.o_pbits());
+                BCMBAL_ATTRIBUTE_PROP_SET(&val, classifier, o_pbits, classifier.o_pbits());
             } else if (classifier.pkt_tag_type().compare("double_tag") == 0) {
                 BCMBAL_ATTRIBUTE_PROP_SET(&val, classifier, pkt_tag_type, BCMBAL_PKT_TAG_TYPE_DOUBLE_TAG);
+
+                BCM_LOG(DEBUG, openolt_log_id, "classify o_pbits 0x%x\n", classifier.o_pbits());
+                BCMBAL_ATTRIBUTE_PROP_SET(&val, classifier, o_pbits, classifier.o_pbits());
             }
         }
 
@@ -944,6 +1135,7 @@
 
         if (action.o_vid()) {
             BCM_LOG(INFO, openolt_log_id, "action o_vid=%d\n", action.o_vid());
+            o_vid = action.o_vid();
             BCMBAL_ATTRIBUTE_PROP_SET(&val, action, o_vid, action.o_vid());
         }
 
@@ -979,24 +1171,23 @@
 
         if (key.flow_type == BCMBAL_FLOW_TYPE_DOWNSTREAM) {
             bcmbal_tm_queue_ref val = { };
-            val.sched_id = mk_sched_id(access_intf_id, onu_id, "downstream");
-            uint32_t alloc_id;
-            bcmos_fastlock_lock(&flow_lock);
-            alloc_id = port_to_alloc[port_no];
-            bcmos_fastlock_unlock(&flow_lock, 0);
-            val.queue_id = mk_queue_id(access_intf_id, onu_id, uni_id, port_no, alloc_id);
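+            // Single-tagged EAPOL traps (EAP_ETHER_TYPE) go to queue 0 on the default
+            // PON scheduler; all other downstream flows use the per-subscriber
+            // scheduler and the per-GEM queue allocated for this ONU/UNI.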
+            if (single_tag && ether_type == EAP_ETHER_TYPE) {
+                val.sched_id = get_default_tm_sched_id(access_intf_id, downstream);
+                val.queue_id = 0;
+
+            } else {
+                val.sched_id = get_tm_sched_id(access_intf_id, onu_id, uni_id, downstream); // Subscriber Scheduler
+                val.queue_id = get_tm_queue_id(access_intf_id, onu_id, uni_id, gemport_id, downstream);
+            }
             BCMBAL_CFG_PROP_SET(&cfg, flow, queue, val);
         } else if (key.flow_type == BCMBAL_FLOW_TYPE_UPSTREAM) {
             bcmbal_tm_sched_id val1;
-            if (alloc_id != 0) {
-                val1 = alloc_id;
-            } else {
-                BCM_LOG(ERROR, openolt_log_id, "alloc_id not present");
-            }
+            val1 = get_tm_sched_id(access_intf_id, onu_id, uni_id, upstream); // DBA Scheduler ID
             BCMBAL_CFG_PROP_SET(&cfg, flow, dba_tm_sched_id, val1);
 
             bcmbal_tm_queue_ref val2 = { };
-            val2.sched_id = mk_sched_id(network_intf_id, onu_id, "upstream");
+            val2.sched_id = get_default_tm_sched_id(network_intf_id, upstream); // NNI Scheduler ID
+            val2.queue_id = get_tm_queue_id(network_intf_id, onu_id, uni_id, gemport_id, upstream); // Queue on NNI
             BCMBAL_CFG_PROP_SET(&cfg, flow, queue, val2);
         }
     }
@@ -1019,16 +1210,16 @@
 
     key.flow_id = (bcmbal_flow_id) flow_id;
     key.flow_id = flow_id;
-    if (flow_type.compare("upstream") == 0 ) {
+    if (flow_type.compare(upstream) == 0 ) {
         key.flow_type = BCMBAL_FLOW_TYPE_UPSTREAM;
-    } else if (flow_type.compare("downstream") == 0) {
+    } else if (flow_type.compare(downstream) == 0) {
         key.flow_type = BCMBAL_FLOW_TYPE_DOWNSTREAM;
     } else {
         BCM_LOG(WARNING, openolt_log_id, "Invalid flow type %s\n", flow_type.c_str());
         return bcm_to_grpc_err(BCM_ERR_PARM, "Invalid flow type");
     }
 
-    bcmos_fastlock_lock(&flow_lock);
+    bcmos_fastlock_lock(&data_lock);
     uint32_t port_no = flowid_to_port[key.flow_id];
     if (key.flow_type == BCMBAL_FLOW_TYPE_DOWNSTREAM) {
         flowid_to_gemport.erase(key.flow_id);
@@ -1039,7 +1230,7 @@
     {
         flowid_to_port.erase(key.flow_id);
     }
-    bcmos_fastlock_unlock(&flow_lock, 0);
+    bcmos_fastlock_unlock(&data_lock, 0);
 
     BCMBAL_CFG_INIT(&cfg, flow, key);
 
@@ -1055,217 +1246,375 @@
     return Status::OK;
 }
 
-Status SchedAdd_(std::string direction, uint32_t intf_id, uint32_t onu_id, uint32_t uni_id, uint32_t port_no,
-                 uint32_t alloc_id, openolt::AdditionalBW additional_bw, uint32_t weight, uint32_t priority,
-                 openolt::SchedulingPolicy sched_policy, openolt::TrafficShapingInfo tf_sh_info) {
+bcmos_errno CreateSched(std::string direction, uint32_t intf_id, uint32_t onu_id, uint32_t uni_id, uint32_t port_no,
+                 uint32_t alloc_id, tech_profile::AdditionalBW additional_bw, uint32_t weight, uint32_t priority,
+                 tech_profile::SchedulingPolicy sched_policy, tech_profile::TrafficShapingInfo tf_sh_info) {
 
     bcmos_errno err;
 
-    if (direction == "downstream") {
-        bcmbal_tm_queue_cfg cfg;
-        bcmbal_tm_queue_key key = { };
-        // Note: We use the default scheduler available in the DL.
-        key.sched_id = mk_sched_id(intf_id, onu_id, direction);
-        key.sched_dir = BCMBAL_TM_SCHED_DIR_DS;
-        key.id = mk_queue_id(intf_id, onu_id, uni_id, port_no, alloc_id);
+    if (direction == downstream) {
 
-        BCMBAL_CFG_INIT(&cfg, tm_queue, key);
-        //Queue must be set with either weight or priority, not both,
-        // as its scheduler' sched_type is sp_wfq
-        BCMBAL_CFG_PROP_SET(&cfg, tm_queue, priority, priority);
-        //BCMBAL_CFG_PROP_SET(&cfg, tm_queue, weight, weight);
-        //BCMBAL_CFG_PROP_SET(&cfg, tm_queue, creation_mode, BCMBAL_TM_CREATION_MODE_MANUAL);
-
-        bcmbal_tm_shaping rate = {};
-        if (tf_sh_info.cir() >= 0 && tf_sh_info.pir() > 0) {
-            uint32_t cir = tf_sh_info.cir();
-            uint32_t pir = tf_sh_info.pir();
-            uint32_t burst = tf_sh_info.pbs();
-            BCM_LOG(INFO, openolt_log_id, "applying traffic shaping in DL cir=%u, pir=%u, burst=%u\n",
-               cir, pir, burst);
-            rate.presence_mask = BCMBAL_TM_SHAPING_ID_ALL;
-            rate.cir = cir;
-            rate.pir = pir;
-            rate.burst = burst;
-
-            BCMBAL_CFG_PROP_SET(&cfg, tm_queue, rate, rate);
-        }
-
-        err = bcmbal_cfg_set(DEFAULT_ATERM_ID, &cfg.hdr);
-        if (err) {
-            BCM_LOG(ERROR, openolt_log_id, "Failed to create subscriber downstream tm queue, id %d, sched_id %d, intf_id %d, onu_id %d, uni_id %d, port_no %u, alt_id %d\n",
-                    key.id, key.sched_id, intf_id, onu_id, uni_id, port_no, alloc_id);
-            return bcm_to_grpc_err(err, "Failed to create subscriber downstream tm queue");
-        }
-
-        bcmos_fastlock_lock(&flow_lock);
-        port_to_alloc[port_no] = alloc_id;
-        bcmos_fastlock_unlock(&flow_lock, 0);
-
-        BCM_LOG(INFO, openolt_log_id, "Create downstream sched, id %d, intf_id %d, onu_id %d, uni_id %d, port_no %u, alt_id %d\n",
-                key.id,intf_id,onu_id,uni_id,port_no,alloc_id);
-
-    } else { //"upstream"
         bcmbal_tm_sched_cfg cfg;
         bcmbal_tm_sched_key key = { };
-        bcmbal_tm_sched_type sched_type;
+        key.id = get_tm_sched_id(intf_id, onu_id, uni_id, direction);
+        key.dir = BCMBAL_TM_SCHED_DIR_DS;
 
-        key.id = alloc_id;
+        BCMBAL_CFG_INIT(&cfg, tm_sched, key);
+
+        {
+            // bcmbal_tm_sched_owner
+            // In downstream it is sub_term scheduler
+            bcmbal_tm_sched_owner tm_sched_owner = { };
+            tm_sched_owner.type = BCMBAL_TM_SCHED_OWNER_TYPE_SUB_TERM;
+            tm_sched_owner.u.sub_term.intf_id = intf_id;
+            tm_sched_owner.u.sub_term.sub_term_id = onu_id;
+            BCMBAL_CFG_PROP_SET(&cfg, tm_sched, owner, tm_sched_owner);
+
+            // bcmbal_tm_sched_type
+            // set the default policy to strict priority
+            BCMBAL_CFG_PROP_SET(&cfg, tm_sched, sched_type, BCMBAL_TM_SCHED_TYPE_SP);
+
+            // bcmbal_tm_sched_parent
+            // The parent for the sub_term scheduler is the PON scheduler in the downstream
+            bcmbal_tm_sched_parent tm_sched_parent = { };
+            tm_sched_parent.presence_mask |= (BCMBAL_TM_SCHED_PARENT_ID_SCHED_ID);
+            tm_sched_parent.sched_id = get_default_tm_sched_id(intf_id, downstream);
+            tm_sched_parent.presence_mask |= (BCMBAL_TM_SCHED_PARENT_ID_PRIORITY);
+            tm_sched_parent.priority = 1; // TODO: Hardcoded priority as 1
+            BCMBAL_CFG_PROP_SET(&cfg, tm_sched, sched_parent, tm_sched_parent);
+
+            // num_priorities: Max number of strict priority scheduling elements
+            BCMBAL_CFG_PROP_SET(&cfg, tm_sched, num_priorities, 8); // TODO: hardcoded 8 priorities.
+
+            // bcmbal_tm_shaping
+            if (tf_sh_info.cir() >= 0 && tf_sh_info.pir() > 0) {
+                bcmbal_tm_shaping rate = {};
+                uint32_t cir = tf_sh_info.cir();
+                uint32_t pir = tf_sh_info.pir();
+                uint32_t burst = tf_sh_info.pbs();
+                BCM_LOG(INFO, openolt_log_id, "applying traffic shaping in DL cir=%u, pir=%u, burst=%u\n",
+                   cir, pir, burst);
+                rate.presence_mask = BCMBAL_TM_SHAPING_ID_NONE;
+                rate.presence_mask |= BCMBAL_TM_SHAPING_ID_PIR;
+                rate.presence_mask |= BCMBAL_TM_SHAPING_ID_BURST;
+                // FIXME: Setting CIR, results in BAL throwing error 'tm_sched minimum rate is not supported yet'
+                // rate.cir = cir;
+                rate.pir = pir;
+                rate.burst = burst;
+
+                BCMBAL_CFG_PROP_SET(&cfg, tm_sched, rate, rate);
+            }
+
+            // creation_mode
+            // BCMBAL_CFG_PROP_SET(&cfg, tm_sched, creation_mode, BCMBAL_TM_CREATION_MODE_MANUAL);
+        }
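+        // Resulting downstream TM hierarchy (as configured above and in CreateQueue()):
+        //   PON default scheduler (16384 + pon_id)
+        //     -> this per-subscriber (sub_term) scheduler, strict priority, 8 priorities
+        //        -> one tm_queue per GEM port (created via CreateTrafficQueues_)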
+
+        err = bcmbal_cfg_set(DEFAULT_ATERM_ID, &(cfg.hdr));
+        if (err) {
+            BCM_LOG(ERROR, openolt_log_id, "Failed to create downstream subscriber scheduler, id %d, intf_id %d, \
+                    onu_id %d, uni_id %d, port_no %u\n", key.id, intf_id, onu_id,uni_id,port_no);
+            return err;
+        }
+        BCM_LOG(INFO, openolt_log_id, "Create downstream subscriber sched, id %d, intf_id %d, onu_id %d, \
+                uni_id %d, port_no %u\n", key.id,intf_id,onu_id,uni_id,port_no);
+
+    } else { //upstream
+        bcmbal_tm_sched_cfg cfg;
+        bcmbal_tm_sched_key key = { };
+
+        key.id = get_tm_sched_id(intf_id, onu_id, uni_id, direction);
         key.dir = BCMBAL_TM_SCHED_DIR_US;
 
         BCMBAL_CFG_INIT(&cfg, tm_sched, key);
 
         {
-            bcmbal_tm_sched_owner val = { };
+            // bcmbal_tm_sched_owner: AGG PORT
+            bcmbal_tm_sched_owner tm_sched_owner = { };
+            tm_sched_owner.type = BCMBAL_TM_SCHED_OWNER_TYPE_AGG_PORT;
+            tm_sched_owner.u.agg_port.presence_mask |= bcmbal_tm_sched_owner_agg_port_id_all;
+            tm_sched_owner.u.agg_port.intf_id = intf_id;
+            tm_sched_owner.u.agg_port.sub_term_id = onu_id;
+            tm_sched_owner.u.agg_port.agg_port_id = alloc_id;
+            BCMBAL_CFG_PROP_SET(&cfg, tm_sched, owner, tm_sched_owner);
 
-            val.type = BCMBAL_TM_SCHED_OWNER_TYPE_AGG_PORT;
-            BCMBAL_ATTRIBUTE_PROP_SET(&val.u.agg_port, tm_sched_owner_agg_port, intf_id, (bcmbal_intf_id) intf_id);
-            BCMBAL_ATTRIBUTE_PROP_SET(&val.u.agg_port, tm_sched_owner_agg_port, sub_term_id, (bcmbal_sub_id) onu_id);
-            BCMBAL_ATTRIBUTE_PROP_SET(&val.u.agg_port, tm_sched_owner_agg_port, agg_port_id, (bcmbal_aggregation_port_id) alloc_id);
+            // bcmbal_tm_shaping
+            if (tf_sh_info.cir() >= 0 && tf_sh_info.pir() > 0) {
+                bcmbal_tm_shaping rate = {};
+                uint32_t cir = tf_sh_info.cir();
+                uint32_t pir = tf_sh_info.pir();
+                uint32_t burst = tf_sh_info.pbs();
+                BCM_LOG(INFO, openolt_log_id, "applying traffic shaping in UL cir=%u, pir=%u, burst=%u\n",
+                   cir, pir, burst);
+                rate.presence_mask = BCMBAL_TM_SHAPING_ID_ALL;
+                rate.cir = cir;
+                rate.pir = pir;
+                rate.burst = burst;
 
-            BCMBAL_CFG_PROP_SET(&cfg, tm_sched, owner, val);
-
-        }
-        bcmbal_tm_shaping rate = {};
-        if (tf_sh_info.cir() >= 0 && tf_sh_info.pir() > 0) {
-            uint32_t cir = tf_sh_info.cir();
-            uint32_t pir = tf_sh_info.pir();
-            uint32_t burst = tf_sh_info.pbs();
-            BCM_LOG(INFO, openolt_log_id, "applying traffic shaping in UL cir=%u, pir=%u, burst=%u\n",
-               cir, pir, burst);
-            rate.presence_mask = BCMBAL_TM_SHAPING_ID_ALL;
-            rate.cir = cir;
-            rate.pir = pir;
-            rate.burst = burst;
-
-            BCMBAL_CFG_PROP_SET(&cfg, tm_sched, rate, rate);
+                BCMBAL_CFG_PROP_SET(&cfg, tm_sched, rate, rate);
+            }
         }
 
         err = bcmbal_cfg_set(DEFAULT_ATERM_ID, &(cfg.hdr));
         if (err) {
-            BCM_LOG(ERROR, openolt_log_id, "Failed to create upstream DBA sched, id %d, intf_id %d, onu_id %d, uni_id %d, port_no %u, alloc_id %d\n",
-                    key.id, intf_id, onu_id,uni_id,port_no,alloc_id);
-            return bcm_to_grpc_err(err, "Failed to create upstream DBA sched");
+            BCM_LOG(ERROR, openolt_log_id, "Failed to create upstream DBA sched, id %d, intf_id %d, onu_id %d, uni_id %d,\
+                    port_no %u, alloc_id %d\n", key.id, intf_id, onu_id,uni_id,port_no,alloc_id);
+            return err;
         }
-        BCM_LOG(INFO, openolt_log_id, "Create upstream DBA sched, id %d, intf_id %d, onu_id %d, uni_id %d, port_no %u, alloc_id %d\n",
-                key.id,intf_id,onu_id,uni_id,port_no,alloc_id);
+        BCM_LOG(INFO, openolt_log_id, "Create upstream DBA sched, id %d, intf_id %d, onu_id %d, uni_id %d, port_no %u, \
+                alloc_id %d\n", key.id,intf_id,onu_id,uni_id,port_no,alloc_id);
     }
 
-    return Status::OK;
-
+    return BCM_ERR_OK;
 }
 
-Status CreateTconts_(const openolt::Tconts *tconts) {
-    uint32_t intf_id = tconts->intf_id();
-    uint32_t onu_id = tconts->onu_id();
-    uint32_t uni_id = tconts->uni_id();
-    uint32_t port_no = tconts->port_no();
+Status CreateTrafficSchedulers_(const tech_profile::TrafficSchedulers *traffic_scheds) {
+    uint32_t intf_id = traffic_scheds->intf_id();
+    uint32_t onu_id = traffic_scheds->onu_id();
+    uint32_t uni_id = traffic_scheds->uni_id();
+    uint32_t port_no = traffic_scheds->port_no();
     std::string direction;
     unsigned int alloc_id;
-    openolt::Scheduler scheduler;
-    openolt::AdditionalBW additional_bw;
+    tech_profile::SchedulerConfig sched_config;
+    tech_profile::AdditionalBW additional_bw;
     uint32_t priority;
     uint32_t weight;
-    openolt::SchedulingPolicy sched_policy;
-    openolt::TrafficShapingInfo traffic_shaping_info;
+    tech_profile::SchedulingPolicy sched_policy;
+    tech_profile::TrafficShapingInfo traffic_shaping_info;
+    bcmos_errno err;
 
-    for (int i = 0; i < tconts->tconts_size(); i++) {
-        openolt::Tcont tcont = tconts->tconts(i);
-        if (tcont.direction() == openolt::Direction::UPSTREAM) {
-            direction = "upstream";
-        } else if (tcont.direction() == openolt::Direction::DOWNSTREAM) {
-            direction = "downstream";
+    for (int i = 0; i < traffic_scheds->traffic_scheds_size(); i++) {
+        tech_profile::TrafficScheduler traffic_sched = traffic_scheds->traffic_scheds(i);
+        if (traffic_sched.direction() == tech_profile::Direction::UPSTREAM) {
+            direction = upstream;
+        } else if (traffic_sched.direction() == tech_profile::Direction::DOWNSTREAM) {
+            direction = downstream;
         }
         else {
-            BCM_LOG(ERROR, openolt_log_id, "direction-not-supported %d", tcont.direction());
+            BCM_LOG(ERROR, openolt_log_id, "direction-not-supported %d", traffic_sched.direction());
             return Status::CANCELLED;
         }
-        alloc_id = tcont.alloc_id();
-        scheduler = tcont.scheduler();
-        additional_bw = scheduler.additional_bw();
-        priority = scheduler.priority();
-        weight = scheduler.weight();
-        sched_policy = scheduler.sched_policy();
-        traffic_shaping_info = tcont.traffic_shaping_info();
-        SchedAdd_(direction, intf_id, onu_id, uni_id, port_no, alloc_id, additional_bw, weight, priority, sched_policy, traffic_shaping_info);
+        alloc_id = traffic_sched.alloc_id();
+        sched_config = traffic_sched.scheduler();
+        additional_bw = sched_config.additional_bw();
+        priority = sched_config.priority();
+        weight = sched_config.weight();
+        sched_policy = sched_config.sched_policy();
+        traffic_shaping_info = traffic_sched.traffic_shaping_info();
+        err =  CreateSched(direction, intf_id, onu_id, uni_id, port_no, alloc_id, additional_bw, weight, priority,
+                           sched_policy, traffic_shaping_info);
+        if (err) {
+            return bcm_to_grpc_err(err, "Failed to create scheduler");
+        }
     }
     return Status::OK;
 }
 
-Status RemoveTconts_(const openolt::Tconts *tconts) {
-    uint32_t intf_id = tconts->intf_id();
-    uint32_t onu_id = tconts->onu_id();
-    uint32_t uni_id = tconts->uni_id();
-    uint32_t port_no = tconts->port_no();
-    std::string direction;
-    unsigned int alloc_id;
-
-    for (int i = 0; i < tconts->tconts_size(); i++) {
-        openolt::Tcont tcont = tconts->tconts(i);
-        if (tcont.direction() == openolt::Direction::UPSTREAM) {
-            direction = "upstream";
-        } else if (tcont.direction() == openolt::Direction::DOWNSTREAM) {
-            direction = "downstream";
-        }
-        else {
-            BCM_LOG(ERROR, openolt_log_id, "direction-not-supported %d", tcont.direction());
-            return Status::CANCELLED;
-        }
-        alloc_id = tcont.alloc_id();
-        SchedRemove_(direction, intf_id, onu_id, uni_id, port_no, alloc_id);
-    }
-    return Status::OK;
-}
-
-Status SchedRemove_(std::string direction, int intf_id, int onu_id, int uni_id, uint32_t port_no, int alloc_id) {
+bcmos_errno RemoveSched(int intf_id, int onu_id, int uni_id, std::string direction) {
 
     bcmos_errno err;
 
+    bcmbal_tm_sched_cfg tm_cfg_us;
+    bcmbal_tm_sched_key tm_key_us = { };
 
-    if (direction == "upstream") {
-        // DBA sched
-        bcmbal_tm_sched_cfg tm_cfg_us;
-        bcmbal_tm_sched_key tm_key_us = { };
-
-        tm_key_us.id = alloc_id;
+    if (is_tm_sched_id_present(intf_id, onu_id, uni_id, direction)) {
+        tm_key_us.id = get_tm_sched_id(intf_id, onu_id, uni_id, direction);
+    } else {
+        BCM_LOG(INFO, openolt_log_id, "schduler not present in %s\n", direction.c_str());
+        return BCM_ERR_OK;
+    }
+    if (direction == upstream) {
         tm_key_us.dir = BCMBAL_TM_SCHED_DIR_US;
+    } else {
+        tm_key_us.dir = BCMBAL_TM_SCHED_DIR_DS;
+    }
 
-        BCMBAL_CFG_INIT(&tm_cfg_us, tm_sched, tm_key_us);
+    BCMBAL_CFG_INIT(&tm_cfg_us, tm_sched, tm_key_us);
 
-        err = bcmbal_cfg_clear(DEFAULT_ATERM_ID, &(tm_cfg_us.hdr));
-        if (err) {
-            BCM_LOG(ERROR, openolt_log_id, "Failed to remove upstream DBA sched, id %d, intf_id %d, onu_id %d\n",
-                tm_key_us.id, intf_id, onu_id);
-            return Status(grpc::StatusCode::INTERNAL, "Failed to remove upstream DBA sched");
+    err = bcmbal_cfg_clear(DEFAULT_ATERM_ID, &(tm_cfg_us.hdr));
+    if (err) {
+        BCM_LOG(ERROR, openolt_log_id, "Failed to remove scheduler sched, direction = %s, id %d, intf_id %d, onu_id %d\n", \
+                direction.c_str(), tm_key_us.id, intf_id, onu_id);
+        return err;
+    }
+
+    free_tm_sched_id(intf_id, onu_id, uni_id, direction);
+
+    BCM_LOG(INFO, openolt_log_id, "Removed sched, direction = %s, id %d, intf_id %d, onu_id %d\n", \
+            direction.c_str(), tm_key_us.id, intf_id, onu_id);
+
+    return BCM_ERR_OK;
+}
+
+Status RemoveTrafficSchedulers_(const tech_profile::TrafficSchedulers *traffic_scheds) {
+    uint32_t intf_id = traffic_scheds->intf_id();
+    uint32_t onu_id = traffic_scheds->onu_id();
+    uint32_t uni_id = traffic_scheds->uni_id();
+    std::string direction;
+    bcmos_errno err;
+
+    for (int i = 0; i < traffic_scheds->traffic_scheds_size(); i++) {
+        tech_profile::TrafficScheduler traffic_sched = traffic_scheds->traffic_scheds(i);
+        if (traffic_sched.direction() == tech_profile::Direction::UPSTREAM) {
+            direction = upstream;
+        } else if (traffic_sched.direction() == tech_profile::Direction::DOWNSTREAM) {
+            direction = downstream;
         }
+        else {
+            BCM_LOG(ERROR, openolt_log_id, "direction-not-supported %d", traffic_sched.direction());
+            return Status::CANCELLED;
+        }
+        err = RemoveSched(intf_id, onu_id, uni_id, direction);
+        if (err) {
+            return bcm_to_grpc_err(err, "error-removing-traffic-scheduler");
+        }
+    }
+    return Status::OK;
+}
 
-        BCM_LOG(INFO, openolt_log_id, "Remove upstream DBA sched, id %d, intf_id %d, onu_id %d\n",
-            tm_key_us.id, intf_id, onu_id);
+bcmos_errno CreateQueue(std::string direction, uint32_t access_intf_id, uint32_t onu_id, uint32_t uni_id, uint32_t priority,
+                        uint32_t gemport_id) {
+    bcmos_errno err;
+    bcmbal_tm_queue_cfg cfg;
+    bcmbal_tm_queue_key key = { };
+    if (direction == downstream) {
+        // In the downstream, the queues are on the 'sub term' scheduler
+        // There is one queue per gem port
+        key.sched_dir = BCMBAL_TM_SCHED_DIR_DS;
+        key.sched_id = get_tm_sched_id(access_intf_id, onu_id, uni_id, direction);
+        key.id = get_tm_queue_id(access_intf_id, onu_id, uni_id, gemport_id, direction);
 
-    } else if (direction == "downstream") {
-	    // Queue
+    } else {
+        queue_map_key_tuple map_key(access_intf_id, onu_id, uni_id, gemport_id, direction);
+        if (queue_map.count(map_key) > 0) {
+            BCM_LOG(INFO, openolt_log_id, "upstream queue exists for intf_id %d, onu_id %d, uni_id %d\n. Not re-creating", \
+                    access_intf_id, onu_id, uni_id); 
+            return BCM_ERR_OK;
+        }
+        key.sched_dir = BCMBAL_TM_SCHED_DIR_US;
+        key.sched_id = get_default_tm_sched_id(nni_intf_id, direction);
+        if (priority > 7) {
+            return BCM_ERR_RANGE;
+        }
+        // There are 8 queues (one per p-bit)
+        key.id = us_fixed_queue_id_list[priority];
+        update_tm_queue_id(access_intf_id, onu_id, uni_id, gemport_id, direction, key.id);
+        // FIXME: The upstream queues have to be created once only.
+        // The upstream queues on the NNI scheduler are shared by all subscribers.
+        // When the first scheduler comes in, the queues get created, and are re-used by all others.
+        // Also, these queues should be present until the last subscriber exits the system.
+        // One solution is to have these queues always, i.e., create it as soon as OLT is enabled.
+    }
+    BCMBAL_CFG_INIT(&cfg, tm_queue, key);
 
-	    bcmbal_tm_queue_cfg queue_cfg;
-	    bcmbal_tm_queue_key queue_key = { };
-	    queue_key.sched_id = mk_sched_id(intf_id, onu_id, "downstream");
-	    queue_key.sched_dir = BCMBAL_TM_SCHED_DIR_DS;
-	    queue_key.id = mk_queue_id(intf_id, onu_id, uni_id, port_no, alloc_id);
+    BCMBAL_CFG_PROP_SET(&cfg, tm_queue, priority, priority);
 
-	    BCMBAL_CFG_INIT(&queue_cfg, tm_queue, queue_key);
+    // BCMBAL_CFG_PROP_SET(&cfg, tm_queue, creation_mode, BCMBAL_TM_CREATION_MODE_MANUAL);
 
-	    err = bcmbal_cfg_clear(DEFAULT_ATERM_ID, &(queue_cfg.hdr));
-	    if (err) {
-		    BCM_LOG(ERROR, openolt_log_id, "Failed to remove downstream tm queue, id %d, sched_id %d, intf_id %d, onu_id %d, uni_id %d, port_no %u, alt_id %d\n",
-				    queue_key.id, queue_key.sched_id, intf_id, onu_id, uni_id, port_no, alloc_id);
-		    return Status(grpc::StatusCode::INTERNAL, "Failed to remove downstream tm queue");
-	    }
 
-        bcmos_fastlock_lock(&flow_lock);
-        port_to_alloc.erase(port_no);
-        bcmos_fastlock_unlock(&flow_lock, 0);
+    err = bcmbal_cfg_set(DEFAULT_ATERM_ID, &cfg.hdr);
+    if (err) {
+        BCM_LOG(ERROR, openolt_log_id, "Failed to create subscriber tm queue, direction = %s, id %d, sched_id %d, \
+                intf_id %d, onu_id %d, uni_id %d\n", \
+                direction.c_str(), key.id, key.sched_id, access_intf_id, onu_id, uni_id);
+        return err;
+    }
 
-	    BCM_LOG(INFO, openolt_log_id, "Remove upstream DBA sched, id %d, sched_id %d, intf_id %d, onu_id %d, uni_id %d, port_no %u, alt_id %d\n",
-			    queue_key.id, queue_key.sched_id, intf_id, onu_id, uni_id, port_no, alloc_id);
+    BCM_LOG(INFO, openolt_log_id, "Created tm_queue, direction %s, id %d, intf_id %d, onu_id %d, uni_id %d", \
+            direction.c_str(), key.id, access_intf_id, onu_id, uni_id);
+
+    return BCM_ERR_OK;
+
+}
+
+Status CreateTrafficQueues_(const tech_profile::TrafficQueues *traffic_queues) {
+    uint32_t intf_id = traffic_queues->intf_id();
+    uint32_t onu_id = traffic_queues->onu_id();
+    uint32_t uni_id = traffic_queues->uni_id();
+    std::string direction;
+    bcmos_errno err;
+
+    for (int i = 0; i < traffic_queues->traffic_queues_size(); i++) {
+        tech_profile::TrafficQueue traffic_queue = traffic_queues->traffic_queues(i);
+        if (traffic_queue.direction() == tech_profile::Direction::UPSTREAM) {
+            direction = upstream;
+        } else if (traffic_queue.direction() == tech_profile::Direction::DOWNSTREAM) {
+            direction = downstream;
+        }
+        else {
+            BCM_LOG(ERROR, openolt_log_id, "direction-not-supported %d", traffic_queue.direction());
+            return Status::CANCELLED;
+        }
+        err = CreateQueue(direction, intf_id, onu_id, uni_id, traffic_queue.priority(), traffic_queue.gemport_id());
+        if (err) {
+            return bcm_to_grpc_err(err, "Failed to create queue");
+        }
+    }
+    return Status::OK;
+}
+
+
+bcmos_errno RemoveQueue(std::string direction, uint32_t access_intf_id, uint32_t onu_id, uint32_t uni_id, uint32_t priority,
+                        uint32_t gemport_id) {
+    bcmbal_tm_queue_cfg queue_cfg;
+    bcmbal_tm_queue_key queue_key = { };
+    bcmos_errno err;
+
+    if (direction == downstream) {
+        queue_key.sched_dir = BCMBAL_TM_SCHED_DIR_DS;
+        if (is_tm_queue_id_present(access_intf_id, onu_id, uni_id, gemport_id, direction) && \
+            is_tm_sched_id_present(access_intf_id, onu_id, uni_id, direction)) {
+            queue_key.sched_id = get_tm_sched_id(access_intf_id, onu_id, uni_id, direction);
+            queue_key.id = get_tm_queue_id(access_intf_id, onu_id, uni_id, gemport_id, direction);
+        } else {
+            BCM_LOG(INFO, openolt_log_id, "queue not present in DS. Not clearing");
+            return BCM_ERR_OK;
+        }
+    } else {
+        free_tm_queue_id(access_intf_id, onu_id, uni_id, gemport_id, direction);
+        // In the upstream we use pre-created queues on the NNI scheduler that are used by all subscribers.
+        // They should not be removed. So, lets return OK.
+        return BCM_ERR_OK;
+    }
+
+    BCMBAL_CFG_INIT(&queue_cfg, tm_queue, queue_key);
+
+    err = bcmbal_cfg_clear(DEFAULT_ATERM_ID, &(queue_cfg.hdr));
+
+    if (err) {
+        BCM_LOG(ERROR, openolt_log_id, "Failed to remove queue, direction = %s, id %d, sched_id %d, intf_id %d, onu_id %d, uni_id %d\n",
+                direction.c_str(), queue_key.id, queue_key.sched_id, access_intf_id, onu_id, uni_id);
+        return err;
+    }
+
+    free_tm_queue_id(access_intf_id, onu_id, uni_id, gemport_id, direction);
+
+    return BCM_ERR_OK;
+}
+
+Status RemoveTrafficQueues_(const tech_profile::TrafficQueues *traffic_queues) {
+    uint32_t intf_id = traffic_queues->intf_id();
+    uint32_t onu_id = traffic_queues->onu_id();
+    uint32_t uni_id = traffic_queues->uni_id();
+    uint32_t port_no = traffic_queues->port_no();
+    std::string direction;
+    bcmos_errno err;
+
+    for (int i = 0; i < traffic_queues->traffic_queues_size(); i++) {
+        tech_profile::TrafficQueue traffic_queue = traffic_queues->traffic_queues(i);
+        if (traffic_queue.direction() == tech_profile::Direction::UPSTREAM) {
+            direction = upstream;
+        } else if (traffic_queue.direction() == tech_profile::Direction::DOWNSTREAM) {
+            direction = downstream;
+        } else {
+            BCM_LOG(ERROR, openolt_log_id, "direction-not-supported %d", traffic_queue.direction());
+            return Status::CANCELLED;
+        }
+        err = RemoveQueue(direction, intf_id, onu_id, uni_id, traffic_queue.priority(), traffic_queue.gemport_id());
+        if (err) {
+            return bcm_to_grpc_err(err, "Failed to remove queue");
+        }
     }
 
     return Status::OK;