5 #if !defined(RXCPP_RX_NOTIFICATION_HPP)
6 #define RXCPP_RX_NOTIFICATION_HPP
12 namespace notifications {
21 : s(s), u(std::numeric_limits<long>::
max()) {
46 struct notification_base
47 :
public std::enable_shared_from_this<notification_base<T>>
50 typedef std::shared_ptr<notification_base<T>> type;
// Virtual destructor: notifications are held through
// std::shared_ptr<notification_base<T>> (see the `type` typedef), so derived
// objects must be destructible via a pointer to this base.
52 virtual ~notification_base() {}
// Writes a textual representation of this notification to `out`.
// Pure virtual: every concrete notification kind supplies its own format
// (for example the completed notification prints "on_completed()").
54 virtual void out(std::ostream& out)
const =0;
// Reports whether this notification is equal to `other`.
// Pure virtual. The overrides visible in this file implement it by calling
// other->accept(...) with an observer whose callbacks record the result.
// NOTE(review): `other` is dereferenced by those overrides, so callers are
// expected to pass a non-null pointer (operator== checks this first).
55 virtual bool equals(
const type& other)
const = 0;
// Hands this notification to the observer `o`. Pure virtual.
// Presumably each override invokes the matching on_next / on_error /
// on_completed callback on `o` -- the override bodies are not visible in this
// excerpt, TODO confirm.
56 virtual void accept(
const observer_type& o)
const =0;
60 std::ostream&
operator<< (std::ostream& out,
const std::vector<T>& v);
63 auto to_stream(std::ostream& os,
const T& t,
int,
int)
64 -> decltype(os << t) {
70 std::ostream& to_stream(std::ostream& os,
const T&,
int, ...) {
71 return os <<
"< " <<
typeid(T).name() <<
" does not support ostream>";
76 std::ostream& to_stream(std::ostream& os,
const T&, ...) {
77 return os <<
"<the value does not support ostream>";
81 inline std::ostream& ostreamvector (std::ostream& os,
const std::vector<T>& v) {
90 to_stream(os, i, 0, 0);
97 inline std::ostream&
operator<< (std::ostream& os,
const std::vector<T>& v) {
98 return ostreamvector(os, v);
102 auto equals(
const T& lhs,
const T& rhs,
int)
103 -> decltype(
bool(lhs == rhs)) {
108 bool equals(
const T&,
const T&, ...) {
118 typedef typename detail::notification_base<T>::type
type;
122 typedef detail::notification_base<T> base;
124 struct on_next_notification :
public base {
125 on_next_notification(T value) : value(std::move(value)) {
// Copy constructor: copies the carried `value` from `o`.
// The base sub-object is not named in the initializer list, so it is
// default-constructed rather than copied.
127 on_next_notification(
const on_next_notification& o) : value(o.value) {}
128 on_next_notification(
const on_next_notification&& o) : value(std::move(o.value)) {}
129 on_next_notification& operator=(on_next_notification o) { value = std::move(o.value);
return *
this; }
130 virtual void out(std::ostream& os)
const {
132 detail::to_stream(os, value, 0, 0);
135 virtual bool equals(
const typename base::type& other)
const {
137 other->accept(make_subscriber<T>(make_observer_dynamic<T>([
this, &result](T v) {
138 result = detail::equals(this->value, v, 0);
148 struct on_error_notification :
public base {
// Copy constructor: copies the stored error handle `ep` from `o`.
// NOTE(review): `ep` is presumably an rxu::error_ptr (its declaration is not
// visible in this excerpt) -- confirm.
151 on_error_notification(
const on_error_notification& o) : ep(o.ep) {}
152 on_error_notification(
const on_error_notification&& o) : ep(std::move(o.ep)) {}
153 on_error_notification& operator=(on_error_notification o) { ep = std::move(o.ep);
return *
this; }
154 virtual void out(std::ostream& os)
const {
159 virtual bool equals(
const typename base::type& other)
const {
162 other->accept(make_subscriber<T>(make_observer_dynamic<T>([](T){}, [&result](
rxu::error_ptr){
167 virtual void accept(
const typename base::observer_type& o)
const {
173 struct on_completed_notification :
public base {
174 on_completed_notification() {
176 virtual void out(std::ostream& os)
const {
177 os <<
"on_completed()";
179 virtual bool equals(
const typename base::type& other)
const {
181 other->accept(make_subscriber<T>(make_observer_dynamic<T>([](T){}, [&result](){
186 virtual void accept(
const typename base::observer_type& o)
const {
// Empty tag type used for overload selection: the make_on_error overload
// taking `exception_tag&&` receives an arbitrary exception object `e` and
// builds an on_error_notification from it.
191 struct exception_tag {};
193 template<
typename Exception>
195 type make_on_error(exception_tag&&, Exception&& e) {
197 return std::make_shared<on_error_notification>(ep);
// Empty tag type, the counterpart of exception_tag: the dispatching
// make_on_error picks one of the two via std::conditional.
// NOTE(review): presumably selects the overload that receives an
// already-captured error pointer -- that overload's signature is not visible
// in this excerpt, confirm.
200 struct exception_ptr_tag {};
204 return std::make_shared<on_error_notification>(ep);
210 return std::make_shared<on_next_notification>(std::move(value));
214 return std::make_shared<on_completed_notification>();
217 template<
typename Exception>
219 return make_on_error(
typename std::conditional<
221 exception_ptr_tag, exception_tag>::
type(),
222 std::forward<Exception>(e));
227 bool operator == (
const std::shared_ptr<detail::notification_base<T>>& lhs,
const std::shared_ptr<detail::notification_base<T>>& rhs) {
228 if (!lhs && !rhs) {
return true;}
229 if (!lhs || !rhs) {
return false;}
230 return lhs->equals(rhs);
234 std::ostream&
operator<< (std::ostream& os,
const std::shared_ptr<detail::notification_base<T>>& n) {
264 out <<
"@" << r.
time() <<
"-" << r.
value();
269 namespace rxn=notifications;
271 inline std::ostream&
operator<< (std::ostream& out,
const std::vector<rxcpp::notifications::subscription>& vs) {
272 return rxcpp::notifications::detail::ostreamvector(out, vs);
276 return rxcpp::notifications::detail::ostreamvector(out, vr);