Skip to content

Commit 0e1dd15

Browse files
[9.0] New threadpool-based merge scheduler that is disk space aware (elastic#129134)
This is the backport of a few PRs related to the implementation of the new threadpool-based merge scheduler. The new merge scheduler uses a node-level threadpool (sized to the number of CPU cores) to execute all the merges across all the shards on the node, limiting the number of concurrently executing merges, irrespective of the number of shards that the node hosts. Smaller merges continue to have priority over larger ones. In addition, the new merge scheduler implementation also monitors the available disk space on the node, so that it won't start executing any new merges when the available disk space becomes scarce, i.e. when the used disk space gets above the indices.merge.disk.watermark.high limit (95%, the same as the allocation flood stage, which is the limit that flips shards on the node to read-only). The new merge scheduler is now enabled by default (indices.merge.scheduler.use_thread_pool is true).
1 parent 76829db commit 0e1dd15

File tree

37 files changed

+5068
-96
lines changed

37 files changed

+5068
-96
lines changed

docs/changelog/120869.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 120869
2+
summary: Threadpool merge scheduler
3+
area: Engine
4+
type: feature
5+
issues: []

docs/changelog/127613.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 127613
2+
summary: Threadpool merge executor is aware of available disk space
3+
area: Engine
4+
type: feature
5+
issues: []

server/src/internalClusterTest/java/org/elasticsearch/index/engine/InternalEngineMergeIT.java

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,24 +8,40 @@
88
*/
99
package org.elasticsearch.index.engine;
1010

11+
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
12+
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
1113
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
1214
import org.elasticsearch.action.bulk.BulkRequestBuilder;
1315
import org.elasticsearch.action.bulk.BulkResponse;
1416
import org.elasticsearch.action.index.IndexRequest;
17+
import org.elasticsearch.common.settings.Settings;
18+
import org.elasticsearch.index.query.QueryBuilders;
1519
import org.elasticsearch.test.ESIntegTestCase;
1620
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
1721
import org.elasticsearch.test.ESIntegTestCase.Scope;
22+
import org.elasticsearch.threadpool.ThreadPool;
1823

1924
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
25+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
2026
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
2127
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
2228
import static org.hamcrest.Matchers.equalTo;
2329
import static org.hamcrest.Matchers.lessThan;
2430
import static org.hamcrest.Matchers.lessThanOrEqualTo;
2531

26-
@ClusterScope(supportsDedicatedMasters = false, numDataNodes = 1, scope = Scope.SUITE)
32+
@ClusterScope(supportsDedicatedMasters = false, numDataNodes = 1, numClientNodes = 0, scope = Scope.TEST)
2733
public class InternalEngineMergeIT extends ESIntegTestCase {
2834

35+
private boolean useThreadPoolMerging;
36+
37+
@Override
38+
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
39+
useThreadPoolMerging = randomBoolean();
40+
Settings.Builder settings = Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings));
41+
settings.put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), useThreadPoolMerging);
42+
return settings.build();
43+
}
44+
2945
public void testMergesHappening() throws Exception {
3046
final int numOfShards = randomIntBetween(1, 5);
3147
// some settings to keep num segments low
@@ -83,4 +99,60 @@ public void testMergesHappening() throws Exception {
8399
assertThat(count, lessThanOrEqualTo(upperNumberSegments));
84100
}
85101

102+
public void testMergesUseTheMergeThreadPool() throws Exception {
103+
final String indexName = randomIdentifier();
104+
createIndex(indexName, indexSettings(randomIntBetween(1, 3), 0).build());
105+
long id = 0;
106+
final int minMerges = randomIntBetween(1, 5);
107+
long totalDocs = 0;
108+
109+
while (true) {
110+
int docs = randomIntBetween(100, 200);
111+
totalDocs += docs;
112+
113+
BulkRequestBuilder request = client().prepareBulk();
114+
for (int j = 0; j < docs; ++j) {
115+
request.add(
116+
new IndexRequest(indexName).id(Long.toString(id++))
117+
.source(jsonBuilder().startObject().field("l", randomLong()).endObject())
118+
);
119+
}
120+
BulkResponse response = request.get();
121+
assertNoFailures(response);
122+
refresh(indexName);
123+
124+
var mergesResponse = client().admin().indices().prepareStats(indexName).clear().setMerge(true).get();
125+
var primaries = mergesResponse.getIndices().get(indexName).getPrimaries();
126+
if (primaries.merge.getTotal() >= minMerges) {
127+
break;
128+
}
129+
}
130+
131+
forceMerge();
132+
refresh(indexName);
133+
134+
// after a force merge there should only be 1 segment per shard
135+
var shardsWithMultipleSegments = getShardSegments().stream()
136+
.filter(shardSegments -> shardSegments.getSegments().size() > 1)
137+
.toList();
138+
assertTrue("there are shards with multiple segments " + shardsWithMultipleSegments, shardsWithMultipleSegments.isEmpty());
139+
140+
final long expectedTotalDocs = totalDocs;
141+
assertHitCount(prepareSearch(indexName).setQuery(QueryBuilders.matchAllQuery()).setTrackTotalHits(true), expectedTotalDocs);
142+
143+
IndicesStatsResponse indicesStats = client().admin().indices().prepareStats(indexName).setMerge(true).get();
144+
long mergeCount = indicesStats.getIndices().get(indexName).getPrimaries().merge.getTotal();
145+
NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().setThreadPool(true).get();
146+
assertThat(nodesStatsResponse.getNodes().size(), equalTo(1));
147+
148+
NodeStats nodeStats = nodesStatsResponse.getNodes().get(0);
149+
if (useThreadPoolMerging) {
150+
assertThat(
151+
nodeStats.getThreadPool().stats().stream().filter(s -> ThreadPool.Names.MERGE.equals(s.name())).findAny().get().completed(),
152+
equalTo(mergeCount)
153+
);
154+
} else {
155+
assertTrue(nodeStats.getThreadPool().stats().stream().filter(s -> ThreadPool.Names.MERGE.equals(s.name())).findAny().isEmpty());
156+
}
157+
}
86158
}

0 commit comments

Comments
 (0)