25#ifndef __mqtt_thread_queue_h
26#define __mqtt_thread_queue_h
30#include <condition_variable>
68template <
typename T,
class Container=std::deque<T>>
84 mutable std::mutex lock_;
86 std::condition_variable notEmptyCond_;
88 std::condition_variable notFullCond_;
92 std::queue<T,Container> que_;
95 using guard = std::lock_guard<std::mutex>;
97 using unique_guard = std::unique_lock<std::mutex>;
152 unique_guard g(lock_);
153 notFullCond_.wait(g, [
this]{
return que_.size() < cap_;});
155 que_.emplace(std::move(val));
157 notEmptyCond_.notify_one();
166 unique_guard g(lock_);
167 if (que_.size() >= cap_)
170 que_.emplace(std::move(val));
172 notEmptyCond_.notify_one();
184 template <
typename Rep,
class Period>
186 unique_guard g(lock_);
187 if (!notFullCond_.wait_for(g, relTime, [
this]{return que_.size() < cap_;}))
190 que_.emplace(std::move(val));
192 notEmptyCond_.notify_one();
205 template <
class Clock,
class Duration>
207 unique_guard g(lock_);
208 if (!notFullCond_.wait_until(g, absTime, [
this]{return que_.size() < cap_;}))
211 que_.emplace(std::move(val));
213 notEmptyCond_.notify_one();
226 unique_guard g(lock_);
227 notEmptyCond_.wait(g, [
this]{
return !que_.empty();});
229 *val = std::move(que_.front());
232 notFullCond_.notify_one();
241 unique_guard g(lock_);
242 notEmptyCond_.wait(g, [
this]{
return !que_.empty();});
247 notFullCond_.notify_one();
262 unique_guard g(lock_);
266 *val = std::move(que_.front());
269 notFullCond_.notify_one();
282 template <
typename Rep,
class Period>
287 unique_guard g(lock_);
288 if (!notEmptyCond_.wait_for(g, relTime, [
this]{return !que_.empty();}))
291 *val = std::move(que_.front());
294 notFullCond_.notify_one();
307 template <
class Clock,
class Duration>
312 unique_guard g(lock_);
313 if (!notEmptyCond_.wait_until(g, absTime, [
this]{return !que_.empty();}))
316 *val = std::move(que_.front());
319 notFullCond_.notify_one();
Definition thread_queue.h:70
typename Container::size_type size_type
Definition thread_queue.h:77
T value_type
Definition thread_queue.h:75
size_type size() const
Definition thread_queue.h:141
void capacity(size_type cap)
Definition thread_queue.h:133
Container container_type
Definition thread_queue.h:73
bool try_put(value_type val)
Definition thread_queue.h:165
bool try_get_for(value_type *val, const std::chrono::duration< Rep, Period > &relTime)
Definition thread_queue.h:283
bool try_put_for(value_type val, const std::chrono::duration< Rep, Period > &relTime)
Definition thread_queue.h:185
static constexpr size_type MAX_CAPACITY
Definition thread_queue.h:80
bool try_get(value_type *val)
Definition thread_queue.h:258
thread_queue()
Definition thread_queue.h:103
bool try_put_until(value_type val, const std::chrono::time_point< Clock, Duration > &absTime)
Definition thread_queue.h:206
void get(value_type *val)
Definition thread_queue.h:222
bool empty() const
Definition thread_queue.h:115
size_type capacity() const
Definition thread_queue.h:123
thread_queue(size_t cap)
Definition thread_queue.h:109
void put(value_type val)
Definition thread_queue.h:151
value_type get()
Definition thread_queue.h:240
bool try_get_until(value_type *val, const std::chrono::time_point< Clock, Duration > &absTime)
Definition thread_queue.h:308
Definition async_client.h:49