Skip to content

Commit cb9c83d

Browse files
feat: Added support to read parquet metadata files from S3 (deephaven#5777)
1 parent 6ca0c89 commit cb9c83d

File tree

17 files changed

+449
-248
lines changed

17 files changed

+449
-248
lines changed

Util/channel/src/main/java/io/deephaven/util/channel/CachedChannelProvider.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -87,10 +87,14 @@ public boolean isCompatibleWith(@NotNull final SeekableChannelContext channelCon
8787
return wrappedProvider.isCompatibleWith(channelContext);
8888
}
8989

90+
@Override
91+
public boolean exists(@NotNull final URI uri) {
92+
return wrappedProvider.exists(uri);
93+
}
94+
9095
@Override
9196
public SeekableByteChannel getReadChannel(@NotNull final SeekableChannelContext channelContext,
92-
@NotNull final URI uri)
93-
throws IOException {
97+
@NotNull final URI uri) throws IOException {
9498
final String uriString = uri.toString();
9599
final KeyedObjectHashMap<String, PerPathPool> channelPool = channelPools.get(ChannelType.Read);
96100
final CachedChannel result = tryGetPooledChannel(uriString, channelPool);

Util/channel/src/main/java/io/deephaven/util/channel/LocalFSChannelProvider.java

+5
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@ public boolean isCompatibleWith(@Nullable final SeekableChannelContext channelCo
3333
return true;
3434
}
3535

36+
@Override
37+
public boolean exists(@NotNull final URI uri) {
38+
return Files.exists(Path.of(uri));
39+
}
40+
3641
@Override
3742
public SeekableByteChannel getReadChannel(@Nullable final SeekableChannelContext channelContext,
3843
@NotNull final URI uri)

Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProvider.java

+8
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,14 @@ default SeekableChannelContext makeSingleUseContext() {
5858
*/
5959
boolean isCompatibleWith(@NotNull SeekableChannelContext channelContext);
6060

61+
/**
62+
* Returns true if the given URI exists in the underlying storage.
63+
*
64+
* @param uri the URI to check
65+
* @return true if the URI exists
66+
*/
67+
boolean exists(@NotNull URI uri);
68+
6169
default SeekableByteChannel getReadChannel(@NotNull SeekableChannelContext channelContext, @NotNull String uriStr)
6270
throws IOException {
6371
return getReadChannel(channelContext, convertToURI(uriStr, false));

Util/channel/src/test/java/io/deephaven/util/channel/CachedChannelProviderTest.java

+5
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,11 @@ public boolean isCompatibleWith(@NotNull SeekableChannelContext channelContext)
208208
return channelContext == SeekableChannelContext.NULL;
209209
}
210210

211+
@Override
212+
public boolean exists(@NotNull URI uri) {
213+
throw new UnsupportedOperationException("exists");
214+
}
215+
211216
@Override
212217
public SeekableByteChannel getReadChannel(@NotNull SeekableChannelContext channelContext,
213218
@NotNull String path) {

engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableAggregationTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -3976,7 +3976,7 @@ public void testMultiPartitionSymbolTableBy() throws IOException {
39763976
t4.updateView("Date=`2021-07-21`", "Num=400")).moveColumnsUp("Date", "Num");
39773977

39783978
final Table loaded = ParquetTools.readTable(
3979-
new ParquetKeyValuePartitionedLayout(testRootFile.toURI(), 2, ParquetInstructions.EMPTY),
3979+
ParquetKeyValuePartitionedLayout.create(testRootFile.toURI(), 2, ParquetInstructions.EMPTY, null),
39803980
ParquetInstructions.EMPTY);
39813981

39823982
// verify the sources are identical

extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java

+4-9
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import io.deephaven.parquet.compress.DeephavenCompressorAdapterFactory;
1111
import io.deephaven.util.channel.SeekableChannelContext.ContextHolder;
1212
import io.deephaven.util.datastructures.SoftCachingFunction;
13-
import org.apache.commons.io.FilenameUtils;
1413
import org.apache.parquet.bytes.BytesInput;
1514
import org.apache.parquet.column.ColumnDescriptor;
1615
import org.apache.parquet.column.Dictionary;
@@ -28,14 +27,12 @@
2827
import java.io.UncheckedIOException;
2928
import java.net.URI;
3029
import java.nio.channels.SeekableByteChannel;
31-
import java.nio.file.Path;
3230
import java.util.List;
3331
import java.util.NoSuchElementException;
3432
import java.util.function.Function;
3533

36-
import static io.deephaven.base.FileUtils.convertToURI;
34+
import static io.deephaven.parquet.base.ParquetUtils.resolve;
3735
import static io.deephaven.parquet.base.ColumnPageReaderImpl.getDecompressorHolder;
38-
import static io.deephaven.parquet.base.ParquetFileReader.FILE_URI_SCHEME;
3936
import static org.apache.parquet.format.Encoding.PLAIN_DICTIONARY;
4037
import static org.apache.parquet.format.Encoding.RLE_DICTIONARY;
4138

@@ -83,12 +80,10 @@ final class ColumnChunkReaderImpl implements ColumnChunkReader {
8380
this.dictionarySupplier = new SoftCachingFunction<>(this::getDictionary);
8481
this.numRows = numRows;
8582
this.version = version;
86-
if (columnChunk.isSetFile_path() && FILE_URI_SCHEME.equals(rootURI.getScheme())) {
87-
final String relativePath = FilenameUtils.separatorsToSystem(columnChunk.getFile_path());
88-
this.columnChunkURI = convertToURI(Path.of(rootURI).resolve(relativePath), false);
83+
if (columnChunk.isSetFile_path()) {
84+
columnChunkURI = resolve(rootURI, columnChunk.getFile_path());
8985
} else {
90-
// TODO(deephaven-core#5066): Add support for reading metadata files from non-file URIs
91-
this.columnChunkURI = rootURI;
86+
columnChunkURI = rootURI;
9287
}
9388
// Construct the reader object but don't read the offset index yet
9489
this.offsetIndexReader = (columnChunk.isSetOffset_index_offset())

extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetUtils.java

+38-10
Original file line numberDiff line numberDiff line change
@@ -3,26 +3,34 @@
33
//
44
package io.deephaven.parquet.base;
55

6+
import io.deephaven.UncheckedDeephavenException;
67
import org.jetbrains.annotations.NotNull;
78

89
import java.io.File;
10+
import java.net.URI;
11+
import java.net.URISyntaxException;
912
import java.nio.charset.StandardCharsets;
1013
import java.nio.file.Path;
1114

15+
import static io.deephaven.base.FileUtils.URI_SEPARATOR;
1216
import static io.deephaven.base.FileUtils.URI_SEPARATOR_CHAR;
1317

1418
public final class ParquetUtils {
1519

20+
public static final String PARQUET_FILE_EXTENSION = ".parquet";
21+
1622
public static final String METADATA_FILE_NAME = "_metadata";
1723
public static final String COMMON_METADATA_FILE_NAME = "_common_metadata";
18-
public static final String PARQUET_FILE_EXTENSION = ".parquet";
1924
public static final String METADATA_FILE_URI_SUFFIX = URI_SEPARATOR_CHAR + METADATA_FILE_NAME;
2025
public static final String COMMON_METADATA_FILE_URI_SUFFIX = URI_SEPARATOR_CHAR + COMMON_METADATA_FILE_NAME;
21-
public static final String METADATA_FILE_SUFFIX = File.separatorChar + METADATA_FILE_NAME;
22-
public static final String COMMON_METADATA_FILE_SUFFIX = File.separatorChar + COMMON_METADATA_FILE_NAME;
26+
private static final String METADATA_FILE_SUFFIX = File.separatorChar + METADATA_FILE_NAME;
27+
private static final String COMMON_METADATA_FILE_SUFFIX = File.separatorChar + COMMON_METADATA_FILE_NAME;
28+
2329
private static final String MAGIC_STR = "PAR1";
2430
public static final byte[] MAGIC = MAGIC_STR.getBytes(StandardCharsets.US_ASCII);
2531

32+
private static final String WINDOWS_FILE_SEPARATOR = "\\";
33+
2634
/**
2735
* The number of bytes to buffer before flushing while writing parquet files and metadata files.
2836
*/
@@ -42,17 +50,23 @@ public static String getPerFileMetadataKey(final String filePath) {
4250
}
4351

4452
/**
45-
* This method verifies if the source points to a parquet file or a metadata file. Provided source can be a local
46-
* file path or a URI. Also, it can point to a parquet file, metadata file or a directory.
53+
* This method verifies if the source points to a parquet file. Provided source can be a local file path or a URI.
4754
*/
4855
public static boolean isParquetFile(@NotNull final String source) {
49-
boolean ret = source.endsWith(PARQUET_FILE_EXTENSION)
50-
|| source.endsWith(METADATA_FILE_URI_SUFFIX)
51-
|| source.endsWith(COMMON_METADATA_FILE_URI_SUFFIX);
56+
return source.endsWith(PARQUET_FILE_EXTENSION);
57+
}
58+
59+
/**
60+
* This method verifies if the source points to a metadata file. Provided source can be a local file path or a URI.
61+
*/
62+
public static boolean isMetadataFile(@NotNull final String source) {
63+
if (source.endsWith(METADATA_FILE_URI_SUFFIX) || source.endsWith(COMMON_METADATA_FILE_URI_SUFFIX)) {
64+
return true;
65+
}
5266
if (File.separatorChar != URI_SEPARATOR_CHAR) {
53-
ret = ret || source.endsWith(METADATA_FILE_SUFFIX) || source.endsWith(COMMON_METADATA_FILE_SUFFIX);
67+
return source.endsWith(METADATA_FILE_SUFFIX) || source.endsWith(COMMON_METADATA_FILE_SUFFIX);
5468
}
55-
return ret;
69+
return false;
5670
}
5771

5872
/**
@@ -74,4 +88,18 @@ public static boolean isVisibleParquetFile(@NotNull final Path rootDir, @NotNull
7488
}
7589
return true;
7690
}
91+
92+
/**
93+
* Resolve a relative path against a base URI. The path can be from Windows or Unix systems.
94+
*/
95+
public static URI resolve(final URI base, final String relativePath) {
96+
final URI relativeURI;
97+
try {
98+
// Sanitize the relative path before resolving it to avoid issues with separators and special characters
99+
relativeURI = new URI(null, null, relativePath.replace(WINDOWS_FILE_SEPARATOR, URI_SEPARATOR), null);
100+
} catch (final URISyntaxException e) {
101+
throw new UncheckedDeephavenException("Failed to create URI from relative path: " + relativePath, e);
102+
}
103+
return base.resolve(relativeURI);
104+
}
77105
}

0 commit comments

Comments
 (0)