5 #if !defined(RXCPP_RX_SYNCHRONIZE_HPP)
6 #define RXCPP_RX_SYNCHRONIZE_HPP
8 #include "../rx-includes.hpp"
16 template<
class T,
class Coordination>
17 class synchronize_observer :
public detail::multicast_observer<T>
19 typedef synchronize_observer<T, Coordination> this_type;
20 typedef detail::multicast_observer<T> base_type;
22 typedef rxu::decay_t<Coordination> coordination_type;
23 typedef typename coordination_type::coordinator_type coordinator_type;
24 typedef typename coordinator_type::template get<subscriber<T>>::type output_type;
26 struct synchronize_observer_state :
public std::enable_shared_from_this<synchronize_observer_state>
28 typedef rxn::notification<T> notification_type;
30 typedef std::deque<base_notification_type> queue_type;
42 mutable std::mutex lock;
43 mutable std::condition_variable wake;
44 mutable queue_type fill_queue;
45 composite_subscription lifetime;
46 mutable typename mode::type current;
47 coordinator_type coordinator;
48 output_type destination;
50 void ensure_processing(std::unique_lock<std::mutex>& guard)
const {
51 if (!guard.owns_lock()) {
54 if (current == mode::Empty) {
55 current = mode::Processing;
56 auto keepAlive = this->shared_from_this();
58 auto drain_queue = [keepAlive,
this](
const rxsc::schedulable&
self){
60 std::unique_lock<std::mutex> guard(lock);
61 if (!destination.is_subscribed()) {
62 current = mode::Disposed;
65 lifetime.unsubscribe();
68 if (fill_queue.empty()) {
69 current = mode::Empty;
72 auto notification = std::move(fill_queue.front());
73 fill_queue.pop_front();
75 notification->accept(destination);
79 std::unique_lock<std::mutex> guard(lock);
80 current = mode::Empty;
85 [&](){
return coordinator.act(drain_queue);},
87 if (selectedDrain.empty()) {
91 auto processor = coordinator.get_worker();
92 processor.schedule(lifetime, selectedDrain.get());
96 synchronize_observer_state(coordinator_type coor, composite_subscription cs, output_type scbr)
97 : lifetime(std::move(cs))
98 , current(mode::Empty)
99 , coordinator(std::move(coor))
100 , destination(std::move(scbr))
105 void on_next(V v)
const {
106 if (lifetime.is_subscribed()) {
107 std::unique_lock<std::mutex> guard(lock);
109 ensure_processing(guard);
114 if (lifetime.is_subscribed()) {
115 std::unique_lock<std::mutex> guard(lock);
117 ensure_processing(guard);
121 void on_completed()
const {
122 if (lifetime.is_subscribed()) {
123 std::unique_lock<std::mutex> guard(lock);
125 ensure_processing(guard);
131 std::shared_ptr<synchronize_observer_state> state;
134 synchronize_observer(coordination_type cn, composite_subscription dl, composite_subscription il)
137 auto o = make_subscriber<T>(dl, make_observer_dynamic<T>( *
static_cast<base_type*
>(
this) ));
140 auto coordinator = cn.create_coordinator(dl);
142 state = std::make_shared<synchronize_observer_state>(std::move(coordinator), std::move(il), std::move(o));
145 subscriber<T> get_subscriber()
const {
146 return make_subscriber<T>(this->get_id(), state->lifetime, observer<T, detail::synchronize_observer<T, Coordination>>(*
this)).as_dynamic();
150 void on_next(V v)
const {
151 state->on_next(std::move(v));
156 void on_completed()
const {
157 state->on_completed();
163 template<
class T,
class Coordination>
166 detail::synchronize_observer<T, Coordination> s;
175 return s.has_observers();
179 return s.get_subscriber();
185 keepAlive.add(keepAlive.get_subscriber(), std::move(o));
205 , coordination(factory)
214 inline rxsc::scheduler::clock_type::time_point
now()
const {
215 return factory.
now();
217 template<
class Observable>
218 auto in(Observable o)
const
219 -> decltype(o.publish_synchronized(coordination).ref_count()) {
220 return o.publish_synchronized(coordination).ref_count();
222 template<
class Subscriber>
223 auto out(Subscriber s)
const
240 inline rxsc::scheduler::clock_type::time_point
now()
const {
241 return factory.
now();