RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
|
Go to the documentation of this file.
5 #if !defined(RXCPP_RX_COORDINATION_HPP)
6 #define RXCPP_RX_COORDINATION_HPP
15 template<
class T,
class C = rxu::types_checked>
19 struct is_coordinator<T, typename rxu::types_checked_from<typename T::coordinator_tag>::type>
20 :
public std::is_convertible<typename T::coordinator_tag*, tag_coordinator*> {};
27 template<
class T,
class C = rxu::types_checked>
31 struct is_coordination<T, typename rxu::types_checked_from<typename T::coordination_tag>::type>
32 :
public std::is_convertible<typename T::coordination_tag*, tag_coordination*> {};
36 template<
class T,
class Decayed = rxu::decay_t<T>>
41 template<
class Coordination,
class DecayedCoordination = rxu::decay_t<Coordination>>
51 struct not_supported {
typedef not_supported type;};
53 template<
class Observable>
56 typedef decltype((*(
input_type*)
nullptr).
in((*(Observable*)
nullptr))) type;
59 template<
class Subscriber>
62 typedef decltype((*(
input_type*)
nullptr).
out((*(Subscriber*)
nullptr))) type;
66 struct get_action_function
68 typedef decltype((*(
input_type*)
nullptr).
act((*(F*)
nullptr))) type;
77 typedef typename std::conditional<
78 rxsc::detail::is_action_function<T>::value, get_action_function<T>,
typename std::conditional<
86 return input.get_worker();
89 return input.get_scheduler();
92 template<
class Observable>
93 auto in(Observable o)
const
94 ->
typename get_observable<Observable>::type {
95 return input.in(std::move(o));
99 template<
class Subscriber>
100 auto out(Subscriber s)
const
101 ->
typename get_subscriber<Subscriber>::type {
102 return input.out(std::move(s));
108 ->
typename get_action_function<F>::type {
109 return input.act(std::move(f));
110 static_assert(rxsc::detail::is_action_function<F>::value,
"can only synchronize action functions");
134 inline rxsc::scheduler::clock_type::time_point
now()
const {
135 return factory.
now();
137 template<
class Observable>
138 auto in(Observable o)
const
142 template<
class Subscriber>
143 auto out(Subscriber s)
const
160 inline rxsc::scheduler::clock_type::time_point
now()
const {
161 return factory.
now();
189 struct serialize_action
192 std::shared_ptr<std::mutex> lock;
193 serialize_action(F d, std::shared_ptr<std::mutex> m)
202 -> decltype(dest(scbl)) {
203 std::unique_lock<std::mutex> guard(*lock);
208 template<
class Observer>
209 struct serialize_observer
211 typedef serialize_observer<Observer> this_type;
213 typedef typename dest_type::value_type value_type;
216 std::shared_ptr<std::mutex> lock;
218 serialize_observer(dest_type d, std::shared_ptr<std::mutex> m)
226 void on_next(value_type v)
const {
227 std::unique_lock<std::mutex> guard(*lock);
231 std::unique_lock<std::mutex> guard(*lock);
234 void on_completed()
const {
235 std::unique_lock<std::mutex> guard(*lock);
239 template<
class Subscriber>
241 return make_subscriber<value_type>(s, observer_type(this_type(s.get_observer(), std::move(m))));
249 std::shared_ptr<std::mutex> lock;
251 explicit input_type(
rxsc::worker w, std::shared_ptr<std::mutex> m)
263 inline rxsc::scheduler::clock_type::time_point
now()
const {
264 return factory.
now();
266 template<
class Observable>
267 auto in(Observable o)
const
271 template<
class Subscriber>
272 auto out(
const Subscriber& s)
const
273 -> decltype(serialize_observer<decltype(s.get_observer())>::make(s, lock)) {
274 return serialize_observer<decltype(s.get_observer())>::make(s, lock);
278 -> serialize_action<F> {
279 return serialize_action<F>(std::move(f), lock);
289 inline rxsc::scheduler::clock_type::time_point
now()
const {
290 return factory.
now();
295 std::shared_ptr<std::mutex> lock = std::make_shared<std::mutex>();
rxsc::scheduler::clock_type::time_point now() const
Definition: rx-coordination.hpp:289
rxsc::scheduler get_scheduler() const
Definition: rx-coordination.hpp:88
consumes values from an observable using State that may implement on_next, on_error and on_completed ...
Definition: rx-observer.hpp:179
std::shared_ptr< util::detail::error_base > error_ptr
Definition: rx-util.hpp:874
coordinator< input_type > coordinator_type
Definition: rx-coordination.hpp:287
Input input_type
Definition: rx-coordination.hpp:48
rxsc::scheduler::clock_type::time_point now() const
Definition: rx-coordination.hpp:160
binds an observer that consumes values with a composite_subscription that controls lifetime.
Definition: rx-subscriber.hpp:25
clock_type::time_point now() const
returns the current time for this scheduler
Definition: rx-scheduler.hpp:404
Definition: rx-coordination.hpp:184
identity_one_worker(rxsc::scheduler sc)
Definition: rx-coordination.hpp:156
auto act(F f) const -> typename get_action_function< F >::type
Definition: rx-coordination.hpp:107
rxsc::worker get_worker() const
Definition: rx-coordination.hpp:85
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:48
coordinator_type create_coordinator(composite_subscription cs=composite_subscription()) const
Definition: rx-coordination.hpp:293
coordinator_type create_coordinator(composite_subscription cs=composite_subscription()) const
Definition: rx-coordination.hpp:164
Definition: rx-coordination.hpp:12
Definition: rx-scheduler.hpp:200
identity_one_worker identity_same_worker(rxsc::worker w)
Definition: rx-coordination.hpp:180
Definition: rx-predef.hpp:177
typename DecayedCoordination::coordination_tag coordination_tag_t
Definition: rx-coordination.hpp:42
serialize_one_worker serialize_same_worker(rxsc::worker w)
Definition: rx-coordination.hpp:310
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
Definition: rx-coordination.hpp:114
scheduler make_event_loop()
Definition: rx-eventloop.hpp:106
scheduler make_same_worker(rxsc::worker w)
Definition: rx-sameworker.hpp:44
scheduler make_new_thread()
Definition: rx-newthread.hpp:169
coordinator< input_type > coordinator_type
Definition: rx-coordination.hpp:158
identity_one_worker identity_immediate()
Definition: rx-coordination.hpp:170
input_type input
Definition: rx-coordination.hpp:72
tag_coordinator coordinator_tag
Definition: rx-coordination.hpp:13
Definition: rx-coordination.hpp:45
tag_coordination coordination_tag
Definition: rx-coordination.hpp:23
Definition: rx-coordination.hpp:23
const scheduler & make_current_thread()
Definition: rx-currentthread.hpp:263
serialize_one_worker serialize_event_loop()
Definition: rx-coordination.hpp:300
Definition: rx-coordination.hpp:16
serialize_one_worker(rxsc::scheduler sc)
Definition: rx-coordination.hpp:285
Definition: rx-coordination.hpp:37
Definition: rx-coordination.hpp:13
auto in(Observable o) const -> typename get_observable< Observable >::type
Definition: rx-coordination.hpp:93
serialize_one_worker serialize_new_thread()
Definition: rx-coordination.hpp:305
Definition: rx-scheduler.hpp:426
Definition: rx-coordination.hpp:22
coordinator(Input i)
Definition: rx-coordination.hpp:83
Definition: rx-predef.hpp:115
auto out(Subscriber s) const -> typename get_subscriber< Subscriber >::type
Definition: rx-coordination.hpp:100
std::conditional< rxsc::detail::is_action_function< T >::value, get_action_function< T >, typename std::conditional< is_observable< T >::value, get_observable< T >, typename std::conditional< is_subscriber< T >::value, get_subscriber< T >, not_supported >::type >::type >::type::type type
Definition: rx-coordination.hpp:80
allows functions to be called at specified times and possibly in other contexts.
Definition: rx-scheduler.hpp:383
const scheduler & make_immediate()
Definition: rx-immediate.hpp:75
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:459
Definition: rx-coordination.hpp:75
Definition: rx-all.hpp:26
worker create_worker(composite_subscription cs=composite_subscription()) const
Definition: rx-scheduler.hpp:412