Skip to content

Threadpool merge executor is aware of available disk space #127613

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
107 commits
Select commit Hold shift + click to select a range
60fc8b8
watermark settings
albertzaharovits Apr 15, 2025
8718784
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits Apr 24, 2025
253daa1
Check watermark limits only when the thread pool merge scheduler is e…
albertzaharovits Apr 24, 2025
365ae2d
initial merge disk space monitor
albertzaharovits Apr 29, 2025
0e6f7e2
move settings to MergeDiskSpaceMonitor
albertzaharovits Apr 29, 2025
27476ca
remove unused import
albertzaharovits Apr 29, 2025
b9b26fa
Nits
albertzaharovits Apr 29, 2025
1ddd3d3
Thrashy in-between
albertzaharovits May 1, 2025
94d27f9
DiskSpaceMonitor
albertzaharovits May 1, 2025
023f042
PriorityBlockingQueueWithMaxLimit
albertzaharovits May 1, 2025
ad6cf4c
Trimming code
albertzaharovits May 1, 2025
e507684
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits May 1, 2025
6805fa9
[CI] Auto commit changes from spotless
elasticsearchmachine May 1, 2025
33a0246
Fix compilation issue
albertzaharovits May 1, 2025
4c012cc
Fix TPMST
albertzaharovits May 1, 2025
6a6a759
Some test fixes
albertzaharovits May 2, 2025
2f2720d
[CI] Auto commit changes from spotless
elasticsearchmachine May 2, 2025
4f8a203
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits May 5, 2025
edcf6bf
Make settings follow the allocation ones
albertzaharovits May 5, 2025
fbc17ec
PriorityBlockingQueueWithBudget
albertzaharovits May 6, 2025
258b7ed
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits May 6, 2025
bfad0a0
ElementWithReleasableBudget
albertzaharovits May 6, 2025
56df5f7
DiskSpaceMonitor
albertzaharovits May 6, 2025
ab53108
Checkstyle
albertzaharovits May 6, 2025
af1b2ff
Nit
albertzaharovits May 7, 2025
cedf89f
Nits
albertzaharovits May 7, 2025
b1c2d7a
Almost
albertzaharovits May 7, 2025
76e4e45
Nit
albertzaharovits May 13, 2025
792e645
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits May 13, 2025
78f906c
[CI] Auto commit changes from spotless
elasticsearchmachine May 13, 2025
7e6f4f0
Fix test compilation
albertzaharovits May 13, 2025
5173f1e
[CI] Auto commit changes from spotless
elasticsearchmachine May 13, 2025
be3f126
Fix :x-pack:plugin:ccr:compileTestJava
albertzaharovits May 14, 2025
e60ab15
[CI] Auto commit changes from spotless
elasticsearchmachine May 14, 2025
9f6a607
ClusterSettings only
albertzaharovits May 14, 2025
1eed272
[CI] Auto commit changes from spotless
elasticsearchmachine May 14, 2025
24232ac
No estimatedMergeSize()
albertzaharovits May 15, 2025
afdbc34
Fix ThreadPoolMergeExecutorServiceTests
albertzaharovits May 15, 2025
98447ab
[CI] Auto commit changes from spotless
elasticsearchmachine May 15, 2025
7f902a9
Fix ThreadPoolMergeSchedulerTests
albertzaharovits May 16, 2025
127515e
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits May 16, 2025
df1590f
javadoc nits
albertzaharovits May 23, 2025
5b5e427
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits May 23, 2025
305e4db
[CI] Auto commit changes from spotless
elasticsearchmachine May 23, 2025
5f48156
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits May 26, 2025
5888a5d
Fix tests with wrap
albertzaharovits May 26, 2025
e704d2a
register no-op settings consumer so that settings validations works p…
albertzaharovits May 26, 2025
c9229ae
Settings nits
albertzaharovits May 27, 2025
199a15b
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits May 27, 2025
e6c8c08
Nit
albertzaharovits May 27, 2025
931af36
DefaultSettingsProvider
albertzaharovits May 27, 2025
0e4bab5
Disable fs available disk space feature for this test
albertzaharovits May 27, 2025
973bb2c
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits May 27, 2025
e0edb71
WIP refactor thread pool merge executor service
albertzaharovits May 28, 2025
1b767b6
Nit
albertzaharovits May 28, 2025
5940c72
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits May 28, 2025
de97f02
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits May 28, 2025
71976a9
nodeSpec.getVersion().onOrAfter("9.1.0")
albertzaharovits May 28, 2025
415e67e
startDiskSpaceMonitoring
albertzaharovits May 28, 2025
d11b1b8
Fix compilation
albertzaharovits May 28, 2025
fe96251
[CI] Auto commit changes from spotless
elasticsearchmachine May 28, 2025
631f9c0
testAvailableDiskSpaceMonitorSingleUpdateWithDefaultSettings
albertzaharovits May 29, 2025
4acdfcf
testAvailableDiskSpaceMonitorSettingsUpdate
albertzaharovits May 29, 2025
ffd34d2
[CI] Auto commit changes from spotless
elasticsearchmachine May 29, 2025
f647639
more testAvailableDiskSpaceMonitorSettingsUpdate
albertzaharovits May 29, 2025
12e3578
testMergeTaskPriorityAvailableBudgetTracking
albertzaharovits May 30, 2025
6a03079
[CI] Auto commit changes from spotless
elasticsearchmachine May 30, 2025
973c04e
testMergeTaskQueueBudgetTrackingWhenEstimatedRemainingMergeSizeChanges
albertzaharovits May 30, 2025
b2ecb82
Nit
albertzaharovits May 30, 2025
8f30793
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits May 30, 2025
0ee4599
testUnavailableBudgetBlocksEnqueuedMergeTasks
albertzaharovits May 30, 2025
08664f9
nit
albertzaharovits May 30, 2025
4a9a05a
Update docs/changelog/127613.yaml
albertzaharovits May 30, 2025
63eb905
[CI] Auto commit changes from spotless
elasticsearchmachine May 30, 2025
c247360
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits Jun 2, 2025
92596b0
PriorityBlockingQueueWithBudget.this.availableBudget
albertzaharovits Jun 2, 2025
759e269
Don't release budget when the merge task goes out of scope
albertzaharovits Jun 2, 2025
da310da
Fixes ThreadPoolMergeExecutorServiceTests
albertzaharovits Jun 2, 2025
09e511b
[CI] Auto commit changes from spotless
elasticsearchmachine Jun 2, 2025
322c6c0
Fix testUnavailableBudgetBlocksNewMergeTasksFromStartingExecution
albertzaharovits Jun 2, 2025
a2710e4
try-with-resource
albertzaharovits Jun 2, 2025
69f340a
[CI] Auto commit changes from spotless
elasticsearchmachine Jun 2, 2025
5644752
testUnavailableBudgetBlocksNewMer nits
albertzaharovits Jun 3, 2025
d1654e0
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits Jun 3, 2025
03b705e
testBackloggedMergeTasksDoNotHoldUpBudget
albertzaharovits Jun 3, 2025
fe9f7c8
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits Jun 4, 2025
aa50fa1
WIP
albertzaharovits Jun 4, 2025
d1f47b9
refactor ThreadPoolMergeExecutorServiceDiskSpaceTests to use statics
albertzaharovits Jun 5, 2025
e7a21c0
[CI] Auto commit changes from spotless
elasticsearchmachine Jun 5, 2025
524d10d
testAbortingOrRunningMergeTaskHoldsUpBudget
albertzaharovits Jun 5, 2025
fb5edd7
testAbortingOrRunningMergeTaskHoldsUpBudget
albertzaharovits Jun 5, 2025
1cb69b5
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits Jun 5, 2025
ad62d61
testMergeTasksAreUnblockedWhenMoreDiskSpaceBecomesAvailable
albertzaharovits Jun 5, 2025
1b15479
[CI] Auto commit changes from spotless
elasticsearchmachine Jun 5, 2025
fe08482
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits Jun 5, 2025
f4c962e
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits Jun 6, 2025
d07b32b
updateConsumer.accept(ByteSizeValue.MINUS_ONE)
albertzaharovits Jun 6, 2025
846c0f3
[CI] Auto commit changes from spotless
elasticsearchmachine Jun 6, 2025
a2c3cc2
fix testAvailableDiskSpaceMonitorWhenFileSystemStatErrors
albertzaharovits Jun 6, 2025
aa7e08b
super(MergeTask::estimatedRemainingMergeSize, 0L)
albertzaharovits Jun 6, 2025
7751e75
Refactor ElementWithReleasableBudget
albertzaharovits Jun 6, 2025
d1f3570
Nit
albertzaharovits Jun 6, 2025
622f89f
Fix ThreadPoolMergeExecutorServiceDiskSpaceTests
albertzaharovits Jun 6, 2025
3c1caa3
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits Jun 6, 2025
46fcfac
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits Jun 7, 2025
ef11f9d
Remove indices.merge.disk.watermark.high in DefaultSettingsProvider
albertzaharovits Jun 8, 2025
009f3e3
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits Jun 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.index.MergePolicyConfig;
import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService;
import org.elasticsearch.index.engine.ThreadPoolMergeScheduler;
import org.elasticsearch.index.shard.IndexingStatsSettings;
import org.elasticsearch.indices.IndexingMemoryController;
Expand Down Expand Up @@ -628,6 +629,9 @@ public void apply(Settings value, Settings current, Settings previous) {
MergePolicyConfig.DEFAULT_MAX_MERGED_SEGMENT_SETTING,
MergePolicyConfig.DEFAULT_MAX_TIME_BASED_MERGED_SEGMENT_SETTING,
ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING,
ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING,
ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_HIGH_MAX_HEADROOM_SETTING,
ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING,
TransportService.ENABLE_STACK_OVERFLOW_AVOIDANCE,
DataStreamGlobalRetentionSettings.DATA_STREAMS_DEFAULT_RETENTION_SETTING,
DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING,
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,10 @@ long estimatedMergeSize() {
return onGoingMerge.getMerge().getStoreMergeInfo().estimatedMergeBytes();
}

long estimatedRemainingMergeSize() {
return Math.max(0L, estimatedMergeSize() - rateLimiter.getTotalBytesWritten());
}

public long getMergeMemoryEstimateBytes() {
return mergeMemoryEstimateBytes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,12 +293,13 @@ protected void doStart() {
IndicesService(IndicesServiceBuilder builder) {
this.settings = builder.settings;
this.threadPool = builder.threadPool;
this.pluginsService = builder.pluginsService;
this.nodeEnv = builder.nodeEnv;
this.threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService(
threadPool,
settings
settings,
nodeEnv
);
this.pluginsService = builder.pluginsService;
this.nodeEnv = builder.nodeEnv;
this.parserConfig = XContentParserConfiguration.EMPTY.withDeprecationHandler(LoggingDeprecationHandler.INSTANCE)
.withRegistry(builder.xContentRegistry);
this.valuesSourceRegistry = builder.valuesSourceRegistry;
Expand Down Expand Up @@ -366,7 +367,8 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon
indicesFieldDataCache,
cacheCleaner,
indicesRequestCache,
indicesQueryCache
indicesQueryCache,
threadPoolMergeExecutorService
);
} catch (IOException e) {
throw new UncheckedIOException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,17 @@ public void setUp() throws Exception {
emptyMap()
);
threadPool = new TestThreadPool("test");
threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService(threadPool, settings);
circuitBreakerService = new NoneCircuitBreakerService();
PageCacheRecycler pageCacheRecycler = new PageCacheRecycler(settings);
bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService, CircuitBreaker.REQUEST);
scriptService = new ScriptService(settings, Collections.emptyMap(), Collections.emptyMap(), () -> 1L);
clusterService = ClusterServiceUtils.createClusterService(threadPool);
nodeEnvironment = new NodeEnvironment(settings, environment);
threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService(
threadPool,
settings,
nodeEnvironment
);
mapperRegistry = new IndicesModule(Collections.emptyList()).getMapperRegistry();
indexNameExpressionResolver = TestIndexNameExpressionResolver.newInstance(threadPool.getThreadContext());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.MergeTask;
import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.Schedule;
import org.elasticsearch.index.merge.OnGoingMerge;
Expand All @@ -21,6 +22,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.mockito.ArgumentCaptor;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
Expand Down Expand Up @@ -56,9 +58,13 @@

public class ThreadPoolMergeExecutorServiceTests extends ESTestCase {

public void testNewMergeTaskIsAbortedWhenThreadPoolIsShutdown() {
TestThreadPool testThreadPool = new TestThreadPool("test");
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(testThreadPool);
public void testNewMergeTaskIsAbortedWhenThreadPoolIsShutdown() throws IOException {
TestThreadPool testThreadPool = new TestThreadPool("test", Settings.EMPTY);
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(
testThreadPool,
Settings.EMPTY,
newNodeEnvironment(Settings.EMPTY)
);
// shutdown the thread pool
testThreadPool.shutdown();
MergeTask mergeTask = mock(MergeTask.class);
Expand All @@ -80,7 +86,11 @@ public void testEnqueuedAndBackloggedMergesAreStillExecutedWhenThreadPoolIsShutd
.put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), mergeExecutorThreadCount)
.build();
TestThreadPool testThreadPool = new TestThreadPool("test", settings);
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(testThreadPool);
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(
testThreadPool,
settings,
newNodeEnvironment(settings)
);
var countingListener = new CountingMergeEventListener();
threadPoolMergeExecutorService.registerMergeEventListener(countingListener);
assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount));
Expand Down Expand Up @@ -191,7 +201,11 @@ public void testTargetIORateChangesWhenSubmittingMergeTasks() throws Exception {
.put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), mergeExecutorThreadCount)
.build();
try (TestThreadPool testThreadPool = new TestThreadPool("test", settings)) {
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(testThreadPool);
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(
testThreadPool,
settings,
newNodeEnvironment(settings)
);
assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount));
Semaphore runMergeSemaphore = new Semaphore(0);
AtomicInteger submittedIOThrottledMergeTasks = new AtomicInteger();
Expand Down Expand Up @@ -271,7 +285,11 @@ public void testIORateIsAdjustedForRunningMergeTasks() throws Exception {
.put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), mergeExecutorThreadCount)
.build();
try (TestThreadPool testThreadPool = new TestThreadPool("test", settings)) {
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(testThreadPool);
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(
testThreadPool,
settings,
newNodeEnvironment(settings)
);
assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount));
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) testThreadPool.executor(ThreadPool.Names.MERGE);
Semaphore runMergeSemaphore = new Semaphore(0);
Expand Down Expand Up @@ -333,38 +351,39 @@ public void testIORateIsAdjustedForRunningMergeTasks() throws Exception {
}
}

