Skip to content

Commit c6543b2

Browse files
Refactoring the ParquetTools read/write APIs (deephaven#5358)
1 parent 87217e7 commit c6543b2

File tree

17 files changed

+1323
-879
lines changed

17 files changed

+1323
-879
lines changed

Base/src/main/java/io/deephaven/base/FileUtils.java

+18-12
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,13 @@ public boolean accept(File dir, String name) {
3030
};
3131
private final static String[] EMPTY_STRING_ARRAY = new String[0];
3232

33-
public static final Pattern DUPLICATE_SLASH_PATTERN = Pattern.compile("//+");
33+
public static final char URI_SEPARATOR_CHAR = '/';
34+
35+
public static final String URI_SEPARATOR = "" + URI_SEPARATOR_CHAR;
36+
37+
public static final String REPEATED_URI_SEPARATOR = URI_SEPARATOR + URI_SEPARATOR;
38+
39+
public static final Pattern REPEATED_URI_SEPARATOR_PATTERN = Pattern.compile("//+");
3440

3541
/**
3642
* Cleans the specified path. All files and subdirectories in the path will be deleted. (i.e., you'll be left with an
@@ -258,7 +264,7 @@ public boolean accept(File pathname) {
258264

259265
/**
260266
* Take the file source path or URI string and convert it to a URI object. Any unnecessary path separators will be
261-
* removed.
267+
* removed. The URI object will always be {@link URI#isAbsolute() absolute}, i.e., will always have a scheme.
262268
*
263269
* @param source The file source path or URI
264270
* @param isDirectory Whether the source is a directory
@@ -273,8 +279,8 @@ public static URI convertToURI(final String source, final boolean isDirectory) {
273279
uri = new URI(source);
274280
// Replace two or more consecutive slashes in the path with a single slash
275281
final String path = uri.getPath();
276-
if (path.contains("//")) {
277-
final String canonicalizedPath = DUPLICATE_SLASH_PATTERN.matcher(path).replaceAll("/");
282+
if (path.contains(REPEATED_URI_SEPARATOR)) {
283+
final String canonicalizedPath = REPEATED_URI_SEPARATOR_PATTERN.matcher(path).replaceAll(URI_SEPARATOR);
278284
uri = new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), uri.getPort(), canonicalizedPath,
279285
uri.getQuery(), uri.getFragment());
280286
}
@@ -300,17 +306,17 @@ public static URI convertToURI(final String source, final boolean isDirectory) {
300306
*/
301307
public static URI convertToURI(final File file, final boolean isDirectory) {
302308
String absPath = file.getAbsolutePath();
303-
if (File.separatorChar != '/') {
304-
absPath = absPath.replace(File.separatorChar, '/');
309+
if (File.separatorChar != URI_SEPARATOR_CHAR) {
310+
absPath = absPath.replace(File.separatorChar, URI_SEPARATOR_CHAR);
305311
}
306-
if (absPath.charAt(0) != '/') {
307-
absPath = "/" + absPath;
312+
if (absPath.charAt(0) != URI_SEPARATOR_CHAR) {
313+
absPath = URI_SEPARATOR_CHAR + absPath;
308314
}
309-
if (isDirectory && absPath.charAt(absPath.length() - 1) != '/') {
310-
absPath = absPath + "/";
315+
if (isDirectory && absPath.charAt(absPath.length() - 1) != URI_SEPARATOR_CHAR) {
316+
absPath = absPath + URI_SEPARATOR_CHAR;
311317
}
312-
if (absPath.startsWith("//")) {
313-
absPath = "//" + absPath;
318+
if (absPath.startsWith(REPEATED_URI_SEPARATOR)) {
319+
absPath = REPEATED_URI_SEPARATOR + absPath;
314320
}
315321
try {
316322
return new URI("file", null, absPath, null);

engine/table/src/main/java/io/deephaven/engine/table/impl/indexer/DataIndexer.java

+58-15
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,8 @@ private int priorityOf(@NotNull final ColumnSource<?> keyColumn) {
9292
@NotNull
9393
private static Collection<ColumnSource<?>> getColumnSources(
9494
@NotNull final Table table,
95-
@NotNull final String... keyColumnNames) {
96-
return Arrays.stream(keyColumnNames)
95+
@NotNull final Collection<String> keyColumnNames) {
96+
return keyColumnNames.stream()
9797
.map(table::getColumnSource)
9898
.collect(Collectors.toList());
9999
}
@@ -107,16 +107,29 @@ private static Collection<ColumnSource<?>> getColumnSources(
107107
* @param keyColumnNames The key column names to check
108108
* @return Whether {@code table} has a DataIndexer with a {@link DataIndex} for the given key columns
109109
*/
110-
public static boolean hasDataIndex(@NotNull Table table, @NotNull final String... keyColumnNames) {
111-
if (keyColumnNames.length == 0) {
110+
public static boolean hasDataIndex(@NotNull final Table table, @NotNull final String... keyColumnNames) {
111+
return hasDataIndex(table, Arrays.asList(keyColumnNames));
112+
}
113+
114+
/**
115+
* Test whether {@code table} has a DataIndexer with a usable {@link DataIndex} for the given key columns. Note that
116+
* a result from this method is a snapshot of current state, and does not guarantee anything about future calls to
117+
* {@link #hasDataIndex}, {@link #getDataIndex}, or {@link #getOrCreateDataIndex(Table, String...)}.
118+
*
119+
* @param table The {@link Table} to check
120+
* @param keyColumnNames The key column names to check
121+
* @return Whether {@code table} has a DataIndexer with a {@link DataIndex} for the given key columns
122+
*/
123+
public static boolean hasDataIndex(@NotNull final Table table, @NotNull final Collection<String> keyColumnNames) {
124+
if (keyColumnNames.isEmpty()) {
112125
return false;
113126
}
114-
table = table.coalesce();
115-
final DataIndexer indexer = DataIndexer.existingOf(table.getRowSet());
127+
final Table tableToUse = table.coalesce();
128+
final DataIndexer indexer = DataIndexer.existingOf(tableToUse.getRowSet());
116129
if (indexer == null) {
117130
return false;
118131
}
119-
return indexer.hasDataIndex(getColumnSources(table, keyColumnNames));
132+
return indexer.hasDataIndex(getColumnSources(tableToUse, keyColumnNames));
120133
}
121134

122135
/**
@@ -152,19 +165,34 @@ public boolean hasDataIndex(@NotNull final Collection<ColumnSource<?>> keyColumn
152165
* index is no longer live.
153166
*
154167
* @param table The {@link Table} to check
155-
* @param keyColumnNames The key column for which to retrieve a DataIndex
168+
* @param keyColumnNames The key columns for which to retrieve a DataIndex
156169
* @return The {@link DataIndex}, or {@code null} if one does not exist
157170
*/
158-
public static DataIndex getDataIndex(@NotNull Table table, final String... keyColumnNames) {
159-
if (keyColumnNames.length == 0) {
171+
@Nullable
172+
public static DataIndex getDataIndex(@NotNull final Table table, final String... keyColumnNames) {
173+
return getDataIndex(table, Arrays.asList(keyColumnNames));
174+
}
175+
176+
/**
177+
* If {@code table} has a DataIndexer, return a {@link DataIndex} for the given key columns, or {@code null} if no
178+
* such index exists, if the cached index is invalid, or if the {@link DataIndex#isRefreshing() refreshing} cached
179+
* index is no longer live.
180+
*
181+
* @param table The {@link Table} to check
182+
* @param keyColumnNames The key columns for which to retrieve a DataIndex
183+
* @return The {@link DataIndex}, or {@code null} if one does not exist
184+
*/
185+
@Nullable
186+
public static DataIndex getDataIndex(@NotNull final Table table, final Collection<String> keyColumnNames) {
187+
if (keyColumnNames.isEmpty()) {
160188
return null;
161189
}
162-
table = table.coalesce();
163-
final DataIndexer indexer = DataIndexer.existingOf(table.getRowSet());
190+
final Table tableToUse = table.coalesce();
191+
final DataIndexer indexer = DataIndexer.existingOf(tableToUse.getRowSet());
164192
if (indexer == null) {
165193
return null;
166194
}
167-
return indexer.getDataIndex(getColumnSources(table, keyColumnNames));
195+
return indexer.getDataIndex(getColumnSources(tableToUse, keyColumnNames));
168196
}
169197

170198
/**
@@ -239,13 +267,28 @@ public static DataIndex getOptimalPartialIndex(Table table, final String... keyC
239267
public static DataIndex getOrCreateDataIndex(
240268
@NotNull final Table table,
241269
@NotNull final String... keyColumnNames) {
242-
if (keyColumnNames.length == 0) {
270+
return getOrCreateDataIndex(table, Arrays.asList(keyColumnNames));
271+
}
272+
273+
/**
274+
* Create a {@link DataIndex} for {@code table} indexing {@code keyColumnNames}, if no valid, live data index already
275+
* exists for these inputs.
276+
*
277+
* @param table The {@link Table} to index
278+
* @param keyColumnNames The key column names to include
279+
* @return The existing or newly created {@link DataIndex}
280+
* @apiNote This method causes the returned {@link DataIndex} to be managed by the enclosing liveness manager.
281+
*/
282+
public static DataIndex getOrCreateDataIndex(
283+
@NotNull final Table table,
284+
@NotNull final Collection<String> keyColumnNames) {
285+
if (keyColumnNames.isEmpty()) {
243286
throw new IllegalArgumentException("Cannot create a DataIndex without any key columns");
244287
}
245288
final QueryTable tableToUse = (QueryTable) table.coalesce();
246289
final DataIndexer dataIndexer = DataIndexer.of(tableToUse.getRowSet());
247290
return dataIndexer.rootCache.computeIfAbsent(dataIndexer.pathFor(getColumnSources(tableToUse, keyColumnNames)),
248-
() -> new TableBackedDataIndex(tableToUse, keyColumnNames));
291+
() -> new TableBackedDataIndex(tableToUse, keyColumnNames.toArray(String[]::new)));
249292
}
250293

251294
/**

engine/table/src/main/java/io/deephaven/engine/table/impl/locations/local/URIStreamKeyValuePartitionLayout.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,14 @@
2525
import java.util.function.Supplier;
2626
import java.util.stream.Stream;
2727

28+
import static io.deephaven.base.FileUtils.URI_SEPARATOR;
29+
2830
/**
2931
* Extracts a key-value partitioned table layout from a stream of URIs.
3032
*/
3133
public abstract class URIStreamKeyValuePartitionLayout<TLK extends TableLocationKey>
3234
extends KeyValuePartitionLayout<TLK, URI> {
3335

34-
private static final String URI_SEPARATOR = "/";
35-
3636
protected final URI tableRootDirectory;
3737
private final Supplier<LocationTableBuilder> locationTableBuilderFactory;
3838
private final int maxPartitioningLevels;
@@ -96,7 +96,7 @@ private void getPartitions(@NotNull final URI relativePath,
9696
@NotNull final TIntObjectMap<ColumnNameInfo> partitionColInfo,
9797
final boolean registered) {
9898
final String relativePathString = relativePath.getPath();
99-
// The following assumes that there is exactly one URI_SEPARATOR between each subdirectory in the path
99+
// The following assumes that there is exactly one separator between each subdirectory in the path
100100
final String[] subDirs = relativePathString.split(URI_SEPARATOR);
101101
final int numPartitioningCol = subDirs.length - 1;
102102
if (registered) {

engine/table/src/test/java/io/deephaven/engine/table/impl/sources/regioned/TestChunkedRegionedOperations.java

+12-15
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
package io.deephaven.engine.table.impl.sources.regioned;
55

66
import io.deephaven.base.FileUtils;
7-
import io.deephaven.datastructures.util.CollectionUtil;
87
import io.deephaven.engine.context.ExecutionContext;
98
import io.deephaven.engine.table.*;
109
import io.deephaven.stringset.ArrayStringSet;
@@ -226,36 +225,34 @@ public void setUp() throws Exception {
226225
final String tableName = "TestTable";
227226

228227
final PartitionedTable partitionedInputData = inputData.partitionBy("PC");
229-
final File[] partitionedInputDestinations;
228+
final String[] partitionedInputDestinations;
230229
try (final Stream<String> partitionNames = partitionedInputData.table()
231230
.<String>objectColumnIterator("PC").stream()) {
232231
partitionedInputDestinations = partitionNames.map(pcv -> new File(dataDirectory,
233232
"IP" + File.separator + "P" + pcv + File.separator + tableName + File.separator
234-
+ PARQUET_FILE_NAME))
235-
.toArray(File[]::new);
233+
+ PARQUET_FILE_NAME)
234+
.getPath())
235+
.toArray(String[]::new);
236236
}
237-
ParquetTools.writeParquetTables(
237+
ParquetTools.writeTables(
238238
partitionedInputData.constituents(),
239-
partitionedDataDefinition.getWritable(),
240-
parquetInstructions,
241239
partitionedInputDestinations,
242-
CollectionUtil.ZERO_LENGTH_STRING_ARRAY_ARRAY);
240+
parquetInstructions.withTableDefinition(partitionedDataDefinition.getWritable()));
243241

244242
final PartitionedTable partitionedInputMissingData = inputMissingData.view("PC", "II").partitionBy("PC");
245-
final File[] partitionedInputMissingDestinations;
243+
final String[] partitionedInputMissingDestinations;
246244
try (final Stream<String> partitionNames = partitionedInputMissingData.table()
247245
.<String>objectColumnIterator("PC").stream()) {
248246
partitionedInputMissingDestinations = partitionNames.map(pcv -> new File(dataDirectory,
249247
"IP" + File.separator + "P" + pcv + File.separator + tableName + File.separator
250-
+ PARQUET_FILE_NAME))
251-
.toArray(File[]::new);
248+
+ PARQUET_FILE_NAME)
249+
.getPath())
250+
.toArray(String[]::new);
252251
}
253-
ParquetTools.writeParquetTables(
252+
ParquetTools.writeTables(
254253
partitionedInputMissingData.constituents(),
255-
partitionedMissingDataDefinition.getWritable(),
256-
parquetInstructions,
257254
partitionedInputMissingDestinations,
258-
CollectionUtil.ZERO_LENGTH_STRING_ARRAY_ARRAY);
255+
parquetInstructions.withTableDefinition(partitionedMissingDataDefinition.getWritable()));
259256

260257
expected = TableTools
261258
.merge(

extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java

+75
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,17 @@
33
//
44
package io.deephaven.parquet.base;
55

6+
import io.deephaven.UncheckedDeephavenException;
7+
import io.deephaven.util.channel.CachedChannelProvider;
68
import io.deephaven.util.channel.SeekableChannelContext;
79
import io.deephaven.util.channel.SeekableChannelsProvider;
10+
import io.deephaven.util.channel.SeekableChannelsProviderLoader;
811
import org.apache.parquet.format.*;
912
import org.apache.parquet.format.ColumnOrder;
1013
import org.apache.parquet.format.Type;
1114
import org.apache.parquet.schema.*;
15+
import org.jetbrains.annotations.NotNull;
16+
import org.jetbrains.annotations.Nullable;
1217

1318
import java.io.File;
1419
import java.io.IOException;
@@ -37,6 +42,76 @@ public class ParquetFileReader {
3742
private final URI rootURI;
3843
private final MessageType type;
3944

45+
/**
46+
* Make a {@link ParquetFileReader} for the supplied {@link File}. Wraps {@link IOException} as
47+
* {@link UncheckedDeephavenException}.
48+
*
49+
* @param parquetFile The parquet file or the parquet metadata file
50+
* @param specialInstructions Optional read instructions to pass to {@link SeekableChannelsProvider} while creating
51+
* channels
52+
* @return The new {@link ParquetFileReader}
53+
*/
54+
public static ParquetFileReader create(
55+
@NotNull final File parquetFile,
56+
@Nullable final Object specialInstructions) {
57+
try {
58+
return createChecked(parquetFile, specialInstructions);
59+
} catch (IOException e) {
60+
throw new UncheckedDeephavenException("Failed to create Parquet file reader: " + parquetFile, e);
61+
}
62+
}
63+
64+
/**
65+
* Make a {@link ParquetFileReader} for the supplied {@link URI}. Wraps {@link IOException} as
66+
* {@link UncheckedDeephavenException}.
67+
*
68+
* @param parquetFileURI The URI for the parquet file or the parquet metadata file
69+
* @param specialInstructions Optional read instructions to pass to {@link SeekableChannelsProvider} while creating
70+
* channels
71+
* @return The new {@link ParquetFileReader}
72+
*/
73+
public static ParquetFileReader create(
74+
@NotNull final URI parquetFileURI,
75+
@Nullable final Object specialInstructions) {
76+
try {
77+
return createChecked(parquetFileURI, specialInstructions);
78+
} catch (IOException e) {
79+
throw new UncheckedDeephavenException("Failed to create Parquet file reader: " + parquetFileURI, e);
80+
}
81+
}
82+
83+
/**
84+
* Make a {@link ParquetFileReader} for the supplied {@link File}.
85+
*
86+
* @param parquetFile The parquet file or the parquet metadata file
87+
* @param specialInstructions Optional read instructions to pass to {@link SeekableChannelsProvider} while creating
88+
* channels
89+
* @return The new {@link ParquetFileReader}
90+
* @throws IOException if an IO exception occurs
91+
*/
92+
public static ParquetFileReader createChecked(
93+
@NotNull final File parquetFile,
94+
@Nullable final Object specialInstructions) throws IOException {
95+
return createChecked(convertToURI(parquetFile, false), specialInstructions);
96+
}
97+
98+
/**
99+
* Make a {@link ParquetFileReader} for the supplied {@link URI}.
100+
*
101+
* @param parquetFileURI The URI for the parquet file or the parquet metadata file
102+
* @param specialInstructions Optional read instructions to pass to {@link SeekableChannelsProvider} while creating
103+
* channels
104+
* @return The new {@link ParquetFileReader}
105+
* @throws IOException if an IO exception occurs
106+
*/
107+
public static ParquetFileReader createChecked(
108+
@NotNull final URI parquetFileURI,
109+
@Nullable final Object specialInstructions) throws IOException {
110+
final SeekableChannelsProvider provider = SeekableChannelsProviderLoader.getInstance().fromServiceLoader(
111+
parquetFileURI, specialInstructions);
112+
return new ParquetFileReader(parquetFileURI, new CachedChannelProvider(provider, 1 << 7));
113+
}
114+
40115
/**
41116
* Create a new ParquetFileReader for the provided source.
42117
*

extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetUtils.java

+20
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,17 @@
99
import java.nio.charset.StandardCharsets;
1010
import java.nio.file.Path;
1111

12+
import static io.deephaven.base.FileUtils.URI_SEPARATOR_CHAR;
13+
1214
public final class ParquetUtils {
1315

1416
public static final String METADATA_FILE_NAME = "_metadata";
1517
public static final String COMMON_METADATA_FILE_NAME = "_common_metadata";
1618
public static final String PARQUET_FILE_EXTENSION = ".parquet";
19+
public static final String METADATA_FILE_URI_SUFFIX = URI_SEPARATOR_CHAR + METADATA_FILE_NAME;
20+
public static final String COMMON_METADATA_FILE_URI_SUFFIX = URI_SEPARATOR_CHAR + COMMON_METADATA_FILE_NAME;
21+
public static final String METADATA_FILE_SUFFIX = File.separatorChar + METADATA_FILE_NAME;
22+
public static final String COMMON_METADATA_FILE_SUFFIX = File.separatorChar + COMMON_METADATA_FILE_NAME;
1723
private static final String MAGIC_STR = "PAR1";
1824
public static final byte[] MAGIC = MAGIC_STR.getBytes(StandardCharsets.US_ASCII);
1925

@@ -35,6 +41,20 @@ public static String getPerFileMetadataKey(final String filePath) {
3541
return "deephaven_per_file_" + filePath.replace(File.separatorChar, '_');
3642
}
3743

44+
/**
45+
* Checks whether the source points to a parquet file or a metadata file. The provided source can be a local
46+
* file path or a URI, and it may point to a parquet file, a metadata file, or a directory.
47+
*/
48+
public static boolean isParquetFile(@NotNull final String source) {
49+
boolean ret = source.endsWith(PARQUET_FILE_EXTENSION)
50+
|| source.endsWith(METADATA_FILE_URI_SUFFIX)
51+
|| source.endsWith(COMMON_METADATA_FILE_URI_SUFFIX);
52+
if (File.separatorChar != URI_SEPARATOR_CHAR) {
53+
ret = ret || source.endsWith(METADATA_FILE_SUFFIX) || source.endsWith(COMMON_METADATA_FILE_SUFFIX);
54+
}
55+
return ret;
56+
}
57+
3858
/**
3959
* Check if the provided path points to a non-hidden parquet file, and that none of its parents (till rootDir) are
4060
* hidden.

0 commit comments

Comments
 (0)