RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-scheduler.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_SCHEDULER_HPP)
6 #define RXCPP_RX_SCHEDULER_HPP
7 
8 #include "rx-includes.hpp"
9 
10 namespace rxcpp {
11 
12 namespace schedulers {
13 
14 class worker_interface;
16 
17 namespace detail {
18 
19 class action_type;
20 typedef std::shared_ptr<action_type> action_ptr;
21 
22 typedef std::shared_ptr<worker_interface> worker_interface_ptr;
23 typedef std::shared_ptr<const worker_interface> const_worker_interface_ptr;
24 
25 typedef std::weak_ptr<worker_interface> worker_interface_weak_ptr;
26 typedef std::weak_ptr<const worker_interface> const_worker_interface_weak_ptr;
27 
28 typedef std::shared_ptr<scheduler_interface> scheduler_interface_ptr;
29 typedef std::shared_ptr<const scheduler_interface> const_scheduler_interface_ptr;
30 
31 inline action_ptr shared_empty() {
32  static action_ptr shared_empty = std::make_shared<detail::action_type>();
33  return shared_empty;
34 }
35 
36 }
37 
38 // It is essential to keep virtual function calls out of an inner loop.
39 // To make tail-recursion work efficiently the recursion objects create
40 // a space on the stack inside the virtual function call in the actor that
41 // allows the callback and the scheduler to share stack space that records
42 // the request and the allowance without any virtual calls in the loop.
43 
/// Handle through which a called function asks to be run again.
///
/// Holds a reference to a bool owned by someone else; invoking the handle
/// sets that bool to true. The referenced bool must outlive the handle.
class recursed
{
    bool& isrequested;
    // private, with no definition visible in this file: not assignable
    recursed operator=(const recursed&);
public:
    /// Binds the handle to \p r, the flag that records the request.
    explicit recursed(bool& r)
        : isrequested(r)
    {
    }
    /// request to be rescheduled
    void operator()() const {
        isrequested = true;
    }
};
60 
63 class recurse
64 {
65  bool& isallowed;
66  mutable bool isrequested;
67  recursed requestor;
68  recurse operator=(const recurse&);
69 public:
70  explicit recurse(bool& a)
71  : isallowed(a)
72  , isrequested(true)
73  , requestor(isrequested)
74  {
75  }
77  inline bool is_allowed() const {
78  return isallowed;
79  }
81  inline bool is_requested() const {
82  return isrequested;
83  }
85  inline void reset() const {
86  isrequested = false;
87  }
89  inline const recursed& get_recursed() const {
90  return requestor;
91  }
92 };
93 
95 class recursion
96 {
97  mutable bool isallowed;
98  recurse recursor;
99  recursion operator=(const recursion&);
100 public:
102  : isallowed(true)
103  , recursor(isallowed)
104  {
105  }
106  explicit recursion(bool b)
107  : isallowed(b)
108  , recursor(isallowed)
109  {
110  }
112  inline void reset(bool b = true) const {
113  isallowed = b;
114  }
116  inline const recurse& get_recurse() const {
117  return recursor;
118  }
119 };
120 
121 
123 {
125 };
126 
127 class schedulable;
128 
130 class action : public action_base
131 {
132  typedef action this_type;
133  detail::action_ptr inner;
134 public:
136  {
137  }
138  explicit action(detail::action_ptr i)
139  : inner(std::move(i))
140  {
141  }
142 
144  inline static action empty() {
145  return action(detail::shared_empty());
146  }
147 
149  inline void operator()(const schedulable& s, const recurse& r) const;
150 };
151 
153 {
154  typedef std::chrono::steady_clock clock_type;
156 };
157 
159 {
161 };
162 
164  : public std::enable_shared_from_this<worker_interface>
165 {
166  typedef worker_interface this_type;
167 
168 public:
170 
171  virtual ~worker_interface() {}
172 
173  virtual clock_type::time_point now() const = 0;
174 
175  virtual void schedule(const schedulable& scbl) const = 0;
176  virtual void schedule(clock_type::time_point when, const schedulable& scbl) const = 0;
177 };
178 
179 namespace detail {
180 
181 template<class F>
182 struct is_action_function
183 {
184  struct not_void {};
185  template<class CF>
186  static auto check(int) -> decltype((*(CF*)nullptr)(*(schedulable*)nullptr));
187  template<class CF>
188  static not_void check(...);
189 
190  static const bool value = std::is_same<decltype(check<rxu::decay_t<F>>(0)), void>::value;
191 };
192 
193 }
194 
195 class weak_worker;
196 
200 class worker : public worker_base
201 {
202  typedef worker this_type;
203  detail::worker_interface_ptr inner;
204  composite_subscription lifetime;
205  friend bool operator==(const worker&, const worker&);
206  friend class weak_worker;
207 public:
210 
212  {
213  }
214  worker(composite_subscription cs, detail::const_worker_interface_ptr i)
215  : inner(std::const_pointer_cast<worker_interface>(i))
216  , lifetime(std::move(cs))
217  {
218  }
220  : inner(o.inner)
221  , lifetime(std::move(cs))
222  {
223  }
224 
225  inline const composite_subscription& get_subscription() const {
226  return lifetime;
227  }
229  return lifetime;
230  }
231 
232  // composite_subscription
233  //
234  inline bool is_subscribed() const {
235  return lifetime.is_subscribed();
236  }
238  return lifetime.add(std::move(s));
239  }
240  inline void remove(weak_subscription w) const {
241  return lifetime.remove(std::move(w));
242  }
243  inline void clear() const {
244  return lifetime.clear();
245  }
246  inline void unsubscribe() const {
247  return lifetime.unsubscribe();
248  }
249 
250  // worker_interface
251  //
253  inline clock_type::time_point now() const {
254  return inner->now();
255  }
256 
258  inline void schedule(const schedulable& scbl) const {
259  // force rebinding scbl to this worker
260  schedule_rebind(scbl);
261  }
262 
264  inline void schedule(clock_type::time_point when, const schedulable& scbl) const {
265  // force rebinding scbl to this worker
266  schedule_rebind(when, scbl);
267  }
268 
269  // helpers
270  //
271 
273  inline void schedule(clock_type::duration when, const schedulable& scbl) const {
274  // force rebinding scbl to this worker
275  schedule_rebind(now() + when, scbl);
276  }
277 
280  inline void schedule_periodically(clock_type::time_point initial, clock_type::duration period, const schedulable& scbl) const {
281  // force rebinding scbl to this worker
282  schedule_periodically_rebind(initial, period, scbl);
283  }
284 
287  inline void schedule_periodically(clock_type::duration initial, clock_type::duration period, const schedulable& scbl) const {
288  // force rebinding scbl to this worker
289  schedule_periodically_rebind(now() + initial, period, scbl);
290  }
291 
293  template<class Arg0, class... ArgN>
294  auto schedule(Arg0&& a0, ArgN&&... an) const
295  -> typename std::enable_if<
296  (detail::is_action_function<Arg0>::value ||
299  template<class... ArgN>
301  void schedule_rebind(const schedulable& scbl, ArgN&&... an) const;
302 
304  template<class Arg0, class... ArgN>
305  auto schedule(clock_type::time_point when, Arg0&& a0, ArgN&&... an) const
306  -> typename std::enable_if<
307  (detail::is_action_function<Arg0>::value ||
311  template<class... ArgN>
312  void schedule_rebind(clock_type::time_point when, const schedulable& scbl, ArgN&&... an) const;
313 
315  template<class Arg0, class... ArgN>
316  auto schedule_periodically(clock_type::time_point initial, clock_type::duration period, Arg0&& a0, ArgN&&... an) const
317  -> typename std::enable_if<
318  (detail::is_action_function<Arg0>::value ||
322  template<class... ArgN>
323  void schedule_periodically_rebind(clock_type::time_point initial, clock_type::duration period, const schedulable& scbl, ArgN&&... an) const;
324 };
325 
326 inline bool operator==(const worker& lhs, const worker& rhs) {
327  return lhs.inner == rhs.inner && lhs.lifetime == rhs.lifetime;
328 }
329 inline bool operator!=(const worker& lhs, const worker& rhs) {
330  return !(lhs == rhs);
331 }
332 
334 {
335  detail::worker_interface_weak_ptr inner;
336  composite_subscription lifetime;
337 
338 public:
340  {
341  }
342  explicit weak_worker(worker& owner)
343  : inner(owner.inner)
344  , lifetime(owner.lifetime)
345  {
346  }
347 
348  worker lock() const {
349  return worker(lifetime, inner.lock());
350  }
351 };
352 
354  : public std::enable_shared_from_this<scheduler_interface>
355 {
357 
358 public:
360 
361  virtual ~scheduler_interface() {}
362 
363  virtual clock_type::time_point now() const = 0;
364 
365  virtual worker create_worker(composite_subscription cs) const = 0;
366 };
367 
368 
370  // public subscription_base, <- already in worker base
371  public worker_base,
372  public action_base
373 {
375 };
376 
383 class scheduler : public scheduler_base
384 {
385  typedef scheduler this_type;
386  detail::scheduler_interface_ptr inner;
387  friend bool operator==(const scheduler&, const scheduler&);
388 public:
390 
392  {
393  }
394  explicit scheduler(detail::scheduler_interface_ptr i)
395  : inner(std::move(i))
396  {
397  }
398  explicit scheduler(detail::const_scheduler_interface_ptr i)
399  : inner(std::const_pointer_cast<scheduler_interface>(i))
400  {
401  }
402 
404  inline clock_type::time_point now() const {
405  return inner->now();
406  }
413  return inner->create_worker(cs);
414  }
415 };
416 
417 template<class Scheduler, class... ArgN>
418 inline scheduler make_scheduler(ArgN&&... an) {
419  return scheduler(std::static_pointer_cast<scheduler_interface>(std::make_shared<Scheduler>(std::forward<ArgN>(an)...)));
420 }
421 
422 inline scheduler make_scheduler(std::shared_ptr<scheduler_interface> si) {
423  return scheduler(si);
424 }
425 
427 {
428  typedef schedulable this_type;
429 
430  composite_subscription lifetime;
431  weak_worker controller;
432  action activity;
433  bool scoped;
435 
436  struct detacher
437  {
438  ~detacher()
439  {
440  if (that) {
441  that->unsubscribe();
442  }
443  }
444  detacher(const this_type* that)
445  : that(that)
446  {
447  }
448  const this_type* that;
449  };
450 
451  class recursed_scope_type
452  {
453  mutable const recursed* requestor;
454 
455  class exit_recursed_scope_type
456  {
457  const recursed_scope_type* that;
458  public:
459  ~exit_recursed_scope_type()
460  {
461  if (that != nullptr) {
462  that->requestor = nullptr;
463  }
464  }
465  exit_recursed_scope_type(const recursed_scope_type* that)
466  : that(that)
467  {
468  }
469  exit_recursed_scope_type(exit_recursed_scope_type && other) RXCPP_NOEXCEPT
470  : that(other.that)
471  {
472  other.that = nullptr;
473  }
474  };
475  public:
476  recursed_scope_type()
477  : requestor(nullptr)
478  {
479  }
480  recursed_scope_type(const recursed_scope_type&)
481  : requestor(nullptr)
482  {
483  // does not acquire recursion scope
484  }
485  recursed_scope_type& operator=(const recursed_scope_type& )
486  {
487  // no change in recursion scope
488  return *this;
489  }
490  exit_recursed_scope_type reset(const recurse& r) const {
491  requestor = std::addressof(r.get_recursed());
492  return exit_recursed_scope_type(this);
493  }
494  bool is_recursed() const {
495  return !!requestor;
496  }
497  void operator()() const {
498  (*requestor)();
499  }
500  };
501  recursed_scope_type recursed_scope;
502 
503 public:
506 
508  {
509  if (scoped) {
510  controller.lock().remove(action_scope);
511  }
512  }
514  : scoped(false)
515  {
516  }
517 
520  : lifetime(q.get_subscription())
521  , controller(q)
522  , activity(std::move(a))
523  , scoped(false)
524  {
525  }
528  : lifetime(std::move(cs))
529  , controller(q)
530  , activity(std::move(a))
531  , scoped(true)
532  , action_scope(controller.lock().add(lifetime))
533  {
534  }
537  : lifetime(scbl.get_subscription())
538  , controller(q)
539  , activity(std::move(a))
540  , scoped(scbl.scoped)
541  , action_scope(scbl.scoped ? controller.lock().add(lifetime) : weak_subscription())
542  {
543  }
544 
545  inline const composite_subscription& get_subscription() const {
546  return lifetime;
547  }
549  return lifetime;
550  }
551  inline const worker get_worker() const {
552  return controller.lock();
553  }
554  inline worker get_worker() {
555  return controller.lock();
556  }
557  inline const action& get_action() const {
558  return activity;
559  }
560  inline action& get_action() {
561  return activity;
562  }
563 
564  inline static schedulable empty(worker sc) {
566  }
567 
568  inline auto set_recursed(const recurse& r) const
569  -> decltype(recursed_scope.reset(r)) {
570  return recursed_scope.reset(r);
571  }
572 
573  // recursed
574  //
575  bool is_recursed() const {
576  return recursed_scope.is_recursed();
577  }
586  inline void operator()() const {
587  recursed_scope();
588  }
589 
590  // composite_subscription
591  //
592  inline bool is_subscribed() const {
593  return lifetime.is_subscribed();
594  }
596  return lifetime.add(std::move(s));
597  }
598  template<class F>
599  auto add(F f) const
600  -> typename std::enable_if<rxcpp::detail::is_unsubscribe_function<F>::value, weak_subscription>::type {
601  return lifetime.add(make_subscription(std::move(f)));
602  }
603  inline void remove(weak_subscription w) const {
604  return lifetime.remove(std::move(w));
605  }
606  inline void clear() const {
607  return lifetime.clear();
608  }
609  inline void unsubscribe() const {
610  return lifetime.unsubscribe();
611  }
612 
613  // scheduler
614  //
615  inline clock_type::time_point now() const {
616  return controller.lock().now();
617  }
619  inline void schedule() const {
620  if (is_subscribed()) {
621  get_worker().schedule(*this);
622  }
623  }
625  inline void schedule(clock_type::time_point when) const {
626  if (is_subscribed()) {
627  get_worker().schedule(when, *this);
628  }
629  }
631  inline void schedule(clock_type::duration when) const {
632  if (is_subscribed()) {
633  get_worker().schedule(when, *this);
634  }
635  }
636 
637  // action
638  //
640  inline void operator()(const recurse& r) const {
641  if (!is_subscribed()) {
642  return;
643  }
644  detacher protect(this);
645  activity(*this, r);
646  protect.that = nullptr;
647  }
648 };
649 
650 struct current_thread;
651 
652 namespace detail {
653 
654 class action_type
655  : public std::enable_shared_from_this<action_type>
656 {
657  typedef action_type this_type;
658 
659 public:
660  typedef std::function<void(const schedulable&, const recurse&)> function_type;
661 
662 private:
663  function_type f;
664 
665 public:
666  action_type()
667  {
668  }
669 
670  action_type(function_type f)
671  : f(std::move(f))
672  {
673  }
674 
675  inline void operator()(const schedulable& s, const recurse& r) {
676  if (!f) {
677  std::terminate();
678  }
679  f(s, r);
680  }
681 };
682 
683 class action_tailrecurser
684  : public std::enable_shared_from_this<action_type>
685 {
686  typedef action_type this_type;
687 
688 public:
689  typedef std::function<void(const schedulable&)> function_type;
690 
691 private:
692  function_type f;
693 
694 public:
695  action_tailrecurser()
696  {
697  }
698 
699  action_tailrecurser(function_type f)
700  : f(std::move(f))
701  {
702  }
703 
704  inline void operator()(const schedulable& s, const recurse& r) {
705  if (!f) {
706  std::terminate();
707  }
708  trace_activity().action_enter(s);
709  auto scope = s.set_recursed(r);
710  while (s.is_subscribed()) {
711  r.reset();
712  f(s);
713  if (!r.is_allowed() || !r.is_requested()) {
714  if (r.is_requested()) {
715  s.schedule();
716  }
717  break;
718  }
719  trace_activity().action_recurse(s);
720  }
721  trace_activity().action_return(s);
722  }
723 };
724 }
725 
726 inline void action::operator()(const schedulable& s, const recurse& r) const {
727  (*inner)(s, r);
728 }
729 
731  return action::empty();
732 }
733 
734 template<class F>
735 inline action make_action(F&& f) {
736  static_assert(detail::is_action_function<F>::value, "action function must be void(schedulable)");
737  auto fn = std::forward<F>(f);
738  return action(std::make_shared<detail::action_type>(detail::action_tailrecurser(fn)));
739 }
740 
741 // copy
742 inline auto make_schedulable(
743  const schedulable& scbl)
744  -> schedulable {
745  return schedulable(scbl);
746 }
747 // move
748 inline auto make_schedulable(
749  schedulable&& scbl)
750  -> schedulable {
751  return schedulable(std::move(scbl));
752 }
753 
755  return schedulable(sc, a);
756 }
758  return schedulable(cs, sc, a);
759 }
760 
761 template<class F>
762 auto make_schedulable(worker sc, F&& f)
763  -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
764  return schedulable(sc, make_action(std::forward<F>(f)));
765 }
766 template<class F>
768  -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
769  return schedulable(cs, sc, make_action(std::forward<F>(f)));
770 }
771 template<class F>
773  -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
774  return schedulable(cs, scbl.get_worker(), make_action(std::forward<F>(f)));
775 }
776 template<class F>
777 auto make_schedulable(schedulable scbl, worker sc, F&& f)
778  -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
779  return schedulable(scbl, sc, make_action(std::forward<F>(f)));
780 }
781 template<class F>
782 auto make_schedulable(schedulable scbl, F&& f)
783  -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
784  return schedulable(scbl, scbl.get_worker(), make_action(std::forward<F>(f)));
785 }
786 
788  -> schedulable {
789  return schedulable(cs, scbl.get_worker(), scbl.get_action());
790 }
792  -> schedulable {
793  return schedulable(cs, sc, scbl.get_action());
794 }
795 inline auto make_schedulable(schedulable scbl, worker sc)
796  -> schedulable {
797  return schedulable(scbl, sc, scbl.get_action());
798 }
799 
800 template<class Arg0, class... ArgN>
801 auto worker::schedule(Arg0&& a0, ArgN&&... an) const
802  -> typename std::enable_if<
803  (detail::is_action_function<Arg0>::value ||
806  auto scbl = make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...);
807  trace_activity().schedule_enter(*inner.get(), scbl);
808  inner->schedule(std::move(scbl));
809  trace_activity().schedule_return(*inner.get());
810 }
811 template<class... ArgN>
812 void worker::schedule_rebind(const schedulable& scbl, ArgN&&... an) const {
813  auto rescbl = make_schedulable(scbl, *this, std::forward<ArgN>(an)...);
814  trace_activity().schedule_enter(*inner.get(), rescbl);
815  inner->schedule(std::move(rescbl));
816  trace_activity().schedule_return(*inner.get());
817 }
818 
819 template<class Arg0, class... ArgN>
820 auto worker::schedule(clock_type::time_point when, Arg0&& a0, ArgN&&... an) const
821  -> typename std::enable_if<
822  (detail::is_action_function<Arg0>::value ||
825  auto scbl = make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...);
826  trace_activity().schedule_when_enter(*inner.get(), when, scbl);
827  inner->schedule(when, std::move(scbl));
828  trace_activity().schedule_when_return(*inner.get());
829 }
830 template<class... ArgN>
831 void worker::schedule_rebind(clock_type::time_point when, const schedulable& scbl, ArgN&&... an) const {
832  auto rescbl = make_schedulable(scbl, *this, std::forward<ArgN>(an)...);
833  trace_activity().schedule_when_enter(*inner.get(), when, rescbl);
834  inner->schedule(when, std::move(rescbl));
835  trace_activity().schedule_when_return(*inner.get());
836 }
837 
838 template<class Arg0, class... ArgN>
839 auto worker::schedule_periodically(clock_type::time_point initial, clock_type::duration period, Arg0&& a0, ArgN&&... an) const
840  -> typename std::enable_if<
841  (detail::is_action_function<Arg0>::value ||
844  schedule_periodically_rebind(initial, period, make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...));
845 }
846 template<class... ArgN>
847 void worker::schedule_periodically_rebind(clock_type::time_point initial, clock_type::duration period, const schedulable& scbl, ArgN&&... an) const {
848  auto keepAlive = *this;
849  auto target = std::make_shared<clock_type::time_point>(initial);
850  auto activity = make_schedulable(scbl, keepAlive, std::forward<ArgN>(an)...);
851  auto periodic = make_schedulable(
852  activity,
853  [keepAlive, target, period, activity](schedulable self) {
854  // any recursion requests will be pushed to the scheduler queue
855  recursion r(false);
856  // call action
857  activity(r.get_recurse());
858 
859  // schedule next occurance (if the action took longer than 'period' target will be in the past)
860  *target += period;
861  self.schedule(*target);
862  });
863  trace_activity().schedule_when_enter(*inner.get(), *target, periodic);
864  inner->schedule(*target, periodic);
865  trace_activity().schedule_when_return(*inner.get());
866 }
867 
868 namespace detail {
869 
870 template<class TimePoint>
871 struct time_schedulable
872 {
873  typedef TimePoint time_point_type;
874 
875  time_schedulable(TimePoint when, schedulable a)
876  : when(when)
877  , what(std::move(a))
878  {
879  }
880  TimePoint when;
881  schedulable what;
882 };
883 
884 
885 // Sorts time_schedulable items in priority order sorted
886 // on value of time_schedulable.when. Items with equal
887 // values for when are sorted in fifo order.
888 template<class TimePoint>
889 class schedulable_queue {
890 public:
891  typedef time_schedulable<TimePoint> item_type;
892  typedef std::pair<item_type, int64_t> elem_type;
893  typedef std::vector<elem_type> container_type;
894  typedef const item_type& const_reference;
895 
896 private:
897  struct compare_elem
898  {
899  bool operator()(const elem_type& lhs, const elem_type& rhs) const {
900  if (lhs.first.when == rhs.first.when) {
901  return lhs.second > rhs.second;
902  }
903  else {
904  return lhs.first.when > rhs.first.when;
905  }
906  }
907  };
908 
909  typedef std::priority_queue<
910  elem_type,
911  container_type,
912  compare_elem
913  > queue_type;
914 
915  queue_type q;
916 
917  int64_t ordinal;
918 public:
919 
920  schedulable_queue()
921  : ordinal(0)
922  {
923  }
924 
925  const_reference top() const {
926  return q.top().first;
927  }
928 
929  void pop() {
930  q.pop();
931  }
932 
933  bool empty() const {
934  return q.empty();
935  }
936 
937  void push(const item_type& value) {
938  q.push(elem_type(value, ordinal++));
939  }
940 
941  void push(item_type&& value) {
942  q.push(elem_type(std::move(value), ordinal++));
943  }
944 };
945 
946 }
947 
948 }
949 namespace rxsc=schedulers;
950 
951 }
952 
954 #include "schedulers/rx-runloop.hpp"
960 
961 #endif
tag_action action_tag
Definition: rx-scheduler.hpp:124
void schedule_rebind(const schedulable &scbl, ArgN &&... an) const
use the supplied arguments to make a schedulable and then insert it to be run
Definition: rx-scheduler.hpp:812
composite_subscription::weak_subscription weak_subscription
Definition: rx-scheduler.hpp:209
Definition: rx-scheduler.hpp:163
static subscription lock(weak_state_type w)
Definition: rx-subscription.hpp:192
void remove(weak_subscription w) const
Definition: rx-scheduler.hpp:240
void schedule(clock_type::time_point when) const
put this on the queue of the stored scheduler to run at the specified time
Definition: rx-scheduler.hpp:625
virtual ~scheduler_interface()
Definition: rx-scheduler.hpp:361
Definition: rx-all.hpp:26
bool is_recursed() const
Definition: rx-scheduler.hpp:575
void operator()() const
request to be rescheduled
Definition: rx-scheduler.hpp:56
void clear() const
Definition: rx-scheduler.hpp:606
friend bool operator==(const worker &, const worker &)
Definition: rx-scheduler.hpp:326
worker(composite_subscription cs, detail::const_worker_interface_ptr i)
Definition: rx-scheduler.hpp:214
const composite_subscription & get_subscription() const
Definition: rx-scheduler.hpp:545
schedulable(composite_subscription cs, worker q, action a)
action and worker have independent lifetimes
Definition: rx-scheduler.hpp:527
const recurse & get_recurse() const
get the recurse to pass into each action being called
Definition: rx-scheduler.hpp:116
tag_scheduler scheduler_tag
Definition: rx-scheduler.hpp:155
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:459
void unsubscribe() const
Definition: rx-scheduler.hpp:609
Definition: rx-scheduler.hpp:152
weak_subscription add(subscription s) const
Definition: rx-scheduler.hpp:237
auto make_schedulable(const schedulable &scbl) -> schedulable
Definition: rx-scheduler.hpp:742
action(detail::action_ptr i)
Definition: rx-scheduler.hpp:138
#define RXCPP_NOEXCEPT
Definition: rx-includes.hpp:157
virtual worker create_worker(composite_subscription cs) const =0
Definition: rx-scheduler.hpp:158
scheduler_base::clock_type clock_type
Definition: rx-scheduler.hpp:505
auto make_subscription() -> subscription
Definition: rx-subscription.hpp:219
subscription::weak_state_type weak_subscription
Definition: rx-subscription.hpp:465
action make_action(F &&f)
Definition: rx-scheduler.hpp:735
auto empty() -> decltype(from< T >())
Returns an observable that sends no items to observer and immediately completes, on the specified sch...
Definition: rx-empty.hpp:37
void operator()(const schedulable &s, const recurse &r) const
call the function
Definition: rx-scheduler.hpp:726
bool operator!=(const worker &lhs, const worker &rhs)
Definition: rx-scheduler.hpp:329
composite_subscription & get_subscription()
Definition: rx-scheduler.hpp:228
Definition: rx-scheduler.hpp:46
virtual void schedule(const schedulable &scbl) const =0
composite_subscription::weak_subscription weak_subscription
Definition: rx-scheduler.hpp:504
clock_type::time_point now() const
return the current time for this scheduler
Definition: rx-scheduler.hpp:404
weak_subscription add(subscription s) const
Definition: rx-subscription.hpp:508
virtual ~worker_interface()
Definition: rx-scheduler.hpp:171
worker(composite_subscription cs, worker o)
Definition: rx-scheduler.hpp:219
weak_subscription add(subscription s) const
Definition: rx-scheduler.hpp:595
void schedule(clock_type::time_point when, const schedulable &scbl) const
insert the supplied schedulable to be run at the time specified
Definition: rx-scheduler.hpp:264
void schedule_periodically(clock_type::time_point initial, clock_type::duration period, const schedulable &scbl) const
Definition: rx-scheduler.hpp:280
worker create_worker(composite_subscription cs=composite_subscription()) const
Definition: rx-scheduler.hpp:412
clock_type::time_point now() const
Definition: rx-scheduler.hpp:615
action & get_action()
Definition: rx-scheduler.hpp:560
weak_worker()
Definition: rx-scheduler.hpp:339
Definition: rx-subscription.hpp:31
action provides type-forgetting for a potentially recursive set of calls to a function that takes a s...
Definition: rx-scheduler.hpp:130
const action & get_action() const
Definition: rx-scheduler.hpp:557
Definition: rx-predef.hpp:21
scheduler make_scheduler(ArgN &&... an)
Definition: rx-scheduler.hpp:418
void operator()(const recurse &r) const
invokes the action
Definition: rx-scheduler.hpp:640
allows functions to be called at specified times and possibly in other contexts.
Definition: rx-scheduler.hpp:383
Definition: rx-predef.hpp:30
Definition: rx-scheduler.hpp:122
static action empty()
return the empty action
Definition: rx-scheduler.hpp:144
Definition: rx-predef.hpp:58
static composite_subscription empty()
Definition: rx-subscription.hpp:499
void reset() const
reset the function request. call before each call to the function.
Definition: rx-scheduler.hpp:85
scheduler_base::clock_type clock_type
Definition: rx-scheduler.hpp:208
void unsubscribe() const
Definition: rx-scheduler.hpp:246
scheduler(detail::scheduler_interface_ptr i)
Definition: rx-scheduler.hpp:394
const composite_subscription & get_subscription() const
Definition: rx-scheduler.hpp:225
scheduler()
Definition: rx-scheduler.hpp:391
weak_worker(worker &owner)
Definition: rx-scheduler.hpp:342
scheduler_base::clock_type clock_type
Definition: rx-scheduler.hpp:359
clock_type::time_point now() const
return the current time for this worker
Definition: rx-scheduler.hpp:253
void schedule(clock_type::duration when, const schedulable &scbl) const
insert the supplied schedulable to be run at now() + the delay specified
Definition: rx-scheduler.hpp:273
bool is_subscribed() const
Definition: rx-scheduler.hpp:592
auto trace_activity() -> decltype(rxcpp_trace_activity(trace_tag()))&
Definition: rx-predef.hpp:15
void schedule_periodically(clock_type::duration initial, clock_type::duration period, const schedulable &scbl) const
Definition: rx-scheduler.hpp:287
std::chrono::steady_clock clock_type
Definition: rx-scheduler.hpp:154
void schedule(const schedulable &scbl) const
insert the supplied schedulable to be run as soon as possible
Definition: rx-scheduler.hpp:258
action make_action_empty()
Definition: rx-scheduler.hpp:730
friend bool operator==(const scheduler &, const scheduler &)
virtual clock_type::time_point now() const =0
auto set_recursed(const recurse &r) const -> decltype(recursed_scope.reset(r))
Definition: rx-scheduler.hpp:568
bool is_allowed() const
does the scheduler allow tail-recursion now?
Definition: rx-scheduler.hpp:77
recursion is used by the scheduler to signal to each action whether tail recursion is allowed.
Definition: rx-scheduler.hpp:95
recurse(bool &a)
Definition: rx-scheduler.hpp:70
void operator()() const
Definition: rx-scheduler.hpp:586
void clear() const
Definition: rx-scheduler.hpp:243
schedulable()
Definition: rx-scheduler.hpp:513
recursion()
Definition: rx-scheduler.hpp:101
tag_worker worker_tag
Definition: rx-scheduler.hpp:160
Definition: rx-scheduler.hpp:369
bool is_requested() const
did the function request to be recursed?
Definition: rx-scheduler.hpp:81
Definition: rx-scheduler.hpp:63
bool is_subscribed() const
Definition: rx-subscription.hpp:172
bool operator==(const worker &lhs, const worker &rhs)
Definition: rx-scheduler.hpp:326
auto add(F f) const -> typename std::enable_if< rxcpp::detail::is_unsubscribe_function< F >::value, weak_subscription >::type
Definition: rx-scheduler.hpp:599
Definition: rx-scheduler.hpp:353
scheduler_base::clock_type clock_type
Definition: rx-scheduler.hpp:389
void remove(weak_subscription w) const
Definition: rx-scheduler.hpp:603
composite_subscription & get_subscription()
Definition: rx-scheduler.hpp:548
Definition: rx-predef.hpp:56
bool is_subscribed() const
Definition: rx-scheduler.hpp:234
void schedule_periodically_rebind(clock_type::time_point initial, clock_type::duration period, const schedulable &scbl, ArgN &&... an) const
use the supplied arguments to make a schedulable and then insert it to be run
Definition: rx-scheduler.hpp:847
auto scope(ResourceFactory rf, ObservableFactory of) -> observable< rxu::value_type_t< detail::scope_traits< ResourceFactory, ObservableFactory >>, detail::scope< ResourceFactory, ObservableFactory >>
Returns an observable that makes an observable by the specified observable factory using the resource...
Definition: rx-scope.hpp:114
Definition: rx-scheduler.hpp:333
void unsubscribe() const
Definition: rx-subscription.hpp:178
Definition: rx-subscription.hpp:29
scheduler(detail::const_scheduler_interface_ptr i)
Definition: rx-scheduler.hpp:398
recursed(bool &r)
Definition: rx-scheduler.hpp:51
worker get_worker()
Definition: rx-scheduler.hpp:554
void schedule() const
put this on the queue of the stored scheduler to run asap
Definition: rx-scheduler.hpp:619
Definition: rx-subscription.hpp:67
worker lock() const
Definition: rx-scheduler.hpp:348
static schedulable empty(worker sc)
Definition: rx-scheduler.hpp:564
std::string what(std::exception_ptr ep)
Definition: rx-util.hpp:544
schedulable(worker q, action a)
action and worker share lifetime
Definition: rx-scheduler.hpp:519
void remove(weak_subscription w) const
Definition: rx-subscription.hpp:527
const recursed & get_recursed() const
get the recursed to set into the schedulable for the function to use to request recursion
Definition: rx-scheduler.hpp:89
void schedule(clock_type::duration when) const
put this on the queue of the stored scheduler to run after a delay from now
Definition: rx-scheduler.hpp:631
worker()
Definition: rx-scheduler.hpp:211
Definition: rx-scheduler.hpp:426
recursion(bool b)
Definition: rx-scheduler.hpp:106
action()
Definition: rx-scheduler.hpp:135
Definition: rx-predef.hpp:43
void reset(bool b=true) const
set whether tail-recursion is allowed
Definition: rx-scheduler.hpp:112
~schedulable()
Definition: rx-scheduler.hpp:507
virtual clock_type::time_point now() const =0
scheduler_base::clock_type clock_type
Definition: rx-scheduler.hpp:169
const worker get_worker() const
Definition: rx-scheduler.hpp:551
schedulable(schedulable scbl, worker q, action a)
inherit lifetimes
Definition: rx-scheduler.hpp:536
tag_schedulable schedulable_tag
Definition: rx-scheduler.hpp:374
Definition: rx-scheduler.hpp:200