public void testIORateAdjustedForSubmittedTasksWhenExecutionRateIsSpeedy() {
public void testIORateAdjustedForSubmittedTasksWhenExecutionRateIsSpeedy() throws IOException {
// the executor runs merge tasks at a faster rate than the rate that merge tasks are submitted
int submittedVsExecutedRateOutOf1000 = randomIntBetween(0, 250);
testIORateAdjustedForSubmittedTasks(randomIntBetween(50, 1000), submittedVsExecutedRateOutOf1000, randomIntBetween(0, 5));
// executor starts running merges only after a considerable amount of merge tasks have already been submitted
testIORateAdjustedForSubmittedTasks(randomIntBetween(50, 1000), submittedVsExecutedRateOutOf1000, randomIntBetween(5, 50));
}

public void testIORateAdjustedForSubmittedTasksWhenExecutionRateIsSluggish() {
public void testIORateAdjustedForSubmittedTasksWhenExecutionRateIsSluggish() throws IOException {
// the executor runs merge tasks at a faster rate than the rate that merge tasks are submitted
int submittedVsExecutedRateOutOf1000 = randomIntBetween(750, 1000);
testIORateAdjustedForSubmittedTasks(randomIntBetween(50, 1000), submittedVsExecutedRateOutOf1000, randomIntBetween(0, 5));
// executor starts running merges only after a considerable amount of merge tasks have already been submitted
testIORateAdjustedForSubmittedTasks(randomIntBetween(50, 1000), submittedVsExecutedRateOutOf1000, randomIntBetween(5, 50));
}

public void testIORateAdjustedForSubmittedTasksWhenExecutionRateIsOnPar() {
public void testIORateAdjustedForSubmittedTasksWhenExecutionRateIsOnPar() throws IOException {
// the executor runs merge tasks at a faster rate than the rate that merge tasks are submitted
int submittedVsExecutedRateOutOf1000 = randomIntBetween(250, 750);
testIORateAdjustedForSubmittedTasks(randomIntBetween(50, 1000), submittedVsExecutedRateOutOf1000, randomIntBetween(0, 5));
// executor starts running merges only after a considerable amount of merge tasks have already been submitted
testIORateAdjustedForSubmittedTasks(randomIntBetween(50, 1000), submittedVsExecutedRateOutOf1000, randomIntBetween(5, 50));
}

private void testIORateAdjustedForSubmittedTasks(
int totalTasksToSubmit,
int submittedVsExecutedRateOutOf1000,
int initialTasksToSubmit
) {
private void testIORateAdjustedForSubmittedTasks(int totalTasksToSubmit, int submittedVsExecutedRateOutOf1000, int initialTasksToSubmit)
throws IOException {
DeterministicTaskQueue mergeExecutorTaskQueue = new DeterministicTaskQueue();
ThreadPool mergeExecutorThreadPool = mergeExecutorTaskQueue.getThreadPool();
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(mergeExecutorThreadPool);
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(
mergeExecutorThreadPool,
Settings.EMPTY,
newNodeEnvironment(Settings.EMPTY)
);
final AtomicInteger currentlySubmittedMergeTaskCount = new AtomicInteger();
final AtomicLong targetIORateLimit = new AtomicLong(ThreadPoolMergeExecutorService.START_IO_RATE.getBytes());
final AtomicReference<MergeTask> lastRunTask = new AtomicReference<>();
Expand Down Expand Up @@ -424,7 +443,11 @@ public void testMergeTasksRunConcurrently() throws Exception {
.put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), mergeExecutorThreadCount)
.build();
try (TestThreadPool testThreadPool = new TestThreadPool("test", settings)) {
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(testThreadPool);
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(
testThreadPool,
settings,
newNodeEnvironment(settings)
);
assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount));
// more merge tasks than max concurrent merges allowed to run concurrently
int totalMergeTasksCount = mergeExecutorThreadCount + randomIntBetween(1, 5);
Expand Down Expand Up @@ -465,7 +488,7 @@ public void testMergeTasksRunConcurrently() throws Exception {
assertThat(threadPoolMergeExecutorService.getRunningMergeTasks().size(), is(mergeExecutorThreadCount));
// with the other merge tasks enqueued
assertThat(
threadPoolMergeExecutorService.getQueuedMergeTasks().size(),
threadPoolMergeExecutorService.getMergeTasksQueueLength(),
is(totalMergeTasksCount - mergeExecutorThreadCount - finalCompletedTasksCount)
);
// also check thread-pool stats for the same
Expand All @@ -485,7 +508,7 @@ public void testMergeTasksRunConcurrently() throws Exception {
// there are fewer available merges than available threads
assertThat(threadPoolMergeExecutorService.getRunningMergeTasks().size(), is(finalRemainingMergeTasksCount));
// no more merges enqueued
assertThat(threadPoolMergeExecutorService.getQueuedMergeTasks().size(), is(0));
assertThat(threadPoolMergeExecutorService.getMergeTasksQueueLength(), is(0));
// also check thread-pool stats for the same
assertThat(threadPoolExecutor.getActiveCount(), is(finalRemainingMergeTasksCount));
assertThat(threadPoolExecutor.getQueue().size(), is(0));
Expand All @@ -504,7 +527,11 @@ public void testThreadPoolStatsWithBackloggedMergeTasks() throws Exception {
.put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), mergeExecutorThreadCount)
.build();
try (TestThreadPool testThreadPool = new TestThreadPool("test", settings)) {
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(testThreadPool);
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(
testThreadPool,
settings,
newNodeEnvironment(settings)
);
assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount));
int totalMergeTasksCount = randomIntBetween(1, 10);
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) testThreadPool.executor(ThreadPool.Names.MERGE);
Expand Down Expand Up @@ -533,7 +560,7 @@ public void testThreadPoolStatsWithBackloggedMergeTasks() throws Exception {
assertThat(threadPoolExecutor.getActiveCount(), is(backloggedMergeTasksList.size()));
assertThat(threadPoolExecutor.getQueue().size(), is(0));
}
assertThat(threadPoolMergeExecutorService.getQueuedMergeTasks().size(), is(0));
assertThat(threadPoolMergeExecutorService.getMergeTasksQueueLength(), is(0));
});
// re-enqueue backlogged merge tasks
for (MergeTask backloggedMergeTask : backloggedMergeTasksList) {
Expand All @@ -557,7 +584,11 @@ public void testBackloggedMergeTasksExecuteExactlyOnce() throws Exception {
.put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), mergeExecutorThreadCount)
.build();
try (TestThreadPool testThreadPool = new TestThreadPool("test", settings)) {
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(testThreadPool);
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(
testThreadPool,
settings,
newNodeEnvironment(settings)
);
assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount));
// many merge tasks concurrently
int mergeTaskCount = randomIntBetween(10, 100);
Expand Down Expand Up @@ -613,10 +644,14 @@ public void testBackloggedMergeTasksExecuteExactlyOnce() throws Exception {
}
}

