5 #if !defined(RXCPP_RX_SCHEDULER_HPP) 6 #define RXCPP_RX_SCHEDULER_HPP 12 namespace schedulers {
20 typedef std::shared_ptr<action_type> action_ptr;
22 typedef std::shared_ptr<worker_interface> worker_interface_ptr;
23 typedef std::shared_ptr<const worker_interface> const_worker_interface_ptr;
25 typedef std::weak_ptr<worker_interface> worker_interface_weak_ptr;
26 typedef std::weak_ptr<const worker_interface> const_worker_interface_weak_ptr;
28 typedef std::shared_ptr<scheduler_interface> scheduler_interface_ptr;
29 typedef std::shared_ptr<const scheduler_interface> const_scheduler_interface_ptr;
31 inline action_ptr shared_empty() {
32 static action_ptr shared_empty = std::make_shared<detail::action_type>();
66 mutable bool isrequested;
73 , requestor(isrequested)
97 mutable bool isallowed;
103 , recursor(isallowed)
108 , recursor(isallowed)
112 inline void reset(
bool b =
true)
const {
133 detail::action_ptr inner;
139 : inner(std::move(i))
145 return action(detail::shared_empty());
164 :
public std::enable_shared_from_this<worker_interface>
173 virtual clock_type::time_point
now()
const = 0;
182 struct is_action_function
186 static auto check(
int) -> decltype((*(CF*)
nullptr)(*(schedulable*)
nullptr));
188 static not_void check(...);
190 static const bool value = std::is_same<decltype(check<rxu::decay_t<F>>(0)),
void>::value;
203 detail::worker_interface_ptr inner;
216 , lifetime(std::move(cs))
221 , lifetime(std::move(cs))
238 return lifetime.
add(std::move(s));
241 return lifetime.
remove(std::move(w));
244 return lifetime.clear();
253 inline clock_type::time_point
now()
const {
293 template<
class Arg0,
class... ArgN>
294 auto schedule(Arg0&& a0, ArgN&&... an)
const 295 ->
typename std::enable_if<
296 (detail::is_action_function<Arg0>::value ||
299 template<
class... ArgN>
304 template<
class Arg0,
class... ArgN>
305 auto schedule(clock_type::time_point when, Arg0&& a0, ArgN&&... an)
const 306 ->
typename std::enable_if<
307 (detail::is_action_function<Arg0>::value ||
311 template<
class... ArgN>
315 template<
class Arg0,
class... ArgN>
316 auto schedule_periodically(clock_type::time_point initial, clock_type::duration period, Arg0&& a0, ArgN&&... an)
const 317 ->
typename std::enable_if<
318 (detail::is_action_function<Arg0>::value ||
322 template<
class... ArgN>
327 return lhs.inner == rhs.inner && lhs.lifetime == rhs.lifetime;
330 return !(lhs == rhs);
335 detail::worker_interface_weak_ptr inner;
344 , lifetime(owner.lifetime)
354 :
public std::enable_shared_from_this<scheduler_interface>
363 virtual clock_type::time_point
now()
const = 0;
386 detail::scheduler_interface_ptr inner;
395 : inner(std::move(i))
398 explicit scheduler(detail::const_scheduler_interface_ptr i)
404 inline clock_type::time_point
now()
const {
413 return inner->create_worker(cs);
417 template<
class Scheduler,
class... ArgN>
419 return scheduler(std::static_pointer_cast<scheduler_interface>(std::make_shared<Scheduler>(std::forward<ArgN>(an)...)));
451 class recursed_scope_type
455 class exit_recursed_scope_type
457 const recursed_scope_type* that;
459 ~exit_recursed_scope_type()
461 if (that !=
nullptr) {
462 that->requestor =
nullptr;
465 exit_recursed_scope_type(
const recursed_scope_type* that)
469 exit_recursed_scope_type(exit_recursed_scope_type && other)
RXCPP_NOEXCEPT 472 other.that =
nullptr;
476 recursed_scope_type()
480 recursed_scope_type(
const recursed_scope_type&)
485 recursed_scope_type& operator=(
const recursed_scope_type& )
490 exit_recursed_scope_type reset(
const recurse& r)
const {
492 return exit_recursed_scope_type(
this);
501 recursed_scope_type recursed_scope;
522 , activity(std::move(a))
528 : lifetime(std::move(cs))
530 , activity(std::move(a))
532 , action_scope(controller.lock().
add(lifetime))
539 , activity(std::move(a))
540 , scoped(scbl.scoped)
552 return controller.
lock();
555 return controller.
lock();
569 -> decltype(recursed_scope.reset(r)) {
570 return recursed_scope.reset(r);
576 return recursed_scope.is_recursed();
596 return lifetime.
add(std::move(s));
600 ->
typename std::enable_if<rxcpp::detail::is_unsubscribe_function<F>::value,
weak_subscription>::type {
604 return lifetime.
remove(std::move(w));
607 return lifetime.clear();
615 inline clock_type::time_point
now()
const {
616 return controller.
lock().
now();
625 inline void schedule(clock_type::time_point when)
const {
631 inline void schedule(clock_type::duration when)
const {
644 detacher protect(
this);
646 protect.that =
nullptr;
650 struct current_thread;
655 :
public std::enable_shared_from_this<action_type>
657 typedef action_type this_type;
660 typedef std::function<void(
const schedulable&,
const recurse&)> function_type;
670 action_type(function_type f)
675 inline void operator()(
const schedulable& s,
const recurse& r) {
683 class action_tailrecurser
684 :
public std::enable_shared_from_this<action_type>
686 typedef action_type this_type;
689 typedef std::function<void(
const schedulable&)> function_type;
695 action_tailrecurser()
699 action_tailrecurser(function_type f)
704 inline void operator()(
const schedulable& s,
const recurse& r) {
709 auto scope = s.set_recursed(r);
710 while (s.is_subscribed()) {
713 if (!r.is_allowed() || !r.is_requested()) {
714 if (r.is_requested()) {
736 static_assert(detail::is_action_function<F>::value,
"action function must be void(schedulable)");
737 auto fn = std::forward<F>(f);
738 return action(std::make_shared<detail::action_type>(detail::action_tailrecurser(fn)));
763 ->
typename std::enable_if<detail::is_action_function<F>::value,
schedulable>::type {
768 ->
typename std::enable_if<detail::is_action_function<F>::value,
schedulable>::type {
773 ->
typename std::enable_if<detail::is_action_function<F>::value,
schedulable>::type {
778 ->
typename std::enable_if<detail::is_action_function<F>::value,
schedulable>::type {
783 ->
typename std::enable_if<detail::is_action_function<F>::value,
schedulable>::type {
789 return schedulable(cs, scbl.get_worker(), scbl.get_action());
800 template<
class Arg0,
class... ArgN>
802 ->
typename std::enable_if<
803 (detail::is_action_function<Arg0>::value ||
806 auto scbl =
make_schedulable(*
this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...);
808 inner->schedule(std::move(scbl));
811 template<
class... ArgN>
815 inner->schedule(std::move(rescbl));
819 template<
class Arg0,
class... ArgN>
821 ->
typename std::enable_if<
822 (detail::is_action_function<Arg0>::value ||
825 auto scbl =
make_schedulable(*
this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...);
827 inner->schedule(when, std::move(scbl));
830 template<
class... ArgN>
834 inner->schedule(when, std::move(rescbl));
838 template<
class Arg0,
class... ArgN>
840 ->
typename std::enable_if<
841 (detail::is_action_function<Arg0>::value ||
844 schedule_periodically_rebind(initial, period,
make_schedulable(*
this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...));
846 template<
class... ArgN>
848 auto keepAlive = *
this;
849 auto target = std::make_shared<clock_type::time_point>(initial);
850 auto activity =
make_schedulable(scbl, keepAlive, std::forward<ArgN>(an)...);
853 [keepAlive, target, period, activity](
schedulable self) {
861 self.schedule(*target);
863 trace_activity().schedule_when_enter(*inner.get(), *target, periodic);
864 inner->schedule(*target, periodic);
870 template<
class TimePo
int>
871 struct time_schedulable
873 typedef TimePoint time_point_type;
888 template<
class TimePo
int>
889 class schedulable_queue {
891 typedef time_schedulable<TimePoint> item_type;
892 typedef std::pair<item_type, int64_t> elem_type;
893 typedef std::vector<elem_type> container_type;
894 typedef const item_type& const_reference;
899 bool operator()(
const elem_type& lhs,
const elem_type& rhs)
const {
900 if (lhs.first.when == rhs.first.when) {
901 return lhs.second > rhs.second;
904 return lhs.first.when > rhs.first.when;
909 typedef std::priority_queue<
925 const_reference top()
const {
926 return q.top().first;
937 void push(
const item_type& value) {
938 q.push(elem_type(value, ordinal++));
941 void push(item_type&& value) {
942 q.push(elem_type(std::move(value), ordinal++));
949 namespace rxsc=schedulers;
tag_action action_tag
Definition: rx-scheduler.hpp:124
void schedule_rebind(const schedulable &scbl, ArgN &&... an) const
use the supplied arguments to make a schedulable and then insert it to be run
Definition: rx-scheduler.hpp:812
composite_subscription::weak_subscription weak_subscription
Definition: rx-scheduler.hpp:209
Definition: rx-scheduler.hpp:163
static subscription lock(weak_state_type w)
Definition: rx-subscription.hpp:192
void remove(weak_subscription w) const
Definition: rx-scheduler.hpp:240
void schedule(clock_type::time_point when) const
put this on the queue of the stored scheduler to run at the specified time
Definition: rx-scheduler.hpp:625
virtual ~scheduler_interface()
Definition: rx-scheduler.hpp:361
Definition: rx-all.hpp:26
bool is_recursed() const
Definition: rx-scheduler.hpp:575
void operator()() const
request to be rescheduled
Definition: rx-scheduler.hpp:56
void clear() const
Definition: rx-scheduler.hpp:606
friend bool operator==(const worker &, const worker &)
Definition: rx-scheduler.hpp:326
worker(composite_subscription cs, detail::const_worker_interface_ptr i)
Definition: rx-scheduler.hpp:214
const composite_subscription & get_subscription() const
Definition: rx-scheduler.hpp:545
schedulable(composite_subscription cs, worker q, action a)
action and worker have independent lifetimes
Definition: rx-scheduler.hpp:527
const recurse & get_recurse() const
get the recurse to pass into each action being called
Definition: rx-scheduler.hpp:116
tag_scheduler scheduler_tag
Definition: rx-scheduler.hpp:155
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:459
void unsubscribe() const
Definition: rx-scheduler.hpp:609
Definition: rx-scheduler.hpp:152
weak_subscription add(subscription s) const
Definition: rx-scheduler.hpp:237
auto make_schedulable(const schedulable &scbl) -> schedulable
Definition: rx-scheduler.hpp:742
action(detail::action_ptr i)
Definition: rx-scheduler.hpp:138
#define RXCPP_NOEXCEPT
Definition: rx-includes.hpp:157
virtual worker create_worker(composite_subscription cs) const =0
Definition: rx-scheduler.hpp:158
scheduler_base::clock_type clock_type
Definition: rx-scheduler.hpp:505
auto make_subscription() -> subscription
Definition: rx-subscription.hpp:219
subscription::weak_state_type weak_subscription
Definition: rx-subscription.hpp:465
action make_action(F &&f)
Definition: rx-scheduler.hpp:735
auto empty() -> decltype(from< T >())
Returns an observable that sends no items to observer and immediately completes, on the specified sch...
Definition: rx-empty.hpp:37
void operator()(const schedulable &s, const recurse &r) const
call the function
Definition: rx-scheduler.hpp:726
bool operator!=(const worker &lhs, const worker &rhs)
Definition: rx-scheduler.hpp:329
composite_subscription & get_subscription()
Definition: rx-scheduler.hpp:228
Definition: rx-scheduler.hpp:46
virtual void schedule(const schedulable &scbl) const =0
composite_subscription::weak_subscription weak_subscription
Definition: rx-scheduler.hpp:504
clock_type::time_point now() const
return the current time for this scheduler
Definition: rx-scheduler.hpp:404
weak_subscription add(subscription s) const
Definition: rx-subscription.hpp:508
virtual ~worker_interface()
Definition: rx-scheduler.hpp:171
worker(composite_subscription cs, worker o)
Definition: rx-scheduler.hpp:219
weak_subscription add(subscription s) const
Definition: rx-scheduler.hpp:595
void schedule(clock_type::time_point when, const schedulable &scbl) const
insert the supplied schedulable to be run at the time specified
Definition: rx-scheduler.hpp:264
void schedule_periodically(clock_type::time_point initial, clock_type::duration period, const schedulable &scbl) const
Definition: rx-scheduler.hpp:280
worker create_worker(composite_subscription cs=composite_subscription()) const
Definition: rx-scheduler.hpp:412
clock_type::time_point now() const
Definition: rx-scheduler.hpp:615
action & get_action()
Definition: rx-scheduler.hpp:560
weak_worker()
Definition: rx-scheduler.hpp:339
Definition: rx-subscription.hpp:31
action provides type-forgetting for a potentially recursive set of calls to a function that takes a s...
Definition: rx-scheduler.hpp:130
const action & get_action() const
Definition: rx-scheduler.hpp:557
Definition: rx-predef.hpp:21
scheduler make_scheduler(ArgN &&... an)
Definition: rx-scheduler.hpp:418
void operator()(const recurse &r) const
invokes the action
Definition: rx-scheduler.hpp:640
allows functions to be called at specified times and possibly in other contexts.
Definition: rx-scheduler.hpp:383
Definition: rx-predef.hpp:30
Definition: rx-scheduler.hpp:122
static action empty()
return the empty action
Definition: rx-scheduler.hpp:144
Definition: rx-predef.hpp:58
static composite_subscription empty()
Definition: rx-subscription.hpp:499
void reset() const
reset the function request. call before each call to the function.
Definition: rx-scheduler.hpp:85
scheduler_base::clock_type clock_type
Definition: rx-scheduler.hpp:208
void unsubscribe() const
Definition: rx-scheduler.hpp:246
scheduler(detail::scheduler_interface_ptr i)
Definition: rx-scheduler.hpp:394
const composite_subscription & get_subscription() const
Definition: rx-scheduler.hpp:225
scheduler()
Definition: rx-scheduler.hpp:391
weak_worker(worker &owner)
Definition: rx-scheduler.hpp:342
scheduler_base::clock_type clock_type
Definition: rx-scheduler.hpp:359
clock_type::time_point now() const
return the current time for this worker
Definition: rx-scheduler.hpp:253
void schedule(clock_type::duration when, const schedulable &scbl) const
insert the supplied schedulable to be run at now() + the delay specified
Definition: rx-scheduler.hpp:273
bool is_subscribed() const
Definition: rx-scheduler.hpp:592
auto trace_activity() -> decltype(rxcpp_trace_activity(trace_tag()))&
Definition: rx-predef.hpp:15
void schedule_periodically(clock_type::duration initial, clock_type::duration period, const schedulable &scbl) const
Definition: rx-scheduler.hpp:287
std::chrono::steady_clock clock_type
Definition: rx-scheduler.hpp:154
void schedule(const schedulable &scbl) const
insert the supplied schedulable to be run as soon as possible
Definition: rx-scheduler.hpp:258
action make_action_empty()
Definition: rx-scheduler.hpp:730
friend bool operator==(const scheduler &, const scheduler &)
virtual clock_type::time_point now() const =0
auto set_recursed(const recurse &r) const -> decltype(recursed_scope.reset(r))
Definition: rx-scheduler.hpp:568
bool is_allowed() const
does the scheduler allow tail-recursion now?
Definition: rx-scheduler.hpp:77
recursion is used by the scheduler to signal to each action whether tail recursion is allowed.
Definition: rx-scheduler.hpp:95
recurse(bool &a)
Definition: rx-scheduler.hpp:70
void operator()() const
Definition: rx-scheduler.hpp:586
void clear() const
Definition: rx-scheduler.hpp:243
schedulable()
Definition: rx-scheduler.hpp:513
recursion()
Definition: rx-scheduler.hpp:101
tag_worker worker_tag
Definition: rx-scheduler.hpp:160
Definition: rx-scheduler.hpp:369
bool is_requested() const
did the function request to be recursed?
Definition: rx-scheduler.hpp:81
Definition: rx-scheduler.hpp:63
bool is_subscribed() const
Definition: rx-subscription.hpp:172
bool operator==(const worker &lhs, const worker &rhs)
Definition: rx-scheduler.hpp:326
auto add(F f) const -> typename std::enable_if< rxcpp::detail::is_unsubscribe_function< F >::value, weak_subscription >::type
Definition: rx-scheduler.hpp:599
Definition: rx-scheduler.hpp:353
scheduler_base::clock_type clock_type
Definition: rx-scheduler.hpp:389
void remove(weak_subscription w) const
Definition: rx-scheduler.hpp:603
composite_subscription & get_subscription()
Definition: rx-scheduler.hpp:548
Definition: rx-predef.hpp:56
bool is_subscribed() const
Definition: rx-scheduler.hpp:234
void schedule_periodically_rebind(clock_type::time_point initial, clock_type::duration period, const schedulable &scbl, ArgN &&... an) const
use the supplied arguments to make a schedulable and then insert it to be run
Definition: rx-scheduler.hpp:847
auto scope(ResourceFactory rf, ObservableFactory of) -> observable< rxu::value_type_t< detail::scope_traits< ResourceFactory, ObservableFactory >>, detail::scope< ResourceFactory, ObservableFactory >>
Returns an observable that makes an observable by the specified observable factory using the resource...
Definition: rx-scope.hpp:114
Definition: rx-scheduler.hpp:333
void unsubscribe() const
Definition: rx-subscription.hpp:178
Definition: rx-subscription.hpp:29
scheduler(detail::const_scheduler_interface_ptr i)
Definition: rx-scheduler.hpp:398
recursed(bool &r)
Definition: rx-scheduler.hpp:51
worker get_worker()
Definition: rx-scheduler.hpp:554
void schedule() const
put this on the queue of the stored scheduler to run asap
Definition: rx-scheduler.hpp:619
Definition: rx-subscription.hpp:67
worker lock() const
Definition: rx-scheduler.hpp:348
static schedulable empty(worker sc)
Definition: rx-scheduler.hpp:564
std::string what(std::exception_ptr ep)
Definition: rx-util.hpp:544
schedulable(worker q, action a)
action and worker share lifetime
Definition: rx-scheduler.hpp:519
void remove(weak_subscription w) const
Definition: rx-subscription.hpp:527
const recursed & get_recursed() const
get the recursed to set into the schedulable for the function to use to request recursion
Definition: rx-scheduler.hpp:89
void schedule(clock_type::duration when) const
put this on the queue of the stored scheduler to run after a delay from now
Definition: rx-scheduler.hpp:631
worker()
Definition: rx-scheduler.hpp:211
Definition: rx-scheduler.hpp:426
recursion(bool b)
Definition: rx-scheduler.hpp:106
action()
Definition: rx-scheduler.hpp:135
Definition: rx-predef.hpp:43
void reset(bool b=true) const
set whether tail-recursion is allowed
Definition: rx-scheduler.hpp:112
~schedulable()
Definition: rx-scheduler.hpp:507
virtual clock_type::time_point now() const =0
scheduler_base::clock_type clock_type
Definition: rx-scheduler.hpp:169
const worker get_worker() const
Definition: rx-scheduler.hpp:551
schedulable(schedulable scbl, worker q, action a)
inherit lifetimes
Definition: rx-scheduler.hpp:536
tag_schedulable schedulable_tag
Definition: rx-scheduler.hpp:374
Definition: rx-scheduler.hpp:200