RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-observable.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_OBSERVABLE_HPP)
6 #define RXCPP_RX_OBSERVABLE_HPP
7 
8 #include "rx-includes.hpp"
9 
10 #ifdef __GNUG__
11 #define EXPLICIT_THIS this->
12 #else
13 #define EXPLICIT_THIS
14 #endif
15 
16 namespace rxcpp {
17 
18 namespace detail {
19 
20 template<class Subscriber, class T>
21 struct has_on_subscribe_for
22 {
23  struct not_void {};
24  template<class CS, class CT>
25  static auto check(int) -> decltype((*(CT*)nullptr).on_subscribe(*(CS*)nullptr));
26  template<class CS, class CT>
27  static not_void check(...);
28 
29  typedef decltype(check<rxu::decay_t<Subscriber>, T>(0)) detail_result;
30  static const bool value = std::is_same<detail_result, void>::value;
31 };
32 
33 }
34 
35 template<class T>
37  : public rxs::source_base<T>
38 {
39  struct state_type
40  : public std::enable_shared_from_this<state_type>
41  {
42  typedef std::function<void(subscriber<T>)> onsubscribe_type;
43 
44  onsubscribe_type on_subscribe;
45  };
46  std::shared_ptr<state_type> state;
47 
48  template<class U>
49  friend bool operator==(const dynamic_observable<U>&, const dynamic_observable<U>&);
50 
  // Installs a source/operator object as the subscribe callback.
  // A copy of `source` (of type rxu::decay_t<SO>) is captured by value in the
  // stored lambda; the lambda is `mutable` so on_subscribe can be invoked on
  // that captured copy even when it is a non-const member.
51  template<class SO>
52  void construct(SO&& source, rxs::tag_source&&) {
53  rxu::decay_t<SO> so = std::forward<SO>(source);
54  state->on_subscribe = [so](subscriber<T> o) mutable {
55  so.on_subscribe(std::move(o));
56  };
57  }
58 
  // Dispatch tag selecting the overload for plain callables.
59  struct tag_function {};
  // Installs a callable directly as the subscribe callback; `f` must be
  // assignable to std::function<void(subscriber<T>)>.
60  template<class F>
61  void construct(F&& f, tag_function&&) {
62  state->on_subscribe = std::forward<F>(f);
63  }
64 
65 public:
66 
68 
70  {
71  }
72 
  // Builds a dynamic_observable from either a source/operator object or a
  // callable. The enable_if default argument removes this constructor from
  // overload resolution when is_dynamic_observable<SOF> is true.
  // A fresh state object is allocated, then construct() is chosen by tag:
  // rxs::tag_source when SOF is a source or an operator, tag_function otherwise.
  // NOTE(review): the traits are applied to SOF as deduced (possibly a
  // reference type) — confirm they tolerate references.
73  template<class SOF>
74  explicit dynamic_observable(SOF&& sof, typename std::enable_if<!is_dynamic_observable<SOF>::value, void**>::type = 0)
75  : state(std::make_shared<state_type>())
76  {
77  construct(std::forward<SOF>(sof),
78  typename std::conditional<rxs::is_source<SOF>::value || rxo::is_operator<SOF>::value, rxs::tag_source, tag_function>::type());
79  }
80 
  // Invokes the stored subscribe callback with the given dynamic subscriber.
  // Dereferences `state` unconditionally, so `state` must be non-null.
81  void on_subscribe(subscriber<T> o) const {
82  state->on_subscribe(std::move(o));
83  }
84 
  // Overload for any other subscriber type (enabled when is_subscriber is
  // true): the subscriber is converted with as_dynamic() before being handed
  // to the stored callback. Dereferences `state` unconditionally.
85  template<class Subscriber>
86  typename std::enable_if<is_subscriber<Subscriber>::value, void>::type
87  on_subscribe(Subscriber o) const {
88  state->on_subscribe(o.as_dynamic());
89  }
90 };
91 
// Identity comparison: two dynamic_observables are equal when they share the
// same state object (shared_ptr comparison), not when their callbacks match.
92 template<class T>
93 inline bool operator==(const dynamic_observable<T>& lhs, const dynamic_observable<T>& rhs) {
94  return lhs.state == rhs.state;
95 }
// Negation of operator== for dynamic_observable.
96 template<class T>
97 inline bool operator!=(const dynamic_observable<T>& lhs, const dynamic_observable<T>& rhs) {
98  return !(lhs == rhs);
99 }
100 
101 template<class T, class Source>
103  return observable<T>(dynamic_observable<T>(std::forward<Source>(s)));
104 }
105 
106 namespace detail {
// Primary template, declared only. The specializations below are selected by
// the boolean Selector and by whether Default is void.
107 template<bool Selector, class Default, class SO>
108 struct resolve_observable;
109 
110 template<class Default, class SO>
111 struct resolve_observable<true, Default, SO>
112 {
113  typedef typename SO::type type;
114  typedef typename type::value_type value_type;
115  static const bool value = true;
116  typedef observable<value_type, type> observable_type;
117  template<class... AN>
118  static observable_type make(const Default&, AN&&... an) {
119  return observable_type(type(std::forward<AN>(an)...));
120  }
121 };
122 template<class Default, class SO>
123 struct resolve_observable<false, Default, SO>
124 {
125  static const bool value = false;
126  typedef Default observable_type;
127  template<class... AN>
128  static observable_type make(const observable_type& that, const AN&...) {
129  return that;
130  }
131 };
132 template<class SO>
133 struct resolve_observable<true, void, SO>
134 {
135  typedef typename SO::type type;
136  typedef typename type::value_type value_type;
137  static const bool value = true;
138  typedef observable<value_type, type> observable_type;
139  template<class... AN>
140  static observable_type make(AN&&... an) {
141  return observable_type(type(std::forward<AN>(an)...));
142  }
143 };
144 template<class SO>
145 struct resolve_observable<false, void, SO>
146 {
147  static const bool value = false;
148  typedef void observable_type;
149  template<class... AN>
150  static observable_type make(const AN&...) {
151  }
152 };
153 
154 }
155 
156 template<class Selector, class Default, template<class... TN> class SO, class... AN>
158  : public detail::resolve_observable<Selector::value, Default, rxu::defer_type<SO, AN...>>
159 {
160 };
161 
168 template<class T, class Observable>
170 {
  // Subscribes to `source` and blocks the calling thread until the
  // subscription has been disposed.
  //   source     - observable to subscribe to
  //   do_rethrow - when true an error is stored in `error` instead of being
  //                forwarded to the destination subscriber's on_error
  //   an         - forwarded to make_subscriber<T> to build the destination
  // NOTE(review): listing lines 177 and 211 are absent here; `error` is
  // presumably declared on 177 and rethrown on 211 — confirm against the file.
171  template<class Obsvbl, class... ArgN>
172  static auto blocking_subscribe(const Obsvbl& source, bool do_rethrow, ArgN&&... an)
173  -> void {
174  std::mutex lock;
175  std::condition_variable wake;
176  bool disposed = false;
178 
179  auto dest = make_subscriber<T>(std::forward<ArgN>(an)...);
180 
181  // keep any error to rethrow at the end.
  // The wrapping subscriber captures the locals by reference.
182  auto scbr = make_subscriber<T>(
183  dest,
184  [&](T t){dest.on_next(t);},
185  [&](rxu::error_ptr e){
186  if (do_rethrow) {
187  error = e;
188  } else {
189  dest.on_error(e);
190  }
191  },
192  [&](){dest.on_completed();}
193  );
194 
  // Teardown hook: flips `disposed` and wakes the waiter. notify_one() runs
  // before the flag is set, but both happen while `lock` is held, so the
  // waiter cannot re-check the predicate until the flag is already true.
195  auto cs = scbr.get_subscription();
196  cs.add(
197  [&](){
198  std::unique_lock<std::mutex> guard(lock);
199  wake.notify_one();
200  disposed = true;
201  });
202 
203  source.subscribe(std::move(scbr));
204 
  // Block until the teardown hook above has run.
205  std::unique_lock<std::mutex> guard(lock);
206  wake.wait(guard,
207  [&](){
208  return disposed;
209  });
210 
212  }
213 
214 public:
218  {
219  }
221 
237  template<class... ArgN>
238  auto subscribe(ArgN&&... an) const
239  -> void {
240  return blocking_subscribe(source, false, std::forward<ArgN>(an)...);
241  }
242 
262  template<class... ArgN>
263  auto subscribe_with_rethrow(ArgN&&... an) const
264  -> void {
265  return blocking_subscribe(source, true, std::forward<ArgN>(an)...);
266  }
267 
  // Returns the first value delivered to the on_next lambda, which stores it
  // and then unsubscribes `cs`. Calls rxu::throw_exception with
  // rxcpp::empty_error when no value was stored. The AN** parameter pack
  // exists only so the static_assert can reject any arguments.
  // NOTE(review): the trailing `const` qualifies the return type, not the
  // member function, so unlike count()/sum() this member is non-const —
  // looks unintended, confirm.
  // NOTE(review): listing lines 286-287 (declaration of `cs` and the start of
  // the subscribe call) are absent from this view.
283  template<class... AN>
284  auto first(AN**...) -> delayed_type_t<T, AN...> const {
285  rxu::maybe<T> result;
288  cs,
289  [&](T v){result.reset(v); cs.unsubscribe();});
290  if (result.empty())
291  rxu::throw_exception(rxcpp::empty_error("first() requires a stream with at least one value"));
292  return result.get();
293  static_assert(sizeof...(AN) == 0, "first() was passed too many arguments.");
294  }
295 
  // Returns the last value delivered to the on_next lambda (each value
  // overwrites `result`). Calls rxu::throw_exception with rxcpp::empty_error
  // when no value was stored. The AN** parameter pack exists only so the
  // static_assert can reject any arguments.
  // NOTE(review): the trailing `const` qualifies the return type, not the
  // member function — confirm this is intended.
  // NOTE(review): listing line 314 (start of the subscribe call) is absent
  // from this view.