public void testMergeTasksExecuteInSizeOrder() {
public void testMergeTasksExecuteInSizeOrder() throws IOException {
DeterministicTaskQueue mergeExecutorTaskQueue = new DeterministicTaskQueue();
ThreadPool mergeExecutorThreadPool = mergeExecutorTaskQueue.getThreadPool();
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(mergeExecutorThreadPool);
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(
mergeExecutorThreadPool,
Settings.EMPTY,
newNodeEnvironment(Settings.EMPTY)
);
DeterministicTaskQueue reEnqueueBackloggedTaskQueue = new DeterministicTaskQueue();
int mergeTaskCount = randomIntBetween(10, 100);
// sort merge tasks available to run by size
Expand Down Expand Up @@ -696,13 +731,21 @@ public void onMergeAborted(OnGoingMerge merge) {
}
}

static ThreadPoolMergeExecutorService getThreadPoolMergeExecutorService(ThreadPool threadPool) {
static ThreadPoolMergeExecutorService getThreadPoolMergeExecutorService(
ThreadPool threadPool,
Settings settings,
NodeEnvironment nodeEnvironment
) {
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorService
.maybeCreateThreadPoolMergeExecutorService(
threadPool,
randomBoolean()
? Settings.EMPTY
: Settings.builder().put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true).build()
? settings
: Settings.builder()
.put(settings)
.put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true)
.build(),
nodeEnvironment
);
assertNotNull(threadPoolMergeExecutorService);
assertTrue(threadPoolMergeExecutorService.allDone());
Expand Down
Loading