Skip to content

Commit 475128a

Browse files
authored
fix: DH-18433: SourcePartitionedTable needs to check the size of pending locations even if there are no added or removed locations (#6570)
1 parent 47a50c3 commit 475128a

File tree

4 files changed

+85
-23
lines changed

4 files changed

+85
-23
lines changed

engine/table/src/main/java/io/deephaven/engine/table/impl/SourcePartitionedTable.java

+37-17
Original file line numberDiff line numberDiff line change
@@ -250,27 +250,42 @@ private void processPendingLocations(final boolean notifyListeners) {
250250
try (final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate =
251251
subscriptionBuffer.processPending()) {
252252
if (locationUpdate == null) {
253-
return;
253+
removed = null;
254+
} else {
255+
removed = processRemovals(locationUpdate);
256+
processAdditions(locationUpdate);
254257
}
255-
removed = processRemovals(locationUpdate);
256-
added = processAdditions(locationUpdate);
258+
added = checkPendingLocations();
257259
}
258260

259-
resultRows.update(added, removed);
261+
if (removed == null) {
262+
if (added == null) {
263+
return;
264+
}
265+
resultRows.insert(added);
266+
} else if (added == null) {
267+
resultRows.remove(removed);
268+
} else {
269+
resultRows.update(added, removed);
270+
}
260271
if (notifyListeners) {
261272
result.notifyListeners(new TableUpdateImpl(
262-
added,
263-
removed,
273+
added == null ? RowSetFactory.empty() : added,
274+
removed == null ? RowSetFactory.empty() : removed,
264275
RowSetFactory.empty(),
265276
RowSetShiftData.EMPTY,
266277
ModifiedColumnSet.EMPTY));
267278
} else {
268-
added.close();
269-
removed.close();
279+
if (added != null) {
280+
added.close();
281+
}
282+
if (removed != null) {
283+
removed.close();
284+
}
270285
}
271286
}
272287

273-
private RowSet processAdditions(final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate) {
288+
private void processAdditions(final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate) {
274289
/*
275290
* This block of code is unfortunate, because it largely duplicates the intent and effort of similar code in
276291
* RegionedColumnSourceManager. I think that the RegionedColumnSourceManager could be changed to
@@ -280,13 +295,18 @@ private RowSet processAdditions(final TableLocationSubscriptionBuffer.LocationUp
280295
* population in STM ColumnSources.
281296
*/
282297
// TODO (https://github.com/deephaven/deephaven-core/issues/867): Refactor around a ticking partition table
283-
locationUpdate.getPendingAddedLocationKeys().stream()
284-
.map(LiveSupplier::get)
285-
.filter(locationKeyMatcher)
286-
.map(tableLocationProvider::getTableLocation)
287-
.peek(this::manage)
288-
.map(PendingLocationState::new)
289-
.forEach(pendingLocationStates::offer);
298+
if (locationUpdate != null) {
299+
locationUpdate.getPendingAddedLocationKeys().stream()
300+
.map(LiveSupplier::get)
301+
.filter(locationKeyMatcher)
302+
.map(tableLocationProvider::getTableLocation)
303+
.peek(this::manage)
304+
.map(PendingLocationState::new)
305+
.forEach(pendingLocationStates::offer);
306+
}
307+
}
308+
309+
private RowSet checkPendingLocations() {
290310
for (final Iterator<PendingLocationState> iter = pendingLocationStates.iterator(); iter.hasNext();) {
291311
final PendingLocationState pendingLocationState = iter.next();
292312
if (pendingLocationState.exists()) {
@@ -296,7 +316,7 @@ private RowSet processAdditions(final TableLocationSubscriptionBuffer.LocationUp
296316
}
297317

298318
if (readyLocationStates.isEmpty()) {
299-
return RowSetFactory.empty();
319+
return null;
300320
}
301321

302322
final RowSet added = sortAndAddLocations(readyLocationStates.stream().map(PendingLocationState::release));

engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/TableLocationSubscriptionBuffer.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,8 @@ public void close() {
129129
* reset). No order is maintained internally. If a pending exception is thrown, this signals that the subscription
130130
* is no longer valid and no subsequent location keys will be returned.
131131
*
132-
* @return The collection of pending location keys.
132+
* @return A {@link LocationUpdate} collecting pending added and removed location keys, or {@code null} if there are
133+
* none; the caller must {@link LocationUpdate#close() close} the returned object when done.
133134
*/
134135
public synchronized LocationUpdate processPending() {
135136
if (!subscribed) {

engine/table/src/test/java/io/deephaven/engine/table/impl/SourcePartitionedTableTest.java

+35-5
Original file line numberDiff line numberDiff line change
@@ -62,35 +62,42 @@ public void tearDown() throws Exception {
6262
private QueryTable p2;
6363
private QueryTable p3;
6464
private QueryTable p4;
65+
private QueryTable p5;
6566

6667
private DependentRegistrar registrar;
6768
private TableBackedTableLocationProvider tlp;
6869

6970
private SourcePartitionedTable setUpData() {
70-
p1 = testRefreshingTable(i(0, 1, 2, 3).toTracking(),
71+
p1 = testRefreshingTable(ir(0, 3).toTracking(),
7172
stringCol("Sym", "aa", "bb", "aa", "bb"),
7273
intCol("intCol", 10, 20, 40, 60),
7374
doubleCol("doubleCol", 0.1, 0.2, 0.4, 0.6));
7475
p1.setAttribute(Table.APPEND_ONLY_TABLE_ATTRIBUTE, true);
7576

76-
p2 = testRefreshingTable(i(0, 1, 2, 3).toTracking(),
77+
p2 = testRefreshingTable(ir(0, 3).toTracking(),
7778
stringCol("Sym", "cc", "dd", "cc", "dd"),
7879
intCol("intCol", 100, 200, 400, 600),
7980
doubleCol("doubleCol", 0.1, 0.2, 0.4, 0.6));
8081
p2.setAttribute(Table.APPEND_ONLY_TABLE_ATTRIBUTE, true);
8182

82-
p3 = testRefreshingTable(i(0, 1, 2, 3).toTracking(),
83+
p3 = testRefreshingTable(ir(0, 3).toTracking(),
8384
stringCol("Sym", "ee", "ff", "ee", "ff"),
8485
intCol("intCol", 1000, 2000, 4000, 6000),
8586
doubleCol("doubleCol", 0.1, 0.2, 0.4, 0.6));
8687
p3.setAttribute(Table.APPEND_ONLY_TABLE_ATTRIBUTE, true);
8788

88-
p4 = testRefreshingTable(i(0, 1, 2, 3).toTracking(),
89+
p4 = testRefreshingTable(ir(0, 3).toTracking(),
8990
stringCol("Sym", "gg", "hh", "gg", "hh"),
9091
intCol("intCol", 10000, 20000, 40000, 60000),
9192
doubleCol("doubleCol", 0.1, 0.2, 0.4, 0.6));
9293
p4.setAttribute(Table.APPEND_ONLY_TABLE_ATTRIBUTE, true);
9394

95+
p5 = testRefreshingTable(i().toTracking(), // Initially empty
96+
stringCol("Sym"),
97+
intCol("intCol"),
98+
doubleCol("doubleCol"));
99+
p5.setAttribute(Table.APPEND_ONLY_TABLE_ATTRIBUTE, true);
100+
94101
registrar = new DependentRegistrar();
95102
tlp = new TableBackedTableLocationProvider(
96103
registrar,
@@ -238,7 +245,7 @@ public void testAddAndRemoveLocations() {
238245
*/
239246
final TableLocation location5;
240247
try (final SafeCloseable ignored = LivenessScopeStack.open(new LivenessScope(), true)) {
241-
final QueryTable p5 = testRefreshingTable(i(0, 1, 2, 3).toTracking(),
248+
final QueryTable p5 = testRefreshingTable(ir(0, 3).toTracking(),
242249
stringCol("Sym", "ii", "jj", "ii", "jj"),
243250
intCol("intCol", 10000, 20000, 40000, 60000),
244251
doubleCol("doubleCol", 0.1, 0.2, 0.4, 0.6));
@@ -407,4 +414,27 @@ public void testCantReadPrev() {
407414
TableLocationRemovedException.class).isPresent()));
408415
getUpdateErrors().clear();
409416
}
417+
418+
@Test
419+
public void testInitiallyEmptyLocation() {
420+
final SourcePartitionedTable spt = setUpData();
421+
final Table ptSummary = spt.merge().selectDistinct("Sym");
422+
verifyStringColumnContents(ptSummary, "Sym", "aa", "bb", "cc", "dd");
423+
tlp.add(p5);
424+
updateGraph.getDelegate().runWithinUnitTestCycle(() -> {
425+
updateGraph.refreshSources();
426+
// We refreshed the source first, so it won't see a new size for the location backed by p5 on this cycle.
427+
addToTable(p5, ir(0, 3),
428+
stringCol("Sym", "ii", "jj", "kk", "ll"),
429+
intCol("intCol", 10000, 20000, 40000, 60000),
430+
doubleCol("doubleCol", 0.1, 0.2, 0.4, 0.6));
431+
}, true);
432+
verifyStringColumnContents(ptSummary, "Sym", "aa", "bb", "cc", "dd");
433+
updateGraph.getDelegate().runWithinUnitTestCycle(() -> {
434+
updateGraph.refreshSources();
435+
// Now the source has been refreshed, so it should see the new size of the location backed by p5, and
436+
// include it in the result.
437+
}, true);
438+
verifyStringColumnContents(ptSummary, "Sym", "aa", "bb", "cc", "dd", "ii", "jj", "kk", "ll");
439+
}
410440
}

engine/test-utils/src/main/java/io/deephaven/engine/testutil/TstUtils.java

+11
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,17 @@ public static WritableRowSet i(long... keys) {
136136
return RowSetFactory.fromKeys(keys);
137137
}
138138

139+
/**
140+
* A shorthand for {@link RowSetFactory#fromRange(long, long)} for use in unit tests.
141+
*
142+
* @param firstRowKey the first key of the new RowSet
143+
* @param lastRowKey the last key (inclusive) of the new RowSet
144+
* @return a new RowSet with the given key range
145+
*/
146+
public static WritableRowSet ir(final long firstRowKey, final long lastRowKey) {
147+
return RowSetFactory.fromRange(firstRowKey, lastRowKey);
148+
}
149+
139150
public static void addToTable(final Table table, final RowSet rowSet, final ColumnHolder<?>... columnHolders) {
140151
if (rowSet.isEmpty()) {
141152
return;

0 commit comments

Comments
 (0)