public class MpscRelaxedArrayQueue<E> extends MpscRelaxedArrayQueueL4Pad<E> implements MessagePassingQueue<E>
MessagePassingQueue.Consumer<T>, MessagePassingQueue.ExitCondition, MessagePassingQueue.Supplier<T>, MessagePassingQueue.WaitStrategy
Modifier and Type | Field and Description |
---|---|
private E[] |
buffer |
private int |
cycleIdBitShift |
private int |
cycleLength |
private int |
cycleLengthLog2 |
private long |
mask
Note on terminology:
- position/id: overall progress indicator, not an array index or offset at which to lookup/write.
|
private long |
maxCycleId |
private int |
positionWithinCycleMask |
p01, p02, p03, p04, p05, p06, p10, p11, p12, p13, p14, p15, p16, p17
producerFirstCycleClaim, producerSecondCycleClaim
p07
consumerPosition
p00
UNBOUNDED_CAPACITY
Constructor and Description |
---|
MpscRelaxedArrayQueue(int capacity) |
Modifier and Type | Method and Description |
---|---|
private static int |
calcElementIndexInBuffer(int positionWithinCycle,
int cycleIndex,
int cycleLengthLog2)
Convert [position within cycle, cycleIndex] to index in buffer.
|
int |
capacity() |
private static long |
circularArrayOffset(long consumerPosition,
long mask)
Used by the consumer only to compute offset in bytes, within the full circular buffer, for the given position.
|
void |
clear()
Removes all items from the queue.
|
private long |
detectSlowRotation(long claimCycleId,
long nextCycleId) |
int |
drain(MessagePassingQueue.Consumer<E> c)
Remove all available item from the queue and hand to consume.
|
int |
drain(MessagePassingQueue.Consumer<E> c,
int limit)
Remove up to limit elements from the queue and hand to consume.
|
void |
drain(MessagePassingQueue.Consumer<E> c,
MessagePassingQueue.WaitStrategy w,
MessagePassingQueue.ExitCondition exit)
Remove elements from the queue and hand to consume forever.
|
int |
fill(MessagePassingQueue.Supplier<E> s)
Stuff the queue with elements from the supplier.
|
int |
fill(MessagePassingQueue.Supplier<E> s,
int limit)
Stuff the queue with up to limit elements from the supplier.
|
void |
fill(MessagePassingQueue.Supplier<E> s,
MessagePassingQueue.WaitStrategy w,
MessagePassingQueue.ExitCondition exit)
Stuff the queue with elements from the supplier forever.
|
private boolean |
fixProducerOverClaim(int activeCycleIndex,
long producerCycleClaim,
boolean slowProducer)
It tries to fix a producer overclaim.
|
boolean |
isEmpty()
This method's accuracy is subject to concurrent modifications happening as the observation is carried
out.
|
private boolean |
isFull(long producerPosition)
Given the nature of getAndAdd progress on producerPosition and given the potential risk for over claiming it is
quite possible for this method to report a queue which is not full as full.
|
java.util.Iterator<E> |
iterator() |
boolean |
offer(E e)
Called from a producer thread subject to the restrictions appropriate to the implementation and
according to the
Queue.offer(Object) interface. |
E |
peek()
Called from the consumer thread subject to the restrictions appropriate to the implementation and
according to the
Queue.peek() interface. |
private E |
peekSlowPath(E[] buffer,
long consumerPosition,
long offset) |
E |
poll()
Called from the consumer thread subject to the restrictions appropriate to the implementation and
according to the
Queue.poll() interface. |
private E |
pollSlowPath(E[] buffer,
long offset,
long consumerPosition) |
private static int |
positionWithinCycle(long producerCycleClaim,
int positionOnCycleMask) |
private static long |
producerClaimCycleId(long producerCycleClaim,
int cycleIdBitShift) |
private static long |
producerPosition(int positionWithinCycle,
long cycleId,
int cycleLengthLog2)
Convert position in cycle and cycleId into a producer position (monotonically increasing reflection of offers
that is comparable with the consumerPosition to determine size/empty/full)
|
private static long |
producerPositionFromClaim(long producerCycleClaim,
int positionOnCycleMask,
int cycleIdBitShift,
int cycleLengthLog2) |
boolean |
relaxedOffer(E e)
Called from a producer thread subject to the restrictions appropriate to the implementation.
|
E |
relaxedPeek()
Called from the consumer thread subject to the restrictions appropriate to the implementation.
|
E |
relaxedPoll()
Called from the consumer thread subject to the restrictions appropriate to the implementation.
|
private void |
rotateCycle(long claimCycleId,
int cycleIdBitShift,
long maxCycleId) |
private void |
signalConsumerProgress(long consumerPosition,
E[] buffer,
long offset) |
int |
size()
This method's accuracy is subject to concurrent modifications happening as the size is estimated and as
such is a best effort rather than absolute value.
|
private void |
soCycleElement(E[] buffer,
E e,
int activeCycleIndex,
int positionWithinCycle,
int cycleLengthLog2) |
private E |
spinForElement(E[] buffer,
long offset) |
java.lang.String |
toString() |
private boolean |
validateProducerClaim(int activeCycleIndex,
long producerCycleClaim,
long cycleId,
int positionOnCycle,
int cycleLengthLog2,
boolean slowProducer)
Validate a producer claim to find out if is an overclaim (beyond the producer limit).
|
private void |
validateSlowProducerOverClaim(int activeCycleIndex,
long producerCycleClaim)
Validates a slow producer over-claim throwing
IllegalStateException if the offer on it can't continue. |
casProducerCycleClaim, getAndIncrementProducerCycleClaim, lvProducerCycleClaim, soProducerCycleClaim
lpConsumerPosition, lvConsumerPosition, soConsumerPosition
lvProducerLimit, soProducerLimit
activeCycleIndex, casActiveCycleId, lvActiveCycleId, soActiveCycleId
contains, containsAll, remove, removeAll, retainAll, toArray, toArray
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
private final long mask
private final int cycleLength
private final int cycleLengthLog2
private final E[] buffer
private final int positionWithinCycleMask
private final int cycleIdBitShift
private final long maxCycleId
public java.util.Iterator<E> iterator()
public boolean offer(E e)
MessagePassingQueue
Queue.offer(Object)
interface.offer
in interface java.util.Queue<E>
offer
in interface MessagePassingQueue<E>
e
- not null, will throw NPE if it isprivate boolean isFull(long producerPosition)
private void rotateCycle(long claimCycleId, int cycleIdBitShift, long maxCycleId)
private long detectSlowRotation(long claimCycleId, long nextCycleId)
private boolean validateProducerClaim(int activeCycleIndex, long producerCycleClaim, long cycleId, int positionOnCycle, int cycleLengthLog2, boolean slowProducer)
true
if the claim is valid, false
otherwise.private boolean fixProducerOverClaim(int activeCycleIndex, long producerCycleClaim, boolean slowProducer)
true
if the claim is now safe to be used,false
otherwise and is needed to retry the claim.private void validateSlowProducerOverClaim(int activeCycleIndex, long producerCycleClaim)
IllegalStateException
if the offer on it can't continue.private void soCycleElement(E[] buffer, E e, int activeCycleIndex, int positionWithinCycle, int cycleLengthLog2)
public E poll()
MessagePassingQueue
Queue.poll()
interface.poll
in interface java.util.Queue<E>
poll
in interface MessagePassingQueue<E>
private void signalConsumerProgress(long consumerPosition, E[] buffer, long offset)
public E peek()
MessagePassingQueue
Queue.peek()
interface.peek
in interface java.util.Queue<E>
peek
in interface MessagePassingQueue<E>
public int size()
MessagePassingQueue
size
in interface java.util.Collection<E>
size
in interface MessagePassingQueue<E>
size
in class java.util.AbstractCollection<E>
Integer.MAX_VALUE
but less or equals to
capacity (if bounded).public void clear()
MessagePassingQueue
Collection.clear()
interface.clear
in interface java.util.Collection<E>
clear
in interface MessagePassingQueue<E>
clear
in class java.util.AbstractQueue<E>
public boolean isEmpty()
MessagePassingQueue
isEmpty
in interface java.util.Collection<E>
isEmpty
in interface MessagePassingQueue<E>
isEmpty
in class java.util.AbstractCollection<E>
public int capacity()
capacity
in interface MessagePassingQueue<E>
MessagePassingQueue.UNBOUNDED_CAPACITY
if not boundedpublic boolean relaxedOffer(E e)
MessagePassingQueue
Queue.offer(Object)
this method may return false without the queue being full.relaxedOffer
in interface MessagePassingQueue<E>
e
- not null, will throw NPE if it ispublic E relaxedPoll()
MessagePassingQueue
Queue.poll()
this method may return null without the queue being empty.relaxedPoll
in interface MessagePassingQueue<E>
public E relaxedPeek()
MessagePassingQueue
Queue.peek()
this method may return null without the queue being empty.relaxedPeek
in interface MessagePassingQueue<E>
public int drain(MessagePassingQueue.Consumer<E> c)
MessagePassingQueue
M m;
while((m = relaxedPoll()) != null){
c.accept(m);
}
There's no strong commitment to the queue being empty at the end of a drain. Called from a
consumer thread subject to the restrictions appropriate to the implementation.drain
in interface MessagePassingQueue<E>
public int fill(MessagePassingQueue.Supplier<E> s)
MessagePassingQueue
while(relaxedOffer(s.get());
There's no strong commitment to the queue being full at the end of a fill. Called from a
producer thread subject to the restrictions appropriate to the implementation.fill
in interface MessagePassingQueue<E>
public int drain(MessagePassingQueue.Consumer<E> c, int limit)
MessagePassingQueue
M m;
int i = 0;
for(;i < limit && (m = relaxedPoll()) != null; i++){
c.accept(m);
}
return i;
There's no strong commitment to the queue being empty at the end of a drain. Called from a consumer thread subject to the restrictions appropriate to the implementation.
drain
in interface MessagePassingQueue<E>
public int fill(MessagePassingQueue.Supplier<E> s, int limit)
MessagePassingQueue
for(int i=0; i < limit && relaxedOffer(s.get()); i++);
There's no strong commitment to the queue being full at the end of a fill. Called from a producer thread subject to the restrictions appropriate to the implementation.
fill
in interface MessagePassingQueue<E>
public void drain(MessagePassingQueue.Consumer<E> c, MessagePassingQueue.WaitStrategy w, MessagePassingQueue.ExitCondition exit)
MessagePassingQueue
int idleCounter = 0;
while (exit.keepRunning()) {
E e = relaxedPoll();
if(e==null){
idleCounter = wait.idle(idleCounter);
continue;
}
idleCounter = 0;
c.accept(e);
}
Called from a consumer thread subject to the restrictions appropriate to the implementation.
drain
in interface MessagePassingQueue<E>
public void fill(MessagePassingQueue.Supplier<E> s, MessagePassingQueue.WaitStrategy w, MessagePassingQueue.ExitCondition exit)
MessagePassingQueue
int idleCounter = 0;
while (exit.keepRunning()) {
E e = s.get();
while (!relaxedOffer(e)) {
idleCounter = wait.idle(idleCounter);
continue;
}
idleCounter = 0;
}
Called from a producer thread subject to the restrictions appropriate to the implementation.
fill
in interface MessagePassingQueue<E>
private static int positionWithinCycle(long producerCycleClaim, int positionOnCycleMask)
private static long producerClaimCycleId(long producerCycleClaim, int cycleIdBitShift)
private static long producerPositionFromClaim(long producerCycleClaim, int positionOnCycleMask, int cycleIdBitShift, int cycleLengthLog2)
private static long producerPosition(int positionWithinCycle, long cycleId, int cycleLengthLog2)
private static int calcElementIndexInBuffer(int positionWithinCycle, int cycleIndex, int cycleLengthLog2)
private static long circularArrayOffset(long consumerPosition, long mask)
public java.lang.String toString()
toString
in class java.util.AbstractCollection<E>