311  template<class... AN>
312  auto last(AN**...) -> delayed_type_t<T, AN...> const {
313  rxu::maybe<T> result;
315  [&](T v){result.reset(v);});
316  if (result.empty())
317  rxu::throw_exception(rxcpp::empty_error("last() requires a stream with at least one value"));
318  return result.get();
319  static_assert(sizeof...(AN) == 0, "last() was passed too many arguments.");
320  }
321 
334  int count() const {
335  int result = 0;
336  source.count().as_blocking().subscribe_with_rethrow(
337  [&](int v){result = v;});
338  return result;
339  }
340 
358  T sum() const {
359  return source.sum().as_blocking().last();
360  }
361 
379  double average() const {
380  return source.average().as_blocking().last();
381  }
382 
400  T max() const {
401  return source.max().as_blocking().last();
402  }
403 
421  T min() const {
422  return source.min().as_blocking().last();
423  }
424 };
425 
426 namespace detail {
427 
// Helper that calls so->on_subscribe(*o) inside RXCPP_TRY / RXCPP_CATCH(...).
// It stores raw pointers to the source operator and subscriber, so both must
// outlive this object. It can be called directly via subscribe() or used as a
// scheduled action via operator().
// NOTE(review): listing lines 438 and 440 inside the catch block are absent
// from this view, so the full error path cannot be read here.
428 template<class SourceOperator, class Subscriber>
429 struct safe_subscriber
430 {
431  safe_subscriber(SourceOperator& so, Subscriber& o) : so(std::addressof(so)), o(std::addressof(o)) {}
432 
433  void subscribe() {
434  RXCPP_TRY {
435  so->on_subscribe(*o);
436  } RXCPP_CATCH(...) {
437  if (!o->is_subscribed()) {
439  }
  // The subscriber is unsubscribed on the way out of the handler.
441  o->unsubscribe();
442  }
443  }
444 
  // Scheduler entry point; the schedulable argument is ignored.
445  void operator()(const rxsc::schedulable&) {
446  subscribe();
447  }
448 
449  SourceOperator* so;
450  Subscriber* o;
451 };
452 
453 }
454 
455 template<>
456 class observable<void, void>;
457 
477 template<class T, class SourceOperator>
479  : public observable_base<T>
480 {
481  static_assert(std::is_same<T, typename SourceOperator::value_type>::value, "SourceOperator::value_type must be the same as T in observable<T, SourceOperator>");
482 
484 
485 public:
488 
489 private:
490 
491  template<class U, class SO>
492  friend class observable;
493 
494  template<class U, class SO>
495  friend bool operator==(const observable<U, SO>&, const observable<U, SO>&);
496 
  // Core subscribe path: validates the subscriber type at compile time, then
  // calls source_operator.on_subscribe(o) through detail::safe_subscriber and
  // returns o.get_subscription().
  // NOTE(review): listing line 499 (the trailing return type and opening
  // brace) is absent from this view.
497  template<class Subscriber>
498  auto detail_subscribe(Subscriber o) const
500 
501  typedef rxu::decay_t<Subscriber> subscriber_type;
502 
503  static_assert(is_subscriber<subscriber_type>::value, "subscribe must be passed a subscriber");
504  static_assert(std::is_same<typename source_operator_type::value_type, T>::value && std::is_convertible<T*, typename subscriber_type::value_type*>::value, "the value types in the sequence must match or be convertible");
505  static_assert(detail::has_on_subscribe_for<subscriber_type, source_operator_type>::value, "inner must have on_subscribe method that accepts this subscriber ");
506 
507  trace_activity().subscribe_enter(*this, o);
508 
  // Already unsubscribed: skip on_subscribe entirely.
509  if (!o.is_subscribed()) {
510  trace_activity().subscribe_return(*this);
511  return o.get_subscription();
512  }
513 
  // safe_subscriber keeps pointers to source_operator and the local `o`.
514  detail::safe_subscriber<source_operator_type, subscriber_type> subscriber(source_operator, o);
515 
516  // make sure to let current_thread take ownership of the thread as early as possible.
517  if (rxsc::current_thread::is_schedule_required()) {
518  const auto& sc = rxsc::make_current_thread();
519  sc.create_worker(o.get_subscription()).schedule(subscriber);
520  } else {
521  // current_thread already owns this thread.
522  subscriber.subscribe();
523  }
524 
525  trace_activity().subscribe_return(*this);
526  return o.get_subscription();
527  }
528 
529 public:
530  typedef T value_type;
531 
532  static_assert(rxo::is_operator<source_operator_type>::value || rxs::is_source<source_operator_type>::value, "observable must wrap an operator or source");
533 
535  {
536  }
537 
539  {
540  }
541 
542  explicit observable(const source_operator_type& o)
543  : source_operator(o)
544  {
545  }
547  : source_operator(std::move(o))
548  {
549  }
550 
552  template<class SO>
555  {}
557  template<class SO>
559  : source_operator(std::move(o.source_operator))
560  {}
561 
562 #if 0
563  template<class I>
564  void on_subscribe(observer<T, I> o) const {
565  source_operator.on_subscribe(o);
566  }
567 #endif
568 
571  template<class... AN>
573  return *this;
574  static_assert(sizeof...(AN) == 0, "as_dynamic() was passed too many arguments.");
575  }
576 
579  template<class... AN>
581  return blocking_observable<T, this_type>(*this);
582  static_assert(sizeof...(AN) == 0, "as_blocking() was passed too many arguments.");
583  }
584 
586 
592  template<class OperatorFactory>
593  auto op(OperatorFactory&& of) const
594  -> decltype(of(*(const this_type*)nullptr)) {
595  return of(*this);
596  static_assert(is_operator_factory_for<this_type, OperatorFactory>::value, "Function passed for op() must have the signature Result(SourceObservable)");
597  }
598 
601  template<class ResultType, class Operator>
602  auto lift(Operator&& op) const
603  -> observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>> {
604  return observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>(
605  rxo::detail::lift_operator<ResultType, source_operator_type, Operator>(source_operator, std::forward<Operator>(op)));
606  static_assert(detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value, "Function passed for lift() must have the signature subscriber<...>(subscriber<T, ...>)");
607  }
608 
614  template<class ResultType, class Operator>
615  auto lift_if(Operator&& op) const
616  -> typename std::enable_if<detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value,
617  observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>>::type {
618  return observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>(
619  rxo::detail::lift_operator<ResultType, source_operator_type, Operator>(source_operator, std::forward<Operator>(op)));
620  }
  // lift_if, fallback overload: selected when the argument is NOT a lift
  // function for T / subscriber<ResultType>. The argument is ignored and the
  // result of rxs::from<ResultType>() with no values is returned instead.
626  template<class ResultType, class Operator>
627  auto lift_if(Operator&&) const
628  -> typename std::enable_if<!detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value,
629  decltype(rxs::from<ResultType>())>::type {
630  return rxs::from<ResultType>();
631  }
633 
636  template<class... ArgN>
637  auto subscribe(ArgN&&... an) const
639  return detail_subscribe(make_subscriber<T>(std::forward<ArgN>(an)...));
640  }
641 
  // all(): passes this observable and the forwarded arguments to
  // observable_member() under all_tag. The decltype names the return type
  // from the same call on a null this_type pointer (never evaluated).
