VOL-1150 A single threaded implementation for the OpenOLT driver

Stats collection is driven from the main thread.

Change-Id: I773a4aeb0e840f2a36895e39b999b2dcaf2adaa6
diff --git a/src/Queue.h b/src/Queue.h
index 854851e..30fbbda 100644
--- a/src/Queue.h
+++ b/src/Queue.h
@@ -11,22 +11,34 @@
 #include <thread>
 #include <mutex>
 #include <condition_variable>
+#include <chrono>
 
 template <typename T>
 class Queue
 {
  public:
 
-  T pop() 
+  std::pair<T, bool> pop(int timeout)
   {
+    std::cv_status status = std::cv_status::no_timeout;
     std::unique_lock<std::mutex> mlock(mutex_);
+    static int duration = 0;
     while (queue_.empty())
     {
-      cond_.wait(mlock);
+      status = cond_.wait_for(mlock, std::chrono::seconds(1));
+      if (status == std::cv_status::timeout)
+      {
+        duration++;
+        if (duration > timeout)
+        {
+          duration = 0;
+          return std::pair<T, bool>({}, false);
+        }
+      }
     }
     auto val = queue_.front();
     queue_.pop();
-    return val;
+    return std::pair<T, bool>(val, true);
   }
 
   void pop(T& item)
diff --git a/src/core.cc b/src/core.cc
index 9ad075c..ae54c56 100644
--- a/src/core.cc
+++ b/src/core.cc
@@ -36,12 +36,13 @@
 #include <bal_api_end.h>
 }
 
