21 #if !defined(RXCPP_OPERATORS_RX_TIME_INTERVAL_HPP)
22 #define RXCPP_OPERATORS_RX_TIME_INTERVAL_HPP
24 #include "../rx-includes.hpp"
// Empty tag type with no members. It is only used as a type argument below
// (time_interval_invalid_arguments<AN...>), so it is a variadic template; its
// template header is not visible in this excerpt.
33 struct time_interval_invalid_arguments {};
// Placeholder operator type for an unsupported argument list AN....
// Publicly derives from rxo::operator_base instantiated with the
// time_interval_invalid_arguments<AN...> tag.
// NOTE(review): the template header and closing brace of this struct are not
// visible in this excerpt.
36 struct time_interval_invalid :
public rxo::operator_base<time_interval_invalid_arguments<AN...>> {
// `type` is the observable built from the tag type and this placeholder
// operator; the time_interval_invalid_t alias exposes it.
37 using type = observable<time_interval_invalid_arguments<
AN...>, time_interval_invalid<
AN...>>;
// Shorthand for the nested `type` of time_interval_invalid<AN...>; used as the
// return type of the variadic catch-all `member` overload further down.
// NOTE(review): the template header of this alias is not visible here.
40 using time_interval_invalid_t =
typename time_interval_invalid<
AN...>::type;
// Operator template parameterised on the source value type T and a
// Coordination type.
// NOTE(review): the line naming this struct, its braces and its constructor
// signature are not visible in this excerpt; it is referred to elsewhere in
// the file as rxo::detail::time_interval<SourceValue, ...>.
42 template<
class T,
class Coordination>
// Decayed forms of the template arguments, used throughout the operator.
45 typedef rxu::decay_t<T> source_value_type;
46 typedef rxu::decay_t<Coordination> coordination_type;
// Bundle of the state held by the operator: here only the coordination.
48 struct time_interval_values {
// Takes the coordination by value (the initialiser list is not visible here).
49 time_interval_values(coordination_type c)
// The coordination later handed to each observer via make().
54 coordination_type coordination;
// The operator's stored values; passed to time_interval_observer::make() by
// operator().
56 time_interval_values initial;
// Constructor initialiser: builds `initial` from the `coordination` argument.
59 : initial(coordination)
// Observer that receives values of source_value_type and, for each one,
// forwards a time difference (now - last) to the destination `dest`.
// NOTE(review): several original lines (the `dest` member declaration, parts
// of the constructor, on_error, the bodies of on_completed and the closing
// braces) are not visible in this excerpt.
63 template<
class Subscriber>
64 struct time_interval_observer
66 typedef time_interval_observer<Subscriber> this_type;
// Type accepted by on_next: the source's value type.
67 typedef source_value_type value_type;
68 typedef rxu::decay_t<Subscriber> dest_type;
69 typedef observer<value_type, this_type> observer_type;
// Time point type of the scheduler clock; `now - last` below is a difference
// of two such time points.
70 typedef rxsc::scheduler::clock_type::time_point time_point;
// Coordination used to read the current time (coord.now()).
72 coordination_type coord;
// Reference time subtracted in on_next. Declared mutable so that the const
// member functions may modify it; presumably updated after each emission, but
// that assignment is not visible here -- TODO confirm.
73 mutable time_point
last;
// Takes the destination and the coordination by value; the coordination is
// moved into `coord`. The remaining initialisers are not visible here.
75 time_interval_observer(dest_type d, coordination_type coordination)
77 coord(std::move(coordination)),
// The incoming source value is ignored (unnamed parameter); only the time of
// arrival matters.
82 void on_next(source_value_type)
const {
83 time_point now = coord.now();
// Emit the elapsed time between `last` and the current time.
84 dest.on_next(now -
last);
90 void on_completed()
const {
// Factory: wraps a new observer (built from `d` and the coordination stored in
// `v`) in a subscriber for value_type via make_subscriber.
94 static subscriber<value_type, observer_type> make(dest_type d, time_interval_values v) {
95 return make_subscriber<value_type>(d, this_type(d, v.coordination));
// Call operator: given a destination subscriber, returns the subscriber
// produced by time_interval_observer<Subscriber>::make(), passing the moved
// destination and this operator's stored `initial` values.
// The trailing return type repeats the returned expression so the result type
// is deduced from make().
99 template<
class Subscriber>
100 auto operator()(Subscriber dest)
const
101 -> decltype(time_interval_observer<Subscriber>::make(std::move(dest), initial)) {
102 return time_interval_observer<Subscriber>::make(std::move(dest), initial);
110 template<
class...
AN>
// Template header of an overload whose declaration is not visible in this
// excerpt (presumably the `member` overload taking only the observable --
// TODO confirm). Some template parameters, including the one that introduces
// SourceValue, are also missing here.
121 template<
class Observable,
// Default operator type: time_interval over SourceValue with
// identity_one_worker as the coordination type.
125 class TimeInterval = rxo::detail::time_interval<SourceValue, identity_one_worker>,
// Default emitted value type: the duration type of the scheduler clock's
// time_point.
126 class Value =
typename rxsc::scheduler::clock_type::time_point::duration>
// Overload taking an observable and an explicit coordination.
// Lifts a TimeInterval operator, constructed from the forwarded coordination,
// onto `o`, producing an observable of Value.
// NOTE(review): the template parameters between Coordination and TimeInterval
// (including the one that introduces SourceValue) and the closing brace are
// not visible in this excerpt.
132 template<
class Observable,
class Coordination,
// Default operator type: time_interval over SourceValue with the decayed
// Coordination type.
137 class TimeInterval = rxo::detail::time_interval<SourceValue, rxu::decay_t<Coordination>>,
// Default emitted value type: the duration type of the scheduler clock's
// time_point.
138 class Value =
typename rxsc::scheduler::clock_type::time_point::duration>
139 static auto member(Observable&& o, Coordination&& cn)
140 -> decltype(o.template lift<Value>(TimeInterval(std::forward<Coordination>(cn)))) {
141 return o.template lift<Value>(TimeInterval(std::forward<Coordination>(cn)));
// Variadic catch-all overload: accepts any argument list and is declared to
// return the placeholder type time_interval_invalid_t<AN...>.
// Its only visible statement is a static_assert that reports the supported
// signature in its message.
144 template<
class...
AN>
145 static operators::detail::time_interval_invalid_t<
AN...>
member(
AN...) {
// The condition depends on the parameter pack, so it is evaluated only when
// this overload is instantiated; it is false for any argument count other
// than 10000, which makes the assertion fail with the message below.
148 static_assert(
sizeof...(
AN) == 10000,
"time_interval takes (optional Coordination)");