public class MpscRelaxedArrayQueue<E> extends MpscRelaxedArrayQueueL4Pad<E> implements MessagePassingQueue<E>
MessagePassingQueue.Consumer<T>, MessagePassingQueue.ExitCondition, MessagePassingQueue.Supplier<T>, MessagePassingQueue.WaitStrategy

| Modifier and Type | Field and Description |
|---|---|
| private E[] | buffer |
| private int | cycleIdBitShift |
| private int | cycleLength |
| private int | cycleLengthLog2 |
| private long | mask Note on terminology: position/id is an overall progress indicator, not an array index or offset at which to look up or write. |
| private long | maxCycleId |
| private int | positionWithinCycleMask |
p01, p02, p03, p04, p05, p06, p10, p11, p12, p13, p14, p15, p16, p17
producerFirstCycleClaim, producerSecondCycleClaim
p07
consumerPosition
p00
UNBOUNDED_CAPACITY

| Constructor and Description |
|---|
| MpscRelaxedArrayQueue(int capacity) |

| Modifier and Type | Method and Description |
|---|---|
| private static int | calcElementIndexInBuffer(int positionWithinCycle, int cycleIndex, int cycleLengthLog2) Converts [position within cycle, cycleIndex] to an index in the buffer. |
| int | capacity() |
| private static long | circularArrayOffset(long consumerPosition, long mask) Used by the consumer only to compute the offset in bytes, within the full circular buffer, for the given position. |
| void | clear() Removes all items from the queue. |
| private long | detectSlowRotation(long claimCycleId, long nextCycleId) |
| int | drain(MessagePassingQueue.Consumer<E> c) Removes all available items from the queue and hands them to the consumer. |
| int | drain(MessagePassingQueue.Consumer<E> c, int limit) Removes up to limit elements from the queue and hands them to the consumer. |
| void | drain(MessagePassingQueue.Consumer<E> c, MessagePassingQueue.WaitStrategy w, MessagePassingQueue.ExitCondition exit) Removes elements from the queue and hands them to the consumer forever. |
| int | fill(MessagePassingQueue.Supplier<E> s) Stuffs the queue with elements from the supplier. |
| int | fill(MessagePassingQueue.Supplier<E> s, int limit) Stuffs the queue with up to limit elements from the supplier. |
| void | fill(MessagePassingQueue.Supplier<E> s, MessagePassingQueue.WaitStrategy w, MessagePassingQueue.ExitCondition exit) Stuffs the queue with elements from the supplier forever. |
| private boolean | fixProducerOverClaim(int activeCycleIndex, long producerCycleClaim, boolean slowProducer) Tries to fix a producer over-claim. |
| boolean | isEmpty() This method's accuracy is subject to concurrent modifications happening as the observation is carried out. |
| private boolean | isFull(long producerPosition) Given the nature of getAndAdd progress on producerPosition and the potential risk of over-claiming, this method may report a queue that is not full as full. |
| java.util.Iterator<E> | iterator() |
| boolean | offer(E e) Called from a producer thread subject to the restrictions appropriate to the implementation and according to the Queue.offer(Object) interface. |
| E | peek() Called from the consumer thread subject to the restrictions appropriate to the implementation and according to the Queue.peek() interface. |
| private E | peekSlowPath(E[] buffer, long consumerPosition, long offset) |
| E | poll() Called from the consumer thread subject to the restrictions appropriate to the implementation and according to the Queue.poll() interface. |
| private E | pollSlowPath(E[] buffer, long offset, long consumerPosition) |
| private static int | positionWithinCycle(long producerCycleClaim, int positionOnCycleMask) |
| private static long | producerClaimCycleId(long producerCycleClaim, int cycleIdBitShift) |
| private static long | producerPosition(int positionWithinCycle, long cycleId, int cycleLengthLog2) Converts a position in cycle and a cycleId into a producer position (a monotonically increasing reflection of offers that is comparable with the consumerPosition to determine size/empty/full). |
| private static long | producerPositionFromClaim(long producerCycleClaim, int positionOnCycleMask, int cycleIdBitShift, int cycleLengthLog2) |
| boolean | relaxedOffer(E e) Called from a producer thread subject to the restrictions appropriate to the implementation. |
| E | relaxedPeek() Called from the consumer thread subject to the restrictions appropriate to the implementation. |
| E | relaxedPoll() Called from the consumer thread subject to the restrictions appropriate to the implementation. |
| private void | rotateCycle(long claimCycleId, int cycleIdBitShift, long maxCycleId) |
| private void | signalConsumerProgress(long consumerPosition, E[] buffer, long offset) |
| int | size() This method's accuracy is subject to concurrent modifications happening as the size is estimated, and as such is a best effort rather than an absolute value. |
| private void | soCycleElement(E[] buffer, E e, int activeCycleIndex, int positionWithinCycle, int cycleLengthLog2) |
| private E | spinForElement(E[] buffer, long offset) |
| java.lang.String | toString() |
| private boolean | validateProducerClaim(int activeCycleIndex, long producerCycleClaim, long cycleId, int positionOnCycle, int cycleLengthLog2, boolean slowProducer) Validates a producer claim to find out if it is an over-claim (beyond the producer limit). |
| private void | validateSlowProducerOverClaim(int activeCycleIndex, long producerCycleClaim) Validates a slow producer over-claim, throwing IllegalStateException if the offer on it can't continue. |
casProducerCycleClaim, getAndIncrementProducerCycleClaim, lvProducerCycleClaim, soProducerCycleClaim
lpConsumerPosition, lvConsumerPosition, soConsumerPosition
lvProducerLimit, soProducerLimit
activeCycleIndex, casActiveCycleId, lvActiveCycleId, soActiveCycleId
contains, containsAll, remove, removeAll, retainAll, toArray, toArray
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait

private final long mask
private final int cycleLength
private final int cycleLengthLog2
private final E[] buffer
private final int positionWithinCycleMask
private final int cycleIdBitShift
private final long maxCycleId
public java.util.Iterator<E> iterator()
public boolean offer(E e)
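Description copied from interface: MessagePassingQueue
Called from a producer thread subject to the restrictions appropriate to the implementation and according to the Queue.offer(Object) interface.
Specified by: offer in interface java.util.Queue<E>
Specified by: offer in interface MessagePassingQueue<E>
Parameters: e - not null, will throw NPE if it is
private boolean isFull(long producerPosition)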
private void rotateCycle(long claimCycleId,
int cycleIdBitShift,
long maxCycleId)
private long detectSlowRotation(long claimCycleId,
long nextCycleId)
private boolean validateProducerClaim(int activeCycleIndex,
long producerCycleClaim,
long cycleId,
int positionOnCycle,
int cycleLengthLog2,
boolean slowProducer)
Returns: true if the claim is valid, false otherwise.
private boolean fixProducerOverClaim(int activeCycleIndex,
long producerCycleClaim,
boolean slowProducer)
Returns: true if the claim is now safe to be used, false otherwise, in which case the claim must be retried.
private void validateSlowProducerOverClaim(int activeCycleIndex,
long producerCycleClaim)
Throws: IllegalStateException if the offer on it can't continue.
private void soCycleElement(E[] buffer, E e, int activeCycleIndex, int positionWithinCycle, int cycleLengthLog2)
public E poll()
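Description copied from interface: MessagePassingQueue
Called from the consumer thread subject to the restrictions appropriate to the implementation and according to the Queue.poll() interface.
Specified by: poll in interface java.util.Queue<E>
Specified by: poll in interface MessagePassingQueue<E>
private void signalConsumerProgress(long consumerPosition,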
E[] buffer,
long offset)
public E peek()
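Description copied from interface: MessagePassingQueue
Called from the consumer thread subject to the restrictions appropriate to the implementation and according to the Queue.peek() interface.
Specified by: peek in interface java.util.Queue<E>
Specified by: peek in interface MessagePassingQueue<E>
public int size()
Description copied from interface: MessagePassingQueue
This method's accuracy is subject to concurrent modifications happening as the size is estimated, and as such is a best effort rather than an absolute value.
Specified by: size in interface java.util.Collection<E>
Specified by: size in interface MessagePassingQueue<E>
Specified by: size in class java.util.AbstractCollection<E>
Returns: the number of messages in the queue, at most Integer.MAX_VALUE and less than or equal to capacity (if bounded).
public void clear()
Description copied from interface: MessagePassingQueue
Removes all items from the queue. Called from the consumer thread subject to the restrictions appropriate to the implementation and according to the Collection.clear() interface.
Specified by: clear in interface java.util.Collection<E>
Specified by: clear in interface MessagePassingQueue<E>
Overrides: clear in class java.util.AbstractQueue<E>
public boolean isEmpty()
Description copied from interface: MessagePassingQueue
This method's accuracy is subject to concurrent modifications happening as the observation is carried out.
Specified by: isEmpty in interface java.util.Collection<E>
Specified by: isEmpty in interface MessagePassingQueue<E>
Overrides: isEmpty in class java.util.AbstractCollection<E>
public int capacity()
Specified by: capacity in interface MessagePassingQueue<E>
Returns: the capacity of this queue, or MessagePassingQueue.UNBOUNDED_CAPACITY if not bounded
public boolean relaxedOffer(E e)
Description copied from interface: MessagePassingQueue
Called from a producer thread subject to the restrictions appropriate to the implementation. Unlike Queue.offer(Object), this method may return false without the queue being full.
Specified by: relaxedOffer in interface MessagePassingQueue<E>
Parameters: e - not null, will throw NPE if it is
public E relaxedPoll()
Description copied from interface: MessagePassingQueue
Called from the consumer thread subject to the restrictions appropriate to the implementation. Unlike Queue.poll(), this method may return null without the queue being empty.
Specified by: relaxedPoll in interface MessagePassingQueue<E>
public E relaxedPeek()
Description copied from interface: MessagePassingQueue
Called from the consumer thread subject to the restrictions appropriate to the implementation. Unlike Queue.peek(), this method may return null without the queue being empty.
Specified by: relaxedPeek in interface MessagePassingQueue<E>
public int drain(MessagePassingQueue.Consumer<E> c)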
Description copied from interface: MessagePassingQueue
Removes all available items from the queue and hands them to the consumer. Semantically similar to:

    E m;
    while ((m = relaxedPoll()) != null) {
        c.accept(m);
    }

There's no strong commitment to the queue being empty at the end of a drain. Called from a consumer thread subject to the restrictions appropriate to the implementation.
Specified by: drain in interface MessagePassingQueue<E>
public int fill(MessagePassingQueue.Supplier<E> s)
Description copied from interface: MessagePassingQueue
Stuffs the queue with elements from the supplier. Semantically similar to:

    while (relaxedOffer(s.get()));

There's no strong commitment to the queue being full at the end of a fill. Called from a producer thread subject to the restrictions appropriate to the implementation.
Specified by: fill in interface MessagePassingQueue<E>
public int drain(MessagePassingQueue.Consumer<E> c, int limit)
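Description copied from interface: MessagePassingQueue
Removes up to limit elements from the queue and hands them to the consumer. Semantically similar to:

    E m;
    int i = 0;
    for (; i < limit && (m = relaxedPoll()) != null; i++) {
        c.accept(m);
    }
    return i;
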
There's no strong commitment to the queue being empty at the end of a drain. Called from a consumer thread subject to the restrictions appropriate to the implementation.
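
The bounded drain can be tried out without the library. The sketch below is only an illustration: it assumes `java.util.Queue.poll()` as a stand-in for `relaxedPoll()` and `java.util.function.Consumer` as a stand-in for `MessagePassingQueue.Consumer`; the class name `DrainLimitSketch` is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

final class DrainLimitSketch {
    /** Hands at most {@code limit} elements to {@code c} and returns how many were handed over. */
    static <E> int drain(Queue<E> q, Consumer<E> c, int limit) {
        E m;
        int i = 0;
        // Assumption: Queue.poll() stands in for relaxedPoll(); null means "nothing visible right now".
        for (; i < limit && (m = q.poll()) != null; i++) {
            c.accept(m);
        }
        return i;
    }

    public static void main(String[] args) {
        Queue<String> q = new ArrayDeque<>(List.of("a", "b", "c", "d", "e"));
        List<String> batch = new ArrayList<>();
        int first = drain(q, batch::add, 3);  // stops because the limit is reached
        int second = drain(q, batch::add, 3); // stops because the queue runs dry
        System.out.println(first + " " + second + " " + batch);
    }
}
```

The return value is the number of elements actually consumed, so a caller can tell a full batch from a short one.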
Specified by: drain in interface MessagePassingQueue<E>
public int fill(MessagePassingQueue.Supplier<E> s, int limit)
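Description copied from interface: MessagePassingQueue
Stuffs the queue with up to limit elements from the supplier. Semantically similar to:

    for (int i = 0; i < limit && relaxedOffer(s.get()); i++);
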
There's no strong commitment to the queue being full at the end of a fill. Called from a producer thread subject to the restrictions appropriate to the implementation.
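
A runnable reading of the bounded fill, under stated assumptions: `ArrayBlockingQueue.offer()` plays the role of `relaxedOffer()` on a bounded queue, `java.util.function.Supplier` replaces `MessagePassingQueue.Supplier`, and `FillLimitSketch` is a hypothetical name. It is a sketch of the loop as printed, not the library's implementation.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class FillLimitSketch {
    /** Offers at most {@code limit} supplied elements and returns how many were accepted. */
    static <E> int fill(Queue<E> q, Supplier<E> s, int limit) {
        int i = 0;
        // Assumption: Queue.offer() stands in for relaxedOffer(); false means "no room right now".
        // Read literally, the supplier is asked first, so an element produced for a
        // failed offer is dropped by this loop.
        while (i < limit && q.offer(s.get())) {
            i++;
        }
        return i;
    }

    public static void main(String[] args) {
        Queue<Integer> q = new ArrayBlockingQueue<>(4);
        AtomicInteger next = new AtomicInteger();
        int first = fill(q, next::getAndIncrement, 3);  // limit reached, one slot still free
        int second = fill(q, next::getAndIncrement, 3); // queue becomes full before the limit
        System.out.println(first + " " + second + " " + q);
    }
}
```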
Specified by: fill in interface MessagePassingQueue<E>
public void drain(MessagePassingQueue.Consumer<E> c, MessagePassingQueue.WaitStrategy w, MessagePassingQueue.ExitCondition exit)
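Description copied from interface: MessagePassingQueue
Removes elements from the queue and hands them to the consumer forever. Semantically similar to:

    int idleCounter = 0;
    while (exit.keepRunning()) {
        E e = relaxedPoll();
        if (e == null) {
            idleCounter = w.idle(idleCounter);
            continue;
        }
        idleCounter = 0;
        c.accept(e);
    }
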
Called from a consumer thread subject to the restrictions appropriate to the implementation.
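
To make the roles of the wait strategy and exit condition concrete, here is a minimal sketch. The nested `WaitStrategy` and `ExitCondition` interfaces are hypothetical stand-ins whose shapes are inferred from the loop above (`idle` takes and returns a counter, `keepRunning` returns a boolean); `ConcurrentLinkedQueue.poll()` stands in for `relaxedPoll()`, and the spin threshold of 100 is an arbitrary choice.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class DrainLoopSketch {
    /** Hypothetical stand-in: receives the idle counter and returns the next one. */
    interface WaitStrategy { int idle(int idleCounter); }
    /** Hypothetical stand-in: the loop runs while this returns true. */
    interface ExitCondition { boolean keepRunning(); }

    static <E> void drain(Queue<E> q, Consumer<E> c, WaitStrategy w, ExitCondition exit) {
        int idleCounter = 0;
        while (exit.keepRunning()) {
            E e = q.poll(); // assumption: stands in for relaxedPoll()
            if (e == null) {
                idleCounter = w.idle(idleCounter);
                continue;
            }
            idleCounter = 0; // progress was made, so the back-off starts over
            c.accept(e);
        }
    }

    /** Busy-spins for the first 100 consecutive idle calls, then yields the CPU. */
    static int spinThenYield(int idleCounter) {
        if (idleCounter < 100) {
            Thread.onSpinWait();
        } else {
            Thread.yield();
        }
        return idleCounter + 1;
    }

    /** Runs a consumer thread until {@code messages} elements were consumed, then stops it. */
    static int consumeAll(int messages) {
        Queue<Integer> q = new ConcurrentLinkedQueue<>();
        AtomicBoolean running = new AtomicBoolean(true);
        AtomicInteger consumed = new AtomicInteger();
        Thread consumer = new Thread(() ->
            drain(q, e -> consumed.incrementAndGet(), DrainLoopSketch::spinThenYield, running::get));
        consumer.start();
        for (int i = 0; i < messages; i++) {
            q.offer(i);
        }
        while (consumed.get() < messages) {
            Thread.yield();
        }
        running.set(false); // the exit condition is only checked between polls
        try {
            consumer.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return consumed.get();
    }

    public static void main(String[] args) {
        System.out.println(consumeAll(1000));
    }
}
```

Passing the counter through `idle` lets a stateless strategy escalate from spinning to yielding, and resetting it to 0 after each element keeps the hot path cheap.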
Specified by: drain in interface MessagePassingQueue<E>
public void fill(MessagePassingQueue.Supplier<E> s, MessagePassingQueue.WaitStrategy w, MessagePassingQueue.ExitCondition exit)
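Description copied from interface: MessagePassingQueue
Stuffs the queue with elements from the supplier forever. Semantically similar to:

    int idleCounter = 0;
    while (exit.keepRunning()) {
        E e = s.get();
        while (!relaxedOffer(e)) {
            idleCounter = w.idle(idleCounter);
        }
        idleCounter = 0;
    }
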
Called from a producer thread subject to the restrictions appropriate to the implementation.
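
One consequence of the loop as printed is that the inner retry never consults the exit condition, so a producer blocked on a full queue only stops once room appears. The sketch below illustrates this under assumptions: `ArrayBlockingQueue.offer()` stands in for `relaxedOffer()`, the nested interfaces are hypothetical stand-ins shaped after the loop, and `FillLoopSketch` is an invented name.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

final class FillLoopSketch {
    /** Hypothetical stand-in: receives the idle counter and returns the next one. */
    interface WaitStrategy { int idle(int idleCounter); }
    /** Hypothetical stand-in: the loop runs while this returns true. */
    interface ExitCondition { boolean keepRunning(); }

    static <E> void fill(Queue<E> q, Supplier<E> s, WaitStrategy w, ExitCondition exit) {
        int idleCounter = 0;
        while (exit.keepRunning()) {
            E e = s.get();
            while (!q.offer(e)) { // assumption: stands in for relaxedOffer(e)
                idleCounter = w.idle(idleCounter);
            }
            idleCounter = 0;
        }
    }

    /** Consumes {@code wanted} elements from a producer thread and returns the last one seen. */
    static long lastOf(int wanted) {
        Queue<Long> q = new ArrayBlockingQueue<>(8);
        AtomicBoolean running = new AtomicBoolean(true);
        AtomicLong seq = new AtomicLong();
        Thread producer = new Thread(() ->
            fill(q, seq::getAndIncrement, n -> { Thread.yield(); return n + 1; }, running::get));
        producer.start();
        long last = -1;
        for (int received = 0; received < wanted; ) {
            Long v = q.poll();
            if (v != null) {
                last = v;
                received++;
            }
        }
        running.set(false);
        // Keep making room: a producer stuck retrying a failed offer cannot see the exit flag.
        while (producer.isAlive()) {
            q.poll();
            Thread.yield();
        }
        return last;
    }

    public static void main(String[] args) {
        System.out.println(lastOf(1000));
    }
}
```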
Specified by: fill in interface MessagePassingQueue<E>
private static int positionWithinCycle(long producerCycleClaim,
int positionOnCycleMask)
private static long producerClaimCycleId(long producerCycleClaim,
int cycleIdBitShift)
private static long producerPositionFromClaim(long producerCycleClaim,
int positionOnCycleMask,
int cycleIdBitShift,
int cycleLengthLog2)
private static long producerPosition(int positionWithinCycle,
long cycleId,
int cycleLengthLog2)
private static int calcElementIndexInBuffer(int positionWithinCycle,
int cycleIndex,
int cycleLengthLog2)
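
The helper signatures above suggest a packed claim word: a cycle id in the high bits and a position within the cycle in the low bits. The bodies below are one plausible reading inferred from the names and parameter lists alone, not the library source. The bit layout (16-bit shift), the 8-slot cycle, and the two-cycle buffer used in `main` are assumptions chosen for the example.

```java
final class CyclePositionSketch {
    /** Low bits of the claim: the slot claimed inside the cycle. */
    static int positionWithinCycle(long producerCycleClaim, int positionOnCycleMask) {
        return (int) (producerCycleClaim & positionOnCycleMask);
    }

    /** High bits of the claim: which cycle the claim belongs to. */
    static long producerClaimCycleId(long producerCycleClaim, int cycleIdBitShift) {
        return producerCycleClaim >>> cycleIdBitShift;
    }

    /** Monotonic position: cycleId * cycleLength + positionWithinCycle. */
    static long producerPosition(int positionWithinCycle, long cycleId, int cycleLengthLog2) {
        return (cycleId << cycleLengthLog2) + positionWithinCycle;
    }

    /** Array index: cycleIndex * cycleLength + positionWithinCycle. */
    static int calcElementIndexInBuffer(int positionWithinCycle, int cycleIndex, int cycleLengthLog2) {
        return (cycleIndex << cycleLengthLog2) + positionWithinCycle;
    }

    public static void main(String[] args) {
        int cycleLengthLog2 = 3;                  // assumed: 8 slots per cycle
        int cycleIdBitShift = 16;                 // assumed layout of the claim word
        int mask = (1 << cycleIdBitShift) - 1;
        long claim = (5L << cycleIdBitShift) | 6; // cycle 5, slot 6

        int pos = positionWithinCycle(claim, mask);
        long cycleId = producerClaimCycleId(claim, cycleIdBitShift);
        int cycleIndex = (int) (cycleId & 1);     // assumed: the buffer holds two cycles
        System.out.println(producerPosition(pos, cycleId, cycleLengthLog2));
        System.out.println(calcElementIndexInBuffer(pos, cycleIndex, cycleLengthLog2));
    }
}
```

This keeps the distinction the terminology note draws: the producer position is an ever-growing progress counter, while the buffer index is the bounded location actually written.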
private static long circularArrayOffset(long consumerPosition,
long mask)
public java.lang.String toString()
Overrides: toString in class java.util.AbstractCollection<E>