private static final class SubmissionPublisher.BufferedSubscription<T> extends java.lang.Object implements Flow.Subscription, java.util.concurrent.ForkJoinPool.ManagedBlocker
The publisher guarantees a single producer via its lock. We ensure in this class that there is at most one consumer. The request and cancel methods must be fully thread-safe but are coded to exploit the most common case in which they are only called by consumers (usually within onNext).
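The common case described above, in which request is called by the consumer from within onNext, can be illustrated from the public API side. The following is a minimal sketch, not code from this class: OneAtATimeDemo, OneAtATime and run are hypothetical names, and the publisher uses its default executor.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class OneAtATimeDemo {
    /** Hypothetical subscriber that requests one item at a time from inside onNext. */
    static final class OneAtATime implements Flow.Subscriber<Integer> {
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1);                 // initial demand
        }
        @Override public void onNext(Integer item) {
            received.add(item);
            subscription.request(1);      // request() called by the consumer itself
        }
        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    /** Publishes 1..5 from a single producer thread and returns what the subscriber saw. */
    static List<Integer> run() {
        OneAtATime sub = new OneAtATime();
        try (SubmissionPublisher<Integer> pub = new SubmissionPublisher<>()) {
            pub.subscribe(sub);
            for (int i = 1; i <= 5; i++)
                pub.submit(i);            // blocks only if the subscriber's buffer is full
        }                                 // close() issues onComplete
        try {
            if (!sub.done.await(5, TimeUnit.SECONDS))
                throw new IllegalStateException("timed out waiting for onComplete");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sub.received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Each onNext call here runs in the single consumer task for the subscription, so the subscriber needs no locking of its own around the subscription field.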
Execution control is managed using the ACTIVE ctl bit. We ensure that a task is active when consumable items (and usually, SUBSCRIBE, ERROR or COMPLETE signals) are present and there is demand (unfilled requests). This is complicated on the creation side by the possibility of exceptions when trying to execute tasks. These eventually force DISABLED state, but sometimes not directly. On the task side, termination (clearing ACTIVE) that would otherwise race with producers or request() calls uses the CONSUME keep-alive bit to force a recheck.
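The interplay of an ACTIVE bit with a keep-alive bit can be sketched in isolation. The code below is a simplified illustration under stated assumptions, not the implementation in this class: it uses AtomicInteger and ConcurrentLinkedQueue in place of Unsafe and the array buffer, it has no notion of demand, and it ignores executor failures (which the real class turns into DISABLED). ActivationSketch and its members are hypothetical names.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

final class ActivationSketch {
    static final int ACTIVE  = 1; // a consumer task is running or scheduled
    static final int CONSUME = 2; // keep-alive: recheck for work before exiting

    final AtomicInteger ctl = new AtomicInteger();
    final ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
    final AtomicLong sum = new AtomicLong();
    final CountDownLatch remaining;
    final Executor executor;

    ActivationSketch(Executor executor, int expectedItems) {
        this.executor = executor;
        this.remaining = new CountDownLatch(expectedItems);
    }

    /** Producer side: enqueue, then make sure some task will see the item. */
    void offer(int item) {
        queue.add(item);
        for (;;) {
            int c = ctl.get();
            if ((c & ACTIVE) == 0) {
                if (ctl.compareAndSet(c, c | ACTIVE)) {   // we won the right to start a task
                    executor.execute(this::consume);
                    return;
                }
            } else if ((c & CONSUME) != 0 || ctl.compareAndSet(c, c | CONSUME)) {
                return;                                   // running task is forced to recheck
            }
        }
    }

    /** Consumer side: drain, then terminate only if no keep-alive was set meanwhile. */
    void consume() {
        for (;;) {
            Integer item;
            while ((item = queue.poll()) != null) {
                sum.addAndGet(item);
                remaining.countDown();
            }
            int c = ctl.get();
            if ((c & CONSUME) != 0)
                ctl.compareAndSet(c, c & ~CONSUME);       // clear keep-alive, then recheck
            else if (ctl.compareAndSet(c, c & ~ACTIVE))
                return;                                   // clean termination
        }
    }

    /** Offers 1..n from the calling thread and returns the sum seen by consumer tasks. */
    static long run(int n) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            ActivationSketch s = new ActivationSketch(pool, n);
            for (int i = 1; i <= n; i++)
                s.offer(i);
            if (!s.remaining.await(5, TimeUnit.SECONDS))
                throw new IllegalStateException("items were stranded");
            return s.sum.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Because ACTIVE is only ever set by a successful compare-and-set from the inactive state, at most one consumer task exists at a time. A producer that arrives while the task is about to exit either sets CONSUME (making the exit CAS fail) or finds ACTIVE already cleared and starts a fresh task.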
The ctl field also manages run state. When DISABLED, no further updates are possible. Disabling may be preceded by setting ERROR or COMPLETE (or both; ERROR has precedence), in which case the associated Subscriber methods are invoked, possibly synchronously if there is no active consumer task (including cases where execute() failed). The cancel() method is supported by treating cancellation as ERROR but suppressing the onError signal.
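Seen from outside, the ERROR and COMPLETE states correspond to the terminal signals a subscriber receives when the publisher is closed. The sketch below uses only the public SubmissionPublisher API; TerminalSignalDemo, Recorder and run are hypothetical names, and no items are published so that the outcome does not depend on how buffered items interact with an error.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class TerminalSignalDemo {
    /** Hypothetical subscriber that records the signals it receives. */
    static final class Recorder implements Flow.Subscriber<String> {
        final List<String> events = new CopyOnWriteArrayList<>();
        final CountDownLatch terminated = new CountDownLatch(1);

        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(String item) { events.add("next:" + item); }
        @Override public void onError(Throwable t) {
            events.add("error:" + t.getMessage());
            terminated.countDown();
        }
        @Override public void onComplete() {
            events.add("complete");
            terminated.countDown();
        }
    }

    /** Closes a publisher normally or exceptionally and returns the recorded events. */
    static List<String> run(boolean fail) {
        Recorder r = new Recorder();
        SubmissionPublisher<String> pub = new SubmissionPublisher<>();
        pub.subscribe(r);
        if (fail)
            pub.closeExceptionally(new IllegalStateException("boom")); // error path
        else
            pub.close();                                               // completion path
        try {
            if (!r.terminated.await(5, TimeUnit.SECONDS))
                throw new IllegalStateException("no terminal signal");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return r.events;
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

In either case the subscriber receives exactly one terminal signal, after which the subscription accepts no further updates.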
Support for blocking also exploits the fact that there is only one possible waiter. ManagedBlocker-compatible control fields are placed in this class itself rather than in wait-nodes. Blocking control relies on the "waiter" field. Producers set the field before trying to block, but must then recheck (via offer) before parking. Signalling then just unparks and clears waiter field. If the producer and/or consumer are using a ForkJoinPool, the producer attempts to help run consumer tasks via ForkJoinPool.helpAsyncBlocker before blocking.
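The single-waiter protocol described above (set waiter, recheck, park; signal by unparking and clearing) can be sketched as a standalone ManagedBlocker. This is an illustration under assumptions rather than the code in this class: the boolean ready stands in for "offer would now succeed", there is no timeout or helping, and SingleWaiterSketch, ready, signal and run are hypothetical names.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.locks.LockSupport;

final class SingleWaiterSketch implements ForkJoinPool.ManagedBlocker {
    volatile Thread waiter;   // the one possible blocked thread, or null
    volatile boolean ready;   // stand-in for the condition the waiter needs

    @Override public boolean isReleasable() {
        return ready;
    }

    @Override public boolean block() {
        boolean interrupted = false;
        waiter = Thread.currentThread();     // publish the waiter first...
        while (!ready) {                     // ...then recheck before parking
            LockSupport.park(this);
            if (Thread.interrupted())
                interrupted = true;          // remember, keep waiting
        }
        waiter = null;
        if (interrupted)
            Thread.currentThread().interrupt();
        return true;                         // no further blocking needed
    }

    /** Called by the other side once the condition holds. */
    void signal() {
        ready = true;
        Thread w = waiter;
        if (w != null) {
            waiter = null;                   // clear, then unpark
            LockSupport.unpark(w);
        }
    }

    /** Blocks the calling thread until a helper thread signals; returns true on success. */
    static boolean run() {
        SingleWaiterSketch b = new SingleWaiterSketch();
        Thread signaller = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException ignored) {
                // fall through and signal anyway
            }
            b.signal();
        });
        signaller.start();
        try {
            ForkJoinPool.managedBlock(b);
            signaller.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return b.ready && b.waiter == null;
    }
}
```

Because both fields are volatile, either the waiter observes ready after publishing itself, or the signaller observes the waiter after setting ready, so a wakeup cannot be lost. Keeping the fields in the object itself, rather than in per-wait nodes, is only sound because there is at most one waiter.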
This class uses @Contended and heuristic field declaration ordering to reduce false-sharing-based memory contention among instances of BufferedSubscription, but it does not currently attempt to avoid memory contention among buffers. This field and element packing can hurt performance especially when each publisher has only one client operating at a high rate. Addressing this may require allocating substantially more space than users expect.
Modifier and Type | Field and Description |
---|---|
private static int | ABASE |
(package private) static int | ACTIVE |
(package private) java.lang.Object[] | array |
private static int | ASHIFT |
(package private) static int | COMPLETE |
(package private) static int | CONSUME |
(package private) int | ctl |
private static long | CTL |
(package private) static int | DEFAULT_INITIAL_CAP: Initial buffer capacity used when maxBufferCapacity is greater. |
(package private) long | demand |
private static long | DEMAND |
(package private) static int | DISABLED |
(package private) static int | ERROR |
(package private) java.util.concurrent.Executor | executor |
(package private) int | head |
private static long | HEAD |
(package private) static long | INTERRUPTED |
(package private) int | maxCapacity |
(package private) SubmissionPublisher.BufferedSubscription<T> | next |
(package private) SubmissionPublisher.BufferedSubscription<T> | nextRetry |
(package private) java.util.function.BiConsumer<? super Flow.Subscriber<? super T>,? super java.lang.Throwable> | onNextHandler |
(package private) java.lang.Throwable | pendingError |
(package private) T | putItem |
(package private) int | putStat |
(package private) static int | SUBSCRIBE |
(package private) Flow.Subscriber<? super T> | subscriber |
(package private) int | tail |
private static long | TAIL |
(package private) long | timeout |
private static sun.misc.Unsafe | U |
(package private) java.lang.Thread | waiter |
Constructor and Description |
---|
BufferedSubscription(Flow.Subscriber<? super T> subscriber, java.util.concurrent.Executor executor, java.util.function.BiConsumer<? super Flow.Subscriber<? super T>,? super java.lang.Throwable> onNextHandler, int maxBufferCapacity) |
Modifier and Type | Method and Description |
---|---|
boolean | block() |
void | cancel(): Causes consumer task to exit if active (without reporting onError unless there is already a pending error), and disables. |
private boolean | checkControl(Flow.Subscriber<? super T> s, int c): Responds to control events in consume(). |
private boolean | checkDemand(int c): Responds to apparent zero demand in consume(). |
private boolean | checkEmpty(Flow.Subscriber<? super T> s, int c): Responds to apparent emptiness in consume(). |
(package private) void | consume(): Consumer loop, called from ConsumerTask, or indirectly when helping during submit. |
private void | detach(): Nulls out most fields, mainly to avoid garbage retention until publisher unsubscribes, but also to help cleanly stop upon error by nulling required components. |
(package private) int | estimateLag(): Returns estimated number of buffered items, or -1 if disabled. |
private int | growAndAdd(java.lang.Object[] a, T item): Tries to create or expand buffer, then adds item if possible. |
private void | handleOnNext(Flow.Subscriber<? super T> s, java.lang.Throwable ex): Processes exception in Subscriber.onNext. |
(package private) boolean | isDisabled() |
boolean | isReleasable() |
(package private) int | offer(T item): Tries to add item and start consumer task if necessary. |
(package private) void | onComplete() |
(package private) void | onError(java.lang.Throwable ex): Issues error signal, asynchronously if a task is running, else synchronously. |
(package private) void | onSubscribe() |
void | request(long n): Adds to demand and possibly starts task. |
private void | signalWaiter(java.lang.Thread w) |
private int | startOnOffer(int stat): Tries to start consumer task after offer. |
private void | startOrDisable(): Tries to start consumer task upon a signal or request; disables on failure. |
(package private) int | submit(T item): Spins/helps/blocks while offer returns 0. |
(package private) int | timedOffer(T item, long nanos): Timeout version; similar to submit. |
java.lang.String | toString() |
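The offer, submit and timedOffer methods summarized above are internal, but their effect is visible through the public SubmissionPublisher.offer method, which reports drops when a subscriber's buffer is full. The sketch below is an illustration only: OfferDropDemo, NoDemand and run are hypothetical names, the buffer capacity of 4 is an arbitrary choice, and the exact number of drops depends on how the implementation rounds and enforces the capacity.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;

final class OfferDropDemo {
    /** Hypothetical subscriber that never requests, so nothing leaves the buffer. */
    static final class NoDemand implements Flow.Subscriber<Integer> {
        @Override public void onSubscribe(Flow.Subscription s) { /* deliberately no request() */ }
        @Override public void onNext(Integer item) { }
        @Override public void onError(Throwable t) { }
        @Override public void onComplete() { }
    }

    /** Offers {@code total} items without blocking; returns {accepted, dropped}. */
    static int[] run(int total) {
        int accepted = 0, dropped = 0;
        try (SubmissionPublisher<Integer> pub =
                 new SubmissionPublisher<>(ForkJoinPool.commonPool(), 4)) {
            pub.subscribe(new NoDemand());
            for (int i = 0; i < total; i++) {
                // The onDrop handler returns false, meaning "do not retry".
                int stat = pub.offer(i, (subscriber, item) -> false);
                if (stat < 0)
                    dropped += -stat;   // negative result: number of drops
                else
                    accepted++;         // otherwise: an estimate of maximum lag
            }
        }
        return new int[] { accepted, dropped };
    }

    public static void main(String[] args) {
        int[] r = run(10);
        System.out.println("accepted=" + r[0] + " dropped=" + r[1]);
    }
}
```

A caller that prefers to wait rather than drop would use the publisher's submit method, or the timed variant of offer, instead.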
long timeout
volatile long demand
int maxCapacity
int putStat
volatile int ctl
volatile int head
int tail
java.lang.Object[] array
Flow.Subscriber<? super T> subscriber
java.util.concurrent.Executor executor
java.util.function.BiConsumer<? super Flow.Subscriber<? super T>,? super java.lang.Throwable> onNextHandler
volatile java.lang.Throwable pendingError
volatile java.lang.Thread waiter
T putItem
SubmissionPublisher.BufferedSubscription<T> next
SubmissionPublisher.BufferedSubscription<T> nextRetry
static final int ACTIVE
static final int CONSUME
static final int DISABLED
static final int ERROR
static final int SUBSCRIBE
static final int COMPLETE
static final long INTERRUPTED
static final int DEFAULT_INITIAL_CAP
private static final sun.misc.Unsafe U
private static final long CTL
private static final long TAIL
private static final long HEAD
private static final long DEMAND
private static final int ABASE
private static final int ASHIFT
BufferedSubscription(Flow.Subscriber<? super T> subscriber, java.util.concurrent.Executor executor, java.util.function.BiConsumer<? super Flow.Subscriber<? super T>,? super java.lang.Throwable> onNextHandler, int maxBufferCapacity)
public java.lang.String toString()
Overrides:
toString in class java.lang.Object
final boolean isDisabled()
final int estimateLag()
final int offer(T item)
private int growAndAdd(java.lang.Object[] a, T item)
final int submit(T item)
final int timedOffer(T item, long nanos)
private int startOnOffer(int stat)
private void signalWaiter(java.lang.Thread w)
private void detach()
final void onError(java.lang.Throwable ex)
private void startOrDisable()
final void onComplete()
final void onSubscribe()
public void cancel()
Specified by:
cancel in interface Flow.Subscription
public void request(long n)
Specified by:
request in interface Flow.Subscription
Parameters:
n - the increment of demand; a value of Long.MAX_VALUE may be considered as effectively unbounded
public final boolean isReleasable()
Specified by:
isReleasable in interface java.util.concurrent.ForkJoinPool.ManagedBlocker
public final boolean block()
Specified by:
block in interface java.util.concurrent.ForkJoinPool.ManagedBlocker
final void consume()
private boolean checkControl(Flow.Subscriber<? super T> s, int c)
private boolean checkEmpty(Flow.Subscriber<? super T> s, int c)
private boolean checkDemand(int c)
private void handleOnNext(Flow.Subscriber<? super T> s, java.lang.Throwable ex)