Port statistics collection

Change-Id: I15ad34d01267673cb64077bd89da0a8566531492
diff --git a/Makefile b/Makefile
index 770f9a3..a7e7477 100644
--- a/Makefile
+++ b/Makefile
@@ -122,6 +122,7 @@
 	-I$(BAL_DIR)/bal_release/src/common/os_abstraction/posix \
 	-I$(BAL_DIR)/bal_release/src/common/config \
 	-I$(BAL_DIR)/bal_release/src/core/platform \
+	-I$(BAL_DIR)/bal_release/src/core/main \
 	-I$(BAL_DIR)/bal_release/src/common/include \
 	-I$(BAL_DIR)/bal_release/src/lib/libbalapi \
 	-I$(BAL_DIR)/bal_release/3rdparty/maple/sdk/host_driver/utils \
diff --git a/protos/openolt.proto b/protos/openolt.proto
index 069a7bc..3e72181 100644
--- a/protos/openolt.proto
+++ b/protos/openolt.proto
@@ -60,6 +60,7 @@
         };
     }
 
+
     rpc EnableIndication(Empty) returns (stream Indication) {}
 }
 
@@ -72,6 +73,8 @@
         OnuIndication onu_ind = 5;
         OmciIndication omci_ind = 6;
         PacketIndication pkt_ind = 7;
+        PortStatistics port_stats = 8;
+        FlowStatistics flow_stats = 9;
     }
 }
 
@@ -193,4 +196,32 @@
     bytes vendor_specific = 2;
 }
 
+message PortStatistics {
+    fixed32 intf_id = 1;
+    fixed64 rx_bytes = 2;
+    fixed64 rx_packets = 3;
+    fixed64 rx_ucast_packets = 4;
+    fixed64 rx_mcast_packets = 5;
+    fixed64 rx_bcast_packets = 6;
+    fixed64 rx_error_packets = 7;
+    fixed64 tx_bytes = 8;
+    fixed64 tx_packets = 9;
+    fixed64 tx_ucast_packets = 10;
+    fixed64 tx_mcast_packets = 11;
+    fixed64 tx_bcast_packets = 12;
+    fixed64 tx_error_packets = 13;
+    fixed64 rx_crc_errors = 14;
+    fixed64 bip_errors = 15;
+    fixed32 timestamp = 16;
+}
+
+message FlowStatistics {
+    fixed32 flow_id = 1;
+    fixed64 rx_bytes = 2;
+    fixed64 rx_packets = 3;
+    fixed64 tx_bytes = 8;
+    fixed64 tx_packets = 9;
+    fixed32 timestamp = 16;
+}
+
 message Empty {}
diff --git a/src/core.cc b/src/core.cc
index c763072..b5fe90d 100644
--- a/src/core.cc
+++ b/src/core.cc
@@ -25,6 +25,7 @@
 
 #include "core.h"
 #include "indications.h"
+#include "stats_collection.h"
 
 extern "C"
 {
@@ -403,6 +404,8 @@
         return Status(grpc::StatusCode::INTERNAL, "flow add failed");
     }
 
+    register_new_flow(key);
+
     return Status::OK;
 }
 
@@ -421,12 +424,11 @@
 
         val.type = BCMBAL_TM_SCHED_OWNER_TYPE_AGG_PORT;
         val.u.agg_port.intf_id = (bcmbal_intf_id) intf_id;
-	val.u.agg_port.presence_mask = val.u.agg_port.presence_mask | BCMBAL_TM_SCHED_OWNER_AGG_PORT_ID_INTF_ID;
+	    val.u.agg_port.presence_mask = val.u.agg_port.presence_mask | BCMBAL_TM_SCHED_OWNER_AGG_PORT_ID_INTF_ID;
         val.u.agg_port.sub_term_id = (bcmbal_sub_id) onu_id;
         val.u.agg_port.presence_mask = val.u.agg_port.presence_mask | BCMBAL_TM_SCHED_OWNER_AGG_PORT_ID_SUB_TERM_ID;
