Commit 552da37

albertzaharovits authored and benchaplin committed
Threadpool merge executor is aware of available disk space (elastic#127613)
This PR introduces 3 new settings: indices.merge.disk.check_interval, indices.merge.disk.watermark.high, and indices.merge.disk.watermark.high.max_headroom, which control whether the threadpool merge executor starts executing new merges when disk space is running low. The intent of this change is to avoid the situation where in-progress merges exhaust the available disk space on the node's local filesystem. To this end, the thread pool merge executor periodically monitors the available disk space, as well as the disk space currently estimated to be required by all in-progress (currently running) merges on the node, and will NOT schedule any new merges while disk space is low: by default, when the available space falls below 5% of the total disk space or below 100 GB, whichever is smaller (the same threshold as the disk allocation flood stage level).
1 parent 2715c83 commit 552da37
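To make the scheduling check concrete, here is a minimal sketch in plain Java of the kind of gate the description outlines. This is not the actual ThreadPoolMergeExecutorService logic: the class and method names and the exact budget formula (in particular whether the candidate merge's own estimate is counted) are assumptions for illustration; only the default 5% / 100 GB watermark comes from the description above.

class MergeDiskSpaceGateExample {
    // Assumed default limit, per the description above: 5% of the total disk space or 100 GB,
    // whichever is smaller (the same threshold as the disk allocation flood stage level).
    static long defaultWatermarkBytes(long totalDiskBytes) {
        return Math.min(totalDiskBytes / 20, 100L * 1024L * 1024L * 1024L);
    }

    // Returns true if a new merge may be scheduled: the free space projected to remain after all
    // in-progress merges (and the candidate merge) complete must stay above the watermark.
    static boolean mayScheduleNewMerge(
        long freeDiskBytes,              // available bytes on the node's local filesystem
        long totalDiskBytes,             // total bytes on the node's local filesystem
        long runningMergesEstimateBytes, // summed remaining-size estimates of running merges
        long newMergeEstimateBytes       // estimated disk space the candidate merge needs
    ) {
        long projectedFreeBytes = freeDiskBytes - runningMergesEstimateBytes - newMergeEstimateBytes;
        return projectedFreeBytes >= defaultWatermarkBytes(totalDiskBytes);
    }

    public static void main(String[] args) {
        long gb = 1024L * 1024L * 1024L;
        // 1 TB disk with 80 GB free, 30 GB claimed by running merges, 10 GB candidate merge:
        // the projected free space (40 GB) is below the ~51 GB watermark, so do not schedule it.
        System.out.println(mayScheduleNewMerge(80 * gb, 1024 * gb, 30 * gb, 10 * gb)); // false
    }
}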

File tree

13 files changed: +1891 -97 lines


docs/changelog/127613.yaml

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+pr: 127613
+summary: Threadpool merge executor is aware of available disk space
+area: Engine
+type: feature
+issues: []

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 4 additions & 0 deletions
@@ -88,6 +88,7 @@
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.IndexingPressure;
 import org.elasticsearch.index.MergePolicyConfig;
+import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService;
 import org.elasticsearch.index.engine.ThreadPoolMergeScheduler;
 import org.elasticsearch.index.shard.IndexingStatsSettings;
 import org.elasticsearch.indices.IndexingMemoryController;
@@ -629,6 +630,9 @@ public void apply(Settings value, Settings current, Settings previous) {
     MergePolicyConfig.DEFAULT_MAX_MERGED_SEGMENT_SETTING,
     MergePolicyConfig.DEFAULT_MAX_TIME_BASED_MERGED_SEGMENT_SETTING,
     ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING,
+    ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING,
+    ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_HIGH_MAX_HEADROOM_SETTING,
+    ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING,
     TransportService.ENABLE_STACK_OVERFLOW_AVOIDANCE,
     DataStreamGlobalRetentionSettings.DATA_STREAMS_DEFAULT_RETENTION_SETTING,
     DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING,

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java

Lines changed: 516 additions & 33 deletions
Large diffs are not rendered by default.

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java

Lines changed: 22 additions & 8 deletions
@@ -55,7 +55,7 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
     private final ThreadPoolMergeExecutorService threadPoolMergeExecutorService;
     private final PriorityQueue<MergeTask> backloggedMergeTasks = new PriorityQueue<>(
         16,
-        Comparator.comparingLong(MergeTask::estimatedMergeSize)
+        Comparator.comparingLong(MergeTask::estimatedRemainingMergeSize)
     );
     private final Map<MergePolicy.OneMerge, MergeTask> runningMergeTasks = new HashMap<>();
     // set when incoming merges should be throttled (i.e. restrict the indexing rate)
@@ -266,7 +266,7 @@ private void checkMergeTaskThrottling() {
     // exposed for tests
     // synchronized so that {@code #closed}, {@code #runningMergeTasks} and {@code #backloggedMergeTasks} are modified atomically
     synchronized Schedule schedule(MergeTask mergeTask) {
-        assert mergeTask.isRunning() == false;
+        assert mergeTask.hasStartedRunning() == false;
         if (closed) {
             // do not run or backlog tasks when closing the merge scheduler, instead abort them
             return Schedule.ABORT;
@@ -280,6 +280,7 @@ synchronized Schedule schedule(MergeTask mergeTask) {
             assert added : "starting merge task [" + mergeTask + "] registered as already running";
             return Schedule.RUN;
         } else {
+            assert mergeTask.hasStartedRunning() == false;
             backloggedMergeTasks.add(mergeTask);
             return Schedule.BACKLOG;
         }
@@ -403,8 +404,14 @@ public void setIORateLimit(long ioRateLimitBytesPerSec) {
             this.rateLimiter.setMBPerSec(ByteSizeValue.ofBytes(ioRateLimitBytesPerSec).getMbFrac());
         }
 
-        public boolean isRunning() {
-            return mergeStartTimeNS.get() > 0L;
+        /**
+         * Returns {@code true} if this task is currently running, or was run in the past.
+         * An aborted task (see {@link #abort()}) is considered as NOT run.
+         */
+        public boolean hasStartedRunning() {
+            boolean isRunning = mergeStartTimeNS.get() > 0L;
+            assert isRunning != false || rateLimiter.getTotalBytesWritten() == 0L;
+            return isRunning;
         }
 
         /**
@@ -415,7 +422,7 @@ public boolean isRunning() {
          */
         @Override
         public void run() {
-            assert isRunning() == false;
+            assert hasStartedRunning() == false;
             assert ThreadPoolMergeScheduler.this.runningMergeTasks.containsKey(onGoingMerge.getMerge())
                 : "runNowOrBacklog must be invoked before actually running the merge task";
             try {
@@ -480,7 +487,7 @@ public void run() {
          * (by the {@link org.apache.lucene.index.IndexWriter}) to any subsequent merges.
          */
         void abort() {
-            assert isRunning() == false;
+            assert hasStartedRunning() == false;
             assert ThreadPoolMergeScheduler.this.runningMergeTasks.containsKey(onGoingMerge.getMerge()) == false
                 : "cannot abort a merge task that's already running";
             if (verbose()) {
@@ -509,10 +516,17 @@ void abort() {
             }
         }
 
-        long estimatedMergeSize() {
+        /**
+         * Before the merge task started running, this returns the estimated required disk space for the merge to complete
+         * (i.e. the estimated disk space size of the resulting segment following the merge).
+         * While the merge is running, the returned estimation is updated to take into account the data that's already been written.
+         * After the merge completes, the estimation returned here should ideally be close to "0".
+         */
+        long estimatedRemainingMergeSize() {
             // TODO is it possible that `estimatedMergeBytes` be `0` for correctly initialize merges,
             // or is it always the case that if `estimatedMergeBytes` is `0` that means that the merge has not yet been initialized?
-            return onGoingMerge.getMerge().getStoreMergeInfo().estimatedMergeBytes();
+            long estimatedMergeSize = onGoingMerge.getMerge().getStoreMergeInfo().estimatedMergeBytes();
+            return Math.max(0L, estimatedMergeSize - rateLimiter.getTotalBytesWritten());
         }
 
         public long getMergeMemoryEstimateBytes() {
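The estimatedRemainingMergeSize() change above keys the backlog priority queue and the disk accounting on the disk space a merge still needs, not its total size. As a standalone illustration (a hypothetical class, not the Elasticsearch MergeTask; only the Math.max clamp mirrors the diff), the estimate shrinks as the merge writes data and never goes negative:

class RemainingMergeSizeExample {
    private final long estimatedMergeBytes; // up-front estimate of the merged segment's size
    private long totalBytesWritten;         // bytes the running merge has written so far

    RemainingMergeSizeExample(long estimatedMergeBytes) {
        this.estimatedMergeBytes = estimatedMergeBytes;
    }

    void onBytesWritten(long bytes) {
        totalBytesWritten += bytes;
    }

    // Remaining disk space the merge is still expected to need: the up-front estimate minus what
    // has already been written, clamped at zero because a merge may write more than estimated.
    long estimatedRemainingMergeSize() {
        return Math.max(0L, estimatedMergeBytes - totalBytesWritten);
    }

    public static void main(String[] args) {
        RemainingMergeSizeExample merge = new RemainingMergeSizeExample(1_000_000L);
        merge.onBytesWritten(400_000L);
        System.out.println(merge.estimatedRemainingMergeSize()); // 600000
        merge.onBytesWritten(700_000L);
        System.out.println(merge.estimatedRemainingMergeSize()); // 0 (clamped)
    }
}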

server/src/main/java/org/elasticsearch/indices/IndicesService.java

Lines changed: 7 additions & 5 deletions
@@ -295,10 +295,6 @@ protected void doStart() {
     IndicesService(IndicesServiceBuilder builder) {
         this.settings = builder.settings;
         this.threadPool = builder.threadPool;
-        this.threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService(
-            threadPool,
-            settings
-        );
         this.pluginsService = builder.pluginsService;
         this.nodeEnv = builder.nodeEnv;
         this.parserConfig = XContentParserConfiguration.EMPTY.withDeprecationHandler(LoggingDeprecationHandler.INSTANCE)
@@ -321,6 +317,11 @@ protected void doStart() {
         this.bigArrays = builder.bigArrays;
         this.scriptService = builder.scriptService;
         this.clusterService = builder.clusterService;
+        this.threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService(
+            threadPool,
+            clusterService.getClusterSettings(),
+            nodeEnv
+        );
         this.projectResolver = builder.projectResolver;
         this.client = builder.client;
         this.idFieldDataEnabled = INDICES_ID_FIELD_DATA_ENABLED_SETTING.get(clusterService.getSettings());
@@ -368,7 +369,8 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon
                 indicesFieldDataCache,
                 cacheCleaner,
                 indicesRequestCache,
-                indicesQueryCache
+                indicesQueryCache,
+                threadPoolMergeExecutorService
             );
         } catch (IOException e) {
             throw new UncheckedIOException(e);

server/src/test/java/org/elasticsearch/index/IndexModuleTests.java

Lines changed: 6 additions & 2 deletions
@@ -194,13 +194,17 @@ public void setUp() throws Exception {
             emptyMap()
         );
         threadPool = new TestThreadPool("test");
-        threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService(threadPool, settings);
         circuitBreakerService = new NoneCircuitBreakerService();
         PageCacheRecycler pageCacheRecycler = new PageCacheRecycler(settings);
         bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService, CircuitBreaker.REQUEST);
         scriptService = new ScriptService(settings, Collections.emptyMap(), Collections.emptyMap(), () -> 1L);
-        clusterService = ClusterServiceUtils.createClusterService(threadPool);
+        clusterService = ClusterServiceUtils.createClusterService(threadPool, ClusterSettings.createBuiltInClusterSettings(settings));
         nodeEnvironment = new NodeEnvironment(settings, environment);
+        threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService(
+            threadPool,
+            clusterService.getClusterSettings(),
+            nodeEnvironment
+        );
         mapperRegistry = new IndicesModule(Collections.emptyList()).getMapperRegistry();
         indexNameExpressionResolver = TestIndexNameExpressionResolver.newInstance(threadPool.getThreadContext());
     }
