7 #include <condition_variable> 19 std::condition_variable cv;
25 std::atomic<bool> need_to_flush;
26 std::atomic<bool> was_flushed;
27 std::condition_variable was_flushed_cv;
28 std::mutex was_flushed_mutex;
31 : q(), mutex(), cv(), cap(cap), need_to_flush(
false), was_flushed(
false), accepting(
true)
36 std::unique_lock<std::mutex> lock(mutex);
39 q.push_back(std::move(item));
49 bool dequeue(T* item ,
unsigned int timeout_ms = 5000)
51 std::unique_lock<std::mutex> lock(mutex);
54 const auto ready = [
this]() {
return (q.size() > 0) || need_to_flush; };
55 if (!ready() && !cv.wait_for(lock, std::chrono::milliseconds(timeout_ms), ready))
64 *item = std::move(q.front());
71 std::unique_lock<std::mutex> lock(mutex);
83 std::unique_lock<std::mutex> lock(mutex);
87 auto val = std::move(q.front());
89 *item = std::move(val);
97 std::unique_lock<std::mutex> lock(mutex);
100 need_to_flush =
true;
104 auto item = std::move(q.front());
112 std::unique_lock<std::mutex> lock(mutex);
113 need_to_flush =
false;
119 std::unique_lock<std::mutex> lock(mutex);
137 using namespace std::chrono;
139 std::unique_lock<std::mutex> lock(_owner->_was_stopped_mutex);
140 auto good = [&]() {
return _owner->_was_stopped.load(); };
141 return !(_owner->_was_stopped_cv.wait_for(lock, milliseconds(ms), good));
154 _thread = std::thread([&]()
158 std::function<void(cancellable_timer)> item;
160 if (_queue.dequeue(&item))
172 std::unique_lock<std::mutex> lock(_was_flushed_mutex);
175 _was_flushed_cv.notify_all();
188 _queue.enqueue(std::move(item));
194 std::unique_lock<std::mutex> lock(_was_stopped_mutex);
195 _was_stopped =
false;
203 std::unique_lock<std::mutex> lock(_was_stopped_mutex);
205 _was_stopped_cv.notify_all();
211 std::unique_lock<std::mutex> lock(_was_flushed_mutex);
212 _was_flushed =
false;
215 std::unique_lock<std::mutex> lock_was_flushed(_was_flushed_mutex);
216 _was_flushed_cv.wait_for(lock_was_flushed, std::chrono::hours(999999), [&]() {
return _was_flushed.load(); });
232 std::condition_variable cv;
233 bool invoked =
false;
234 auto wait_sucess = std::make_shared<std::atomic_bool>(
true);
238 if (_was_stopped || !(*wait_sucess))
242 std::lock_guard<std::mutex> locker(m);
247 std::unique_lock<std::mutex> locker(m);
248 *wait_sucess = cv.wait_for(locker, std::chrono::seconds(10), [&]() {
return invoked || _was_stopped; });
256 std::atomic<bool> _was_stopped;
257 std::condition_variable _was_stopped_cv;
258 std::mutex _was_stopped_mutex;
260 std::atomic<bool> _was_flushed;
261 std::condition_variable _was_flushed_cv;
262 std::mutex _was_flushed_mutex;
264 std::atomic<bool> _is_alive;
267 template<
class T = std::function<
void(dispatcher::cancellable_timer)>>
272 : _operation(
std::move(operation)), _dispatcher(1), _stopped(true)
309 std::atomic<bool> _stopped;
void enqueue(T &&item)
Definition: concurrency.h:34
active_object(T operation)
Definition: concurrency.h:271
size_t size()
Definition: concurrency.h:117
bool dequeue(T *item, unsigned int timeout_ms=5000)
Definition: concurrency.h:49
~dispatcher()
Definition: concurrency.h:221
void start()
Definition: concurrency.h:192
Definition: concurrency.h:268
Definition: concurrency.h:125
void stop()
Definition: concurrency.h:284
dispatcher(unsigned int cap)
Definition: concurrency.h:148
cancellable_timer(dispatcher *owner)
Definition: concurrency.h:131
bool try_sleep(int ms)
Definition: concurrency.h:135
void stop()
Definition: concurrency.h:200
bool peek(T **item)
Definition: concurrency.h:69
void invoke(T item)
Definition: concurrency.h:184
Definition: concurrency.h:128
void start()
Definition: concurrency.h:110
bool flush()
Definition: concurrency.h:229
void start()
Definition: concurrency.h:276
bool try_dequeue(T *item)
Definition: concurrency.h:81
void clear()
Definition: concurrency.h:95
Definition: concurrency.h:15
~active_object()
Definition: concurrency.h:290
const int QUEUE_MAX_SIZE
Definition: concurrency.h:12