-
-	val.u.agg_port.agg_port_id = (bcmbal_aggregation_port_id) agg_port_id;
-	val.u.agg_port.presence_mask = val.u.agg_port.presence_mask | BCMBAL_TM_SCHED_OWNER_AGG_PORT_ID_AGG_PORT_ID;
+	    val.u.agg_port.agg_port_id = (bcmbal_aggregation_port_id) agg_port_id;
+	    val.u.agg_port.presence_mask = val.u.agg_port.presence_mask | BCMBAL_TM_SCHED_OWNER_AGG_PORT_ID_AGG_PORT_ID;
 
         BCMBAL_CFG_PROP_SET(&cfg, tm_sched, owner, val);
     }
diff --git a/src/indications.cc b/src/indications.cc
index 0d4387a..7346772 100644
--- a/src/indications.cc
+++ b/src/indications.cc
@@ -17,6 +17,7 @@
 #include "indications.h"
 #include "core.h"
 #include "utils.h"
+#include "stats_collection.h"
 extern "C"
 {
 #include <bcmos_system.h>
@@ -41,8 +42,10 @@
     bcmbal_access_terminal_ind *acc_term_ind = (bcmbal_access_terminal_ind *)obj;
     if (acc_term_ind->data.oper_status == BCMBAL_STATUS_UP) {
         olt_ind->set_oper_state("up");
+        start_collecting_statistics();
     } else {
         olt_ind->set_oper_state("down");
+        stop_collecting_statistics();
     }
     ind.set_allocated_olt_ind(olt_ind);
     std::cout << "olt indication, oper_state:" << ind.olt_ind().oper_state() << std::endl;
diff --git a/src/server.cc b/src/server.cc
index 9f926e8..cfd6f3f 100644
--- a/src/server.cc
+++ b/src/server.cc
@@ -19,6 +19,7 @@
 #include <memory>
 #include <string>
 #include <time.h>
+#include <pthread.h>
 
 #include "Queue.h"
 #include <iostream>
@@ -27,6 +28,7 @@
 #include "server.h"
 #include "core.h"
 #include "indications.h"
+#include "stats_collection.h"
 
 #include <grpc++/grpc++.h>
 #include <openolt.grpc.pb.h>
@@ -125,6 +127,8 @@
 
         return Status::OK;
     }
