Skip to content

Commit 8edb2b1

Browse files
feat: DH-18174: Delay reading from parquet file when creating table and column location (deephaven#6606)
1 parent a96e342 commit 8edb2b1

File tree

7 files changed

+309
-161
lines changed

7 files changed

+309
-161
lines changed

engine/table/src/main/java/io/deephaven/engine/table/impl/locations/TableLocation.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,8 @@ interface Listener extends BasicTableDataListener {
127127

128128
/**
129129
* @param name The column name
130-
* @return The ColumnLocation for the defined column under this table location
130+
* @return The ColumnLocation for the defined column under this table location. The exact same ColumnLocation object
131+
* should be returned for the same column name.
131132
*/
132133
@NotNull
133134
ColumnLocation getColumnLocation(@NotNull CharSequence name);

engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/AbstractTableLocation.java

+23
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,16 @@ public LivenessReferent asLivenessReferent() {
8787
// TableLocationState implementation
8888
// ------------------------------------------------------------------------------------------------------------------
8989

90+
/**
91+
* No-op by default, can be overridden by subclasses to initialize state on first access.
92+
* <p>
93+
* The expectation for static locations that override this is to call {@link #handleUpdateInternal(RowSet, long)}
94+
* instead of {@link #handleUpdate(RowSet, long)}, and {@link #handleUpdateInternal(TableLocationState)} instead of
95+
* {@link #handleUpdate(TableLocationState)} from inside {@link #initializeState()}. Otherwise, the initialization
96+
* logic will recurse infinitely.
97+
*/
98+
protected void initializeState() {}
99+
90100
@Override
91101
@NotNull
92102
public final Object getStateLock() {
@@ -95,16 +105,19 @@ public final Object getStateLock() {
95105

96106
@Override
97107
public final RowSet getRowSet() {
108+
initializeState();
98109
return state.getRowSet();
99110
}
100111

101112
@Override
102113
public final long getSize() {
114+
initializeState();
103115
return state.getSize();
104116
}
105117

106118
@Override
107119
public final long getLastModifiedTimeMillis() {
120+
initializeState();
108121
return state.getLastModifiedTimeMillis();
109122
}
110123

@@ -137,6 +150,11 @@ protected final void deliverInitialSnapshot(@NotNull final Listener listener) {
137150
* @param lastModifiedTimeMillis The new lastModificationTimeMillis
138151
*/
139152
public final void handleUpdate(final RowSet rowSet, final long lastModifiedTimeMillis) {
153+
initializeState();
154+
handleUpdateInternal(rowSet, lastModifiedTimeMillis);
155+
}
156+
157+
protected final void handleUpdateInternal(final RowSet rowSet, final long lastModifiedTimeMillis) {
140158
if (state.setValues(rowSet, lastModifiedTimeMillis) && supportsSubscriptions()) {
141159
deliverUpdateNotification();
142160
}
@@ -149,6 +167,11 @@ public final void handleUpdate(final RowSet rowSet, final long lastModifiedTimeM
149167
* @param source The source to copy state values from
150168
*/
151169
public void handleUpdate(@NotNull final TableLocationState source) {
170+
initializeState();
171+
handleUpdateInternal(source);
172+
}
173+
174+
protected final void handleUpdateInternal(@NotNull final TableLocationState source) {
152175
if (source.copyStateValuesTo(state) && supportsSubscriptions()) {
153176
deliverUpdateNotification();
154177
}

engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/RegionedColumnSourceManager.java

+5-33
Original file line numberDiff line numberDiff line change
@@ -603,7 +603,6 @@ private class IncludedTableLocationEntry implements Comparable<IncludedTableLoca
603603
// New regions indices are assigned in order of insertion, starting from 0 with no re-use of removed indices.
604604
// If this logic changes, the `getTableAttributes()` logic needs to be updated.
605605
private final int regionIndex = nextRegionIndex++;
606-
private final List<ColumnLocationState<?>> columnLocationStates = new ArrayList<>();
607606

608607
/**
609608
* RowSet in the region's space, not the table's space.
@@ -631,13 +630,10 @@ private void processInitial(final RowSetBuilderSequential addedRowSetBuilder, fi
631630
.appendRange(regionFirstKey + subRegionFirstKey, regionFirstKey + subRegionLastKey));
632631

633632
for (final ColumnDefinition<?> columnDefinition : columnDefinitions) {
634-
// noinspection unchecked,rawtypes
635-
final ColumnLocationState<?> state = new ColumnLocationState(
636-
columnDefinition,
637-
columnSources.get(columnDefinition.getName()),
638-
location.getColumnLocation(columnDefinition.getName()));
639-
columnLocationStates.add(state);
640-
state.regionAllocated(regionIndex);
633+
final RegionedColumnSource<?> regionedColumnSource = columnSources.get(columnDefinition.getName());
634+
final ColumnLocation columnLocation = location.getColumnLocation(columnDefinition.getName());
635+
Assert.eq(regionIndex, "regionIndex", regionedColumnSource.addRegion(columnDefinition, columnLocation),
636+
"regionedColumnSource.addRegion((definition, location)");
641637
}
642638

643639
rowSetAtLastUpdate = initialRowSet;
@@ -710,7 +706,7 @@ private boolean pollUpdates(final RowSetBuilderSequential addedRowSetBuilder) {
710706
}
711707

712708
private void invalidate() {
713-
columnLocationStates.forEach(cls -> cls.source.invalidateRegion(regionIndex));
709+
columnSources.values().forEach(source -> source.invalidateRegion(regionIndex));
714710
}
715711

716712
@Override
@@ -734,30 +730,6 @@ public ImmutableTableLocationKey getKey(
734730
}
735731
};
736732

737-
/**
738-
* Batches up a definition, source, and location for ease of use. Implements grouping maintenance.
739-
*/
740-
private static class ColumnLocationState<T> {
741-
742-
protected final ColumnDefinition<T> definition;
743-
protected final RegionedColumnSource<T> source;
744-
protected final ColumnLocation location;
745-
746-
private ColumnLocationState(
747-
ColumnDefinition<T> definition,
748-
RegionedColumnSource<T> source,
749-
ColumnLocation location) {
750-
this.definition = definition;
751-
this.source = source;
752-
this.location = location;
753-
}
754-
755-
private void regionAllocated(final int regionIndex) {
756-
Assert.eq(regionIndex, "regionIndex", source.addRegion(definition, location),
757-
"source.addRegion((definition, location)");
758-
}
759-
}
760-
761733
public Map<String, Object> getTableAttributes(
762734
@NotNull TableUpdateMode tableUpdateMode,
763735
@NotNull TableUpdateMode tableLocationUpdateMode) {

extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java

+60-33
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.math.BigInteger;
4040
import java.time.Instant;
4141
import java.util.Arrays;
42+
import java.util.List;
4243
import java.util.Optional;
4344
import java.util.function.Function;
4445
import java.util.function.LongFunction;
@@ -58,48 +59,70 @@ final class ParquetColumnLocation<ATTR extends Values> extends AbstractColumnLoc
5859
private static final int MAX_PAGE_CACHE_SIZE = Configuration.getInstance()
5960
.getIntegerForClassWithDefault(ParquetColumnLocation.class, "maxPageCacheSize", 8192);
6061

62+
private final String columnName;
6163
private final String parquetColumnName;
64+
65+
private volatile boolean readersInitialized;
66+
private final Object readersLock;
67+
68+
// Access to the following variables must be preceded by a call to initializeReaders()
69+
// -----------------------------------------------------------------------
70+
/**
71+
* Factory object needed for deferred initialization of the remaining fields. We delay initializing this field
72+
* itself until we need to read the column data.
73+
*/
74+
private ColumnChunkReader[] columnChunkReaders;
75+
6276
/**
63-
* Factory object needed for deferred initialization of the remaining fields. Reference serves as a barrier to
64-
* ensure visibility of the derived fields.
77+
* Whether the column location actually exists.
6578
*/
66-
private volatile ColumnChunkReader[] columnChunkReaders;
79+
private boolean exists;
80+
// -----------------------------------------------------------------------
6781

68-
// We should consider moving this to column level if needed. Column-location level likely allows more parallelism.
69-
private volatile PageCache<ATTR> pageCache;
82+
private volatile boolean pagesInitialized;
83+
private final Object pagesLock;
7084

85+
// Access to the following variables must be preceded by a call to initializePages()
86+
// -----------------------------------------------------------------------
7187
private ColumnChunkPageStore<ATTR>[] pageStores;
7288
private Supplier<Chunk<ATTR>>[] dictionaryChunkSuppliers;
7389
private ColumnChunkPageStore<DictionaryKeys>[] dictionaryKeysPageStores;
90+
// -----------------------------------------------------------------------
7491

7592
/**
7693
* Construct a new {@link ParquetColumnLocation} for the specified {@link ParquetTableLocation} and column name.
7794
*
7895
* @param tableLocation The table location enclosing this column location
7996
* @param parquetColumnName The Parquet file column name
80-
* @param columnChunkReaders The {@link ColumnChunkReader column chunk readers} for this location
8197
*/
8298
ParquetColumnLocation(
8399
@NotNull final ParquetTableLocation tableLocation,
84100
@NotNull final String columnName,
85-
@NotNull final String parquetColumnName,
86-
@Nullable final ColumnChunkReader[] columnChunkReaders) {
101+
@NotNull final String parquetColumnName) {
87102
super(tableLocation, columnName);
103+
this.columnName = columnName;
88104
this.parquetColumnName = parquetColumnName;
89-
this.columnChunkReaders = columnChunkReaders;
105+
this.readersInitialized = false;
106+
this.readersLock = new Object();
107+
this.pagesInitialized = false;
108+
this.pagesLock = new Object();
90109
}
91110

92-
private PageCache<ATTR> ensurePageCache() {
93-
PageCache<ATTR> localPageCache;
94-
if ((localPageCache = pageCache) != null) {
95-
return localPageCache;
111+
private void initializeReaders() {
112+
if (readersInitialized) {
113+
return;
96114
}
97-
98-
synchronized (this) {
99-
if ((localPageCache = pageCache) != null) {
100-
return localPageCache;
115+
synchronized (readersLock) {
116+
if (readersInitialized) {
117+
return;
101118
}
102-
return pageCache = new PageCache<>(INITIAL_PAGE_CACHE_SIZE, MAX_PAGE_CACHE_SIZE);
119+
final List<String> columnPath = tl().getColumnPath(columnName, parquetColumnName);
120+
final ColumnChunkReader[] columnChunkReaders = Arrays.stream(tl().getRowGroupReaders())
121+
.map(rgr -> rgr.getColumnChunk(columnName, columnPath))
122+
.toArray(ColumnChunkReader[]::new);
123+
exists = Arrays.stream(columnChunkReaders).anyMatch(ccr -> ccr != null && ccr.numRows() > 0);
124+
this.columnChunkReaders = exists ? columnChunkReaders : null;
125+
readersInitialized = true;
103126
}
104127
}
105128

@@ -114,10 +137,8 @@ public String getImplementationName() {
114137

115138
@Override
116139
public boolean exists() {
117-
// If we see a null columnChunkReaders array, either we don't exist or we are guaranteed to
118-
// see a non-null
119-
// pageStores array
120-
return columnChunkReaders != null || pageStores != null;
140+
initializeReaders();
141+
return exists;
121142
}
122143

123144
private ParquetTableLocation tl() {
@@ -258,9 +279,9 @@ private <TYPE> ColumnRegionObject<TYPE, ATTR> makeSingleColumnRegionObject(
258279
* @return The page stores
259280
*/
260281
@NotNull
261-
public ColumnChunkPageStore<ATTR>[] getPageStores(
282+
private ColumnChunkPageStore<ATTR>[] getPageStores(
262283
@NotNull final ColumnDefinition<?> columnDefinition) {
263-
fetchValues(columnDefinition);
284+
initializePages(columnDefinition);
264285
return pageStores;
265286
}
266287

@@ -270,9 +291,9 @@ public ColumnChunkPageStore<ATTR>[] getPageStores(
270291
* @param columnDefinition The {@link ColumnDefinition} used to lookup type information
271292
* @return The dictionary values chunk suppliers, or null if none exist
272293
*/
273-
public Supplier<Chunk<ATTR>>[] getDictionaryChunkSuppliers(
294+
private Supplier<Chunk<ATTR>>[] getDictionaryChunkSuppliers(
274295
@NotNull final ColumnDefinition<?> columnDefinition) {
275-
fetchValues(columnDefinition);
296+
initializePages(columnDefinition);
276297
return dictionaryChunkSuppliers;
277298
}
278299

@@ -285,30 +306,35 @@ public Supplier<Chunk<ATTR>>[] getDictionaryChunkSuppliers(
285306
*/
286307
private ColumnChunkPageStore<DictionaryKeys>[] getDictionaryKeysPageStores(
287308
@NotNull final ColumnDefinition<?> columnDefinition) {
288-
fetchValues(columnDefinition);
309+
initializePages(columnDefinition);
289310
return dictionaryKeysPageStores;
290311
}
291312

292313
@SuppressWarnings("unchecked")
293-
private void fetchValues(@NotNull final ColumnDefinition<?> columnDefinition) {
294-
if (columnChunkReaders == null) {
314+
private void initializePages(@NotNull final ColumnDefinition<?> columnDefinition) {
315+
if (pagesInitialized) {
295316
return;
296317
}
297-
synchronized (this) {
298-
if (columnChunkReaders == null) {
318+
synchronized (pagesLock) {
319+
if (pagesInitialized) {
299320
return;
300321
}
301-
322+
initializeReaders();
302323
final int pageStoreCount = columnChunkReaders.length;
303324
pageStores = new ColumnChunkPageStore[pageStoreCount];
304325
dictionaryChunkSuppliers = new Supplier[pageStoreCount];
305326
dictionaryKeysPageStores = new ColumnChunkPageStore[pageStoreCount];
327+
328+
// We should consider moving this page-cache to column level if needed.
329+
// Column-location level likely allows more parallelism.
330+
final PageCache<ATTR> pageCache = new PageCache<>(INITIAL_PAGE_CACHE_SIZE, MAX_PAGE_CACHE_SIZE);
331+
306332
for (int psi = 0; psi < pageStoreCount; ++psi) {
307333
final ColumnChunkReader columnChunkReader = columnChunkReaders[psi];
308334
try {
309335
final ColumnChunkPageStore.CreatorResult<ATTR> creatorResult =
310336
ColumnChunkPageStore.create(
311-
ensurePageCache(),
337+
pageCache,
312338
columnChunkReader,
313339
tl().getRegionParameters().regionMask,
314340
makeToPage(tl().getColumnTypes().get(parquetColumnName),
@@ -325,6 +351,7 @@ private void fetchValues(@NotNull final ColumnDefinition<?> columnDefinition) {
325351
}
326352

327353
columnChunkReaders = null;
354+
pagesInitialized = true;
328355
}
329356
}
330357

0 commit comments

Comments
 (0)