Skip to content

Commit 3a503a9

Browse files
nbauernfeindrcaudy
andauthored
Use a Deque to avoid StackOverflow in ImmediateJobScheduler (deephaven#5077)
Co-authored-by: Ryan Caudy <ryan@deephaven.io>
1 parent 2f0355a commit 3a503a9

File tree

7 files changed

+44
-19
lines changed

7 files changed

+44
-19
lines changed

engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ class InitialFilterExecution extends AbstractFilterExecution {
3838
if (permitParallelization) {
3939
jobScheduler = new OperationInitializerJobScheduler();
4040
} else {
41-
jobScheduler = ImmediateJobScheduler.INSTANCE;
41+
jobScheduler = new ImmediateJobScheduler();
4242
}
4343
}
4444

engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1479,7 +1479,7 @@ this, mode, columns, rowSet, getModifiedColumnSetForUpdates(), publishTheseSourc
14791479
&& analyzer.allowCrossColumnParallelization()) {
14801480
jobScheduler = new OperationInitializerJobScheduler();
14811481
} else {
1482-
jobScheduler = ImmediateJobScheduler.INSTANCE;
1482+
jobScheduler = new ImmediateJobScheduler();
14831483
}
14841484

14851485
final QueryTable resultTable;

engine/table/src/main/java/io/deephaven/engine/table/impl/SelectOrUpdateListener.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public void onUpdate(final TableUpdate upstream) {
8888
if (enableParallelUpdate) {
8989
jobScheduler = new UpdateGraphJobScheduler(getUpdateGraph());
9090
} else {
91-
jobScheduler = ImmediateJobScheduler.INSTANCE;
91+
jobScheduler = new ImmediateJobScheduler();
9292
}
9393

9494
analyzer.applyUpdate(acquiredUpdate, toClear, updateHelper, jobScheduler, this,

engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ private ListenerFilterExecution(
260260
if (permitParallelization) {
261261
jobScheduler = new UpdateGraphJobScheduler(getUpdateGraph());
262262
} else {
263-
jobScheduler = ImmediateJobScheduler.INSTANCE;
263+
jobScheduler = new ImmediateJobScheduler();
264264
}
265265
}
266266

engine/table/src/main/java/io/deephaven/engine/table/impl/rangejoin/RangeJoinOperation.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ public Result<QueryTable> initialize(final boolean usePrev, final long beforeClo
255255
if (ExecutionContext.getContext().getOperationInitializer().canParallelize()) {
256256
jobScheduler = new OperationInitializerJobScheduler();
257257
} else {
258-
jobScheduler = ImmediateJobScheduler.INSTANCE;
258+
jobScheduler = new ImmediateJobScheduler();
259259
}
260260

261261
final ExecutionContext executionContext = ExecutionContext.newBuilder()

engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateBy.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ class PhasedUpdateProcessor implements LogOutputAppendable {
302302
if (ExecutionContext.getContext().getOperationInitializer().canParallelize()) {
303303
jobScheduler = new OperationInitializerJobScheduler();
304304
} else {
305-
jobScheduler = ImmediateJobScheduler.INSTANCE;
305+
jobScheduler = new ImmediateJobScheduler();
306306
}
307307
executionContext = ExecutionContext.newBuilder()
308308
.markSystemic().build();
@@ -331,7 +331,7 @@ class PhasedUpdateProcessor implements LogOutputAppendable {
331331
if (source.getUpdateGraph().parallelismFactor() > 1) {
332332
jobScheduler = new UpdateGraphJobScheduler(source.getUpdateGraph());
333333
} else {
334-
jobScheduler = ImmediateJobScheduler.INSTANCE;
334+
jobScheduler = new ImmediateJobScheduler();
335335
}
336336
executionContext = ExecutionContext.newBuilder()
337337
.setUpdateGraph(result().getUpdateGraph())

engine/table/src/main/java/io/deephaven/engine/table/impl/util/ImmediateJobScheduler.java

+37-12
Original file line numberDiff line numberDiff line change
@@ -3,30 +3,55 @@
33
import io.deephaven.base.log.LogOutputAppendable;
44
import io.deephaven.engine.context.ExecutionContext;
55
import io.deephaven.engine.table.impl.perf.BasePerformanceEntry;
6-
import io.deephaven.io.log.impl.LogOutputStringImpl;
76
import io.deephaven.util.SafeCloseable;
8-
import io.deephaven.util.process.ProcessEnvironment;
97

8+
import java.util.ArrayDeque;
9+
import java.util.Deque;
10+
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
1011
import java.util.function.Consumer;
1112

1213
public class ImmediateJobScheduler implements JobScheduler {
13-
public static final ImmediateJobScheduler INSTANCE = new ImmediateJobScheduler();
14+
15+
private volatile Thread processingThread;
16+
private static final AtomicReferenceFieldUpdater<ImmediateJobScheduler, Thread> PROCESSING_THREAD_UPDATER =
17+
AtomicReferenceFieldUpdater.newUpdater(ImmediateJobScheduler.class, Thread.class, "processingThread");
18+
19+
private final Deque<Runnable> pendingJobs = new ArrayDeque<>();
1420

1521
@Override
1622
public void submit(
1723
final ExecutionContext executionContext,
1824
final Runnable runnable,
1925
final LogOutputAppendable description,
2026
final Consumer<Exception> onError) {
21-
// We do not need to install the update context since we are not changing thread contexts.
22-
try (SafeCloseable ignored = executionContext != null ? executionContext.open() : null) {
23-
runnable.run();
24-
} catch (Exception e) {
25-
onError.accept(e);
26-
} catch (Error e) {
27-
final String logMessage = new LogOutputStringImpl().append(description).append(" Error").toString();
28-
ProcessEnvironment.getGlobalFatalErrorReporter().report(logMessage, e);
29-
throw e;
27+
final Thread thisThread = Thread.currentThread();
28+
final boolean thisThreadIsProcessing = processingThread == thisThread;
29+
30+
if (!thisThreadIsProcessing && !PROCESSING_THREAD_UPDATER.compareAndSet(this, null, thisThread)) {
31+
throw new IllegalCallerException("An unexpected thread submitted a job to this job scheduler");
32+
}
33+
34+
pendingJobs.addLast(() -> {
35+
// We do not need to install the update context since we are not changing thread contexts.
36+
try (SafeCloseable ignored = executionContext != null ? executionContext.open() : null) {
37+
runnable.run();
38+
} catch (Exception e) {
39+
onError.accept(e);
40+
}
41+
});
42+
43+
if (thisThreadIsProcessing) {
44+
// We're already draining the queue in an ancestor stack frame
45+
return;
46+
}
47+
48+
try {
49+
Runnable job;
50+
while ((job = pendingJobs.pollLast()) != null) {
51+
job.run();
52+
}
53+
} finally {
54+
PROCESSING_THREAD_UPDATER.set(this, null);
3055
}
3156
}
3257

0 commit comments

Comments
 (0)