Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: DH-18174: Delay reading from parquet file when creating table and column location #6590

Closed
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,24 +87,31 @@ public LivenessReferent asLivenessReferent() {
// TableLocationState implementation
// ------------------------------------------------------------------------------------------------------------------

/**
 * Hook for lazily initializing this location's state.
 * <p>
 * The default implementation does nothing. Subclasses may override it to populate state on first access. The
 * state accessors and {@code handleUpdate} methods in this class call it unconditionally before touching
 * {@code state}, so an override will be invoked repeatedly and should return quickly once initialization is done.
 */
protected void initializeState() {
    // No-op by default, can be overridden by subclasses to initialize state on first access
}

/**
 * Returns the lock object exposed by the backing {@code state}.
 * <p>
 * Unlike the other state accessors, this method does not call {@code initializeState()} first.
 *
 * @return the state lock, as reported by {@code state.getStateLock()}
 */
@Override
@NotNull
public final Object getStateLock() {
    final Object stateLock = state.getStateLock();
    return stateLock;
}

@Override
public RowSet getRowSet() {
public final RowSet getRowSet() {
initializeState();
return state.getRowSet();
}

@Override
public long getSize() {
public final long getSize() {
initializeState();
return state.getSize();
}

/**
 * Returns the last-modified time recorded in the backing {@code state}.
 * <p>
 * {@code initializeState()} is invoked first so that subclasses which populate state lazily have a chance to do
 * so before the value is read.
 *
 * @return the value of {@code state.getLastModifiedTimeMillis()}
 */
@Override
public final long getLastModifiedTimeMillis() {
    // Give lazily-initialized subclasses the opportunity to fill in state before we read it.
    initializeState();
    final long lastModifiedTimeMillis = state.getLastModifiedTimeMillis();
    return lastModifiedTimeMillis;
}

Expand Down Expand Up @@ -137,6 +144,7 @@ protected final void deliverInitialSnapshot(@NotNull final Listener listener) {
* @param lastModifiedTimeMillis The new lastModificationTimeMillis
*/
public final void handleUpdate(final RowSet rowSet, final long lastModifiedTimeMillis) {
initializeState();
if (state.setValues(rowSet, lastModifiedTimeMillis) && supportsSubscriptions()) {
deliverUpdateNotification();
}
Expand All @@ -149,6 +157,7 @@ public final void handleUpdate(final RowSet rowSet, final long lastModifiedTimeM
* @param source The source to copy state values from
*/
public void handleUpdate(@NotNull final TableLocationState source) {
initializeState();
if (source.copyStateValuesTo(state) && supportsSubscriptions()) {
deliverUpdateNotification();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ final class ParquetColumnLocation<ATTR extends Values> extends AbstractColumnLoc
private volatile boolean readersInitialized;

// Access to the following variables must be guarded by initializeReaders()
// -----------------------------------------------------------------------
/**
* Factory object needed for deferred initialization of the remaining fields. Reference serves as a barrier to
* ensure visibility of the derived fields. We delay initializing this field till we need to read the column data.
Expand All @@ -79,12 +80,14 @@ final class ParquetColumnLocation<ATTR extends Values> extends AbstractColumnLoc
private boolean exists;
// -----------------------------------------------------------------------

// We should consider moving this to column level if needed. Column-location level likely allows more parallelism.
private volatile PageCache<ATTR> pageCache;
private volatile boolean pagesInitialized;

// Access to the following variables must be guarded by initializePages()
// -----------------------------------------------------------------------
private ColumnChunkPageStore<ATTR>[] pageStores;
private Supplier<Chunk<ATTR>>[] dictionaryChunkSuppliers;
private ColumnChunkPageStore<DictionaryKeys>[] dictionaryKeysPageStores;
// -----------------------------------------------------------------------

/**
* Construct a new {@link ParquetColumnLocation} for the specified {@link ParquetTableLocation} and column name.
Expand All @@ -100,6 +103,7 @@ final class ParquetColumnLocation<ATTR extends Values> extends AbstractColumnLoc
this.columnName = columnName;
this.parquetColumnName = parquetColumnName;
this.readersInitialized = false;
this.pagesInitialized = false;
}

private void initializeReaders() {
Expand All @@ -121,20 +125,6 @@ private void initializeReaders() {
}
}

/**
 * Returns this column location's page cache, creating it on first use.
 * <p>
 * Uses double-checked locking on the volatile {@code pageCache} field: the unsynchronized read is the fast path,
 * and the cache is only constructed while holding this object's monitor, so at most one instance is ever
 * published.
 *
 * @return the shared page cache for this column location; never {@code null}
 */
private PageCache<ATTR> ensurePageCache() {
    PageCache<ATTR> cache = pageCache;
    if (cache == null) {
        synchronized (this) {
            // Re-check under the lock: another thread may have created the cache in the meantime.
            cache = pageCache;
            if (cache == null) {
                cache = new PageCache<>(INITIAL_PAGE_CACHE_SIZE, MAX_PAGE_CACHE_SIZE);
                pageCache = cache;
            }
        }
    }
    return cache;
}

// -----------------------------------------------------------------------------------------------------------------
// AbstractColumnLocation implementation
// -----------------------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -290,7 +280,7 @@ private <TYPE> ColumnRegionObject<TYPE, ATTR> makeSingleColumnRegionObject(
@NotNull
private ColumnChunkPageStore<ATTR>[] getPageStores(
@NotNull final ColumnDefinition<?> columnDefinition) {
fetchValues(columnDefinition);
initializePages(columnDefinition);
return pageStores;
}

Expand All @@ -302,7 +292,7 @@ private ColumnChunkPageStore<ATTR>[] getPageStores(
*/
private Supplier<Chunk<ATTR>>[] getDictionaryChunkSuppliers(
@NotNull final ColumnDefinition<?> columnDefinition) {
fetchValues(columnDefinition);
initializePages(columnDefinition);
return dictionaryChunkSuppliers;
}

Expand All @@ -315,31 +305,36 @@ private Supplier<Chunk<ATTR>>[] getDictionaryChunkSuppliers(
*/
private ColumnChunkPageStore<DictionaryKeys>[] getDictionaryKeysPageStores(
@NotNull final ColumnDefinition<?> columnDefinition) {
fetchValues(columnDefinition);
initializePages(columnDefinition);
return dictionaryKeysPageStores;
}

@SuppressWarnings("unchecked")
private void fetchValues(@NotNull final ColumnDefinition<?> columnDefinition) {
initializeReaders();
if (columnChunkReaders == null) {
private void initializePages(@NotNull final ColumnDefinition<?> columnDefinition) {
if (pagesInitialized) {
return;
}
initializeReaders();
synchronized (this) {
if (columnChunkReaders == null) {
if (pagesInitialized) {
return;
}

final int pageStoreCount = columnChunkReaders.length;
pageStores = new ColumnChunkPageStore[pageStoreCount];
dictionaryChunkSuppliers = new Supplier[pageStoreCount];
dictionaryKeysPageStores = new ColumnChunkPageStore[pageStoreCount];

// We should consider moving this page-cache to column level if needed.
// Column-location level likely allows more parallelism.
final PageCache<ATTR> pageCache = new PageCache<>(INITIAL_PAGE_CACHE_SIZE, MAX_PAGE_CACHE_SIZE);

for (int psi = 0; psi < pageStoreCount; ++psi) {
final ColumnChunkReader columnChunkReader = columnChunkReaders[psi];
try {
final ColumnChunkPageStore.CreatorResult<ATTR> creatorResult =
ColumnChunkPageStore.create(
ensurePageCache(),
pageCache,
columnChunkReader,
tl().getRegionParameters().regionMask,
makeToPage(tl().getColumnTypes().get(parquetColumnName),
Expand All @@ -356,6 +351,7 @@ private void fetchValues(@NotNull final ColumnDefinition<?> columnDefinition) {
}

columnChunkReaders = null;
pagesInitialized = true;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,16 @@ public class ParquetTableLocation extends AbstractTableLocation {
private volatile boolean isInitialized;

// Access to all the following variables must be guarded by initialize()
// -----------------------------------------------------------------------
private ParquetFileReader parquetFileReader;
private int[] rowGroupIndices;

private RegionedPageStore.Parameters regionParameters;
private Map<String, String[]> parquetColumnNameToPath;

private TableInfo tableInfo;

// The following are initialized on first access, so they should only be accessed via their getters
private Map<String, GroupingColumnInfo> groupingColumns;
private Map<String, ColumnTypeInfo> columnTypes;
private List<SortColumn> sortingColumns;
Expand Down Expand Up @@ -125,19 +128,15 @@ private void initialize() {
tableInfo = ParquetSchemaReader
.parseMetadata(parquetMetadata.getFileMetaData().getKeyValueMetaData())
.orElse(TableInfo.builder().build());
groupingColumns = tableInfo.groupingColumnMap();
columnTypes = tableInfo.columnTypeMap();
sortingColumns = SortColumnInfo.sortColumns(tableInfo.sortingColumns());
version = tableInfo.version();

isInitialized = true;

if (!FILE_URI_SCHEME.equals(tableLocationKey.getURI().getScheme())) {
// We do not have the last modified time for non-file URIs
handleUpdate(computeIndex(rowGroups), TableLocationState.NULL_TIME);
} else {
handleUpdate(computeIndex(rowGroups), new File(tableLocationKey.getURI()).lastModified());
}

isInitialized = true;
}
}

Expand All @@ -164,6 +163,9 @@ RegionedPageStore.Parameters getRegionParameters() {

/**
 * Returns the column type information parsed from this location's table metadata, wrapped in an unmodifiable
 * map.
 * <p>
 * The map is derived from {@code tableInfo} on first access and cached in {@code columnTypes}.
 *
 * @return an unmodifiable view of {@code tableInfo.columnTypeMap()}
 */
public Map<String, ColumnTypeInfo> getColumnTypes() {
    initialize();
    // The cache field is populated lazily without synchronization, so read it exactly once. Reading the field a
    // second time for the return value is a separate racy read that is not guaranteed to observe the value seen
    // by the null check.
    Map<String, ColumnTypeInfo> local = columnTypes;
    if (local == null) {
        // Concurrent first callers may each build a wrapper; they all wrap the same tableInfo data.
        local = Collections.unmodifiableMap(tableInfo.columnTypeMap());
        columnTypes = local;
    }
    return local;
}

Expand All @@ -177,36 +179,51 @@ RowGroupReader[] getRowGroupReaders() {
return local;
}
initialize();
return rowGroupReaders = IntStream.of(rowGroupIndices)
.mapToObj(idx -> parquetFileReader.getRowGroup(idx, version))
local = IntStream.of(rowGroupIndices)
.mapToObj(idx -> parquetFileReader.getRowGroup(idx, getVersion()))
.sorted(Comparator.comparingInt(rgr -> rgr.getRowGroup().getOrdinal()))
.toArray(RowGroupReader[]::new);
rowGroupReaders = local;
return local;
}
}

/**
 * Returns the sort columns recorded in this location's table metadata.
 * <p>
 * The list is derived from {@code tableInfo.sortingColumns()} on first access and cached in
 * {@code sortingColumns}.
 *
 * @return the result of {@code SortColumnInfo.sortColumns(...)} for this location's metadata
 */
@Override
@NotNull
public List<SortColumn> getSortedColumns() {
    initialize();
    // The cache field is populated lazily without synchronization, so read it exactly once. Returning the field
    // via a second racy read could, per the memory model, yield null and break the @NotNull contract.
    List<SortColumn> local = sortingColumns;
    if (local == null) {
        // NOTE(review): the list is published through a plain field; this relies on the returned list being
        // safe to share across threads -- confirm, or make the field volatile.
        local = SortColumnInfo.sortColumns(tableInfo.sortingColumns());
        sortingColumns = local;
    }
    return local;
}

@NotNull
Map<String, String[]> getParquetColumnNameToPath() {
private Map<String, GroupingColumnInfo> getGroupingColumns() {
initialize();
return parquetColumnNameToPath;
if (groupingColumns == null) {
groupingColumns = Collections.unmodifiableMap(tableInfo.groupingColumnMap());
}
return groupingColumns;
}

@Override
public final RowSet getRowSet() {
private String getVersion() {
initialize();
if (version == null) {
version = tableInfo.version();
}
return version;
}

@NotNull
Map<String, String[]> getParquetColumnNameToPath() {
initialize();
return super.getRowSet();
return parquetColumnNameToPath;
}

@Override
public final long getSize() {
public final void initializeState() {
initialize();
return super.getSize();
}

@Override
Expand Down Expand Up @@ -237,22 +254,23 @@ private RowSet computeIndex(@NotNull final RowGroup[] rowGroups) {
public List<String[]> getDataIndexColumns() {
initialize();
final List<DataIndexInfo> dataIndexes = tableInfo.dataIndexes();
if (dataIndexes.isEmpty() && groupingColumns.isEmpty()) {
final Map<String, GroupingColumnInfo> localGroupingColumns = getGroupingColumns();
if (dataIndexes.isEmpty() && localGroupingColumns.isEmpty()) {
return List.of();
}
final List<String[]> dataIndexColumns = new ArrayList<>(dataIndexes.size() + groupingColumns.size());
final List<String[]> dataIndexColumns = new ArrayList<>(dataIndexes.size() + localGroupingColumns.size());
// Add the data indexes to the list
dataIndexes.stream().map(di -> di.columns().toArray(String[]::new)).forEach(dataIndexColumns::add);
// Add grouping columns to the list
groupingColumns.keySet().stream().map(colName -> new String[] {colName}).forEach(dataIndexColumns::add);
localGroupingColumns.keySet().stream().map(colName -> new String[] {colName}).forEach(dataIndexColumns::add);
return dataIndexColumns;
}

@Override
public boolean hasDataIndex(@NotNull final String... columns) {
initialize();
// Check if the column name matches any of the grouping columns
if (columns.length == 1 && groupingColumns.containsKey(columns[0])) {
if (columns.length == 1 && getGroupingColumns().containsKey(columns[0])) {
// Validate the index file exists (without loading and parsing it)
final IndexFileMetadata metadata = getIndexFileMetadata(getParquetKey().getURI(), columns);
return metadata != null && parquetFileExists(metadata.fileURI);
Expand Down Expand Up @@ -328,7 +346,7 @@ private IndexFileMetadata getIndexFileMetadata(
@NotNull final String... keyColumnNames) {
if (keyColumnNames.length == 1) {
// If there's only one key column, there might be (legacy) grouping info
final GroupingColumnInfo groupingColumnInfo = groupingColumns.get(keyColumnNames[0]);
final GroupingColumnInfo groupingColumnInfo = getGroupingColumns().get(keyColumnNames[0]);
if (groupingColumnInfo != null) {
return new IndexFileMetadata(
makeRelativeURI(parentFileURI, groupingColumnInfo.groupingTablePath()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3701,10 +3701,7 @@ public void testTableLocationReading() {
assertNotNull(nonExistentTableLocation.toString());
assertNotNull(nonExistentTableLocation.asLivenessReferent());
assertNotNull(nonExistentTableLocation.getStateLock());
nonExistentTableLocation.getLastModifiedTimeMillis();
nonExistentTableLocation.refresh();
nonExistentTableLocation.handleUpdate(new TrackingWritableRowSetImpl(), 0);
makeNewTableLocationAndVerifyNoMakeHandleException(nonExistentTableLocation::handleUpdate);

// Verify that we can get a column location for a non-existent column
final ColumnLocation nonExistentColumnLocation = nonExistentTableLocation.getColumnLocation("A");
Expand Down Expand Up @@ -3741,6 +3738,11 @@ public void testTableLocationReading() {
verifyMakeHandleException(nonExistentTableLocation::getSize);
makeNewTableLocationAndVerifyNoMakeHandleException(ParquetTableLocation::getSize);

verifyMakeHandleException(nonExistentTableLocation::getLastModifiedTimeMillis);
makeNewTableLocationAndVerifyNoMakeHandleException(ParquetTableLocation::getLastModifiedTimeMillis);

verifyMakeHandleException(() -> nonExistentTableLocation.handleUpdate(new TrackingWritableRowSetImpl(), 0));

// APIs from ColumnLocation
verifyMakeHandleException(nonExistentColumnLocation::exists);
verifyMakeHandleException(() -> nonExistentColumnLocation.makeColumnRegionChar(
Expand Down