Skip to content

Commit 50f2827

Browse files
feat: DH-18174: Delay reading from parquet file when creating table and column location (#6606)
1 parent b6503a8 commit 50f2827

File tree

6 files changed

+307
-155
lines changed

6 files changed

+307
-155
lines changed

engine/table/src/main/java/io/deephaven/engine/table/impl/locations/TableLocation.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,8 @@ interface Listener extends BasicTableDataListener {
127127

128128
/**
129129
* @param name The column name
130-
* @return The ColumnLocation for the defined column under this table location
130+
* @return The ColumnLocation for the defined column under this table location. The exact same ColumnLocation object
131+
* should be returned for the same column name.
131132
*/
132133
@NotNull
133134
ColumnLocation getColumnLocation(@NotNull CharSequence name);

engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/AbstractTableLocation.java

+23
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,16 @@ public LivenessReferent asLivenessReferent() {
8787
// TableLocationState implementation
8888
// ------------------------------------------------------------------------------------------------------------------
8989

90+
/**
91+
* No-op by default, can be overridden by subclasses to initialize state on first access.
92+
* <p>
93+
* The expectation for static locations that override this is to call {@link #handleUpdateInternal(RowSet, long)}
94+
* instead of {@link #handleUpdate(RowSet, long)}, and {@link #handleUpdateInternal(TableLocationState)} instead of
95+
* {@link #handleUpdate(TableLocationState)} from inside {@link #initializeState()}. Otherwise, the initialization
96+
* logic will recurse infinitely.
97+
*/
98+
protected void initializeState() {}
99+
90100
@Override
91101
@NotNull
92102
public final Object getStateLock() {
@@ -95,16 +105,19 @@ public final Object getStateLock() {
95105

96106
@Override
97107
public final RowSet getRowSet() {
108+
initializeState();
98109
return state.getRowSet();
99110
}
100111

101112
@Override
102113
public final long getSize() {
114+
initializeState();
103115
return state.getSize();
104116
}
105117

106118
@Override
107119
public final long getLastModifiedTimeMillis() {
120+
initializeState();
108121
return state.getLastModifiedTimeMillis();
109122
}
110123

@@ -137,6 +150,11 @@ protected final void deliverInitialSnapshot(@NotNull final Listener listener) {
137150
* @param lastModifiedTimeMillis The new lastModificationTimeMillis
138151
*/
139152
public final void handleUpdate(final RowSet rowSet, final long lastModifiedTimeMillis) {
153+
initializeState();
154+
handleUpdateInternal(rowSet, lastModifiedTimeMillis);
155+
}
156+
157+
protected final void handleUpdateInternal(final RowSet rowSet, final long lastModifiedTimeMillis) {
140158
if (state.setValues(rowSet, lastModifiedTimeMillis) && supportsSubscriptions()) {
141159
deliverUpdateNotification();
142160
}
@@ -149,6 +167,11 @@ public final void handleUpdate(final RowSet rowSet, final long lastModifiedTimeM
149167
* @param source The source to copy state values from
150168
*/
151169
public void handleUpdate(@NotNull final TableLocationState source) {
170+
initializeState();
171+
handleUpdateInternal(source);
172+
}
173+
174+
protected final void handleUpdateInternal(@NotNull final TableLocationState source) {
152175
if (source.copyStateValuesTo(state) && supportsSubscriptions()) {
153176
deliverUpdateNotification();
154177
}

engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/RegionedColumnSourceManager.java

+5-33
Original file line numberDiff line numberDiff line change
@@ -603,7 +603,6 @@ private class IncludedTableLocationEntry implements Comparable<IncludedTableLoca
603603
// New regions indices are assigned in order of insertion, starting from 0 with no re-use of removed indices.
604604
// If this logic changes, the `getTableAttributes()` logic needs to be updated.
605605
private final int regionIndex = nextRegionIndex++;
606-
private final List<ColumnLocationState<?>> columnLocationStates = new ArrayList<>();
607606

608607
/**
609608
* RowSet in the region's space, not the table's space.
@@ -631,13 +630,10 @@ private void processInitial(final RowSetBuilderSequential addedRowSetBuilder, fi
631630
.appendRange(regionFirstKey + subRegionFirstKey, regionFirstKey + subRegionLastKey));
632631

633632
for (final ColumnDefinition<?> columnDefinition : columnDefinitions) {
634-
// noinspection unchecked,rawtypes
635-
final ColumnLocationState<?> state = new ColumnLocationState(
636-
columnDefinition,
637-
columnSources.get(columnDefinition.getName()),
638-
location.getColumnLocation(columnDefinition.getName()));
639-
columnLocationStates.add(state);
640-
state.regionAllocated(regionIndex);
633+
final RegionedColumnSource<?> regionedColumnSource = columnSources.get(columnDefinition.getName());
634+
final ColumnLocation columnLocation = location.getColumnLocation(columnDefinition.getName());
635+
Assert.eq(regionIndex, "regionIndex", regionedColumnSource.addRegion(columnDefinition, columnLocation),
636+
"regionedColumnSource.addRegion((definition, location)");
641637
}
642638

643639
rowSetAtLastUpdate = initialRowSet;
@@ -710,7 +706,7 @@ private boolean pollUpdates(final RowSetBuilderSequential addedRowSetBuilder) {
710706
}
711707

712708
private void invalidate() {
713-
columnLocationStates.forEach(cls -> cls.source.invalidateRegion(regionIndex));
709+
columnSources.values().forEach(source -> source.invalidateRegion(regionIndex));
714710
}
715711

716712
@Override
@@ -734,30 +730,6 @@ public ImmutableTableLocationKey getKey(
734730
}
735731
};
736732

737-
/**
738-
* Batches up a definition, source, and location for ease of use. Implements grouping maintenance.
739-
*/
740-
private static class ColumnLocationState<T> {
741-
742-
protected final ColumnDefinition<T> definition;
743-
protected final RegionedColumnSource<T> source;
744-
protected final ColumnLocation location;
745-
746-
private ColumnLocationState(
747-
ColumnDefinition<T> definition,
748-
RegionedColumnSource<T> source,
749-
ColumnLocation location) {
750-
this.definition = definition;
751-
this.source = source;
752-
this.location = location;
753-
}
754-
755-
private void regionAllocated(final int regionIndex) {
756-
Assert.eq(regionIndex, "regionIndex", source.addRegion(definition, location),
757-
"source.addRegion((definition, location)");
758-
}
759-
}
760-
761733
public Map<String, Object> getTableAttributes(
762734
@NotNull TableUpdateMode tableUpdateMode,
763735
@NotNull TableUpdateMode tableLocationUpdateMode) {

extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java

+62-33
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
import java.math.BigInteger;
4040
import java.time.Instant;
4141
import java.util.Arrays;
42+
import java.util.Collections;
43+
import java.util.List;
4244
import java.util.Optional;
4345
import java.util.function.Function;
4446
import java.util.function.LongFunction;
@@ -58,48 +60,71 @@ final class ParquetColumnLocation<ATTR extends Values> extends AbstractColumnLoc
5860
private static final int MAX_PAGE_CACHE_SIZE = Configuration.getInstance()
5961
.getIntegerForClassWithDefault(ParquetColumnLocation.class, "maxPageCacheSize", 8192);
6062

63+
private final String columnName;
6164
private final String parquetColumnName;
65+
66+
private volatile boolean readersInitialized;
67+
private final Object readersLock;
68+
69+
// Access to following variables must be guarded by initializeReaders()
70+
// -----------------------------------------------------------------------
71+
/**
72+
* Factory object needed for deferred initialization of the remaining fields. We delay initializing this field
73+
* itself till we need to read the column data.
74+
*/
75+
private ColumnChunkReader[] columnChunkReaders;
76+
6277
/**
63-
* Factory object needed for deferred initialization of the remaining fields. Reference serves as a barrier to
64-
* ensure visibility of the derived fields.
78+
* Whether the column location actually exists.
6579
*/
66-
private volatile ColumnChunkReader[] columnChunkReaders;
80+
private boolean exists;
81+
// -----------------------------------------------------------------------
6782

68-
// We should consider moving this to column level if needed. Column-location level likely allows more parallelism.
69-
private volatile PageCache<ATTR> pageCache;
83+
private volatile boolean pagesInitialized;
84+
private final Object pagesLock;
7085

86+
// Access to following variables must be guarded by initializePages()
87+
// -----------------------------------------------------------------------
7188
private ColumnChunkPageStore<ATTR>[] pageStores;
7289
private Supplier<Chunk<ATTR>>[] dictionaryChunkSuppliers;
7390
private ColumnChunkPageStore<DictionaryKeys>[] dictionaryKeysPageStores;
91+
// -----------------------------------------------------------------------
7492

7593
/**
7694
* Construct a new {@link ParquetColumnLocation} for the specified {@link ParquetTableLocation} and column name.
7795
*
7896
* @param tableLocation The table location enclosing this column location
7997
* @param parquetColumnName The Parquet file column name
80-
* @param columnChunkReaders The {@link ColumnChunkReader column chunk readers} for this location
8198
*/
8299
ParquetColumnLocation(
83100
@NotNull final ParquetTableLocation tableLocation,
84101
@NotNull final String columnName,
85-
@NotNull final String parquetColumnName,
86-
@Nullable final ColumnChunkReader[] columnChunkReaders) {
102+
@NotNull final String parquetColumnName) {
87103
super(tableLocation, columnName);
104+
this.columnName = columnName;
88105
this.parquetColumnName = parquetColumnName;
89-
this.columnChunkReaders = columnChunkReaders;
106+
this.readersInitialized = false;
107+
this.readersLock = new Object();
108+
this.pagesInitialized = false;
109+
this.pagesLock = new Object();
90110
}
91111

92-
private PageCache<ATTR> ensurePageCache() {
93-
PageCache<ATTR> localPageCache;
94-
if ((localPageCache = pageCache) != null) {
95-
return localPageCache;
112+
private void initializeReaders() {
113+
if (readersInitialized) {
114+
return;
96115
}
97-
98-
synchronized (this) {
99-
if ((localPageCache = pageCache) != null) {
100-
return localPageCache;
116+
synchronized (readersLock) {
117+
if (readersInitialized) {
118+
return;
101119
}
102-
return pageCache = new PageCache<>(INITIAL_PAGE_CACHE_SIZE, MAX_PAGE_CACHE_SIZE);
120+
final String[] columnPath = tl().getParquetColumnNameToPath().get(parquetColumnName);
121+
final List<String> nameList =
122+
columnPath == null ? Collections.singletonList(parquetColumnName) : Arrays.asList(columnPath);
123+
final ColumnChunkReader[] columnChunkReaders = Arrays.stream(tl().getRowGroupReaders())
124+
.map(rgr -> rgr.getColumnChunk(columnName, nameList)).toArray(ColumnChunkReader[]::new);
125+
exists = Arrays.stream(columnChunkReaders).anyMatch(ccr -> ccr != null && ccr.numRows() > 0);
126+
this.columnChunkReaders = exists ? columnChunkReaders : null;
127+
readersInitialized = true;
103128
}
104129
}
105130

@@ -114,10 +139,8 @@ public String getImplementationName() {
114139

115140
@Override
116141
public boolean exists() {
117-
// If we see a null columnChunkReaders array, either we don't exist or we are guaranteed to
118-
// see a non-null
119-
// pageStores array
120-
return columnChunkReaders != null || pageStores != null;
142+
initializeReaders();
143+
return exists;
121144
}
122145

123146
private ParquetTableLocation tl() {
@@ -258,9 +281,9 @@ private <TYPE> ColumnRegionObject<TYPE, ATTR> makeSingleColumnRegionObject(
258281
* @return The page stores
259282
*/
260283
@NotNull
261-
public ColumnChunkPageStore<ATTR>[] getPageStores(
284+
private ColumnChunkPageStore<ATTR>[] getPageStores(
262285
@NotNull final ColumnDefinition<?> columnDefinition) {
263-
fetchValues(columnDefinition);
286+
initializePages(columnDefinition);
264287
return pageStores;
265288
}
266289

@@ -270,9 +293,9 @@ public ColumnChunkPageStore<ATTR>[] getPageStores(
270293
* @param columnDefinition The {@link ColumnDefinition} used to lookup type information
271294
* @return The dictionary values chunk suppliers, or null if none exist
272295
*/
273-
public Supplier<Chunk<ATTR>>[] getDictionaryChunkSuppliers(
296+
private Supplier<Chunk<ATTR>>[] getDictionaryChunkSuppliers(
274297
@NotNull final ColumnDefinition<?> columnDefinition) {
275-
fetchValues(columnDefinition);
298+
initializePages(columnDefinition);
276299
return dictionaryChunkSuppliers;
277300
}
278301

@@ -285,30 +308,35 @@ public Supplier<Chunk<ATTR>>[] getDictionaryChunkSuppliers(
285308
*/
286309
private ColumnChunkPageStore<DictionaryKeys>[] getDictionaryKeysPageStores(
287310
@NotNull final ColumnDefinition<?> columnDefinition) {
288-
fetchValues(columnDefinition);
311+
initializePages(columnDefinition);
289312
return dictionaryKeysPageStores;
290313
}
291314

292315
@SuppressWarnings("unchecked")
293-
private void fetchValues(@NotNull final ColumnDefinition<?> columnDefinition) {
294-
if (columnChunkReaders == null) {
316+
private void initializePages(@NotNull final ColumnDefinition<?> columnDefinition) {
317+
if (pagesInitialized) {
295318
return;
296319
}
297-
synchronized (this) {
298-
if (columnChunkReaders == null) {
320+
synchronized (pagesLock) {
321+
if (pagesInitialized) {
299322
return;
300323
}
301-
324+
initializeReaders();
302325
final int pageStoreCount = columnChunkReaders.length;
303326
pageStores = new ColumnChunkPageStore[pageStoreCount];
304327
dictionaryChunkSuppliers = new Supplier[pageStoreCount];
305328
dictionaryKeysPageStores = new ColumnChunkPageStore[pageStoreCount];
329+
330+
// We should consider moving this page-cache to column level if needed.
331+
// Column-location level likely allows more parallelism.
332+
final PageCache<ATTR> pageCache = new PageCache<>(INITIAL_PAGE_CACHE_SIZE, MAX_PAGE_CACHE_SIZE);
333+
306334
for (int psi = 0; psi < pageStoreCount; ++psi) {
307335
final ColumnChunkReader columnChunkReader = columnChunkReaders[psi];
308336
try {
309337
final ColumnChunkPageStore.CreatorResult<ATTR> creatorResult =
310338
ColumnChunkPageStore.create(
311-
ensurePageCache(),
339+
pageCache,
312340
columnChunkReader,
313341
tl().getRegionParameters().regionMask,
314342
makeToPage(tl().getColumnTypes().get(parquetColumnName),
@@ -325,6 +353,7 @@ private void fetchValues(@NotNull final ColumnDefinition<?> columnDefinition) {
325353
}
326354

327355
columnChunkReaders = null;
356+
pagesInitialized = true;
328357
}
329358
}
330359

0 commit comments

Comments
 (0)