+State state;
 
 Status Enable_() {
     bcmbal_access_terminal_cfg acc_term_obj;
     bcmbal_access_terminal_key key = { };
 
-    if (!state::is_activated()) {
+    if (!state.is_activated()) {
         std::cout << "Enable OLT" << std::endl;
         key.access_term_id = DEFAULT_ATERM_ID;
         BCMBAL_CFG_INIT(&acc_term_obj, access_terminal, key);
@@ -51,10 +52,11 @@
             std::cout << "ERROR: Failed to enable OLT" << std::endl;
             return bcm_to_grpc_err(err, "Failed to enable OLT");
         }
+        init_stats();
     }
+
     //If already enabled, generate an extra indication ????
     return Status::OK;
-
 }
 
 Status Disable_() {
@@ -79,7 +81,7 @@
     //TEMPORARY WORK AROUND
     Status status = DisableUplinkIf_(0);
     if (status.ok()) {
-        state::deactivate();
+        state.deactivate();
         openolt::Indication ind;
         openolt::OltIndication* olt_ind = new openolt::OltIndication;
         olt_ind->set_oper_state("down");
@@ -94,7 +96,7 @@
 Status Reenable_() {
     Status status = EnableUplinkIf_(0);
     if (status.ok()) {
-        state::activate();
+        state.activate();
         openolt::Indication ind;
         openolt::OltIndication* olt_ind = new openolt::OltIndication;
         olt_ind->set_oper_state("up");
diff --git a/src/core.h b/src/core.h
index 2890eb3..b0c3b5f 100644
--- a/src/core.h
+++ b/src/core.h
@@ -22,6 +22,10 @@
 using grpc::Status;
 #include <openolt.grpc.pb.h>
 
+#include "state.h"
+
+extern State state;
+
 Status Enable_();
 Status ActivateOnu_(uint32_t intf_id, uint32_t onu_id,
     const char *vendor_id, const char *vendor_specific, uint32_t pir);
diff --git a/src/indications.cc b/src/indications.cc
index adac6ad..4d7a393 100644
--- a/src/indications.cc
+++ b/src/indications.cc
@@ -48,10 +48,10 @@
     bcmbal_access_terminal_oper_status_change *acc_term_ind = (bcmbal_access_terminal_oper_status_change *)obj;
     if (acc_term_ind->data.new_oper_status == BCMBAL_STATUS_UP) {
         olt_ind->set_oper_state("up");
-        state::activate();
+        state.activate();
     } else {
         olt_ind->set_oper_state("down");
-        state::deactivate();
+        state.deactivate();
     }
     ind.set_allocated_olt_ind(olt_ind);
     std::cout << "olt indication, oper_state:" << ind.olt_ind().oper_state() << std::endl;
diff --git a/src/server.cc b/src/server.cc
index 709eb31..8cd83ff 100644
--- a/src/server.cc
+++ b/src/server.cc
@@ -151,18 +151,26 @@
             ServerContext* context,
             const ::openolt::Empty* request,
             ServerWriter<openolt::Indication>* writer) override {
+
         std::cout << "Connection to Voltha established. Indications enabled"
         << std::endl;
-        state::connect();
 
-        while (state::is_connected) {
-            auto oltInd = oltIndQ.pop();
+        state.connect();
+
+        while (state.is_connected()) {
+            std::pair<openolt::Indication, bool> ind = oltIndQ.pop(COLLECTION_PERIOD);
+            if (ind.second == false) {
+                /* timeout - do lower priority periodic stuff like stats */
+                stats_collection();
+                continue;
+            }
+            openolt::Indication oltInd = ind.first;
             bool isConnected = writer->Write(oltInd);
             if (!isConnected) {
                 //Lost connectivity to this Voltha instance
                 //Put the indication back in the queue for next connecting instance
                 oltIndQ.push(oltInd);
-                state::disconnect();
+                state.disconnect();
             }
             //oltInd.release_olt_ind()
         }
diff --git a/src/state.cc b/src/state.cc
deleted file mode 100644
index 54339e1..0000000
--- a/src/state.cc
+++ /dev/null
@@ -1,50 +0,0 @@
-#include "stats_collection.h"
-#include <mutex>
-
-namespace state {
-
-    bool connected_to_voltha = false;
-    bool activated = false;
-    std::mutex state_lock;
-
-    bool is_connected() {
-        return connected_to_voltha;
-    }
-
-    bool is_activated() {
-        return activated;
-    }
-
-    void connect() {
-        state_lock.lock();
-        connected_to_voltha = true;
-        if (activated) {
-            start_collecting_statistics();
-        }
-        state_lock.unlock();
-    }
-
-    void disconnect() {
-        state_lock.lock();
-        connected_to_voltha = false;
-        stop_collecting_statistics();
-        state_lock.unlock();
-    }
-
-    void activate() {
-        state_lock.lock();
-        activated = true;
-        if (connected_to_voltha) {
-            start_collecting_statistics();
-        }
-        state_lock.unlock();
-    }
-
-    void deactivate() {
-        state_lock.lock();
-        activated = false;
-        stop_collecting_statistics();
-        state_lock.unlock();
-    }
-
-}
diff --git a/src/state.h b/src/state.h
index c392a01..331b09d 100644
--- a/src/state.h
+++ b/src/state.h
@@ -1,13 +1,35 @@
 #ifndef OPENOLT_STATE_H_
 #define OPENOLT_STATE_H_
 
-namespace state {
-    bool is_activated();
-    bool is_connected();
-    void connect();
-    void disconnect();
-    void activate();
-    void deactivate();
-}
+class State {
+  public:
 
+    bool is_connected() {
+        return connected_to_voltha;
+    }
+
+    bool is_activated() {
+        return activated;
+    }
+
+    void connect() {
+        connected_to_voltha = true;
+    }
+
+    void disconnect() {
+        connected_to_voltha = false;
+    }
+
+    void activate() {
+        activated = true;
+    }
+
+    void deactivate() {
+        activated = false;
+    }
+
+  private:
+    bool connected_to_voltha = false;
+    bool activated = false;
+};
 #endif
diff --git a/src/stats_collection.cc b/src/stats_collection.cc
index d364384..0416ef4 100644
--- a/src/stats_collection.cc
+++ b/src/stats_collection.cc
@@ -1,7 +1,6 @@
 #include "stats_collection.h"
 
 #include <unistd.h>
-#include <pthread.h>
 
 #include <openolt.grpc.pb.h>
 #include "indications.h"
@@ -14,29 +13,13 @@
 #include <flow_fsm.h>
 }
 
-#define COLLECTION_PERIOD 15
 //FIXME
 #define FLOWS_COUNT 100
 
-bool isCollectingStatistics;
 bcmbal_flow_key* flows_keys = new bcmbal_flow_key[FLOWS_COUNT];
-bool init_done = false;
 
-
-void start_collecting_statistics() {
-    if (!init_done) {
-        memset(flows_keys, 0, FLOWS_COUNT * sizeof(bcmbal_flow_key));
-        init_done = true;
-    }
-    pthread_t statisticsCollectionThread;
-    isCollectingStatistics = true;
-    pthread_create(&statisticsCollectionThread, NULL, stats_collection, NULL);
-
-    std::cout << "Statistics collection thread started" << std::endl;
-}
-
-void stop_collecting_statistics() {
-    isCollectingStatistics = false;
+void init_stats() {
+    memset(flows_keys, 0, FLOWS_COUNT * sizeof(bcmbal_flow_key));
 }
 
 openolt::PortStatistics* get_default_port_statistics() {
@@ -162,77 +145,68 @@
 #endif
 
 
-void* stats_collection(void* x) {
+void* stats_collection() {
 
     time_t now;
 
-    while(isCollectingStatistics) {
+    std::cout << "Collecting statistics" << std::endl;
 
-        std::cout << "Collecting statistics" << std::endl;
+    //Ports statistics
 
-        //Ports statistics
-
-        //Uplink ports
-        for (int i = 0; i < 4; i++) {
-            openolt::PortStatistics* port_stats = collectPortStatistics(i, BCMBAL_INTF_TYPE_NNI);
-            //FIXME Use clean port translation
-            port_stats->set_intf_id(128 + i);
-            time(&now);
-            port_stats->set_timestamp((int)now);
-            openolt::Indication ind;
-            ind.set_allocated_port_stats(port_stats);
-            oltIndQ.push(ind);
-        }
-        //Pon ports
-        for (int i = 0; i < 16; i++) {
-            openolt::PortStatistics* port_stats = collectPortStatistics(i, BCMBAL_INTF_TYPE_PON);
-            //FIXME Use clean port translation
-            port_stats->set_intf_id((0x2 << 28) + i);
-            time(&now);
-            port_stats->set_timestamp((int)now);
-            openolt::Indication ind;
-            ind.set_allocated_port_stats(port_stats);
-            oltIndQ.push(ind);
-        }
-
-        //Flows statistics
-        // flow_inst *current_entry = NULL;
-        //
-        // TAILQ_FOREACH(current_entry,
-        //               &FLOW_FSM_FLOW_LIST_CTX_PTR->active_flow_list,
-        //               flow_inst_next) {
-        // int flows_measurements = 0;
-        //
-        // for (int i = 0; i < FLOWS_COUNT; i++) {
-        //
-        //     // bcmbal_flow_id flow_id = current_entry->api_req_flow_info.key.flow_id;
-        //     // bcmbal_flow_type flow_type = current_entry->api_req_flow_info.key.flow_type;
-        //
-        //     if (flows_keys[i].flow_id != 0) {
-        //         openolt::FlowStatistics* flow_stats = collectFlowStatistics(flows_keys[i].flow_id, flows_keys[i].flow_type);
-        //         if (flow_stats->rx_packets() == -1) {
-        //             //It Failed
-        //             flows_keys[i].flow_id = 0;
-        //         } else {
-        //             flow_stats->set_flow_id(flows_keys[i].flow_id);
-        //             time(&now);
-        //             flow_stats->set_timestamp((int)now);
-        //             openolt::Indication ind;
-        //             ind.set_allocated_flow_stats(flow_stats);
-        //             oltIndQ.push(ind);
-        //             flows_measurements ++;
-        //         }
-        //     }
-        //
-        // }
-        // std::cout << "Stats of " << flows_measurements << " flows retrieved" << std::endl;
-
-        sleep(COLLECTION_PERIOD);
-
+    //Uplink ports
+    for (int i = 0; i < 4; i++) {
+        openolt::PortStatistics* port_stats = collectPortStatistics(i, BCMBAL_INTF_TYPE_NNI);
+        //FIXME Use clean port translation
+        port_stats->set_intf_id(128 + i);
+        time(&now);
+        port_stats->set_timestamp((int)now);
+        openolt::Indication ind;
+        ind.set_allocated_port_stats(port_stats);
+        oltIndQ.push(ind);
+    }
+    //Pon ports
+    for (int i = 0; i < 16; i++) {
+        openolt::PortStatistics* port_stats = collectPortStatistics(i, BCMBAL_INTF_TYPE_PON);
+        //FIXME Use clean port translation
+        port_stats->set_intf_id((0x2 << 28) + i);
+        time(&now);
+        port_stats->set_timestamp((int)now);
+        openolt::Indication ind;
+        ind.set_allocated_port_stats(port_stats);
+        oltIndQ.push(ind);
     }
 
-    std::cout << "Statistics collection thread terminated" << std::endl;
-
+    //Flows statistics
+    // flow_inst *current_entry = NULL;
+    //
+    // TAILQ_FOREACH(current_entry,
+    //               &FLOW_FSM_FLOW_LIST_CTX_PTR->active_flow_list,
+    //               flow_inst_next) {
+    // int flows_measurements = 0;
+    //
+    // for (int i = 0; i < FLOWS_COUNT; i++) {
+    //
+    //     // bcmbal_flow_id flow_id = current_entry->api_req_flow_info.key.flow_id;
+    //     // bcmbal_flow_type flow_type = current_entry->api_req_flow_info.key.flow_type;
+    //
+    //     if (flows_keys[i].flow_id != 0) {
+    //         openolt::FlowStatistics* flow_stats = collectFlowStatistics(flows_keys[i].flow_id, flows_keys[i].flow_type);
+    //         if (flow_stats->rx_packets() == -1) {
+    //             //It Failed
+    //             flows_keys[i].flow_id = 0;
+    //         } else {
+    //             flow_stats->set_flow_id(flows_keys[i].flow_id);
+    //             time(&now);
+    //             flow_stats->set_timestamp((int)now);
+    //             openolt::Indication ind;
+    //             ind.set_allocated_flow_stats(flow_stats);
+    //             oltIndQ.push(ind);
+    //             flows_measurements ++;
+    //         }
+    //     }
+    //
+    // }
+    // std::cout << "Stats of " << flows_measurements << " flows retrieved" << std::endl;
 
 }
 
diff --git a/src/stats_collection.h b/src/stats_collection.h
index 6f892b7..0437634 100644
--- a/src/stats_collection.h
+++ b/src/stats_collection.h
@@ -8,7 +8,9 @@
 #include <bal_model_types.h>
 }
 
-void start_collecting_statistics();
+#define COLLECTION_PERIOD 15
+
+void init_stats();
 void stop_collecting_statistics();
 openolt::PortStatistics* get_default_port_statistics();
 openolt::PortStatistics* collectPortStatistics(int intf_id, bcmbal_intf_type intf_type);
@@ -16,7 +18,7 @@
 openolt::FlowStatistics* get_default_flow_statistics();
 openolt::FlowStatistics* collectFlowStatistics(bcmbal_flow_id flow_id, bcmbal_flow_type flow_type);
 #endif
-void* stats_collection(void* x);
+void* stats_collection();
 void register_new_flow(bcmbal_flow_key key);