5 #if !defined(RXCPP_RX_SCHEDULER_TEST_HPP)
6 #define RXCPP_RX_SCHEDULER_TEST_HPP
8 #include "../rx-includes.hpp"
12 namespace schedulers {
16 class test_type :
public scheduler_interface
22 struct test_type_state :
public virtual_time<long, long>
24 typedef virtual_time<long, long> base;
27 using base::schedule_relative;
29 clock_type::time_point now()
const {
30 return to_time_point(clock_now);
33 virtual void schedule_absolute(
long when,
const schedulable& a)
const
35 if (when <= base::clock_now)
36 when = base::clock_now + 1;
41 virtual long add(
long absolute,
long relative)
const
43 return absolute + relative;
46 virtual clock_type::time_point to_time_point(
long absolute)
const
48 return clock_type::time_point(std::chrono::milliseconds(absolute));
51 virtual long to_relative(clock_type::duration d)
const
53 return static_cast<long>(std::chrono::duration_cast<std::chrono::milliseconds>(d).count());
58 mutable std::shared_ptr<test_type_state> state;
61 struct test_type_worker :
public worker_interface
63 mutable std::shared_ptr<test_type_state> state;
65 typedef test_type_state::absolute absolute;
66 typedef test_type_state::relative relative;
68 test_type_worker(std::shared_ptr<test_type_state> st)
69 : state(std::move(st))
73 virtual clock_type::time_point now()
const {
77 virtual void schedule(
const schedulable& scbl)
const {
78 state->schedule_absolute(state->clock(), scbl);
81 virtual void schedule(clock_type::time_point when,
const schedulable& scbl)
const {
82 state->schedule_relative(state->to_relative(when - now()), scbl);
85 void schedule_absolute(absolute when,
const schedulable& scbl)
const {
86 state->schedule_absolute(when, scbl);
89 void schedule_relative(relative when,
const schedulable& scbl)
const {
90 state->schedule_relative(when, scbl);
93 bool is_enabled()
const {
return state->is_enabled();}
94 absolute clock()
const {
return state->clock();}
106 void advance_to(absolute time)
const
108 state->advance_to(time);
111 void advance_by(relative time)
const
113 state->advance_by(time);
116 void sleep(relative time)
const
127 : state(std::make_shared<test_type_state>())
131 virtual clock_type::time_point now()
const {
135 virtual worker create_worker(composite_subscription cs)
const {
136 return worker(cs, std::make_shared<test_type_worker>(state));
139 bool is_enabled()
const {
return state->is_enabled();}
141 return state->clock();
144 clock_type::time_point to_time_point(
long absolute)
const {
145 return state->to_time_point(absolute);
148 std::shared_ptr<test_type_worker> create_test_type_worker_interface()
const {
149 return std::make_shared<test_type_worker>(state);
153 rxt::testable_observable<T> make_hot_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages)
const;
156 rxt::testable_observable<T> make_cold_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages)
const;
161 :
public rxt::detail::test_subject_base<T>
163 typedef typename rxn::notification<T> notification_type;
164 typedef rxn::recorded<typename notification_type::type> recorded_type;
167 explicit mock_observer(std::shared_ptr<test_type::test_type_state> sc)
172 std::shared_ptr<test_type::test_type_state> sc;
173 std::vector<recorded_type> m;
175 virtual void on_subscribe(subscriber<T>)
const {
178 virtual std::vector<rxn::subscription> subscriptions()
const {
182 virtual std::vector<recorded_type> messages()
const {
190 typedef typename rxn::notification<T> notification_type;
191 typedef rxn::recorded<typename notification_type::type> recorded_type;
193 auto ts = std::make_shared<mock_observer<T>>(state);
195 return rxcpp::make_subscriber<T>(rxt::testable_observer<T>(ts, make_observer_dynamic<T>(
200 recorded_type(ts->sc->clock(), notification_type::on_next(value)));
206 recorded_type(ts->sc->clock(), notification_type::on_error(e)));
212 recorded_type(ts->sc->clock(), notification_type::on_completed()));
217 class cold_observable
218 :
public rxt::detail::test_subject_base<T>
220 typedef cold_observable<T> this_type;
221 std::shared_ptr<test_type::test_type_state> sc;
222 typedef rxn::recorded<typename rxn::notification<T>::type> recorded_type;
223 mutable std::vector<recorded_type> mv;
224 mutable std::vector<rxn::subscription> sv;
225 mutable worker controller;
229 cold_observable(std::shared_ptr<test_type::test_type_state> sc, worker w, std::vector<recorded_type> mv)
236 template<
class Iterator>
237 cold_observable(std::shared_ptr<test_type::test_type_state> sc, worker w, Iterator begin, Iterator end)
244 virtual void on_subscribe(subscriber<T> o)
const {
245 sv.push_back(rxn::subscription(sc->clock()));
246 auto index = sv.size() - 1;
248 for (
auto& message : mv) {
249 auto n = message.value();
252 [n, o](
const schedulable&) {
253 if (o.is_subscribed()) {
259 auto sharedThis = std::static_pointer_cast<const this_type>(this->shared_from_this());
260 o.add([sharedThis, index]() {
261 sharedThis->sv[index] = rxn::subscription(sharedThis->sv[index].subscribe(), sharedThis->sc->clock());
265 virtual std::vector<rxn::subscription> subscriptions()
const {
269 virtual std::vector<recorded_type> messages()
const {
275 rxt::testable_observable<T> test_type::make_cold_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages)
const
277 auto co = std::make_shared<cold_observable<T>>(state, create_worker(composite_subscription()), std::move(messages));
278 return rxt::testable_observable<T>(co);
283 :
public rxt::detail::test_subject_base<T>
285 typedef hot_observable<T> this_type;
286 std::shared_ptr<test_type::test_type_state> sc;
287 typedef rxn::recorded<typename rxn::notification<T>::type> recorded_type;
288 typedef subscriber<T> observer_type;
289 mutable std::vector<recorded_type> mv;
290 mutable std::vector<rxn::subscription> sv;
291 mutable std::list<observer_type> observers;
292 mutable worker controller;
296 hot_observable(std::shared_ptr<test_type::test_type_state> sc, worker w, std::vector<recorded_type> mv)
301 for (
auto& message : mv) {
302 auto n = message.value();
305 [
this, n](
const schedulable&) {
306 auto local = this->observers;
307 for (
auto& o : local) {
308 if (o.is_subscribed()) {
316 virtual ~hot_observable() {}
318 virtual void on_subscribe(observer_type o)
const {
319 auto olocation = observers.insert(observers.end(), o);
321 sv.push_back(rxn::subscription(sc->clock()));
322 auto index = sv.size() - 1;
324 auto sharedThis = std::static_pointer_cast<const this_type>(this->shared_from_this());
325 o.add([sharedThis, index, olocation]() {
326 sharedThis->sv[index] = rxn::subscription(sharedThis->sv[index].subscribe(), sharedThis->sc->clock());
327 sharedThis->observers.erase(olocation);
331 virtual std::vector<rxn::subscription> subscriptions()
const {
335 virtual std::vector<recorded_type> messages()
const {
341 rxt::testable_observable<T> test_type::make_hot_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages)
const
343 auto worker = create_worker(composite_subscription());
344 auto shared = std::make_shared<hot_observable<T>>(state, worker, std::move(messages));
345 return rxt::testable_observable<T>(shared);
349 struct is_create_source_function
353 static auto check(
int) -> decltype((*(CF*)
nullptr)());
355 static not_void check(...);
357 static const bool value = is_observable<decltype(check<rxu::decay_t<F>>(0))>::value;
364 std::shared_ptr<detail::test_type> tester;
367 explicit test(std::shared_ptr<detail::test_type> t)
397 template<
typename Exception>
409 std::shared_ptr<detail::test_type::test_type_worker> tester;
422 long clock()
const {
return tester->clock();}
425 tester->schedule_absolute(when, a);
429 tester->schedule_relative(when, a);
432 template<
class Arg0,
class... ArgN>
434 ->
typename std::enable_if<
435 (detail::is_action_function<Arg0>::value ||
438 tester->schedule_absolute(when,
make_schedulable(*
this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...));
441 template<
class Arg0,
class... ArgN>
443 ->
typename std::enable_if<
444 (detail::is_action_function<Arg0>::value ||
447 tester->schedule_relative(when,
make_schedulable(*
this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...));
452 tester->advance_to(time);
457 tester->advance_by(time);
465 template<
class T,
class F>
466 auto start(F createSource,
long created,
long subscribed,
long unsubscribed)
const
470 :
public std::enable_shared_from_this<state_type>
472 typedef decltype(createSource()) source_type;
474 std::unique_ptr<source_type> source;
483 auto state = std::make_shared<state_type>(this->make_subscriber<T>());
486 state->source.reset(
new typename state_type::source_type(createSource()));
489 state->source->subscribe(state->o);
492 state->o.unsubscribe();
500 template<
class T,
class F>
501 auto start(F&& createSource,
long unsubscribed)
const
507 template<
class T,
class F>
523 auto
start(F createSource,
long created,
long subscribed,
long unsubscribed) const
526 return start<rxu::value_type_t<start_traits<F>>>(std::move(createSource), created, subscribed, unsubscribed);
530 auto start(F createSource,
long unsubscribed)
const
531 ->
typename std::enable_if<detail::is_create_source_function<F>::value,
start_traits<F>>::type::subscriber_type
538 ->
typename std::enable_if<detail::is_create_source_function<F>::value,
start_traits<F>>::type::subscriber_type
549 return tester->make_subscriber<T>();
553 clock_type::time_point
now()
const {
554 return tester->now();
558 return test_worker(cs, tester->create_test_type_worker_interface());
562 long clock()
const {
return tester->clock();}
565 return tester->to_time_point(absolute);
570 return tester->make_hot_observable(std::move(
messages));
573 template<
class T, std::
size_t size>
575 -> decltype(tester->make_hot_observable(std::vector<T>())) {
581 -> decltype(tester->make_hot_observable(std::vector<T>())) {
582 return tester->make_hot_observable(std::vector<T>(il));
587 return tester->make_cold_observable(std::move(
messages));
590 template<
class T, std::
size_t size>
592 -> decltype(tester->make_cold_observable(std::vector<T>())) {
598 -> decltype(tester->make_cold_observable(std::vector<T>())) {
599 return tester->make_cold_observable(std::vector<T>(il));
605 return test(std::make_shared<detail::test_type>());