Skip to content

Commit db04b57

Browse files
Added support to read single parquet file hosted in AWS S3 (deephaven#4972)
Refactored local parquet reading code to support reading from URIs instead of file paths.
1 parent 2e747b4 commit db04b57

File tree

77 files changed

+2704
-441
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

77 files changed

+2704
-441
lines changed

Util/channel/build.gradle

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
// Build script for the Util/channel module: applies the plugins, declares the module's
// dependencies, and runs its tests on the JUnit Platform.
plugins {
2+
id 'java-library'
3+
id 'io.deephaven.project.register'
4+
}
5+
6+
dependencies {
7+
implementation project(':Base')
8+
9+
// Needed for SafeCloseable
10+
implementation project(':Util')
11+
12+
// Compile-time only; presumably the annotations artifact (e.g. @NotNull/@Nullable) - confirm against depAnnotations
compileOnly depAnnotations
13+
14+
// Test-side setup: inherit the JUnit Platform and AssertJ configuration through the Classpaths helper
Classpaths.inheritJUnitPlatform(project)
15+
Classpaths.inheritAssertJ(project)
16+
// No versions are given here; presumably they come from the inherited JUnit platform - confirm
testImplementation 'org.junit.jupiter:junit-jupiter'
17+
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
18+
}
19+
20+
// Run this module's tests on the JUnit Platform
test {
21+
useJUnitPlatform()
22+
}

Util/channel/gradle.properties

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
io.deephaven.project.ProjectType=JAVA_PUBLIC

extensions/parquet/base/src/main/java/io/deephaven/parquet/base/util/CachedChannelProvider.java Util/channel/src/main/java/io/deephaven/util/channel/CachedChannelProvider.java

+46-8
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,19 @@
11
/**
22
* Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
33
*/
4-
package io.deephaven.parquet.base.util;
4+
package io.deephaven.util.channel;
55

66
import io.deephaven.base.RAPriQueue;
77
import io.deephaven.base.verify.Assert;
88
import io.deephaven.base.verify.Require;
99
import io.deephaven.hash.KeyedObjectHashMap;
1010
import io.deephaven.hash.KeyedObjectKey;
11+
import io.deephaven.util.annotations.FinalDefault;
1112
import org.jetbrains.annotations.NotNull;
1213
import org.jetbrains.annotations.Nullable;
1314

1415
import java.io.IOException;
16+
import java.net.URI;
1517
import java.nio.ByteBuffer;
1618
import java.nio.channels.SeekableByteChannel;
1719
import java.nio.file.Path;
@@ -22,6 +24,15 @@
2224
*/
2325
public class CachedChannelProvider implements SeekableChannelsProvider {
2426

27+
public interface ContextHolder {
28+
void setContext(SeekableChannelContext channelContext);
29+
30+
@FinalDefault
31+
default void clearContext() {
32+
setContext(null);
33+
}
34+
}
35+
2536
private final SeekableChannelsProvider wrappedProvider;
2637
private final int maximumPooledCount;
2738

@@ -52,13 +63,27 @@ public CachedChannelProvider(@NotNull final SeekableChannelsProvider wrappedProv
5263
}
5364

5465
@Override
55-
public SeekableByteChannel getReadChannel(@NotNull final Path path) throws IOException {
56-
final String pathKey = path.toAbsolutePath().toString();
66+
public SeekableChannelContext makeContext() {
67+
return wrappedProvider.makeContext();
68+
}
69+
70+
@Override
71+
public boolean isCompatibleWith(@NotNull final SeekableChannelContext channelContext) {
72+
return wrappedProvider.isCompatibleWith(channelContext);
73+
}
74+
75+
@Override
76+
public SeekableByteChannel getReadChannel(@NotNull final SeekableChannelContext channelContext,
77+
@NotNull final URI uri)
78+
throws IOException {
79+
final String uriString = uri.toString();
5780
final KeyedObjectHashMap<String, PerPathPool> channelPool = channelPools.get(ChannelType.Read);
58-
final CachedChannel result = tryGetPooledChannel(pathKey, channelPool);
59-
return result == null
60-
? new CachedChannel(wrappedProvider.getReadChannel(path), ChannelType.Read, pathKey)
81+
final CachedChannel result = tryGetPooledChannel(uriString, channelPool);
82+
final CachedChannel channel = result == null
83+
? new CachedChannel(wrappedProvider.getReadChannel(channelContext, uri), ChannelType.Read, uriString)
6184
: result.position(0);
85+
channel.setContext(channelContext);
86+
return channel;
6287
}
6388

6489
@Override
@@ -125,10 +150,15 @@ private long advanceClock() {
125150
return logicalClock = 1;
126151
}
127152

153+
@Override
154+
public void close() {
155+
wrappedProvider.close();
156+
}
157+
128158
/**
129159
* {@link SeekableByteChannel Channel} wrapper for pooled usage.
130160
*/
131-
private class CachedChannel implements SeekableByteChannel {
161+
private class CachedChannel implements SeekableByteChannel, ContextHolder {
132162

133163
private final SeekableByteChannel wrappedChannel;
134164
private final ChannelType channelType;
@@ -163,7 +193,7 @@ public long position() throws IOException {
163193
}
164194

165195
@Override
166-
public SeekableByteChannel position(final long newPosition) throws IOException {
196+
public CachedChannel position(final long newPosition) throws IOException {
167197
Require.eqTrue(isOpen, "isOpen");
168198
wrappedChannel.position(newPosition);
169199
return this;
@@ -196,12 +226,20 @@ public boolean isOpen() {
196226
public void close() throws IOException {
197227
Require.eqTrue(isOpen, "isOpen");
198228
isOpen = false;
229+
clearContext();
199230
returnPoolableChannel(this);
200231
}
201232

202233
private void dispose() throws IOException {
203234
wrappedChannel.close();
204235
}
236+
237+
@Override
238+
public final void setContext(@Nullable final SeekableChannelContext channelContext) {
239+
if (wrappedChannel instanceof ContextHolder) {
240+
((ContextHolder) wrappedChannel).setContext(channelContext);
241+
}
242+
}
205243
}
206244

207245
/**

extensions/parquet/base/src/main/java/io/deephaven/parquet/base/util/LocalFSChannelProvider.java Util/channel/src/main/java/io/deephaven/util/channel/LocalFSChannelProvider.java

+23-3
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
/**
22
* Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
33
*/
4-
package io.deephaven.parquet.base.util;
4+
package io.deephaven.util.channel;
55

66
import org.jetbrains.annotations.NotNull;
7+
import org.jetbrains.annotations.Nullable;
78

89
import java.io.IOException;
10+
import java.net.URI;
911
import java.nio.channels.FileChannel;
1012
import java.nio.channels.SeekableByteChannel;
1113
import java.nio.file.Path;
@@ -14,8 +16,23 @@
1416
public class LocalFSChannelProvider implements SeekableChannelsProvider {
1517

1618
@Override
17-
public SeekableByteChannel getReadChannel(@NotNull final Path path) throws IOException {
18-
return FileChannel.open(path, StandardOpenOption.READ);
19+
public SeekableChannelContext makeContext() {
20+
// No additional context required for local FS
21+
return SeekableChannelContext.NULL;
22+
}
23+
24+
@Override
25+
public boolean isCompatibleWith(@Nullable final SeekableChannelContext channelContext) {
26+
// Context is not used, hence always compatible
27+
return true;
28+
}
29+
30+
@Override
31+
public SeekableByteChannel getReadChannel(@Nullable final SeekableChannelContext channelContext,
32+
@NotNull final URI uri)
33+
throws IOException {
34+
// context is unused here
35+
return FileChannel.open(Path.of(uri), StandardOpenOption.READ);
1936
}
2037

2138
@Override
@@ -31,4 +48,7 @@ public SeekableByteChannel getWriteChannel(@NotNull final Path filePath, final b
3148
}
3249
return result;
3350
}
51+
52+
@Override
53+
public void close() {}
3454
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package io.deephaven.util.channel;
2+
3+
import io.deephaven.util.SafeCloseable;
4+
5+
/**
6+
* Context object for reading and writing to channels created by {@link SeekableChannelsProvider}.
7+
*/
8+
public interface SeekableChannelContext extends SafeCloseable {
9+
10+
/**
 * A shared context instance that overrides nothing, so its {@link #close()} is the no-op default declared below.
 */
SeekableChannelContext NULL = new SeekableChannelContext() {};
11+
12+
/**
13+
* Release any resources associated with this context. The context should not be used afterward.
 * <p>
 * This default implementation does nothing.
14+
*/
15+
default void close() {}
16+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/**
2+
* Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
3+
*/
4+
package io.deephaven.util.channel;
5+
6+
import io.deephaven.util.SafeCloseable;
7+
import org.jetbrains.annotations.NotNull;
8+
9+
import java.io.File;
10+
import java.io.IOException;
11+
import java.net.URI;
12+
import java.net.URISyntaxException;
13+
import java.nio.channels.SeekableByteChannel;
14+
import java.nio.file.Path;
15+
import java.nio.file.Paths;
16+
17+
public interface SeekableChannelsProvider extends SafeCloseable {
18+
19+
/**
20+
* Take the file source path or URI and convert it to a URI object.
21+
*
22+
* @param source The file source path or URI
23+
* @return The URI object
24+
*/
25+
static URI convertToURI(final String source) {
26+
final URI uri;
27+
try {
28+
uri = new URI(source);
29+
} catch (final URISyntaxException e) {
30+
// If the URI is invalid, assume it's a file path
31+
return new File(source).toURI();
32+
}
33+
if (uri.getScheme() == null) {
34+
// Need to convert to a "file" URI
35+
return new File(source).toURI();
36+
}
37+
return uri;
38+
}
39+
40+
/**
41+
* Create a new {@link SeekableChannelContext} object for creating read channels via this provider.
42+
*/
43+
SeekableChannelContext makeContext();
44+
45+
/**
46+
* Check if the given context is compatible with this provider. Useful to test if we can use provided
47+
* {@code context} object for creating channels with this provider.
48+
*/
49+
boolean isCompatibleWith(@NotNull SeekableChannelContext channelContext);
50+
51+
default SeekableByteChannel getReadChannel(@NotNull SeekableChannelContext channelContext, @NotNull String uriStr)
52+
throws IOException {
53+
return getReadChannel(channelContext, convertToURI(uriStr));
54+
}
55+
56+
SeekableByteChannel getReadChannel(@NotNull SeekableChannelContext channelContext, @NotNull URI uri)
57+
throws IOException;
58+
59+
default SeekableByteChannel getWriteChannel(@NotNull final String path, final boolean append) throws IOException {
60+
return getWriteChannel(Paths.get(path), append);
61+
}
62+
63+
SeekableByteChannel getWriteChannel(@NotNull Path path, boolean append) throws IOException;
64+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/**
2+
* Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
3+
*/
4+
package io.deephaven.util.channel;
5+
6+
import org.jetbrains.annotations.NotNull;
7+
import org.jetbrains.annotations.Nullable;
8+
9+
import java.net.URI;
10+
import java.util.ArrayList;
11+
import java.util.List;
12+
import java.util.ServiceLoader;
13+
14+
/**
15+
* A service loader class for loading {@link SeekableChannelsProviderPlugin} implementations at runtime and providing
16+
* {@link SeekableChannelsProvider} implementations for different URI schemes, e.g., S3.
17+
*/
18+
public final class SeekableChannelsProviderLoader {
19+
20+
private static volatile SeekableChannelsProviderLoader instance;
21+
22+
public static SeekableChannelsProviderLoader getInstance() {
23+
if (instance == null) {
24+
instance = new SeekableChannelsProviderLoader();
25+
}
26+
return instance;
27+
}
28+
29+
private final List<SeekableChannelsProviderPlugin> providers;
30+
31+
private SeekableChannelsProviderLoader() {
32+
providers = new ArrayList<>();
33+
// Load the plugins
34+
for (final SeekableChannelsProviderPlugin plugin : ServiceLoader.load(SeekableChannelsProviderPlugin.class)) {
35+
providers.add(plugin);
36+
}
37+
}
38+
39+
/**
40+
* Create a new {@link SeekableChannelsProvider} based on given URI and object using the plugins loaded by the
41+
* {@link ServiceLoader}. For example, for a "S3" URI, we will create a {@link SeekableChannelsProvider} which can
42+
* read files from S3.
43+
*
44+
* @param uri The URI
45+
* @param object An optional object to pass to the {@link SeekableChannelsProviderPlugin} implementations.
46+
* @return A {@link SeekableChannelsProvider} for the given URI.
47+
*/
48+
public SeekableChannelsProvider fromServiceLoader(@NotNull final URI uri, @Nullable final Object object) {
49+
for (final SeekableChannelsProviderPlugin plugin : providers) {
50+
if (plugin.isCompatible(uri, object)) {
51+
return plugin.createProvider(uri, object);
52+
}
53+
}
54+
throw new UnsupportedOperationException("No plugin found for uri: " + uri);
55+
}
56+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/**
2+
* Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
3+
*/
4+
package io.deephaven.util.channel;
5+
6+
import org.jetbrains.annotations.NotNull;
7+
import org.jetbrains.annotations.Nullable;
8+
9+
import java.net.URI;
10+
11+
/**
12+
* A plugin interface for providing {@link SeekableChannelsProvider} implementations for different URI schemes, e.g. S3.
13+
* Check out {@link SeekableChannelsProviderLoader} for more details.
14+
*/
15+
public interface SeekableChannelsProviderPlugin {
16+
/**
17+
* Check if this plugin is compatible with the given URI and config object.
18+
*/
19+
boolean isCompatible(@NotNull URI uri, @Nullable Object config);
20+
21+
/**
22+
* Create a {@link SeekableChannelsProvider} for the given URI and config object.
23+
*/
24+
SeekableChannelsProvider createProvider(@NotNull URI uri, @Nullable Object object);
25+
}

0 commit comments

Comments
 (0)