Skip to content

Commit 2dbbf32

Browse files
Improved threading capabilities of S3+parquet (deephaven#5451)
1 parent 25e0cb1 commit 2dbbf32

File tree

24 files changed

+496
-326
lines changed

24 files changed

+496
-326
lines changed

Base/src/main/java/io/deephaven/base/FileUtils.java

-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
//
44
package io.deephaven.base;
55

6-
import io.deephaven.base.verify.Assert;
76
import io.deephaven.base.verify.Require;
87
import org.jetbrains.annotations.Nullable;
98

Util/channel/src/main/java/io/deephaven/util/channel/CachedChannelProvider.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,15 @@ enum ChannelType {
5858
private final RAPriQueue<PerPathPool> releasePriority =
5959
new RAPriQueue<>(8, PerPathPool.RAPQ_ADAPTER, PerPathPool.class);
6060

61-
public CachedChannelProvider(@NotNull final SeekableChannelsProvider wrappedProvider,
61+
public static CachedChannelProvider create(@NotNull final SeekableChannelsProvider wrappedProvider,
62+
final int maximumPooledCount) {
63+
if (wrappedProvider instanceof CachedChannelProvider) {
64+
throw new IllegalArgumentException("Cannot wrap a CachedChannelProvider in another CachedChannelProvider");
65+
}
66+
return new CachedChannelProvider(wrappedProvider, maximumPooledCount);
67+
}
68+
69+
private CachedChannelProvider(@NotNull final SeekableChannelsProvider wrappedProvider,
6270
final int maximumPooledCount) {
6371
this.wrappedProvider = wrappedProvider;
6472
this.maximumPooledCount = Require.gtZero(maximumPooledCount, "maximumPooledCount");

Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProviderLoader.java

+8-7
Original file line numberDiff line numberDiff line change
@@ -37,18 +37,19 @@ private SeekableChannelsProviderLoader() {
3737
}
3838

3939
/**
40-
* Create a new {@link SeekableChannelsProvider} based on given URI and object using the plugins loaded by the
41-
* {@link ServiceLoader}. For example, for a "S3" URI, we will create a {@link SeekableChannelsProvider} which can
42-
* read files from S3.
40+
* Create a new {@link SeekableChannelsProvider} compatible for reading from and writing to the given URI, using the
41+
* plugins loaded by the {@link ServiceLoader}. For example, for a "S3" URI, we will create a
42+
* {@link SeekableChannelsProvider} which can read files from S3.
4343
*
4444
* @param uri The URI
45-
* @param object An optional object to pass to the {@link SeekableChannelsProviderPlugin} implementations.
45+
* @param specialInstructions An optional object to pass special instructions to the provider.
4646
* @return A {@link SeekableChannelsProvider} for the given URI.
4747
*/
48-
public SeekableChannelsProvider fromServiceLoader(@NotNull final URI uri, @Nullable final Object object) {
48+
public SeekableChannelsProvider fromServiceLoader(@NotNull final URI uri,
49+
@Nullable final Object specialInstructions) {
4950
for (final SeekableChannelsProviderPlugin plugin : providers) {
50-
if (plugin.isCompatible(uri, object)) {
51-
return plugin.createProvider(uri, object);
51+
if (plugin.isCompatible(uri, specialInstructions)) {
52+
return plugin.createProvider(uri, specialInstructions);
5253
}
5354
}
5455
throw new UnsupportedOperationException("No plugin found for uri: " + uri);

Util/channel/src/test/java/io/deephaven/util/channel/CachedChannelProviderTest.java

+19-7
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.function.Supplier;
2121
import java.util.stream.Stream;
2222

23+
import static org.junit.jupiter.api.Assertions.fail;
2324
import static org.junit.jupiter.api.Assertions.assertEquals;
2425
import static org.junit.jupiter.api.Assertions.assertNull;
2526
import static org.junit.jupiter.api.Assertions.assertSame;
@@ -32,7 +33,7 @@ public class CachedChannelProviderTest {
3233
@Test
3334
public void testSimpleRead() throws IOException {
3435
final SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
35-
final CachedChannelProvider cachedChannelProvider = new CachedChannelProvider(wrappedProvider, 100);
36+
final CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100);
3637
for (int ii = 0; ii < 100; ++ii) {
3738
final SeekableByteChannel[] sameFile = new SeekableByteChannel[10];
3839
for (int jj = 0; jj < sameFile.length; ++jj) {
@@ -55,7 +56,7 @@ public void testSimpleRead() throws IOException {
5556
@Test
5657
public void testSimpleReadWrite() throws IOException {
5758
SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
58-
CachedChannelProvider cachedChannelProvider = new CachedChannelProvider(wrappedProvider, 100);
59+
CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100);
5960
for (int i = 0; i < 1000; i++) {
6061
SeekableByteChannel rc =
6162
((i / 100) % 2 == 0 ? cachedChannelProvider.getReadChannel(wrappedProvider.makeContext(), "r" + i)
@@ -69,7 +70,7 @@ public void testSimpleReadWrite() throws IOException {
6970
@Test
7071
public void testSimpleWrite() throws IOException {
7172
SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
72-
CachedChannelProvider cachedChannelProvider = new CachedChannelProvider(wrappedProvider, 100);
73+
CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100);
7374
for (int i = 0; i < 1000; i++) {
7475
SeekableByteChannel rc = cachedChannelProvider.getWriteChannel("w" + i, false);
7576
// Call write to hit the assertions inside the mock channel
@@ -86,7 +87,7 @@ public void testSimpleWrite() throws IOException {
8687
@Test
8788
public void testSimpleAppend() throws IOException {
8889
SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
89-
CachedChannelProvider cachedChannelProvider = new CachedChannelProvider(wrappedProvider, 100);
90+
CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100);
9091
for (int i = 0; i < 1000; i++) {
9192
SeekableByteChannel rc = cachedChannelProvider.getWriteChannel("a" + i, true);
9293
rc.close();
@@ -100,7 +101,7 @@ public void testSimpleAppend() throws IOException {
100101
@Test
101102
public void testCloseOrder() throws IOException {
102103
SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
103-
CachedChannelProvider cachedChannelProvider = new CachedChannelProvider(wrappedProvider, 100);
104+
CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100);
104105
for (int i = 0; i < 20; i++) {
105106
List<SeekableByteChannel> channels = new ArrayList<>();
106107
for (int j = 0; j < 50; j++) {
@@ -121,7 +122,7 @@ public void testCloseOrder() throws IOException {
121122
@Test
122123
public void testReuse() throws IOException {
123124
final SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
124-
final CachedChannelProvider cachedChannelProvider = new CachedChannelProvider(wrappedProvider, 50);
125+
final CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 50);
125126
final SeekableByteChannel[] someResult = new SeekableByteChannel[50];
126127
final ByteBuffer buffer = ByteBuffer.allocate(1);
127128
for (int ci = 0; ci < someResult.length; ++ci) {
@@ -149,7 +150,7 @@ public void testReuse() throws IOException {
149150
@Test
150151
public void testReuse10() throws IOException {
151152
final SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
152-
final CachedChannelProvider cachedChannelProvider = new CachedChannelProvider(wrappedProvider, 100);
153+
final CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100);
153154
final SeekableByteChannel[] someResult = new SeekableByteChannel[100];
154155
for (int pi = 0; pi < 10; ++pi) {
155156
for (int ci = 0; ci < 10; ++ci) {
@@ -173,6 +174,17 @@ public void testReuse10() throws IOException {
173174
assertEquals(0, closed.size());
174175
}
175176

177+
@Test
178+
void testRewrapCachedChannelProvider() {
179+
final SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
180+
final CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100);
181+
try {
182+
CachedChannelProvider.create(cachedChannelProvider, 100);
183+
fail("Expected IllegalArgumentException on rewrapping CachedChannelProvider");
184+
} catch (final IllegalArgumentException expected) {
185+
}
186+
}
187+
176188

177189
private class TestChannelProvider implements SeekableChannelsProvider {
178190

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
//
2+
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
3+
//
4+
package io.deephaven.util.thread;
5+
6+
import io.deephaven.configuration.Configuration;
7+
8+
public class ThreadHelpers {
9+
/**
10+
* Get the number of threads to use for a given configuration key, defaulting to the number of available processors
11+
* if the configuration key is set to a non-positive value, or the configuration key is not set and the provided
12+
* default is non-positive.
13+
*
14+
* @param configKey The configuration key to look up
15+
* @param defaultValue The default value to use if the configuration key is not set
16+
* @return The number of threads to use
17+
*/
18+
public static int getOrComputeThreadCountProperty(final String configKey, final int defaultValue) {
19+
final int numThreads = Configuration.getInstance().getIntegerWithDefault(configKey, defaultValue);
20+
if (numThreads <= 0) {
21+
return Runtime.getRuntime().availableProcessors();
22+
} else {
23+
return numThreads;
24+
}
25+
}
26+
}

engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java

+4-12
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
package io.deephaven.engine.table.impl;
55

66
import io.deephaven.chunk.util.pools.MultiChunkPool;
7-
import io.deephaven.configuration.Configuration;
87
import io.deephaven.engine.context.ExecutionContext;
98
import io.deephaven.engine.updategraph.OperationInitializer;
109
import io.deephaven.util.thread.NamingThreadFactory;
@@ -17,6 +16,8 @@
1716
import java.util.concurrent.ThreadPoolExecutor;
1817
import java.util.concurrent.TimeUnit;
1918

19+
import static io.deephaven.util.thread.ThreadHelpers.getOrComputeThreadCountProperty;
20+
2021
/**
2122
* Implementation of OperationInitializer that delegates to a pool of threads.
2223
*/
@@ -25,17 +26,8 @@ public class OperationInitializationThreadPool implements OperationInitializer {
2526
/**
2627
* The number of threads that will be used for parallel initialization in this process
2728
*/
28-
public static final int NUM_THREADS;
29-
30-
static {
31-
final int numThreads =
32-
Configuration.getInstance().getIntegerWithDefault("OperationInitializationThreadPool.threads", -1);
33-
if (numThreads <= 0) {
34-
NUM_THREADS = Runtime.getRuntime().availableProcessors();
35-
} else {
36-
NUM_THREADS = numThreads;
37-
}
38-
}
29+
private static final int NUM_THREADS =
30+
getOrComputeThreadCountProperty("OperationInitializationThreadPool.threads", -1);
3931
private final ThreadLocal<Boolean> isInitializationThread = ThreadLocal.withInitial(() -> false);
4032

4133
private final ThreadPoolExecutor executorService;

engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableAggregationTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -3890,7 +3890,7 @@ public void testMultiPartitionSymbolTableBy() throws IOException {
38903890
t4.updateView("Date=`2021-07-21`", "Num=400")).moveColumnsUp("Date", "Num");
38913891

38923892
final Table loaded = ParquetTools.readPartitionedTableInferSchema(
3893-
new ParquetKeyValuePartitionedLayout(testRootFile, 2, ParquetInstructions.EMPTY),
3893+
new ParquetKeyValuePartitionedLayout(testRootFile.toURI(), 2, ParquetInstructions.EMPTY),
38943894
ParquetInstructions.EMPTY);
38953895

38963896
// verify the sources are identical

extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java

+20-66
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,19 @@
33
//
44
package io.deephaven.parquet.base;
55

6-
import io.deephaven.UncheckedDeephavenException;
76
import io.deephaven.util.channel.CachedChannelProvider;
87
import io.deephaven.util.channel.SeekableChannelContext;
98
import io.deephaven.util.channel.SeekableChannelsProvider;
10-
import io.deephaven.util.channel.SeekableChannelsProviderLoader;
119
import org.apache.parquet.format.*;
1210
import org.apache.parquet.format.ColumnOrder;
1311
import org.apache.parquet.format.Type;
1412
import org.apache.parquet.schema.*;
1513
import org.jetbrains.annotations.NotNull;
16-
import org.jetbrains.annotations.Nullable;
1714

1815
import java.io.File;
1916
import java.io.IOException;
2017
import java.io.InputStream;
18+
import java.io.UncheckedIOException;
2119
import java.net.URI;
2220
import java.nio.channels.SeekableByteChannel;
2321
import java.util.*;
@@ -44,94 +42,50 @@ public class ParquetFileReader {
4442

4543
/**
4644
* Make a {@link ParquetFileReader} for the supplied {@link File}. Wraps {@link IOException} as
47-
* {@link UncheckedDeephavenException}.
45+
* {@link UncheckedIOException}.
4846
*
4947
* @param parquetFile The parquet file or the parquet metadata file
50-
* @param specialInstructions Optional read instructions to pass to {@link SeekableChannelsProvider} while creating
51-
* channels
48+
* @param channelsProvider The {@link SeekableChannelsProvider} to use for reading the file
5249
* @return The new {@link ParquetFileReader}
5350
*/
5451
public static ParquetFileReader create(
5552
@NotNull final File parquetFile,
56-
@Nullable final Object specialInstructions) {
53+
@NotNull final SeekableChannelsProvider channelsProvider) {
5754
try {
58-
return createChecked(parquetFile, specialInstructions);
59-
} catch (IOException e) {
60-
throw new UncheckedDeephavenException("Failed to create Parquet file reader: " + parquetFile, e);
55+
return new ParquetFileReader(convertToURI(parquetFile, false), channelsProvider);
56+
} catch (final IOException e) {
57+
throw new UncheckedIOException("Failed to create Parquet file reader: " + parquetFile, e);
6158
}
6259
}
6360

6461
/**
6562
* Make a {@link ParquetFileReader} for the supplied {@link URI}. Wraps {@link IOException} as
66-
* {@link UncheckedDeephavenException}.
63+
* {@link UncheckedIOException}.
6764
*
6865
* @param parquetFileURI The URI for the parquet file or the parquet metadata file
69-
* @param specialInstructions Optional read instructions to pass to {@link SeekableChannelsProvider} while creating
70-
* channels
66+
* @param channelsProvider The {@link SeekableChannelsProvider} to use for reading the file
7167
* @return The new {@link ParquetFileReader}
7268
*/
7369
public static ParquetFileReader create(
7470
@NotNull final URI parquetFileURI,
75-
@Nullable final Object specialInstructions) {
71+
@NotNull final SeekableChannelsProvider channelsProvider) {
7672
try {
77-
return createChecked(parquetFileURI, specialInstructions);
78-
} catch (IOException e) {
79-
throw new UncheckedDeephavenException("Failed to create Parquet file reader: " + parquetFileURI, e);
73+
return new ParquetFileReader(parquetFileURI, channelsProvider);
74+
} catch (final IOException e) {
75+
throw new UncheckedIOException("Failed to create Parquet file reader: " + parquetFileURI, e);
8076
}
8177
}
8278

83-
/**
84-
* Make a {@link ParquetFileReader} for the supplied {@link File}.
85-
*
86-
* @param parquetFile The parquet file or the parquet metadata file
87-
* @param specialInstructions Optional read instructions to pass to {@link SeekableChannelsProvider} while creating
88-
* channels
89-
* @return The new {@link ParquetFileReader}
90-
* @throws IOException if an IO exception occurs
91-
*/
92-
public static ParquetFileReader createChecked(
93-
@NotNull final File parquetFile,
94-
@Nullable final Object specialInstructions) throws IOException {
95-
return createChecked(convertToURI(parquetFile, false), specialInstructions);
96-
}
97-
98-
/**
99-
* Make a {@link ParquetFileReader} for the supplied {@link URI}.
100-
*
101-
* @param parquetFileURI The URI for the parquet file or the parquet metadata file
102-
* @param specialInstructions Optional read instructions to pass to {@link SeekableChannelsProvider} while creating
103-
* channels
104-
* @return The new {@link ParquetFileReader}
105-
* @throws IOException if an IO exception occurs
106-
*/
107-
public static ParquetFileReader createChecked(
108-
@NotNull final URI parquetFileURI,
109-
@Nullable final Object specialInstructions) throws IOException {
110-
final SeekableChannelsProvider provider = SeekableChannelsProviderLoader.getInstance().fromServiceLoader(
111-
parquetFileURI, specialInstructions);
112-
return new ParquetFileReader(parquetFileURI, new CachedChannelProvider(provider, 1 << 7));
113-
}
114-
115-
/**
116-
* Create a new ParquetFileReader for the provided source.
117-
*
118-
* @param source The source path or URI for the parquet file or the parquet metadata file
119-
* @param channelsProvider The {@link SeekableChannelsProvider} to use for reading the file
120-
*/
121-
public ParquetFileReader(final String source, final SeekableChannelsProvider channelsProvider)
122-
throws IOException {
123-
this(convertToURI(source, false), channelsProvider);
124-
}
125-
12679
/**
12780
* Create a new ParquetFileReader for the provided source.
12881
*
12982
* @param parquetFileURI The URI for the parquet file or the parquet metadata file
130-
* @param channelsProvider The {@link SeekableChannelsProvider} to use for reading the file
83+
* @param provider The {@link SeekableChannelsProvider} to use for reading the file
13184
*/
132-
public ParquetFileReader(final URI parquetFileURI, final SeekableChannelsProvider channelsProvider)
133-
throws IOException {
134-
this.channelsProvider = channelsProvider;
85+
private ParquetFileReader(
86+
@NotNull final URI parquetFileURI,
87+
@NotNull final SeekableChannelsProvider provider) throws IOException {
88+
this.channelsProvider = CachedChannelProvider.create(provider, 1 << 7);
13589
if (!parquetFileURI.getRawPath().endsWith(".parquet") && FILE_URI_SCHEME.equals(parquetFileURI.getScheme())) {
13690
// Construct a new file URI for the parent directory
13791
rootURI = convertToURI(new File(parquetFileURI).getParentFile(), true);
@@ -270,7 +224,7 @@ private Set<String> calculateColumnsWithDictionaryUsedOnEveryDataPage() {
270224

271225
/**
272226
* Create a {@link RowGroupReader} object for provided row group number
273-
*
227+
*
274228
* @param version The "version" string from deephaven specific parquet metadata, or null if it's not present.
275229
*/
276230
public RowGroupReader getRowGroup(final int groupNumber, final String version) {
@@ -506,7 +460,7 @@ private static LogicalTypeAnnotation getLogicalTypeAnnotation(final ConvertedTyp
506460

507461
/**
508462
* Helper method to determine if a logical type is adjusted to UTC.
509-
*
463+
*
510464
* @param logicalType the logical type to check
511465
* @return true if the logical type is a timestamp adjusted to UTC, false otherwise
512466
*/

0 commit comments

Comments
 (0)