Skip to content

Commit c825e15

Browse files
authored
[FLINK-36598][state/ForSt] Provide FileSystem instance in initialization (apache#25638)
1 parent 6fe39c6 commit c825e15

15 files changed

+112
-148
lines changed

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@
7171
import javax.annotation.Nonnull;
7272
import javax.annotation.concurrent.GuardedBy;
7373

74-
import java.io.File;
7574
import java.io.IOException;
7675
import java.util.HashSet;
7776
import java.util.LinkedHashMap;
@@ -522,7 +521,7 @@ public void dispose() {
522521
}
523522

524523
@VisibleForTesting
525-
File getLocalBasePath() {
524+
Path getLocalBasePath() {
526525
return optionsContainer.getLocalBasePath();
527526
}
528527

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java

+4-9
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.flink.api.common.typeutils.TypeSerializer;
2222
import org.apache.flink.core.fs.CloseableRegistry;
23+
import org.apache.flink.core.fs.Path;
2324
import org.apache.flink.core.memory.DataInputDeserializer;
2425
import org.apache.flink.core.memory.DataOutputSerializer;
2526
import org.apache.flink.metrics.MetricGroup;
@@ -62,7 +63,6 @@
6263
import javax.annotation.Nonnegative;
6364
import javax.annotation.Nonnull;
6465

65-
import java.io.File;
6666
import java.io.IOException;
6767
import java.util.Collection;
6868
import java.util.Collections;
@@ -301,10 +301,10 @@ private ForStRestoreOperation getForStRestoreOperation(
301301
// working dir. We will implement this in ForStDB later; until then, we achieve this
302302
// by setting the dbPath to "/" when the dfs directory exists.
303303
// TODO: use localForStPath as dbPath once ForSt supports mixing local-dir and remote-dir
304-
File instanceForStPath =
304+
Path instanceForStPath =
305305
optionsContainer.getRemoteForStPath() == null
306306
? optionsContainer.getLocalForStPath()
307-
: new File("/");
307+
: new Path("/");
308308

309309
if (CollectionUtil.isEmptyOrAllElementsNull(restoreStateHandles)) {
310310
return new ForStNoneRestoreOperation(
@@ -377,12 +377,7 @@ private ForStRestoreOperation getForStRestoreOperation(
377377

378378
ForStSnapshotStrategyBase<K, ?> snapshotStrategy;
379379

380-
ForStFlinkFileSystem forStFs =
381-
optionsContainer.getRemoteForStPath() != null
382-
? (ForStFlinkFileSystem)
383-
ForStFlinkFileSystem.get(
384-
optionsContainer.getRemoteForStPath().toUri())
385-
: null;
380+
ForStFlinkFileSystem forStFs = optionsContainer.getFileSystem();
386381
ForStStateDataTransfer stateTransfer =
387382
new ForStStateDataTransfer(ForStStateDataTransfer.DEFAULT_THREAD_NUM, forStFs);
388383

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java

+23-23
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,9 @@ public final class ForStResourceContainer implements AutoCloseable {
8080

8181
@Nullable private final Path remoteForStPath;
8282

83-
@Nullable private final File localBasePath;
83+
@Nullable private final Path localBasePath;
8484

85-
@Nullable private final File localForStPath;
85+
@Nullable private final Path localForStPath;
8686

8787
@Nullable private Path cacheBasePath;
8888

@@ -97,7 +97,7 @@ public final class ForStResourceContainer implements AutoCloseable {
9797
@Nullable private final ForStOptionsFactory optionsFactory;
9898

9999
/** The ForSt file system. Null unless both the remote and the local ForSt dirs are set. */
100-
@Nullable private FileSystem forstFileSystem;
100+
@Nullable private ForStFlinkFileSystem forstFileSystem;
101101

102102
/**
103103
* The shared resource among ForSt instances. This resource is not part of the 'handlesToClose',
@@ -134,7 +134,7 @@ public ForStResourceContainer(
134134
ReadableConfig configuration,
135135
@Nullable ForStOptionsFactory optionsFactory,
136136
@Nullable OpaqueMemoryResource<ForStSharedResources> sharedResources,
137-
@Nullable File localBasePath,
137+
@Nullable Path localBasePath,
138138
@Nullable Path remoteBasePath,
139139
boolean enableStatistics) {
140140

@@ -143,7 +143,7 @@ public ForStResourceContainer(
143143
this.sharedResources = sharedResources;
144144

145145
this.localBasePath = localBasePath;
146-
this.localForStPath = localBasePath != null ? new File(localBasePath, DB_DIR_STRING) : null;
146+
this.localForStPath = localBasePath != null ? new Path(localBasePath, DB_DIR_STRING) : null;
147147
this.remoteBasePath = remoteBasePath;
148148
this.remoteForStPath =
149149
remoteBasePath != null ? new Path(remoteBasePath, DB_DIR_STRING) : null;
@@ -266,12 +266,12 @@ public ReadOptions getReadOptions() {
266266
}
267267

268268
@Nullable
269-
public File getLocalBasePath() {
269+
public Path getLocalBasePath() {
270270
return localBasePath;
271271
}
272272

273273
@Nullable
274-
public File getLocalForStPath() {
274+
public Path getLocalForStPath() {
275275
return localForStPath;
276276
}
277277

@@ -289,15 +289,15 @@ public Path getBasePath() {
289289
if (remoteBasePath != null) {
290290
return remoteBasePath;
291291
} else {
292-
return Path.fromLocalFile(localBasePath);
292+
return localBasePath;
293293
}
294294
}
295295

296296
public Path getDbPath() {
297297
if (remoteForStPath != null) {
298298
return remoteForStPath;
299299
} else {
300-
return Path.fromLocalFile(localForStPath);
300+
return localForStPath;
301301
}
302302
}
303303

@@ -331,25 +331,27 @@ public void prepareDirectories() throws Exception {
331331
new Path(localBasePath.getPath()), new Path(localForStPath.getPath()));
332332
}
333333
if (remoteForStPath != null && localForStPath != null) {
334-
ForStFlinkFileSystem.setupLocalBasePath(
335-
remoteForStPath.toString(), localForStPath.toString());
336-
}
337-
if (cacheReservedSize > 0 || cacheCapacity > 0) {
338334
if (cacheBasePath == null && localBasePath != null) {
339335
cacheBasePath = new Path(localBasePath.getPath(), "cache");
340336
LOG.info(
341337
"Cache base path is not configured, set to local base path: {}",
342338
cacheBasePath);
343339
}
344-
ForStFlinkFileSystem.configureCache(cacheBasePath, cacheCapacity, cacheReservedSize);
345-
}
346-
if (remoteForStPath != null) {
347-
forstFileSystem = ForStFlinkFileSystem.get(remoteForStPath.toUri());
340+
forstFileSystem =
341+
ForStFlinkFileSystem.get(
342+
remoteForStPath.toUri(),
343+
localForStPath,
344+
ForStFlinkFileSystem.getFileBasedCache(
345+
cacheBasePath, cacheCapacity, cacheReservedSize));
348346
} else {
349347
forstFileSystem = null;
350348
}
351349
}
352350

351+
public ForStFlinkFileSystem getFileSystem() {
352+
return forstFileSystem;
353+
}
354+
353355
private static void prepareDirectories(Path basePath, Path dbPath) throws IOException {
354356
FileSystem fileSystem = basePath.getFileSystem();
355357
if (fileSystem.exists(basePath)) {
@@ -377,10 +379,9 @@ private static void prepareDirectories(Path basePath, Path dbPath) throws IOExce
377379
public void clearDirectories() throws Exception {
378380
if (remoteBasePath != null) {
379381
clearDirectories(remoteBasePath);
380-
ForStFlinkFileSystem.unregisterLocalBasePath(remoteForStPath.toString());
381382
}
382383
if (localBasePath != null) {
383-
clearDirectories(new Path(localBasePath.getPath()));
384+
clearDirectories(localBasePath);
384385
}
385386
}
386387

@@ -464,7 +465,7 @@ private DBOptions setDBOptionsFromConfigurableOptions(DBOptions currentOptions)
464465
String logDir = internalGetOption(ForStConfigurableOptions.LOG_DIR);
465466
if (logDir == null || logDir.isEmpty()) {
466467
if (localForStPath == null
467-
|| localForStPath.getAbsolutePath().length() <= INSTANCE_PATH_LENGTH_LIMIT) {
468+
|| localForStPath.getPath().length() <= INSTANCE_PATH_LENGTH_LIMIT) {
468469
relocateDefaultDbLogDir(currentOptions);
469470
} else {
470471
// disable log relocate when instance path length exceeds limit to prevent ForSt
@@ -583,7 +584,7 @@ private void setLocalForStPathAsLogDir(DBOptions dbOptions) {
583584
// issues, so the db log dir is temporarily set explicitly here.
584585
// TODO: remove this method once ForSt handles the log dir properly
585586
if (localForStPath != null) {
586-
this.relocatedDbLogBaseDir = localForStPath.toPath();
587+
this.relocatedDbLogBaseDir = java.nio.file.Path.of(localForStPath.toUri().toString());
587588
dbOptions.setDbLogDir(localForStPath.getPath());
588589
}
589590
}
@@ -604,8 +605,7 @@ private void cleanRelocatedDbLogs() {
604605
if (localForStPath != null && relocatedDbLogBaseDir != null) {
605606
LOG.info("Cleaning up relocated ForSt logs: {}.", relocatedDbLogBaseDir);
606607

607-
String relocatedDbLogPrefix =
608-
resolveRelocatedDbLogPrefix(localForStPath.getAbsolutePath());
608+
String relocatedDbLogPrefix = resolveRelocatedDbLogPrefix(localForStPath.getPath());
609609
try {
610610
Arrays.stream(FileUtils.listDirectory(relocatedDbLogBaseDir))
611611
.filter(

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java

+17-13
Original file line numberDiff line numberDiff line change
@@ -329,8 +329,10 @@ public <K> ForStKeyedStateBackend<K> createAsyncKeyedStateBackend(
329329
"op_%s_attempt_%s",
330330
fileCompatibleIdentifier, env.getTaskInfo().getAttemptNumber());
331331

332-
File localBasePath =
333-
new File(new File(getNextStoragePath(), jobId.toHexString()), opChildPath);
332+
Path localBasePath =
333+
new Path(
334+
new File(new File(getNextStoragePath(), jobId.toHexString()), opChildPath)
335+
.getAbsolutePath());
334336
Path remoteBasePath =
335337
remoteForStDirectory != null
336338
? new Path(new Path(remoteForStDirectory, jobId.toHexString()), opChildPath)
@@ -391,15 +393,17 @@ public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
391393

392394
lazyInitializeForJob(env, fileCompatibleIdentifier);
393395

394-
File instanceBasePath =
395-
new File(
396-
getNextStoragePath(),
397-
"job_"
398-
+ jobId
399-
+ "_op_"
400-
+ fileCompatibleIdentifier
401-
+ "_uuid_"
402-
+ UUID.randomUUID());
396+
Path instanceBasePath =
397+
new Path(
398+
new File(
399+
getNextStoragePath(),
400+
"job_"
401+
+ jobId
402+
+ "_op_"
403+
+ fileCompatibleIdentifier
404+
+ "_uuid_"
405+
+ UUID.randomUUID())
406+
.getAbsolutePath());
403407

404408
LocalRecoveryConfig localRecoveryConfig =
405409
env.getTaskStateManager().createLocalRecoveryConfig();
@@ -671,14 +675,14 @@ private ReadableConfig mergeConfigurableOptions(ReadableConfig base, ReadableCon
671675
}
672676

673677
@VisibleForTesting
674-
ForStResourceContainer createOptionsAndResourceContainer(@Nullable File localBasePath) {
678+
ForStResourceContainer createOptionsAndResourceContainer(@Nullable Path localBasePath) {
675679
return createOptionsAndResourceContainer(null, localBasePath, null, false);
676680
}
677681

678682
@VisibleForTesting
679683
private ForStResourceContainer createOptionsAndResourceContainer(
680684
@Nullable OpaqueMemoryResource<ForStSharedResources> sharedResources,
681-
@Nullable File localBasePath,
685+
@Nullable Path localBasePath,
682686
@Nullable Path remoteBasePath,
683687
boolean enableStatistics) {
684688

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java

+11-43
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,6 @@
4040
import java.io.File;
4141
import java.io.IOException;
4242
import java.net.URI;
43-
import java.util.Map;
44-
import java.util.concurrent.ConcurrentHashMap;
4543
import java.util.function.Function;
4644

4745
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -60,11 +58,7 @@ public class ForStFlinkFileSystem extends FileSystem {
6058

6159
private static final long SST_FILE_SIZE = 1024 * 1024 * 64;
6260

63-
private static final Map<String, String> remoteLocalMapping = new ConcurrentHashMap<>();
6461
private static final Function<String, Boolean> miscFileFilter = s -> !s.endsWith(".sst");
65-
private static Path cacheBase = null;
66-
private static long cacheCapacity = Long.MAX_VALUE;
67-
private static long cacheReservedSize = 0;
6862

6963
private final FileSystem localFS;
7064
private final FileSystem delegateFS;
@@ -86,19 +80,6 @@ public ForStFlinkFileSystem(
8680
this.fileBasedCache = fileBasedCache;
8781
}
8882

89-
/**
90-
* Configure cache for ForStFlinkFileSystem.
91-
*
92-
* @param path the cache base path.
93-
* @param cacheCap the cache capacity.
94-
* @param reserveSize the cache reserved size.
95-
*/
96-
public static void configureCache(Path path, long cacheCap, long reserveSize) {
97-
cacheBase = path;
98-
cacheCapacity = cacheCap;
99-
cacheReservedSize = reserveSize;
100-
}
101-
10283
/**
10384
* Returns a reference to the {@link FileSystem} instance for accessing the file system
10485
* identified by the given {@link URI}.
@@ -108,14 +89,20 @@ public static void configureCache(Path path, long cacheCap, long reserveSize) {
10889
* identified by the given {@link URI}.
10990
* @throws IOException thrown if a reference to the file system instance could not be obtained.
11091
*/
111-
public static FileSystem get(URI uri) throws IOException {
112-
String localBase = remoteLocalMapping.get(uri.toString());
113-
Preconditions.checkNotNull(localBase, "localBase is null, remote uri:" + uri);
92+
public static ForStFlinkFileSystem get(URI uri) throws IOException {
11493
return new ForStFlinkFileSystem(
115-
FileSystem.get(uri), uri.toString(), localBase, getFileBasedCache());
94+
FileSystem.get(uri), uri.toString(), System.getProperty("java.io.tmpdir"), null);
11695
}
11796

118-
private static FileBasedCache getFileBasedCache() throws IOException {
97+
public static ForStFlinkFileSystem get(URI uri, Path localBase, FileBasedCache fileBasedCache)
98+
throws IOException {
99+
Preconditions.checkNotNull(localBase, "localBase is null, remote uri: %s.", uri);
100+
return new ForStFlinkFileSystem(
101+
FileSystem.get(uri), uri.toString(), localBase.toString(), fileBasedCache);
102+
}
103+
104+
public static FileBasedCache getFileBasedCache(
105+
Path cacheBase, long cacheCapacity, long cacheReservedSize) throws IOException {
119106
if (cacheBase == null || cacheCapacity <= 0 && cacheReservedSize <= 0) {
120107
return null;
121108
}
@@ -139,25 +126,6 @@ private static FileBasedCache getFileBasedCache() throws IOException {
139126
Integer.MAX_VALUE, cacheLimitPolicy, cacheBase.getFileSystem(), cacheBase);
140127
}
141128

142-
/**
143-
* Setup local base path for corresponding remote base path.
144-
*
145-
* @param remoteBasePath the remote base path.
146-
* @param localBasePath the local base path.
147-
*/
148-
public static void setupLocalBasePath(String remoteBasePath, String localBasePath) {
149-
remoteLocalMapping.put(remoteBasePath, localBasePath);
150-
}
151-
152-
/**
153-
* Unregister local base path for corresponding remote base path.
154-
*
155-
* @param remoteBasePath the remote base path.
156-
*/
157-
public static void unregisterLocalBasePath(String remoteBasePath) {
158-
remoteLocalMapping.remove(remoteBasePath);
159-
}
160-
161129
/**
162130
* Create ByteBufferWritableFSDataOutputStream from specific path which supports to write data
163131
* to ByteBuffer with {@link org.apache.flink.core.fs.FileSystem.WriteMode#OVERWRITE} mode.

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStHandle.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.flink.state.forst.restore;
2020

21+
import org.apache.flink.core.fs.Path;
2122
import org.apache.flink.metrics.MetricGroup;
2223
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
2324
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
@@ -36,7 +37,6 @@
3637
import javax.annotation.Nonnull;
3738
import javax.annotation.Nullable;
3839

39-
import java.io.File;
4040
import java.io.IOException;
4141
import java.util.ArrayList;
4242
import java.util.Collections;
@@ -62,13 +62,13 @@ class ForStHandle implements AutoCloseable {
6262

6363
protected ForStHandle(
6464
Map<String, ForStKvStateInfo> kvStateInformation,
65-
File instanceRocksDBPath,
65+
Path instanceRocksDBPath,
6666
DBOptions dbOptions,
6767
Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
6868
ForStNativeMetricOptions nativeMetricOptions,
6969
MetricGroup metricGroup) {
7070
this.kvStateInformation = kvStateInformation;
71-
this.dbPath = instanceRocksDBPath.getAbsolutePath();
71+
this.dbPath = instanceRocksDBPath.getPath();
7272
this.dbOptions = dbOptions;
7373
this.columnFamilyOptionsFactory = columnFamilyOptionsFactory;
7474
this.nativeMetricOptions = nativeMetricOptions;

0 commit comments

Comments
 (0)