RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-merge_delay_error.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
44 #if !defined(RXCPP_OPERATORS_RX_MERGE_DELAY_ERROR_HPP)
45 #define RXCPP_OPERATORS_RX_MERGE_DELAY_ERROR_HPP
46 
47 #include "rx-merge.hpp"
48 
49 #include "../rx-composite_exception.hpp"
50 
51 namespace rxcpp {
52 
53 namespace operators {
54 
55 namespace detail {
56 
// Operator implementation for merge_delay_error.
//
// Subscribes to a source observable whose items are themselves observables,
// subscribes to every such inner observable, and forwards each inner item to
// the output subscriber. Errors are not forwarded immediately: every on_error
// (from the source or from an inner observable) is added to a
// composite_exception, and the output only receives on_error / on_completed
// once the count of still-pending subscriptions drops to zero.
//
// Template parameters (as used by the typedefs below):
//   T            - the item type emitted by the source; its decayed form must
//                  expose a nested value_type (the type of the merged items).
//   Observable   - the source observable type; its decayed form must expose
//                  source_operator_type and a source_operator member.
//   Coordination - coordination type; its decayed form must expose
//                  coordinator_type and create_coordinator().
57 template<class T, class Observable, class Coordination>
58 struct merge_delay_error
59  : public operator_base<rxu::value_type_t<rxu::decay_t<T>>>
60 {
61  //static_assert(is_observable<Observable>::value, "merge requires an observable");
62  //static_assert(is_observable<T>::value, "merge requires an observable that contains observables");
63 
64  typedef merge_delay_error<T, Observable, Coordination> this_type;
65 
 // Decayed forms of the template arguments: the inner-observable type and
 // the source-observable type.
66  typedef rxu::decay_t<T> source_value_type;
67  typedef rxu::decay_t<Observable> source_type;
68 
 // source_operator_type is the operator stored inside the source observable;
 // value_type is the item type emitted by each inner observable.
69  typedef typename source_type::source_operator_type source_operator_type;
70  typedef typename source_value_type::value_type value_type;
71 
72  typedef rxu::decay_t<Coordination> coordination_type;
73  typedef typename coordination_type::coordinator_type coordinator_type;
74 
 // Inputs captured when the operator is constructed: the source's operator
 // and the coordination. A copy is taken for every subscription.
75  struct values
76  {
77  values(source_operator_type o, coordination_type sf)
78  : source_operator(std::move(o))
79  , coordination(std::move(sf))
80  {
81  }
82  source_operator_type source_operator;
83  coordination_type coordination;
84  };
85  values initial;
86 
 // Stores the source's operator (o.source_operator) and the coordination.
87  merge_delay_error(const source_type& o, coordination_type sf)
88  : initial(o.source_operator, std::move(sf))
89  {
90  }
91 
 // Connects `scbr` (the output subscriber) to the source.
 // Builds a shared per-subscription state, subscribes to the source, and
 // subscribes to each inner observable the source emits. Returns early,
 // without subscribing, when on_exception yields an empty result for either
 // coordinator.in(source) or coordinator.out(sink).
92  template<class Subscriber>
93  void on_subscribe(Subscriber scbr) const {
94  static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
95 
96  typedef Subscriber output_type;
97 
 // State shared (through std::shared_ptr) by every lambda created below.
98  struct merge_state_type
99  : public std::enable_shared_from_this<merge_state_type>
100  , public values
101  {
102  merge_state_type(values i, coordinator_type coor, output_type oarg)
103  : values(i)
104  , source(i.source_operator)
105  , pendingCompletions(0)
106  , coordinator(std::move(coor))
107  , out(std::move(oarg))
108  {
109  }
 // The source observable, rebuilt from the stored source operator.
110  observable<source_value_type, source_operator_type> source;
111  // on_completed on the output must wait until all the
112  // subscriptions have received on_completed
 // Incremented once for the source and once per inner observable;
 // decremented by every on_error / on_completed handler below.
113  int pendingCompletions;
 // Accumulates every error received until the final notification.
114  composite_exception exception;;
115  coordinator_type coordinator;
116  output_type out;
117  };
118 
119  auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
120 
121  // take a copy of the values for each subscription
122  auto state = std::make_shared<merge_state_type>(initial, std::move(coordinator), std::move(scbr));
123 
124  composite_subscription outercs;
125 
126  // when the out observer is unsubscribed all the
127  // inner subscriptions are unsubscribed as well
128  state->out.add(outercs);
129 
 // NOTE(review): on_exception appears to report an exception thrown by the
 // callable to state->out and return an empty result - confirm against
 // rx-observer.hpp. The empty() check below relies on that.
130  auto source = on_exception(
131  [&](){return state->coordinator.in(state->source);},
132  state->out);
133  if (source.empty()) {
134  return;
135  }
136 
 // Count the source itself as one pending subscription.
137  ++state->pendingCompletions;
138  // this subscribe does not share the observer subscription
139  // so that when it is unsubscribed the observer can be called
140  // until the inner subscriptions have finished
141  auto sink = make_subscriber<source_value_type>(
142  state->out,
143  outercs,
144  // on_next
 // Called for each inner observable `st` emitted by the source.
145  [state](source_value_type st) {
146 
147  composite_subscription innercs;
148 
149  // when the out observer is unsubscribed all the
150  // inner subscriptions are unsubscribed as well
151  auto innercstoken = state->out.add(innercs);
152 
 // When innercs itself is unsubscribed, drop its token from the output
 // subscription again.
153  innercs.add(make_subscription([state, innercstoken](){
154  state->out.remove(innercstoken);
155  }));
156 
157  auto selectedSource = state->coordinator.in(st);
158 
 // Count this inner observable as pending before subscribing to it.
159  ++state->pendingCompletions;
160  // this subscribe does not share the source subscription
161  // so that when it is unsubscribed the source will continue
162  auto sinkInner = make_subscriber<value_type>(
163  state->out,
164  innercs,
165  // on_next
 // Forward each inner item straight to the output.
 // NOTE(review): `st` is captured but not used in the body; presumably
 // it keeps the inner observable alive - confirm.
166  [state, st](value_type ct) {
167  state->out.on_next(std::move(ct));
168  },
169  // on_error
 // Record the error. If this was the last pending subscription, deliver
 // the accumulated composite_exception (including this error) to the
 // output; otherwise keep waiting for the remaining subscriptions.
170  [state](rxu::error_ptr e) {
171  if(--state->pendingCompletions == 0) {
172  state->out.on_error(
173  rxu::make_error_ptr(std::move(state->exception.add(e))));
174  } else {
175  state->exception.add(e);
176  }
177  },
178  //on_completed
 // If this was the last pending subscription, finish the output: with
 // on_error when any error was recorded, with on_completed otherwise.
179  [state](){
180  if (--state->pendingCompletions == 0) {
181  if(!state->exception.empty()) {
182  state->out.on_error(
183  rxu::make_error_ptr(std::move(state->exception)));
184  } else {
185  state->out.on_completed();
186  }
187  }
188  }
189  );
190 
191  auto selectedSinkInner = state->coordinator.out(sinkInner);
192  selectedSource.subscribe(std::move(selectedSinkInner));
193  },
194  // on_error
 // Same handling as the inner on_error: an error from the source is
 // recorded and only delivered once nothing is pending.
195  [state](rxu::error_ptr e) {
196  if(--state->pendingCompletions == 0) {
197  state->out.on_error(
198  rxu::make_error_ptr(std::move(state->exception.add(e))));
199  } else {
200  state->exception.add(e);
201  }
202  },
203  // on_completed
 // Same handling as the inner on_completed: the source finishing only
 // terminates the output when no inner subscription is still pending.
204  [state]() {
205  if (--state->pendingCompletions == 0) {
206  if(!state->exception.empty()) {
207  state->out.on_error(
208  rxu::make_error_ptr(std::move(state->exception)));
209  } else {
210  state->out.on_completed();
211  }
212  }
213  }
214  );
 // Pass the sink through coordinator.out; skip the subscribe when
 // on_exception yields an empty result.
215  auto selectedSink = on_exception(
216  [&](){return state->coordinator.out(sink);},
217  state->out);
218  if (selectedSink.empty()) {
219  return;
220  }
221  source->subscribe(std::move(selectedSink.get()));
222  }
223 };
224 
225 }
226 
229 template<class... AN>
230 auto merge_delay_error(AN&&... an)
232  return operator_factory<merge_delay_error_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
233 }
234 
235 }
236 
237 template<>
239 {
240  template<class Observable,
241  class Enabled = rxu::enable_if_all_true_type_t<
243  class SourceValue = rxu::value_type_t<Observable>,
244  class Merge = rxo::detail::merge_delay_error<SourceValue, rxu::decay_t<Observable>, identity_one_worker>,
245  class Value = rxu::value_type_t<SourceValue>,
246  class Result = observable<Value, Merge>
247  >
248  static Result member(Observable&& o) {
249  return Result(Merge(std::forward<Observable>(o), identity_current_thread()));
250  }
251 
252  template<class Observable, class Coordination,
253  class Enabled = rxu::enable_if_all_true_type_t<
256  class SourceValue = rxu::value_type_t<Observable>,
257  class Merge = rxo::detail::merge_delay_error<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Coordination>>,
258  class Value = rxu::value_type_t<SourceValue>,
259  class Result = observable<Value, Merge>
260  >
261  static Result member(Observable&& o, Coordination&& cn) {
262  return Result(Merge(std::forward<Observable>(o), std::forward<Coordination>(cn)));
263  }
264 
265  template<class Observable, class Value0, class... ValueN,
266  class Enabled = rxu::enable_if_all_true_type_t<
267  all_observables<Observable, Value0, ValueN...>>,
268  class EmittedValue = rxu::value_type_t<Observable>,
269  class SourceValue = observable<EmittedValue>,
270  class ObservableObservable = observable<SourceValue>,
272  class Value = rxu::value_type_t<Merge>,
273  class Result = observable<Value, Merge>
274  >
275  static Result member(Observable&& o, Value0&& v0, ValueN&&... vn) {
276  return Result(Merge(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), identity_current_thread()));
277  }
278 
279  template<class Observable, class Coordination, class Value0, class... ValueN,
280  class Enabled = rxu::enable_if_all_true_type_t<
281  all_observables<Observable, Value0, ValueN...>,
283  class EmittedValue = rxu::value_type_t<Observable>,
284  class SourceValue = observable<EmittedValue>,
285  class ObservableObservable = observable<SourceValue>,
287  class Value = rxu::value_type_t<Merge>,
288  class Result = observable<Value, Merge>
289  >
290  static Result member(Observable&& o, Coordination&& cn, Value0&& v0, ValueN&&... vn) {
291  return Result(Merge(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::forward<Coordination>(cn)));
292  }
293 
294  template<class... AN>
295  static operators::detail::merge_invalid_t<AN...> member(AN...) {
296  std::terminate();
297  return {};
298  static_assert(sizeof...(AN) == 10000, "merge_delay_error takes (optional Coordination, optional Value0, optional ValueN...)");
299  }
300 };
301 
302 }
303 
304 #endif
Definition: rx-util.hpp:112
auto merge_delay_error(AN &&... an) -> operator_factory< merge_delay_error_tag, AN... >
Definition: rx-merge_delay_error.hpp:230
static operators::detail::merge_invalid_t< AN... > member(AN...)
Definition: rx-merge_delay_error.hpp:295
static Result member(Observable &&o, Coordination &&cn, Value0 &&v0, ValueN &&... vn)
Definition: rx-merge_delay_error.hpp:290
std::shared_ptr< util::detail::error_base > error_ptr
Definition: rx-util.hpp:874
Definition: rx-all.hpp:26
static Result member(Observable &&o, Coordination &&cn)
Definition: rx-merge_delay_error.hpp:261
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:47
error_ptr make_error_ptr(error_ptr e)
Definition: rx-util.hpp:883
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:48
Definition: rx-operators.hpp:47
static Result member(Observable &&o, Value0 &&v0, ValueN &&... vn)
Definition: rx-merge_delay_error.hpp:275
static const bool value
Definition: rx-predef.hpp:123
static Result member(Observable &&o)
Definition: rx-merge_delay_error.hpp:248
linq_driver< iter_cursor< typename util::container_traits< TContainer >::iterator > > from(TContainer &c)
Definition: linq.hpp:556
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:126
a source of values. subscribe or use one of the operator methods that return a new observable,...
Definition: rx-observable.hpp:478
Definition: rx-util.hpp:337
For each given observable subscribe. For each item emitted from all of the given observables,...
Definition: rx-operators.hpp:261
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:640
Definition: rx-coordination.hpp:114
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
Definition: rx-predef.hpp:177
Definition: rx-coordination.hpp:37