Threadpool merge executor is aware of available disk space #127613

Merged
107 commits
60fc8b8
watermark settings
albertzaharovits Apr 15, 2025
8718784
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits Apr 24, 2025
253daa1
Check watermark limits only when the thread pool merge scheduler is e…
albertzaharovits Apr 24, 2025
365ae2d
initial merge disk space monitor
albertzaharovits Apr 29, 2025
0e6f7e2
move settings to MergeDiskSpaceMonitor
albertzaharovits Apr 29, 2025
27476ca
remove unused import
albertzaharovits Apr 29, 2025
b9b26fa
Nits
albertzaharovits Apr 29, 2025
1ddd3d3
Thrashy in-between
albertzaharovits May 1, 2025
94d27f9
DiskSpaceMonitor
albertzaharovits May 1, 2025
023f042
PriorityBlockingQueueWithMaxLimit
albertzaharovits May 1, 2025
ad6cf4c
Trimming code
albertzaharovits May 1, 2025
e507684
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits May 1, 2025
6805fa9
[CI] Auto commit changes from spotless
elasticsearchmachine May 1, 2025
33a0246
Fix compilation issue
albertzaharovits May 1, 2025
4c012cc
Fix TPMST
albertzaharovits May 1, 2025
6a6a759
Some test fixes
albertzaharovits May 2, 2025
2f2720d
[CI] Auto commit changes from spotless
elasticsearchmachine May 2, 2025
4f8a203
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits May 5, 2025
edcf6bf
Make settings follow the allocation ones
albertzaharovits May 5, 2025
fbc17ec
PriorityBlockingQueueWithBudget
albertzaharovits May 6, 2025
258b7ed
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits May 6, 2025
bfad0a0
ElementWithReleasableBudget
albertzaharovits May 6, 2025
56df5f7
DiskSpaceMonitor
albertzaharovits May 6, 2025
ab53108
Checkstyle
albertzaharovits May 6, 2025
af1b2ff
Nit
albertzaharovits May 7, 2025
cedf89f
Nits
albertzaharovits May 7, 2025
b1c2d7a
Almost
albertzaharovits May 7, 2025
76e4e45
Nit
albertzaharovits May 13, 2025
792e645
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits May 13, 2025
78f906c
[CI] Auto commit changes from spotless
elasticsearchmachine May 13, 2025
7e6f4f0
Fix test compilation
albertzaharovits May 13, 2025
5173f1e
[CI] Auto commit changes from spotless
elasticsearchmachine May 13, 2025
be3f126
Fix :x-pack:plugin:ccr:compileTestJava
albertzaharovits May 14, 2025
e60ab15
[CI] Auto commit changes from spotless
elasticsearchmachine May 14, 2025
9f6a607
ClusterSettings only
albertzaharovits May 14, 2025
1eed272
[CI] Auto commit changes from spotless
elasticsearchmachine May 14, 2025
24232ac
No estimatedMergeSize()
albertzaharovits May 15, 2025
afdbc34
Fix ThreadPoolMergeExecutorServiceTests
albertzaharovits May 15, 2025
98447ab
[CI] Auto commit changes from spotless
elasticsearchmachine May 15, 2025
7f902a9
Fix ThreadPoolMergeSchedulerTests
albertzaharovits May 16, 2025
127515e
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits May 16, 2025
df1590f
javadoc nits
albertzaharovits May 23, 2025
5b5e427
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits May 23, 2025
305e4db
[CI] Auto commit changes from spotless
elasticsearchmachine May 23, 2025
5f48156
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits May 26, 2025
5888a5d
Fix tests with wrap
albertzaharovits May 26, 2025
e704d2a
register no-op settings consumer so that settings validations works p…
albertzaharovits May 26, 2025
c9229ae
Settings nits
albertzaharovits May 27, 2025
199a15b
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits May 27, 2025
e6c8c08
Nit
albertzaharovits May 27, 2025
931af36
DefaultSettingsProvider
albertzaharovits May 27, 2025
0e4bab5
Disable fs available disk space feature for this test
albertzaharovits May 27, 2025
973bb2c
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits May 27, 2025
e0edb71
WIP refactor thread pool merge executor service
albertzaharovits May 28, 2025
1b767b6
Nit
albertzaharovits May 28, 2025
5940c72
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits May 28, 2025
de97f02
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits May 28, 2025
71976a9
nodeSpec.getVersion().onOrAfter("9.1.0")
albertzaharovits May 28, 2025
415e67e
startDiskSpaceMonitoring
albertzaharovits May 28, 2025
d11b1b8
Fix compilation
albertzaharovits May 28, 2025
fe96251
[CI] Auto commit changes from spotless
elasticsearchmachine May 28, 2025
631f9c0
testAvailableDiskSpaceMonitorSingleUpdateWithDefaultSettings
albertzaharovits May 29, 2025
4acdfcf
testAvailableDiskSpaceMonitorSettingsUpdate
albertzaharovits May 29, 2025
ffd34d2
[CI] Auto commit changes from spotless
elasticsearchmachine May 29, 2025
f647639
more testAvailableDiskSpaceMonitorSettingsUpdate
albertzaharovits May 29, 2025
12e3578
testMergeTaskPriorityAvailableBudgetTracking
albertzaharovits May 30, 2025
6a03079
[CI] Auto commit changes from spotless
elasticsearchmachine May 30, 2025
973c04e
testMergeTaskQueueBudgetTrackingWhenEstimatedRemainingMergeSizeChanges
albertzaharovits May 30, 2025
b2ecb82
Nit
albertzaharovits May 30, 2025
8f30793
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits May 30, 2025
0ee4599
testUnavailableBudgetBlocksEnqueuedMergeTasks
albertzaharovits May 30, 2025
08664f9
nit
albertzaharovits May 30, 2025
4a9a05a
Update docs/changelog/127613.yaml
albertzaharovits May 30, 2025
63eb905
[CI] Auto commit changes from spotless
elasticsearchmachine May 30, 2025
c247360
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits Jun 2, 2025
92596b0
PriorityBlockingQueueWithBudget.this.availableBudget
albertzaharovits Jun 2, 2025
759e269
Don't release budget when the merge task goes out of scope
albertzaharovits Jun 2, 2025
da310da
Fixes ThreadPoolMergeExecutorServiceTests
albertzaharovits Jun 2, 2025
09e511b
[CI] Auto commit changes from spotless
elasticsearchmachine Jun 2, 2025
322c6c0
Fix testUnavailableBudgetBlocksNewMergeTasksFromStartingExecution
albertzaharovits Jun 2, 2025
a2710e4
try-with-resource
albertzaharovits Jun 2, 2025
69f340a
[CI] Auto commit changes from spotless
elasticsearchmachine Jun 2, 2025
5644752
testUnavailableBudgetBlocksNewMer nits
albertzaharovits Jun 3, 2025
d1654e0
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits Jun 3, 2025
03b705e
testBackloggedMergeTasksDoNotHoldUpBudget
albertzaharovits Jun 3, 2025
fe9f7c8
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits Jun 4, 2025
aa50fa1
WIP
albertzaharovits Jun 4, 2025
d1f47b9
refactor ThreadPoolMergeExecutorServiceDiskSpaceTests to use statics
albertzaharovits Jun 5, 2025
e7a21c0
[CI] Auto commit changes from spotless
elasticsearchmachine Jun 5, 2025
524d10d
testAbortingOrRunningMergeTaskHoldsUpBudget
albertzaharovits Jun 5, 2025
fb5edd7
testAbortingOrRunningMergeTaskHoldsUpBudget
albertzaharovits Jun 5, 2025
1cb69b5
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits Jun 5, 2025
ad62d61
testMergeTasksAreUnblockedWhenMoreDiskSpaceBecomesAvailable
albertzaharovits Jun 5, 2025
1b15479
[CI] Auto commit changes from spotless
elasticsearchmachine Jun 5, 2025
fe08482
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits Jun 5, 2025
f4c962e
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits Jun 6, 2025
d07b32b
updateConsumer.accept(ByteSizeValue.MINUS_ONE)
albertzaharovits Jun 6, 2025
846c0f3
[CI] Auto commit changes from spotless
elasticsearchmachine Jun 6, 2025
a2c3cc2
fix testAvailableDiskSpaceMonitorWhenFileSystemStatErrors
albertzaharovits Jun 6, 2025
aa7e08b
super(MergeTask::estimatedRemainingMergeSize, 0L)
albertzaharovits Jun 6, 2025
7751e75
Refactor ElementWithReleasableBudget
albertzaharovits Jun 6, 2025
d1f3570
Nit
albertzaharovits Jun 6, 2025
622f89f
Fix ThreadPoolMergeExecutorServiceDiskSpaceTests
albertzaharovits Jun 6, 2025
3c1caa3
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits Jun 6, 2025
46fcfac
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits Jun 7, 2025
ef11f9d
Remove indices.merge.disk.watermark.high in DefaultSettingsProvider
albertzaharovits Jun 8, 2025
009f3e3
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits Jun 8, 2025
5 changes: 5 additions & 0 deletions docs/changelog/127613.yaml
@@ -0,0 +1,5 @@
+pr: 127613
+summary: Threadpool merge executor is aware of available disk space
+area: Engine
+type: feature
+issues: []
@@ -88,6 +88,7 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.index.MergePolicyConfig;
+import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService;
import org.elasticsearch.index.engine.ThreadPoolMergeScheduler;
import org.elasticsearch.index.shard.IndexingStatsSettings;
import org.elasticsearch.indices.IndexingMemoryController;
@@ -629,6 +630,9 @@ public void apply(Settings value, Settings current, Settings previous) {
MergePolicyConfig.DEFAULT_MAX_MERGED_SEGMENT_SETTING,
MergePolicyConfig.DEFAULT_MAX_TIME_BASED_MERGED_SEGMENT_SETTING,
ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING,
+ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING,
+ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_HIGH_MAX_HEADROOM_SETTING,
+ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING,
TransportService.ENABLE_STACK_OVERFLOW_AVOIDANCE,
DataStreamGlobalRetentionSettings.DATA_STREAMS_DEFAULT_RETENTION_SETTING,
DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING,
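
The three settings registered in this hunk are named after a high watermark, a max headroom, and a check interval. As a rough illustration only, and assuming they behave like the disk allocation watermark settings (the commit "Make settings follow the allocation ones" hints at this, but the rendered diff does not confirm it), the disk budget left for merges might be derived as sketched below. `MergeDiskBudgetSketch`, `availableBudget`, and all parameter names are hypothetical and are not taken from the PR.

```java
/**
 * Hypothetical sketch, NOT the PR's implementation: combines a high watermark
 * ratio and an optional max headroom into a disk budget usable by merges.
 */
final class MergeDiskBudgetSketch {

    /**
     * @param totalBytes       total size of the file system
     * @param availableBytes   bytes currently free on the file system
     * @param highWatermark    assumed max used fraction of the disk, e.g. 0.75
     * @param maxHeadroomBytes assumed cap on the reserved free space, or -1 for no cap
     * @return bytes that merges may still consume, never negative
     */
    static long availableBudget(long totalBytes, long availableBytes, double highWatermark, long maxHeadroomBytes) {
        // free space that must stay untouched according to the watermark
        long reserved = (long) (totalBytes * (1.0d - highWatermark));
        if (maxHeadroomBytes >= 0) {
            // on large disks the headroom caps how much free space is reserved
            reserved = Math.min(reserved, maxHeadroomBytes);
        }
        return Math.max(0L, availableBytes - reserved);
    }

    public static void main(String[] args) {
        System.out.println(availableBudget(1_000L, 400L, 0.75d, -1L));  // 150
        System.out.println(availableBudget(1_000L, 400L, 0.75d, 100L)); // 300
    }
}
```

Under these assumptions a periodic check (the check interval setting) would recompute this value and hand it to whatever component decides which merge tasks may start.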

Large diffs are not rendered by default.
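
Since this file's diff is not rendered, the following is only a guess at the general idea, based on commit titles such as `PriorityBlockingQueueWithBudget`, `testUnavailableBudgetBlocksEnqueuedMergeTasks`, and `testMergeTasksAreUnblockedWhenMoreDiskSpaceBecomesAvailable`: a priority queue that hands out its smallest element only when that element fits into the currently available budget. `BudgetQueueSketch` and every method on it are hypothetical names, and the sketch deliberately omits releasing the budget held by in-flight elements.

```java
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.function.ToLongFunction;

/** Hypothetical sketch, NOT the PR's class: smallest-first queue gated by a budget. */
final class BudgetQueueSketch<E> {
    private final PriorityQueue<E> queue;
    private final ToLongFunction<E> budgetFunction;
    private long availableBudget;

    BudgetQueueSketch(ToLongFunction<E> budgetFunction, long initialBudget) {
        this.budgetFunction = budgetFunction;
        // elements that need the least budget are served first
        this.queue = new PriorityQueue<>(Comparator.comparingLong(budgetFunction));
        this.availableBudget = initialBudget;
    }

    synchronized void enqueue(E element) {
        queue.add(element);
        notifyAll();
    }

    /** Overwrites the budget, e.g. after a periodic disk space check, and wakes up waiters. */
    synchronized void updateBudget(long newBudget) {
        availableBudget = newBudget;
        notifyAll();
    }

    /** Returns the smallest element if it fits the budget, otherwise {@code null}. */
    synchronized E poll() {
        E head = queue.peek();
        if (head == null || budgetFunction.applyAsLong(head) > availableBudget) {
            return null;
        }
        availableBudget -= budgetFunction.applyAsLong(head);
        return queue.poll();
    }

    /** Blocks until the smallest element fits the budget. */
    synchronized E take() throws InterruptedException {
        E element;
        while ((element = poll()) == null) {
            wait();
        }
        return element;
    }

    public static void main(String[] args) {
        BudgetQueueSketch<Long> q = new BudgetQueueSketch<>(Long::longValue, 10L);
        q.enqueue(50L);
        q.enqueue(20L);
        System.out.println(q.poll()); // null, because 20 > 10
        q.updateBudget(25L);
        System.out.println(q.poll()); // 20
    }
}
```

A real implementation would also have to give budget back when a task finishes or is aborted; the test names above suggest the PR handles that, but the details are not visible here.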

@@ -55,7 +55,7 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
private final ThreadPoolMergeExecutorService threadPoolMergeExecutorService;
private final PriorityQueue<MergeTask> backloggedMergeTasks = new PriorityQueue<>(
16,
-Comparator.comparingLong(MergeTask::estimatedMergeSize)
+Comparator.comparingLong(MergeTask::estimatedRemainingMergeSize)
);
private final Map<MergePolicy.OneMerge, MergeTask> runningMergeTasks = new HashMap<>();
// set when incoming merges should be throttled (i.e. restrict the indexing rate)
@@ -266,7 +266,7 @@ private void checkMergeTaskThrottling() {
// exposed for tests
// synchronized so that {@code #closed}, {@code #runningMergeTasks} and {@code #backloggedMergeTasks} are modified atomically
synchronized Schedule schedule(MergeTask mergeTask) {
-assert mergeTask.isRunning() == false;
+assert mergeTask.hasStartedRunning() == false;
if (closed) {
// do not run or backlog tasks when closing the merge scheduler, instead abort them
return Schedule.ABORT;
@@ -280,6 +280,7 @@ synchronized Schedule schedule(MergeTask mergeTask) {
assert added : "starting merge task [" + mergeTask + "] registered as already running";
return Schedule.RUN;
} else {
+assert mergeTask.hasStartedRunning() == false;
backloggedMergeTasks.add(mergeTask);
return Schedule.BACKLOG;
}
@@ -403,8 +404,14 @@ public void setIORateLimit(long ioRateLimitBytesPerSec) {
this.rateLimiter.setMBPerSec(ByteSizeValue.ofBytes(ioRateLimitBytesPerSec).getMbFrac());
}

-public boolean isRunning() {
-return mergeStartTimeNS.get() > 0L;
+/**
+* Returns {@code true} if this task is currently running, or was run in the past.
+* An aborted task (see {@link #abort()}) is considered as NOT run.
+*/
+public boolean hasStartedRunning() {
+boolean isRunning = mergeStartTimeNS.get() > 0L;
+assert isRunning != false || rateLimiter.getTotalBytesWritten() == 0L;
+return isRunning;
}

/**
@@ -415,7 +422,7 @@ public boolean isRunning() {
*/
@Override
public void run() {
-assert isRunning() == false;
+assert hasStartedRunning() == false;
assert ThreadPoolMergeScheduler.this.runningMergeTasks.containsKey(onGoingMerge.getMerge())
: "runNowOrBacklog must be invoked before actually running the merge task";
try {
@@ -480,7 +487,7 @@ public void run() {
* (by the {@link org.apache.lucene.index.IndexWriter}) to any subsequent merges.
*/
void abort() {
-assert isRunning() == false;
+assert hasStartedRunning() == false;
assert ThreadPoolMergeScheduler.this.runningMergeTasks.containsKey(onGoingMerge.getMerge()) == false
: "cannot abort a merge task that's already running";
if (verbose()) {
@@ -509,10 +516,17 @@ void abort() {
}
}

-long estimatedMergeSize() {
+/**
+* Before the merge task started running, this returns the estimated required disk space for the merge to complete
+* (i.e. the estimated disk space size of the resulting segment following the merge).
+* While the merge is running, the returned estimation is updated to take into account the data that's already been written.
+* After the merge completes, the estimation returned here should ideally be close to "0".
+*/
+long estimatedRemainingMergeSize() {
// TODO is it possible that `estimatedMergeBytes` be `0` for correctly initialize merges,
// or is it always the case that if `estimatedMergeBytes` is `0` that means that the merge has not yet been initialized?
-return onGoingMerge.getMerge().getStoreMergeInfo().estimatedMergeBytes();
+long estimatedMergeSize = onGoingMerge.getMerge().getStoreMergeInfo().estimatedMergeBytes();
+return Math.max(0L, estimatedMergeSize - rateLimiter.getTotalBytesWritten());
}

public long getMergeMemoryEstimateBytes() {
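
The `estimatedRemainingMergeSize()` change in the hunk above subtracts the bytes already written from the up-front estimate and clamps the result at zero. A minimal standalone sketch of that arithmetic follows; `RemainingMergeSizeSketch` and `recordWritten` are hypothetical stand-ins for the merge task and its rate limiter, which are not reproduced here.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for a merge task that tracks how much it has written so far. */
final class RemainingMergeSizeSketch {
    private final long estimatedMergeBytes;
    private final AtomicLong totalBytesWritten = new AtomicLong();

    RemainingMergeSizeSketch(long estimatedMergeBytes) {
        this.estimatedMergeBytes = estimatedMergeBytes;
    }

    /** Stands in for the byte counter that the rate limiter maintains in the diff. */
    void recordWritten(long bytes) {
        totalBytesWritten.addAndGet(bytes);
    }

    /** Same formula as in the diff: the estimate minus bytes written, never below zero. */
    long estimatedRemainingMergeSize() {
        return Math.max(0L, estimatedMergeBytes - totalBytesWritten.get());
    }

    public static void main(String[] args) {
        RemainingMergeSizeSketch task = new RemainingMergeSizeSketch(100L);
        System.out.println(task.estimatedRemainingMergeSize()); // 100
        task.recordWritten(30L);
        System.out.println(task.estimatedRemainingMergeSize()); // 70
        task.recordWritten(100L); // the merge wrote more than was estimated
        System.out.println(task.estimatedRemainingMergeSize()); // 0
    }
}
```

The clamp matters because the initial figure is only an estimate: a merge can write more bytes than predicted, and a negative remaining size would distort any ordering or budget accounting based on it.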
@@ -295,10 +295,6 @@ protected void doStart() {
IndicesService(IndicesServiceBuilder builder) {
this.settings = builder.settings;
this.threadPool = builder.threadPool;
-this.threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService(
-threadPool,
-settings
-);
this.pluginsService = builder.pluginsService;
this.nodeEnv = builder.nodeEnv;
this.parserConfig = XContentParserConfiguration.EMPTY.withDeprecationHandler(LoggingDeprecationHandler.INSTANCE)
@@ -321,6 +317,11 @@ protected void doStart() {
this.bigArrays = builder.bigArrays;
this.scriptService = builder.scriptService;
this.clusterService = builder.clusterService;
+this.threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService(
+threadPool,
+clusterService.getClusterSettings(),
+nodeEnv
+);
this.projectResolver = builder.projectResolver;
this.client = builder.client;
this.idFieldDataEnabled = INDICES_ID_FIELD_DATA_ENABLED_SETTING.get(clusterService.getSettings());
@@ -368,7 +369,8 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon
indicesFieldDataCache,
cacheCleaner,
indicesRequestCache,
-indicesQueryCache
+indicesQueryCache,
+threadPoolMergeExecutorService
);
} catch (IOException e) {
throw new UncheckedIOException(e);
@@ -194,13 +194,17 @@ public void setUp() throws Exception {
emptyMap()
);
threadPool = new TestThreadPool("test");
-threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService(threadPool, settings);
circuitBreakerService = new NoneCircuitBreakerService();
PageCacheRecycler pageCacheRecycler = new PageCacheRecycler(settings);
bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService, CircuitBreaker.REQUEST);
scriptService = new ScriptService(settings, Collections.emptyMap(), Collections.emptyMap(), () -> 1L);
-clusterService = ClusterServiceUtils.createClusterService(threadPool);
+clusterService = ClusterServiceUtils.createClusterService(threadPool, ClusterSettings.createBuiltInClusterSettings(settings));
nodeEnvironment = new NodeEnvironment(settings, environment);
+threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService(
+threadPool,
+clusterService.getClusterSettings(),
+nodeEnvironment
+);
mapperRegistry = new IndicesModule(Collections.emptyList()).getMapperRegistry();
indexNameExpressionResolver = TestIndexNameExpressionResolver.newInstance(threadPool.getThreadContext());
}