blob: 8e90a7560ad17460bf6c9bcc53ebbfe5b466d4bc [file] [log] [blame]
//
// Copyright (c) 2013 Juan Palacios juan.palacios.puyana@gmail.com
// Subject to the BSD 2-Clause License
// - see <http://opensource.org/licenses/BSD-2-Clause>
//
6
7#ifndef CONCURRENT_QUEUE_
8#define CONCURRENT_QUEUE_
9
10#include <queue>
11#include <thread>
12#include <mutex>
13#include <condition_variable>
Shad Ansariedef2132018-08-10 22:14:50 +000014#include <chrono>
Shad Ansari01b0e652018-04-05 21:02:53 +000015
/**
 * @brief Mutex/condition-variable protected FIFO queue.
 * @details Wraps a std::queue<T>. Every member function takes the internal
 * mutex before touching the underlying container; push() signals the
 * condition variable that the pop() overloads wait on.
 * Copy construction and copy assignment are disabled.
 * @tparam T element type; must be copyable. The pair-returning pop()
 *           additionally requires T to be value-initializable.
 */
template <typename T>
class Queue
{
 public:
    Queue() = default;
    Queue(const Queue&) = delete;            // disable copying
    Queue& operator=(const Queue&) = delete; // disable assignment

    /**
     * @brief pop with timeout
     * @details Waits until the queue is non-empty or until timeout_duration
     * has elapsed, measured with std::chrono::steady_clock from the moment
     * the wait starts. The wait is performed in slices of at most
     * check_interval; a check_interval that is <= 0 means "wait for the whole
     * remaining time in one slice".
     * The element is copy-assigned into the out parameter before it is
     * removed, so if the assignment throws the element is still in the queue.
     * <a href="https://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html">Implementing a Thread-Safe Queue using Condition Variables (Updated)</a>
     * @param[out] value receives queue.front(); left untouched on timeout
     * @param[in] timeout_duration time out after this duration; if it is <= 0
     *            and the queue is empty the call returns false without waiting
     * @param[in] check_interval upper bound for a single wait slice
     * @return [true] if pop happens within the timeout duration, [false] otherwise
     */
    bool pop(T& value,
             std::chrono::milliseconds timeout_duration,
             const std::chrono::milliseconds& check_interval = std::chrono::milliseconds(10)) {
        std::unique_lock<std::mutex> lock(mutex_);

        if (!wait_until_available(lock, timeout_duration, check_interval)) {
            return false;
        }

        value = queue_.front();
        queue_.pop();
        return true;
    }

    /**
     * @brief returns the number of elements
     * @return Returns the number of elements in the underlying container.
     */
    std::size_t size() const {
        std::unique_lock<std::mutex> lock(mutex_);
        return queue_.size();
    }

    /**
     * @brief pop with timeout, returning the element by value
     * @details Same waiting rules as the out-parameter overload. The elapsed
     * time is tracked per call (no state is shared between calls, threads or
     * Queue instances).
     * @param[in] timeout time out after this many milliseconds; if it is <= 0
     *            and the queue is empty the call returns without waiting
     * @param[in] wait_granularity upper bound, in milliseconds, for a single
     *            wait slice; values <= 0 mean "no slicing"
     * @return {front element, true} on success, {value-initialized T, false}
     *         on timeout
     */
    std::pair<T, bool> pop(int timeout, int wait_granularity = 10)
    {
        std::unique_lock<std::mutex> mlock(mutex_);

        if (!wait_until_available(mlock,
                                  std::chrono::milliseconds(timeout),
                                  std::chrono::milliseconds(wait_granularity))) {
            return std::pair<T, bool>(T{}, false);
        }

        auto val = queue_.front();
        queue_.pop();
        return std::pair<T, bool>(val, true);
    }

    /**
     * @brief blocking pop
     * @details Blocks until the queue is non-empty, then copy-assigns the
     * front element into item and removes it from the queue.
     * @param[out] item receives queue.front()
     */
    void pop(T& item)
    {
        std::unique_lock<std::mutex> mlock(mutex_);
        // The predicate form re-checks emptiness after every wake-up.
        cond_.wait(mlock, [this] { return !queue_.empty(); });
        item = queue_.front();
        queue_.pop();
    }

    /**
     * @brief append a copy of item and wake one waiting consumer
     * @param[in] item element to enqueue
     */
    void push(const T& item)
    {
        {
            std::unique_lock<std::mutex> mlock(mutex_);
            queue_.push(item);
        }
        // The lock is released before notifying, as in the original code.
        cond_.notify_one();
    }

 private:
    /**
     * @brief wait (with the lock held by the caller) until an element is available
     * @details Elapsed time is measured in whole milliseconds since the call
     * started and compared against timeout in milliseconds, so very large
     * timeouts are not converted to a finer unit here.
     * @param[in,out] lock locked unique_lock on mutex_; locked again on return
     * @param[in] timeout total time budget; <= 0 means "do not wait"
     * @param[in] interval maximum length of one wait slice; <= 0 disables slicing
     * @return [true] if the queue is non-empty on return, [false] on timeout
     */
    bool wait_until_available(std::unique_lock<std::mutex>& lock,
                              std::chrono::milliseconds timeout,
                              std::chrono::milliseconds interval) {
        using std::chrono::milliseconds;
        using clock = std::chrono::steady_clock;

        const clock::time_point start = clock::now();
        while (queue_.empty()) {
            const milliseconds elapsed =
                std::chrono::duration_cast<milliseconds>(clock::now() - start);
            if (elapsed >= timeout) {
                return false;
            }
            // remaining is >= 1 ms here, so the loop never spins without waiting.
            const milliseconds remaining = timeout - elapsed;
            const bool use_interval =
                interval > milliseconds::zero() && interval < remaining;
            cond_.wait_for(lock, use_interval ? interval : remaining);
        }
        return true;
    }

    std::queue<T> queue_;
    mutable std::mutex mutex_;   // mutable so that size() can be const
    std::condition_variable cond_;
};
113
114#endif