RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-error.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_SOURCES_RX_ERROR_HPP)
6 #define RXCPP_SOURCES_RX_ERROR_HPP
7 
8 #include "../rx-includes.hpp"
9 
32 namespace rxcpp {
33 
34 namespace sources {
35 
36 namespace detail {
37 
38 template<class T, class Coordination>
39 struct error : public source_base<T>
40 {
41  typedef error<T, Coordination> this_type;
42 
43  typedef rxu::decay_t<Coordination> coordination_type;
44 
45  typedef typename coordination_type::coordinator_type coordinator_type;
46 
47  struct error_initial_type
48  {
49  error_initial_type(rxu::error_ptr e, coordination_type cn)
50  : exception(e)
51  , coordination(std::move(cn))
52  {
53  }
54  rxu::error_ptr exception;
55  coordination_type coordination;
56  };
57  error_initial_type initial;
58 
59  error(rxu::error_ptr e, coordination_type cn)
60  : initial(e, std::move(cn))
61  {
62  }
63 
64  template<class Subscriber>
65  void on_subscribe(Subscriber o) const {
66 
67  // creates a worker whose lifetime is the same as this subscription
68  auto coordinator = initial.coordination.create_coordinator(o.get_subscription());
69  auto controller = coordinator.get_worker();
70  auto exception = initial.exception;
71 
72  auto producer = [=](const rxsc::schedulable&){
73  auto& dest = o;
74  if (!dest.is_subscribed()) {
75  // terminate loop
76  return;
77  }
78 
79  dest.on_error(exception);
80  // o is unsubscribed
81  };
82  auto selectedProducer = on_exception(
83  [&](){return coordinator.act(producer);},
84  o);
85  if (selectedProducer.empty()) {
86  return;
87  }
88  controller.schedule(selectedProducer.get());
89  }
90 };
91 
// Overload-selection tags for make_error.
// throw_ptr_tag:      chosen when the decayed argument type is rxu::error_ptr.
struct throw_ptr_tag
{
};
// throw_instance_tag: chosen for every other argument type.
struct throw_instance_tag
{
};
94 
95 template <class T, class Coordination>
96 auto make_error(throw_ptr_tag&&, rxu::error_ptr exception, Coordination cn)
97  -> observable<T, error<T, Coordination>> {
98  return observable<T, error<T, Coordination>>(error<T, Coordination>(std::move(exception), std::move(cn)));
99 }
100 
101 template <class T, class E, class Coordination>
102 auto make_error(throw_instance_tag&&, E e, Coordination cn)
103  -> observable<T, error<T, Coordination>> {
105  return observable<T, error<T, Coordination>>(error<T, Coordination>(std::move(ep), std::move(cn)));
106 }
107 
108 }
109 
110 }
111 
112 namespace sources {
113 
116 template<class T, class E>
117 auto error(E e)
118  -> decltype(detail::make_error<T>(typename std::conditional<std::is_same<rxu::error_ptr, rxu::decay_t<E>>::value, detail::throw_ptr_tag, detail::throw_instance_tag>::type(), std::move(e), identity_immediate())) {
119  return detail::make_error<T>(typename std::conditional<std::is_same<rxu::error_ptr, rxu::decay_t<E>>::value, detail::throw_ptr_tag, detail::throw_instance_tag>::type(), std::move(e), identity_immediate());
120 }
123 template<class T, class E, class Coordination>
124 auto error(E e, Coordination cn)
125  -> decltype(detail::make_error<T>(typename std::conditional<std::is_same<rxu::error_ptr, rxu::decay_t<E>>::value, detail::throw_ptr_tag, detail::throw_instance_tag>::type(), std::move(e), std::move(cn))) {
126  return detail::make_error<T>(typename std::conditional<std::is_same<rxu::error_ptr, rxu::decay_t<E>>::value, detail::throw_ptr_tag, detail::throw_instance_tag>::type(), std::move(e), std::move(cn));
127 }
128 
129 }
130 
131 }
132 
133 #endif
auto error(E e) -> decltype(detail::make_error< T >(typename std::conditional< std::is_same< rxu::error_ptr, rxu::decay_t< E >>::value, detail::throw_ptr_tag, detail::throw_instance_tag >::type(), std::move(e), identity_immediate()))
Returns an observable that sends no items to the observer and immediately generates an error.
Definition: rx-error.hpp:117
std::shared_ptr< util::detail::error_base > error_ptr
Definition: rx-util.hpp:874
Definition: rx-all.hpp:26
error_ptr make_error_ptr(error_ptr e)
Definition: rx-util.hpp:883
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:48
identity_one_worker identity_immediate()
Definition: rx-coordination.hpp:170
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:640