// Operator implementation for merge_delay_error; this_type below names it
// merge_delay_error<T, Observable, Coordination>.
// NOTE(review): this text is a numbered listing extract. Each leading integer is a
// listing line number, and several listing lines (the struct keyword line, braces,
// some statements) are absent. The notes below describe only what is visible.
44 #if !defined(RXCPP_OPERATORS_RX_MERGE_DELAY_ERROR_HPP) 45 #define RXCPP_OPERATORS_RX_MERGE_DELAY_ERROR_HPP 49 #include "../rx-composite_exception.hpp" 57 template<
// T            - type of the items the source emits; it must expose ::value_type.
// Observable   - type of the source; it must expose ::source_operator_type.
// Coordination - must expose ::coordinator_type and create_coordinator().
class T,
class Observable,
class Coordination>
59 :
// The base is parameterised on the value type nested inside T, i.e. the element
// type of the inner sources rather than T itself.
public operator_base<rxu::value_type_t<rxu::decay_t<T>>>
64 typedef merge_delay_error<T, Observable, Coordination> this_type;
// Decayed item type of the source; each item is later subscribed to as a source.
66 typedef rxu::decay_t<T> source_value_type;
67 typedef rxu::decay_t<Observable> source_type;
69 typedef typename source_type::source_operator_type source_operator_type;
// Element type delivered to the downstream subscriber via on_next.
70 typedef typename source_value_type::value_type value_type;
72 typedef rxu::decay_t<Coordination> coordination_type;
73 typedef typename coordination_type::coordinator_type coordinator_type;
// values: bundle of the two constructor inputs, the source operator and the
// coordination. Both are taken by value and moved into the members.
77 values(source_operator_type o, coordination_type sf)
78 : source_operator(std::move(o))
79 , coordination(std::move(sf))
82 source_operator_type source_operator;
83 coordination_type coordination;
// Constructor initialiser fragment: the member named initial is built from the
// source operator of o and the coordination sf. The constructor signature is not
// in this extract.
88 : initial(o.source_operator, std::move(sf))
// on_subscribe: wires the downstream subscriber scbr to the source and to every
// inner source the source emits. Values are forwarded as they arrive; errors are
// collected in a composite_exception instead of being forwarded at once.
// Subscriber - type of the downstream subscriber; taken by value and moved into
//              the shared state.
92 template<
class Subscriber>
93 void on_subscribe(Subscriber scbr)
const {
96 typedef Subscriber output_type;
// Per-subscription state. It is held through a shared_ptr that the handler lambdas
// below capture by value.
98 struct merge_state_type
99 :
public std::enable_shared_from_this<merge_state_type>
102 merge_state_type(values i, coordinator_type coor, output_type oarg)
104 , source(i.source_operator)
105 , pendingCompletions(0)
106 , coordinator(std::move(coor))
107 , out(std::move(oarg))
110 observable<source_value_type, source_operator_type> source;
// Count of sources that have not finished yet. It is incremented once for the
// outer source and once per inner source, and decremented in the handlers below.
// NOTE(review): this is a plain int touched from outer and inner callbacks.
// Presumably the coordinator serialises those callbacks; confirm.
113 int pendingCompletions;
// Accumulates the errors reported by the outer and inner sources.
// NOTE(review): the second semicolon on the next line is redundant.
114 composite_exception exception;;
115 coordinator_type coordinator;
// The coordinator is created against the lifetime of the downstream subscriber.
119 auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
122 auto state = std::make_shared<merge_state_type>(initial, std::move(coordinator), std::move(scbr));
// Subscription for the outer source, registered with the downstream subscriber.
124 composite_subscription outercs;
128 state->out.add(outercs);
// The coordinated source is computed inside a lambda. The call that receives this
// lambda is not in the extract; presumably it is on_exception, given the
// empty() check on the result. Confirm against the full file.
131 [&](){
return state->coordinator.in(state->source);},
133 if (source.empty()) {
// Account for the outer source before subscribing to it.
137 ++state->pendingCompletions;
// Subscriber for the outer source; its items are the inner sources.
141 auto sink = make_subscriber<source_value_type>(
// Outer on_next: st is one inner source.
145 [state](source_value_type st) {
// Each inner source gets its own subscription, registered with the downstream
// subscriber. The token returned by add is kept so that the registration can be
// removed again when innercs is unsubscribed.
147 composite_subscription innercs;
151 auto innercstoken = state->out.add(innercs);
153 innercs.add(make_subscription([state, innercstoken](){
154 state->out.remove(innercstoken);
157 auto selectedSource = state->coordinator.in(st);
// Account for this inner source before subscribing to it.
159 ++state->pendingCompletions;
162 auto sinkInner = make_subscriber<value_type>(
// Inner on_next: forward the value downstream unchanged.
166 [state, st](value_type ct) {
167 state->out.on_next(std::move(ct));
// Inner error path; the handler header is not in the extract. When this was the
// last pending source, e is added to the composite exception and the result is
// wrapped with make_error_ptr. Presumably that pointer is passed to
// state->out.on_error; confirm against listing line 172. Otherwise e is only
// recorded.
171 if(--state->pendingCompletions == 0) {
173 rxu::make_error_ptr(std::move(state->exception.add(e))));
175 state->exception.add(e);
// Inner completion path; the handler header is not in the extract. When this was
// the last pending source the outcome depends on whether any error was recorded.
// The body of the non-empty branch is missing here; presumably it reports the
// collected errors and on_completed is the other branch. Confirm.
180 if (--state->pendingCompletions == 0) {
181 if(!state->exception.empty()) {
185 state->out.on_completed();
// Route the inner subscriber through the coordinator, then subscribe it.
191 auto selectedSinkInner = state->coordinator.out(sinkInner);
192 selectedSource.subscribe(std::move(selectedSinkInner));
// Outer error path; it has the same visible shape as the inner error path above.
196 if(--state->pendingCompletions == 0) {
200 state->exception.add(e);
// Outer completion path; it has the same visible shape as the inner one above.
205 if (--state->pendingCompletions == 0) {
206 if(!state->exception.empty()) {
210 state->out.on_completed();
// The coordinated outer subscriber is computed inside a lambda, like the source
// above. The call that receives this lambda is not in the extract.
216 [&](){
return state->coordinator.out(sink);},
218 if (selectedSink.empty()) {
// Start the whole pipeline by subscribing the outer subscriber to the source.
221 source->subscribe(std::move(selectedSink.get()));
// Template header at listing line 229. The declaration it introduces (listing
// line 230) is not part of this extract. NOTE(review): presumably this is the
// variadic merge_delay_error factory taking AN&&...; confirm against the full file.
229 template<
class...
AN>
// Overload set named member that builds the merge_delay_error operator from the
// arguments given at the call site.
// NOTE(review): this is a numbered listing extract. The enclosing struct line,
// parts of each template header (including the definitions of SourceValue and
// Result) and some bodies are absent.

// Template header fragment whose member declaration is not in the extract.
// Merge here defaults the coordination to identity_one_worker.
240 template<
class Observable,
244 class Merge = rxo::detail::merge_delay_error<SourceValue, rxu::decay_t<Observable>,
identity_one_worker>,
// member(o, cn): source plus an explicit coordination. Both arguments are
// perfectly forwarded into Merge, and the operator is wrapped in Result.
252 template<
class Observable,
class Coordination,
261 static Result
member(Observable&& o, Coordination&& cn) {
262 return Result(Merge(std::forward<Observable>(o), std::forward<Coordination>(cn)));
// member(o, v0, vn...): source plus one or more further values. The body is not
// in the extract.
265 template<
class Observable,
class Value0,
class... ValueN,
275 static Result
member(Observable&& o, Value0&& v0, ValueN&&... vn) {
// member(o, cn, v0, vn...): explicit coordination plus further sources. Every
// source is converted with as_dynamic() and the results are handed to rxs::from,
// whose result becomes the source argument of Merge. cn is forwarded.
// NOTE(review): o, v0 and vn are used as lvalues here, not via std::forward as in
// member(o, cn); this is harmless if as_dynamic() copies. Confirm.
279 template<
class Observable,
class Coordination,
class Value0,
class... ValueN,
290 static Result
member(Observable&& o, Coordination&& cn, Value0&& v0, ValueN&&... vn) {
291 return Result(Merge(
rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::forward<Coordination>(cn)));
// Catch-all overload taking any argument pack by value. Its static_assert depends
// on AN, so it is evaluated only when this overload is instantiated. It then
// fails for every pack whose size is not exactly 10000, which turns an
// unsupported call into the diagnostic message below.
294 template<
class...
AN>
295 static operators::detail::merge_invalid_t<
AN...>
member(
AN...) {
298 static_assert(
sizeof...(
AN) == 10000,
"merge_delay_error takes (optional Coordination, optional Value0, optional ValueN...)");
Definition: rx-util.hpp:112
auto merge_delay_error(AN &&... an) -> operator_factory< merge_delay_error_tag, AN... >
Definition: rx-merge_delay_error.hpp:230
static operators::detail::merge_invalid_t< AN... > member(AN...)
Definition: rx-merge_delay_error.hpp:295
static Result member(Observable &&o, Coordination &&cn, Value0 &&v0, ValueN &&... vn)
Definition: rx-merge_delay_error.hpp:290
std::shared_ptr< util::detail::error_base > error_ptr
Definition: rx-util.hpp:874
Definition: rx-all.hpp:26
static Result member(Observable &&o, Coordination &&cn)
Definition: rx-merge_delay_error.hpp:261
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:47
error_ptr make_error_ptr(error_ptr e)
Definition: rx-util.hpp:883
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:48
Definition: rx-operators.hpp:47
static Result member(Observable &&o, Value0 &&v0, ValueN &&... vn)
Definition: rx-merge_delay_error.hpp:275
static const bool value
Definition: rx-predef.hpp:123
static Result member(Observable &&o)
Definition: rx-merge_delay_error.hpp:248
linq_driver< iter_cursor< typename util::container_traits< TContainer >::iterator > > from(TContainer &c)
Definition: linq.hpp:556
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:126
a source of values. subscribe or use one of the operator methods that return a new observable,...
Definition: rx-observable.hpp:478
Definition: rx-util.hpp:337
For each given observable subscribe. For each item emitted from all of the given observables,...
Definition: rx-operators.hpp:261
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:640
Definition: rx-coordination.hpp:114
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
Definition: rx-predef.hpp:177
Definition: rx-coordination.hpp:37