public class ConcurrentMergeScheduler extends MergeScheduler
A MergeScheduler that runs each merge using a separate thread.
Specify the maximum number of threads that may run at once, and the maximum number of simultaneous merges, with setMaxMergesAndThreads(int, int).
If the number of merges exceeds the max number of threads then the largest merges are paused until one of the smaller merges completes.
If more than getMaxMergeCount() merges are requested, this class forcefully throttles the incoming threads by pausing them until one or more merges complete.
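The two limits above interact as follows. This is a simplified, hypothetical model, not how ConcurrentMergeScheduler is implemented. The class MergeRunModel and its Decision type are invented for illustration, and each merge is reduced to its size in MB. It only restates the rules: the smallest maxThreadCount merges run, larger ones are paused, and the incoming thread is asked to stall once more than maxMergeCount merges are outstanding.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical model of the run/pause/stall rules; not Lucene code. */
final class MergeRunModel {

    /** Outcome of one scheduling decision (merges are represented by size in MB). */
    static final class Decision {
        final List<Double> running = new ArrayList<>();
        final List<Double> paused = new ArrayList<>();
        boolean stallIncomingThread;
    }

    /**
     * Sorts the pending merges by size. The smallest maxThreadCount merges run,
     * and the rest are paused. The incoming thread should stall when more than
     * maxMergeCount merges are outstanding.
     */
    static Decision schedule(List<Double> mergeSizesMB, int maxThreadCount, int maxMergeCount) {
        List<Double> sorted = new ArrayList<>(mergeSizesMB);
        sorted.sort(Comparator.naturalOrder());
        Decision d = new Decision();
        for (int i = 0; i < sorted.size(); i++) {
            if (i < maxThreadCount) {
                d.running.add(sorted.get(i));   // small merges get a thread
            } else {
                d.paused.add(sorted.get(i));    // larger merges wait
            }
        }
        d.stallIncomingThread = sorted.size() > maxMergeCount;
        return d;
    }
}
```

Consider four pending merges of 500, 10, 80 and 3 MB, with maxThreadCount = 2 and maxMergeCount = 6. The model runs the 3 MB and 10 MB merges, pauses the other two, and requests no stall.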
This class attempts to detect whether the index is on rotational storage (a traditional hard drive) or not (e.g. a solid-state disk) and changes the default max merge and thread counts accordingly. This detection is currently Linux-only, and relies on the OS to put the right value into /sys/block/<dev>/queue/rotational. For all other operating systems it currently assumes a rotational disk for backwards compatibility. To enable default settings for spinning or solid-state disks on such operating systems, use setDefaultMaxMergesAndThreads(boolean).
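Here is a minimal sketch of the "detect, then pick defaults" idea. It assumes the Linux rotational flag file holds "1" for a spinning device and "0" otherwise. It treats any read failure or unexpected value as rotational, which is an assumption chosen to match the conservative fallback described above. The thread-count formula is the one documented for AUTO_DETECT_MERGES_AND_THREADS. The class and method names are hypothetical, and the matching maxMergeCount defaults are not modeled.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical sketch of rotational detection and default thread count; not Lucene code. */
final class DiskDefaults {

    /** Interprets a rotational flag: an explicit "0" means SSD, anything else is treated as spinning. */
    static boolean parseRotational(String flagContents) {
        return !flagContents.trim().equals("0");
    }

    /** Reads the flag file, assuming a spinning disk if it cannot be read. */
    static boolean spins(Path rotationalFlag) {
        try {
            return parseRotational(Files.readString(rotationalFlag));
        } catch (IOException | RuntimeException e) {
            return true; // unknown storage: fall back to rotational defaults
        }
    }

    /** 1 thread for spinning disks, otherwise max(1, min(4, cpuCoreCount / 2)). */
    static int defaultMaxThreadCount(boolean spins, int cpuCoreCount) {
        return spins ? 1 : Math.max(1, Math.min(4, cpuCoreCount / 2));
    }
}
```

Under this formula, an SSD on a 16-core machine gets 4 merge threads, a 6-core machine gets 3, and a single-core machine still gets 1.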
Modifier and Type | Class and Description |
---|---|
protected class | ConcurrentMergeScheduler.MergeThread: Runs a merge thread to execute a single merge, then exits. |
Modifier and Type | Field and Description |
---|---|
static int | AUTO_DETECT_MERGES_AND_THREADS: Dynamic default for maxThreadCount and maxMergeCount, used to detect whether the index is backed by an SSD or rotational disk and set maxThreadCount accordingly. |
static java.lang.String | DEFAULT_CPU_CORE_COUNT_PROPERTY: Used for testing. |
static java.lang.String | DEFAULT_SPINS_PROPERTY: Used for testing. |
private boolean | doAutoIOThrottle: true if we should rate-limit writes for each merge. |
private double | forceMergeMBPerSec |
private static double | MAX_MERGE_MB_PER_SEC: Ceiling for IO write rate limit (we will never go any higher than this). |
private int | maxMergeCount |
private int | maxThreadCount |
protected int | mergeThreadCount: How many ConcurrentMergeScheduler.MergeThreads have kicked off (this is used to name them). |
protected java.util.List<ConcurrentMergeScheduler.MergeThread> | mergeThreads: List of currently active ConcurrentMergeScheduler.MergeThreads. |
private static double | MIN_BIG_MERGE_MB: Merges below this size are not counted in the maxThreadCount. |
private static double | MIN_MERGE_MB_PER_SEC: Floor for IO write rate limit (we will never go any lower than this). |
private static double | START_MB_PER_SEC: Initial value for IO write rate limit when doAutoIOThrottle is true. |
private boolean | suppressExceptions |
protected double | targetMBPerSec: Current IO writes throttle rate. |
Fields inherited from class MergeScheduler: infoStream
Constructor and Description |
---|
ConcurrentMergeScheduler(): Sole constructor, with all settings set to default values. |
Modifier and Type | Method and Description |
---|---|
private static double | bytesToMB(long bytes) |
(package private) void | clearSuppressExceptions(): Used for testing. |
void | close(): Close this MergeScheduler. |
void | disableAutoIOThrottle(): Turn off auto IO throttling. |
protected void | doMerge(IndexWriter writer, MergePolicy.OneMerge merge): Does the actual merge, by calling IndexWriter.merge(org.apache.lucene.index.MergePolicy.OneMerge). |
protected void | doStall(): Called from maybeStall(org.apache.lucene.index.IndexWriter) to pause the calling thread for a bit. |
void | enableAutoIOThrottle(): Turn on dynamic IO throttling, to adaptively rate limit writes (bytes/sec) to the minimal rate necessary so merges do not fall behind. |
boolean | getAutoIOThrottle(): Returns true if auto IO throttling is currently enabled. |
double | getForceMergeMBPerSec(): Get the per-merge IO throttle rate for forced merges. |
double | getIORateLimitMBPerSec(): Returns the currently set per-merge IO writes rate limit if enableAutoIOThrottle() was called, else Double.POSITIVE_INFINITY. |
int | getMaxMergeCount() |
int | getMaxThreadCount(): Returns maxThreadCount. |
protected ConcurrentMergeScheduler.MergeThread | getMergeThread(IndexWriter writer, MergePolicy.OneMerge merge): Create and return a new MergeThread. |
protected void | handleMergeException(Directory dir, java.lang.Throwable exc): Called when an exception is hit in a background merge thread. |
private void | initDynamicDefaults(IndexWriter writer) |
private boolean | isBacklog(long now, MergePolicy.OneMerge merge) |
protected boolean | maybeStall(IndexWriter writer): This is invoked by merge(org.apache.lucene.index.IndexWriter, org.apache.lucene.index.MergeTrigger, boolean) to possibly stall the incoming thread when there are too many merges running or pending. |
void | merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound): Run the merges provided by IndexWriter.getNextMerge(). |
int | mergeThreadCount(): Returns the number of merge threads that are alive, ignoring the calling thread if it is a merge thread. |
private static double | nsToSec(long ns) |
private static java.lang.String | rateToString(double mbPerSec) |
(package private) void | removeMergeThread(): Removes the calling thread from the active merge threads. |
void | setDefaultMaxMergesAndThreads(boolean spins): Sets max merges and threads to proper defaults for rotational or non-rotational storage. |
void | setForceMergeMBPerSec(double v): Set the per-merge IO throttle rate for forced merges (default: Double.POSITIVE_INFINITY). |
void | setMaxMergesAndThreads(int maxMergeCount, int maxThreadCount): Expert: directly set the maximum number of merge threads and simultaneous merges allowed. |
(package private) void | setSuppressExceptions(): Used for testing. |
void | sync(): Wait for any running merge threads to finish. |
protected void | targetMBPerSecChanged(): Subclasses can override this to tweak targetMBPerSec. |
java.lang.String | toString() |
private void | updateIOThrottle(MergePolicy.OneMerge newMerge, MergeRateLimiter rateLimiter): Tunes IO throttle when a new merge starts. |
protected void | updateMergeThreads(): Called whenever the running merges have changed, to set merge IO limits. |
Directory | wrapForMerge(MergePolicy.OneMerge merge, Directory in): Wraps the incoming Directory so that we can merge-throttle it using RateLimitedIndexOutput. |
Methods inherited from class MergeScheduler: message, setInfoStream, verbose
public static final int AUTO_DETECT_MERGES_AND_THREADS
Dynamic default for maxThreadCount and maxMergeCount, used to detect whether the index is backed by an SSD or rotational disk and set maxThreadCount accordingly. If it's an SSD, maxThreadCount is set to max(1, min(4, cpuCoreCount/2)), otherwise 1. Note that detection currently only works on Linux; other platforms will assume the index is not on an SSD.
public static final java.lang.String DEFAULT_CPU_CORE_COUNT_PROPERTY
public static final java.lang.String DEFAULT_SPINS_PROPERTY
protected final java.util.List<ConcurrentMergeScheduler.MergeThread> mergeThreads
List of currently active ConcurrentMergeScheduler.MergeThreads.
private int maxThreadCount
private int maxMergeCount
protected int mergeThreadCount
How many ConcurrentMergeScheduler.MergeThreads have kicked off (this is used to name them).
private static final double MIN_MERGE_MB_PER_SEC
private static final double MAX_MERGE_MB_PER_SEC
private static final double START_MB_PER_SEC
private static final double MIN_BIG_MERGE_MB
protected double targetMBPerSec
private boolean doAutoIOThrottle
private double forceMergeMBPerSec
private boolean suppressExceptions
public ConcurrentMergeScheduler()
public void setMaxMergesAndThreads(int maxMergeCount, int maxThreadCount)
Expert: directly set the maximum number of merge threads and simultaneous merges allowed.
Parameters:
maxMergeCount - the maximum number of simultaneous merges that are allowed. If a merge is necessary yet we already have this many threads running, the incoming thread (that is calling add/updateDocument) will block until a merge thread has completed. Note that we will only run the smallest maxThreadCount merges at a time.
maxThreadCount - the maximum number of simultaneous merge threads that should be running at once. This must be <= maxMergeCount.
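The sketch below shows one way to enforce the documented constraint maxThreadCount <= maxMergeCount. Several parts of it are assumptions for illustration: the class MergeLimits, the sentinel value -1 standing in for AUTO_DETECT_MERGES_AND_THREADS, the rule that both arguments must be either the sentinel or explicit, and the requirement that explicit values be at least 1. It is not a statement of what setMaxMergesAndThreads actually checks.

```java
/** Hypothetical validation of the two merge limits; not Lucene code. */
final class MergeLimits {
    /** Assumed stand-in for AUTO_DETECT_MERGES_AND_THREADS. */
    static final int AUTO_DETECT = -1;

    final int maxMergeCount;
    final int maxThreadCount;

    MergeLimits(int maxMergeCount, int maxThreadCount) {
        boolean autoMerges = maxMergeCount == AUTO_DETECT;
        boolean autoThreads = maxThreadCount == AUTO_DETECT;
        // Assumption: either both limits are auto-detected or both are explicit.
        if (autoMerges != autoThreads) {
            throw new IllegalArgumentException("both limits must be AUTO_DETECT, or neither");
        }
        if (!autoMerges) {
            if (maxThreadCount < 1 || maxMergeCount < 1) {
                throw new IllegalArgumentException("limits must be >= 1");
            }
            // Documented rule: maxThreadCount must be <= maxMergeCount.
            if (maxThreadCount > maxMergeCount) {
                throw new IllegalArgumentException("maxThreadCount must be <= maxMergeCount");
            }
        }
        this.maxMergeCount = maxMergeCount;
        this.maxThreadCount = maxThreadCount;
    }
}
```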
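public void setDefaultMaxMergesAndThreads(boolean spins)
Sets max merges and threads to proper defaults for rotational or non-rotational storage.
Parameters:
spins - true to set defaults best for traditional rotational storage (spinning disks), else false (e.g. for solid-state disks)
public void setForceMergeMBPerSec(double v)
Set the per-merge IO throttle rate for forced merges (default: Double.POSITIVE_INFINITY).
public double getForceMergeMBPerSec()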
public void enableAutoIOThrottle()
public void disableAutoIOThrottle()
Turn off auto IO throttling.
See Also: enableAutoIOThrottle()
public boolean getAutoIOThrottle()
public double getIORateLimitMBPerSec()
Returns the currently set per-merge IO writes rate limit if enableAutoIOThrottle() was called, else Double.POSITIVE_INFINITY.
public int getMaxThreadCount()
Returns maxThreadCount.
See Also: setMaxMergesAndThreads(int, int)
public int getMaxMergeCount()
void removeMergeThread()
public Directory wrapForMerge(MergePolicy.OneMerge merge, Directory in)
Description copied from class: MergeScheduler
Wraps the incoming Directory so that we can merge-throttle it using RateLimitedIndexOutput.
Overrides: wrapForMerge in class MergeScheduler
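The throttling that wrapForMerge enables comes down to pausing the writer whenever it gets ahead of a target rate. The sketch below shows that idea on a plain java.io.OutputStream. It does not use Lucene's RateLimitedIndexOutput or MergeRateLimiter, and the class name RateLimitedStream is hypothetical. Treating 1 MB as 1024 × 1024 bytes is also an assumption.

```java
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

/** Hypothetical rate-limited stream illustrating merge write throttling; not Lucene code. */
final class RateLimitedStream extends FilterOutputStream {
    private final double mbPerSec;
    private final long startNanos = System.nanoTime();
    private long bytesWritten;

    RateLimitedStream(OutputStream out, double mbPerSec) {
        super(out);
        this.mbPerSec = mbPerSec;
    }

    /** Nanoseconds to sleep so that `bytes` written over `elapsedNanos` stays at or below mbPerSec. */
    static long pauseNanos(long bytes, double mbPerSec, long elapsedNanos) {
        if (Double.isInfinite(mbPerSec)) {
            return 0L; // POSITIVE_INFINITY means "no limit"
        }
        double targetSeconds = bytes / (mbPerSec * 1024 * 1024);
        long targetNanos = (long) (targetSeconds * 1_000_000_000L);
        return Math.max(0L, targetNanos - elapsedNanos);
    }

    @Override
    public void write(int b) throws IOException {
        out.write(b);
        bytesWritten++;
        throttle();
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len);
        bytesWritten += len;
        throttle();
    }

    /** Sleeps just long enough to bring the average write rate back to the target. */
    private void throttle() throws IOException {
        long pause = pauseNanos(bytesWritten, mbPerSec, System.nanoTime() - startNanos);
        if (pause > 0) {
            try {
                Thread.sleep(pause / 1_000_000L, (int) (pause % 1_000_000L));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("interrupted while throttling", e);
            }
        }
    }
}
```

The pause is computed from the total bytes written since the stream was opened. Short bursts are therefore allowed as long as the average rate stays at or below the target. Passing Double.POSITIVE_INFINITY disables the limit, which matches the default documented for forced merges.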
protected void updateMergeThreads()
private void initDynamicDefaults(IndexWriter writer) throws java.io.IOException
Throws: java.io.IOException
private static java.lang.String rateToString(double mbPerSec)
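public void close()
Description copied from class: MergeScheduler
Close this MergeScheduler.
Specified by: close in interface java.io.Closeable
Specified by: close in interface java.lang.AutoCloseable
Specified by: close in class MergeScheduler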
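public void sync()
Wait for any running merge threads to finish. This is also used by close().
public int mergeThreadCount()
Returns the number of merge threads that are alive, ignoring the calling thread if it is a merge thread. Note that this number is at most the size of mergeThreads.
public void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws java.io.IOException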
Description copied from class: MergeScheduler
Run the merges provided by IndexWriter.getNextMerge().
Specified by: merge in class MergeScheduler
Parameters:
writer - the IndexWriter to obtain the merges from.
trigger - the MergeTrigger that caused this merge to happen
newMergesFound - true iff any new merges were found by the caller, otherwise false
Throws: java.io.IOException
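Here is a minimal sketch of the "one thread per merge, stall the caller when too many are in flight" behavior. It uses a java.util.concurrent.Semaphore with maxMergeCount permits. BoundedMergeLoop is hypothetical. Merges are plain Runnables taken from a queue instead of IndexWriter.getNextMerge(), and the pausing of larger merges beyond maxThreadCount is not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical merge loop that stalls its caller at maxMergeCount in-flight merges; not Lucene code. */
final class BoundedMergeLoop {
    private final Semaphore slots;
    private final AtomicInteger running = new AtomicInteger();
    private final AtomicInteger peakRunning = new AtomicInteger();
    private final List<Thread> threads = new ArrayList<>();

    BoundedMergeLoop(int maxMergeCount) {
        this.slots = new Semaphore(maxMergeCount);
    }

    /** Starts one thread per queued merge; blocks the caller while all slots are taken. */
    void merge(Queue<Runnable> pendingMerges) throws InterruptedException {
        Runnable next;
        while ((next = pendingMerges.poll()) != null) {
            slots.acquire(); // the "stall": wait until a running merge finishes
            final Runnable merge = next;
            Thread t = new Thread(() -> {
                int now = running.incrementAndGet();
                peakRunning.accumulateAndGet(now, Math::max);
                try {
                    merge.run();
                } finally {
                    running.decrementAndGet();
                    slots.release(); // frees a slot and may unblock the caller
                }
            });
            t.setDaemon(true);
            threads.add(t);
            t.start();
        }
    }

    /** Waits for every started merge thread to finish. */
    void sync() throws InterruptedException {
        for (Thread t : threads) {
            t.join();
        }
    }

    /** Highest number of merges observed running at the same time. */
    int peakRunning() {
        return peakRunning.get();
    }
}
```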
protected boolean maybeStall(IndexWriter writer)
This is invoked by merge(org.apache.lucene.index.IndexWriter, org.apache.lucene.index.MergeTrigger, boolean) to possibly stall the incoming thread when there are too many merges running or pending. The default behavior is to force this thread, which is producing too many segments for merging to keep up, to wait until merges catch up. Applications that can take other less drastic measures, such as limiting how many threads are allowed to index, can do nothing here and throttle elsewhere.
If this method wants to stall but the calling thread is a merge thread, it should return false to tell the caller not to kick off any new merges.
protected void doStall()
Called from maybeStall(org.apache.lucene.index.IndexWriter) to pause the calling thread for a bit.
protected void doMerge(IndexWriter writer, MergePolicy.OneMerge merge) throws java.io.IOException
Does the actual merge, by calling IndexWriter.merge(org.apache.lucene.index.MergePolicy.OneMerge).
Throws: java.io.IOException
protected ConcurrentMergeScheduler.MergeThread getMergeThread(IndexWriter writer, MergePolicy.OneMerge merge) throws java.io.IOException
Create and return a new MergeThread.
Throws: java.io.IOException
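The mergeThreadCount field counts how many merge threads have been started and is used to name them. A thread factory along these lines captures that idea. The class MergeThreadFactory, the exact name format, and making the threads daemons are all assumptions for illustration.

```java
import java.util.concurrent.ThreadFactory;

/** Hypothetical factory that names merge threads from a running counter; not Lucene code. */
final class MergeThreadFactory implements ThreadFactory {
    private int mergeThreadCount; // how many threads this factory has created so far

    @Override
    public synchronized Thread newThread(Runnable merge) {
        // Assumed name format; each new thread gets the next counter value.
        Thread t = new Thread(merge, "Merge Thread #" + mergeThreadCount++);
        t.setDaemon(true); // assumption: merge threads should not keep the JVM alive
        return t;
    }

    synchronized int created() {
        return mergeThreadCount;
    }
}
```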
protected void handleMergeException(Directory dir, java.lang.Throwable exc)
void setSuppressExceptions()
void clearSuppressExceptions()
public java.lang.String toString()
Overrides: toString in class java.lang.Object
private boolean isBacklog(long now, MergePolicy.OneMerge merge)
private void updateIOThrottle(MergePolicy.OneMerge newMerge, MergeRateLimiter rateLimiter) throws java.io.IOException
Tunes IO throttle when a new merge starts.
Throws: java.io.IOException
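The model below illustrates how an adaptive IO throttle might be tuned each time a merge starts. It follows the spirit of enableAutoIOThrottle(), getIORateLimitMBPerSec() and the MIN_MERGE_MB_PER_SEC / MAX_MERGE_MB_PER_SEC / START_MB_PER_SEC fields. Several parts are assumptions, not values taken from this page: the numeric floor, ceiling and starting rate, the 1.20 and 1.10 step factors, and the boolean backlog input standing in for isBacklog. The class AutoIOThrottle is hypothetical.

```java
/** Hypothetical adaptive merge IO throttle; constants and step factors are assumptions. */
final class AutoIOThrottle {
    static final double MIN_MB_PER_SEC = 5.0;     // assumed floor
    static final double MAX_MB_PER_SEC = 10240.0; // assumed ceiling
    static final double START_MB_PER_SEC = 20.0;  // assumed initial rate

    private boolean enabled = true;
    private double targetMBPerSec = START_MB_PER_SEC;

    /** Called when a new merge starts: speed up if merges are backlogged, otherwise slow down. */
    void onMergeStart(boolean backlog) {
        if (!enabled) {
            return;
        }
        if (backlog) {
            targetMBPerSec = Math.min(MAX_MB_PER_SEC, targetMBPerSec * 1.20);
        } else {
            targetMBPerSec = Math.max(MIN_MB_PER_SEC, targetMBPerSec / 1.10);
        }
    }

    void enable() { enabled = true; }

    void disable() { enabled = false; }

    /** Current per-merge write limit, or POSITIVE_INFINITY when throttling is off. */
    double ioRateLimitMBPerSec() {
        return enabled ? targetMBPerSec : Double.POSITIVE_INFINITY;
    }
}
```

Raising the rate under backlog and decaying it otherwise keeps writes near the lowest rate at which merges keep up. The clamps stop the target from drifting outside the floor and ceiling.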
protected void targetMBPerSecChanged()
private static double nsToSec(long ns)
private static double bytesToMB(long bytes)