Skip to content

Commit 3be488b

Browse files
authored
Re-implement ParallelWhere using JobScheduler semantics. (deephaven#5032)
1 parent 68f9350 commit 3be488b

File tree

7 files changed

+448
-751
lines changed

7 files changed

+448
-751
lines changed

engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java

+221-317
Large diffs are not rendered by default.
Original file line numberDiff line numberDiff line change
@@ -1,109 +1,44 @@
11
package io.deephaven.engine.table.impl;
22

3-
import io.deephaven.base.verify.Assert;
43
import io.deephaven.engine.context.ExecutionContext;
5-
import io.deephaven.engine.exceptions.CancellationException;
64
import io.deephaven.engine.rowset.RowSet;
75
import io.deephaven.engine.table.ModifiedColumnSet;
86
import io.deephaven.engine.table.impl.perf.BasePerformanceEntry;
97
import io.deephaven.engine.table.impl.select.WhereFilter;
10-
import io.deephaven.engine.updategraph.NotificationQueue;
11-
import io.deephaven.util.datastructures.linked.IntrusiveDoublyLinkedNode;
12-
import io.deephaven.util.datastructures.linked.IntrusiveDoublyLinkedQueue;
13-
14-
import java.util.*;
15-
import java.util.concurrent.atomic.AtomicBoolean;
8+
import io.deephaven.engine.table.impl.util.ImmediateJobScheduler;
9+
import io.deephaven.engine.table.impl.util.JobScheduler;
10+
import io.deephaven.engine.table.impl.util.OperationInitializerJobScheduler;
1611

1712
/**
1813
* A FilterExecution that is used for initial filters. When we split off sub filters as child jobs, they are enqueued in
1914
* the {@link io.deephaven.engine.updategraph.OperationInitializer OperationInitializer}.
2015
*/
2116
class InitialFilterExecution extends AbstractFilterExecution {
22-
private final QueryTable sourceTable;
23-
private final boolean permitParallelization;
24-
private final int segmentCount;
25-
private final WhereFilter[] filters;
26-
27-
/**
28-
* The pendingSatisfaction list is global to the root node of this InitialExecutionFilter. The outstanding children
29-
* allows us to count how many jobs exist. If we have no outstanding jobs, but unsatisfied Notifications then an
30-
* error has occurred.
31-
*/
32-
private final IntrusiveDoublyLinkedQueue<NotificationQueue.Notification> pendingSatisfaction;
33-
private final Map<Thread, Thread> runningChildren;
34-
private final AtomicBoolean cancelled;
3517

36-
/**
37-
* The SubEntry lets us track query performance for the split jobs.
38-
*/
39-
private BasePerformanceEntry basePerformanceEntry;
18+
private final int segmentCount;
19+
private final boolean permitParallelization;
4020

41-
/**
42-
* The InitialFilterExecution that represents all the work we are doing for this table.
43-
*/
44-
private final InitialFilterExecution root;
21+
private final JobScheduler jobScheduler;
4522

4623
InitialFilterExecution(
4724
final QueryTable sourceTable,
4825
final WhereFilter[] filters,
4926
final RowSet addedInput,
50-
final long addStart,
51-
final long addEnd,
52-
final InitialFilterExecution parent,
53-
final int filterIndex,
5427
final boolean usePrev) {
55-
super(sourceTable, filters, addedInput, addStart, addEnd, null, 0, 0, parent, usePrev, false,
56-
ModifiedColumnSet.ALL, filterIndex);
57-
this.sourceTable = sourceTable;
58-
permitParallelization = permitParallelization(filters);
59-
this.filters = filters;
60-
if (parent == null) {
61-
pendingSatisfaction = new IntrusiveDoublyLinkedQueue<>(
62-
IntrusiveDoublyLinkedNode.Adapter.<NotificationQueue.Notification>getInstance());
63-
segmentCount = QueryTable.PARALLEL_WHERE_SEGMENTS <= 0
64-
? ExecutionContext.getContext().getOperationInitializer().parallelismFactor()
65-
: QueryTable.PARALLEL_WHERE_SEGMENTS;
66-
runningChildren = Collections.synchronizedMap(new IdentityHashMap<>());
67-
cancelled = new AtomicBoolean(false);
68-
this.root = this;
28+
super(sourceTable, filters, addedInput, null, usePrev, false, ModifiedColumnSet.ALL);
29+
segmentCount = QueryTable.PARALLEL_WHERE_SEGMENTS <= 0
30+
? ExecutionContext.getContext().getOperationInitializer().parallelismFactor()
31+
: QueryTable.PARALLEL_WHERE_SEGMENTS;
32+
permitParallelization = permitParallelization(filters)
33+
&& !QueryTable.DISABLE_PARALLEL_WHERE
34+
&& segmentCount > 1
35+
&& ExecutionContext.getContext().getOperationInitializer().canParallelize();
36+
37+
// If any of the filters can be parallelized, we will use the OperationInitializerJobScheduler.
38+
if (permitParallelization) {
39+
jobScheduler = new OperationInitializerJobScheduler();
6940
} else {
70-
pendingSatisfaction = parent.pendingSatisfaction;
71-
segmentCount = parent.segmentCount;
72-
this.root = parent.root;
73-
runningChildren = null;
74-
cancelled = null;
75-
}
76-
}
77-
78-
@Override
79-
void enqueueSubFilters(
80-
List<AbstractFilterExecution> subFilters,
81-
AbstractFilterExecution.CombinationNotification combinationNotification) {
82-
synchronized (pendingSatisfaction) {
83-
enqueueJobs(subFilters);
84-
pendingSatisfaction.offer(combinationNotification);
85-
}
86-
}
87-
88-
private void enqueueJobs(Iterable<? extends NotificationQueue.Notification> subFilters) {
89-
for (NotificationQueue.Notification notification : subFilters) {
90-
ExecutionContext.getContext().getOperationInitializer().submit(() -> {
91-
root.runningChildren.put(Thread.currentThread(), Thread.currentThread());
92-
try {
93-
if (!root.cancelled.get()) {
94-
notification.run();
95-
} else {
96-
// we must ensure that we, the parent InitialFilterExecution, are notified of completion
97-
onChildCompleted();
98-
}
99-
if (Thread.interrupted()) {
100-
// we would like to throw a query cancellation exception
101-
exceptionResult = new CancellationException("thread interrupted");
102-
}
103-
} finally {
104-
root.runningChildren.remove(Thread.currentThread());
105-
}
106-
});
41+
jobScheduler = ImmediateJobScheduler.INSTANCE;
10742
}
10843
}
10944

@@ -113,70 +48,16 @@ int getTargetSegments() {
11348
}
11449

11550
@Override
116-
boolean doParallelization(long numberOfRows) {
117-
return permitParallelization
118-
&& ExecutionContext.getContext().getOperationInitializer().canParallelize()
119-
&& doParallelizationBase(numberOfRows);
120-
}
121-
122-
@Override
123-
void handleUncaughtException(Exception throwable) {
124-
throw new UnsupportedOperationException(throwable);
125-
}
126-
127-
@Override
128-
void accumulatePerformanceEntry(BasePerformanceEntry entry) {
129-
synchronized (root) {
130-
if (root.basePerformanceEntry != null) {
131-
root.basePerformanceEntry.accumulate(entry);
132-
} else {
133-
root.basePerformanceEntry = entry;
134-
}
135-
}
136-
}
137-
138-
/**
139-
* Run any satisfied jobs in the pendingSatisfaction list.
140-
*/
141-
@Override
142-
void onNoChildren() {
143-
final IntrusiveDoublyLinkedQueue<NotificationQueue.Notification> satisfied = new IntrusiveDoublyLinkedQueue<>(
144-
IntrusiveDoublyLinkedNode.Adapter.<NotificationQueue.Notification>getInstance());
145-
synchronized (pendingSatisfaction) {
146-
for (final Iterator<NotificationQueue.Notification> it = pendingSatisfaction.iterator(); it.hasNext();) {
147-
final NotificationQueue.Notification notification = it.next();
148-
if (notification.canExecute(0)) {
149-
satisfied.offer(notification);
150-
it.remove();
151-
}
152-
}
153-
}
154-
if (satisfied.isEmpty()) {
155-
return;
156-
}
157-
satisfied.forEach(NotificationQueue.Notification::run);
51+
JobScheduler jobScheduler() {
52+
return jobScheduler;
15853
}
15954

16055
@Override
161-
InitialFilterExecution makeChild(
162-
final RowSet addedInput,
163-
final long addStart,
164-
final long addEnd,
165-
final RowSet modifyInput,
166-
final long modifyStart,
167-
final long modifyEnd,
168-
final int filterIndex) {
169-
Assert.eqNull(modifyInput, "modifyInput");
170-
return new InitialFilterExecution(sourceTable, filters, addedInput, addStart, addEnd, this, filterIndex,
171-
usePrev);
56+
boolean permitParallelization() {
57+
return permitParallelization;
17258
}
17359

17460
BasePerformanceEntry getBasePerformanceEntry() {
17561
return basePerformanceEntry;
17662
}
177-
178-
void setCancelled() {
179-
cancelled.set(true);
180-
runningChildren.forEach((thread, ignored) -> thread.interrupt());
181-
}
18263
}

engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java

+42-49
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import io.deephaven.configuration.Configuration;
2929
import io.deephaven.datastructures.util.CollectionUtil;
3030
import io.deephaven.engine.context.ExecutionContext;
31+
import io.deephaven.engine.table.impl.util.*;
3132
import io.deephaven.engine.updategraph.UpdateGraph;
3233
import io.deephaven.engine.exceptions.CancellationException;
3334
import io.deephaven.engine.liveness.LivenessScope;
@@ -48,17 +49,14 @@
4849
import io.deephaven.engine.table.impl.select.MatchPairFactory;
4950
import io.deephaven.engine.table.impl.select.SelectColumnFactory;
5051
import io.deephaven.engine.table.impl.updateby.UpdateBy;
51-
import io.deephaven.engine.table.impl.util.ImmediateJobScheduler;
52-
import io.deephaven.engine.table.impl.util.JobScheduler;
53-
import io.deephaven.engine.table.impl.util.OperationInitializerJobScheduler;
5452
import io.deephaven.engine.table.impl.select.analyzers.SelectAndViewAnalyzerWrapper;
55-
import io.deephaven.engine.table.impl.util.FieldUtils;
5653
import io.deephaven.engine.table.impl.sources.ring.RingTableTools;
5754
import io.deephaven.engine.table.iterators.*;
5855
import io.deephaven.engine.updategraph.DynamicNode;
5956
import io.deephaven.engine.util.*;
6057
import io.deephaven.engine.util.systemicmarking.SystemicObject;
6158
import io.deephaven.util.annotations.InternalUseOnly;
59+
import io.deephaven.util.annotations.ReferentialIntegrity;
6260
import io.deephaven.vector.Vector;
6361
import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder;
6462
import io.deephaven.engine.util.systemicmarking.SystemicObjectTracker;
@@ -982,6 +980,9 @@ public static class FilteredTable extends QueryTable implements WhereFilter.Reco
982980
private boolean refilterUnmatchedRequested = false;
983981
private MergedListener whereListener;
984982

983+
@ReferentialIntegrity
984+
Runnable delayedErrorReference;
985+
985986
public FilteredTable(final TrackingRowSet currentMapping, final QueryTable source) {
986987
super(source.getDefinition(), currentMapping, source.columns, null, null);
987988
this.source = source;
@@ -1066,9 +1067,10 @@ void doRefilter(
10661067

10671068
if (refilterMatchedRequested && refilterUnmatchedRequested) {
10681069
final WhereListener.ListenerFilterExecution filterExecution =
1069-
listener.makeFilterExecution(source.getRowSet().copy());
1070+
listener.makeRefilterExecution(source.getRowSet().copy());
10701071
filterExecution.scheduleCompletion(
1071-
fe -> completeRefilterUpdate(listener, upstream, update, fe.addedResult));
1072+
(adds, mods) -> completeRefilterUpdate(listener, upstream, update, adds),
1073+
exception -> errorRefilterUpdate(listener, exception, upstream));
10721074
refilterMatchedRequested = refilterUnmatchedRequested = false;
10731075
} else if (refilterUnmatchedRequested) {
10741076
// things that are added or removed are already reflected in source.getRowSet
@@ -1078,9 +1080,9 @@ void doRefilter(
10781080
unmatchedRows.insert(upstream.modified());
10791081
}
10801082
final RowSet unmatched = unmatchedRows.copy();
1081-
final WhereListener.ListenerFilterExecution filterExecution = listener.makeFilterExecution(unmatched);
1082-
filterExecution.scheduleCompletion(fe -> {
1083-
final WritableRowSet newMapping = fe.addedResult;
1083+
final WhereListener.ListenerFilterExecution filterExecution = listener.makeRefilterExecution(unmatched);
1084+
filterExecution.scheduleCompletion((adds, mods) -> {
1085+
final WritableRowSet newMapping = adds.writableCast();
10841086
// add back what we previously matched, but for modifications and removals
10851087
try (final WritableRowSet previouslyMatched = getRowSet().copy()) {
10861088
if (upstream != null) {
@@ -1089,8 +1091,8 @@ void doRefilter(
10891091
}
10901092
newMapping.insert(previouslyMatched);
10911093
}
1092-
completeRefilterUpdate(listener, upstream, update, fe.addedResult);
1093-
});
1094+
completeRefilterUpdate(listener, upstream, update, adds);
1095+
}, exception -> errorRefilterUpdate(listener, exception, upstream));
10941096
refilterUnmatchedRequested = false;
10951097
} else if (refilterMatchedRequested) {
10961098
// we need to take removed rows out of our rowSet so we do not read them, and also examine added or
@@ -1103,9 +1105,10 @@ void doRefilter(
11031105
final RowSet matchedClone = matchedRows.copy();
11041106

11051107
final WhereListener.ListenerFilterExecution filterExecution =
1106-
listener.makeFilterExecution(matchedClone);
1108+
listener.makeRefilterExecution(matchedClone);
11071109
filterExecution.scheduleCompletion(
1108-
fe -> completeRefilterUpdate(listener, upstream, update, fe.addedResult));
1110+
(adds, mods) -> completeRefilterUpdate(listener, upstream, update, adds),
1111+
exception -> errorRefilterUpdate(listener, exception, upstream));
11091112
refilterMatchedRequested = false;
11101113
} else {
11111114
throw new IllegalStateException("Refilter called when a refilter was not requested!");
@@ -1141,11 +1144,24 @@ private void completeRefilterUpdate(
11411144
update.shifted = upstream == null ? RowSetShiftData.EMPTY : upstream.shifted();
11421145

11431146
notifyListeners(update);
1144-
if (upstream != null) {
1145-
upstream.release();
1146-
}
11471147

1148-
listener.setFinalExecutionStep();
1148+
// Release the upstream update and set the final notification step.
1149+
listener.finalizeUpdate(upstream);
1150+
}
1151+
1152+
private void errorRefilterUpdate(final WhereListener listener, final Exception e, final TableUpdate upstream) {
1153+
// Notify listeners that we had an issue refreshing the table.
1154+
if (getLastNotificationStep() == updateGraph.clock().currentStep()) {
1155+
if (listener != null) {
1156+
listener.forceReferenceCountToZero();
1157+
}
1158+
delayedErrorReference = new DelayedErrorNotifier(e, listener == null ? null : listener.entry, this);
1159+
} else {
1160+
notifyListenersOnError(e, listener == null ? null : listener.entry);
1161+
forceReferenceCountToZero();
1162+
}
1163+
// Release the upstream update and set the final notification step.
1164+
listener.finalizeUpdate(upstream);
11491165
}
11501166

11511167
private void setWhereListener(MergedListener whereListener) {
@@ -1226,41 +1242,18 @@ private QueryTable whereInternal(final WhereFilter... filters) {
12261242

12271243
final CompletableFuture<TrackingWritableRowSet> currentMappingFuture =
12281244
new CompletableFuture<>();
1245+
12291246
final InitialFilterExecution initialFilterExecution = new InitialFilterExecution(
1230-
this, filters, rowSetToUse.copy(), 0, rowSetToUse.size(), null, 0,
1231-
usePrev) {
1232-
@Override
1233-
void handleUncaughtException(Exception throwable) {
1234-
currentMappingFuture.completeExceptionally(throwable);
1235-
}
1236-
};
1237-
final ExecutionContext executionContext = ExecutionContext.getContext();
1238-
initialFilterExecution.scheduleCompletion(x -> {
1239-
try (final SafeCloseable ignored = executionContext.open()) {
1240-
if (x.exceptionResult != null) {
1241-
currentMappingFuture.completeExceptionally(x.exceptionResult);
1242-
} else {
1243-
currentMappingFuture.complete(x.addedResult.toTracking());
1244-
}
1245-
}
1246-
});
1247+
this, filters, rowSetToUse.copy(), usePrev);
1248+
final TrackingWritableRowSet currentMapping;
1249+
initialFilterExecution.scheduleCompletion((adds, mods) -> {
1250+
currentMappingFuture.complete(adds.writableCast().toTracking());
1251+
}, currentMappingFuture::completeExceptionally);
12471252

1248-
boolean cancelled = false;
1249-
TrackingWritableRowSet currentMapping = null;
12501253
try {
1251-
boolean done = false;
1252-
while (!done) {
1253-
try {
1254-
currentMapping = currentMappingFuture.get();
1255-
done = true;
1256-
} catch (InterruptedException e) {
1257-
// cancel the job and wait for it to finish cancelling
1258-
cancelled = true;
1259-
initialFilterExecution.setCancelled();
1260-
}
1261-
}
1262-
} catch (ExecutionException e) {
1263-
if (cancelled) {
1254+
currentMapping = currentMappingFuture.get();
1255+
} catch (ExecutionException | InterruptedException e) {
1256+
if (e instanceof InterruptedException) {
12641257
throw new CancellationException("interrupted while filtering");
12651258
} else if (e.getCause() instanceof RuntimeException) {
12661259
throw (RuntimeException) e.getCause();

0 commit comments

Comments
 (0)