Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use a temporary directory per file system and preserve the directory structure locally #641

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion src/main/java/software/amazon/nio/spi/s3/S3FileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,16 @@ public class S3FileSystem extends FileSystem {

// private S3AsyncClient client;
private final S3NioSpiConfiguration configuration;
private final Path temporaryDirectory;

/**
* Create a filesystem that represents the bucket specified by the URI
*
* @param provider the provider to be used with this fileSystem
* @param config the configuration to use; can be null to use a default configuration
* @param temporaryDirectory the local temporary directory for the S3 object
*/
S3FileSystem(S3FileSystemProvider provider, S3NioSpiConfiguration config) {
S3FileSystem(S3FileSystemProvider provider, S3NioSpiConfiguration config, Path temporaryDirectory) {
configuration = (config == null) ? new S3NioSpiConfiguration() : config;
bucketName = configuration.getBucketName();

Expand All @@ -82,6 +84,7 @@ public class S3FileSystem extends FileSystem {

clientProvider = new S3ClientProvider(configuration);
this.provider = provider;
this.temporaryDirectory = temporaryDirectory;
}

/**
Expand Down Expand Up @@ -488,6 +491,25 @@ boolean deregisterClosedChannel(S3SeekableByteChannel closedChannel) {
return openChannels.remove(closedChannel);
}

/**
* Creates a new local temporary file for an S3 object.
*
* @return new local temporary file
*/
Path createTempFile(S3Path path) throws IOException {
if (path.isDirectory()) {
throw new IllegalArgumentException("path must be a file");
}
if (path.getNameCount() == 1) {
Path newPath = temporaryDirectory.resolve(path.getFileName().toString());
return Files.createFile(newPath);
}
Path parent = temporaryDirectory.resolve(path.getParent().toString());
Files.createDirectories(parent);
Path newPath = parent.resolve(path.getFileName().toString());
return Files.createFile(newPath);
}

/**
* Tests if two S3 filesystems are equal
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import static software.amazon.nio.spi.s3.util.TimeOutUtils.logAndGenerateExceptionOnTimeOut;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.FileChannel;
Expand Down Expand Up @@ -253,7 +254,13 @@ public FileSystem getFileSystem(URI uri) {
if (info.accessKey() != null) {
config.withCredentials(info.accessKey(), info.accessSecret());
}
return new S3FileSystem(this, config);
Path temporaryDirectory;
try {
temporaryDirectory = Files.createTempDirectory("aws-s3-nio-");
} catch (IOException cause) {
throw new UncheckedIOException(cause);
}
return new S3FileSystem(this, config, temporaryDirectory);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class S3WritableByteChannel implements SeekableByteChannel {
throw new NoSuchFileException("File at path:" + path + " does not exist yet");
}

tempFile = Files.createTempFile("aws-s3-nio-", ".tmp");
tempFile = path.getFileSystem().createTempFile(path);
if (exists) {
s3TransferUtil.downloadToLocalFile(path, tempFile);
}
Expand Down
44 changes: 38 additions & 6 deletions src/test/java/software/amazon/nio/spi/s3/S3FileSystemTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,27 @@
package software.amazon.nio.spi.s3;

import static org.assertj.core.api.BDDAssertions.then;
import static org.assertj.core.api.BDDAssertions.thenThrownBy;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.lenient;
import static software.amazon.nio.spi.s3.Constants.PATH_SEPARATOR;
import static software.amazon.nio.spi.s3.S3Matchers.anyConsumer;

import java.io.IOException;
import java.net.URI;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;

@ExtendWith(MockitoExtension.class)
public class S3FileSystemTest {
Expand All @@ -43,8 +42,6 @@ public void init() {
provider = new S3FileSystemProvider();
s3FileSystem = (S3FileSystem) provider.getFileSystem(s3Uri);
s3FileSystem.clientProvider = new FixedS3ClientProvider(mockClient);
lenient().when(mockClient.headObject(anyConsumer())).thenReturn(
CompletableFuture.supplyAsync(() -> HeadObjectResponse.builder().contentLength(100L).build()));
}

@AfterEach
Expand Down Expand Up @@ -124,6 +121,41 @@ public void getPathMatcher() {
s3FileSystem.getPathMatcher("glob:*.*").getClass());
}

@Test
void createTempFile() throws IOException {
var temporaryDirectory = s3FileSystemTemporaryDirectory();

thenThrownBy(() -> s3FileSystem.createTempFile(S3Path.getPath(s3FileSystem, "/dir/")))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("path must be a file");
thenThrownBy(() -> s3FileSystem.createTempFile(S3Path.getPath(s3FileSystem, "/dir1/dir2/dir3/")))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("path must be a file");

var key1 = "file1";
var tempFile1 = s3FileSystem.createTempFile(S3Path.getPath(s3FileSystem, key1));
then(tempFile1).exists().isEqualTo(temporaryDirectory.resolve(key1));

var key2 = "/file2";
var tempFile2 = s3FileSystem.createTempFile(S3Path.getPath(s3FileSystem, key2));
then(tempFile2).exists().isEqualTo(temporaryDirectory.resolve(key2.substring(1)));

var key3 = "/dir1/dir2/file3";
var tempFile3 = s3FileSystem.createTempFile(S3Path.getPath(s3FileSystem, key3));
then(tempFile3).exists().isEqualTo(temporaryDirectory.resolve(key3.substring(1)));

var key4 = "dir1/dir2/file4";
var tempFile4 = s3FileSystem.createTempFile(S3Path.getPath(s3FileSystem, key4));
then(tempFile4).exists().isEqualTo(temporaryDirectory.resolve(key4));
}

private Path s3FileSystemTemporaryDirectory() throws IOException {
var tempFile0 = s3FileSystem.createTempFile(S3Path.getPath(s3FileSystem, "file0"));
var temporaryDirectory = tempFile0.getParent();
Files.delete(tempFile0);
return temporaryDirectory;
}

@Test
public void testGetOpenChannelsIsNotModifiable() {
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
Expand All @@ -43,12 +46,13 @@ class S3WritableByteChannelTest {

@Test
@DisplayName("when file exists and constructor is invoked with option `CREATE_NEW` should throw FileAlreadyExistsException")
void whenFileExistsAndCreateNewShouldThrowFileAlreadyExistsException() throws InterruptedException, TimeoutException {
void whenFileExistsAndCreateNewShouldThrowFileAlreadyExistsException() throws InterruptedException, TimeoutException, IOException {
S3FileSystemProvider provider = mock();
when(provider.exists(any(S3AsyncClient.class), any())).thenReturn(true);

S3FileSystem fs = mock();
when(fs.provider()).thenReturn(provider);
when(fs.createTempFile(any(S3Path.class))).thenReturn(Files.createTempFile("", ""));

var file = S3Path.getPath(fs, "somefile");
assertThatThrownBy(() -> new S3WritableByteChannel(file, mock(), mock(), Set.of(CREATE_NEW)))
Expand Down Expand Up @@ -77,6 +81,7 @@ void shouldBeSeekable() throws InterruptedException, TimeoutException, IOExcepti

S3FileSystem fs = mock();
when(fs.provider()).thenReturn(provider);
when(fs.createTempFile(any(S3Path.class))).thenReturn(Files.createTempFile("", ""));

var file = S3Path.getPath(fs, "somefile");
var channel = new S3WritableByteChannel(file, mock(), mock(), Set.of(READ, WRITE));
Expand All @@ -103,6 +108,7 @@ void shouldNotThrowWhen(boolean fileExists, Set<StandardOpenOption> openOptions)

S3FileSystem fs = mock();
when(fs.provider()).thenReturn(provider);
when(fs.createTempFile(any(S3Path.class))).thenReturn(Files.createTempFile("", ""));

var file = S3Path.getPath(fs, "somefile");
new S3WritableByteChannel(file, mock(), mock(), openOptions).close();
Expand All @@ -116,6 +122,7 @@ void shouldBeOpenBeforeClose() throws InterruptedException, TimeoutException, IO

S3FileSystem fs = mock();
when(fs.provider()).thenReturn(provider);
when(fs.createTempFile(any(S3Path.class))).thenReturn(Files.createTempFile("", ""));

var file = S3Path.getPath(fs, "somefile");
try(var channel = new S3WritableByteChannel(file, mock(), mock(), Set.of(CREATE))){
Expand All @@ -131,6 +138,7 @@ void shouldBeNotOpenAfterClose() throws InterruptedException, TimeoutException,

S3FileSystem fs = mock();
when(fs.provider()).thenReturn(provider);
when(fs.createTempFile(any(S3Path.class))).thenReturn(Files.createTempFile("", ""));

var file = S3Path.getPath(fs, "somefile");
var channel = new S3WritableByteChannel(file, mock(), mock(), Set.of(CREATE));
Expand All @@ -143,16 +151,20 @@ void shouldBeNotOpenAfterClose() throws InterruptedException, TimeoutException,
void tmpFileIsCleanedUpAfterClose(@TempDir Path tempDir) throws InterruptedException, TimeoutException, IOException {
S3FileSystemProvider provider = mock();
when(provider.exists(any(S3AsyncClient.class), any())).thenReturn(false);
S3FileSystem fs = mock();
when(fs.provider()).thenReturn(provider);
var file = S3Path.getPath(fs, "somefile");

var channel = new S3WritableByteChannel(file, mock(), mock(), Set.of(CREATE));

var countAfterOpening = countTemporaryFiles(tempDir);
channel.close();
var countAfterClosing = countTemporaryFiles(tempDir);
assertThat(countAfterClosing).isLessThan(countAfterOpening);
var fs = new S3FileSystem(provider, null, tempDir);
var file1 = S3Path.getPath(fs, "file1");
var file2 = S3Path.getPath(fs, "dir1/file2");
var file3 = S3Path.getPath(fs, "dir1/dir2/file3");

var channel1 = new S3WritableByteChannel(file1, mock(), mock(), Set.of(CREATE));
var channel2 = new S3WritableByteChannel(file2, mock(), mock(), Set.of(CREATE));
var channel3 = new S3WritableByteChannel(file3, mock(), mock(), Set.of(CREATE));

assertThat(countTemporaryFiles(tempDir)).isEqualTo(3);
channel1.close();
channel2.close();
channel3.close();
assertThat(countTemporaryFiles(tempDir)).isZero();
}

@Test
Expand All @@ -162,6 +174,7 @@ void secondCloseIsNoOp() throws InterruptedException, TimeoutException, IOExcept
when(provider.exists(any(S3AsyncClient.class), any())).thenReturn(false);
S3FileSystem fs = mock();
when(fs.provider()).thenReturn(provider);
when(fs.createTempFile(any(S3Path.class))).thenReturn(Files.createTempFile("", ""));
var file = S3Path.getPath(fs, "somefile");

S3TransferUtil utilMock = mock();
Expand All @@ -174,11 +187,17 @@ void secondCloseIsNoOp() throws InterruptedException, TimeoutException, IOExcept
}

private long countTemporaryFiles(Path tempDir) throws IOException {
try (var list = Files.list(tempDir.getParent())) {
return list
.filter((path) -> path.getFileName().toString().contains("aws-s3-nio-"))
.count();
}
var visitor = new SimpleFileVisitor<Path>() {
int fileCount = 0;

@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
fileCount++;
return super.visitFile(file, attrs);
}
};
Files.walkFileTree(tempDir, visitor);
return visitor.fileCount;
}

private Stream<Arguments> acceptedFileExistsAndOpenOptions() {
Expand Down