Skip to content

Commit 7751e75

Browse files
Refactor ElementWithReleasableBudget
1 parent aa7e08b commit 7751e75

File tree

2 files changed

+54
-34
lines changed

2 files changed

+54
-34
lines changed

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java

Lines changed: 38 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -571,7 +571,7 @@ MergeTask peekQueue() {
571571
static class PriorityBlockingQueueWithBudget<E> {
572572
private final ToLongFunction<? super E> budgetFunction;
573573
protected final PriorityQueue<E> enqueuedByBudget;
574-
private final IdentityHashMap<Wrap<E>, Long> unreleasedBudgetPerElement;
574+
private final IdentityHashMap<ElementWithReleasableBudget, Long> unreleasedBudgetPerElement;
575575
private final ReentrantLock lock;
576576
private final Condition elementAvailable;
577577
protected long availableBudget;
@@ -612,7 +612,7 @@ ElementWithReleasableBudget take() throws InterruptedException {
612612
elementAvailable.await();
613613
}
614614
// deducts and holds up that element's budget from the available budget
615-
return new ElementWithReleasableBudget(enqueuedByBudget.poll(), peekBudget);
615+
return newElementWithReleasableBudget(enqueuedByBudget.poll(), peekBudget);
616616
} finally {
617617
lock.unlock();
618618
}
@@ -647,23 +647,40 @@ int queueSize() {
647647
return enqueuedByBudget.size();
648648
}
649649

650+
private ElementWithReleasableBudget newElementWithReleasableBudget(E element, long budget) {
651+
ElementWithReleasableBudget elementWithReleasableBudget = new ElementWithReleasableBudget(element);
652+
assert this.lock.isHeldByCurrentThread();
653+
// the taken element holds up some budget
654+
var prev = this.unreleasedBudgetPerElement.put(elementWithReleasableBudget, budget);
655+
assert prev == null;
656+
this.availableBudget -= budget;
657+
assert this.availableBudget >= 0L;
658+
return elementWithReleasableBudget;
659+
}
660+
661+
private void release(ElementWithReleasableBudget elementWithReleasableBudget) {
662+
final ReentrantLock lock = this.lock;
663+
lock.lock();
664+
try {
665+
assert elementWithReleasableBudget.isClosed() == false;
666+
// when the taken element is not used anymore, it will not influence subsequent computations for available budget,
667+
// but its allotted budget is not yet released
668+
var val = unreleasedBudgetPerElement.remove(elementWithReleasableBudget);
669+
assert val != null;
670+
} finally {
671+
lock.unlock();
672+
}
673+
}
674+
675+
private boolean isReleased(ElementWithReleasableBudget elementWithReleasableBudget) {
676+
return unreleasedBudgetPerElement.containsKey(elementWithReleasableBudget) == false;
677+
}
678+
650679
class ElementWithReleasableBudget implements Releasable {
651-
private final Wrap<E> wrappedElement;
652-
653-
private ElementWithReleasableBudget(E element, long budget) {
654-
// Wrap the element in a brand-new instance that's used as the key in the
655-
// {@link PriorityBlockingQueueWithBudget#unreleasedBudgetPerElement} identity map.
656-
// This allows the same exact "element" instance to hold budgets multiple times concurrently.
657-
// This way we allow to re-enqueue and re-take an element before a previous take completed and
658-
// released the budget.
659-
this.wrappedElement = new Wrap<>(element);
660-
assert PriorityBlockingQueueWithBudget.this.lock.isHeldByCurrentThread();
661-
// the taken element holds up some budget
662-
var prev = PriorityBlockingQueueWithBudget.this.unreleasedBudgetPerElement.put(wrappedElement, budget);
663-
assert prev == null;
664-
assert isClosed() == false;
665-
PriorityBlockingQueueWithBudget.this.availableBudget -= budget;
666-
assert PriorityBlockingQueueWithBudget.this.availableBudget >= 0L;
680+
private final E element;
681+
682+
private ElementWithReleasableBudget(E element) {
683+
this.element = element;
667684
}
668685

669686
/**
@@ -673,27 +690,17 @@ private ElementWithReleasableBudget(E element, long budget) {
673690
*/
674691
@Override
675692
public void close() {
676-
final ReentrantLock lock = PriorityBlockingQueueWithBudget.this.lock;
677-
lock.lock();
678-
try {
679-
assert isClosed() == false;
680-
// when the taken element is not used anymore, it will not influence subsequent available budget computations
681-
unreleasedBudgetPerElement.remove(wrappedElement);
682-
} finally {
683-
lock.unlock();
684-
}
693+
PriorityBlockingQueueWithBudget.this.release(this);
685694
}
686695

687696
boolean isClosed() {
688-
return unreleasedBudgetPerElement.containsKey(wrappedElement) == false;
697+
return PriorityBlockingQueueWithBudget.this.isReleased(this);
689698
}
690699

691700
E element() {
692-
return wrappedElement.element();
701+
return element;
693702
}
694703
}
695-
696-
private record Wrap<E>(E element) {}
697704
}
698705

699706
private static long newTargetIORateBytesPerSec(

server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceDiskSpaceTests.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.test.ESTestCase;
2727
import org.elasticsearch.threadpool.TestThreadPool;
2828
import org.elasticsearch.threadpool.ThreadPool;
29+
import org.junit.After;
2930
import org.junit.AfterClass;
3031
import org.junit.BeforeClass;
3132

@@ -36,9 +37,7 @@
3637
import java.nio.file.attribute.FileAttributeView;
3738
import java.nio.file.attribute.FileStoreAttributeView;
3839
import java.nio.file.spi.FileSystemProvider;
39-
import java.util.ArrayDeque;
4040
import java.util.ArrayList;
41-
import java.util.Deque;
4241
import java.util.IdentityHashMap;
4342
import java.util.LinkedHashSet;
4443
import java.util.List;
@@ -104,6 +103,11 @@ public static void removeMockUsableSpaceFS() {
104103
nodeEnvironment.close();
105104
}
106105

106+
@After
107+
public void cleanupThreadPool() {
108+
testThreadPool.scheduledTasks.clear();
109+
}
110+
107111
static class CapturingThreadPool extends TestThreadPool {
108112
final List<Tuple<TimeValue, Cancellable>> scheduledTasks = new ArrayList<>();
109113

@@ -295,14 +299,23 @@ public void testDiskSpaceMonitorStartsAsDisabled() throws Exception {
295299
assertThat(availableDiskSpaceUpdates.size(), is(1));
296300
assertThat(availableDiskSpaceUpdates.getLast().getBytes(), is(Long.MAX_VALUE));
297301
// updating monitoring interval should enable the monitor
298-
String intervalSettingValue = randomFrom("1s", "123ms", "5ns", "2h");
302+
String intervalSettingValue = randomFrom("1s", "123ms", "5nanos", "2h");
299303
clusterSettings.applySettings(
300304
Settings.builder()
301305
.put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), intervalSettingValue)
302306
.build()
303307
);
304308
assertThat(diskSpacePeriodicMonitor.isScheduled(), is(true));
305309
assertThat(testThreadPool.scheduledTasks.size(), is(1));
310+
assertThat(
311+
testThreadPool.scheduledTasks.getLast().v1(),
312+
is(
313+
TimeValue.parseTimeValue(
314+
intervalSettingValue,
315+
ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey()
316+
)
317+
)
318+
);
306319
}
307320
aFileStore.throwIoException = false;
308321
bFileStore.throwIoException = false;

0 commit comments

Comments
 (0)