VOL-1023 - Supporting multiple UNI per ONU

To support multiple UNIs on a given ONU, OpenOLT Driver is updated to support
an abstract opaque cookie that is registered with all controller-bound trap
flows. This cookie is supplied back to adapter during all packet indications.
OpenOLT adapter will use this cookie to track the OF logical port that should
be used to reflect the origin of packet to OF controller.

For outbound packets from controller, the system needs to inject frames
directly onto a GEM port rather than towards "any GEM" associated with the ONU.
This is necessary to send to various UNIs on one ONU. This is achieved by
registering the OF logical port number with the flow and gemport(s) associated
with the port. The OF Out packet will include the logical port number and
hence can be tracked back to a registered GEM for that UNI.

Change-Id: I1085a4b44d12b9402a431c41083e3a5cdb4764b9
diff --git a/agent/src/core.cc b/agent/src/core.cc
index c7730c0..2e01bc7 100644
--- a/agent/src/core.cc
+++ b/agent/src/core.cc
@@ -73,10 +73,19 @@
 
 State state;
 
-Status SchedAdd_(std::string direction, uint32_t access_intf_id, uint32_t onu_id,
+static std::map<uint32_t, uint32_t> flowid_to_port; // For mapping upstream flows to logical ports
+static std::map<uint32_t, uint32_t> flowid_to_gemport; // For mapping downstream flows into gemports
+static std::map<uint32_t, std::set<uint32_t> > port_to_flows; // For mapping logical ports to downstream flows
+static std::map<uint32_t, uint32_t> port_to_alloc;
+static bcmos_fastlock flow_lock;
+
+#define MIN_ALLOC_ID_GPON 256
+#define MIN_ALLOC_ID_XGSPON 1024
+
+Status SchedAdd_(std::string direction, uint32_t access_intf_id, uint32_t onu_id, uint32_t uni_id, uint32_t port_no, 
                  uint32_t alloc_id, openolt::AdditionalBW additional_bw, uint32_t weight, uint32_t priority,
                  openolt::SchedulingPolicy sched_policy);
-static Status SchedRemove_(std::string direction, int intf_id, int onu_id, int alloc_id);
+static Status SchedRemove_(std::string direction, int intf_id, int onu_id, int uni_id, uint32_t port_no, int alloc_id);
 
 static inline int mk_sched_id(int intf_id, int onu_id, std::string direction) {
     if (direction.compare(upstream) == 0) {
@@ -90,8 +99,12 @@
     }
 }
 
-static inline int mk_queue_id(int pon_intf_id, int onu_id) {
-    return tm_queue_id_start + pon_intf_id * 32 + onu_id;
+static inline int mk_queue_id(int pon_intf_id, int onu_id, int uni_id, uint32_t port_no, uint32_t alloc_id) {
+    (void) uni_id; // unused
+    if(port_no == 0) return tm_queue_id_start + pon_intf_id * 32 + onu_id; // old style
+
+    int start = intf_technologies[pon_intf_id] == "gpon" ? MIN_ALLOC_ID_GPON : MIN_ALLOC_ID_XGSPON; // offset built into all alloc Ids
+    return tm_queue_id_start + alloc_id - start; // use alloc_id as a unique queue id. It is unique per UNI, and in the range of IDs supported by BAL
 }
 
 static inline int mk_agg_port_id(int intf_id, int onu_id) {
@@ -112,7 +125,7 @@
     if (board_technology == "xgspon") {
         device_info->set_onu_id_start(1);
         device_info->set_onu_id_end(255);
-        device_info->set_alloc_id_start(1024);
+        device_info->set_alloc_id_start(MIN_ALLOC_ID_XGSPON);
         device_info->set_alloc_id_end(16383);
         device_info->set_gemport_id_start(1024);
         device_info->set_gemport_id_end(65535);
@@ -122,7 +135,7 @@
     else if (board_technology == "gpon") {
         device_info->set_onu_id_start(1);
         device_info->set_onu_id_end(127);
-        device_info->set_alloc_id_start(256);
+        device_info->set_alloc_id_start(MIN_ALLOC_ID_GPON);
         device_info->set_alloc_id_end(767);
         device_info->set_gemport_id_start(256);
         device_info->set_gemport_id_end(4095);
@@ -218,6 +231,8 @@
 
         vendor_init();
         bcmbal_init(argc, argv, NULL);
+        bcmos_fastlock_init(&flow_lock, 0);
+
 
         BCM_LOG(INFO, openolt_log_id, "Enable OLT - %s-%s\n", VENDOR_ID, MODEL_ID);
 
@@ -649,14 +664,48 @@
     return Status::OK;
 }
 
-Status OnuPacketOut_(uint32_t intf_id, uint32_t onu_id, const std::string pkt) {
+Status OnuPacketOut_(uint32_t intf_id, uint32_t onu_id, uint32_t port_no, const std::string pkt) {
     bcmos_errno err = BCM_ERR_OK;
     bcmbal_dest proxy_pkt_dest;
     bcmbal_u8_list_u32_max_2048 buf;
 
-    proxy_pkt_dest.type = BCMBAL_DEST_TYPE_SUB_TERM,
-    proxy_pkt_dest.u.sub_term.sub_term_id = onu_id;
-    proxy_pkt_dest.u.sub_term.intf_id = intf_id;
+    if (port_no > 0) {
+        bool found = false;
+        uint32_t gemport_id;
+
+        bcmos_fastlock_lock(&flow_lock);
+        // Map the port_no to one of the flows that owns it to find a gemport_id for that flow.
+        // Pick any flow that is mapped with the same port_no.
+        std::map<uint32_t, std::set<uint32_t> >::const_iterator it = port_to_flows.find(port_no);
+        if (it != port_to_flows.end() && !it->second.empty()) {
+            uint32_t flow_id = *(it->second.begin()); // Pick any flow_id out of the bag set
+            std::map<uint32_t, uint32_t>::const_iterator fit = flowid_to_gemport.find(flow_id);
+            if (fit != flowid_to_gemport.end()) {
+                found = true;
+                gemport_id = fit->second;
+            }
+        }
+        bcmos_fastlock_unlock(&flow_lock, 0);
+
+        if (!found) {
+            BCM_LOG(ERROR, openolt_log_id, "Packet out failed to find destination for ONU %d port_no %u on PON %d\n",
+                onu_id, port_no, intf_id);
+            return grpc::Status(grpc::StatusCode::NOT_FOUND, "no flow for port_no");
+        }
+
+        proxy_pkt_dest.type = BCMBAL_DEST_TYPE_SVC_PORT;
+        proxy_pkt_dest.u.svc_port.svc_port_id = gemport_id;
+        proxy_pkt_dest.u.svc_port.intf_id = intf_id;
+        BCM_LOG(INFO, openolt_log_id, "Packet out of length %d sent to gemport %d on pon %d port_no %u\n",
+            pkt.size(), gemport_id, intf_id, port_no);
+    }
+    else {
+        proxy_pkt_dest.type = BCMBAL_DEST_TYPE_SUB_TERM,
+        proxy_pkt_dest.u.sub_term.sub_term_id = onu_id;
+        proxy_pkt_dest.u.sub_term.intf_id = intf_id;
+        BCM_LOG(INFO, openolt_log_id, "Packet out of length %d sent to onu %d on pon %d\n",
+            pkt.size(), onu_id, intf_id);
+    }
 
     buf.len = pkt.size();
     buf.val = (uint8_t *)malloc((buf.len)*sizeof(uint8_t));
@@ -664,9 +713,6 @@
 
     err = bcmbal_pkt_send(0, proxy_pkt_dest, (const char *)(buf.val), buf.len);
 
-    BCM_LOG(INFO, openolt_log_id, "Packet out of length %d sent to ONU %d on PON %d\n",
-        buf.len, onu_id, intf_id);
-
     free(buf.val);
 
     return Status::OK;
@@ -694,17 +740,29 @@
     return Status::OK;
 }
 
-Status FlowAdd_(int32_t access_intf_id, int32_t onu_id,
+uint32_t GetPortNum_(uint32_t flow_id)
+{
+    bcmos_fastlock_lock(&flow_lock);
+    uint32_t port_no = 0;
+    std::map<uint32_t, uint32_t >::const_iterator it = flowid_to_port.find(flow_id);
+    if (it != flowid_to_port.end()) {
+        port_no = it->second;
+    }
+    bcmos_fastlock_unlock(&flow_lock, 0);
+    return port_no;
+}
+
+Status FlowAdd_(int32_t access_intf_id, int32_t onu_id, int32_t uni_id, uint32_t port_no,
                 uint32_t flow_id, const std::string flow_type,
                 int32_t alloc_id, int32_t network_intf_id,
                 int32_t gemport_id, const ::openolt::Classifier& classifier,
-                const ::openolt::Action& action, int32_t priority_value) {
+                const ::openolt::Action& action, int32_t priority_value, uint64_t cookie) {
     bcmos_errno err;
     bcmbal_flow_cfg cfg;
     bcmbal_flow_key key = { };
 
-    BCM_LOG(INFO, openolt_log_id, "flow add - intf_id %d, onu_id %d, flow_id %d, flow_type %s, gemport_id %d, network_intf_id %d\n",
-        access_intf_id, onu_id, flow_id, flow_type.c_str(), gemport_id, network_intf_id);
+    BCM_LOG(INFO, openolt_log_id, "flow add - intf_id %d, onu_id %d, uni_id %d, port_no %u, flow_id %d, flow_type %s, gemport_id %d, network_intf_id %d, cookie %u\n",
+        access_intf_id, onu_id, uni_id, port_no, flow_id, flow_type.c_str(), gemport_id, network_intf_id, cookie);
 
     key.flow_id = flow_id;
     if (flow_type.compare("upstream") == 0 ) {
@@ -719,6 +777,8 @@
     BCMBAL_CFG_INIT(&cfg, flow, key);
 
     BCMBAL_CFG_PROP_SET(&cfg, flow, admin_state, BCMBAL_STATE_UP);
+    BCMBAL_CFG_PROP_SET(&cfg, flow, cookie, cookie);
+
     if (access_intf_id >= 0) {
         BCMBAL_CFG_PROP_SET(&cfg, flow, access_int_id, access_intf_id);
     }
@@ -731,6 +791,18 @@
     if (gemport_id >= 0) {
         BCMBAL_CFG_PROP_SET(&cfg, flow, svc_port_id, gemport_id);
     }
+    if (gemport_id >= 0 && port_no != 0) {
+        bcmos_fastlock_lock(&flow_lock);
+        if (key.flow_type == BCMBAL_FLOW_TYPE_DOWNSTREAM) {
+            port_to_flows[port_no].insert(key.flow_id);
+            flowid_to_gemport[key.flow_id] = gemport_id;
+        }
+        else
+        {
+            flowid_to_port[key.flow_id] = port_no;
+        }
+        bcmos_fastlock_unlock(&flow_lock, 0);
+    }
     if (priority_value >= 0) {
         BCMBAL_CFG_PROP_SET(&cfg, flow, priority, priority_value);
     }
@@ -860,7 +932,11 @@
         if (key.flow_type == BCMBAL_FLOW_TYPE_DOWNSTREAM) {
             bcmbal_tm_queue_ref val = { };
             val.sched_id = mk_sched_id(access_intf_id, onu_id, "downstream");
-            val.queue_id = mk_queue_id(access_intf_id, onu_id);
+            uint32_t alloc_id;
+            bcmos_fastlock_lock(&flow_lock);
+            alloc_id = port_to_alloc[port_no];
+            bcmos_fastlock_unlock(&flow_lock, 0);
+            val.queue_id = mk_queue_id(access_intf_id, onu_id, uni_id, port_no, alloc_id);
             BCMBAL_CFG_PROP_SET(&cfg, flow, queue, val);
         } else if (key.flow_type == BCMBAL_FLOW_TYPE_UPSTREAM) {
             bcmbal_tm_sched_id val1;
@@ -904,6 +980,19 @@
         return bcm_to_grpc_err(BCM_ERR_PARM, "Invalid flow type");
     }
 
+    bcmos_fastlock_lock(&flow_lock);
+    uint32_t port_no = flowid_to_port[key.flow_id];
+    if (key.flow_type == BCMBAL_FLOW_TYPE_DOWNSTREAM) {
+        flowid_to_gemport.erase(key.flow_id);
+        port_to_flows[port_no].erase(key.flow_id);
+        if (port_to_flows[port_no].empty()) port_to_flows.erase(port_no);
+    }
+    else
+    {
+        flowid_to_port.erase(key.flow_id);
+    }
+    bcmos_fastlock_unlock(&flow_lock, 0);
+
     BCMBAL_CFG_INIT(&cfg, flow, key);
 
 
@@ -918,7 +1007,7 @@
     return Status::OK;
 }
 
-Status SchedAdd_(std::string direction, uint32_t intf_id, uint32_t onu_id,
+Status SchedAdd_(std::string direction, uint32_t intf_id, uint32_t onu_id, uint32_t uni_id, uint32_t port_no,
                  uint32_t alloc_id, openolt::AdditionalBW additional_bw, uint32_t weight, uint32_t priority,
                  openolt::SchedulingPolicy sched_policy) {
 
@@ -930,7 +1019,7 @@
         // Note: We use the default scheduler available in the DL.
         key.sched_id = mk_sched_id(intf_id, onu_id, direction);
         key.sched_dir = BCMBAL_TM_SCHED_DIR_DS;
-        key.id = mk_queue_id(intf_id, onu_id);
+        key.id = mk_queue_id(intf_id, onu_id, uni_id, port_no, alloc_id);
 
         BCMBAL_CFG_INIT(&cfg, tm_queue, key);
         //Queue must be set with either weight or priority, not both,
@@ -943,10 +1032,18 @@
         // TODO: Shaping parameters will be available after meter bands are supported.
         // TODO: The shaping parameters will be applied on the downstream queue on the PON default scheduler.
         if (err) {
-            BCM_LOG(ERROR, openolt_log_id, "Failed to create subscriber downstream tm queue, id %d, sched_id %d, intf_id %d, onu_id %d\n",
-                    key.id, key.sched_id, intf_id, onu_id);
+            BCM_LOG(ERROR, openolt_log_id, "Failed to create subscriber downstream tm queue, id %d, sched_id %d, intf_id %d, onu_id %d, uni_id %d, port_no %u, alt_id %d\n",
+                    key.id, key.sched_id, intf_id, onu_id, uni_id, port_no, alloc_id);
             return bcm_to_grpc_err(err, "Failed to create subscriber downstream tm queue");
         }
+
+        bcmos_fastlock_lock(&flow_lock);
+        port_to_alloc[port_no] = alloc_id;
+        bcmos_fastlock_unlock(&flow_lock, 0);
+
+        BCM_LOG(INFO, openolt_log_id, "Create downstream sched, id %d, intf_id %d, onu_id %d, uni_id %d, port_no %u, alt_id %d\n",
+                key.id,intf_id,onu_id,uni_id,port_no,alloc_id);
+
     } else { //"upstream"
         bcmbal_tm_sched_cfg cfg;
         bcmbal_tm_sched_key key = { };
@@ -973,12 +1070,12 @@
 
         err = bcmbal_cfg_set(DEFAULT_ATERM_ID, &(cfg.hdr));
         if (err) {
-            BCM_LOG(ERROR, openolt_log_id, "Failed to create upstream DBA sched, id %d, intf_id %d, onu_id %d\n",
-                    key.id, intf_id,  onu_id);
+            BCM_LOG(ERROR, openolt_log_id, "Failed to create upstream DBA sched, id %d, intf_id %d, onu_id %d, uni_id %d, port_no %u, alloc_id %d\n",
+                    key.id, intf_id, onu_id,uni_id,port_no,alloc_id);
             return bcm_to_grpc_err(err, "Failed to create upstream DBA sched");
         }
-        BCM_LOG(INFO, openolt_log_id, "Create upstream DBA sched, id %d, intf_id %d, onu_id %d\n",
-                key.id,intf_id,onu_id);
+        BCM_LOG(INFO, openolt_log_id, "Create upstream DBA sched, id %d, intf_id %d, onu_id %d, uni_id %d, port_no %u, alloc_id %d\n",
+                key.id,intf_id,onu_id,uni_id,port_no,alloc_id);
     }
 
     return Status::OK;
@@ -988,6 +1085,8 @@
 Status CreateTconts_(const openolt::Tconts *tconts) {
     uint32_t intf_id = tconts->intf_id();
     uint32_t onu_id = tconts->onu_id();
+    uint32_t uni_id = tconts->uni_id();
+    uint32_t port_no = tconts->port_no();
     std::string direction;
     unsigned int alloc_id;
     openolt::Scheduler scheduler;
@@ -1014,7 +1113,7 @@
         weight = scheduler.weight();
         sched_policy = scheduler.sched_policy();
         // TODO: TrafficShapingInfo is not supported for now as meter band support is not there
-        SchedAdd_(direction, intf_id, onu_id, alloc_id, additional_bw, weight, priority, sched_policy);
+        SchedAdd_(direction, intf_id, onu_id, uni_id, port_no, alloc_id, additional_bw, weight, priority, sched_policy);
     }
     return Status::OK;
 }
@@ -1022,6 +1121,8 @@
 Status RemoveTconts_(const openolt::Tconts *tconts) {
     uint32_t intf_id = tconts->intf_id();
     uint32_t onu_id = tconts->onu_id();
+    uint32_t uni_id = tconts->uni_id();
+    uint32_t port_no = tconts->port_no();
     std::string direction;
     unsigned int alloc_id;
 
@@ -1037,12 +1138,12 @@
             return Status::CANCELLED;
         }
         alloc_id = tcont.alloc_id();
-        SchedRemove_(direction, intf_id, onu_id, alloc_id);
+        SchedRemove_(direction, intf_id, onu_id, uni_id, port_no, alloc_id);
     }
     return Status::OK;
 }
 
-Status SchedRemove_(std::string direction, int intf_id, int onu_id, int alloc_id) {
+Status SchedRemove_(std::string direction, int intf_id, int onu_id, int uni_id, uint32_t port_no, int alloc_id) {
 
     bcmos_errno err;
 
@@ -1074,19 +1175,23 @@
 	    bcmbal_tm_queue_key queue_key = { };
 	    queue_key.sched_id = mk_sched_id(intf_id, onu_id, "downstream");
 	    queue_key.sched_dir = BCMBAL_TM_SCHED_DIR_DS;
-	    queue_key.id = mk_queue_id(intf_id, onu_id);;
+	    queue_key.id = mk_queue_id(intf_id, onu_id, uni_id, port_no, alloc_id);
 
 	    BCMBAL_CFG_INIT(&queue_cfg, tm_queue, queue_key);
 
 	    err = bcmbal_cfg_clear(DEFAULT_ATERM_ID, &(queue_cfg.hdr));
 	    if (err) {
-		    BCM_LOG(ERROR, openolt_log_id, "Failed to remove downstream tm queue, id %d, sched_id %d, intf_id %d, onu_id %d\n",
-				    queue_key.id, queue_key.sched_id, intf_id, onu_id);
+		    BCM_LOG(ERROR, openolt_log_id, "Failed to remove downstream tm queue, id %d, sched_id %d, intf_id %d, onu_id %d, uni_id %d, port_no %u, alt_id %d\n",
+				    queue_key.id, queue_key.sched_id, intf_id, onu_id, uni_id, port_no, alloc_id);
 		    return Status(grpc::StatusCode::INTERNAL, "Failed to remove downstream tm queue");
 	    }
 
-	    BCM_LOG(INFO, openolt_log_id, "Remove upstream DBA sched, id %d, sched_id %d, intf_id %d, onu_id %d\n",
-			    queue_key.id, queue_key.sched_id, intf_id, onu_id);
+        bcmos_fastlock_lock(&flow_lock);
+        port_to_alloc.erase(port_no);
+        bcmos_fastlock_unlock(&flow_lock, 0);
+
+	    BCM_LOG(INFO, openolt_log_id, "Remove upstream DBA sched, id %d, sched_id %d, intf_id %d, onu_id %d, uni_id %d, port_no %u, alt_id %d\n",
+			    queue_key.id, queue_key.sched_id, intf_id, onu_id, uni_id, port_no, alloc_id);
     }
 
     return Status::OK;