5 #if !defined(RXCPP_RX_NOTIFICATION_HPP) 6 #define RXCPP_RX_NOTIFICATION_HPP 12 namespace notifications {
21 : s(s), u(std::numeric_limits<long>::
max()) {
46 struct notification_base
47 :
public std::enable_shared_from_this<notification_base<T>>
50 typedef std::shared_ptr<notification_base<T>> type;
52 virtual ~notification_base() {}
54 virtual void out(std::ostream& out)
const =0;
55 virtual bool equals(
const type& other)
const = 0;
56 virtual void accept(
const observer_type& o)
const =0;
60 std::ostream&
operator<< (std::ostream& out,
const std::vector<T>& v);
63 auto to_stream(std::ostream& os,
const T& t,
int,
int)
64 -> decltype(os << t) {
70 std::ostream& to_stream(std::ostream& os,
const T&,
int, ...) {
71 return os <<
"< " <<
typeid(T).name() <<
" does not support ostream>";
76 std::ostream& to_stream(std::ostream& os,
const T&, ...) {
77 return os <<
"<the value does not support ostream>";
81 inline std::ostream& ostreamvector (std::ostream& os,
const std::vector<T>& v) {
90 to_stream(os, i, 0, 0);
97 inline std::ostream&
operator<< (std::ostream& os,
const std::vector<T>& v) {
98 return ostreamvector(os, v);
102 auto equals(
const T& lhs,
const T& rhs,
int)
103 -> decltype(
bool(lhs == rhs)) {
108 bool equals(
const T&,
const T&, ...) {
118 typedef typename detail::notification_base<T>::type
type;
122 typedef detail::notification_base<T> base;
124 struct on_next_notification :
public base {
125 on_next_notification(T value) : value(std::move(value)) {
// Copy constructor: initializes this notification's value as a copy of o.value.
127 on_next_notification(
const on_next_notification& o) : value(o.value) {}
// Move constructor: transfers the stored value out of `o`.
// The parameter is a non-const rvalue reference on purpose: with a
// `const on_next_notification&&` parameter, std::move(o.value) produces a
// const rvalue, which selects T's copy constructor and silently copies.
// Const rvalues still bind to the copy constructor, so existing callers keep working.
128 on_next_notification(on_next_notification&& o) : value(std::move(o.value)) {}
// Assignment operator taking its argument by value: `o` is already a private
// copy (or a moved-into temporary), so its value is moved into this object.
// Returns a reference to this object.
129 on_next_notification& operator=(on_next_notification o) { value = std::move(o.value);
return *
this; }
130 virtual void out(std::ostream& os)
const {
132 detail::to_stream(os, value, 0, 0);
135 virtual bool equals(
const typename base::type& other)
const {
137 other->accept(make_subscriber<T>(make_observer_dynamic<T>([
this, &result](T v) {
138 result = detail::equals(this->value, v, 0);
148 struct on_error_notification :
public base {
// Copy constructor: initializes this notification's error pointer as a copy of o.ep.
151 on_error_notification(
const on_error_notification& o) : ep(o.ep) {}
// Move constructor: transfers the stored error pointer out of `o`.
// The parameter is a non-const rvalue reference on purpose: with a
// `const on_error_notification&&` parameter, std::move(o.ep) produces a
// const rvalue, so the pointer is copied rather than moved.
// Const rvalues still bind to the copy constructor, so existing callers keep working.
152 on_error_notification(on_error_notification&& o) : ep(std::move(o.ep)) {}
// Assignment operator taking its argument by value: `o` is already a private
// copy (or a moved-into temporary), so its error pointer is moved into this object.
// Returns a reference to this object.
153 on_error_notification& operator=(on_error_notification o) { ep = std::move(o.ep);
return *
this; }
154 virtual void out(std::ostream& os)
const {
159 virtual bool equals(
const typename base::type& other)
const {
162 other->accept(make_subscriber<T>(make_observer_dynamic<T>([](T){}, [&result](
rxu::error_ptr){
167 virtual void accept(
const typename base::observer_type& o)
const {
173 struct on_completed_notification :
public base {
174 on_completed_notification() {
176 virtual void out(std::ostream& os)
const {
177 os <<
"on_completed()";
179 virtual bool equals(
const typename base::type& other)
const {
181 other->accept(make_subscriber<T>(make_observer_dynamic<T>([](T){}, [&result](){
186 virtual void accept(
const typename base::observer_type& o)
const {
// Empty tag type used for overload selection: the make_on_error overload that
// takes (exception_tag&&, Exception&&) is picked when this tag is passed.
191 struct exception_tag {};
193 template<
typename Exception>
195 type make_on_error(exception_tag&&, Exception&& e) {
197 return std::make_shared<on_error_notification>(ep);
// Empty tag type; the alternative to exception_tag that std::conditional can
// select when dispatching to make_on_error.
// NOTE(review): presumably selects an overload that receives an already-built
// error pointer -- confirm against the overload's declaration.
200 struct exception_ptr_tag {};
204 return std::make_shared<on_error_notification>(ep);
210 return std::make_shared<on_next_notification>(std::move(value));
214 return std::make_shared<on_completed_notification>();
217 template<
typename Exception>
219 return make_on_error(
typename std::conditional<
221 exception_ptr_tag, exception_tag>::
type(),
222 std::forward<Exception>(e));
227 bool operator == (
const std::shared_ptr<detail::notification_base<T>>& lhs,
const std::shared_ptr<detail::notification_base<T>>& rhs) {
228 if (!lhs && !rhs) {
return true;}
229 if (!lhs || !rhs) {
return false;}
230 return lhs->equals(rhs);
234 std::ostream&
operator<< (std::ostream& os,
const std::shared_ptr<detail::notification_base<T>>& n) {
264 out <<
"@" << r.
time() <<
"-" << r.
value();
// Shorthand alias: rxn names the notifications namespace.
269 namespace rxn=notifications;
271 inline std::ostream&
operator<< (std::ostream& out,
const std::vector<rxcpp::notifications::subscription>& vs) {
272 return rxcpp::notifications::detail::ostreamvector(out, vs);
276 return rxcpp::notifications::detail::ostreamvector(out, vr);
Definition: rx-notification.hpp:241
std::shared_ptr< util::detail::error_base > error_ptr
Definition: rx-util.hpp:874
Definition: rx-all.hpp:26
error_ptr make_error_ptr(error_ptr e)
Definition: rx-util.hpp:883
bool operator==(subscription lhs, subscription rhs)
Definition: rx-notification.hpp:34
auto max() -> operator_factory< max_tag >
For each item from this observable, reduce it by taking the max value of the previous items.
Definition: rx-reduce.hpp:496
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:48
static type on_completed()
Definition: rx-notification.hpp:213
Definition: rx-notification.hpp:116
RXCPP_NORETURN void throw_exception(E &&e)
Definition: rx-util.hpp:920
subscription(long s)
Definition: rx-notification.hpp:20
long time() const
Definition: rx-notification.hpp:249
std::ostream & operator<<(std::ostream &out, const std::vector< rxcpp::notifications::subscription > &vs)
Definition: rx-notification.hpp:271
void on_next(V &&v) const
Definition: rx-subscriber.hpp:176
static type on_error(Exception &&e)
Definition: rx-notification.hpp:218
subscription(long s, long u)
Definition: rx-notification.hpp:23
static type on_next(U value)
Definition: rx-notification.hpp:209
Definition: rx-notification.hpp:14
const T & value() const
Definition: rx-notification.hpp:252
detail::notification_base< T >::type type
Definition: rx-notification.hpp:118
detail::notification_base< T >::observer_type observer_type
Definition: rx-notification.hpp:119
long subscribe() const
Definition: rx-notification.hpp:26
Binds an observer that consumes values with a composite_subscription that controls lifetime.
Definition: rx-subscriber.hpp:25
std::string what(std::exception_ptr ep)
Definition: rx-util.hpp:544
std::ostream & operator<<(std::ostream &out, const subscription &s)
Definition: rx-notification.hpp:38
recorded(long t, T v)
Definition: rx-notification.hpp:246
long unsubscribe() const
Definition: rx-notification.hpp:29