Intel® RealSense™ Cross Platform API
Intel Realsense Cross-platform API
Loading...
Searching...
No Matches
concurrency.h
Go to the documentation of this file.
1// License: Apache 2.0. See LICENSE file in root directory.
2// Copyright(c) 2015 Intel Corporation. All Rights Reserved.
3
4#pragma once
5#include <queue>
6#include <mutex>
7#include <condition_variable>
8#include <thread>
9#include <atomic>
10#include <functional>
11#include <cassert>
12
13const int QUEUE_MAX_SIZE = 10;
14// Simplest implementation of a blocking concurrent queue for thread messaging
15template<class T>
17{
18 std::deque<T> _queue;
19 mutable std::mutex _mutex;
20 std::condition_variable _deq_cv; // not empty signal
21 std::condition_variable _enq_cv; // not full signal
22
23 unsigned int const _cap;
24 bool _accepting;
25
26 std::function<void(T const &)> const _on_drop_callback;
27
28public:
29 explicit single_consumer_queue< T >( unsigned int cap = QUEUE_MAX_SIZE,
30 std::function< void( T const & ) > on_drop_callback = nullptr )
31 : _cap( cap )
32 , _accepting( true )
33 , _on_drop_callback( on_drop_callback )
34 {
35 }
36
37 // Enqueue an item onto the queue.
38 // If the queue grows beyond capacity, the front will be removed, losing whatever was there!
39 bool enqueue(T&& item)
40 {
41 std::unique_lock<std::mutex> lock(_mutex);
42 if( ! _accepting )
43 {
44 if( _on_drop_callback )
45 _on_drop_callback( item );
46 return false;
47 }
48
49 _queue.push_back(std::move(item));
50
51 if( _queue.size() > _cap )
52 {
53 if( _on_drop_callback )
54 _on_drop_callback( _queue.front() );
55 _queue.pop_front();
56 }
57
58 lock.unlock();
59
60 // We pushed something -- let others know there's something to dequeue
61 _deq_cv.notify_one();
62
63 return true;
64 }
65
66
67 // Enqueue an item, but wait for room if there isn't any
68 // Returns true if the enqueue succeeded
69 bool blocking_enqueue(T&& item)
70 {
71 std::unique_lock<std::mutex> lock(_mutex);
72 _enq_cv.wait( lock, [this]() {
73 return _queue.size() < _cap; } );
74 if( ! _accepting )
75 {
76 // We shouldn't be adding anything to the queue when we're stopping
77 if( _on_drop_callback )
78 _on_drop_callback( item );
79 return false;
80 }
81
82 _queue.push_back(std::move(item));
83 lock.unlock();
84
85 // We pushed something -- let another know there's something to dequeue
86 _deq_cv.notify_one();
87
88 return true;
89 }
90
91
92 // Remove one item; if unavailable, wait for it
93 // Return true if an item was removed -- otherwise, false
94 bool dequeue( T * item, unsigned int timeout_ms )
95 {
96 std::unique_lock<std::mutex> lock(_mutex);
97 if( ! _deq_cv.wait_for( lock,
98 std::chrono::milliseconds( timeout_ms ),
99 [this]() { return ! _accepting || ! _queue.empty(); } )
100 || _queue.empty() )
101 {
102 return false;
103 }
104
105 *item = std::move(_queue.front());
106 _queue.pop_front();
107
108 // We've made room -- let whoever is waiting for room know about it
109 _enq_cv.notify_one();
110
111 return true;
112 }
113
114 // Remove one item if available; do not wait for one
115 // Return true if an item was removed -- otherwise, false
116 bool try_dequeue(T* item)
117 {
118 std::lock_guard< std::mutex > lock( _mutex );
119 if( _queue.empty() )
120 return false;
121
122 *item = std::move(_queue.front());
123 _queue.pop_front();
124
125 // We've made room -- let whoever is waiting for room know about it
126 _enq_cv.notify_one();
127
128 return true;
129 }
130
131 template< class Fn >
132 bool peek( Fn fn ) const
133 {
134 std::lock_guard< std::mutex > lock( _mutex );
135 if( _queue.empty() )
136 return false;
137 fn( _queue.front() );
138 return true;
139 }
140
141 template< class Fn >
142 bool peek( Fn fn )
143 {
144 std::lock_guard< std::mutex > lock( _mutex );
145 if( _queue.empty() )
146 return false;
147 fn( _queue.front() );
148 return true;
149 }
150
151 void stop()
152 {
153 std::lock_guard< std::mutex > lock( _mutex );
154
155 // We no longer accept any more items!
156 _accepting = false;
157
158 _clear();
159 }
160
161 void clear()
162 {
163 std::lock_guard< std::mutex > lock( _mutex );
164 _clear();
165 }
166
167protected:
168 void _clear()
169 {
170 _queue.clear();
171
172 // Wake up anyone who is waiting for room to enqueue, or waiting for something to dequeue -- there's nothing now
173 _enq_cv.notify_all();
174 _deq_cv.notify_all();
175 }
176
177public:
178 void start()
179 {
180 std::lock_guard< std::mutex > lock( _mutex );
181 _accepting = true;
182 }
183
184 bool started() const { return _accepting; }
185 bool stopped() const { return ! started(); }
186
187 size_t size() const
188 {
189 std::lock_guard< std::mutex > lock( _mutex );
190 return _queue.size();
191 }
192
193 bool empty() const { return ! size(); }
194};
195
196// A single_consumer_queue meant to hold frame_holder objects
197template<class T>
199{
201
202public:
204 std::function< void( T const & ) > on_drop_callback = nullptr )
205 : _queue( cap, on_drop_callback )
206 {
207 }
208
209 bool enqueue( T && item )
210 {
211 if( item->is_blocking() )
212 return _queue.blocking_enqueue( std::move( item ) );
213 else
214 return _queue.enqueue( std::move( item ) );
215 }
216
217 bool dequeue(T* item, unsigned int timeout_ms)
218 {
219 return _queue.dequeue(item, timeout_ms);
220 }
221
222 bool try_dequeue(T* item)
223 {
224 return _queue.try_dequeue(item);
225 }
226
227 template< class Fn >
228 bool peek( Fn fn ) const
229 {
230 return _queue.peek( fn );
231 }
232
233 template< class Fn >
234 bool peek( Fn fn )
235 {
236 return _queue.peek( fn );
237 }
238
239 void clear()
240 {
241 _queue.clear();
242 }
243
244 void stop()
245 {
246 _queue.stop();
247 }
248
249 void start()
250 {
251 _queue.start();
252 }
253
254 size_t size() const
255 {
256 return _queue.size();
257 }
258
259 bool empty() const
260 {
261 return _queue.empty();
262 }
263
264 bool started() const { return _queue.started(); }
265 bool stopped() const { return _queue.stopped(); }
266};
267
268// The dispatcher is responsible for dispatching generic 'actions': any thread can queue an action
269// (lambda) for dispatch, while the dispatcher maintains a single thread that runs these actions one
270// at a time.
271//
273{
274public:
275 // An action, when run, takes a 'cancellable_timer', which it may ignore. This class allows the
276 // action to perform some sleep and know that, if the dispatcher is shutting down, its sleep
277 // will still be interrupted.
278 //
280 {
281 dispatcher* _owner;
282
283 public:
285 : _owner(owner)
286 {}
287
288 bool was_stopped() const { return _owner->_was_stopped.load(); }
289
290 // Replacement for sleep() -- try to sleep for a time, but stop if the
291 // dispatcher is stopped
292 //
293 // Return false if the dispatcher was stopped, true otherwise
294 //
295 template< class Duration >
296 bool try_sleep( Duration sleep_time )
297 {
298 using namespace std::chrono;
299
300 std::unique_lock<std::mutex> lock(_owner->_was_stopped_mutex);
301 if( was_stopped() )
302 return false;
303 // wait_for() returns "false if the predicate pred still evaluates to false after the
304 // rel_time timeout expired, otherwise true"
305 return ! (
306 _owner->_was_stopped_cv.wait_for( lock, sleep_time, [&]() { return was_stopped(); } ) );
307 }
308 };
309
310 // An action is any functor that accepts a cancellable timer
311 typedef std::function<void(cancellable_timer const &)> action;
312
313 // Certain conditions (see invoke()) may cause actions to be lost, e.g. when the queue gets full
314 // and we're non-blocking. The on_drop_callback allows caputring of these instances, if we
315 // want...
316 //
317 dispatcher( unsigned int queue_capacity,
318 std::function< void( action ) > on_drop_callback = nullptr );
319
321
322 bool empty() const { return _queue.empty(); }
323
324 // Main invocation of an action: this will be called from any thread, and basically just queues
325 // up the actions for our dispatching thread to handle them.
326 //
327 // A blocking invocation means that it will wait until there's room in the queue: if not
328 // blocking and the queue is full, the action will be queued at the expense of the next action
329 // in line (the oldest) for dispatch!
330 //
331 template<class T>
332 void invoke(T item, bool is_blocking = false)
333 {
334 if (!_was_stopped)
335 {
336 if(is_blocking)
337 _queue.blocking_enqueue(std::move(item));
338 else
339 _queue.enqueue(std::move(item));
340 }
341 }
342
343 // Like above, but synchronous: will return only when the action has actually been dispatched.
344 //
345 template<class T>
346 void invoke_and_wait(T item, std::function<bool()> exit_condition, bool is_blocking = false)
347 {
348 bool done = false;
349
350 //action
351 auto func = std::move(item);
353 {
354 std::lock_guard<std::mutex> lk(_blocking_invoke_mutex);
355 func(c);
356
357 done = true;
358 _blocking_invoke_cv.notify_one();
359 }, is_blocking);
360
361 //wait
362 std::unique_lock<std::mutex> lk(_blocking_invoke_mutex);
363 _blocking_invoke_cv.wait(lk, [&](){ return done || exit_condition(); });
364 }
365
366 // Stops the dispatcher. This is not a pause: it will clear out the queue, losing any pending
367 // actions!
368 //
369 void stop();
370
371 // A dispatcher starts out 'started' after construction. It can then be stopped and started
372 // again if needed.
373 //
374 void start();
375
376 // Return when all items in the queue are finished (within a timeout).
377 // If additional items are added while we're waiting, those will not be waited on!
378 //
379 bool flush(std::chrono::steady_clock::duration timeout = std::chrono::seconds(10) );
380
381
382private:
383 // Return true if dispatcher is started (within a timeout).
384 // false if not or the dispatcher is no longer alive
385 //
386 bool _wait_for_start( int timeout_ms );
387
388 friend cancellable_timer;
389
390 single_consumer_queue<std::function<void(cancellable_timer)>> _queue;
391 std::thread _thread;
392
393 std::atomic<bool> _was_stopped;
394 std::condition_variable _was_stopped_cv;
395 std::mutex _was_stopped_mutex;
396
397 std::mutex _dispatch_mutex;
398
399 std::condition_variable _blocking_invoke_cv;
400 std::mutex _blocking_invoke_mutex;
401
402 std::atomic<bool> _is_alive;
403};
404
405template<class T = std::function<void(dispatcher::cancellable_timer)>>
407{
408public:
409 active_object(T operation)
410 : _operation(std::move(operation)), _dispatcher(1), _stopped(true)
411 {
412 }
413
414 void start()
415 {
416 _stopped = false;
417 _dispatcher.start();
418
419 do_loop();
420 }
421
422 void stop()
423 {
424 if (!_stopped.load()) {
425 _stopped = true;
426 _dispatcher.stop();
427 }
428 }
429
431 {
432 stop();
433 }
434
435 bool is_active() const
436 {
437 return !_stopped;
438 }
439private:
440 void do_loop()
441 {
442 _dispatcher.invoke([this](dispatcher::cancellable_timer ct)
443 {
444 _operation(ct);
445 if (!_stopped)
446 {
447 do_loop();
448 }
449 });
450 }
451
452 T _operation;
453 dispatcher _dispatcher;
454 std::atomic<bool> _stopped;
455};
456
458{
459public:
460 watchdog(std::function<void()> operation, uint64_t timeout_ms) :
461 _timeout_ms(timeout_ms), _operation(std::move(operation))
462 {
463 _watcher = std::make_shared<active_object<>>([this](dispatcher::cancellable_timer cancellable_timer)
464 {
465 if(cancellable_timer.try_sleep( std::chrono::milliseconds( _timeout_ms )))
466 {
467 if(!_kicked)
468 _operation();
469 std::lock_guard<std::mutex> lk(_m);
470 _kicked = false;
471 }
472 });
473 }
474
476 {
477 if(_running)
478 stop();
479 }
480
481 void start() { std::lock_guard<std::mutex> lk(_m); _watcher->start(); _running = true; }
482 void stop() { { std::lock_guard<std::mutex> lk(_m); _running = false; } _watcher->stop(); }
483 bool running() { std::lock_guard<std::mutex> lk(_m); return _running; }
484 void set_timeout(uint64_t timeout_ms) { std::lock_guard<std::mutex> lk(_m); _timeout_ms = timeout_ms; }
485 void kick() { std::lock_guard<std::mutex> lk(_m); _kicked = true; }
486
487private:
488 std::mutex _m;
489 uint64_t _timeout_ms;
490 bool _kicked = false;
491 bool _running = false;
492 std::function<void()> _operation;
493 std::shared_ptr<active_object<>> _watcher;
494};
Definition: concurrency.h:407
active_object(T operation)
Definition: concurrency.h:409
void start()
Definition: concurrency.h:414
~active_object()
Definition: concurrency.h:430
void stop()
Definition: concurrency.h:422
bool is_active() const
Definition: concurrency.h:435
Definition: concurrency.h:280
bool was_stopped() const
Definition: concurrency.h:288
cancellable_timer(dispatcher *owner)
Definition: concurrency.h:284
bool try_sleep(Duration sleep_time)
Definition: concurrency.h:296
Definition: concurrency.h:273
std::function< void(cancellable_timer const &)> action
Definition: concurrency.h:311
dispatcher(unsigned int queue_capacity, std::function< void(action) > on_drop_callback=nullptr)
void start()
void invoke_and_wait(T item, std::function< bool()> exit_condition, bool is_blocking=false)
Definition: concurrency.h:346
void stop()
void invoke(T item, bool is_blocking=false)
Definition: concurrency.h:332
bool flush(std::chrono::steady_clock::duration timeout=std::chrono::seconds(10))
bool empty() const
Definition: concurrency.h:322
Definition: concurrency.h:199
void start()
Definition: concurrency.h:249
bool empty() const
Definition: concurrency.h:259
bool try_dequeue(T *item)
Definition: concurrency.h:222
size_t size() const
Definition: concurrency.h:254
void clear()
Definition: concurrency.h:239
bool enqueue(T &&item)
Definition: concurrency.h:209
bool peek(Fn fn)
Definition: concurrency.h:234
bool stopped() const
Definition: concurrency.h:265
bool dequeue(T *item, unsigned int timeout_ms)
Definition: concurrency.h:217
bool started() const
Definition: concurrency.h:264
void stop()
Definition: concurrency.h:244
bool peek(Fn fn) const
Definition: concurrency.h:228
Definition: concurrency.h:17
void _clear()
Definition: concurrency.h:168
bool dequeue(T *item, unsigned int timeout_ms)
Definition: concurrency.h:94
bool blocking_enqueue(T &&item)
Definition: concurrency.h:69
bool peek(Fn fn)
Definition: concurrency.h:142
void start()
Definition: concurrency.h:178
bool peek(Fn fn) const
Definition: concurrency.h:132
bool stopped() const
Definition: concurrency.h:185
bool try_dequeue(T *item)
Definition: concurrency.h:116
void stop()
Definition: concurrency.h:151
bool enqueue(T &&item)
Definition: concurrency.h:39
bool started() const
Definition: concurrency.h:184
size_t size() const
Definition: concurrency.h:187
bool empty() const
Definition: concurrency.h:193
void clear()
Definition: concurrency.h:161
Definition: concurrency.h:458
void stop()
Definition: concurrency.h:482
watchdog(std::function< void()> operation, uint64_t timeout_ms)
Definition: concurrency.h:460
void kick()
Definition: concurrency.h:485
void set_timeout(uint64_t timeout_ms)
Definition: concurrency.h:484
~watchdog()
Definition: concurrency.h:475
void start()
Definition: concurrency.h:481
bool running()
Definition: concurrency.h:483
const int QUEUE_MAX_SIZE
Definition: concurrency.h:13