+
+
 };
 
 void RunServer() {
diff --git a/src/stats_collection.cc b/src/stats_collection.cc
new file mode 100644
index 0000000..de11c72
--- /dev/null
+++ b/src/stats_collection.cc
@@ -0,0 +1,240 @@
+#include "stats_collection.h"
+
+#include <unistd.h>
+#include <pthread.h>
+
+#include <openolt.grpc.pb.h>
+#include "indications.h"
+
+extern "C"
+{
+#include <bcmos_system.h>
+#include <bal_api.h>
+#include <bal_api_end.h>
+#include <flow_fsm.h>
+}
+
+#define COLLECTION_PERIOD 15
+//FIXME
+#define FLOWS_COUNT 100
+
+bool isCollectingStatistics;
+bcmbal_flow_key* flows_keys = new bcmbal_flow_key[FLOWS_COUNT];
+bool init_done = false;
+
+
+void start_collecting_statistics() {
+    if (!init_done) {
+        memset(flows_keys, 0, FLOWS_COUNT * sizeof(bcmbal_flow_key));
+        init_done = true;
+    }
+    pthread_t statisticsCollectionThread;
+    isCollectingStatistics = true;
+    pthread_create(&statisticsCollectionThread, NULL, stats_collection, NULL);
+
+    std::cout << "Statistics collection thread started" << std::endl;
+}
+
+void stop_collecting_statistics() {
+    isCollectingStatistics = false;
+}
+
+openolt::PortStatistics* get_default_port_statistics() {
+    openolt::PortStatistics* port_stats = new openolt::PortStatistics;
+    port_stats->set_intf_id(-1);
+    port_stats->set_rx_bytes(-1);
+    port_stats->set_rx_packets(-1);
+    port_stats->set_rx_ucast_packets(-1);
+    port_stats->set_rx_mcast_packets(-1);
+    port_stats->set_rx_bcast_packets(-1);
+    port_stats->set_rx_error_packets(-1);
+    port_stats->set_tx_bytes(-1);
+    port_stats->set_tx_packets(-1);
+    port_stats->set_tx_ucast_packets(-1);
+    port_stats->set_tx_mcast_packets(-1);
+    port_stats->set_tx_bcast_packets(-1);
+    port_stats->set_tx_error_packets(-1);
+    port_stats->set_rx_crc_errors(-1);
+    port_stats->set_bip_errors(-1);
+
+    return port_stats;
+}
+
+openolt::FlowStatistics* get_default_flow_statistics() {
+    openolt::FlowStatistics* flow_stats = new openolt::FlowStatistics;
+    flow_stats->set_flow_id(-1);
+    flow_stats->set_rx_bytes(-1);
+    flow_stats->set_rx_packets(-1);
+    flow_stats->set_tx_bytes(-1);
+    flow_stats->set_tx_packets(-1);
+
+    return flow_stats;
+}
+
+openolt::PortStatistics* collectPortStatistics(int intf_id, bcmbal_intf_type intf_type) {
+
+    bcmos_errno err;
+    bcmbal_interface_stat stat;     /**< declare main API struct */
+    bcmbal_interface_key key = { }; /**< declare key */
+    bcmos_bool clear_on_read = false;
+
+    openolt::PortStatistics* port_stats = get_default_port_statistics();
+    // build key
+    key.intf_id = (bcmbal_intf_id) intf_id;
+    key.intf_type = intf_type;
+
+    /* init the API struct */
+    BCMBAL_STAT_INIT(&stat, interface, key);
+    BCMBAL_STAT_PROP_GET(&stat, interface, all_properties);
+
+    /* call API */
+    err = bcmbal_stat_get(DEFAULT_ATERM_ID, &stat.hdr, clear_on_read);
+    if (err == BCM_ERR_OK)
+    {
+        std::cout << "Interface statistics retrieved"
+                  << " intf_id:" << intf_id << std::endl;
+
+        port_stats->set_rx_bytes(stat.data.rx_bytes);
+        port_stats->set_rx_packets(stat.data.rx_packets);
+        port_stats->set_rx_ucast_packets(stat.data.rx_ucast_packets);
+        port_stats->set_rx_mcast_packets(stat.data.rx_mcast_packets);
+        port_stats->set_rx_bcast_packets(stat.data.rx_bcast_packets);
+        port_stats->set_rx_error_packets(stat.data.rx_error_packets);
+        port_stats->set_tx_bytes(stat.data.tx_bytes);
+        port_stats->set_tx_packets(stat.data.tx_packets);
+        port_stats->set_tx_ucast_packets(stat.data.tx_ucast_packets);
+        port_stats->set_tx_mcast_packets(stat.data.tx_mcast_packets);
+        port_stats->set_tx_bcast_packets(stat.data.tx_bcast_packets);
+        port_stats->set_tx_error_packets(stat.data.tx_error_packets);
+        port_stats->set_rx_crc_errors(stat.data.rx_crc_errors);
+        port_stats->set_bip_errors(stat.data.bip_errors);
+
+    } else {
+        std::cout << "ERROR: Failed to retrieve port statistics"
+                  << " intf_id:" << intf_id
+                  << " intf_type:" << intf_type << std::endl;
+    }
+
+    return port_stats;
+
+}
+
+openolt::FlowStatistics* collectFlowStatistics(bcmbal_flow_id flow_id, bcmbal_flow_type flow_type) {
+
+    bcmos_errno err;
+    bcmbal_flow_stat stat;     /**< declare main API struct */
+    bcmbal_flow_key key = { }; /**< declare key */
+    bcmos_bool clear_on_read = false;
+
+    openolt::FlowStatistics* flow_stats = get_default_flow_statistics();
+    //Key
+    key.flow_id = flow_id;
+    key.flow_type = flow_type;
+
+    /* init the API struct */
+    BCMBAL_STAT_INIT(&stat, flow, key);
+    BCMBAL_STAT_PROP_GET(&stat, flow, all_properties);
+
+    err = bcmbal_stat_get(DEFAULT_ATERM_ID, &stat.hdr, clear_on_read);
+
+    if (err == BCM_ERR_OK)
+    {
+        std::cout << "Flow statistics retrieved"
+                  << " flow_id:" << flow_id
+                  << " flow_type:" << flow_type << std::endl;
+
+        flow_stats->set_rx_bytes(stat.data.rx_bytes);
+        flow_stats->set_rx_packets(stat.data.rx_packets);
+        flow_stats->set_tx_bytes(stat.data.tx_bytes);
+        flow_stats->set_tx_packets(stat.data.tx_packets);
+
+    } else {
+        std::cout   << "ERROR: Failed to retrieve flow statistics"
+                    << " flow_id:" << flow_id
+                    << " flow_type:" << flow_type << std::endl;
+    }
+
+    return flow_stats;
+}
+
+
+void* stats_collection(void* x) {
+
+    time_t now;
+
+    while(isCollectingStatistics) {
+        //Ports statistics
+
+        //Uplink ports
+        for (int i = 0; i < 4; i++) {
+            openolt::PortStatistics* port_stats = collectPortStatistics(i, BCMBAL_INTF_TYPE_NNI);
+            //FIXME Use clean port translation
+            port_stats->set_intf_id(128 + i);
+            time(&now);
+            port_stats->set_timestamp((int)now);
+            openolt::Indication ind;
+            ind.set_allocated_port_stats(port_stats);
+            oltIndQ.push(ind);
+        }
+        //Pon ports
+        for (int i = 0; i < 16; i++) {
+            openolt::PortStatistics* port_stats = collectPortStatistics(i, BCMBAL_INTF_TYPE_PON);
+            //FIXME Use clean port translation
+            port_stats->set_intf_id((0x2 << 28) + i);
+            time(&now);
+            port_stats->set_timestamp((int)now);
+            openolt::Indication ind;
+            ind.set_allocated_port_stats(port_stats);
+            oltIndQ.push(ind);
+        }
+
+        //Flows statistics
+        // flow_inst *current_entry = NULL;
+        //
+        // TAILQ_FOREACH(current_entry,
+        //               &FLOW_FSM_FLOW_LIST_CTX_PTR->active_flow_list,
+        //               flow_inst_next) {
+        // int flows_measurements = 0;
+        //
+        // for (int i = 0; i < FLOWS_COUNT; i++) {
+        //
+        //     // bcmbal_flow_id flow_id = current_entry->api_req_flow_info.key.flow_id;
+        //     // bcmbal_flow_type flow_type = current_entry->api_req_flow_info.key.flow_type;
+        //
+        //     if (flows_keys[i].flow_id != 0) {
+        //         openolt::FlowStatistics* flow_stats = collectFlowStatistics(flows_keys[i].flow_id, flows_keys[i].flow_type);
+        //         if (flow_stats->rx_packets() == -1) {
+        //             //It Failed
+        //             flows_keys[i].flow_id = 0;
+        //         } else {
+        //             flow_stats->set_flow_id(flows_keys[i].flow_id);
+        //             time(&now);
+        //             flow_stats->set_timestamp((int)now);
+        //             openolt::Indication ind;
+        //             ind.set_allocated_flow_stats(flow_stats);
+        //             oltIndQ.push(ind);
+        //             flows_measurements ++;
+        //         }
+        //     }
+        //
+        // }
+        // std::cout << "Stats of " << flows_measurements << " flows retrieved" << std::endl;
+
+        sleep(COLLECTION_PERIOD);
+
+    }
+
+    std::cout << "Statistics collection thread terminated" << std::endl;
+
+
+}
+
+/* Storing flow keys, temporary */
+void register_new_flow(bcmbal_flow_key key) {
+    for (int i = 0; i < FLOWS_COUNT; i++) {
+        if (flows_keys[i].flow_id == 0) {
+            flows_keys[i] = key;
+            break;
+        }
+    }
+}
diff --git a/src/stats_collection.h b/src/stats_collection.h
new file mode 100644
index 0000000..b76d335
--- /dev/null
+++ b/src/stats_collection.h
@@ -0,0 +1,21 @@
+#ifndef OPENOLT_STATS_COLLECTION_H_
+#define OPENOLT_STATS_COLLECTION_H_
+
+#include <openolt.grpc.pb.h>
+
+extern "C"
+{
+#include <bal_model_types.h>
+}
+
+void start_collecting_statistics();
+void stop_collecting_statistics();
+openolt::PortStatistics* get_default_port_statistics();
+openolt::PortStatistics* collectPortStatistics(int intf_id, bcmbal_intf_type intf_type);
+openolt::FlowStatistics* get_default_flow_statistics();
+openolt::FlowStatistics* collectFlowStatistics(bcmbal_flow_id flow_id, bcmbal_flow_type flow_type);
+void* stats_collection(void* x);
+void register_new_flow(bcmbal_flow_key key);
+
+
+#endif