Intel® RealSense™ Cross Platform API
Intel RealSense Cross-Platform API
concurrency.h
Go to the documentation of this file.
1 // License: Apache 2.0. See LICENSE file in root directory.
2 // Copyright(c) 2015 Intel Corporation. All Rights Reserved.
3 
4 #pragma once
5 #include <queue>
6 #include <mutex>
7 #include <condition_variable>
8 #include <thread>
9 #include <atomic>
10 #include <functional>
11 
// Default capacity used by single_consumer_queue when none is given.
const int QUEUE_MAX_SIZE = 10;

// Simplest implementation of a blocking concurrent queue for thread messaging.
//
// The queue is bounded: once it holds more than `cap` items, the oldest item
// is discarded. Every public method takes the internal mutex for its duration.
template<class T>
class single_consumer_queue
{
    std::deque<T> q;
    std::mutex mutex;
    std::condition_variable cv; // not empty signal
    unsigned int cap;
    bool accepting;

    // flush mechanism is required to abort wait on cv
    // when need to stop
    std::atomic<bool> need_to_flush;
    std::atomic<bool> was_flushed;
    std::condition_variable was_flushed_cv;
    std::mutex was_flushed_mutex;
public:
    // Creates an empty, accepting queue that keeps at most `cap` items.
    // Initializers are listed in member declaration order.
    explicit single_consumer_queue(unsigned int cap = QUEUE_MAX_SIZE)
        : q(), mutex(), cv(), cap(cap), accepting(true), need_to_flush(false), was_flushed(false)
    {}

    // Appends `item` if the queue is currently accepting; when that pushes the
    // size above `cap`, the oldest item is dropped. One waiter is notified in
    // either case, after the lock has been released.
    void enqueue(T&& item)
    {
        std::unique_lock<std::mutex> lock(mutex);
        if (accepting)
        {
            q.push_back(std::move(item));
            if (q.size() > cap)
            {
                q.pop_front();
            }
        }
        lock.unlock();
        cv.notify_one();
    }

    // Waits up to `timeout_ms` milliseconds for an item (or for a flush request)
    // and moves the front item into `*item`.
    // Returns false when the wait timed out or the queue is empty (e.g. the wait
    // was aborted by clear()); `*item` is left untouched in that case.
    // Calling dequeue re-enables accepting of new items.
    bool dequeue(T* item ,unsigned int timeout_ms = 5000)
    {
        std::unique_lock<std::mutex> lock(mutex);
        accepting = true;
        was_flushed = false;
        const auto ready = [this]() { return !q.empty() || need_to_flush; };
        if (!ready() && !cv.wait_for(lock, std::chrono::milliseconds(timeout_ms), ready))
        {
            return false;
        }

        // Woken by a flush request rather than by data.
        if (q.empty())
        {
            return false;
        }
        *item = std::move(q.front());
        q.pop_front();
        return true;
    }

    // Stores the address of the front item in `*item` without removing it.
    // Returns false if the queue is empty. The lock is released on return, so
    // the pointer refers to storage inside the queue and is only meaningful
    // until the queue is next modified.
    bool peek(T** item)
    {
        std::unique_lock<std::mutex> lock(mutex);

        if (q.empty())
        {
            return false;
        }
        *item = &q.front();
        return true;
    }

    // Non-blocking dequeue: moves the front item into `*item` and returns true,
    // or returns false immediately if the queue is empty.
    // Calling try_dequeue re-enables accepting of new items.
    bool try_dequeue(T* item)
    {
        std::unique_lock<std::mutex> lock(mutex);
        accepting = true;
        if (q.empty())
        {
            return false;
        }
        *item = std::move(q.front());
        q.pop_front();
        return true;
    }

    // Stops accepting new items, discards everything queued and wakes all
    // waiters so that a blocked dequeue() returns.
    void clear()
    {
        std::unique_lock<std::mutex> lock(mutex);

        accepting = false;
        need_to_flush = true;

        q.clear();
        cv.notify_all();
    }

    // Cancels a pending flush request and starts accepting items again.
    void start()
    {
        std::unique_lock<std::mutex> lock(mutex);
        need_to_flush = false;
        accepting = true;
    }

