5 #if !defined(RXCPP_RX_SCHEDULER_NEW_THREAD_HPP) 6 #define RXCPP_RX_SCHEDULER_NEW_THREAD_HPP 8 #include "../rx-includes.hpp" 12 namespace schedulers {
27 typedef detail::action_queue queue_type;
31 struct new_worker_state :
public std::enable_shared_from_this<new_worker_state>
33 typedef detail::schedulable_queue<
34 typename clock_type::time_point> queue_item_time;
36 typedef queue_item_time::item_type item_type;
38 virtual ~new_worker_state()
48 mutable std::mutex lock;
49 mutable std::condition_variable wake;
50 mutable queue_item_time q;
55 std::shared_ptr<new_worker_state> state;
62 explicit new_worker(std::shared_ptr<new_worker_state> ws)
68 : state(std::make_shared<new_worker_state>(cs))
70 auto keepAlive = state;
72 state->lifetime.add([keepAlive](){
73 std::unique_lock<std::mutex> guard(keepAlive->lock);
74 auto expired = std::move(keepAlive->q);
75 keepAlive->q = new_worker_state::queue_item_time{};
76 if (!keepAlive->q.empty()) std::terminate();
77 keepAlive->wake.notify_one();
79 if (keepAlive->worker.joinable() && keepAlive->worker.get_id() != std::this_thread::get_id()) {
81 keepAlive->worker.join();
84 keepAlive->worker.detach();
88 state->worker = tf([keepAlive](){
91 queue_type::ensure(std::make_shared<new_worker>(keepAlive));
94 queue_type::destroy();
98 std::unique_lock<std::mutex> guard(keepAlive->lock);
99 if (keepAlive->q.empty()) {
100 keepAlive->wake.wait(guard, [keepAlive](){
101 return !keepAlive->lifetime.is_subscribed() || !keepAlive->q.empty();
104 if (!keepAlive->lifetime.is_subscribed()) {
107 auto& peek = keepAlive->q.top();
108 if (!peek.what.is_subscribed()) {
112 if (clock_type::now() < peek.when) {
113 keepAlive->wake.wait_until(guard, peek.when);
116 auto what = peek.what;
118 keepAlive->r.reset(keepAlive->q.empty());
120 what(keepAlive->r.get_recurse());
125 virtual clock_type::time_point
now()
const {
126 return clock_type::now();
129 virtual void schedule(
const schedulable& scbl)
const {
130 schedule(
now(), scbl);
133 virtual void schedule(clock_type::time_point when,
const schedulable& scbl)
const {
135 std::unique_lock<std::mutex> guard(state->lock);
136 state->q.push(new_worker_state::item_type(when, scbl));
137 state->r.reset(
false);
139 state->wake.notify_one();
147 : factory([](std::function<void()> start){
148 return std::thread(std::move(start));
160 virtual clock_type::time_point
now()
const {
161 return clock_type::now();
165 return worker(cs, std::make_shared<new_worker>(cs, factory));
170 static scheduler instance = make_scheduler<new_thread>();
174 return make_scheduler<new_thread>(tf);
Definition: rx-scheduler.hpp:163
Definition: rx-newthread.hpp:16
Definition: rx-all.hpp:26
virtual clock_type::time_point now() const
Definition: rx-newthread.hpp:160
Controls the lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:459
new_thread()
Definition: rx-newthread.hpp:146
std::function< std::thread(std::function< void()>)> thread_factory
Definition: rx-newthread.hpp:14
Allows functions to be called at specified times and possibly in other contexts.
Definition: rx-scheduler.hpp:383
virtual ~new_thread()
Definition: rx-newthread.hpp:156
bool is_subscribed() const
Definition: rx-scheduler.hpp:592
scheduler make_new_thread()
Definition: rx-newthread.hpp:169
new_thread(thread_factory tf)
Definition: rx-newthread.hpp:152
recursion is used by the scheduler to signal to each action whether tail recursion is allowed.
Definition: rx-scheduler.hpp:95
Definition: rx-scheduler.hpp:353
virtual worker create_worker(composite_subscription cs) const
Definition: rx-newthread.hpp:164
#define RXCPP_UNWIND_AUTO(Function)
Definition: rx-util.hpp:1024
std::string what(std::exception_ptr ep)
Definition: rx-util.hpp:544
Definition: rx-scheduler.hpp:426
Definition: rx-scheduler.hpp:200