5 #if !defined(RXCPP_RX_SCHEDULER_EVENT_LOOP_HPP) 6 #define RXCPP_RX_SCHEDULER_EVENT_LOOP_HPP 8 #include "../rx-includes.hpp" 12 namespace schedulers {
26 typedef detail::schedulable_queue<
27 typename clock_type::time_point> queue_item_time;
29 typedef queue_item_time::item_type item_type;
33 std::shared_ptr<const scheduler_interface> alive;
36 virtual ~loop_worker()
44 auto token = controller.
add(cs);
50 virtual clock_type::time_point
now()
const {
51 return clock_type::now();
54 virtual void schedule(
const schedulable& scbl)
const {
58 virtual void schedule(clock_type::time_point when,
const schedulable& scbl)
const {
65 mutable std::atomic<std::size_t> count;
67 std::vector<worker> loops;
71 : factory([](std::function<void()> start){
72 return std::thread(std::move(start));
77 auto remaining =
std::max(std::thread::hardware_concurrency(),
unsigned(4));
87 auto remaining =
std::max(std::thread::hardware_concurrency(),
unsigned(4));
97 virtual clock_type::time_point
now()
const {
98 return clock_type::now();
102 return worker(cs, std::make_shared<loop_worker>(cs, loops[++count % loops.size()], this->shared_from_this()));
107 static scheduler instance = make_scheduler<event_loop>();
111 return make_scheduler<event_loop>(tf);
Definition: rx-scheduler.hpp:163
void remove(weak_subscription w) const
Definition: rx-scheduler.hpp:240
Definition: rx-all.hpp:26
virtual clock_type::time_point now() const
Definition: rx-eventloop.hpp:97
virtual ~event_loop()
Definition: rx-eventloop.hpp:92
Controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:459
weak_subscription add(subscription s) const
Definition: rx-scheduler.hpp:237
std::function< std::thread(std::function< void()>)> thread_factory
Definition: rx-newthread.hpp:14
auto max() -> operator_factory< max_tag >
For each item from this observable, reduce it by taking the max value of the previous items.
Definition: rx-reduce.hpp:496
weak_subscription add(subscription s) const
Definition: rx-subscription.hpp:508
worker create_worker(composite_subscription cs=composite_subscription()) const
Definition: rx-scheduler.hpp:412
Definition: rx-eventloop.hpp:14
const action & get_action() const
Definition: rx-scheduler.hpp:557
Allows functions to be called at specified times and possibly in other contexts.
Definition: rx-scheduler.hpp:383
scheduler make_event_loop()
Definition: rx-eventloop.hpp:106
scheduler make_new_thread()
Definition: rx-newthread.hpp:169
virtual worker create_worker(composite_subscription cs) const
Definition: rx-eventloop.hpp:101
void schedule(const schedulable &scbl) const
Inserts the supplied schedulable to be run as soon as possible.
Definition: rx-scheduler.hpp:258
Definition: rx-scheduler.hpp:353
void unsubscribe() const
Definition: rx-subscription.hpp:178
event_loop()
Definition: rx-eventloop.hpp:70
event_loop(thread_factory tf)
Definition: rx-eventloop.hpp:82
Definition: rx-scheduler.hpp:426
Definition: rx-scheduler.hpp:200