Skip to content

Commit 972e205

Browse files
committed
[FLINK-37437][state/forst] Bundle file cache entry with mapping entry and release properly
1 parent 3e39931 commit 972e205

File tree

5 files changed

+67
-23
lines changed

5 files changed

+67
-23
lines changed

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java

+5-14
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,8 @@ public synchronized ByteBufferWritableFSDataOutputStream create(
191191
Path dbFilePath, WriteMode overwriteMode) throws IOException {
192192
// Create a file in the mapping table
193193
MappingEntry createdMappingEntry =
194-
fileMappingManager.createNewFile(dbFilePath, overwriteMode == WriteMode.OVERWRITE);
194+
fileMappingManager.createNewFile(
195+
dbFilePath, overwriteMode == WriteMode.OVERWRITE, fileBasedCache);
195196

196197
// The source must be backed by a file
197198
FileBackedMappingEntrySource source =
@@ -359,14 +360,7 @@ public synchronized FileStatus[] listStatus(Path path) throws IOException {
359360

360361
@Override
361362
public synchronized boolean delete(Path path, boolean recursive) throws IOException {
362-
MappingEntry mappingEntry = fileMappingManager.mappingEntry(path.toString());
363-
boolean success = fileMappingManager.deleteFileOrDirectory(path, recursive);
364-
if (fileBasedCache != null && mappingEntry != null) {
365-
// if mappingEntry is not null, it means it is a file, not directory
366-
MappingEntrySource source = mappingEntry.getSource();
367-
fileBasedCache.delete(source.getFilePath());
368-
}
369-
return success;
363+
return fileMappingManager.deleteFileOrDirectory(path, recursive);
370364
}
371365

372366
@Override
@@ -390,11 +384,8 @@ public synchronized int link(String src, Path dst) throws IOException {
390384
public synchronized void registerReusedRestoredFile(
391385
String key, StreamStateHandle stateHandle, Path dbFilePath) {
392386
MappingEntry mappingEntry =
393-
fileMappingManager.registerReusedRestoredFile(key, stateHandle, dbFilePath);
394-
if (fileBasedCache != null) {
395-
fileBasedCache.registerInCache(
396-
mappingEntry.getSourcePath(), stateHandle.getStateSize());
397-
}
387+
fileMappingManager.registerReusedRestoredFile(
388+
key, stateHandle, dbFilePath, fileBasedCache);
398389
}
399390

400391
public synchronized @Nullable MappingEntry getMappingEntry(Path path) {

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java

+16-7
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.flink.core.fs.FileSystem;
2323
import org.apache.flink.core.fs.Path;
2424
import org.apache.flink.runtime.state.StreamStateHandle;
25+
import org.apache.flink.state.forst.fs.cache.FileBasedCache;
2526
import org.apache.flink.util.Preconditions;
2627

2728
import org.slf4j.Logger;
@@ -61,33 +62,38 @@ public FileMappingManager(FileSystem fileSystem, String remoteBase, String local
6162
}
6263

6364
/** Create a new file in the mapping table. */
64-
public MappingEntry createNewFile(Path filePath, boolean overwrite) {
65+
public MappingEntry createNewFile(Path filePath, boolean overwrite, FileBasedCache cache) {
6566
String key = filePath.toString();
66-
if (FileOwnershipDecider.shouldAlwaysBeLocal(filePath)) {
67+
boolean isLocal = FileOwnershipDecider.shouldAlwaysBeLocal(filePath);
68+
if (isLocal) {
6769
filePath = forceLocalPath(filePath);
6870
}
6971

7072
return addFileToMappingTable(
7173
key,
7274
toUUIDPath(filePath),
7375
FileOwnershipDecider.decideForNewFile(filePath),
76+
isLocal ? null : cache,
7477
true,
7578
overwrite);
7679
}
7780

7881
/** Register a file restored from checkpoints to the mapping table. */
7982
public MappingEntry registerReusedRestoredFile(
80-
String key, StreamStateHandle stateHandle, Path dbFilePath) {
83+
String key, StreamStateHandle stateHandle, Path dbFilePath, FileBasedCache cache) {
8184
// The checkpoint file may contain only the UUID without the file extension, so we:
8285
// - Decide file ownership based on dbFilePath, so we can know the real file type.
8386
// - Add to mapping table based on cpFilePath, so we can access the real file.
8487
LOG.trace("decide restored file ownership based on dbFilePath: {}", dbFilePath);
8588
return addHandleBackedFileToMappingTable(
86-
key, stateHandle, FileOwnershipDecider.decideForRestoredFile(dbFilePath));
89+
key, stateHandle, FileOwnershipDecider.decideForRestoredFile(dbFilePath), cache);
8790
}
8891

8992
private MappingEntry addHandleBackedFileToMappingTable(
90-
String key, StreamStateHandle stateHandle, FileOwnership fileOwnership) {
93+
String key,
94+
StreamStateHandle stateHandle,
95+
FileOwnership fileOwnership,
96+
FileBasedCache cache) {
9197
MappingEntrySource source = new HandleBackedMappingEntrySource(stateHandle);
9298
MappingEntry existingEntry = mappingTable.getOrDefault(key, null);
9399
if (existingEntry != null) {
@@ -105,14 +111,16 @@ private MappingEntry addHandleBackedFileToMappingTable(
105111
LOG.trace("Skip adding a file that already exists in mapping table: {}", key);
106112
}
107113
return existingEntry == null
108-
? addMappingEntry(key, new MappingEntry(1, source, fileOwnership, false, false))
114+
? addMappingEntry(
115+
key, new MappingEntry(1, source, fileOwnership, cache, false, false))
109116
: existingEntry;
110117
}
111118

112119
private MappingEntry addFileToMappingTable(
113120
String key,
114121
Path filePath,
115122
FileOwnership fileOwnership,
123+
FileBasedCache cache,
116124
boolean writing,
117125
boolean overwrite) {
118126
MappingEntrySource source = new FileBackedMappingEntrySource(filePath);
@@ -144,7 +152,8 @@ private MappingEntry addFileToMappingTable(
144152
}
145153
}
146154
return existingEntry == null
147-
? addMappingEntry(key, new MappingEntry(1, source, fileOwnership, false, writing))
155+
? addMappingEntry(
156+
key, new MappingEntry(1, source, fileOwnership, cache, false, writing))
148157
: existingEntry;
149158
}
150159

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/MappingEntry.java

+19
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,16 @@
2121
import org.apache.flink.core.fs.Path;
2222
import org.apache.flink.runtime.asyncprocessing.ReferenceCounted;
2323
import org.apache.flink.runtime.state.StreamStateHandle;
24+
import org.apache.flink.state.forst.fs.cache.FileBasedCache;
2425
import org.apache.flink.util.Preconditions;
2526

2627
import org.slf4j.Logger;
2728
import org.slf4j.LoggerFactory;
2829

2930
import javax.annotation.Nullable;
3031

32+
import java.io.IOException;
33+
3134
/**
3235
* A file mapping entry that encapsulates source and destination path. Source Path : dest Path = 1 :
3336
* N.
@@ -40,6 +43,8 @@ public class MappingEntry extends ReferenceCounted {
4043

4144
FileOwnership fileOwnership;
4245

46+
final @Nullable FileBasedCache cache;
47+
4348
final boolean isDirectory;
4449

4550
volatile boolean writing;
@@ -56,6 +61,7 @@ public MappingEntry(
5661
initReference,
5762
new HandleBackedMappingEntrySource(stateHandle),
5863
fileOwnership,
64+
null,
5965
isDirectory,
6066
false);
6167
}
@@ -66,6 +72,7 @@ public MappingEntry(
6672
initReference,
6773
new FileBackedMappingEntrySource(sourcePath),
6874
fileOwnership,
75+
null,
6976
isDirectory,
7077
false);
7178
}
@@ -74,14 +81,23 @@ public MappingEntry(
7481
int initReference,
7582
MappingEntrySource source,
7683
FileOwnership fileOwnership,
84+
FileBasedCache cache,
7785
boolean isDirectory,
7886
boolean writing) {
7987
super(initReference);
8088
this.source = source;
8189
this.parentDir = null;
8290
this.fileOwnership = fileOwnership;
91+
this.cache = cache;
8392
this.isDirectory = isDirectory;
8493
this.writing = writing;
94+
if (!writing && cache != null && !isDirectory && source.cacheable()) {
95+
try {
96+
cache.registerInCache(source.getFilePath(), source.getSize());
97+
} catch (IOException e) {
98+
LOG.warn("Failed to register file {} in cache.", source, e);
99+
}
100+
}
85101
}
86102

87103
public void setFileOwnership(FileOwnership ownership) {
@@ -133,6 +149,9 @@ protected void referenceCountReachedZero(@Nullable Object o) {
133149
return;
134150
}
135151
source.delete(isDirectory);
152+
if (cache != null && !isDirectory && source.cacheable()) {
153+
cache.delete(source.getFilePath());
154+
}
136155
} catch (Exception e) {
137156
LOG.warn("Failed to delete file {}.", source, e);
138157
}

flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/FileMappingManagerTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,9 @@ public static List<Object[]> params() {
5353
private MappingEntry registerFile(FileMappingManager manager, Path filePath) {
5454
if (reuseCp) {
5555
return manager.registerReusedRestoredFile(
56-
filePath.toString(), new FileStateHandle(filePath, 0), filePath);
56+
filePath.toString(), new FileStateHandle(filePath, 0), filePath, null);
5757
} else {
58-
return manager.createNewFile(filePath, false);
58+
return manager.createNewFile(filePath, false, null);
5959
}
6060
}
6161

flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java

+25
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,31 @@ public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
348348
assertThat(registeredGauges.get("forst.fileCache.usedBytes").getValue()).isEqualTo(235L);
349349

350350
is.close();
351+
352+
// test link and deleted by reference
353+
long waitLoaded = 0L;
354+
while (waitLoaded < 30000L && cacheEntry1.getReferenceCount() <= 0) {
355+
try {
356+
Thread.sleep(100);
357+
waitLoaded += 100;
358+
} catch (InterruptedException e) {
359+
throw new RuntimeException(e);
360+
}
361+
}
362+
assertThat(cacheEntry1.getReferenceCount()).isEqualTo(1);
363+
org.apache.flink.core.fs.Path sstRemotePath4 =
364+
new org.apache.flink.core.fs.Path(remotePath, "4.sst");
365+
fileSystem.link(sstRemotePath1, sstRemotePath4);
366+
assertThat(cacheEntry1.getReferenceCount()).isEqualTo(1);
367+
assertThat(fileSystem.exists(sstRemotePath4)).isTrue();
368+
fileSystem.delete(sstRemotePath1, false);
369+
assertThat(fileSystem.exists(sstRemotePath1)).isFalse();
370+
assertThat(fileSystem.exists(sstRemotePath4)).isTrue();
371+
assertThat(cacheEntry1.getReferenceCount()).isEqualTo(1);
372+
assertThat(registeredGauges.get("forst.fileCache.usedBytes").getValue()).isEqualTo(235L);
373+
fileSystem.delete(sstRemotePath4, false);
374+
assertThat(cacheEntry1.getReferenceCount()).isEqualTo(0);
375+
assertThat(registeredGauges.get("forst.fileCache.usedBytes").getValue()).isEqualTo(0L);
351376
}
352377

353378
@Test

0 commit comments

Comments
 (0)