RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-observable.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_OBSERVABLE_HPP)
6 #define RXCPP_RX_OBSERVABLE_HPP
7 
8 #include "rx-includes.hpp"
9 
10 #ifdef __GNUG__
11 #define EXPLICIT_THIS this->
12 #else
13 #define EXPLICIT_THIS
14 #endif
15 
16 namespace rxcpp {
17 
18 namespace detail {
19 
20 template<class Subscriber, class T>
21 struct has_on_subscribe_for
22 {
23  struct not_void {};
24  template<class CS, class CT>
25  static auto check(int) -> decltype((*(CT*)nullptr).on_subscribe(*(CS*)nullptr));
26  template<class CS, class CT>
27  static not_void check(...);
28 
29  typedef decltype(check<rxu::decay_t<Subscriber>, T>(0)) detail_result;
30  static const bool value = std::is_same<detail_result, void>::value;
31 };
32 
33 }
34 
35 template<class T>
37  : public rxs::source_base<T>
38 {
39  struct state_type
40  : public std::enable_shared_from_this<state_type>
41  {
42  typedef std::function<void(subscriber<T>)> onsubscribe_type;
43 
44  onsubscribe_type on_subscribe;
45  };
46  std::shared_ptr<state_type> state;
47 
48  template<class U>
49  friend bool operator==(const dynamic_observable<U>&, const dynamic_observable<U>&);
50 
51  template<class SO>
52  void construct(SO&& source, rxs::tag_source&&) {
53  rxu::decay_t<SO> so = std::forward<SO>(source);
54  state->on_subscribe = [so](subscriber<T> o) mutable {
55  so.on_subscribe(std::move(o));
56  };
57  }
58 
59  struct tag_function {};
60  template<class F>
61  void construct(F&& f, tag_function&&) {
62  state->on_subscribe = std::forward<F>(f);
63  }
64 
65 public:
66 
68 
70  {
71  }
72 
73  template<class SOF>
74  explicit dynamic_observable(SOF&& sof, typename std::enable_if<!is_dynamic_observable<SOF>::value, void**>::type = 0)
75  : state(std::make_shared<state_type>())
76  {
77  construct(std::forward<SOF>(sof),
78  typename std::conditional<rxs::is_source<SOF>::value || rxo::is_operator<SOF>::value, rxs::tag_source, tag_function>::type());
79  }
80 
81  void on_subscribe(subscriber<T> o) const {
82  state->on_subscribe(std::move(o));
83  }
84 
85  template<class Subscriber>
86  typename std::enable_if<is_subscriber<Subscriber>::value, void>::type
87  on_subscribe(Subscriber o) const {
88  state->on_subscribe(o.as_dynamic());
89  }
90 };
91 
92 template<class T>
93 inline bool operator==(const dynamic_observable<T>& lhs, const dynamic_observable<T>& rhs) {
94  return lhs.state == rhs.state;
95 }
96 template<class T>
97 inline bool operator!=(const dynamic_observable<T>& lhs, const dynamic_observable<T>& rhs) {
98  return !(lhs == rhs);
99 }
100 
101 template<class T, class Source>
103  return observable<T>(dynamic_observable<T>(std::forward<Source>(s)));
104 }
105 
106 namespace detail {
107 template<bool Selector, class Default, class SO>
108 struct resolve_observable;
109 
110 template<class Default, class SO>
111 struct resolve_observable<true, Default, SO>
112 {
113  typedef typename SO::type type;
114  typedef typename type::value_type value_type;
115  static const bool value = true;
116  typedef observable<value_type, type> observable_type;
117  template<class... AN>
118  static observable_type make(const Default&, AN&&... an) {
119  return observable_type(type(std::forward<AN>(an)...));
120  }
121 };
122 template<class Default, class SO>
123 struct resolve_observable<false, Default, SO>
124 {
125  static const bool value = false;
126  typedef Default observable_type;
127  template<class... AN>
128  static observable_type make(const observable_type& that, const AN&...) {
129  return that;
130  }
131 };
132 template<class SO>
133 struct resolve_observable<true, void, SO>
134 {
135  typedef typename SO::type type;
136  typedef typename type::value_type value_type;
137  static const bool value = true;
138  typedef observable<value_type, type> observable_type;
139  template<class... AN>
140  static observable_type make(AN&&... an) {
141  return observable_type(type(std::forward<AN>(an)...));
142  }
143 };
144 template<class SO>
145 struct resolve_observable<false, void, SO>
146 {
147  static const bool value = false;
148  typedef void observable_type;
149  template<class... AN>
150  static observable_type make(const AN&...) {
151  }
152 };
153 
154 }
155 
156 template<class Selector, class Default, template<class... TN> class SO, class... AN>
158  : public detail::resolve_observable<Selector::value, Default, rxu::defer_type<SO, AN...>>
159 {
160 };
161 
168 template<class T, class Observable>
170 {
171  template<class Obsvbl, class... ArgN>
172  static auto blocking_subscribe(const Obsvbl& source, bool do_rethrow, ArgN&&... an)
173  -> void {
174  std::mutex lock;
175  std::condition_variable wake;
176  bool disposed = false;
178 
179  auto dest = make_subscriber<T>(std::forward<ArgN>(an)...);
180 
181  // keep any error to rethrow at the end.
182  auto scbr = make_subscriber<T>(
183  dest,
184  [&](T t){dest.on_next(t);},
185  [&](rxu::error_ptr e){
186  if (do_rethrow) {
187  error = e;
188  } else {
189  dest.on_error(e);
190  }
191  },
192  [&](){dest.on_completed();}
193  );
194 
195  auto cs = scbr.get_subscription();
196  cs.add(
197  [&](){
198  std::unique_lock<std::mutex> guard(lock);
199  wake.notify_one();
200  disposed = true;
201  });
202 
203  source.subscribe(std::move(scbr));
204 
205  std::unique_lock<std::mutex> guard(lock);
206  wake.wait(guard,
207  [&](){
208  return disposed;
209  });
210 
212  }
213 
214 public:
218  {
219  }
221 
237  template<class... ArgN>
238  auto subscribe(ArgN&&... an) const
239  -> void {
240  return blocking_subscribe(source, false, std::forward<ArgN>(an)...);
241  }
242 
262  template<class... ArgN>
263  auto subscribe_with_rethrow(ArgN&&... an) const
264  -> void {
265  return blocking_subscribe(source, true, std::forward<ArgN>(an)...);
266  }
267 
283  template<class... AN>
284  auto first(AN**...) -> delayed_type_t<T, AN...> const {
285  rxu::maybe<T> result;
288  cs,
289  [&](T v){result.reset(v); cs.unsubscribe();});
290  if (result.empty())
291  rxu::throw_exception(rxcpp::empty_error("first() requires a stream with at least one value"));
292  return result.get();
293  static_assert(sizeof...(AN) == 0, "first() was passed too many arguments.");
294  }
295 
311  template<class... AN>
312  auto last(AN**...) -> delayed_type_t<T, AN...> const {
313  rxu::maybe<T> result;
315  [&](T v){result.reset(v);});
316  if (result.empty())
317  rxu::throw_exception(rxcpp::empty_error("last() requires a stream with at least one value"));
318  return result.get();
319  static_assert(sizeof...(AN) == 0, "last() was passed too many arguments.");
320  }
321 
334  int count() const {
335  int result = 0;
336  source.count().as_blocking().subscribe_with_rethrow(
337  [&](int v){result = v;});
338  return result;
339  }
340 
358  T sum() const {
359  return source.sum().as_blocking().last();
360  }
361 
379  double average() const {
380  return source.average().as_blocking().last();
381  }
382 
400  T max() const {
401  return source.max().as_blocking().last();
402  }
403 
421  T min() const {
422  return source.min().as_blocking().last();
423  }
424 };
425 
426 namespace detail {
427 
428 template<class SourceOperator, class Subscriber>
429 struct safe_subscriber
430 {
431  safe_subscriber(SourceOperator& so, Subscriber& o) : so(std::addressof(so)), o(std::addressof(o)) {}
432 
433  void subscribe() {
434  RXCPP_TRY {
435  so->on_subscribe(*o);
436  } RXCPP_CATCH(...) {
437  if (!o->is_subscribed()) {
439  }
441  o->unsubscribe();
442  }
443  }
444 
445  void operator()(const rxsc::schedulable&) {
446  subscribe();
447  }
448 
449  SourceOperator* so;
450  Subscriber* o;
451 };
452 
453 }
454 
455 template<>
456 class observable<void, void>;
457 
477 template<class T, class SourceOperator>
479  : public observable_base<T>
480 {
481  static_assert(std::is_same<T, typename SourceOperator::value_type>::value, "SourceOperator::value_type must be the same as T in observable<T, SourceOperator>");
482 
484 
485 public:
488 
489 private:
490 
491  template<class U, class SO>
492  friend class observable;
493 
494  template<class U, class SO>
495  friend bool operator==(const observable<U, SO>&, const observable<U, SO>&);
496 
497  template<class Subscriber>
498  auto detail_subscribe(Subscriber o) const
500 
501  typedef rxu::decay_t<Subscriber> subscriber_type;
502 
503  static_assert(is_subscriber<subscriber_type>::value, "subscribe must be passed a subscriber");
504  static_assert(std::is_same<typename source_operator_type::value_type, T>::value && std::is_convertible<T*, typename subscriber_type::value_type*>::value, "the value types in the sequence must match or be convertible");
505  static_assert(detail::has_on_subscribe_for<subscriber_type, source_operator_type>::value, "inner must have on_subscribe method that accepts this subscriber ");
506 
507  trace_activity().subscribe_enter(*this, o);
508 
509  if (!o.is_subscribed()) {
510  trace_activity().subscribe_return(*this);
511  return o.get_subscription();
512  }
513 
514  detail::safe_subscriber<source_operator_type, subscriber_type> subscriber(source_operator, o);
515 
516  // make sure to let current_thread take ownership of the thread as early as possible.
517  if (rxsc::current_thread::is_schedule_required()) {
518  const auto& sc = rxsc::make_current_thread();
519  sc.create_worker(o.get_subscription()).schedule(subscriber);
520  } else {
521  // current_thread already owns this thread.
522  subscriber.subscribe();
523  }
524 
525  trace_activity().subscribe_return(*this);
526  return o.get_subscription();
527  }
528 
529 public:
530  typedef T value_type;
531 
532  static_assert(rxo::is_operator<source_operator_type>::value || rxs::is_source<source_operator_type>::value, "observable must wrap an operator or source");
533 
535  {
536  }
537 
539  {
540  }
541 
542  explicit observable(const source_operator_type& o)
543  : source_operator(o)
544  {
545  }
547  : source_operator(std::move(o))
548  {
549  }
550 
552  template<class SO>
555  {}
557  template<class SO>
559  : source_operator(std::move(o.source_operator))
560  {}
561 
562 #if 0
563  template<class I>
564  void on_subscribe(observer<T, I> o) const {
565  source_operator.on_subscribe(o);
566  }
567 #endif
568 
571  template<class... AN>
573  return *this;
574  static_assert(sizeof...(AN) == 0, "as_dynamic() was passed too many arguments.");
575  }
576 
579  template<class... AN>
581  return blocking_observable<T, this_type>(*this);
582  static_assert(sizeof...(AN) == 0, "as_blocking() was passed too many arguments.");
583  }
584 
586 
592  template<class OperatorFactory>
593  auto op(OperatorFactory&& of) const
594  -> decltype(of(*(const this_type*)nullptr)) {
595  return of(*this);
596  static_assert(is_operator_factory_for<this_type, OperatorFactory>::value, "Function passed for op() must have the signature Result(SourceObservable)");
597  }
598 
601  template<class ResultType, class Operator>
602  auto lift(Operator&& op) const
603  -> observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>> {
604  return observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>(
605  rxo::detail::lift_operator<ResultType, source_operator_type, Operator>(source_operator, std::forward<Operator>(op)));
606  static_assert(detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value, "Function passed for lift() must have the signature subscriber<...>(subscriber<T, ...>)");
607  }
608 
614  template<class ResultType, class Operator>
615  auto lift_if(Operator&& op) const
616  -> typename std::enable_if<detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value,
617  observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>>::type {
618  return observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>(
619  rxo::detail::lift_operator<ResultType, source_operator_type, Operator>(source_operator, std::forward<Operator>(op)));
620  }
626  template<class ResultType, class Operator>
627  auto lift_if(Operator&&) const
628  -> typename std::enable_if<!detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value,
629  decltype(rxs::from<ResultType>())>::type {
630  return rxs::from<ResultType>();
631  }
633 
636  template<class... ArgN>
637  auto subscribe(ArgN&&... an) const
639  return detail_subscribe(make_subscriber<T>(std::forward<ArgN>(an)...));
640  }
641 
644  template<class... AN>
645  auto all(AN&&... an) const
647  -> decltype(observable_member(all_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
649  {
650  return observable_member(all_tag{}, *this, std::forward<AN>(an)...);
651  }
652 
655  template<class... AN>
656  auto is_empty(AN&&... an) const
658  -> decltype(observable_member(is_empty_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
660  {
661  return observable_member(is_empty_tag{}, *this, std::forward<AN>(an)...);
662  }
663 
666  template<class... AN>
667  auto any(AN&&... an) const
669  -> decltype(observable_member(any_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
671  {
672  return observable_member(any_tag{}, *this, std::forward<AN>(an)...);
673  }
674 
677  template<class... AN>
678  auto exists(AN&&... an) const
680  -> decltype(observable_member(exists_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
682  {
683  return observable_member(exists_tag{}, *this, std::forward<AN>(an)...);
684  }
685 
688  template<class... AN>
689  auto contains(AN&&... an) const
691  -> decltype(observable_member(contains_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
693  {
694  return observable_member(contains_tag{}, *this, std::forward<AN>(an)...);
695  }
696 
699  template<class... AN>
700  auto filter(AN&&... an) const
702  -> decltype(observable_member(filter_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
704  {
705  return observable_member(filter_tag{}, *this, std::forward<AN>(an)...);
706  }
707 
710  template<class... AN>
711  auto switch_if_empty(AN&&... an) const
713  -> decltype(observable_member(switch_if_empty_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
715  {
716  return observable_member(switch_if_empty_tag{}, *this, std::forward<AN>(an)...);
717  }
718 
721  template<class... AN>
722  auto default_if_empty(AN&&... an) const
724  -> decltype(observable_member(default_if_empty_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
726  {
727  return observable_member(default_if_empty_tag{}, *this, std::forward<AN>(an)...);
728  }
729 
732  template<class... AN>
733  auto sequence_equal(AN... an) const
735  -> decltype(observable_member(sequence_equal_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
737  {
738  return observable_member(sequence_equal_tag{}, *this, std::forward<AN>(an)...);
739  }
740 
743  template<class... AN>
744  auto tap(AN&&... an) const
746  -> decltype(observable_member(tap_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
748  {
749  return observable_member(tap_tag{}, *this, std::forward<AN>(an)...);
750  }
751 
754  template<class... AN>
755  auto time_interval(AN&&... an) const
757  -> decltype(observable_member(time_interval_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
759  {
760  return observable_member(time_interval_tag{}, *this, std::forward<AN>(an)...);
761  }
762 
765  template<class... AN>
766  auto timeout(AN&&... an) const
768  -> decltype(observable_member(timeout_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
770  {
771  return observable_member(timeout_tag{}, *this, std::forward<AN>(an)...);
772  }
773 
776  template<class... AN>
777  auto timestamp(AN&&... an) const
779  -> decltype(observable_member(timestamp_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
781  {
782  return observable_member(timestamp_tag{}, *this, std::forward<AN>(an)...);
783  }
784 
787  template<class... AN>
788  auto finally(AN&&... an) const
790  -> decltype(observable_member(finally_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
792  {
793  return observable_member(finally_tag{}, *this, std::forward<AN>(an)...);
794  }
795 
798  template<class... AN>
799  auto on_error_resume_next(AN&&... an) const
801  -> decltype(observable_member(on_error_resume_next_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
803  {
804  return observable_member(on_error_resume_next_tag{}, *this, std::forward<AN>(an)...);
805  }
806 
809  template<class... AN>
810  auto switch_on_error(AN&&... an) const
812  -> decltype(observable_member(on_error_resume_next_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
814  {
815  return observable_member(on_error_resume_next_tag{}, *this, std::forward<AN>(an)...);
816  }
817 
820  template<class... AN>
821  auto map(AN&&... an) const
823  -> decltype(observable_member(map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
825  {
826  return observable_member(map_tag{}, *this, std::forward<AN>(an)...);
827  }
828 
831  template<class... AN>
832  auto transform(AN&&... an) const
834  -> decltype(observable_member(map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
836  {
837  return observable_member(map_tag{}, *this, std::forward<AN>(an)...);
838  }
839 
842  template<class... AN>
843  auto debounce(AN&&... an) const
845  -> decltype(observable_member(debounce_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
847  {
848  return observable_member(debounce_tag{}, *this, std::forward<AN>(an)...);
849  }
850 
853  template<class... AN>
854  auto delay(AN&&... an) const
856  -> decltype(observable_member(delay_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
858  {
859  return observable_member(delay_tag{}, *this, std::forward<AN>(an)...);
860  }
861 
864  template<class... AN>
865  auto distinct(AN&&... an) const
867  -> decltype(observable_member(distinct_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
869  {
870  return observable_member(distinct_tag{}, *this, std::forward<AN>(an)...);
871  }
872 
875  template<class... AN>
876  auto distinct_until_changed(AN&&... an) const
878  -> decltype(observable_member(distinct_until_changed_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
880  {
881  return observable_member(distinct_until_changed_tag{}, *this, std::forward<AN>(an)...);
882  }
883 
886  template<class... AN>
887  auto element_at(AN&&... an) const
889  -> decltype(observable_member(element_at_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
891  {
892  return observable_member(element_at_tag{}, *this, std::forward<AN>(an)...);
893  }
894 
897  template<class... AN>
898  auto window(AN&&... an) const
900  -> decltype(observable_member(window_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
902  {
903  return observable_member(window_tag{}, *this, std::forward<AN>(an)...);
904  }
905 
908  template<class... AN>
909  auto window_with_time(AN&&... an) const
911  -> decltype(observable_member(window_with_time_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
913  {
914  return observable_member(window_with_time_tag{}, *this, std::forward<AN>(an)...);
915  }
916 
919  template<class... AN>
920  auto window_with_time_or_count(AN&&... an) const
922  -> decltype(observable_member(window_with_time_or_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
924  {
925  return observable_member(window_with_time_or_count_tag{}, *this, std::forward<AN>(an)...);
926  }
927 
930  template<class... AN>
931  auto window_toggle(AN&&... an) const
933  -> decltype(observable_member(window_toggle_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
935  {
936  return observable_member(window_toggle_tag{}, *this, std::forward<AN>(an)...);
937  }
938 
941  template<class... AN>
942  auto buffer(AN&&... an) const
944  -> decltype(observable_member(buffer_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
946  {
947  return observable_member(buffer_count_tag{}, *this, std::forward<AN>(an)...);
948  }
949 
952  template<class... AN>
953  auto buffer_with_time(AN&&... an) const
955  -> decltype(observable_member(buffer_with_time_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
957  {
958  return observable_member(buffer_with_time_tag{}, *this, std::forward<AN>(an)...);
959  }
960 
963  template<class... AN>
964  auto buffer_with_time_or_count(AN&&... an) const
966  -> decltype(observable_member(buffer_with_time_or_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
968  {
969  return observable_member(buffer_with_time_or_count_tag{}, *this, std::forward<AN>(an)...);
970  }
971 
974  template<class... AN>
975  auto switch_on_next(AN&&... an) const
977  -> decltype(observable_member(switch_on_next_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
979  {
980  return observable_member(switch_on_next_tag{}, *this, std::forward<AN>(an)...);
981  }
982 
985  template<class... AN>
986  auto merge(AN... an) const
988  -> decltype(observable_member(merge_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
990  {
991  return observable_member(merge_tag{}, *this, std::forward<AN>(an)...);
992  }
993 
996  template<class... AN>
997  auto merge_delay_error(AN... an) const
999  -> decltype(observable_member(merge_delay_error_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1001  {
1002  return observable_member(merge_delay_error_tag{}, *this, std::forward<AN>(an)...);
1003  }
1004 
1007  template<class... AN>
1008  auto amb(AN... an) const
1010  -> decltype(observable_member(amb_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1012  {
1013  return observable_member(amb_tag{}, *this, std::forward<AN>(an)...);
1014  }
1015 
1018  template<class... AN>
1019  auto flat_map(AN&&... an) const
1021  -> decltype(observable_member(flat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1023  {
1024  return observable_member(flat_map_tag{}, *this, std::forward<AN>(an)...);
1025  }
1026 
1029  template<class... AN>
1030  auto merge_transform(AN&&... an) const
1032  -> decltype(observable_member(flat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1034  {
1035  return observable_member(flat_map_tag{}, *this, std::forward<AN>(an)...);
1036  }
1037 
1040  template<class... AN>
1041  auto concat(AN... an) const
1043  -> decltype(observable_member(concat_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1045  {
1046  return observable_member(concat_tag{}, *this, std::forward<AN>(an)...);
1047  }
1048 
1051  template<class... AN>
1052  auto concat_map(AN&&... an) const
1054  -> decltype(observable_member(concat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1056  {
1057  return observable_member(concat_map_tag{}, *this, std::forward<AN>(an)...);
1058  }
1059 
1062  template<class... AN>
1063  auto concat_transform(AN&&... an) const
1065  -> decltype(observable_member(concat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1067  {
1068  return observable_member(concat_map_tag{}, *this, std::forward<AN>(an)...);
1069  }
1070 
1073  template<class... AN>
1074  auto with_latest_from(AN... an) const
1076  -> decltype(observable_member(with_latest_from_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1078  {
1079  return observable_member(with_latest_from_tag{}, *this, std::forward<AN>(an)...);
1080  }
1081 
1082 
1085  template<class... AN>
1086  auto combine_latest(AN... an) const
1088  -> decltype(observable_member(combine_latest_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1090  {
1091  return observable_member(combine_latest_tag{}, *this, std::forward<AN>(an)...);
1092  }
1093 
1096  template<class... AN>
1097  auto zip(AN&&... an) const
1099  -> decltype(observable_member(zip_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1101  {
1102  return observable_member(zip_tag{}, *this, std::forward<AN>(an)...);
1103  }
1104 
1107  template<class... AN>
1108  inline auto group_by(AN&&... an) const
1110  -> decltype(observable_member(group_by_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1112  {
1113  return observable_member(group_by_tag{}, *this, std::forward<AN>(an)...);
1114  }
1115 
1118  template<class... AN>
1119  auto ignore_elements(AN&&... an) const
1121  -> decltype(observable_member(ignore_elements_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1123  {
1124  return observable_member(ignore_elements_tag{}, *this, std::forward<AN>(an)...);
1125  }
1126 
1129  template<class... AN>
1130  auto multicast(AN&&... an) const
1132  -> decltype(observable_member(multicast_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1134  {
1135  return observable_member(multicast_tag{}, *this, std::forward<AN>(an)...);
1136  }
1137 
1140  template<class... AN>
1141  auto publish(AN&&... an) const
1143  -> decltype(observable_member(publish_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1145  {
1146  return observable_member(publish_tag{}, *this, std::forward<AN>(an)...);
1147  }
1148 
1151  template<class... AN>
1152  auto publish_synchronized(AN&&... an) const
1154  -> decltype(observable_member(publish_synchronized_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1156  {
1157  return observable_member(publish_synchronized_tag{}, *this, std::forward<AN>(an)...);
1158  }
1159 
1162  template<class... AN>
1163  auto replay(AN&&... an) const
1165  -> decltype(observable_member(replay_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1167  {
1168  return observable_member(replay_tag{}, *this, std::forward<AN>(an)...);
1169  }
1170 
1173  template<class... AN>
1174  auto subscribe_on(AN&&... an) const
1176  -> decltype(observable_member(subscribe_on_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1178  {
1179  return observable_member(subscribe_on_tag{}, *this, std::forward<AN>(an)...);
1180  }
1181 
1184  template<class... AN>
1185  auto observe_on(AN&&... an) const
1187  -> decltype(observable_member(observe_on_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1189  {
1190  return observable_member(observe_on_tag{}, *this, std::forward<AN>(an)...);
1191  }
1192 
1195  template<class... AN>
1196  auto reduce(AN&&... an) const
1198  -> decltype(observable_member(reduce_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1200  {
1201  return observable_member(reduce_tag{}, *this, std::forward<AN>(an)...);
1202  }
1203 
1206  template<class... AN>
1207  auto accumulate(AN&&... an) const
1209  -> decltype(observable_member(reduce_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1211  {
1212  return observable_member(reduce_tag{}, *this, std::forward<AN>(an)...);
1213  }
1214 
1217  template<class... AN>
1218  auto first(AN**...) const
1222  {
1224  static_assert(sizeof...(AN) == 0, "first() was passed too many arguments.");
1225  }
1226 
1229  template<class... AN>
1230  auto last(AN**...) const
1234  {
1236  static_assert(sizeof...(AN) == 0, "last() was passed too many arguments.");
1237  }
1238 
1241  template<class... AN>
1242  auto count(AN**...) const
1246  {
1248  static_assert(sizeof...(AN) == 0, "count() was passed too many arguments.");
1249  }
1250 
1253  template<class... AN>
1254  auto sum(AN**...) const
1258  {
1260  static_assert(sizeof...(AN) == 0, "sum() was passed too many arguments.");
1261  }
1262 
1265  template<class... AN>
1266  auto average(AN**...) const
1270  {
1272  static_assert(sizeof...(AN) == 0, "average() was passed too many arguments.");
1273  }
1274 
1277  template<class... AN>
1278  auto max(AN**...) const
1282  {
1284  static_assert(sizeof...(AN) == 0, "max() was passed too many arguments.");
1285  }
1286 
1289  template<class... AN>
1290  auto min(AN**...) const
1294  {
1296  static_assert(sizeof...(AN) == 0, "min() was passed too many arguments.");
1297  }
1298 
1301  template<class... AN>
1302  auto scan(AN... an) const
1304  -> decltype(observable_member(scan_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1306  {
1307  return observable_member(scan_tag{}, *this, std::forward<AN>(an)...);
1308  }
1309 
1312  template<class... AN>
1313  auto sample_with_time(AN&&... an) const
1315  -> decltype(observable_member(sample_with_time_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1317  {
1318  return observable_member(sample_with_time_tag{}, *this, std::forward<AN>(an)...);
1319  }
1320 
1323  template<class... AN>
1324  auto skip(AN... an) const
1326  -> decltype(observable_member(skip_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1328  {
1329  return observable_member(skip_tag{}, *this, std::forward<AN>(an)...);
1330  }
1331 
1334  template<class... AN>
1335  auto skip_while(AN... an) const
1337  -> decltype(observable_member(skip_while_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1339  {
1340  return observable_member(skip_while_tag{}, *this, std::forward<AN>(an)...);
1341  }
1342 
1345  template<class... AN>
1346  auto skip_last(AN... an) const
1348  -> decltype(observable_member(skip_last_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1350  {
1351  return observable_member(skip_last_tag{}, *this, std::forward<AN>(an)...);
1352  }
1353 
1356  template<class... AN>
1357  auto skip_until(AN... an) const
1359  -> decltype(observable_member(skip_until_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1361  {
1362  return observable_member(skip_until_tag{}, *this, std::forward<AN>(an)...);
1363  }
1364 
1367  template<class... AN>
1368  auto take(AN... an) const
1370  -> decltype(observable_member(take_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1372  {
1373  return observable_member(take_tag{}, *this, std::forward<AN>(an)...);
1374  }
1375 
1378  template<class... AN>
1379  auto take_last(AN&&... an) const
1381  -> decltype(observable_member(take_last_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1383  {
1384  return observable_member(take_last_tag{}, *this, std::forward<AN>(an)...);
1385  }
1386 
1389  template<class... AN>
1390  auto take_until(AN&&... an) const
1392  -> decltype(observable_member(take_until_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1394  {
1395  return observable_member(take_until_tag{}, *this, std::forward<AN>(an)...);
1396  }
1397 
1400  template<class... AN>
1401  auto take_while(AN&&... an) const
1403  -> decltype(observable_member(take_while_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1405  {
1406  return observable_member(take_while_tag{}, *this, std::forward<AN>(an)...);
1407  }
1408 
1411  template<class... AN>
1412  auto repeat(AN... an) const
1414  -> decltype(observable_member(repeat_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1416  {
1417  return observable_member(repeat_tag{}, *this, std::forward<AN>(an)...);
1418  }
1419 
1422  template<class... AN>
1423  auto retry(AN... an) const
1425  -> decltype(observable_member(retry_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1427  {
1428  return observable_member(retry_tag{}, *(this_type*)this, std::forward<AN>(an)...);
1429  }
1430 
1433  template<class... AN>
1434  auto start_with(AN... an) const
1436  -> decltype(observable_member(start_with_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1438  {
1439  return observable_member(start_with_tag{}, *this, std::forward<AN>(an)...);
1440  }
1441 
1444  template<class... AN>
1445  auto pairwise(AN... an) const
1447  -> decltype(observable_member(pairwise_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1449  {
1450  return observable_member(pairwise_tag{}, *this, std::forward<AN>(an)...);
1451  }
1452 };
1453 
1454 template<class T, class SourceOperator>
1456  return lhs.source_operator == rhs.source_operator;
1457 }
1458 template<class T, class SourceOperator>
1460  return !(lhs == rhs);
1461 }
1462 
1545 template<>
1546 class observable<void, void>
1547 {
1548  ~observable();
1549 public:
1552  template<class T, class OnSubscribe>
1553  static auto create(OnSubscribe os)
1554  -> decltype(rxs::create<T>(std::move(os))) {
1555  return rxs::create<T>(std::move(os));
1556  }
1557 
1560  template<class T>
1561  static auto range(T first = 0, T last = std::numeric_limits<T>::max(), std::ptrdiff_t step = 1)
1562  -> decltype(rxs::range<T>(first, last, step, identity_current_thread())) {
1563  return rxs::range<T>(first, last, step, identity_current_thread());
1564  }
1567  template<class T, class Coordination>
1568  static auto range(T first, T last, std::ptrdiff_t step, Coordination cn)
1569  -> decltype(rxs::range<T>(first, last, step, std::move(cn))) {
1570  return rxs::range<T>(first, last, step, std::move(cn));
1571  }
1574  template<class T, class Coordination>
1575  static auto range(T first, T last, Coordination cn)
1576  -> decltype(rxs::range<T>(first, last, std::move(cn))) {
1577  return rxs::range<T>(first, last, std::move(cn));
1578  }
1581  template<class T, class Coordination>
1582  static auto range(T first, Coordination cn)
1583  -> decltype(rxs::range<T>(first, std::move(cn))) {
1584  return rxs::range<T>(first, std::move(cn));
1585  }
1586 
1589  template<class T>
1590  static auto never()
1591  -> decltype(rxs::never<T>()) {
1592  return rxs::never<T>();
1593  }
1594 
1597  template<class ObservableFactory>
1598  static auto defer(ObservableFactory of)
1599  -> decltype(rxs::defer(std::move(of))) {
1600  return rxs::defer(std::move(of));
1601  }
1602 
1605  template<class... AN>
1606  static auto interval(rxsc::scheduler::clock_type::duration period, AN**...)
1607  -> decltype(rxs::interval(period)) {
1608  return rxs::interval(period);
1609  static_assert(sizeof...(AN) == 0, "interval(period) was passed too many arguments.");
1610  }
1613  template<class Coordination>
1614  static auto interval(rxsc::scheduler::clock_type::duration period, Coordination cn)
1615  -> decltype(rxs::interval(period, std::move(cn))) {
1616  return rxs::interval(period, std::move(cn));
1617  }
1620  template<class... AN>
1621  static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, AN**...)
1622  -> decltype(rxs::interval(initial, period)) {
1623  return rxs::interval(initial, period);
1624  static_assert(sizeof...(AN) == 0, "interval(initial, period) was passed too many arguments.");
1625  }
1628  template<class Coordination>
1629  static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, Coordination cn)
1630  -> decltype(rxs::interval(initial, period, std::move(cn))) {
1631  return rxs::interval(initial, period, std::move(cn));
1632  }
1633 
1636  template<class... AN>
1637  static auto timer(rxsc::scheduler::clock_type::time_point at, AN**...)
1638  -> decltype(rxs::timer(at)) {
1639  return rxs::timer(at);
1640  static_assert(sizeof...(AN) == 0, "timer(at) was passed too many arguments.");
1641  }
1644  template<class... AN>
1645  static auto timer(rxsc::scheduler::clock_type::duration after, AN**...)
1646  -> decltype(rxs::timer(after)) {
1647  return rxs::timer(after);
1648  static_assert(sizeof...(AN) == 0, "timer(after) was passed too many arguments.");
1649  }
1652  template<class Coordination>
1653  static auto timer(rxsc::scheduler::clock_type::time_point when, Coordination cn)
1654  -> decltype(rxs::timer(when, std::move(cn))) {
1655  return rxs::timer(when, std::move(cn));
1656  }
1659  template<class Coordination>
1660  static auto timer(rxsc::scheduler::clock_type::duration when, Coordination cn)
1661  -> decltype(rxs::timer(when, std::move(cn))) {
1662  return rxs::timer(when, std::move(cn));
1663  }
1664 
1667  template<class Collection>
1668  static auto iterate(Collection c)
1669  -> decltype(rxs::iterate(std::move(c), identity_current_thread())) {
1670  return rxs::iterate(std::move(c), identity_current_thread());
1671  }
1674  template<class Collection, class Coordination>
1675  static auto iterate(Collection c, Coordination cn)
1676  -> decltype(rxs::iterate(std::move(c), std::move(cn))) {
1677  return rxs::iterate(std::move(c), std::move(cn));
1678  }
1679 
1682  template<class T>
1683  static auto from()
1684  -> decltype( rxs::from<T>()) {
1685  return rxs::from<T>();
1686  }
1689  template<class T, class Coordination>
1690  static auto from(Coordination cn)
1691  -> typename std::enable_if<is_coordination<Coordination>::value,
1692  decltype( rxs::from<T>(std::move(cn)))>::type {
1693  return rxs::from<T>(std::move(cn));
1694  }
1697  template<class Value0, class... ValueN>
1698  static auto from(Value0 v0, ValueN... vn)
1699  -> typename std::enable_if<!is_coordination<Value0>::value,
1700  decltype( rxs::from(v0, vn...))>::type {
1701  return rxs::from(v0, vn...);
1702  }
1705  template<class Coordination, class Value0, class... ValueN>
1706  static auto from(Coordination cn, Value0 v0, ValueN... vn)
1707  -> typename std::enable_if<is_coordination<Coordination>::value,
1708  decltype( rxs::from(std::move(cn), v0, vn...))>::type {
1709  return rxs::from(std::move(cn), v0, vn...);
1710  }
1711 
1714  template<class T>
1715  static auto just(T v)
1716  -> decltype(rxs::just(std::move(v))) {
1717  return rxs::just(std::move(v));
1718  }
1721  template<class T, class Coordination>
1722  static auto just(T v, Coordination cn)
1723  -> decltype(rxs::just(std::move(v), std::move(cn))) {
1724  return rxs::just(std::move(v), std::move(cn));
1725  }
1726 
1729  template<class Observable, class Value0, class... ValueN>
1730  static auto start_with(Observable o, Value0 v0, ValueN... vn)
1731  -> decltype(rxs::start_with(std::move(o), std::move(v0), std::move(vn)...)) {
1732  return rxs::start_with(std::move(o), std::move(v0), std::move(vn)...);
1733  }
1734 
1737  template<class T>
1738  static auto empty()
1739  -> decltype(from<T>()) {
1740  return from<T>();
1741  }
1744  template<class T, class Coordination>
1745  static auto empty(Coordination cn)
1746  -> decltype(from<T>(std::move(cn))) {
1747  return from<T>(std::move(cn));
1748  }
1749 
1752  template<class T, class Exception>
1753  static auto error(Exception&& e)
1754  -> decltype(rxs::error<T>(std::forward<Exception>(e))) {
1755  return rxs::error<T>(std::forward<Exception>(e));
1756  }
1759  template<class T, class Exception, class Coordination>
1760  static auto error(Exception&& e, Coordination cn)
1761  -> decltype(rxs::error<T>(std::forward<Exception>(e), std::move(cn))) {
1762  return rxs::error<T>(std::forward<Exception>(e), std::move(cn));
1763  }
1764 
1767  template<class ResourceFactory, class ObservableFactory>
1768  static auto scope(ResourceFactory rf, ObservableFactory of)
1769  -> decltype(rxs::scope(std::move(rf), std::move(of))) {
1770  return rxs::scope(std::move(rf), std::move(of));
1771  }
1772 };
1773 
1774 }
1775 
1776 //
1777 // support range() >> filter() >> subscribe() syntax
1778 // '>>' is spelled 'stream'
1779 //
1780 template<class T, class SourceOperator, class OperatorFactory>
1781 auto operator >> (const rxcpp::observable<T, SourceOperator>& source, OperatorFactory&& of)
1782  -> decltype(source.op(std::forward<OperatorFactory>(of))) {
1783  return source.op(std::forward<OperatorFactory>(of));
1784 }
1785 
1786 //
1787 // support range() | filter() | subscribe() syntax
1788 // '|' is spelled 'pipe'
1789 //
1790 template<class T, class SourceOperator, class OperatorFactory>
1791 auto operator | (const rxcpp::observable<T, SourceOperator>& source, OperatorFactory&& of)
1792  -> decltype(source.op(std::forward<OperatorFactory>(of))) {
1793  return source.op(std::forward<OperatorFactory>(of));
1794 }
1795 
1796 #endif
rxcpp::dynamic_observable::operator==
friend bool operator==(const dynamic_observable< U > &, const dynamic_observable< U > &)
rxcpp::operators::AN
auto AN
Definition: rx-finally.hpp:105
rx-includes.hpp
rxcpp::observable::concat_transform
auto concat_transform(AN &&... an) const
Definition: rx-observable.hpp:1063
rxcpp::observable< void, void >::error
static auto error(Exception &&e) -> decltype(rxs::error< T >(std::forward< Exception >(e)))
Definition: rx-observable.hpp:1753
rxcpp::sources::iterate
auto iterate(Collection c) -> observable< rxu::value_type_t< detail::iterate_traits< Collection >>, detail::iterate< Collection, identity_one_worker >>
Definition: rx-iterate.hpp:160
rxcpp::is_empty_tag
Definition: rx-operators.hpp:117
rxcpp::operators::max
auto max() -> operator_factory< max_tag >
For each item from this observable reduce it by taking the max value of the previous items.
Definition: rx-reduce.hpp:496
rxcpp::observable< void, void >::from
static auto from() -> decltype(rxs::from< T >())
Definition: rx-observable.hpp:1683
rxcpp::blocking_observable::observable_type
rxu::decay_t< Observable > observable_type
Definition: rx-observable.hpp:215
rxcpp::observer
consumes values from an observable using State that may implement on_next, on_error and on_completed with optional overrides of each function.
Definition: rx-observer.hpp:179
rxcpp::observable::debounce
auto debounce(AN &&... an) const
Definition: rx-observable.hpp:843
rxcpp::util::error_ptr
std::shared_ptr< util::detail::error_base > error_ptr
Definition: rx-util.hpp:874
rxcpp::blocking_observable::sum
T sum() const
Definition: rx-observable.hpp:358
rxcpp::sources::source_base
Definition: rx-sources.hpp:17
rxcpp::amb_tag
Definition: rx-operators.hpp:103
rxcpp::identity_for
Definition: rx-predef.hpp:302
rxcpp::start_with_tag
Definition: rx-operators.hpp:401
rxcpp::sources::scope
auto scope(ResourceFactory rf, ObservableFactory of) -> observable< rxu::value_type_t< detail::scope_traits< ResourceFactory, ObservableFactory >>, detail::scope< ResourceFactory, ObservableFactory >>
Definition: rx-scope.hpp:114
rxcpp::observable< void, void >::error
static auto error(Exception &&e, Coordination cn) -> decltype(rxs::error< T >(std::forward< Exception >(e), std::move(cn)))
Definition: rx-observable.hpp:1760
rxcpp::defer_observable
Definition: rx-observable.hpp:157
rxcpp::subscriber
binds an observer that consumes values with a composite_subscription that controls lifetime.
Definition: rx-subscriber.hpp:25
rxcpp::util::rethrow_current_exception
RXCPP_NORETURN void rethrow_current_exception()
Definition: rx-util.hpp:933
rxcpp::ignore_elements_tag
Definition: rx-operators.hpp:241
rxcpp::blocking_observable::first
auto first(AN **...) -> delayed_type_t< T, AN... > const
Definition: rx-observable.hpp:284
rxcpp::observable::combine_latest
auto combine_latest(AN... an) const
Definition: rx-observable.hpp:1086
rxcpp::observable< void, void >::timer
static auto timer(rxsc::scheduler::clock_type::time_point when, Coordination cn) -> decltype(rxs::timer(when, std::move(cn)))
Definition: rx-observable.hpp:1653
rxcpp::observable::exists
auto exists(AN &&... an) const
Returns an Observable that emits true if any item emitted by the source Observable satisfies a specified condition, otherwise false.
Definition: rx-observable.hpp:678
rxcpp::skip_until_tag
Definition: rx-operators.hpp:394
rxcpp::sources::timer
auto timer(TimePointOrDuration when) -> typename std::enable_if< detail::defer_timer< TimePointOrDuration, identity_one_worker >::value, typename detail::defer_timer< TimePointOrDuration, identity_one_worker >::observable_type >::type
Definition: rx-timer.hpp:114
rxcpp::observable::any
auto any(AN &&... an) const
Definition: rx-observable.hpp:667
rxcpp::observable::distinct
auto distinct(AN &&... an) const
Definition: rx-observable.hpp:865
rxcpp::blocking_observable::max
T max() const
Definition: rx-observable.hpp:400
rxcpp::group_by_tag
Definition: rx-operators.hpp:234
rxcpp::operators::start_with
auto start_with(AN &&... an) -> operator_factory< start_with_tag, AN... >
Definition: rx-start_with.hpp:53
rxcpp::contains_tag
Definition: rx-operators.hpp:127
rxcpp::observable< void, void >::interval
static auto interval(rxsc::scheduler::clock_type::duration period, Coordination cn) -> decltype(rxs::interval(period, std::move(cn)))
Definition: rx-observable.hpp:1614
rxcpp::empty_error
Definition: rx-operators.hpp:289
rxcpp::blocking_observable::last
auto last(AN **...) -> delayed_type_t< T, AN... > const
Definition: rx-observable.hpp:312
rxcpp::all_tag
Definition: rx-operators.hpp:110
rxcpp::observable::as_blocking
blocking_observable< T, this_type > as_blocking(AN **...) const
Definition: rx-observable.hpp:580
rxcpp::operators::is_operator
Definition: rx-operators.hpp:38
rxcpp::observable< void, void >::interval
static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, AN **...) -> decltype(rxs::interval(initial, period))
Definition: rx-observable.hpp:1621
rxcpp::observable< void, void >::range
static auto range(T first=0, T last=std::numeric_limits< T >::max(), std::ptrdiff_t step=1) -> decltype(rxs::range< T >(first, last, step, identity_current_thread()))
Definition: rx-observable.hpp:1561
rxcpp::observable::with_latest_from
auto with_latest_from(AN... an) const
Definition: rx-observable.hpp:1074
rxcpp::observable::tap
auto tap(AN &&... an) const
Definition: rx-observable.hpp:744
rxcpp::blocking_observable::~blocking_observable
~blocking_observable()
Definition: rx-observable.hpp:217
rxcpp::zip_tag
Definition: rx-operators.hpp:521
rxcpp::util::decay_t
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:48
rxcpp::buffer_count_tag
Definition: rx-operators.hpp:129
rxcpp::observable::take
auto take(AN... an) const
Definition: rx-observable.hpp:1368
rxcpp::trace_activity
auto trace_activity() -> decltype(rxcpp_trace_activity(trace_tag()))&
Definition: rx-predef.hpp:15
rxcpp::observable::window_toggle
auto window_toggle(AN &&... an) const
Definition: rx-observable.hpp:931
rxcpp::observable::buffer
auto buffer(AN &&... an) const
Definition: rx-observable.hpp:942
rxcpp::observable::observable
observable(const source_operator_type &o)
Definition: rx-observable.hpp:542
rxcpp::sources::is_source
Definition: rx-sources.hpp:23
rxcpp::filter_tag
Definition: rx-operators.hpp:213
rxcpp::observable::observe_on
auto observe_on(AN &&... an) const
Definition: rx-observable.hpp:1185
rxcpp::take_tag
Definition: rx-operators.hpp:430
rxcpp::debounce_tag
Definition: rx-operators.hpp:178
rxcpp::observe_on_tag
Definition: rx-operators.hpp:275
rxcpp::timeout_tag
Definition: rx-operators.hpp:465
rxcpp::observable::~observable
~observable()
Definition: rx-observable.hpp:534
rxcpp::observable_member
auto observable_member(Tag, AN &&... an) -> decltype(Overload::member(std::forward< AN >(an)...))
Definition: rx-operators.hpp:63
rxcpp::observable::accumulate
auto accumulate(AN &&... an) const
Definition: rx-observable.hpp:1207
rxcpp::observable::retry
auto retry(AN... an) const
Definition: rx-observable.hpp:1423
rxcpp::util::make_error_ptr
error_ptr make_error_ptr(error_ptr e)
Definition: rx-util.hpp:883
rxcpp::observable::skip_last
auto skip_last(AN... an) const
Definition: rx-observable.hpp:1346
rxcpp::is_dynamic_observable
Definition: rx-predef.hpp:128
rxcpp::default_if_empty_tag
Definition: rx-operators.hpp:421
rxcpp::skip_tag
Definition: rx-operators.hpp:373
rxcpp::multicast_tag
Definition: rx-operators.hpp:268
rxcpp::tag_dynamic_observable
Definition: rx-predef.hpp:126
rxcpp::switch_if_empty_tag
Definition: rx-operators.hpp:415
rxcpp::observable::all
auto all(AN &&... an) const
Definition: rx-observable.hpp:645
rxcpp::observable< void, void >::from
static auto from(Value0 v0, ValueN... vn) -> typename std::enable_if<!is_coordination< Value0 >::value, decltype(rxs::from(v0, vn...))>::type
Definition: rx-observable.hpp:1698
rxcpp::observable::take_until
auto take_until(AN &&... an) const
Definition: rx-observable.hpp:1390
rxcpp::observable_base
Definition: rx-predef.hpp:156
rxcpp::observable::observable
observable(observable< T, SO > &&o)
implicit conversion between observables of the same value_type
Definition: rx-observable.hpp:558
rxcpp::observable< void, void >::defer
static auto defer(ObservableFactory of) -> decltype(rxs::defer(std::move(of)))
Definition: rx-observable.hpp:1598
rxcpp::observable::source_operator_type
rxu::decay_t< SourceOperator > source_operator_type
Definition: rx-observable.hpp:486
rxcpp::observable< void, void >::start_with
static auto start_with(Observable o, Value0 v0, ValueN... vn) -> decltype(rxs::start_with(std::move(o), std::move(v0), std::move(vn)...))
Definition: rx-observable.hpp:1730
rxcpp::observable< void, void >::range
static auto range(T first, Coordination cn) -> decltype(rxs::range< T >(first, std::move(cn)))
Definition: rx-observable.hpp:1582
rxcpp::retry_tag
Definition: rx-operators.hpp:345
rxcpp::sources::tag_source
Definition: rx-sources.hpp:15
rxcpp::blocking_observable::subscribe
auto subscribe(ArgN &&... an) const -> void
Definition: rx-observable.hpp:238
rxcpp::observable::group_by
auto group_by(AN &&... an) const
Definition: rx-observable.hpp:1108
rxcpp::on_error_resume_next_tag
Definition: rx-operators.hpp:282
rxcpp::observable::repeat
auto repeat(AN... an) const
Definition: rx-observable.hpp:1412
rxcpp::observable::skip_while
auto skip_while(AN... an) const
Definition: rx-observable.hpp:1335
rxcpp::buffer_with_time_tag
Definition: rx-operators.hpp:136
rxcpp::observable::operator==
friend bool operator==(const observable< U, SO > &, const observable< U, SO > &)
rxcpp::observable::scan
auto scan(AN... an) const
Definition: rx-observable.hpp:1302
rxcpp::observable::on_error_resume_next
auto on_error_resume_next(AN &&... an) const
Definition: rx-observable.hpp:799
rxcpp::map_tag
Definition: rx-operators.hpp:248
rxcpp::operator!=
bool operator!=(const dynamic_grouped_observable< K, T > &lhs, const dynamic_grouped_observable< K, T > &rhs)
Definition: rx-grouped_observable.hpp:103
rxcpp::observable::amb
auto amb(AN... an) const
Definition: rx-observable.hpp:1008
rxcpp::composite_subscription::unsubscribe
void unsubscribe() const
Definition: rx-subscription.hpp:178
rxcpp::blocking_observable::subscribe_with_rethrow
auto subscribe_with_rethrow(ArgN &&... an) const -> void
Definition: rx-observable.hpp:263
rxcpp::concat_tag
Definition: rx-operators.hpp:157
rxcpp::observable::merge_transform
auto merge_transform(AN &&... an) const
Definition: rx-observable.hpp:1030
rxcpp::observable::sum
auto sum(AN **...) const
For each item from this observable reduce it by adding to the previous items.
Definition: rx-observable.hpp:1254
rxcpp::merge_tag
Definition: rx-operators.hpp:255
rxcpp::observable::skip
auto skip(AN... an) const
Definition: rx-observable.hpp:1324
rxcpp::identity_current_thread
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
rxcpp::observable< void, void >::never
static auto never() -> decltype(rxs::never< T >())
Definition: rx-observable.hpp:1590
rxcpp::distinct_until_changed_tag
Definition: rx-operators.hpp:199
rxcpp::replay_tag
Definition: rx-operators.hpp:338
rxcpp::take_until_tag
Definition: rx-operators.hpp:451
rxcpp::blocking_observable::min
T min() const
Definition: rx-observable.hpp:421
rxcpp::delayed_type
Definition: rx-operators.hpp:57
rxcpp::observable::timeout
auto timeout(AN &&... an) const
Definition: rx-observable.hpp:766
rxcpp::observable::flat_map
auto flat_map(AN &&... an) const
Definition: rx-observable.hpp:1019
cpplinq::from
linq_driver< iter_cursor< typename util::container_traits< TContainer >::iterator > > from(TContainer &c)
Definition: linq.hpp:556
rxcpp::blocking_observable
a source of values whose methods block until all values have been emitted. subscribe or use one of the operator methods that reduce the values emitted to a single value.
Definition: rx-observable.hpp:169
rxcpp::util::throw_exception
RXCPP_NORETURN void throw_exception(E &&e)
Definition: rx-util.hpp:920
rxcpp::observable::replay
auto replay(AN &&... an) const
Definition: rx-observable.hpp:1163
rxcpp::observable::window_with_time
auto window_with_time(AN &&... an) const
Definition: rx-observable.hpp:909
rxcpp::observable::zip
auto zip(AN &&... an) const
Definition: rx-observable.hpp:1097
rxcpp::observable::skip_until
auto skip_until(AN... an) const
Definition: rx-observable.hpp:1357
rxcpp::sources::defer
auto defer(ObservableFactory of) -> observable< rxu::value_type_t< detail::defer_traits< ObservableFactory >>, detail::defer< ObservableFactory >>
Definition: rx-defer.hpp:73
rxcpp::observable::count
auto count(AN **...) const
For each item from this observable reduce it by incrementing a count.
Definition: rx-observable.hpp:1242
rxcpp::sources::error
auto error(E e) -> decltype(detail::make_error< T >(typename std::conditional< std::is_same< rxu::error_ptr, rxu::decay_t< E >>::value, detail::throw_ptr_tag, detail::throw_instance_tag >::type(), std::move(e), identity_immediate()))
Definition: rx-error.hpp:117
rxcpp::observable::reduce
auto reduce(AN &&... an) const
Definition: rx-observable.hpp:1196
rxcpp::observable::merge_delay_error
auto merge_delay_error(AN... an) const
Definition: rx-observable.hpp:997
rxcpp::observable::timestamp
auto timestamp(AN &&... an) const
Definition: rx-observable.hpp:777
rxcpp::tap_tag
Definition: rx-operators.hpp:458
rxcpp::concat_map_tag
Definition: rx-operators.hpp:164
rxcpp::combine_latest_tag
Definition: rx-operators.hpp:150
rxcpp::observable< void, void >::just
static auto just(T v, Coordination cn) -> decltype(rxs::just(std::move(v), std::move(cn)))
Definition: rx-observable.hpp:1722
rxcpp::dynamic_observable
Definition: rx-observable.hpp:36
rxcpp::observable< void, void >::range
static auto range(T first, T last, std::ptrdiff_t step, Coordination cn) -> decltype(rxs::range< T >(first, last, step, std::move(cn)))
Definition: rx-observable.hpp:1568
rxcpp::observable< void, void >::empty
static auto empty() -> decltype(from< T >())
Definition: rx-observable.hpp:1738
rxcpp::observable::is_empty
auto is_empty(AN &&... an) const
Returns an Observable that emits true if the source Observable is empty, otherwise false.
Definition: rx-observable.hpp:656
rxcpp::observable::time_interval
auto time_interval(AN &&... an) const
Definition: rx-observable.hpp:755
rxcpp::exists_tag
Definition: rx-operators.hpp:126
rxcpp::observable< void, void >::iterate
static auto iterate(Collection c, Coordination cn) -> decltype(rxs::iterate(std::move(c), std::move(cn)))
Definition: rx-observable.hpp:1675
rxcpp::make_observable_dynamic
observable< T > make_observable_dynamic(Source &&s)
Definition: rx-observable.hpp:102
rxcpp::distinct_tag
Definition: rx-operators.hpp:192
rxcpp::time_interval_tag
Definition: rx-operators.hpp:472
rxcpp::observable::delay
auto delay(AN &&... an) const
Definition: rx-observable.hpp:854
rxcpp::element_at_tag
Definition: rx-operators.hpp:206
rxcpp::observable::last
auto last(AN **...) const
For each item from this observable reduce it by sending only the last item.
Definition: rx-observable.hpp:1230
rxcpp::observable::distinct_until_changed
auto distinct_until_changed(AN &&... an) const
Definition: rx-observable.hpp:876
rxcpp::operators::subscribe
auto subscribe(ArgN &&... an) -> detail::subscribe_factory< decltype(make_subscriber< T >(std::forward< ArgN >(an)...))>
Definition: rx-subscribe.hpp:87
rxcpp::window_tag
Definition: rx-operators.hpp:486
rxcpp::observable::observable
observable(const observable< T, SO > &o)
implicit conversion between observables of the same value_type
Definition: rx-observable.hpp:553
rxcpp::observable::buffer_with_time
auto buffer_with_time(AN &&... an) const
Definition: rx-observable.hpp:953
rxcpp::subscribe_on_tag
Definition: rx-operators.hpp:408
rxcpp::observable::take_last
auto take_last(AN &&... an) const
Definition: rx-observable.hpp:1379
rxcpp::observable< void, void >::empty
static auto empty(Coordination cn) -> decltype(from< T >(std::move(cn)))
Definition: rx-observable.hpp:1745
rxcpp::util::rethrow_exception
RXCPP_NORETURN void rethrow_exception(error_ptr e)
Definition: rx-util.hpp:902
rxcpp::observable::buffer_with_time_or_count
auto buffer_with_time_or_count(AN &&... an) const
Definition: rx-observable.hpp:964
rxcpp::sources::interval
auto interval(Duration period) -> typename std::enable_if< detail::defer_interval< Duration, identity_one_worker >::value, typename detail::defer_interval< Duration, identity_one_worker >::observable_type >::type
Definition: rx-interval.hpp:113
rxcpp::observable::concat_map
auto concat_map(AN &&... an) const
Definition: rx-observable.hpp:1052
rxcpp::finally_tag
Definition: rx-operators.hpp:220
rxcpp::observable::max
auto max(AN **...) const
For each item from this observable reduce it by taking the max value of the previous items.
Definition: rx-observable.hpp:1278
rxcpp::observable::switch_if_empty
auto switch_if_empty(AN &&... an) const
Definition: rx-observable.hpp:711
rxcpp::schedulers::make_current_thread
const scheduler & make_current_thread()
Definition: rx-currentthread.hpp:263
rxcpp::any_tag
Definition: rx-operators.hpp:119
operator>>
auto operator>>(const rxcpp::observable< T, SourceOperator > &source, OperatorFactory &&of) -> decltype(source.op(std::forward< OperatorFactory >(of)))
Definition: rx-observable.hpp:1781
rxcpp::observable< void, void >::interval
static auto interval(rxsc::scheduler::clock_type::duration period, AN **...) -> decltype(rxs::interval(period))
Definition: rx-observable.hpp:1606
rxcpp::observable::multicast
auto multicast(AN &&... an) const
Definition: rx-observable.hpp:1130
rxcpp::observable< void, void >::create
static auto create(OnSubscribe os) -> decltype(rxs::create< T >(std::move(os)))
Definition: rx-observable.hpp:1553
rxcpp::window_with_time_tag
Definition: rx-operators.hpp:493
rxcpp::observable::subscribe
auto subscribe(ArgN &&... an) const -> composite_subscription
Definition: rx-observable.hpp:637
rxcpp::observable::element_at
auto element_at(AN &&... an) const
Definition: rx-observable.hpp:887
rxcpp::observable< void, void >::timer
static auto timer(rxsc::scheduler::clock_type::duration after, AN **...) -> decltype(rxs::timer(after))
Definition: rx-observable.hpp:1645
rxcpp::util::current_exception
error_ptr current_exception()
Definition: rx-util.hpp:943
rxcpp::dynamic_observable::dynamic_observable_tag
tag_dynamic_observable dynamic_observable_tag
Definition: rx-observable.hpp:67
rxcpp::switch_on_next_tag
Definition: rx-operators.hpp:423
rxcpp::observable::sample_with_time
auto sample_with_time(AN &&... an) const
Definition: rx-observable.hpp:1313
rxcpp::timestamp_tag
Definition: rx-operators.hpp:479
rxcpp::buffer_with_time_or_count_tag
Definition: rx-operators.hpp:143
rxcpp::take_while_tag
Definition: rx-operators.hpp:444
rxcpp::sequence_equal_tag
Definition: rx-operators.hpp:366
rxcpp::with_latest_from_tag
Definition: rx-operators.hpp:514
rxcpp::publish_tag
Definition: rx-operators.hpp:323
rxcpp::observable::sequence_equal
auto sequence_equal(AN... an) const
Definition: rx-observable.hpp:733
rxcpp::observable< void, void >::just
static auto just(T v) -> decltype(rxs::just(std::move(v)))
Definition: rx-observable.hpp:1715
rxcpp::sample_with_time_tag
Definition: rx-operators.hpp:352
rxcpp::is_subscriber
Definition: rx-predef.hpp:115
rxcpp::observable::subscribe_on
auto subscribe_on(AN &&... an) const
Definition: rx-observable.hpp:1174
rxcpp::observable::window
auto window(AN &&... an) const
Definition: rx-observable.hpp:898
rxcpp::dynamic_observable::on_subscribe
std::enable_if< is_subscriber< Subscriber >::value, void >::type on_subscribe(Subscriber o) const
Definition: rx-observable.hpp:87
rxcpp::observable::observable
observable(source_operator_type &&o)
Definition: rx-observable.hpp:546
rxcpp::reduce_tag
Definition: rx-operators.hpp:296
rxcpp::skip_while_tag
Definition: rx-operators.hpp:380
rxcpp::merge_delay_error_tag
Definition: rx-operators.hpp:261
rxcpp::window_toggle_tag
Definition: rx-operators.hpp:507
rxcpp::observable::publish
auto publish(AN &&... an) const
Definition: rx-observable.hpp:1141
rxcpp::observable::transform
auto transform(AN &&... an) const
Definition: rx-observable.hpp:832
rxcpp::dynamic_observable::dynamic_observable
dynamic_observable()
Definition: rx-observable.hpp:69
rxcpp::window_with_time_or_count_tag
Definition: rx-operators.hpp:500
rxcpp::flat_map_tag
Definition: rx-operators.hpp:227
rxcpp::observable::map
auto map(AN &&... an) const
Definition: rx-observable.hpp:821
rxcpp::is_operator_factory_for
Definition: rx-predef.hpp:270
rxcpp::observable::observable
observable()
Definition: rx-observable.hpp:538
rxcpp::delayed_type_t
rxu::value_type_t< delayed_type< T, AN... > > delayed_type_t
Definition: rx-operators.hpp:60
rxcpp::delay_tag
Definition: rx-operators.hpp:185
rxcpp::observable< void, void >::timer
static auto timer(rxsc::scheduler::clock_type::duration when, Coordination cn) -> decltype(rxs::timer(when, std::move(cn)))
Definition: rx-observable.hpp:1660
rxcpp::skip_last_tag
Definition: rx-operators.hpp:387
RXCPP_CATCH
#define RXCPP_CATCH(...)
Definition: rx-util.hpp:39
rxcpp::blocking_observable::count
int count() const
Definition: rx-observable.hpp:334
rxcpp::observable::value_type
T value_type
Definition: rx-observable.hpp:530
rxcpp::observable::contains
auto contains(AN &&... an) const
Returns an Observable that emits true if the source Observable emitted a specified item, otherwise false.
Definition: rx-observable.hpp:689
rxcpp::observable< void, void >::timer
static auto timer(rxsc::scheduler::clock_type::time_point at, AN **...) -> decltype(rxs::timer(at))
Definition: rx-observable.hpp:1637
rxcpp::observable::default_if_empty
auto default_if_empty(AN &&... an) const
If the source Observable terminates without emitting any items, emits a default item and completes.
Definition: rx-observable.hpp:722
rxcpp::observable< void, void >::from
static auto from(Coordination cn) -> typename std::enable_if< is_coordination< Coordination >::value, decltype(rxs::from< T >(std::move(cn)))>::type
Definition: rx-observable.hpp:1690
rxcpp::blocking_observable::source
observable_type source
Definition: rx-observable.hpp:216
rxcpp::observable::ignore_elements
auto ignore_elements(AN &&... an) const
Definition: rx-observable.hpp:1119
rxcpp::dynamic_observable::dynamic_observable
dynamic_observable(SOF &&sof, typename std::enable_if<!is_dynamic_observable< SOF >::value, void ** >::type=0)
Definition: rx-observable.hpp:74
rxcpp::observable::take_while
auto take_while(AN &&... an) const
Definition: rx-observable.hpp:1401
rxcpp::observable::merge
auto merge(AN... an) const
Definition: rx-observable.hpp:986
rxcpp::observable::first
auto first(AN **...) const
For each item from this observable reduce it by sending only the first item.
Definition: rx-observable.hpp:1218
rxcpp::observable::source_operator
source_operator_type source_operator
Definition: rx-observable.hpp:487
rxcpp::observable::switch_on_next
auto switch_on_next(AN &&... an) const
Definition: rx-observable.hpp:975
rxcpp::take_last_tag
Definition: rx-operators.hpp:437
rxcpp::observable< void, void >::interval
static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, Coordination cn) -> decltype(rxs::interval(initial, period, std::move(cn)))
Definition: rx-observable.hpp:1629
RXCPP_TRY
#define RXCPP_TRY
Definition: rx-util.hpp:38
rxcpp::blocking_observable::average
double average() const
Definition: rx-observable.hpp:379
rxcpp::observable< void, void >::scope
static auto scope(ResourceFactory rf, ObservableFactory of) -> decltype(rxs::scope(std::move(rf), std::move(of)))
Definition: rx-observable.hpp:1768
rxcpp::observable::filter
auto filter(AN &&... an) const
Definition: rx-observable.hpp:700
rxcpp::composite_subscription
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:459
rxcpp::pairwise_tag
Definition: rx-operators.hpp:316
rxcpp::operators::lift
auto lift(Operator &&op) -> detail::lift_factory< ResultType, Operator >
Definition: rx-lift.hpp:101
operator|
auto operator|(const rxcpp::observable< T, SourceOperator > &source, OperatorFactory &&of) -> decltype(source.op(std::forward< OperatorFactory >(of)))
Definition: rx-observable.hpp:1791
rxcpp::observable::concat
auto concat(AN... an) const
Definition: rx-observable.hpp:1041
rxcpp::observable
a source of values. subscribe or use one of the operator methods that return a new observable, which uses this observable as a source.
Definition: rx-observable.hpp:478
rxcpp::observable::as_dynamic
observable< T > as_dynamic(AN **...) const
Definition: rx-observable.hpp:572
rxcpp::observable::pairwise
auto pairwise(AN... an) const
Definition: rx-observable.hpp:1445
rxcpp::observable::average
auto average(AN **...) const
For each item from this observable reduce it by adding to the previous values and then dividing by the number of items at the end.
Definition: rx-observable.hpp:1266
rxcpp::sources::just
auto just(Value0 v0) -> typename std::enable_if<!is_coordination< Value0 >::value, decltype(iterate(*(std::array< Value0, 1 > *) nullptr, identity_immediate()))>::type
Definition: rx-iterate.hpp:267
rxcpp::observable< void, void >::iterate
static auto iterate(Collection c) -> decltype(rxs::iterate(std::move(c), identity_current_thread()))
Definition: rx-observable.hpp:1668
rxcpp
Definition: rx-all.hpp:26
rxcpp::observable::start_with
auto start_with(AN... an) const
Definition: rx-observable.hpp:1434
rxcpp::observable::min
auto min(AN **...) const
For each item from this observable reduce it by taking the min value of the previous items.
Definition: rx-observable.hpp:1290
rxcpp::observable< void, void >::from
static auto from(Coordination cn, Value0 v0, ValueN... vn) -> typename std::enable_if< is_coordination< Coordination >::value, decltype(rxs::from(std::move(cn), v0, vn...))>::type
Definition: rx-observable.hpp:1706
rxcpp::repeat_tag
Definition: rx-operators.hpp:331
rxcpp::operator==
bool operator==(const dynamic_grouped_observable< K, T > &lhs, const dynamic_grouped_observable< K, T > &rhs)
Definition: rx-grouped_observable.hpp:99
rxcpp::blocking_observable::blocking_observable
blocking_observable(observable_type s)
Definition: rx-observable.hpp:220
rxcpp::observable::window_with_time_or_count
auto window_with_time_or_count(AN &&... an) const
Definition: rx-observable.hpp:920
rxcpp::util::count
Definition: rx-util.hpp:416
rxcpp::observable::publish_synchronized
auto publish_synchronized(AN &&... an) const
Turn a cold observable hot and allow connections to the source to be independent of subscriptions.
Definition: rx-observable.hpp:1152
rxcpp::observable< void, void >::range
static auto range(T first, T last, Coordination cn) -> decltype(rxs::range< T >(first, last, std::move(cn)))
Definition: rx-observable.hpp:1575
rxcpp::dynamic_observable::on_subscribe
void on_subscribe(subscriber< T > o) const
Definition: rx-observable.hpp:81
rxcpp::publish_synchronized_tag
Definition: rx-operators.hpp:329
rxcpp::scan_tag
Definition: rx-operators.hpp:359
rxcpp::observable::switch_on_error
auto switch_on_error(AN &&... an) const
Definition: rx-observable.hpp:810