644  template<class... AN>
645  auto all(AN&&... an) const
647  -> decltype(observable_member(all_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
649  {
650  return observable_member(all_tag{}, *this, std::forward<AN>(an)...);
651  }
652 
  // is_empty(): passes this observable and the forwarded arguments to
  // observable_member() under is_empty_tag; the return type is that call's type.
655  template<class... AN>
656  auto is_empty(AN&&... an) const
658  -> decltype(observable_member(is_empty_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
660  {
661  return observable_member(is_empty_tag{}, *this, std::forward<AN>(an)...);
662  }
663 
  // any(): passes this observable and the forwarded arguments to
  // observable_member() under any_tag; the return type is that call's type.
666  template<class... AN>
667  auto any(AN&&... an) const
669  -> decltype(observable_member(any_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
671  {
672  return observable_member(any_tag{}, *this, std::forward<AN>(an)...);
673  }
674 
  // exists(): passes this observable and the forwarded arguments to
  // observable_member() under exists_tag; the return type is that call's type.
677  template<class... AN>
678  auto exists(AN&&... an) const
680  -> decltype(observable_member(exists_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
682  {
683  return observable_member(exists_tag{}, *this, std::forward<AN>(an)...);
684  }
685 
  // contains(): passes this observable and the forwarded arguments to
  // observable_member() under contains_tag; the return type is that call's type.
688  template<class... AN>
689  auto contains(AN&&... an) const
691  -> decltype(observable_member(contains_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
693  {
694  return observable_member(contains_tag{}, *this, std::forward<AN>(an)...);
695  }
696 
  // filter(): passes this observable and the forwarded arguments to
  // observable_member() under filter_tag; the return type is that call's type.
699  template<class... AN>
700  auto filter(AN&&... an) const
702  -> decltype(observable_member(filter_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
704  {
705  return observable_member(filter_tag{}, *this, std::forward<AN>(an)...);
706  }
707 
  // switch_if_empty(): passes this observable and the forwarded arguments to
  // observable_member() under switch_if_empty_tag; the return type is that call's type.
710  template<class... AN>
711  auto switch_if_empty(AN&&... an) const
713  -> decltype(observable_member(switch_if_empty_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
715  {
716  return observable_member(switch_if_empty_tag{}, *this, std::forward<AN>(an)...);
717  }
718 
  // default_if_empty(): passes this observable and the forwarded arguments to
  // observable_member() under default_if_empty_tag; the return type is that call's type.
721  template<class... AN>
722  auto default_if_empty(AN&&... an) const
724  -> decltype(observable_member(default_if_empty_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
726  {
727  return observable_member(default_if_empty_tag{}, *this, std::forward<AN>(an)...);
728  }
729 
  // sequence_equal(): dispatches to observable_member() under sequence_equal_tag.
  // Arguments are taken by value, so std::forward<AN> passes the local copies
  // on as rvalues.
732  template<class... AN>
733  auto sequence_equal(AN... an) const
735  -> decltype(observable_member(sequence_equal_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
737  {
738  return observable_member(sequence_equal_tag{}, *this, std::forward<AN>(an)...);
739  }
740 
  // tap(): passes this observable and the forwarded arguments to
  // observable_member() under tap_tag; the return type is that call's type.
743  template<class... AN>
744  auto tap(AN&&... an) const
746  -> decltype(observable_member(tap_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
748  {
749  return observable_member(tap_tag{}, *this, std::forward<AN>(an)...);
750  }
751 
  // time_interval(): passes this observable and the forwarded arguments to
  // observable_member() under time_interval_tag; the return type is that call's type.
754  template<class... AN>
755  auto time_interval(AN&&... an) const
757  -> decltype(observable_member(time_interval_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
759  {
760  return observable_member(time_interval_tag{}, *this, std::forward<AN>(an)...);
761  }
762 
  // timeout(): passes this observable and the forwarded arguments to
  // observable_member() under timeout_tag; the return type is that call's type.
765  template<class... AN>
766  auto timeout(AN&&... an) const
768  -> decltype(observable_member(timeout_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
770  {
771  return observable_member(timeout_tag{}, *this, std::forward<AN>(an)...);
772  }
773 
  // timestamp(): passes this observable and the forwarded arguments to
  // observable_member() under timestamp_tag; the return type is that call's type.
776  template<class... AN>
777  auto timestamp(AN&&... an) const
779  -> decltype(observable_member(timestamp_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
781  {
782  return observable_member(timestamp_tag{}, *this, std::forward<AN>(an)...);
783  }
784 
  // finally(): passes this observable and the forwarded arguments to
  // observable_member() under finally_tag; the return type is that call's type.
787  template<class... AN>
788  auto finally(AN&&... an) const
790  -> decltype(observable_member(finally_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
792  {
793  return observable_member(finally_tag{}, *this, std::forward<AN>(an)...);
794  }
795 
  // on_error_resume_next(): passes this observable and the forwarded arguments
  // to observable_member() under on_error_resume_next_tag.
798  template<class... AN>
799  auto on_error_resume_next(AN&&... an) const
801  -> decltype(observable_member(on_error_resume_next_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
803  {
804  return observable_member(on_error_resume_next_tag{}, *this, std::forward<AN>(an)...);
805  }
806 
  // switch_on_error(): alternate name for on_error_resume_next(). It
  // dispatches with the same on_error_resume_next_tag and the same arguments.
809  template<class... AN>
810  auto switch_on_error(AN&&... an) const
812  -> decltype(observable_member(on_error_resume_next_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
814  {
815  return observable_member(on_error_resume_next_tag{}, *this, std::forward<AN>(an)...);
816  }
817 
  // map(): passes this observable and the forwarded arguments to
  // observable_member() under map_tag; the return type is that call's type.
820  template<class... AN>
821  auto map(AN&&... an) const
823  -> decltype(observable_member(map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
825  {
826  return observable_member(map_tag{}, *this, std::forward<AN>(an)...);
827  }
828 
  // transform(): alternate name for map(). It dispatches with the same
  // map_tag and the same arguments.
831  template<class... AN>
832  auto transform(AN&&... an) const
834  -> decltype(observable_member(map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
836  {
837  return observable_member(map_tag{}, *this, std::forward<AN>(an)...);
838  }
839 
  // debounce(): passes this observable and the forwarded arguments to
  // observable_member() under debounce_tag; the return type is that call's type.
842  template<class... AN>
843  auto debounce(AN&&... an) const
845  -> decltype(observable_member(debounce_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
847  {
848  return observable_member(debounce_tag{}, *this, std::forward<AN>(an)...);
849  }
850 
  // delay(): passes this observable and the forwarded arguments to
  // observable_member() under delay_tag; the return type is that call's type.
853  template<class... AN>
854  auto delay(AN&&... an) const
856  -> decltype(observable_member(delay_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
858  {
859  return observable_member(delay_tag{}, *this, std::forward<AN>(an)...);
860  }
861 
  // distinct(): passes this observable and the forwarded arguments to
  // observable_member() under distinct_tag; the return type is that call's type.
864  template<class... AN>
865  auto distinct(AN&&... an) const
867  -> decltype(observable_member(distinct_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
869  {
870  return observable_member(distinct_tag{}, *this, std::forward<AN>(an)...);
871  }
872 
  // distinct_until_changed(): passes this observable and the forwarded
  // arguments to observable_member() under distinct_until_changed_tag.
875  template<class... AN>
876  auto distinct_until_changed(AN&&... an) const
878  -> decltype(observable_member(distinct_until_changed_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
880  {
881  return observable_member(distinct_until_changed_tag{}, *this, std::forward<AN>(an)...);
882  }
883 
  // element_at(): passes this observable and the forwarded arguments to
  // observable_member() under element_at_tag; the return type is that call's type.
886  template<class... AN>
887  auto element_at(AN&&... an) const
889  -> decltype(observable_member(element_at_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
891  {
892  return observable_member(element_at_tag{}, *this, std::forward<AN>(an)...);
893  }
894 
  // window(): passes this observable and the forwarded arguments to
  // observable_member() under window_tag; the return type is that call's type.
897  template<class... AN>
898  auto window(AN&&... an) const
900  -> decltype(observable_member(window_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
902  {
903  return observable_member(window_tag{}, *this, std::forward<AN>(an)...);
904  }
905 
  // window_with_time(): passes this observable and the forwarded arguments to
  // observable_member() under window_with_time_tag.
908  template<class... AN>
909  auto window_with_time(AN&&... an) const
911  -> decltype(observable_member(window_with_time_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
913  {
914  return observable_member(window_with_time_tag{}, *this, std::forward<AN>(an)...);
915  }
916 
  // window_with_time_or_count(): passes this observable and the forwarded
  // arguments to observable_member() under window_with_time_or_count_tag.
919  template<class... AN>
920  auto window_with_time_or_count(AN&&... an) const
922  -> decltype(observable_member(window_with_time_or_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
924  {
925  return observable_member(window_with_time_or_count_tag{}, *this, std::forward<AN>(an)...);
926  }
927 
  // window_toggle(): passes this observable and the forwarded arguments to
  // observable_member() under window_toggle_tag.
930  template<class... AN>
931  auto window_toggle(AN&&... an) const
933  -> decltype(observable_member(window_toggle_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
935  {
936  return observable_member(window_toggle_tag{}, *this, std::forward<AN>(an)...);
937  }
938 
  // buffer(): passes this observable and the forwarded arguments to
  // observable_member(). Note the tag is buffer_count_tag, not a "buffer_tag".
941  template<class... AN>
942  auto buffer(AN&&... an) const
944  -> decltype(observable_member(buffer_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
946  {
947  return observable_member(buffer_count_tag{}, *this, std::forward<AN>(an)...);
948  }
949 
  // buffer_with_time(): passes this observable and the forwarded arguments to
  // observable_member() under buffer_with_time_tag.
952  template<class... AN>
953  auto buffer_with_time(AN&&... an) const
955  -> decltype(observable_member(buffer_with_time_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
957  {
958  return observable_member(buffer_with_time_tag{}, *this, std::forward<AN>(an)...);
959  }
960 
  // buffer_with_time_or_count(): passes this observable and the forwarded
  // arguments to observable_member() under buffer_with_time_or_count_tag.
963  template<class... AN>
964  auto buffer_with_time_or_count(AN&&... an) const
966  -> decltype(observable_member(buffer_with_time_or_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
968  {
969  return observable_member(buffer_with_time_or_count_tag{}, *this, std::forward<AN>(an)...);
970  }
971 
  // switch_on_next(): passes this observable and the forwarded arguments to
  // observable_member() under switch_on_next_tag.
974  template<class... AN>
975  auto switch_on_next(AN&&... an) const
977  -> decltype(observable_member(switch_on_next_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
979  {
980  return observable_member(switch_on_next_tag{}, *this, std::forward<AN>(an)...);
981  }
982 
  // merge(): dispatches to observable_member() under merge_tag. Arguments are
  // taken by value, so std::forward<AN> passes the local copies on as rvalues.
985  template<class... AN>
986  auto merge(AN... an) const
988  -> decltype(observable_member(merge_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
990  {
991  return observable_member(merge_tag{}, *this, std::forward<AN>(an)...);
992  }
993 
  // merge_delay_error(): dispatches to observable_member() under
  // merge_delay_error_tag. Arguments are taken by value and passed on as rvalues.
996  template<class... AN>
997  auto merge_delay_error(AN... an) const
999  -> decltype(observable_member(merge_delay_error_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1001  {
1002  return observable_member(merge_delay_error_tag{}, *this, std::forward<AN>(an)...);
1003  }
1004 
  // amb(): dispatches to observable_member() under amb_tag. Arguments are
  // taken by value, so std::forward<AN> passes the local copies on as rvalues.
1007  template<class... AN>
1008  auto amb(AN... an) const
1010  -> decltype(observable_member(amb_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1012  {
1013  return observable_member(amb_tag{}, *this, std::forward<AN>(an)...);
1014  }
1015 
  // flat_map(): passes this observable and the forwarded arguments to
  // observable_member() under flat_map_tag; the return type is that call's type.
1018  template<class... AN>
1019  auto flat_map(AN&&... an) const
1021  -> decltype(observable_member(flat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1023  {
1024  return observable_member(flat_map_tag{}, *this, std::forward<AN>(an)...);
1025  }
1026 
  // merge_transform(): alternate name for flat_map(). It dispatches with the
  // same flat_map_tag and the same arguments.
1029  template<class... AN>
1030  auto merge_transform(AN&&... an) const
1032  -> decltype(observable_member(flat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1034  {
1035  return observable_member(flat_map_tag{}, *this, std::forward<AN>(an)...);
1036  }
1037 
  // concat(): dispatches to observable_member() under concat_tag. Arguments
  // are taken by value, so std::forward<AN> passes the local copies on as rvalues.
1040  template<class... AN>
1041  auto concat(AN... an) const
1043  -> decltype(observable_member(concat_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1045  {
1046  return observable_member(concat_tag{}, *this, std::forward<AN>(an)...);
1047  }
1048 
  // concat_map(): passes this observable and the forwarded arguments to
  // observable_member() under concat_map_tag.
1051  template<class... AN>
1052  auto concat_map(AN&&... an) const
1054  -> decltype(observable_member(concat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1056  {
1057  return observable_member(concat_map_tag{}, *this, std::forward<AN>(an)...);
1058  }
1059 
  // concat_transform(): alternate name for concat_map(). It dispatches with
  // the same concat_map_tag and the same arguments.
1062  template<class... AN>
1063  auto concat_transform(AN&&... an) const
1065  -> decltype(observable_member(concat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1067  {
1068  return observable_member(concat_map_tag{}, *this, std::forward<AN>(an)...);
1069  }
1070 
  // with_latest_from(): dispatches to observable_member() under
  // with_latest_from_tag. Arguments are taken by value and passed on as rvalues.
1073  template<class... AN>
1074  auto with_latest_from(AN... an) const
1076  -> decltype(observable_member(with_latest_from_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1078  {
1079  return observable_member(with_latest_from_tag{}, *this, std::forward<AN>(an)...);
1080  }
1081 
1082 
  // combine_latest(): dispatches to observable_member() under
  // combine_latest_tag. Arguments are taken by value and passed on as rvalues.
1085  template<class... AN>
1086  auto combine_latest(AN... an) const
1088  -> decltype(observable_member(combine_latest_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1090  {
1091  return observable_member(combine_latest_tag{}, *this, std::forward<AN>(an)...);
1092  }
1093 
  // zip(): passes this observable and the forwarded arguments to
  // observable_member() under zip_tag; the return type is that call's type.
1096  template<class... AN>
1097  auto zip(AN&&... an) const
1099  -> decltype(observable_member(zip_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1101  {
1102  return observable_member(zip_tag{}, *this, std::forward<AN>(an)...);
1103  }
1104 
  // group_by(): passes this observable and the forwarded arguments to
  // observable_member() under group_by_tag. This is the only forwarder here
  // spelled with an explicit `inline`.
1107  template<class... AN>
1108  inline auto group_by(AN&&... an) const
1110  -> decltype(observable_member(group_by_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1112  {
1113  return observable_member(group_by_tag{}, *this, std::forward<AN>(an)...);
1114  }
1115 
  // ignore_elements(): passes this observable and the forwarded arguments to
  // observable_member() under ignore_elements_tag.
1118  template<class... AN>
1119  auto ignore_elements(AN&&... an) const
1121  -> decltype(observable_member(ignore_elements_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1123  {
1124  return observable_member(ignore_elements_tag{}, *this, std::forward<AN>(an)...);
1125  }
1126 
  // multicast(): passes this observable and the forwarded arguments to
  // observable_member() under multicast_tag.
1129  template<class... AN>
1130  auto multicast(AN&&... an) const
1132  -> decltype(observable_member(multicast_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1134  {
1135  return observable_member(multicast_tag{}, *this, std::forward<AN>(an)...);
1136  }
1137 
  // publish(): passes this observable and the forwarded arguments to
  // observable_member() under publish_tag.
1140  template<class... AN>
1141  auto publish(AN&&... an) const
1143  -> decltype(observable_member(publish_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1145  {
1146  return observable_member(publish_tag{}, *this, std::forward<AN>(an)...);
1147  }
1148 
  // publish_synchronized(): passes this observable and the forwarded
  // arguments to observable_member() under publish_synchronized_tag.
1151  template<class... AN>
1152  auto publish_synchronized(AN&&... an) const
1154  -> decltype(observable_member(publish_synchronized_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1156  {
1157  return observable_member(publish_synchronized_tag{}, *this, std::forward<AN>(an)...);
1158  }
1159 
  // replay(): passes this observable and the forwarded arguments to
  // observable_member() under replay_tag.
1162  template<class... AN>
1163  auto replay(AN&&... an) const
1165  -> decltype(observable_member(replay_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1167  {
1168  return observable_member(replay_tag{}, *this, std::forward<AN>(an)...);
1169  }
1170 
  // subscribe_on(): passes this observable and the forwarded arguments to
  // observable_member() under subscribe_on_tag.
1173  template<class... AN>
1174  auto subscribe_on(AN&&... an) const
1176  -> decltype(observable_member(subscribe_on_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1178  {
1179  return observable_member(subscribe_on_tag{}, *this, std::forward<AN>(an)...);
1180  }
1181 
  // observe_on(): passes this observable and the forwarded arguments to
  // observable_member() under observe_on_tag.
1184  template<class... AN>
1185  auto observe_on(AN&&... an) const
1187  -> decltype(observable_member(observe_on_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1189  {
1190  return observable_member(observe_on_tag{}, *this, std::forward<AN>(an)...);
1191  }
1192 
  // reduce(): passes this observable and the forwarded arguments to
  // observable_member() under reduce_tag.
1195  template<class... AN>
1196  auto reduce(AN&&... an) const
1198  -> decltype(observable_member(reduce_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1200  {
1201  return observable_member(reduce_tag{}, *this, std::forward<AN>(an)...);
1202  }
1203 
  // accumulate(): alternate name for reduce(). It dispatches with the same
  // reduce_tag and the same arguments.
1206  template<class... AN>
1207  auto accumulate(AN&&... an) const
1209  -> decltype(observable_member(reduce_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1211  {
1212  return observable_member(reduce_tag{}, *this, std::forward<AN>(an)...);
1213  }
1214 
1217  template<class... AN>
1218  auto first(AN**...) const
1222  {
1224  static_assert(sizeof...(AN) == 0, "first() was passed too many arguments.");
1225  }
1226 
1229  template<class... AN>
1230  auto last(AN**...) const
1234  {
1236  static_assert(sizeof...(AN) == 0, "last() was passed too many arguments.");
1237  }
1238 
1241  template<class... AN>
1242  auto count(AN**...) const
1246  {
1248  static_assert(sizeof...(AN) == 0, "count() was passed too many arguments.");
1249  }
1250 
1253  template<class... AN>
1254  auto sum(AN**...) const
1258  {
1260  static_assert(sizeof...(AN) == 0, "sum() was passed too many arguments.");
1261  }
1262 
1265  template<class... AN>
1266  auto average(AN**...) const
1270  {
1272  static_assert(sizeof...(AN) == 0, "average() was passed too many arguments.");
1273  }
1274 
1277  template<class... AN>
1278  auto max(AN**...) const
1282  {
1284  static_assert(sizeof...(AN) == 0, "max() was passed too many arguments.");
1285  }
1286 
1289  template<class... AN>
1290  auto min(AN**...) const
1294  {
1296  static_assert(sizeof...(AN) == 0, "min() was passed too many arguments.");
1297  }
1298 
  // scan(): dispatches to observable_member() under scan_tag. Arguments are
  // taken by value, so std::forward<AN> passes the local copies on as rvalues.
1301  template<class... AN>
1302  auto scan(AN... an) const
1304  -> decltype(observable_member(scan_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1306  {
1307  return observable_member(scan_tag{}, *this, std::forward<AN>(an)...);
1308  }
1309 
  // sample_with_time(): passes this observable and the forwarded arguments to
  // observable_member() under sample_with_time_tag.
1312  template<class... AN>
1313  auto sample_with_time(AN&&... an) const
1315  -> decltype(observable_member(sample_with_time_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1317  {
1318  return observable_member(sample_with_time_tag{}, *this, std::forward<AN>(an)...);
1319  }
1320 
  // skip(): dispatches to observable_member() under skip_tag. Arguments are
  // taken by value, so std::forward<AN> passes the local copies on as rvalues.
1323  template<class... AN>
1324  auto skip(AN... an) const
1326  -> decltype(observable_member(skip_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1328  {
1329  return observable_member(skip_tag{}, *this, std::forward<AN>(an)...);
1330  }
1331 
  // skip_while(): dispatches to observable_member() under skip_while_tag.
  // Arguments are taken by value and passed on as rvalues.
1334  template<class... AN>
1335  auto skip_while(AN... an) const
1337  -> decltype(observable_member(skip_while_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1339  {
1340  return observable_member(skip_while_tag{}, *this, std::forward<AN>(an)...);
1341  }
1342 
  // skip_last(): dispatches to observable_member() under skip_last_tag.
  // Arguments are taken by value and passed on as rvalues.
1345  template<class... AN>
1346  auto skip_last(AN... an) const
1348  -> decltype(observable_member(skip_last_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1350  {
1351  return observable_member(skip_last_tag{}, *this, std::forward<AN>(an)...);
1352  }
1353 
  // skip_until(): dispatches to observable_member() under skip_until_tag.
  // Arguments are taken by value and passed on as rvalues.
1356  template<class... AN>
1357  auto skip_until(AN... an) const
1359  -> decltype(observable_member(skip_until_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1361  {
1362  return observable_member(skip_until_tag{}, *this, std::forward<AN>(an)...);
1363  }
1364 
  // take(): dispatches to observable_member() under take_tag. Arguments are
  // taken by value, so std::forward<AN> passes the local copies on as rvalues.
1367  template<class... AN>
1368  auto take(AN... an) const
1370  -> decltype(observable_member(take_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1372  {
1373  return observable_member(take_tag{}, *this, std::forward<AN>(an)...);
1374  }
1375 
  // take_last(): passes this observable and the forwarded arguments to
  // observable_member() under take_last_tag.
1378  template<class... AN>
1379  auto take_last(AN&&... an) const
1381  -> decltype(observable_member(take_last_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1383  {
1384  return observable_member(take_last_tag{}, *this, std::forward<AN>(an)...);
1385  }
1386 
  // take_until(): passes this observable and the forwarded arguments to
  // observable_member() under take_until_tag.
1389  template<class... AN>
1390  auto take_until(AN&&... an) const
1392  -> decltype(observable_member(take_until_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1394  {
1395  return observable_member(take_until_tag{}, *this, std::forward<AN>(an)...);
1396  }
1397 
  // take_while(): passes this observable and the forwarded arguments to
  // observable_member() under take_while_tag.
1400  template<class... AN>
1401  auto take_while(AN&&... an) const
1403  -> decltype(observable_member(take_while_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1405  {
1406  return observable_member(take_while_tag{}, *this, std::forward<AN>(an)...);
1407  }
1408 
  // repeat(): dispatches to observable_member() under repeat_tag. Arguments
  // are taken by value, so std::forward<AN> passes the local copies on as rvalues.
1411  template<class... AN>
1412  auto repeat(AN... an) const
1414  -> decltype(observable_member(repeat_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1416  {
1417  return observable_member(repeat_tag{}, *this, std::forward<AN>(an)...);
1418  }
1419 
1422  template<class... AN>
1423  auto retry(AN... an) const
1425  -> decltype(observable_member(retry_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1427  {
1428  return observable_member(retry_tag{}, *(this_type*)this, std::forward<AN>(an)...);
1429  }
1430 
  // start_with(): dispatches to observable_member() under start_with_tag.
  // Arguments are taken by value and passed on as rvalues.
1433  template<class... AN>
1434  auto start_with(AN... an) const
1436  -> decltype(observable_member(start_with_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1438  {
1439  return observable_member(start_with_tag{}, *this, std::forward<AN>(an)...);
1440  }
1441 
  // pairwise(): dispatches to observable_member() under pairwise_tag.
  // Arguments are taken by value and passed on as rvalues.
1444  template<class... AN>
1445  auto pairwise(AN... an) const
1447  -> decltype(observable_member(pairwise_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1449  {
1450  return observable_member(pairwise_tag{}, *this, std::forward<AN>(an)...);
1451  }
1452 };
1453 
// Equality for two observables of the same type: they compare equal exactly
// when their source operators compare equal.
// NOTE(review): listing line 1455 (the signature) was missing from this dump and
// has been reconstructed from the body and the template parameters — confirm
// against the original header.
1454 template<class T, class SourceOperator>
1455 inline bool operator==(const observable<T, SourceOperator>& lhs, const observable<T, SourceOperator>& rhs) {
1456  return lhs.source_operator == rhs.source_operator;
1457 }
// Inequality for two observables of the same type, defined as the negation of
// operator== on the same operands.
// NOTE(review): listing line 1459 (the signature) was missing from this dump and
// has been reconstructed from the body and the template parameters — confirm
// against the original header.
1458 template<class T, class SourceOperator>
1459 inline bool operator!=(const observable<T, SourceOperator>& lhs, const observable<T, SourceOperator>& rhs) {
1460  return !(lhs == rhs);
1461 }
1462 
1545 template<>
1546 class observable<void, void>
1547 {
1548  ~observable();
1549 public:
1552  template<class T, class OnSubscribe>
1553  static auto create(OnSubscribe os)
1554  -> decltype(rxs::create<T>(std::move(os))) {
1555  return rxs::create<T>(std::move(os));
1556  }
1557 
1560  template<class T>
1561  static auto range(T first = 0, T last = std::numeric_limits<T>::max(), std::ptrdiff_t step = 1)
1562  -> decltype(rxs::range<T>(first, last, step, identity_current_thread())) {
1563  return rxs::range<T>(first, last, step, identity_current_thread());
1564  }
1567  template<class T, class Coordination>
1568  static auto range(T first, T last, std::ptrdiff_t step, Coordination cn)
1569  -> decltype(rxs::range<T>(first, last, step, std::move(cn))) {
1570  return rxs::range<T>(first, last, step, std::move(cn));
1571  }
1574  template<class T, class Coordination>
1575  static auto range(T first, T last, Coordination cn)
1576  -> decltype(rxs::range<T>(first, last, std::move(cn))) {
1577  return rxs::range<T>(first, last, std::move(cn));
1578  }
1581  template<class T, class Coordination>
1582  static auto range(T first, Coordination cn)
1583  -> decltype(rxs::range<T>(first, std::move(cn))) {
1584  return rxs::range<T>(first, std::move(cn));
1585  }
1586 
1589  template<class T>
1590  static auto never()
1591  -> decltype(rxs::never<T>()) {
1592  return rxs::never<T>();
1593  }
1594 
1597  template<class ObservableFactory>
1598  static auto defer(ObservableFactory of)
1599  -> decltype(rxs::defer(std::move(of))) {
1600  return rxs::defer(std::move(of));
1601  }
1602 
1605  template<class... AN>
1606  static auto interval(rxsc::scheduler::clock_type::duration period, AN**...)
1607  -> decltype(rxs::interval(period)) {
1608  return rxs::interval(period);
1609  static_assert(sizeof...(AN) == 0, "interval(period) was passed too many arguments.");
1610  }
1613  template<class Coordination>
1614  static auto interval(rxsc::scheduler::clock_type::duration period, Coordination cn)
1615  -> decltype(rxs::interval(period, std::move(cn))) {
1616  return rxs::interval(period, std::move(cn));
1617  }
1620  template<class... AN>
1621  static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, AN**...)
1622  -> decltype(rxs::interval(initial, period)) {
1623  return rxs::interval(initial, period);
1624  static_assert(sizeof...(AN) == 0, "interval(initial, period) was passed too many arguments.");
1625  }
1628  template<class Coordination>
1629  static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, Coordination cn)
1630  -> decltype(rxs::interval(initial, period, std::move(cn))) {
1631  return rxs::interval(initial, period, std::move(cn));
1632  }
1633 
1636  template<class... AN>
1637  static auto timer(rxsc::scheduler::clock_type::time_point at, AN**...)
1638  -> decltype(rxs::timer(at)) {
1639  return rxs::timer(at);
1640  static_assert(sizeof...(AN) == 0, "timer(at) was passed too many arguments.");
1641  }
1644  template<class... AN>
1645  static auto timer(rxsc::scheduler::clock_type::duration after, AN**...)
1646  -> decltype(rxs::timer(after)) {
1647  return rxs::timer(after);
1648  static_assert(sizeof...(AN) == 0, "timer(after) was passed too many arguments.");
1649  }
1652  template<class Coordination>
1653  static auto timer(rxsc::scheduler::clock_type::time_point when, Coordination cn)
1654  -> decltype(rxs::timer(when, std::move(cn))) {
1655  return rxs::timer(when, std::move(cn));
1656  }
1659  template<class Coordination>
1660  static auto timer(rxsc::scheduler::clock_type::duration when, Coordination cn)
1661  -> decltype(rxs::timer(when, std::move(cn))) {
1662  return rxs::timer(when, std::move(cn));
1663  }
1664 
1667  template<class Collection>
1668  static auto iterate(Collection c)
1669  -> decltype(rxs::iterate(std::move(c), identity_current_thread())) {
1670  return rxs::iterate(std::move(c), identity_current_thread());
1671  }
1674  template<class Collection, class Coordination>
1675  static auto iterate(Collection c, Coordination cn)
1676  -> decltype(rxs::iterate(std::move(c), std::move(cn))) {
1677  return rxs::iterate(std::move(c), std::move(cn));
1678  }
1679 
1682  template<class T>
1683  static auto from()
1684  -> decltype( rxs::from<T>()) {
1685  return rxs::from<T>();
1686  }
1689  template<class T, class Coordination>
1690  static auto from(Coordination cn)
1691  -> typename std::enable_if<is_coordination<Coordination>::value,
1692  decltype( rxs::from<T>(std::move(cn)))>::type {
1693  return rxs::from<T>(std::move(cn));
1694  }
1697  template<class Value0, class... ValueN>
1698  static auto from(Value0 v0, ValueN... vn)
1699  -> typename std::enable_if<!is_coordination<Value0>::value,
1700  decltype( rxs::from(v0, vn...))>::type {
1701  return rxs::from(v0, vn...);
1702  }
1705  template<class Coordination, class Value0, class... ValueN>
1706  static auto from(Coordination cn, Value0 v0, ValueN... vn)
1707  -> typename std::enable_if<is_coordination<Coordination>::value,
1708  decltype( rxs::from(std::move(cn), v0, vn...))>::type {
1709  return rxs::from(std::move(cn), v0, vn...);
1710  }
1711 
1714  template<class T>
1715  static auto just(T v)
1716  -> decltype(rxs::just(std::move(v))) {
1717  return rxs::just(std::move(v));
1718  }
1721  template<class T, class Coordination>
1722  static auto just(T v, Coordination cn)
1723  -> decltype(rxs::just(std::move(v), std::move(cn))) {
1724  return rxs::just(std::move(v), std::move(cn));
1725  }
1726 
1729  template<class Observable, class Value0, class... ValueN>
1730  static auto start_with(Observable o, Value0 v0, ValueN... vn)
1731  -> decltype(rxs::start_with(std::move(o), std::move(v0), std::move(vn)...)) {
1732  return rxs::start_with(std::move(o), std::move(v0), std::move(vn)...);
1733  }
1734 
1737  template<class T>
1738  static auto empty()
1739  -> decltype(from<T>()) {
1740  return from<T>();
1741  }
1744  template<class T, class Coordination>
1745  static auto empty(Coordination cn)
1746  -> decltype(from<T>(std::move(cn))) {
1747  return from<T>(std::move(cn));
1748  }
1749 
1752  template<class T, class Exception>
1753  static auto error(Exception&& e)
1754  -> decltype(rxs::error<T>(std::forward<Exception>(e))) {
1755  return rxs::error<T>(std::forward<Exception>(e));
1756  }
1759  template<class T, class Exception, class Coordination>
1760  static auto error(Exception&& e, Coordination cn)
1761  -> decltype(rxs::error<T>(std::forward<Exception>(e), std::move(cn))) {
1762  return rxs::error<T>(std::forward<Exception>(e), std::move(cn));
1763  }
1764 
1767  template<class ResourceFactory, class ObservableFactory>
1768  static auto scope(ResourceFactory rf, ObservableFactory of)
1769  -> decltype(rxs::scope(std::move(rf), std::move(of))) {
1770  return rxs::scope(std::move(rf), std::move(of));
1771  }
1772 };
1773 
1774 }
1775 
1776 //
1777 // support range() >> filter() >> subscribe() syntax
1778 // '>>' is spelled 'stream'
1779 //
// Stream-style composition: `source >> of` returns source.op(of), with the
// operator factory perfectly forwarded. Defined after namespace rxcpp is closed,
// hence the qualified rxcpp::observable parameter type.
1780 template<class T, class SourceOperator, class OperatorFactory>
1781 auto operator >> (const rxcpp::observable<T, SourceOperator>& source, OperatorFactory&& of)
// Return type is whatever source.op(...) yields for this factory.
1782  -> decltype(source.op(std::forward<OperatorFactory>(of))) {
1783  return source.op(std::forward<OperatorFactory>(of));
1784 }
1785 
1786 //
1787 // support range() | filter() | subscribe() syntax
1788 // '|' is spelled 'pipe'
1789 //
// Pipe-style composition: `source | of` returns source.op(of), with the
// operator factory perfectly forwarded. Defined after namespace rxcpp is closed,
// hence the qualified rxcpp::observable parameter type.
1790 template<class T, class SourceOperator, class OperatorFactory>
1791 auto operator | (const rxcpp::observable<T, SourceOperator>& source, OperatorFactory&& of)
// Return type is whatever source.op(...) yields for this factory.
1792  -> decltype(source.op(std::forward<OperatorFactory>(of))) {
1793  return source.op(std::forward<OperatorFactory>(of));
1794 }
1795 
1796 #endif
auto distinct_until_changed(AN &&... an) const
For each item from this observable, filter out consecutively repeated values and emit only changes ...
Definition: rx-observable.hpp:876
Definition: rx-operators.hpp:126
#define RXCPP_TRY
Definition: rx-util.hpp:38
auto delay(AN &&... an) const
Return an observable that emits each item emitted by the source observable after the specified delay.
Definition: rx-observable.hpp:854
auto skip_last(AN... an) const
Make a new observable that skips the last count items of this observable.
Definition: rx-observable.hpp:1346
Definition: rx-operators.hpp:366
auto with_latest_from(AN... an) const
For each item from the first observable select the latest value from all the observables to emit from...
Definition: rx-observable.hpp:1074
auto time_interval(AN &&... an) const
Returns an observable that emits indications of the amount of time lapsed between consecutive emissio...
Definition: rx-observable.hpp:755
auto error(E e) -> decltype(detail::make_error< T >(typename std::conditional< std::is_same< rxu::error_ptr, rxu::decay_t< E >>::value, detail::throw_ptr_tag, detail::throw_instance_tag >::type(), std::move(e), identity_immediate()))
Returns an observable that sends no items to observer and immediately generates an error,...
Definition: rx-error.hpp:117
auto concat_transform(AN &&... an) const
For each item from this observable use the CollectionSelector to produce an observable and subscribe ...
Definition: rx-observable.hpp:1063
Definition: rx-operators.hpp:248
void on_subscribe(subscriber< T > o) const
Definition: rx-observable.hpp:81
auto count(AN **...) const
For each item from this observable reduce it by incrementing a count.
Definition: rx-observable.hpp:1242
Definition: rx-operators.hpp:380
auto min(AN **...) const
For each item from this observable reduce it by taking the min value of the previous items.
Definition: rx-observable.hpp:1290
Definition: rx-operators.hpp:143
auto defer(ObservableFactory of) -> observable< rxu::value_type_t< detail::defer_traits< ObservableFactory >>, detail::defer< ObservableFactory >>
Returns an observable that calls the specified observable factory to create an observable for each ne...
Definition: rx-defer.hpp:73
auto merge_transform(AN &&... an) const
For each item from this observable use the CollectionSelector to produce an observable and subscribe ...
Definition: rx-observable.hpp:1030
auto retry(AN... an) const
Retry this observable for the given number of times.
Definition: rx-observable.hpp:1423
Definition: rx-observable.hpp:157
auto skip_until(AN... an) const
Make new observable with items skipped until on_next occurs on the trigger observable or until the sp...
Definition: rx-observable.hpp:1357
Definition: rx-operators.hpp:387
Definition: rx-operators.hpp:458
std::shared_ptr< util::detail::error_base > error_ptr
Definition: rx-util.hpp:874
auto operator|(const rxcpp::observable< T, SourceOperator > &source, OperatorFactory &&of) -> decltype(source.op(std::forward< OperatorFactory >(of)))
Definition: rx-observable.hpp:1791
static auto create(OnSubscribe os) -> decltype(rxs::create< T >(std::move(os)))
Returns an observable that executes the specified function when a subscriber subscribes to it.
Definition: rx-observable.hpp:1553
Definition: rx-all.hpp:26
auto skip(AN... an) const
Make a new observable that skips the first count items of this observable.
Definition: rx-observable.hpp:1324
Definition: rx-predef.hpp:302
static auto range(T first, T last, Coordination cn) -> decltype(rxs::range< T >(first, last, std::move(cn)))
Returns an observable that sends values in the range first-last by adding step to the previous value....
Definition: rx-observable.hpp:1575
auto element_at(AN &&... an) const
Pulls an item located at a specified index location in the sequence of items and emits that item as i...
Definition: rx-observable.hpp:887
observable(const observable< T, SO > &o)
implicit conversion between observables of the same value_type
Definition: rx-observable.hpp:553
Definition: rx-operators.hpp:329
auto any(AN &&... an) const
Returns an Observable that emits true if any item emitted by the source Observable satisfies a specif...
Definition: rx-observable.hpp:667
error_ptr make_error_ptr(error_ptr e)
Definition: rx-util.hpp:883
auto reduce(AN &&... an) const
For each item from this observable use Accumulator to combine items, when completed use ResultSelecto...
Definition: rx-observable.hpp:1196
source_operator_type source_operator
Definition: rx-observable.hpp:487
static auto from(Coordination cn) -> typename std::enable_if< is_coordination< Coordination >::value, decltype(rxs::from< T >(std::move(cn)))>::type
Definition: rx-observable.hpp:1690
RXCPP_NORETURN void rethrow_current_exception()
Definition: rx-util.hpp:933
auto timeout(AN &&... an) const
Return an observable that terminates with timeout_error if a particular timespan has passed without e...
Definition: rx-observable.hpp:766
T max() const
Definition: rx-observable.hpp:400
auto take_while(AN &&... an) const
For the first items fulfilling the predicate from this observable emit them from the new observable t...
Definition: rx-observable.hpp:1401
auto interval(Duration period) -> typename std::enable_if< detail::defer_interval< Duration, identity_one_worker >::value, typename detail::defer_interval< Duration, identity_one_worker >::observable_type >::type
Returns an observable that emits a sequential integer every specified time interval,...
Definition: rx-interval.hpp:113
dynamic_observable()
Definition: rx-observable.hpp:69
Definition: rx-operators.hpp:444
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:459
Definition: rx-operators.hpp:157
auto average(AN **...) const
For each item from this observable reduce it by adding to the previous values and then dividing by th...
Definition: rx-observable.hpp:1266
blocking_observable< T, this_type > as_blocking(AN **...) const
Definition: rx-observable.hpp:580
Definition: rx-operators.hpp:282
auto operator>>(const rxcpp::observable< T, SourceOperator > &source, OperatorFactory &&of) -> decltype(source.op(std::forward< OperatorFactory >(of)))
Definition: rx-observable.hpp:1781
auto observable_member(Tag, AN &&... an) -> decltype(Overload::member(std::forward< AN >(an)...))
Definition: rx-operators.hpp:63
rxu::decay_t< Observable > observable_type
Definition: rx-observable.hpp:215
auto window(AN &&... an) const
Return an observable that emits connected, non-overlapping windows, each containing at most count ite...
Definition: rx-observable.hpp:898
auto publish_synchronized(AN &&... an) const
Turn a cold observable hot and allow connections to the source to be independent of subscriptions.
Definition: rx-observable.hpp:1152
auto subscribe(ArgN &&... an) const -> composite_subscription
Subscribe will cause the source observable to emit values to the provided subscriber.
Definition: rx-observable.hpp:637
Definition: rx-operators.hpp:289
Definition: rx-operators.hpp:296
Definition: rx-operators.hpp:352
auto just(Value0 v0) -> typename std::enable_if<!is_coordination< Value0 >::value, decltype(iterate(*(std::array< Value0, 1 > *) nullptr, identity_immediate()))>::type
Definition: rx-iterate.hpp:267
auto combine_latest(AN... an) const
For each item from all of the observables select a value to emit from the new observable that is retu...
Definition: rx-observable.hpp:1086
static auto iterate(Collection c, Coordination cn) -> decltype(rxs::iterate(std::move(c), std::move(cn)))
Returns an observable that sends each value in the collection, on the specified scheduler.
Definition: rx-observable.hpp:1675
Definition: rx-operators.hpp:421
observable(const source_operator_type &o)
Definition: rx-observable.hpp:542
Definition: rx-operators.hpp:500
Definition: rx-operators.hpp:394
Definition: rx-operators.hpp:472
Definition: rx-operators.hpp:331
Definition: rx-operators.hpp:234
auto AN
Definition: rx-finally.hpp:105
auto max() -> operator_factory< max_tag >
For each item from this observable reduce it by taking the max value of the previous items.
Definition: rx-reduce.hpp:496
auto iterate(Collection c) -> observable< rxu::value_type_t< detail::iterate_traits< Collection >>, detail::iterate< Collection, identity_one_worker >>
Returns an observable that sends each value in the collection, on the specified scheduler.
Definition: rx-iterate.hpp:160
Definition: rx-operators.hpp:255
static auto iterate(Collection c) -> decltype(rxs::iterate(std::move(c), identity_current_thread()))
Returns an observable that sends each value in the collection, on the specified scheduler.
Definition: rx-observable.hpp:1668
Definition: rx-operators.hpp:199
Definition: rx-predef.hpp:156
T value_type
Definition: rx-observable.hpp:530
auto tap(AN &&... an) const
inspect calls to on_next, on_error and on_completed.
Definition: rx-observable.hpp:744
auto subscribe(ArgN &&... an) const -> void
Definition: rx-observable.hpp:238
auto zip(AN &&... an) const
Take one item at a time from each of the given observables and select a value to emit from the new observable that ...
Definition: rx-observable.hpp:1097
Definition: rx-operators.hpp:521
auto scan(AN... an) const
For each item from this observable use Accumulator to combine items into a value that will be emitted...
Definition: rx-observable.hpp:1302
auto subscribe_with_rethrow(ArgN &&... an) const -> void
Definition: rx-observable.hpp:263
#define RXCPP_CATCH(...)
Definition: rx-util.hpp:39
Definition: rx-operators.hpp:345
auto start_with(AN... an) const
Start with the supplied values, then concatenate this observable.
Definition: rx-observable.hpp:1434
rxu::decay_t< SourceOperator > source_operator_type
Definition: rx-observable.hpp:486
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:48
static auto interval(rxsc::scheduler::clock_type::duration period, AN **...) -> decltype(rxs::interval(period))
Returns an observable that emits a sequential integer every specified time interval,...
Definition: rx-observable.hpp:1606
auto exists(AN &&... an) const
Returns an Observable that emits true if any item emitted by the source Observable satisfies a specif...
Definition: rx-observable.hpp:678
auto first(AN **...) const
For each item from this observable reduce it by sending only the first item.
Definition: rx-observable.hpp:1218
RXCPP_NORETURN void throw_exception(E &&e)
Definition: rx-util.hpp:920
auto max(AN **...) const
For each item from this observable reduce it by taking the max value of the previous items.
Definition: rx-observable.hpp:1278
a source of values whose methods block until all values have been emitted. subscribe or use one of th...
Definition: rx-observable.hpp:169
auto switch_if_empty(AN &&... an) const
If the source Observable terminates without emitting any items, emits items from a backup Observable.
Definition: rx-observable.hpp:711
auto filter(AN &&... an) const
For each item from this observable use Predicate to select which items to emit from the new observabl...
Definition: rx-observable.hpp:700
~blocking_observable()
Definition: rx-observable.hpp:217
static auto just(T v) -> decltype(rxs::just(std::move(v)))
Definition: rx-observable.hpp:1715
static auto from(Value0 v0, ValueN... vn) -> typename std::enable_if<!is_coordination< Value0 >::value, decltype(rxs::from(v0, vn...))>::type
Definition: rx-observable.hpp:1698
Definition: rx-operators.hpp:373
static auto defer(ObservableFactory of) -> decltype(rxs::defer(std::move(of)))
Returns an observable that calls the specified observable factory to create an observable for each ne...
Definition: rx-observable.hpp:1598
auto contains(AN &&... an) const
Returns an Observable that emits true if the source Observable emitted a specified item,...
Definition: rx-observable.hpp:689
Definition: rx-operators.hpp:316
static auto start_with(Observable o, Value0 v0, ValueN... vn) -> decltype(rxs::start_with(std::move(o), std::move(v0), std::move(vn)...))
Definition: rx-observable.hpp:1730
Definition: rx-sources.hpp:15
Definition: rx-operators.hpp:451
Definition: rx-operators.hpp:227
linq_driver< iter_cursor< typename util::container_traits< TContainer >::iterator > > from(TContainer &c)
Definition: linq.hpp:556
Definition: rx-util.hpp:416
auto debounce(AN &&... an) const
Return an observable that emits an item if a particular timespan has passed without emitting another ...
Definition: rx-observable.hpp:843
auto multicast(AN &&... an) const
Definition: rx-observable.hpp:1130
Definition: rx-observable.hpp:36
auto group_by(AN &&... an) const
Return an observable that emits grouped_observables, each of which corresponds to a unique key value ...
Definition: rx-observable.hpp:1108
static auto timer(rxsc::scheduler::clock_type::time_point at, AN **...) -> decltype(rxs::timer(at))
Returns an observable that emits an integer at the specified time point.
Definition: rx-observable.hpp:1637
auto take_last(AN &&... an) const
Emit only the final t items emitted by the source Observable.
Definition: rx-observable.hpp:1379
observable< T > make_observable_dynamic(Source &&s)
Definition: rx-observable.hpp:102
auto sum(AN **...) const
For each item from this observable reduce it by adding to the previous items.
Definition: rx-observable.hpp:1254
static auto timer(rxsc::scheduler::clock_type::time_point when, Coordination cn) -> decltype(rxs::timer(when, std::move(cn)))
Returns an observable that emits an integer at the specified time point.
Definition: rx-observable.hpp:1653
~observable()
Definition: rx-observable.hpp:534
auto map(AN &&... an) const
For each item from this observable use Selector to produce an item to emit from the new observable th...
Definition: rx-observable.hpp:821
auto sample_with_time(AN &&... an) const
Return an Observable that emits the most recent items emitted by the source Observable within periodi...
Definition: rx-observable.hpp:1313
Definition: rx-operators.hpp:241
Definition: rx-operators.hpp:117
Definition: rx-operators.hpp:213
a source of values. subscribe or use one of the operator methods that return a new observable,...
Definition: rx-observable.hpp:478
Definition: rx-operators.hpp:430
auto pairwise(AN... an) const
Take values pairwise from this observable.
Definition: rx-observable.hpp:1445
auto replay(AN &&... an) const
1) replay(optional Coordination, optional CompositeSubscription) Turn a cold observable hot,...
Definition: rx-observable.hpp:1163
Definition: rx-sources.hpp:23
Definition: rx-operators.hpp:38
Definition: rx-operators.hpp:268
auto sequence_equal(AN... an) const
Determine whether two Observables emit the same sequence of items.
Definition: rx-observable.hpp:733
auto last(AN **...) const
For each item from this observable reduce it by sending only the last item.
Definition: rx-observable.hpp:1230
auto start_with(AN &&... an) -> operator_factory< start_with_tag, AN... >
Start with the supplied values, then concatenate this observable.
Definition: rx-start_with.hpp:53
auto trace_activity() -> decltype(rxcpp_trace_activity(trace_tag()))&
Definition: rx-predef.hpp:15
bool operator!=(const dynamic_grouped_observable< K, T > &lhs, const dynamic_grouped_observable< K, T > &rhs)
Definition: rx-grouped_observable.hpp:103
Definition: rx-operators.hpp:129
Definition: rx-operators.hpp:437
auto merge_delay_error(AN... an) const
Definition: rx-observable.hpp:997
Definition: rx-operators.hpp:359
std::enable_if< is_subscriber< Subscriber >::value, void >::type on_subscribe(Subscriber o) const
Definition: rx-observable.hpp:87
static auto from() -> decltype(rxs::from< T >())
Definition: rx-observable.hpp:1683
dynamic_observable(SOF &&sof, typename std::enable_if<!is_dynamic_observable< SOF >::value, void ** >::type=0)
Definition: rx-observable.hpp:74
Definition: rx-operators.hpp:479
Definition: rx-operators.hpp:136
auto repeat(AN... an) const
Repeat this observable for the given number of times or infinitely.
Definition: rx-observable.hpp:1412
auto take(AN... an) const
For the first count items from this observable emit them from the new observable that is returned.
Definition: rx-observable.hpp:1368
Definition: rx-operators.hpp:206
Definition: rx-operators.hpp:192
auto subscribe_on(AN &&... an) const
Subscription and unsubscription are queued and delivered using the scheduler from the supplied coordi...
Definition: rx-observable.hpp:1174
tag_dynamic_observable dynamic_observable_tag
Definition: rx-observable.hpp:67
Definition: rx-operators.hpp:103
auto distinct(AN &&... an) const
For each item from this observable, filter out repeated values and emit only items that have not alre...
Definition: rx-observable.hpp:865
auto skip_while(AN... an) const
Make new observable with skipped first count items from this observable.
Definition: rx-observable.hpp:1335
static auto error(Exception &&e, Coordination cn) -> decltype(rxs::error< T >(std::forward< Exception >(e), std::move(cn)))
Returns an observable that sends no items to observer and immediately generates an error,...
Definition: rx-observable.hpp:1760
static auto range(T first=0, T last=std::numeric_limits< T >::max(), std::ptrdiff_t step=1) -> decltype(rxs::range< T >(first, last, step, identity_current_thread()))
Returns an observable that sends values in the range first-last by adding step to the previous value....
Definition: rx-observable.hpp:1561
static auto empty() -> decltype(from< T >())
Returns an observable that sends no items to observer and immediately completes, on the specified sch...
Definition: rx-observable.hpp:1738
Definition: rx-operators.hpp:486
Definition: rx-operators.hpp:110
Definition: rx-operators.hpp:57
auto merge(AN... an) const
For each given observable subscribe. For each item emitted from all of the given observables,...
Definition: rx-observable.hpp:986
error_ptr current_exception()
Definition: rx-util.hpp:943
bool operator==(const dynamic_grouped_observable< K, T > &lhs, const dynamic_grouped_observable< K, T > &rhs)
Definition: rx-grouped_observable.hpp:99
Definition: rx-operators.hpp:220
Definition: rx-predef.hpp:270
auto buffer(AN &&... an) const
Return an observable that emits connected, non-overlapping buffer, each containing at most count item...
Definition: rx-observable.hpp:942
Definition: rx-operators.hpp:185
auto lift(Operator &&op) -> detail::lift_factory< ResultType, Operator >
Definition: rx-lift.hpp:101
static auto range(T first, Coordination cn) -> decltype(rxs::range< T >(first, std::move(cn)))
Returns an observable that sends values in the range first-last by adding step to the previous value....
Definition: rx-observable.hpp:1582
static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, AN **...) -> decltype(rxs::interval(initial, period))
Returns an observable that emits a sequential integer every specified time interval,...
Definition: rx-observable.hpp:1621
static auto error(Exception &&e) -> decltype(rxs::error< T >(std::forward< Exception >(e)))
Returns an observable that sends no items to observer and immediately generates an error,...
Definition: rx-observable.hpp:1753
static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, Coordination cn) -> decltype(rxs::interval(initial, period, std::move(cn)))
Returns an observable that emits a sequential integer every specified time interval,...
Definition: rx-observable.hpp:1629
observable_type source
Definition: rx-observable.hpp:216
auto buffer_with_time_or_count(AN &&... an) const
Return an observable that emits connected, non-overlapping buffers of items from the source observabl...
Definition: rx-observable.hpp:964
Definition: rx-operators.hpp:465
Definition: rx-operators.hpp:261
Definition: rx-operators.hpp:275
const scheduler & make_current_thread()
Definition: rx-currentthread.hpp:263
auto switch_on_error(AN &&... an) const
If an error occurs, take the result from the Selector and subscribe to that instead.
Definition: rx-observable.hpp:810
Definition: rx-operators.hpp:493
auto switch_on_next(AN &&... an) const
Return observable that emits the items emitted by the observable most recently emitted by the source ...
Definition: rx-observable.hpp:975
consumes values from an observable using State that may implement on_next, on_error and on_completed ...
Definition: rx-observer.hpp:179
auto flat_map(AN &&... an) const
For each item from this observable use the CollectionSelector to produce an observable and subscribe ...
Definition: rx-observable.hpp:1019
observable< T > as_dynamic(AN **...) const
Definition: rx-observable.hpp:572
auto window_with_time(AN &&... an) const
Return an observable that emits observables every period time interval and collects items from this o...
Definition: rx-observable.hpp:909
auto all(AN &&... an) const
Returns an Observable that emits true if every item emitted by the source Observable satisfies a spec...
Definition: rx-observable.hpp:645
auto amb(AN... an) const
For each item from only the first of the given observables deliver from the new observable that is re...
Definition: rx-observable.hpp:1008
Definition: rx-operators.hpp:150
auto window_with_time_or_count(AN &&... an) const
Return an observable that emits connected, non-overlapping windows of items from the source observabl...
Definition: rx-observable.hpp:920
RXCPP_NORETURN void rethrow_exception(error_ptr e)
Definition: rx-util.hpp:902
static auto timer(rxsc::scheduler::clock_type::duration when, Coordination cn) -> decltype(rxs::timer(when, std::move(cn)))
Returns an observable that emits an integer at the specified time point.
Definition: rx-observable.hpp:1660
auto take_until(AN &&... an) const
For each item from this observable until on_next occurs on the trigger observable or until the specif...
Definition: rx-observable.hpp:1390
auto transform(AN &&... an) const
For each item from this observable use Selector to produce an item to emit from the new observable th...
Definition: rx-observable.hpp:832
Definition: rx-operators.hpp:338
int count() const
Definition: rx-observable.hpp:334
auto scope(ResourceFactory rf, ObservableFactory of) -> observable< rxu::value_type_t< detail::scope_traits< ResourceFactory, ObservableFactory >>, detail::scope< ResourceFactory, ObservableFactory >>
Returns an observable that makes an observable by the specified observable factory using the resource...
Definition: rx-scope.hpp:114
void unsubscribe() const
Definition: rx-subscription.hpp:178
auto ignore_elements(AN &&... an) const
Do not emit any items from the source Observable, but allow termination notification (either onError ...
Definition: rx-observable.hpp:1119
static auto from(Coordination cn, Value0 v0, ValueN... vn) -> typename std::enable_if< is_coordination< Coordination >::value, decltype(rxs::from(std::move(cn), v0, vn...))>::type
Definition: rx-observable.hpp:1706
blocking_observable(observable_type s)
Definition: rx-observable.hpp:220
T sum() const
Definition: rx-observable.hpp:358
Definition: rx-operators.hpp:164
auto timestamp(AN &&... an) const
Returns an observable that attaches a timestamp to each item emitted by the source observable indicat...
Definition: rx-observable.hpp:777
Definition: rx-operators.hpp:323
Definition: rx-operators.hpp:507
auto is_empty(AN &&... an) const
Returns an Observable that emits true if the source Observable is empty, otherwise false.
Definition: rx-observable.hpp:656
T min() const
Definition: rx-observable.hpp:421
auto buffer_with_time(AN &&... an) const
Return an observable that emits buffers every period time interval and collects items from this obser...
Definition: rx-observable.hpp:953
auto subscribe(ArgN &&... an) -> detail::subscribe_factory< decltype(make_subscriber< T >(std::forward< ArgN >(an)...))>
Subscribe will cause the source observable to emit values to the provided subscriber.
Definition: rx-subscribe.hpp:87
Definition: rx-predef.hpp:128
static auto scope(ResourceFactory rf, ObservableFactory of) -> decltype(rxs::scope(std::move(rf), std::move(of)))
Returns an observable that makes an observable by the specified observable factory using the resource provided by the specified resource factory for each new observer that subscribes.
Definition: rx-observable.hpp:1768
Definition: rx-operators.hpp:423
auto default_if_empty(AN &&... an) const
If the source Observable terminates without emitting any items, emits a default item and completes.
Definition: rx-observable.hpp:722
binds an observer that consumes values with a composite_subscription that controls lifetime.
Definition: rx-subscriber.hpp:25
Definition: rx-operators.hpp:119
static auto interval(rxsc::scheduler::clock_type::duration period, Coordination cn) -> decltype(rxs::interval(period, std::move(cn)))
Returns an observable that emits a sequential integer every specified time interval, on the specified scheduler.
Definition: rx-observable.hpp:1614
static auto range(T first, T last, std::ptrdiff_t step, Coordination cn) -> decltype(rxs::range< T >(first, last, step, std::move(cn)))
Returns an observable that sends values in the range first-last by adding step to the previous value....
Definition: rx-observable.hpp:1568
double average() const
Definition: rx-observable.hpp:379
auto first(AN **...) -> delayed_type_t< T, AN... > const
Definition: rx-observable.hpp:284
Definition: rx-operators.hpp:178
static auto just(T v, Coordination cn) -> decltype(rxs::just(std::move(v), std::move(cn)))
Definition: rx-observable.hpp:1722
rxu::value_type_t< delayed_type< T, AN... > > delayed_type_t
Definition: rx-operators.hpp:60
auto observe_on(AN &&... an) const
All values are queued and delivered using the scheduler from the supplied coordination.
Definition: rx-observable.hpp:1185
auto accumulate(AN &&... an) const
For each item from this observable use Accumulator to combine items, when completed use ResultSelector to produce a value that will be emitted from the new observable that is returned.
Definition: rx-observable.hpp:1207
observable(observable< T, SO > &&o)
implicit conversion between observables of the same value_type
Definition: rx-observable.hpp:558
Definition: rx-predef.hpp:115
friend bool operator==(const observable< U, SO > &, const observable< U, SO > &)
Definition: rx-sources.hpp:17
auto window_toggle(AN &&... an) const
Return an observable that emits observables every period time interval and collects items from this observable for period of time into each produced observable.
Definition: rx-observable.hpp:931
auto on_error_resume_next(AN &&... an) const
If an error occurs, take the result from the Selector and subscribe to that instead.
Definition: rx-observable.hpp:799
observable()
Definition: rx-observable.hpp:538
auto last(AN **...) -> delayed_type_t< T, AN... > const
Definition: rx-observable.hpp:312
static auto never() -> decltype(rxs::never< T >())
Returns an observable that never sends any items or notifications to observer.
Definition: rx-observable.hpp:1590
auto concat(AN... an) const
For each item from this observable subscribe to one at a time, in the order received....
Definition: rx-observable.hpp:1041
Definition: rx-predef.hpp:126
auto concat_map(AN &&... an) const
For each item from this observable use the CollectionSelector to produce an observable and subscribe ...
Definition: rx-observable.hpp:1052
Definition: rx-operators.hpp:127
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
friend bool operator==(const dynamic_observable< U > &, const dynamic_observable< U > &)
auto publish(AN &&... an) const
Turn a cold observable hot and allow connections to the source to be independent of subscriptions....
Definition: rx-observable.hpp:1141
static auto empty(Coordination cn) -> decltype(from< T >(std::move(cn)))
Returns an observable that sends no items to observer and immediately completes, on the specified scheduler.
Definition: rx-observable.hpp:1745
Definition: rx-operators.hpp:415
Definition: rx-operators.hpp:408
auto timer(TimePointOrDuration when) -> typename std::enable_if< detail::defer_timer< TimePointOrDuration, identity_one_worker >::value, typename detail::defer_timer< TimePointOrDuration, identity_one_worker >::observable_type >::type
Returns an observable that emits an integer at the specified time point.
Definition: rx-timer.hpp:114
observable(source_operator_type &&o)
Definition: rx-observable.hpp:546
Definition: rx-operators.hpp:401
static auto timer(rxsc::scheduler::clock_type::duration after, AN **...) -> decltype(rxs::timer(after))
Returns an observable that emits an integer at the specified time point.
Definition: rx-observable.hpp:1645
Definition: rx-operators.hpp:514