5 #if !defined(RXCPP_RX_OBSERVABLE_HPP) 6 #define RXCPP_RX_OBSERVABLE_HPP 11 #define EXPLICIT_THIS this-> 20 template<
class Subscriber,
class T>
21 struct has_on_subscribe_for
24 template<
class CS,
class CT>
25 static auto check(
int) -> decltype((*(CT*)
nullptr).on_subscribe(*(CS*)
nullptr));
26 template<
class CS,
class CT>
27 static not_void check(...);
29 typedef decltype(check<rxu::decay_t<Subscriber>, T>(0)) detail_result;
30 static const bool value = std::is_same<detail_result, void>::value;
40 :
public std::enable_shared_from_this<state_type>
46 std::shared_ptr<state_type> state;
55 so.on_subscribe(std::move(o));
59 struct tag_function {};
61 void construct(F&& f, tag_function&&) {
62 state->on_subscribe = std::forward<F>(f);
75 : state(std::make_shared<state_type>())
77 construct(std::forward<SOF>(sof),
82 state->on_subscribe(std::move(o));
85 template<
class Subscriber>
86 typename std::enable_if<is_subscriber<Subscriber>::value,
void>::type
88 state->on_subscribe(o.as_dynamic());
94 return lhs.state == rhs.state;
101 template<
class T,
class Source>
107 template<
bool Selector,
class Default,
class SO>
108 struct resolve_observable;
110 template<
class Default,
class SO>
111 struct resolve_observable<true, Default, SO>
113 typedef typename SO::type type;
114 typedef typename type::value_type value_type;
115 static const bool value =
true;
116 typedef observable<value_type, type> observable_type;
117 template<
class...
AN>
118 static observable_type make(
const Default&,
AN&&... an) {
119 return observable_type(type(std::forward<AN>(an)...));
122 template<
class Default,
class SO>
123 struct resolve_observable<false, Default, SO>
125 static const bool value =
false;
126 typedef Default observable_type;
127 template<
class...
AN>
128 static observable_type make(
const observable_type& that,
const AN&...) {
133 struct resolve_observable<true, void, SO>
135 typedef typename SO::type type;
136 typedef typename type::value_type value_type;
137 static const bool value =
true;
138 typedef observable<value_type, type> observable_type;
139 template<
class...
AN>
140 static observable_type make(
AN&&... an) {
141 return observable_type(type(std::forward<AN>(an)...));
145 struct resolve_observable<false, void, SO>
147 static const bool value =
false;
148 typedef void observable_type;
149 template<
class...
AN>
150 static observable_type make(
const AN&...) {
156 template<
class Selector,
class Default,
template<
class... TN>
class SO, class...
AN>
158 :
public detail::resolve_observable<Selector::value, Default, rxu::defer_type<SO, AN...>>
168 template<
class T,
class Observable>
171 template<
class Obsvbl,
class... ArgN>
172 static auto blocking_subscribe(
const Obsvbl&
source,
bool do_rethrow, ArgN&&... an)
175 std::condition_variable wake;
176 bool disposed =
false;
179 auto dest = make_subscriber<T>(std::forward<ArgN>(an)...);
182 auto scbr = make_subscriber<T>(
184 [&](T t){dest.on_next(t);},
192 [&](){dest.on_completed();}
195 auto cs = scbr.get_subscription();
198 std::unique_lock<std::mutex> guard(lock);
203 source.subscribe(std::move(scbr));
205 std::unique_lock<std::mutex> guard(lock);
237 template<
class... ArgN>
240 return blocking_subscribe(
source,
false, std::forward<ArgN>(an)...);
262 template<
class... ArgN>
265 return blocking_subscribe(
source,
true, std::forward<ArgN>(an)...);
283 template<
class...
AN>
285 rxu::maybe<T> result;
293 static_assert(
sizeof...(
AN) == 0,
"first() was passed too many arguments.");
311 template<
class...
AN>
313 rxu::maybe<T> result;
315 [&](T v){result.reset(v);});
319 static_assert(
sizeof...(
AN) == 0,
"last() was passed too many arguments.");
336 source.count().as_blocking().subscribe_with_rethrow(
337 [&](
int v){result = v;});
359 return source.sum().as_blocking().last();
380 return source.average().as_blocking().last();
401 return source.max().as_blocking().last();
422 return source.min().as_blocking().last();
428 template<
class SourceOperator,
class Subscriber>
429 struct safe_subscriber
431 safe_subscriber(SourceOperator& so, Subscriber& o) : so(std::addressof(so)), o(std::addressof(o)) {}
435 so->on_subscribe(*o);
437 if (!o->is_subscribed()) {
445 void operator()(
const rxsc::schedulable&) {
456 class observable<void, void>;
477 template<
class T,
class SourceOperator>
481 static_assert(std::is_same<T, typename SourceOperator::value_type>::value,
"SourceOperator::value_type must be the same as T in observable<T, SourceOperator>");
491 template<
class U,
class SO>
494 template<
class U,
class SO>
497 template<
class Subscriber>
498 auto detail_subscribe(Subscriber o)
const 504 static_assert(std::is_same<typename source_operator_type::value_type, T>::value && std::is_convertible<T*, typename subscriber_type::value_type*>::value,
"the value types in the sequence must match or be convertible");
505 static_assert(detail::has_on_subscribe_for<subscriber_type, source_operator_type>::value,
"inner must have on_subscribe method that accepts this subscriber ");
509 if (!o.is_subscribed()) {
511 return o.get_subscription();
517 if (rxsc::current_thread::is_schedule_required()) {
519 sc.create_worker(o.get_subscription()).schedule(
subscriber);
522 subscriber.subscribe();
526 return o.get_subscription();
571 template<
class...
AN>
574 static_assert(
sizeof...(
AN) == 0,
"as_dynamic() was passed too many arguments.");
579 template<
class...
AN>
582 static_assert(
sizeof...(
AN) == 0,
"as_blocking() was passed too many arguments.");
592 template<
class OperatorFactory>
593 auto op(OperatorFactory&& of)
const 594 -> decltype(of(*(
const this_type*)
nullptr)) {
601 template<
class ResultType,
class Operator>
602 auto lift(Operator&& op)
const 603 -> observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>> {
604 return observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>(
605 rxo::detail::lift_operator<ResultType, source_operator_type, Operator>(
source_operator, std::forward<Operator>(op)));
606 static_assert(detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value,
"Function passed for lift() must have the signature subscriber<...>(subscriber<T, ...>)");
614 template<
class ResultType,
class Operator>
615 auto lift_if(Operator&& op)
const 616 ->
typename std::enable_if<detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value,
617 observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>>::type {
618 return observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>(
619 rxo::detail::lift_operator<ResultType, source_operator_type, Operator>(
source_operator, std::forward<Operator>(op)));
626 template<
class ResultType,
class Operator>
627 auto lift_if(Operator&&) const
628 -> typename std::enable_if<!detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value,
629 decltype(rxs::
from<ResultType>())>::type {
630 return rxs::from<ResultType>();
636 template<
class... ArgN>
639 return detail_subscribe(make_subscriber<T>(std::forward<ArgN>(an)...));
644 template<
class...
AN>
655 template<
class...
AN>
666 template<
class...
AN>
677 template<
class...
AN>
688 template<
class...
AN>
699 template<
class...
AN>
710 template<
class...
AN>
721 template<
class...
AN>
732 template<
class...
AN>
743 template<
class...
AN>
754 template<
class...
AN>
765 template<
class...
AN>
776 template<
class...
AN>
787 template<
class...
AN>
788 auto finally(
AN&&... an)
const 798 template<
class...
AN>
809 template<
class...
AN>
820 template<
class...
AN>
831 template<
class...
AN>
842 template<
class...
AN>
853 template<
class...
AN>
864 template<
class...
AN>
875 template<
class...
AN>
886 template<
class...
AN>
897 template<
class...
AN>
908 template<
class...
AN>
919 template<
class...
AN>
930 template<
class...
AN>
941 template<
class...
AN>
952 template<
class...
AN>
963 template<
class...
AN>
974 template<
class...
AN>
985 template<
class...
AN>
996 template<
class...
AN>
1007 template<
class...
AN>
1018 template<
class...
AN>
1029 template<
class...
AN>
1040 template<
class...
AN>
1051 template<
class...
AN>
1062 template<
class...
AN>
1073 template<
class...
AN>
1085 template<
class...
AN>
1096 template<
class...
AN>
1107 template<
class...
AN>
1118 template<
class...
AN>
1129 template<
class...
AN>
1140 template<
class...
AN>
1151 template<
class...
AN>
1162 template<
class...
AN>
1173 template<
class...
AN>
1184 template<
class...
AN>
1195 template<
class...
AN>
1206 template<
class...
AN>
1217 template<
class...
AN>
1224 static_assert(
sizeof...(
AN) == 0,
"first() was passed too many arguments.");
1229 template<
class...
AN>
1236 static_assert(
sizeof...(
AN) == 0,
"last() was passed too many arguments.");
1241 template<
class...
AN>
1248 static_assert(
sizeof...(
AN) == 0,
"count() was passed too many arguments.");
1253 template<
class...
AN>
1260 static_assert(
sizeof...(
AN) == 0,
"sum() was passed too many arguments.");
1265 template<
class...
AN>
1272 static_assert(
sizeof...(
AN) == 0,
"average() was passed too many arguments.");
1277 template<
class...
AN>
1284 static_assert(
sizeof...(
AN) == 0,
"max() was passed too many arguments.");
1289 template<
class...
AN>
1296 static_assert(
sizeof...(
AN) == 0,
"min() was passed too many arguments.");
1301 template<
class...
AN>
1312 template<
class...
AN>
1323 template<
class...
AN>
1334 template<
class...
AN>
1345 template<
class...
AN>
1356 template<
class...
AN>
1367 template<
class...
AN>
1378 template<
class...
AN>
1389 template<
class...
AN>
1400 template<
class...
AN>
1411 template<
class...
AN>
1422 template<
class...
AN>
1433 template<
class...
AN>
1444 template<
class...
AN>
1454 template<
class T,
class SourceOperator>
1458 template<
class T,
class SourceOperator>
1460 return !(lhs == rhs);
1552 template<
class T,
class OnSubscribe>
1554 -> decltype(rxs::create<T>(std::move(os))) {
1555 return rxs::create<T>(std::move(os));
1567 template<
class T,
class Coordination>
1569 -> decltype(rxs::range<T>(
first,
last, step, std::move(cn))) {
1570 return rxs::range<T>(
first,
last, step, std::move(cn));
1574 template<
class T,
class Coordination>
1576 -> decltype(rxs::range<T>(
first,
last, std::move(cn))) {
1577 return rxs::range<T>(
first,
last, std::move(cn));
1581 template<
class T,
class Coordination>
1583 -> decltype(rxs::range<T>(
first, std::move(cn))) {
1584 return rxs::range<T>(
first, std::move(cn));
1591 -> decltype(rxs::never<T>()) {
1592 return rxs::never<T>();
1597 template<
class ObservableFactory>
1605 template<
class...
AN>
1606 static auto interval(rxsc::scheduler::clock_type::duration period,
AN**...)
1609 static_assert(
sizeof...(
AN) == 0,
"interval(period) was passed too many arguments.");
1613 template<
class Coordination>
1614 static auto interval(rxsc::scheduler::clock_type::duration period, Coordination cn)
1620 template<
class...
AN>
1621 static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period,
AN**...)
1624 static_assert(
sizeof...(
AN) == 0,
"interval(initial, period) was passed too many arguments.");
1628 template<
class Coordination>
1629 static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, Coordination cn)
1630 -> decltype(
rxs::interval(initial, period, std::move(cn))) {
1636 template<
class...
AN>
1637 static auto timer(rxsc::scheduler::clock_type::time_point at,
AN**...)
1640 static_assert(
sizeof...(
AN) == 0,
"timer(at) was passed too many arguments.");
1644 template<
class...
AN>
1645 static auto timer(rxsc::scheduler::clock_type::duration after,
AN**...)
1648 static_assert(
sizeof...(
AN) == 0,
"timer(after) was passed too many arguments.");
1652 template<
class Coordination>
1653 static auto timer(rxsc::scheduler::clock_type::time_point when, Coordination cn)
1654 -> decltype(
rxs::timer(when, std::move(cn))) {
1659 template<
class Coordination>
1660 static auto timer(rxsc::scheduler::clock_type::duration when, Coordination cn)
1661 -> decltype(
rxs::timer(when, std::move(cn))) {
1667 template<
class Collection>
1674 template<
class Collection,
class Coordination>
1675 static auto iterate(Collection c, Coordination cn)
1676 -> decltype(
rxs::iterate(std::move(c), std::move(cn))) {
1684 -> decltype( rxs::from<T>()) {
1685 return rxs::from<T>();
1689 template<
class T,
class Coordination>
1691 ->
typename std::enable_if<is_coordination<Coordination>::value,
1692 decltype( rxs::from<T>(std::move(cn)))>::type {
1693 return rxs::from<T>(std::move(cn));
1697 template<
class Value0,
class... ValueN>
1698 static auto from(Value0 v0, ValueN... vn)
1699 ->
typename std::enable_if<!is_coordination<Value0>::value,
1700 decltype(
rxs::from(v0, vn...))>::type {
1705 template<
class Coordination,
class Value0,
class... ValueN>
1706 static auto from(Coordination cn, Value0 v0, ValueN... vn)
1707 ->
typename std::enable_if<is_coordination<Coordination>::value,
1708 decltype(
rxs::from(std::move(cn), v0, vn...))>::type {
1709 return rxs::from(std::move(cn), v0, vn...);
1721 template<
class T,
class Coordination>
1722 static auto just(T v, Coordination cn)
1723 -> decltype(
rxs::just(std::move(v), std::move(cn))) {
1724 return rxs::just(std::move(v), std::move(cn));
1729 template<
class Observable,
class Value0,
class... ValueN>
1731 -> decltype(
rxs::start_with(std::move(o), std::move(v0), std::move(vn)...)) {
1732 return rxs::start_with(std::move(o), std::move(v0), std::move(vn)...);
1739 -> decltype(from<T>()) {
1744 template<
class T,
class Coordination>
1746 -> decltype(from<T>(std::move(cn))) {
1747 return from<T>(std::move(cn));
1752 template<
class T,
class Exception>
1754 -> decltype(rxs::error<T>(std::forward<Exception>(e))) {
1755 return rxs::error<T>(std::forward<Exception>(e));
1759 template<
class T,
class Exception,
class Coordination>
1760 static auto error(Exception&& e, Coordination cn)
1761 -> decltype(rxs::error<T>(std::forward<Exception>(e), std::move(cn))) {
1762 return rxs::error<T>(std::forward<Exception>(e), std::move(cn));
1767 template<
class ResourceFactory,
class ObservableFactory>
1768 static auto scope(ResourceFactory rf, ObservableFactory of)
1769 -> decltype(
rxs::scope(std::move(rf), std::move(of))) {
1770 return rxs::scope(std::move(rf), std::move(of));
1780 template<
class T,
class SourceOperator,
class OperatorFactory>
1782 -> decltype(source.op(std::forward<OperatorFactory>(of))) {
1783 return source.op(std::forward<OperatorFactory>(of));
1790 template<
class T,
class SourceOperator,
class OperatorFactory>
1792 -> decltype(source.op(std::forward<OperatorFactory>(of))) {
1793 return source.op(std::forward<OperatorFactory>(of));
auto distinct_until_changed(AN &&... an) const
For each item from this observable, filter out consequentially repeated values and emit only changes ...
Definition: rx-observable.hpp:876
Definition: rx-operators.hpp:126
#define RXCPP_TRY
Definition: rx-util.hpp:38
auto delay(AN &&... an) const
Return an observable that emits each item emitted by the source observable after the specified delay.
Definition: rx-observable.hpp:854
auto skip_last(AN... an) const
Make new observable with skipped last count items from this observable.
Definition: rx-observable.hpp:1346
Definition: rx-operators.hpp:366
auto with_latest_from(AN... an) const
For each item from the first observable select the latest value from all the observables to emit from...
Definition: rx-observable.hpp:1074
auto time_interval(AN &&... an) const
Returns an observable that emits indications of the amount of time lapsed between consecutive emissio...
Definition: rx-observable.hpp:755
auto error(E e) -> decltype(detail::make_error< T >(typename std::conditional< std::is_same< rxu::error_ptr, rxu::decay_t< E >>::value, detail::throw_ptr_tag, detail::throw_instance_tag >::type(), std::move(e), identity_immediate()))
Returns an observable that sends no items to observer and immediately generates an error,...
Definition: rx-error.hpp:117
auto concat_transform(AN &&... an) const
For each item from this observable use the CollectionSelector to produce an observable and subscribe ...
Definition: rx-observable.hpp:1063
Definition: rx-operators.hpp:248
void on_subscribe(subscriber< T > o) const
Definition: rx-observable.hpp:81
auto count(AN **...) const
For each item from this observable reduce it by incrementing a count.
Definition: rx-observable.hpp:1242
Definition: rx-operators.hpp:380
auto min(AN **...) const
For each item from this observable reduce it by taking the min value of the previous items.
Definition: rx-observable.hpp:1290
Definition: rx-operators.hpp:143
auto defer(ObservableFactory of) -> observable< rxu::value_type_t< detail::defer_traits< ObservableFactory >>, detail::defer< ObservableFactory >>
Returns an observable that calls the specified observable factory to create an observable for each ne...
Definition: rx-defer.hpp:73
auto merge_transform(AN &&... an) const
For each item from this observable use the CollectionSelector to produce an observable and subscribe ...
Definition: rx-observable.hpp:1030
auto retry(AN... an) const
Retry this observable for the given number of times.
Definition: rx-observable.hpp:1423
Definition: rx-observable.hpp:157
auto skip_until(AN... an) const
Make new observable with items skipped until on_next occurs on the trigger observable or until the sp...
Definition: rx-observable.hpp:1357
Definition: rx-operators.hpp:387
Definition: rx-operators.hpp:458
std::shared_ptr< util::detail::error_base > error_ptr
Definition: rx-util.hpp:874
auto operator|(const rxcpp::observable< T, SourceOperator > &source, OperatorFactory &&of) -> decltype(source.op(std::forward< OperatorFactory >(of)))
Definition: rx-observable.hpp:1791
static auto create(OnSubscribe os) -> decltype(rxs::create< T >(std::move(os)))
Returns an observable that executes the specified function when a subscriber subscribes to it.
Definition: rx-observable.hpp:1553
Definition: rx-all.hpp:26
auto skip(AN... an) const
Make new observable with skipped first count items from this observable.
Definition: rx-observable.hpp:1324
Definition: rx-predef.hpp:302
static auto range(T first, T last, Coordination cn) -> decltype(rxs::range< T >(first, last, std::move(cn)))
Returns an observable that sends values in the range first-last by adding step to the previous value....
Definition: rx-observable.hpp:1575
auto element_at(AN &&... an) const
Pulls an item located at a specified index location in the sequence of items and emits that item as i...
Definition: rx-observable.hpp:887
observable(const observable< T, SO > &o)
implicit conversion between observables of the same value_type
Definition: rx-observable.hpp:553
Definition: rx-operators.hpp:329
auto any(AN &&... an) const
Returns an Observable that emits true if any item emitted by the source Observable satisfies a specif...
Definition: rx-observable.hpp:667
error_ptr make_error_ptr(error_ptr e)
Definition: rx-util.hpp:883
auto reduce(AN &&... an) const
For each item from this observable use Accumulator to combine items, when completed use ResultSelecto...
Definition: rx-observable.hpp:1196
source_operator_type source_operator
Definition: rx-observable.hpp:487
static auto from(Coordination cn) -> typename std::enable_if< is_coordination< Coordination >::value, decltype(rxs::from< T >(std::move(cn)))>::type
Definition: rx-observable.hpp:1690
RXCPP_NORETURN void rethrow_current_exception()
Definition: rx-util.hpp:933
auto timeout(AN &&... an) const
Return an observable that terminates with timeout_error if a particular timespan has passed without e...
Definition: rx-observable.hpp:766
T max() const
Definition: rx-observable.hpp:400
auto take_while(AN &&... an) const
For the first items fulfilling the predicate from this observable emit them from the new observable t...
Definition: rx-observable.hpp:1401
auto interval(Duration period) -> typename std::enable_if< detail::defer_interval< Duration, identity_one_worker >::value, typename detail::defer_interval< Duration, identity_one_worker >::observable_type >::type
Returns an observable that emits a sequential integer every specified time interval,...
Definition: rx-interval.hpp:113
dynamic_observable()
Definition: rx-observable.hpp:69
Definition: rx-operators.hpp:444
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:459
Definition: rx-operators.hpp:157
auto average(AN **...) const
For each item from this observable reduce it by adding to the previous values and then dividing by th...
Definition: rx-observable.hpp:1266
blocking_observable< T, this_type > as_blocking(AN **...) const
Definition: rx-observable.hpp:580
Definition: rx-operators.hpp:282
auto operator>>(const rxcpp::observable< T, SourceOperator > &source, OperatorFactory &&of) -> decltype(source.op(std::forward< OperatorFactory >(of)))
Definition: rx-observable.hpp:1781
auto observable_member(Tag, AN &&... an) -> decltype(Overload::member(std::forward< AN >(an)...))
Definition: rx-operators.hpp:63
rxu::decay_t< Observable > observable_type
Definition: rx-observable.hpp:215
auto window(AN &&... an) const
Return an observable that emits connected, non-overlapping windows, each containing at most count ite...
Definition: rx-observable.hpp:898
auto publish_synchronized(AN &&... an) const
Turn a cold observable hot and allow connections to the source to be independent of subscriptions.
Definition: rx-observable.hpp:1152
auto subscribe(ArgN &&... an) const -> composite_subscription
Subscribe will cause the source observable to emit values to the provided subscriber.
Definition: rx-observable.hpp:637
Definition: rx-operators.hpp:289
Definition: rx-operators.hpp:296
Definition: rx-operators.hpp:352
auto just(Value0 v0) -> typename std::enable_if<!is_coordination< Value0 >::value, decltype(iterate(*(std::array< Value0, 1 > *) nullptr, identity_immediate()))>::type
Definition: rx-iterate.hpp:267
auto combine_latest(AN... an) const
For each item from all of the observables select a value to emit from the new observable that is retu...
Definition: rx-observable.hpp:1086
static auto iterate(Collection c, Coordination cn) -> decltype(rxs::iterate(std::move(c), std::move(cn)))
Returns an observable that sends each value in the collection, on the specified scheduler.
Definition: rx-observable.hpp:1675
Definition: rx-operators.hpp:421
observable(const source_operator_type &o)
Definition: rx-observable.hpp:542
Definition: rx-operators.hpp:500
Definition: rx-operators.hpp:394
Definition: rx-operators.hpp:472
Definition: rx-operators.hpp:331
Definition: rx-operators.hpp:234
auto AN
Definition: rx-finally.hpp:105
auto max() -> operator_factory< max_tag >
For each item from this observable reduce it by taking the max value of the previous items.
Definition: rx-reduce.hpp:496
auto iterate(Collection c) -> observable< rxu::value_type_t< detail::iterate_traits< Collection >>, detail::iterate< Collection, identity_one_worker >>
Returns an observable that sends each value in the collection, on the specified scheduler.
Definition: rx-iterate.hpp:160
Definition: rx-operators.hpp:255
static auto iterate(Collection c) -> decltype(rxs::iterate(std::move(c), identity_current_thread()))
Returns an observable that sends each value in the collection, on the specified scheduler.
Definition: rx-observable.hpp:1668
Definition: rx-operators.hpp:199
Definition: rx-predef.hpp:156
T value_type
Definition: rx-observable.hpp:530
auto tap(AN &&... an) const
inspect calls to on_next, on_error and on_completed.
Definition: rx-observable.hpp:744
auto subscribe(ArgN &&... an) const -> void
Definition: rx-observable.hpp:238
auto zip(AN &&... an) const
Bring by one item from all given observables and select a value to emit from the new observable that ...
Definition: rx-observable.hpp:1097
Definition: rx-operators.hpp:521
auto scan(AN... an) const
For each item from this observable use Accumulator to combine items into a value that will be emitted...
Definition: rx-observable.hpp:1302
auto subscribe_with_rethrow(ArgN &&... an) const -> void
Definition: rx-observable.hpp:263
#define RXCPP_CATCH(...)
Definition: rx-util.hpp:39
Definition: rx-operators.hpp:345
auto start_with(AN... an) const
Start with the supplied values, then concatenate this observable.
Definition: rx-observable.hpp:1434
rxu::decay_t< SourceOperator > source_operator_type
Definition: rx-observable.hpp:486
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:48
static auto interval(rxsc::scheduler::clock_type::duration period, AN **...) -> decltype(rxs::interval(period))
Returns an observable that emits a sequential integer every specified time interval,...
Definition: rx-observable.hpp:1606
auto exists(AN &&... an) const
Returns an Observable that emits true if any item emitted by the source Observable satisfies a specif...
Definition: rx-observable.hpp:678
auto first(AN **...) const
For each item from this observable reduce it by sending only the first item.
Definition: rx-observable.hpp:1218
RXCPP_NORETURN void throw_exception(E &&e)
Definition: rx-util.hpp:920
auto max(AN **...) const
For each item from this observable reduce it by taking the max value of the previous items.
Definition: rx-observable.hpp:1278
a source of values whose methods block until all values have been emitted. subscribe or use one of th...
Definition: rx-observable.hpp:169
auto switch_if_empty(AN &&... an) const
If the source Observable terminates without emitting any items, emits items from a backup Observable.
Definition: rx-observable.hpp:711
auto filter(AN &&... an) const
For each item from this observable use Predicate to select which items to emit from the new observabl...
Definition: rx-observable.hpp:700
~blocking_observable()
Definition: rx-observable.hpp:217
static auto just(T v) -> decltype(rxs::just(std::move(v)))
Definition: rx-observable.hpp:1715
static auto from(Value0 v0, ValueN... vn) -> typename std::enable_if<!is_coordination< Value0 >::value, decltype(rxs::from(v0, vn...))>::type
Definition: rx-observable.hpp:1698
Definition: rx-operators.hpp:373
static auto defer(ObservableFactory of) -> decltype(rxs::defer(std::move(of)))
Returns an observable that calls the specified observable factory to create an observable for each ne...
Definition: rx-observable.hpp:1598
auto contains(AN &&... an) const
Returns an Observable that emits true if the source Observable emitted a specified item,...
Definition: rx-observable.hpp:689
Definition: rx-operators.hpp:316
static auto start_with(Observable o, Value0 v0, ValueN... vn) -> decltype(rxs::start_with(std::move(o), std::move(v0), std::move(vn)...))
Definition: rx-observable.hpp:1730
Definition: rx-sources.hpp:15
Definition: rx-operators.hpp:451
Definition: rx-operators.hpp:227
linq_driver< iter_cursor< typename util::container_traits< TContainer >::iterator > > from(TContainer &c)
Definition: linq.hpp:556
Definition: rx-util.hpp:416
auto debounce(AN &&... an) const
Return an observable that emits an item if a particular timespan has passed without emitting another ...
Definition: rx-observable.hpp:843
auto multicast(AN &&... an) const
Definition: rx-observable.hpp:1130
Definition: rx-observable.hpp:36
auto group_by(AN &&... an) const
Return an observable that emits grouped_observables, each of which corresponds to a unique key value ...
Definition: rx-observable.hpp:1108
static auto timer(rxsc::scheduler::clock_type::time_point at, AN **...) -> decltype(rxs::timer(at))
Returns an observable that emits an integer at the specified time point.
Definition: rx-observable.hpp:1637
auto take_last(AN &&... an) const
Emit only the final t items emitted by the source Observable.
Definition: rx-observable.hpp:1379
observable< T > make_observable_dynamic(Source &&s)
Definition: rx-observable.hpp:102
auto sum(AN **...) const
For each item from this observable reduce it by adding to the previous items.
Definition: rx-observable.hpp:1254
static auto timer(rxsc::scheduler::clock_type::time_point when, Coordination cn) -> decltype(rxs::timer(when, std::move(cn)))
Returns an observable that emits an integer at the specified time point.
Definition: rx-observable.hpp:1653
~observable()
Definition: rx-observable.hpp:534
auto map(AN &&... an) const
For each item from this observable use Selector to produce an item to emit from the new observable th...
Definition: rx-observable.hpp:821
auto sample_with_time(AN &&... an) const
Return an Observable that emits the most recent items emitted by the source Observable within periodi...
Definition: rx-observable.hpp:1313
Definition: rx-operators.hpp:241
Definition: rx-operators.hpp:117
Definition: rx-operators.hpp:213
a source of values. subscribe or use one of the operator methods that return a new observable,...
Definition: rx-observable.hpp:478
Definition: rx-operators.hpp:430
auto pairwise(AN... an) const
Take values pairwise from this observable.
Definition: rx-observable.hpp:1445
auto replay(AN &&... an) const
1) replay(optional Coordination, optional CompositeSubscription) Turn a cold observable hot,...
Definition: rx-observable.hpp:1163
Definition: rx-sources.hpp:23
Definition: rx-operators.hpp:38
Definition: rx-operators.hpp:268
auto sequence_equal(AN... an) const
Determine whether two Observables emit the same sequence of items.
Definition: rx-observable.hpp:733
auto last(AN **...) const
For each item from this observable reduce it by sending only the last item.
Definition: rx-observable.hpp:1230
auto start_with(AN &&... an) -> operator_factory< start_with_tag, AN... >
Start with the supplied values, then concatenate this observable.
Definition: rx-start_with.hpp:53
auto trace_activity() -> decltype(rxcpp_trace_activity(trace_tag()))&
Definition: rx-predef.hpp:15
bool operator!=(const dynamic_grouped_observable< K, T > &lhs, const dynamic_grouped_observable< K, T > &rhs)
Definition: rx-grouped_observable.hpp:103
Definition: rx-operators.hpp:129
Definition: rx-operators.hpp:437
auto merge_delay_error(AN... an) const
Definition: rx-observable.hpp:997
Definition: rx-operators.hpp:359
std::enable_if< is_subscriber< Subscriber >::value, void >::type on_subscribe(Subscriber o) const
Definition: rx-observable.hpp:87
static auto from() -> decltype(rxs::from< T >())
Definition: rx-observable.hpp:1683
dynamic_observable(SOF &&sof, typename std::enable_if<!is_dynamic_observable< SOF >::value, void ** >::type=0)
Definition: rx-observable.hpp:74
Definition: rx-operators.hpp:479
Definition: rx-operators.hpp:136
auto repeat(AN... an) const
Repeat this observable for the given number of times or infinitely.
Definition: rx-observable.hpp:1412
auto take(AN... an) const
For the first count items from this observable emit them from the new observable that is returned.
Definition: rx-observable.hpp:1368
Definition: rx-operators.hpp:206
Definition: rx-operators.hpp:192
auto subscribe_on(AN &&... an) const
Subscription and unsubscription are queued and delivered using the scheduler from the supplied coordi...
Definition: rx-observable.hpp:1174
tag_dynamic_observable dynamic_observable_tag
Definition: rx-observable.hpp:67
Definition: rx-operators.hpp:103
auto distinct(AN &&... an) const
For each item from this observable, filter out repeated values and emit only items that have not alre...
Definition: rx-observable.hpp:865
auto skip_while(AN... an) const
Make new observable with skipped first count items from this observable.
Definition: rx-observable.hpp:1335
static auto error(Exception &&e, Coordination cn) -> decltype(rxs::error< T >(std::forward< Exception >(e), std::move(cn)))
Returns an observable that sends no items to observer and immediately generates an error,...
Definition: rx-observable.hpp:1760
static auto range(T first=0, T last=std::numeric_limits< T >::max(), std::ptrdiff_t step=1) -> decltype(rxs::range< T >(first, last, step, identity_current_thread()))
Returns an observable that sends values in the range first-last by adding step to the previous value....
Definition: rx-observable.hpp:1561
static auto empty() -> decltype(from< T >())
Returns an observable that sends no items to observer and immediately completes, on the specified sch...
Definition: rx-observable.hpp:1738
Definition: rx-operators.hpp:486
Definition: rx-operators.hpp:110
Definition: rx-operators.hpp:57
auto merge(AN... an) const
For each given observable subscribe. For each item emitted from all of the given observables,...
Definition: rx-observable.hpp:986
error_ptr current_exception()
Definition: rx-util.hpp:943
bool operator==(const dynamic_grouped_observable< K, T > &lhs, const dynamic_grouped_observable< K, T > &rhs)
Definition: rx-grouped_observable.hpp:99
Definition: rx-operators.hpp:220
Definition: rx-predef.hpp:270
auto buffer(AN &&... an) const
Return an observable that emits connected, non-overlapping buffer, each containing at most count item...
Definition: rx-observable.hpp:942
Definition: rx-operators.hpp:185
auto lift(Operator &&op) -> detail::lift_factory< ResultType, Operator >
Definition: rx-lift.hpp:101
static auto range(T first, Coordination cn) -> decltype(rxs::range< T >(first, std::move(cn)))
Returns an observable that sends values in the range first-last by adding step to the previous value....
Definition: rx-observable.hpp:1582
static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, AN **...) -> decltype(rxs::interval(initial, period))
Returns an observable that emits a sequential integer every specified time interval,...
Definition: rx-observable.hpp:1621
static auto error(Exception &&e) -> decltype(rxs::error< T >(std::forward< Exception >(e)))
Returns an observable that sends no items to observer and immediately generates an error,...
Definition: rx-observable.hpp:1753
static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, Coordination cn) -> decltype(rxs::interval(initial, period, std::move(cn)))
Returns an observable that emits a sequential integer every specified time interval,...
Definition: rx-observable.hpp:1629
observable_type source
Definition: rx-observable.hpp:216
auto buffer_with_time_or_count(AN &&... an) const
Return an observable that emits connected, non-overlapping buffers of items from the source observabl...
Definition: rx-observable.hpp:964
Definition: rx-operators.hpp:465
Definition: rx-operators.hpp:261
Definition: rx-operators.hpp:275
const scheduler & make_current_thread()
Definition: rx-currentthread.hpp:263
auto switch_on_error(AN &&... an) const
If an error occurs, take the result from the Selector and subscribe to that instead.
Definition: rx-observable.hpp:810
Definition: rx-operators.hpp:493
auto switch_on_next(AN &&... an) const
Return observable that emits the items emitted by the observable most recently emitted by the source ...
Definition: rx-observable.hpp:975
consumes values from an observable using State that may implement on_next, on_error and on_completed ...
Definition: rx-observer.hpp:179
auto flat_map(AN &&... an) const
For each item from this observable use the CollectionSelector to produce an observable and subscribe ...
Definition: rx-observable.hpp:1019
observable< T > as_dynamic(AN **...) const
Definition: rx-observable.hpp:572
auto window_with_time(AN &&... an) const
Return an observable that emits observables every period time interval and collects items from this o...
Definition: rx-observable.hpp:909
auto all(AN &&... an) const
Returns an Observable that emits true if every item emitted by the source Observable satisfies a spec...
Definition: rx-observable.hpp:645
auto amb(AN... an) const
For each item from only the first of the given observables deliver from the new observable that is re...
Definition: rx-observable.hpp:1008
Definition: rx-operators.hpp:150
auto window_with_time_or_count(AN &&... an) const
Return an observable that emits connected, non-overlapping windows of items from the source observabl...
Definition: rx-observable.hpp:920
RXCPP_NORETURN void rethrow_exception(error_ptr e)
Definition: rx-util.hpp:902
static auto timer(rxsc::scheduler::clock_type::duration when, Coordination cn) -> decltype(rxs::timer(when, std::move(cn)))
Returns an observable that emits an integer at the specified time point.
Definition: rx-observable.hpp:1660
auto take_until(AN &&... an) const
For each item from this observable until on_next occurs on the trigger observable or until the specif...
Definition: rx-observable.hpp:1390
auto transform(AN &&... an) const
For each item from this observable use Selector to produce an item to emit from the new observable th...
Definition: rx-observable.hpp:832
Definition: rx-operators.hpp:338
int count() const
Definition: rx-observable.hpp:334
auto scope(ResourceFactory rf, ObservableFactory of) -> observable< rxu::value_type_t< detail::scope_traits< ResourceFactory, ObservableFactory >>, detail::scope< ResourceFactory, ObservableFactory >>
Returns an observable that makes an observable by the specified observable factory using the resource...
Definition: rx-scope.hpp:114
void unsubscribe() const
Definition: rx-subscription.hpp:178
auto ignore_elements(AN &&... an) const
Do not emit any items from the source Observable, but allow termination notification (either onError ...
Definition: rx-observable.hpp:1119
static auto from(Coordination cn, Value0 v0, ValueN... vn) -> typename std::enable_if< is_coordination< Coordination >::value, decltype(rxs::from(std::move(cn), v0, vn...))>::type
Definition: rx-observable.hpp:1706
blocking_observable(observable_type s)
Definition: rx-observable.hpp:220
T sum() const
Definition: rx-observable.hpp:358
Definition: rx-operators.hpp:164
auto timestamp(AN &&... an) const
Returns an observable that attaches a timestamp to each item emitted by the source observable indicat...
Definition: rx-observable.hpp:777
Definition: rx-operators.hpp:323
Definition: rx-operators.hpp:507
auto is_empty(AN &&... an) const
Returns an Observable that emits true if the source Observable is empty, otherwise false.
Definition: rx-observable.hpp:656
T min() const
Definition: rx-observable.hpp:421
auto buffer_with_time(AN &&... an) const
Return an observable that emits buffers every period time interval and collects items from this obser...
Definition: rx-observable.hpp:953
auto subscribe(ArgN &&... an) -> detail::subscribe_factory< decltype(make_subscriber< T >(std::forward< ArgN >(an)...))>
Subscribe will cause the source observable to emit values to the provided subscriber.
Definition: rx-subscribe.hpp:87
Definition: rx-predef.hpp:128
static auto scope(ResourceFactory rf, ObservableFactory of) -> decltype(rxs::scope(std::move(rf), std::move(of)))
Returns an observable that makes an observable by the specified observable factory using the resource...
Definition: rx-observable.hpp:1768
Definition: rx-operators.hpp:423
auto default_if_empty(AN &&... an) const
If the source Observable terminates without emitting any items, emits a default item and completes.
Definition: rx-observable.hpp:722
binds an observer that consumes values with a composite_subscription that controls lifetime.
Definition: rx-subscriber.hpp:25
Definition: rx-operators.hpp:119
static auto interval(rxsc::scheduler::clock_type::duration period, Coordination cn) -> decltype(rxs::interval(period, std::move(cn)))
Returns an observable that emits a sequential integer every specified time interval,...
Definition: rx-observable.hpp:1614
static auto range(T first, T last, std::ptrdiff_t step, Coordination cn) -> decltype(rxs::range< T >(first, last, step, std::move(cn)))
Returns an observable that sends values in the range first-last by adding step to the previous value....
Definition: rx-observable.hpp:1568
double average() const
Definition: rx-observable.hpp:379
auto first(AN **...) -> delayed_type_t< T, AN... > const
Definition: rx-observable.hpp:284
Definition: rx-operators.hpp:178
static auto just(T v, Coordination cn) -> decltype(rxs::just(std::move(v), std::move(cn)))
Definition: rx-observable.hpp:1722
rxu::value_type_t< delayed_type< T, AN... > > delayed_type_t
Definition: rx-operators.hpp:60
auto observe_on(AN &&... an) const
All values are queued and delivered using the scheduler from the supplied coordination.
Definition: rx-observable.hpp:1185
auto accumulate(AN &&... an) const
For each item from this observable use Accumulator to combine items, when completed use ResultSelecto...
Definition: rx-observable.hpp:1207
observable(observable< T, SO > &&o)
implicit conversion between observables of the same value_type
Definition: rx-observable.hpp:558
Definition: rx-predef.hpp:115
friend bool operator==(const observable< U, SO > &, const observable< U, SO > &)
Definition: rx-sources.hpp:17
auto window_toggle(AN &&... an) const
Return an observable that emits observables every period time interval and collects items from this o...
Definition: rx-observable.hpp:931
auto on_error_resume_next(AN &&... an) const
If an error occurs, take the result from the Selector and subscribe to that instead.
Definition: rx-observable.hpp:799
observable()
Definition: rx-observable.hpp:538
auto last(AN **...) -> delayed_type_t< T, AN... > const
Definition: rx-observable.hpp:312
static auto never() -> decltype(rxs::never< T >())
Returns an observable that never sends any items or notifications to observer.
Definition: rx-observable.hpp:1590
auto concat(AN... an) const
For each item from this observable subscribe to one at a time, in the order received....
Definition: rx-observable.hpp:1041
Definition: rx-predef.hpp:126
auto concat_map(AN &&... an) const
For each item from this observable use the CollectionSelector to produce an observable and subscribe ...
Definition: rx-observable.hpp:1052
Definition: rx-operators.hpp:127
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
friend bool operator==(const dynamic_observable< U > &, const dynamic_observable< U > &)
auto publish(AN &&... an) const
Turn a cold observable hot and allow connections to the source to be independent of subscriptions....
Definition: rx-observable.hpp:1141
static auto empty(Coordination cn) -> decltype(from< T >(std::move(cn)))
Returns an observable that sends no items to observer and immediately completes, on the specified sch...
Definition: rx-observable.hpp:1745
Definition: rx-operators.hpp:415
Definition: rx-operators.hpp:408
auto timer(TimePointOrDuration when) -> typename std::enable_if< detail::defer_timer< TimePointOrDuration, identity_one_worker >::value, typename detail::defer_timer< TimePointOrDuration, identity_one_worker >::observable_type >::type
Returns an observable that emits an integer at the specified time point.
Definition: rx-timer.hpp:114
observable(source_operator_type &&o)
Definition: rx-observable.hpp:546
Definition: rx-operators.hpp:401
static auto timer(rxsc::scheduler::clock_type::duration after, AN **...) -> decltype(rxs::timer(after))
Returns an observable that emits an integer at the specified time point.
Definition: rx-observable.hpp:1645
Definition: rx-operators.hpp:514