    // Number of items currently queued.
    size_t size()
    {
        std::unique_lock<std::mutex> lock(mutex);
        return q.size();
    }
};
123 
124 
126 {
127 public:
129  {
130  public:
132  : _owner(owner)
133  {}
134 
135  bool try_sleep(int ms)
136  {
137  using namespace std::chrono;
138 
139  std::unique_lock<std::mutex> lock(_owner->_was_stopped_mutex);
140  auto good = [&]() { return _owner->_was_stopped.load(); };
141  return !(_owner->_was_stopped_cv.wait_for(lock, milliseconds(ms), good));
142  }
143 
144  private:
145  dispatcher* _owner;
146  };
147 
148  dispatcher(unsigned int cap)
149  : _queue(cap),
150  _was_stopped(true),
151  _was_flushed(false),
152  _is_alive(true)
153  {
154  _thread = std::thread([&]()
155  {
156  while (_is_alive)
157  {
158  std::function<void(cancellable_timer)> item;
159 
160  if (_queue.dequeue(&item))
161  {
162  cancellable_timer time(this);
163 
164  try
165  {
166  item(time);
167  }
168  catch(...){}
169  }
170 
171 #ifndef ANDROID
172  std::unique_lock<std::mutex> lock(_was_flushed_mutex);
173 #endif
174  _was_flushed = true;
175  _was_flushed_cv.notify_all();
176 #ifndef ANDROID
177  lock.unlock();
178 #endif
179  }
180  });
181  }
182 
183  template<class T>
184  void invoke(T item)
185  {
186  if (!_was_stopped)
187  {
188  _queue.enqueue(std::move(item));
189  }
190  }
191 
192  void start()
193  {
194  std::unique_lock<std::mutex> lock(_was_stopped_mutex);
195  _was_stopped = false;
196 
197  _queue.start();
198  }
199 
200  void stop()
201  {
202  {
203  std::unique_lock<std::mutex> lock(_was_stopped_mutex);
204  _was_stopped = true;
205  _was_stopped_cv.notify_all();
206  }
207 
208  _queue.clear();
209 
210  {
211  std::unique_lock<std::mutex> lock(_was_flushed_mutex);
212  _was_flushed = false;
213  }
214 
215  std::unique_lock<std::mutex> lock_was_flushed(_was_flushed_mutex);
216  _was_flushed_cv.wait_for(lock_was_flushed, std::chrono::hours(999999), [&]() { return _was_flushed.load(); });
217 
218  _queue.start();
219  }
220 
222  {
223  stop();
224  _queue.clear();
225  _is_alive = false;
226  _thread.join();
227  }
228 
229  bool flush()
230  {
231  std::mutex m;
232  std::condition_variable cv;
233  bool invoked = false;
234  auto wait_sucess = std::make_shared<std::atomic_bool>(true);
235  invoke([&, wait_sucess](cancellable_timer t)
236  {
238  if (_was_stopped || !(*wait_sucess))
239  return;
240 
241  {
242  std::lock_guard<std::mutex> locker(m);
243  invoked = true;
244  }
245  cv.notify_one();
246  });
247  std::unique_lock<std::mutex> locker(m);
248  *wait_sucess = cv.wait_for(locker, std::chrono::seconds(10), [&]() { return invoked || _was_stopped; });
249  return *wait_sucess;
250  }
251 private:
252  friend cancellable_timer;
254  std::thread _thread;
255 
256  std::atomic<bool> _was_stopped;
257  std::condition_variable _was_stopped_cv;
258  std::mutex _was_stopped_mutex;
259 
260  std::atomic<bool> _was_flushed;
261  std::condition_variable _was_flushed_cv;
262  std::mutex _was_flushed_mutex;
263 
264  std::atomic<bool> _is_alive;
265 };
266 
267 template<class T = std::function<void(dispatcher::cancellable_timer)>>
269 {
270 public:
271  active_object(T operation)
272  : _operation(std::move(operation)), _dispatcher(1), _stopped(true)
273  {
274  }
275 
276  void start()
277  {
278  _stopped = false;
279  _dispatcher.start();
280 
281  do_loop();
282  }
283 
284  void stop()
285  {
286  _stopped = true;
287  _dispatcher.stop();
288  }
289 
291  {
292  stop();
293  }
294 private:
295  void do_loop()
296  {
297  _dispatcher.invoke([this](dispatcher::cancellable_timer ct)
298  {
299  _operation(ct);
300  if (!_stopped)
301  {
302  do_loop();
303  }
304  });
305  }
306 
307  T _operation;
308  dispatcher _dispatcher;
309  std::atomic<bool> _stopped;
310 };
void enqueue(T &&item)
Definition: concurrency.h:34
active_object(T operation)
Definition: concurrency.h:271
size_t size()
Definition: concurrency.h:117
bool dequeue(T *item, unsigned int timeout_ms=5000)
Definition: concurrency.h:49
~dispatcher()
Definition: concurrency.h:221
void start()
Definition: concurrency.h:192
active_object
Definition: concurrency.h:268
dispatcher
Definition: concurrency.h:125
void stop()
Definition: concurrency.h:284
Definition: stream.h:188
dispatcher(unsigned int cap)
Definition: concurrency.h:148
cancellable_timer(dispatcher *owner)
Definition: concurrency.h:131
bool try_sleep(int ms)
Definition: concurrency.h:135
void stop()
Definition: concurrency.h:200
bool peek(T **item)
Definition: concurrency.h:69
void invoke(T item)
Definition: concurrency.h:184
dispatcher::cancellable_timer
Definition: concurrency.h:128
void start()
Definition: concurrency.h:110
bool flush()
Definition: concurrency.h:229
void start()
Definition: concurrency.h:276
bool try_dequeue(T *item)
Definition: concurrency.h:81
void clear()
Definition: concurrency.h:95
single_consumer_queue
Definition: concurrency.h:15
~active_object()
Definition: concurrency.h:290
const int QUEUE_MAX_SIZE
Definition: concurrency.h:12