Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: DH-18174: Delay reading from parquet file when creating table and column location #6590

Closed
Closed
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ interface Listener extends BasicTableDataListener {

/**
* @param name The column name
* @return The ColumnLocation for the defined column under this table location
* @return The ColumnLocation for the defined column under this table location. The exact same ColumnLocation object
* should be returned for the same column name.
*/
@NotNull
ColumnLocation getColumnLocation(@NotNull CharSequence name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,12 @@ public final Object getStateLock() {
}

@Override
public final RowSet getRowSet() {
public RowSet getRowSet() {
return state.getRowSet();
}

@Override
public final long getSize() {
public long getSize() {
return state.getSize();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,9 @@ private class IncludedTableLocationEntry implements Comparable<IncludedTableLoca
// New region indices are assigned in order of insertion, starting from 0 with no re-use of removed indices.
// If this logic changes, the `getTableAttributes()` logic needs to be updated.
private final int regionIndex = nextRegionIndex++;
private final List<ColumnLocationState<?>> columnLocationStates = new ArrayList<>();

// Collection of column sources for which we have added a region, useful for invalidating together
private final Collection<RegionedColumnSource<?>> regionedColumnSources = new ArrayList<>();

/**
* RowSet in the region's space, not the table's space.
Expand Down Expand Up @@ -631,13 +633,11 @@ private void processInitial(final RowSetBuilderSequential addedRowSetBuilder, fi
.appendRange(regionFirstKey + subRegionFirstKey, regionFirstKey + subRegionLastKey));

for (final ColumnDefinition<?> columnDefinition : columnDefinitions) {
// noinspection unchecked,rawtypes
final ColumnLocationState<?> state = new ColumnLocationState(
columnDefinition,
columnSources.get(columnDefinition.getName()),
location.getColumnLocation(columnDefinition.getName()));
columnLocationStates.add(state);
state.regionAllocated(regionIndex);
final RegionedColumnSource<?> regionedColumnSource = columnSources.get(columnDefinition.getName());
final ColumnLocation columnLocation = location.getColumnLocation(columnDefinition.getName());
Assert.eq(regionIndex, "regionIndex", regionedColumnSource.addRegion(columnDefinition, columnLocation),
"regionedColumnSource.addRegion((definition, location)");
regionedColumnSources.add(regionedColumnSource);
}

rowSetAtLastUpdate = initialRowSet;
Expand Down Expand Up @@ -710,7 +710,7 @@ private boolean pollUpdates(final RowSetBuilderSequential addedRowSetBuilder) {
}

private void invalidate() {
columnLocationStates.forEach(cls -> cls.source.invalidateRegion(regionIndex));
regionedColumnSources.forEach(source -> source.invalidateRegion(regionIndex));
}

@Override
Expand All @@ -734,30 +734,6 @@ public ImmutableTableLocationKey getKey(
}
};

/**
* Batches up a column definition, the regioned column source for that column, and the column location within a
* single table location, so the three can be handled as a unit when a region is added.
* <p>
* NOTE(review): the earlier summary also said this class "implements grouping maintenance"; nothing in this class
* body does so - confirm whether that statement is stale.
*
* @param <T> The data type shared by the column definition and the regioned column source
*/
private static class ColumnLocationState<T> {

// Definition of the column this state describes; passed to addRegion.
protected final ColumnDefinition<T> definition;
// Regioned column source that receives the region for this column.
protected final RegionedColumnSource<T> source;
// Column location that backs the region added to the source.
protected final ColumnLocation location;

/**
* Stores the three collaborators as-is; no validation or other work is done here.
*
* @param definition The column definition
* @param source The regioned column source for the column
* @param location The column location for the column
*/
private ColumnLocationState(
ColumnDefinition<T> definition,
RegionedColumnSource<T> source,
ColumnLocation location) {
this.definition = definition;
this.source = source;
this.location = location;
}

/**
* Adds a region for this definition and location to the source, and asserts that the value returned by
* {@code addRegion} equals the expected region index.
*
* @param regionIndex The region index the caller expects {@code addRegion} to return
*/
private void regionAllocated(final int regionIndex) {
// NOTE(review): the label string below contains an unbalanced "((" - looks like a typo in the assertion
// text; left untouched here because it is a runtime string.
Assert.eq(regionIndex, "regionIndex", source.addRegion(definition, location),
"source.addRegion((definition, location)");
}
}

public Map<String, Object> getTableAttributes(
@NotNull TableUpdateMode tableUpdateMode,
@NotNull TableUpdateMode tableLocationUpdateMode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import java.math.BigInteger;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.LongFunction;
Expand All @@ -58,13 +60,25 @@ final class ParquetColumnLocation<ATTR extends Values> extends AbstractColumnLoc
private static final int MAX_PAGE_CACHE_SIZE = Configuration.getInstance()
.getIntegerForClassWithDefault(ParquetColumnLocation.class, "maxPageCacheSize", 8192);

private final String columnName;
private final String parquetColumnName;


private volatile boolean readersInitialized;

// Reads of the following variables must be preceded by a call to initializeReaders()
/**
* Factory object needed for deferred initialization of the remaining fields. Reference serves as a barrier to
* ensure visibility of the derived fields.
* ensure visibility of the derived fields. We delay initializing this field till we need to read the column data.
*/
private volatile ColumnChunkReader[] columnChunkReaders;

/**
* Whether the column location actually exists.
*/
private boolean exists;
// -----------------------------------------------------------------------

// We should consider moving this to column level if needed. Column-location level likely allows more parallelism.
private volatile PageCache<ATTR> pageCache;

Expand All @@ -77,16 +91,34 @@ final class ParquetColumnLocation<ATTR extends Values> extends AbstractColumnLoc
*
* @param tableLocation The table location enclosing this column location
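* @param columnName The column name, passed to the superclass and used when looking up column chunks
* @param parquetColumnName The Parquet file column name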
* @param columnChunkReaders The {@link ColumnChunkReader column chunk readers} for this location
*/
ParquetColumnLocation(
@NotNull final ParquetTableLocation tableLocation,
@NotNull final String columnName,
@NotNull final String parquetColumnName,
@Nullable final ColumnChunkReader[] columnChunkReaders) {
@NotNull final String parquetColumnName) {
super(tableLocation, columnName);
this.columnName = columnName;
this.parquetColumnName = parquetColumnName;
this.columnChunkReaders = columnChunkReaders;
this.readersInitialized = false;
}

/**
* Lazily resolves the column chunk readers for this column and records whether the column exists.
* <p>
* The work is done at most once: the first caller performs it while holding this instance's monitor, and later
* callers return as soon as they observe {@code readersInitialized} set. {@code exists()} and
* {@code fetchValues(...)} call this method before reading {@code exists} or {@code columnChunkReaders}.
*/
private void initializeReaders() {
// Fast path without locking; readersInitialized is declared volatile.
if (readersInitialized) {
return;
}
synchronized (this) {
// Re-check under the lock: another thread may have finished initialization while this one waited.
if (readersInitialized) {
return;
}
// Look up the path registered for the Parquet column name; when there is none, fall back to a
// single-element path consisting of the Parquet column name itself.
final String[] columnPath = tl().getParquetColumnNameToPath().get(parquetColumnName);
final List<String> nameList =
columnPath == null ? Collections.singletonList(parquetColumnName) : Arrays.asList(columnPath);
// One entry per row group reader, in the order returned by getRowGroupReaders().
// NOTE(review): entries are presumably null when a row group lacks this column - the check below
// tolerates null; confirm against getColumnChunk.
final ColumnChunkReader[] columnChunkReaders = Arrays.stream(tl().getRowGroupReaders())
.map(rgr -> rgr.getColumnChunk(columnName, nameList)).toArray(ColumnChunkReader[]::new);
// The column counts as existing when at least one non-null reader reports more than zero rows.
exists = Arrays.stream(columnChunkReaders).anyMatch(ccr -> ccr != null && ccr.numRows() > 0);
// Keep the readers only when the column exists; otherwise store null, which fetchValues treats as
// "nothing to fetch".
this.columnChunkReaders = exists ? columnChunkReaders : null;
// Written last: a thread that reads this volatile flag as true also sees the field writes above.
readersInitialized = true;
}
}

private PageCache<ATTR> ensurePageCache() {
Expand Down Expand Up @@ -114,10 +146,8 @@ public String getImplementationName() {

@Override
public boolean exists() {
// If we see a null columnChunkReaders array, either we don't exist or we are guaranteed to
// see a non-null
// pageStores array
return columnChunkReaders != null || pageStores != null;
initializeReaders();
return exists;
}

private ParquetTableLocation tl() {
Expand Down Expand Up @@ -258,7 +288,7 @@ private <TYPE> ColumnRegionObject<TYPE, ATTR> makeSingleColumnRegionObject(
* @return The page stores
*/
@NotNull
public ColumnChunkPageStore<ATTR>[] getPageStores(
private ColumnChunkPageStore<ATTR>[] getPageStores(
@NotNull final ColumnDefinition<?> columnDefinition) {
fetchValues(columnDefinition);
return pageStores;
Expand All @@ -270,7 +300,7 @@ public ColumnChunkPageStore<ATTR>[] getPageStores(
* @param columnDefinition The {@link ColumnDefinition} used to lookup type information
* @return The dictionary values chunk suppliers, or null if none exist
*/
public Supplier<Chunk<ATTR>>[] getDictionaryChunkSuppliers(
private Supplier<Chunk<ATTR>>[] getDictionaryChunkSuppliers(
@NotNull final ColumnDefinition<?> columnDefinition) {
fetchValues(columnDefinition);
return dictionaryChunkSuppliers;
Expand All @@ -291,6 +321,7 @@ private ColumnChunkPageStore<DictionaryKeys>[] getDictionaryKeysPageStores(

@SuppressWarnings("unchecked")
private void fetchValues(@NotNull final ColumnDefinition<?> columnDefinition) {
initializeReaders();
if (columnChunkReaders == null) {
return;
}
Expand Down
Loading