RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-newthread.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_SCHEDULER_NEW_THREAD_HPP)
6 #define RXCPP_RX_SCHEDULER_NEW_THREAD_HPP
7 
8 #include "../rx-includes.hpp"
9 
10 namespace rxcpp {
11 
12 namespace schedulers {
13 
// Callable that is handed a thread's entry function (a std::function<void()>)
// and returns a std::thread for it.
14 typedef std::function<std::thread(std::function<void()>)> thread_factory;
15 
17 {
18 private:
// Shorthand for the enclosing type.
19  typedef new_thread this_type;
// Copy constructor declared in the private section; no definition appears in
// this listing — presumably this is how copying is disallowed.
20  new_thread(const this_type&);
21 
22  struct new_worker : public worker_interface
23  {
24  private:
25  typedef new_worker this_type;
26 
27  typedef detail::action_queue queue_type;
28 
29  new_worker(const this_type&);
30 
31  struct new_worker_state : public std::enable_shared_from_this<new_worker_state>
32  {
33  typedef detail::schedulable_queue<
34  typename clock_type::time_point> queue_item_time;
35 
36  typedef queue_item_time::item_type item_type;
37 
38  virtual ~new_worker_state()
39  {
40  }
41 
42  explicit new_worker_state(composite_subscription cs)
43  : lifetime(cs)
44  {
45  }
46 
47  composite_subscription lifetime;
48  mutable std::mutex lock;
49  mutable std::condition_variable wake;
50  mutable queue_item_time q;
51  std::thread worker;
52  recursion r;
53  };
54 
55  std::shared_ptr<new_worker_state> state;
56 
57  public:
58  virtual ~new_worker()
59  {
60  }
61 
62  explicit new_worker(std::shared_ptr<new_worker_state> ws)
63  : state(ws)
64  {
65  }
66 
67  new_worker(composite_subscription cs, thread_factory& tf)
68  : state(std::make_shared<new_worker_state>(cs))
69  {
70  auto keepAlive = state;
71 
72  state->lifetime.add([keepAlive](){
73  std::unique_lock<std::mutex> guard(keepAlive->lock);
74  auto expired = std::move(keepAlive->q);
75  keepAlive->q = new_worker_state::queue_item_time{};
76  if (!keepAlive->q.empty()) std::terminate();
77  keepAlive->wake.notify_one();
78 
79  if (keepAlive->worker.joinable() && keepAlive->worker.get_id() != std::this_thread::get_id()) {
80  guard.unlock();
81  keepAlive->worker.join();
82  }
83  else {
84  keepAlive->worker.detach();
85  }
86  });
87 
88  state->worker = tf([keepAlive](){
89 
90  // take ownership
91  queue_type::ensure(std::make_shared<new_worker>(keepAlive));
92  // release ownership
94  queue_type::destroy();
95  });
96 
97  for(;;) {
98  std::unique_lock<std::mutex> guard(keepAlive->lock);
99  if (keepAlive->q.empty()) {
100  keepAlive->wake.wait(guard, [keepAlive](){
101  return !keepAlive->lifetime.is_subscribed() || !keepAlive->q.empty();
102  });
103  }
104  if (!keepAlive->lifetime.is_subscribed()) {
105  break;
106  }
107  auto& peek = keepAlive->q.top();
108  if (!peek.what.is_subscribed()) {
109  keepAlive->q.pop();
110  continue;
111  }
112  if (clock_type::now() < peek.when) {
113  keepAlive->wake.wait_until(guard, peek.when);
114  continue;
115  }
116  auto what = peek.what;
117  keepAlive->q.pop();
118  keepAlive->r.reset(keepAlive->q.empty());
119  guard.unlock();
120  what(keepAlive->r.get_recurse());
121  }
122  });
123  }
124 
125  virtual clock_type::time_point now() const {
126  return clock_type::now();
127  }
128 
129  virtual void schedule(const schedulable& scbl) const {
130  schedule(now(), scbl);
131  }
132 
133  virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
134  if (scbl.is_subscribed()) {
135  std::unique_lock<std::mutex> guard(state->lock);
136  state->q.push(new_worker_state::item_type(when, scbl));
137  state->r.reset(false);
138  }
139  state->wake.notify_one();
140  }
141  };
142 
// Thread factory stored by the constructors and passed to each new_worker
// that is created. Declared mutable — presumably so a const member can hand
// it on as the non-const thread_factory& that new_worker's constructor takes.
143  mutable thread_factory factory;
144 
145 public:
147  : factory([](std::function<void()> start){
148  return std::thread(std::move(start));
149  })
150  {
151  }
153  : factory(tf)
154  {
155  }
// Virtual destructor with an empty body; no explicit cleanup is done here.
156  virtual ~new_thread()
157  {
158  }
159 
// Returns the current time as reported by clock_type::now().
160  virtual clock_type::time_point now() const {
161  return clock_type::now();
162  }
163 
165  return worker(cs, std::make_shared<new_worker>(cs, factory));
166  }
167 };
168 
170  static scheduler instance = make_scheduler<new_thread>();
171  return instance;
172 }
174  return make_scheduler<new_thread>(tf);
175 }
176 
177 }
178 
179 }
180 
181 #endif
Definition: rx-scheduler.hpp:163
Definition: rx-newthread.hpp:16
Definition: rx-all.hpp:26
virtual clock_type::time_point now() const
Definition: rx-newthread.hpp:160
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:459
new_thread()
Definition: rx-newthread.hpp:146
std::function< std::thread(std::function< void()>)> thread_factory
Definition: rx-newthread.hpp:14
allows functions to be called at specified times and possibly in other contexts.
Definition: rx-scheduler.hpp:383
virtual ~new_thread()
Definition: rx-newthread.hpp:156
bool is_subscribed() const
Definition: rx-scheduler.hpp:592
scheduler make_new_thread()
Definition: rx-newthread.hpp:169
new_thread(thread_factory tf)
Definition: rx-newthread.hpp:152
recursion is used by the scheduler to signal to each action whether tail recursion is allowed.
Definition: rx-scheduler.hpp:95
Definition: rx-scheduler.hpp:353
virtual worker create_worker(composite_subscription cs) const
Definition: rx-newthread.hpp:164
#define RXCPP_UNWIND_AUTO(Function)
Definition: rx-util.hpp:1024
std::string what(std::exception_ptr ep)
Definition: rx-util.hpp:544
Definition: rx-scheduler.hpp:426
Definition: rx-scheduler.hpp:200