VOL-1150 A single threaded implementation for the OpenOLT driver
Stats collection is driven from the main thread.
Change-Id: I773a4aeb0e840f2a36895e39b999b2dcaf2adaa6
diff --git a/src/Queue.h b/src/Queue.h
index 854851e..30fbbda 100644
--- a/src/Queue.h
+++ b/src/Queue.h
@@ -11,22 +11,34 @@
#include <thread>
#include <mutex>
#include <condition_variable>
+#include <chrono>
template <typename T>
class Queue
{
public:
- T pop()
+ std::pair<T, bool> pop(int timeout)
{
+ std::cv_status status = std::cv_status::no_timeout;
std::unique_lock<std::mutex> mlock(mutex_);
+ static int duration = 0;
while (queue_.empty())
{
- cond_.wait(mlock);
+ status = cond_.wait_for(mlock, std::chrono::seconds(1));
+ if (status == std::cv_status::timeout)
+ {
+ duration++;
+ if (duration > timeout)
+ {
+ duration = 0;
+ return std::pair<T, bool>({}, false);
+ }
+ }
}
auto val = queue_.front();
queue_.pop();
- return val;
+ return std::pair<T, bool>(val, true);
}
void pop(T& item)
diff --git a/src/core.cc b/src/core.cc
index 9ad075c..ae54c56 100644
--- a/src/core.cc
+++ b/src/core.cc
@@ -36,12 +36,13 @@
#include <bal_api_end.h>
}
+State state;
Status Enable_() {
bcmbal_access_terminal_cfg acc_term_obj;
bcmbal_access_terminal_key key = { };
- if (!state::is_activated()) {
+ if (!state.is_activated()) {
std::cout << "Enable OLT" << std::endl;
key.access_term_id = DEFAULT_ATERM_ID;
BCMBAL_CFG_INIT(&acc_term_obj, access_terminal, key);
@@ -51,10 +52,11 @@
std::cout << "ERROR: Failed to enable OLT" << std::endl;
return bcm_to_grpc_err(err, "Failed to enable OLT");
}
+ init_stats();
}
+
//If already enabled, generate an extra indication ????
return Status::OK;
-
}
Status Disable_() {
@@ -79,7 +81,7 @@
//TEMPORARY WORK AROUND
Status status = DisableUplinkIf_(0);
if (status.ok()) {
- state::deactivate();
+ state.deactivate();
openolt::Indication ind;
openolt::OltIndication* olt_ind = new openolt::OltIndication;
olt_ind->set_oper_state("down");
@@ -94,7 +96,7 @@
Status Reenable_() {
Status status = EnableUplinkIf_(0);
if (status.ok()) {
- state::activate();
+ state.activate();
openolt::Indication ind;
openolt::OltIndication* olt_ind = new openolt::OltIndication;
olt_ind->set_oper_state("up");
diff --git a/src/core.h b/src/core.h
index 2890eb3..b0c3b5f 100644
--- a/src/core.h
+++ b/src/core.h
@@ -22,6 +22,10 @@
using grpc::Status;
#include <openolt.grpc.pb.h>
+#include "state.h"
+
+extern State state;
+
Status Enable_();
Status ActivateOnu_(uint32_t intf_id, uint32_t onu_id,
const char *vendor_id, const char *vendor_specific, uint32_t pir);
diff --git a/src/indications.cc b/src/indications.cc
index adac6ad..4d7a393 100644
--- a/src/indications.cc
+++ b/src/indications.cc
@@ -48,10 +48,10 @@
bcmbal_access_terminal_oper_status_change *acc_term_ind = (bcmbal_access_terminal_oper_status_change *)obj;
if (acc_term_ind->data.new_oper_status == BCMBAL_STATUS_UP) {
olt_ind->set_oper_state("up");
- state::activate();
+ state.activate();
} else {
olt_ind->set_oper_state("down");
- state::deactivate();
+ state.deactivate();
}
ind.set_allocated_olt_ind(olt_ind);
std::cout << "olt indication, oper_state:" << ind.olt_ind().oper_state() << std::endl;
diff --git a/src/server.cc b/src/server.cc
index 709eb31..8cd83ff 100644
--- a/src/server.cc
+++ b/src/server.cc
@@ -151,18 +151,26 @@
ServerContext* context,
const ::openolt::Empty* request,
ServerWriter<openolt::Indication>* writer) override {
+
std::cout << "Connection to Voltha established. Indications enabled"
<< std::endl;
- state::connect();
- while (state::is_connected) {
- auto oltInd = oltIndQ.pop();
+ state.connect();
+
+ while (state.is_connected()) {
+ std::pair<openolt::Indication, bool> ind = oltIndQ.pop(COLLECTION_PERIOD);
+ if (ind.second == false) {
+ /* timeout - do lower priority periodic stuff like stats */
+ stats_collection();
+ continue;
+ }
+ openolt::Indication oltInd = ind.first;
bool isConnected = writer->Write(oltInd);
if (!isConnected) {
//Lost connectivity to this Voltha instance
//Put the indication back in the queue for next connecting instance
oltIndQ.push(oltInd);
- state::disconnect();
+ state.disconnect();
}
//oltInd.release_olt_ind()
}
diff --git a/src/state.cc b/src/state.cc
deleted file mode 100644
index 54339e1..0000000
--- a/src/state.cc
+++ /dev/null
@@ -1,50 +0,0 @@
-#include "stats_collection.h"
-#include <mutex>
-
-namespace state {
-
- bool connected_to_voltha = false;
- bool activated = false;
- std::mutex state_lock;
-
- bool is_connected() {
- return connected_to_voltha;
- }
-
- bool is_activated() {
- return activated;
- }
-
- void connect() {
- state_lock.lock();
- connected_to_voltha = true;
- if (activated) {
- start_collecting_statistics();
- }
- state_lock.unlock();
- }
-
- void disconnect() {
- state_lock.lock();
- connected_to_voltha = false;
- stop_collecting_statistics();
- state_lock.unlock();
- }
-
- void activate() {
- state_lock.lock();
- activated = true;
- if (connected_to_voltha) {
- start_collecting_statistics();
- }
- state_lock.unlock();
- }
-
- void deactivate() {
- state_lock.lock();
- activated = false;
- stop_collecting_statistics();
- state_lock.unlock();
- }
-
-}
diff --git a/src/state.h b/src/state.h
index c392a01..331b09d 100644
--- a/src/state.h
+++ b/src/state.h
@@ -1,13 +1,35 @@
#ifndef OPENOLT_STATE_H_
#define OPENOLT_STATE_H_
-namespace state {
- bool is_activated();
- bool is_connected();
- void connect();
- void disconnect();
- void activate();
- void deactivate();
-}
+class State {
+ public:
+ bool is_connected() {
+ return connected_to_voltha;
+ }
+
+ bool is_activated() {
+ return activated;
+ }
+
+ void connect() {
+ connected_to_voltha = true;
+ }
+
+ void disconnect() {
+ connected_to_voltha = false;
+ }
+
+ void activate() {
+ activated = true;
+ }
+
+ void deactivate() {
+ activated = false;
+ }
+
+ private:
+ bool connected_to_voltha = false;
+ bool activated = false;
+};
#endif
diff --git a/src/stats_collection.cc b/src/stats_collection.cc
index d364384..0416ef4 100644
--- a/src/stats_collection.cc
+++ b/src/stats_collection.cc
@@ -1,7 +1,6 @@
#include "stats_collection.h"
#include <unistd.h>
-#include <pthread.h>
#include <openolt.grpc.pb.h>
#include "indications.h"
@@ -14,29 +13,13 @@
#include <flow_fsm.h>
}
-#define COLLECTION_PERIOD 15
//FIXME
#define FLOWS_COUNT 100
-bool isCollectingStatistics;
bcmbal_flow_key* flows_keys = new bcmbal_flow_key[FLOWS_COUNT];
-bool init_done = false;
-
-void start_collecting_statistics() {
- if (!init_done) {
- memset(flows_keys, 0, FLOWS_COUNT * sizeof(bcmbal_flow_key));
- init_done = true;
- }
- pthread_t statisticsCollectionThread;
- isCollectingStatistics = true;
- pthread_create(&statisticsCollectionThread, NULL, stats_collection, NULL);
-
- std::cout << "Statistics collection thread started" << std::endl;
-}
-
-void stop_collecting_statistics() {
- isCollectingStatistics = false;
+void init_stats() {
+ memset(flows_keys, 0, FLOWS_COUNT * sizeof(bcmbal_flow_key));
}
openolt::PortStatistics* get_default_port_statistics() {
@@ -162,77 +145,68 @@
#endif
-void* stats_collection(void* x) {
+void* stats_collection() {
time_t now;
- while(isCollectingStatistics) {
+ std::cout << "Collecting statistics" << std::endl;
- std::cout << "Collecting statistics" << std::endl;
+ //Ports statistics
- //Ports statistics
-
- //Uplink ports
- for (int i = 0; i < 4; i++) {
- openolt::PortStatistics* port_stats = collectPortStatistics(i, BCMBAL_INTF_TYPE_NNI);
- //FIXME Use clean port translation
- port_stats->set_intf_id(128 + i);
- time(&now);
- port_stats->set_timestamp((int)now);
- openolt::Indication ind;
- ind.set_allocated_port_stats(port_stats);
- oltIndQ.push(ind);
- }
- //Pon ports
- for (int i = 0; i < 16; i++) {
- openolt::PortStatistics* port_stats = collectPortStatistics(i, BCMBAL_INTF_TYPE_PON);
- //FIXME Use clean port translation
- port_stats->set_intf_id((0x2 << 28) + i);
- time(&now);
- port_stats->set_timestamp((int)now);
- openolt::Indication ind;
- ind.set_allocated_port_stats(port_stats);
- oltIndQ.push(ind);
- }
-
- //Flows statistics
- // flow_inst *current_entry = NULL;
- //
- // TAILQ_FOREACH(current_entry,
- // &FLOW_FSM_FLOW_LIST_CTX_PTR->active_flow_list,
- // flow_inst_next) {
- // int flows_measurements = 0;
- //
- // for (int i = 0; i < FLOWS_COUNT; i++) {
- //
- // // bcmbal_flow_id flow_id = current_entry->api_req_flow_info.key.flow_id;
- // // bcmbal_flow_type flow_type = current_entry->api_req_flow_info.key.flow_type;
- //
- // if (flows_keys[i].flow_id != 0) {
- // openolt::FlowStatistics* flow_stats = collectFlowStatistics(flows_keys[i].flow_id, flows_keys[i].flow_type);
- // if (flow_stats->rx_packets() == -1) {
- // //It Failed
- // flows_keys[i].flow_id = 0;
- // } else {
- // flow_stats->set_flow_id(flows_keys[i].flow_id);
- // time(&now);
- // flow_stats->set_timestamp((int)now);
- // openolt::Indication ind;
- // ind.set_allocated_flow_stats(flow_stats);
- // oltIndQ.push(ind);
- // flows_measurements ++;
- // }
- // }
- //
- // }
- // std::cout << "Stats of " << flows_measurements << " flows retrieved" << std::endl;
-
- sleep(COLLECTION_PERIOD);
-
+ //Uplink ports
+ for (int i = 0; i < 4; i++) {
+ openolt::PortStatistics* port_stats = collectPortStatistics(i, BCMBAL_INTF_TYPE_NNI);
+ //FIXME Use clean port translation
+ port_stats->set_intf_id(128 + i);
+ time(&now);
+ port_stats->set_timestamp((int)now);
+ openolt::Indication ind;
+ ind.set_allocated_port_stats(port_stats);
+ oltIndQ.push(ind);
+ }
+ //Pon ports
+ for (int i = 0; i < 16; i++) {
+ openolt::PortStatistics* port_stats = collectPortStatistics(i, BCMBAL_INTF_TYPE_PON);
+ //FIXME Use clean port translation
+ port_stats->set_intf_id((0x2 << 28) + i);
+ time(&now);
+ port_stats->set_timestamp((int)now);
+ openolt::Indication ind;
+ ind.set_allocated_port_stats(port_stats);
+ oltIndQ.push(ind);
}
- std::cout << "Statistics collection thread terminated" << std::endl;
-
+ //Flows statistics
+ // flow_inst *current_entry = NULL;
+ //
+ // TAILQ_FOREACH(current_entry,
+ // &FLOW_FSM_FLOW_LIST_CTX_PTR->active_flow_list,
+ // flow_inst_next) {
+ // int flows_measurements = 0;
+ //
+ // for (int i = 0; i < FLOWS_COUNT; i++) {
+ //
+ // // bcmbal_flow_id flow_id = current_entry->api_req_flow_info.key.flow_id;
+ // // bcmbal_flow_type flow_type = current_entry->api_req_flow_info.key.flow_type;
+ //
+ // if (flows_keys[i].flow_id != 0) {
+ // openolt::FlowStatistics* flow_stats = collectFlowStatistics(flows_keys[i].flow_id, flows_keys[i].flow_type);
+ // if (flow_stats->rx_packets() == -1) {
+ // //It Failed
+ // flows_keys[i].flow_id = 0;
+ // } else {
+ // flow_stats->set_flow_id(flows_keys[i].flow_id);
+ // time(&now);
+ // flow_stats->set_timestamp((int)now);
+ // openolt::Indication ind;
+ // ind.set_allocated_flow_stats(flow_stats);
+ // oltIndQ.push(ind);
+ // flows_measurements ++;
+ // }
+ // }
+ //
+ // }
+ // std::cout << "Stats of " << flows_measurements << " flows retrieved" << std::endl;
}
diff --git a/src/stats_collection.h b/src/stats_collection.h
index 6f892b7..0437634 100644
--- a/src/stats_collection.h
+++ b/src/stats_collection.h
@@ -8,7 +8,9 @@
#include <bal_model_types.h>
}
-void start_collecting_statistics();
+#define COLLECTION_PERIOD 15
+
+void init_stats();
void stop_collecting_statistics();
openolt::PortStatistics* get_default_port_statistics();
openolt::PortStatistics* collectPortStatistics(int intf_id, bcmbal_intf_type intf_type);
@@ -16,7 +18,7 @@
openolt::FlowStatistics* get_default_flow_statistics();
openolt::FlowStatistics* collectFlowStatistics(bcmbal_flow_id flow_id, bcmbal_flow_type flow_type);
#endif
-void* stats_collection(void* x);
+void* stats_collection();
void register_new_flow(bcmbal_flow_key key);