blob: 2e366cee721cddf243e4a26727e3a997665d0151 [file] [log] [blame]
//
// Copyright (c) 2013 Juan Palacios juan.palacios.puyana@gmail.com
// Subject to the BSD 2-Clause License
// - see < http://opensource.org/licenses/BSD-2-Clause>
//
6
7#ifndef CONCURRENT_QUEUE_
8#define CONCURRENT_QUEUE_
9
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
Shad Ansari01b0e652018-04-05 21:02:53 +000015
16template <typename T>
17class Queue
18{
19 public:
20
Girish Gowdra96461052019-11-22 20:13:59 +053021 // timeout is in milliseconds, wait_granularity in milliseconds
22 std::pair<T, bool> pop(int timeout, int wait_granularity=10)
Shad Ansari01b0e652018-04-05 21:02:53 +000023 {
Shad Ansariedef2132018-08-10 22:14:50 +000024 std::cv_status status = std::cv_status::no_timeout;
Shad Ansari01b0e652018-04-05 21:02:53 +000025 std::unique_lock<std::mutex> mlock(mutex_);
Shad Ansariedef2132018-08-10 22:14:50 +000026 static int duration = 0;
Girish Gowdra96461052019-11-22 20:13:59 +053027 if (timeout < wait_granularity) {
28 wait_granularity = timeout;
29 }
Shad Ansari01b0e652018-04-05 21:02:53 +000030 while (queue_.empty())
31 {
Girish Gowdra96461052019-11-22 20:13:59 +053032 status = cond_.wait_for(mlock, std::chrono::milliseconds(wait_granularity));
Shad Ansariedef2132018-08-10 22:14:50 +000033 if (status == std::cv_status::timeout)
34 {
Girish Gowdra96461052019-11-22 20:13:59 +053035 duration+=wait_granularity;
Shad Ansariedef2132018-08-10 22:14:50 +000036 if (duration > timeout)
37 {
38 duration = 0;
39 return std::pair<T, bool>({}, false);
40 }
41 }
Shad Ansari01b0e652018-04-05 21:02:53 +000042 }
43 auto val = queue_.front();
44 queue_.pop();
Shad Ansariedef2132018-08-10 22:14:50 +000045 return std::pair<T, bool>(val, true);
Shad Ansari01b0e652018-04-05 21:02:53 +000046 }
47
48 void pop(T& item)
49 {
50 std::unique_lock<std::mutex> mlock(mutex_);
51 while (queue_.empty())
52 {
53 cond_.wait(mlock);
54 }
55 item = queue_.front();
56 queue_.pop();
57 }
58
59 void push(const T& item)
60 {
61 std::unique_lock<std::mutex> mlock(mutex_);
62 queue_.push(item);
63 mlock.unlock();
64 cond_.notify_one();
65 }
66 Queue()=default;
67 Queue(const Queue&) = delete; // disable copying
68 Queue& operator=(const Queue&) = delete; // disable assignment
69
70 private:
71 std::queue<T> queue_;
72 std::mutex mutex_;
73 std::condition_variable cond_;
74};
75
76#endif