RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-replaysubject.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_REPLAYSUBJECT_HPP)
6 #define RXCPP_RX_REPLAYSUBJECT_HPP
7 
8 #include "../rx-includes.hpp"
9 
10 namespace rxcpp {
11 
12 namespace subjects {
13 
14 namespace detail {
15 
16 template<class Coordination>
17 struct replay_traits
18 {
19  typedef rxu::maybe<std::size_t> count_type;
20  typedef rxu::maybe<rxsc::scheduler::clock_type::duration> period_type;
21  typedef rxsc::scheduler::clock_type::time_point time_point_type;
22  typedef rxu::decay_t<Coordination> coordination_type;
23  typedef typename coordination_type::coordinator_type coordinator_type;
24 };
25 
26 template<class T, class Coordination>
27 class replay_observer : public detail::multicast_observer<T>
28 {
29  typedef replay_observer<T, Coordination> this_type;
30  typedef detail::multicast_observer<T> base_type;
31 
32  typedef replay_traits<Coordination> traits;
33  typedef typename traits::count_type count_type;
34  typedef typename traits::period_type period_type;
35  typedef typename traits::time_point_type time_point_type;
36  typedef typename traits::coordination_type coordination_type;
37  typedef typename traits::coordinator_type coordinator_type;
38 
39  class replay_observer_state : public std::enable_shared_from_this<replay_observer_state>
40  {
41  mutable std::mutex lock;
42  mutable std::list<T> values;
43  mutable std::list<time_point_type> time_points;
44  mutable count_type count;
45  mutable period_type period;
46  mutable composite_subscription replayLifetime;
47  public:
48  mutable coordination_type coordination;
49  mutable coordinator_type coordinator;
50 
51  private:
52  void remove_oldest() const {
53  values.pop_front();
54  if (!period.empty()) {
55  time_points.pop_front();
56  }
57  }
58 
59  public:
60  ~replay_observer_state(){
61  replayLifetime.unsubscribe();
62  }
63  explicit replay_observer_state(count_type _count, period_type _period, coordination_type _coordination, coordinator_type _coordinator, composite_subscription _replayLifetime)
64  : count(_count)
65  , period(_period)
66  , replayLifetime(_replayLifetime)
67  , coordination(std::move(_coordination))
68  , coordinator(std::move(_coordinator))
69  {
70  }
71 
72  void add(T v) const {
73  std::unique_lock<std::mutex> guard(lock);
74 
75  if (!count.empty()) {
76  if (values.size() == count.get())
77  remove_oldest();
78  }
79 
80  if (!period.empty()) {
81  auto now = coordination.now();
82  while (!time_points.empty() && (now - time_points.front() > period.get()))
83  remove_oldest();
84  time_points.push_back(now);
85  }
86 
87  values.push_back(std::move(v));
88  }
89  std::list<T> get() const {
90  std::unique_lock<std::mutex> guard(lock);
91  return values;
92  }
93  };
94 
95  std::shared_ptr<replay_observer_state> state;
96 
97 public:
98  replay_observer(count_type count, period_type period, coordination_type coordination, composite_subscription replayLifetime, composite_subscription subscriberLifetime)
99  : base_type(subscriberLifetime)
100  {
101  replayLifetime.add(subscriberLifetime);
102  auto coordinator = coordination.create_coordinator(replayLifetime);
103  state = std::make_shared<replay_observer_state>(std::move(count), std::move(period), std::move(coordination), std::move(coordinator), std::move(replayLifetime));
104  }
105 
106  subscriber<T> get_subscriber() const {
107  return make_subscriber<T>(this->get_id(), this->get_subscription(), observer<T, detail::replay_observer<T, Coordination>>(*this)).as_dynamic();
108  }
109 
110  std::list<T> get_values() const {
111  return state->get();
112  }
113 
114  coordinator_type& get_coordinator() const {
115  return state->coordinator;
116  }
117 
118  template<class V>
119  void on_next(V v) const {
120  state->add(v);
121  base_type::on_next(std::move(v));
122  }
123 };
124 
125 }
126 
127 template<class T, class Coordination>
128 class replay
129 {
130  typedef detail::replay_traits<Coordination> traits;
131  typedef typename traits::count_type count_type;
132  typedef typename traits::period_type period_type;
133  typedef typename traits::time_point_type time_point_type;
134 
135  detail::replay_observer<T, Coordination> s;
136 
137 public:
138  explicit replay(Coordination cn, composite_subscription cs = composite_subscription())
139  : s(count_type(), period_type(), cn, cs, composite_subscription{})
140  {
141  }
142 
143  replay(std::size_t count, Coordination cn, composite_subscription cs = composite_subscription())
144  : s(count_type(std::move(count)), period_type(), cn, cs, composite_subscription{})
145  {
146  }
147 
148  replay(rxsc::scheduler::clock_type::duration period, Coordination cn, composite_subscription cs = composite_subscription())
149  : s(count_type(), period_type(period), cn, cs, composite_subscription{})
150  {
151  }
152 
153  replay(std::size_t count, rxsc::scheduler::clock_type::duration period, Coordination cn, composite_subscription cs = composite_subscription())
154  : s(count_type(count), period_type(period), cn, cs, composite_subscription{})
155  {
156  }
157 
158  bool has_observers() const {
159  return s.has_observers();
160  }
161 
162  std::list<T> get_values() const {
163  return s.get_values();
164  }
165 
167  return s.get_subscriber();
168  }
169 
171  auto keepAlive = s;
172  auto observable = make_observable_dynamic<T>([=](subscriber<T> o){
173  for (auto&& value: get_values()) {
174  o.on_next(value);
175  }
176  keepAlive.add(keepAlive.get_subscriber(), std::move(o));
177  });
178  return s.get_coordinator().in(observable);
179  }
180 };
181 
182 }
183 
184 }
185 
186 #endif
auto count() -> operator_factory< reduce_tag, int, rxu::count, rxu::detail::take_at< 0 >>
For each item from this observable reduce it by incrementing a count.
Definition: rx-reduce.hpp:412
Definition: rx-all.hpp:26
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:459
replay(rxsc::scheduler::clock_type::duration period, Coordination cn, composite_subscription cs=composite_subscription())
Definition: rx-replaysubject.hpp:148
replay(std::size_t count, Coordination cn, composite_subscription cs=composite_subscription())
Definition: rx-replaysubject.hpp:143
observable< T > get_observable() const
Definition: rx-replaysubject.hpp:170
void on_next(V &&v) const
Definition: rx-subscriber.hpp:176
a source of values. subscribe or use one of the operator methods that return a new observable,...
Definition: rx-observable.hpp:478
replay(Coordination cn, composite_subscription cs=composite_subscription())
Definition: rx-replaysubject.hpp:138
subscriber< T > get_subscriber() const
Definition: rx-replaysubject.hpp:166
Definition: rx-replaysubject.hpp:128
std::list< T > get_values() const
Definition: rx-replaysubject.hpp:162
binds an observer that consumes values with a composite_subscription that controls lifetime.
Definition: rx-subscriber.hpp:25
replay(std::size_t count, rxsc::scheduler::clock_type::duration period, Coordination cn, composite_subscription cs=composite_subscription())
Definition: rx-replaysubject.hpp:153
bool has_observers() const
Definition: rx-replaysubject.hpp:158