20 #if !defined(RXCPP_OPERATORS_RX_SKIP_LAST_HPP)
21 #define RXCPP_OPERATORS_RX_SKIP_LAST_HPP
23 #include "../rx-includes.hpp"
32 struct skip_last_invalid_arguments {};
35 struct skip_last_invalid :
public rxo::operator_base<skip_last_invalid_arguments<AN...>> {
36 using type = observable<skip_last_invalid_arguments<
AN...>, skip_last_invalid<
AN...>>;
39 using skip_last_invalid_t =
typename skip_last_invalid<
AN...>::type;
41 template<
class T,
class Observable,
class Count>
42 struct skip_last :
public operator_base<T>
44 typedef rxu::decay_t<Observable> source_type;
45 typedef rxu::decay_t<Count> count_type;
47 typedef std::queue<T> queue_type;
48 typedef typename queue_type::size_type queue_size_type;
52 values(source_type s, count_type t)
53 : source(std::move(s))
54 ,
count(
static_cast<queue_size_type
>(t))
58 queue_size_type
count;
63 : initial(std::move(s), std::move(t))
67 template<
class Subscriber>
68 void on_subscribe(
const Subscriber& s)
const {
70 typedef Subscriber output_type;
72 :
public std::enable_shared_from_this<state_type>
75 state_type(
const values& i,
const output_type& oarg)
84 auto state = std::make_shared<state_type>(initial, s);
86 composite_subscription source_lifetime;
88 s.add(source_lifetime);
90 state->source.subscribe(
95 if(state->count > 0) {
96 if (state->items.size() == state->count) {
97 state->out.on_next(std::move(state->items.front()));
100 state->items.push(t);
102 state->out.on_next(t);
107 state->out.on_error(e);
111 state->out.on_completed();
121 template<
class...
AN>
132 template<
class Observable,
class Count,
136 class SkipLast = rxo::detail::skip_last<SourceValue, rxu::decay_t<Observable>,
rxu::decay_t<Count>>,
139 static Result
member(Observable&& o, Count&& c) {
140 return Result(SkipLast(std::forward<Observable>(o), std::forward<Count>(c)));
143 template<
class...
AN>
144 static operators::detail::skip_last_invalid_t<
AN...>
member(
AN...) {
147 static_assert(
sizeof...(
AN) == 10000,
"skip_last takes (Count)");