From b6a9b1d8321a133949dda7a21109181507cf0ec2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Rou=C3=A9l?= Date: Thu, 20 Feb 2025 22:22:47 +0100 Subject: [PATCH] Implement `SeekableByteChannel` in `S3WritableByteChannel` - Get size from write channel instead of fetching - This is a workaround for the case where the file does not exist remotely but the caller is working with the open option `CREATE`. - Add unit tests --- .../nio/spi/s3/FileChannelOpenTest.java | 72 ++++++++++++++++++ .../nio/spi/s3/FilesNewByteChannelTest.java | 76 +++++++++++++++++++ .../nio/spi/s3/S3FileSystemProvider.java | 5 +- .../nio/spi/s3/S3SeekableByteChannel.java | 27 +++++-- .../nio/spi/s3/S3WritableByteChannel.java | 29 ++++++- .../nio/spi/s3/S3SeekableByteChannelTest.java | 4 +- .../nio/spi/s3/S3WritableByteChannelTest.java | 28 +++++++ 7 files changed, 229 insertions(+), 12 deletions(-) create mode 100644 src/integrationTest/java/software/amazon/nio/spi/s3/FileChannelOpenTest.java create mode 100644 src/integrationTest/java/software/amazon/nio/spi/s3/FilesNewByteChannelTest.java diff --git a/src/integrationTest/java/software/amazon/nio/spi/s3/FileChannelOpenTest.java b/src/integrationTest/java/software/amazon/nio/spi/s3/FileChannelOpenTest.java new file mode 100644 index 00000000..a58893ed --- /dev/null +++ b/src/integrationTest/java/software/amazon/nio/spi/s3/FileChannelOpenTest.java @@ -0,0 +1,72 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package software.amazon.nio.spi.s3; + +import static org.assertj.core.api.Assertions.*; +import static software.amazon.nio.spi.s3.Containers.*; + +import java.io.IOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; + +@DisplayName("FileChannel$open* should read and write on S3") +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class FileChannelOpenTest { + + String bucketName; + + @BeforeEach + public void createBucket() { + bucketName = "file-channel-bucket" + System.currentTimeMillis(); + Containers.createBucket(bucketName); + } + + @Test + @DisplayName("open with CREATE and WRITE is supported") + public void open_CREATE_WRITE() throws IOException { + var path = Paths.get(URI.create(localStackConnectionEndpoint() + "/" + bucketName + "/fc-create-write-test.txt")); + + String text = "we test FileChannel#open with CREATE and WRITE options"; + try (var channel = FileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) { + channel.write(ByteBuffer.wrap(text.getBytes())); + } + + assertThat(path).hasContent(text); + } + + @Test + @DisplayName("open with READ and WRITE is supported") + public void open_READ_WRITE() throws IOException { + var path = putObject(bucketName, "fc-read-write-test.txt"); + + String text = "abcdefhij"; + try (var channel = FileChannel.open(path, StandardOpenOption.READ, StandardOpenOption.WRITE)) { + + // write + channel.write(ByteBuffer.wrap("def".getBytes()), 3); + channel.write(ByteBuffer.wrap("abc".getBytes()), 0); + channel.write(ByteBuffer.wrap("hij".getBytes()), 6); + + // read + var dst = ByteBuffer.allocate(text.getBytes().length); + channel.read(dst, 0); + + // verify + assertThat(dst.array()).isEqualTo(text.getBytes()); + } + + assertThat(path).hasContent(text); + } + +} diff --git a/src/integrationTest/java/software/amazon/nio/spi/s3/FilesNewByteChannelTest.java b/src/integrationTest/java/software/amazon/nio/spi/s3/FilesNewByteChannelTest.java new file mode 100644 index 00000000..0ed544ba --- /dev/null +++ b/src/integrationTest/java/software/amazon/nio/spi/s3/FilesNewByteChannelTest.java @@ -0,0 +1,76 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package software.amazon.nio.spi.s3; + +import static org.assertj.core.api.Assertions.*; +import static software.amazon.nio.spi.s3.Containers.*; + +import java.io.IOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; + +@DisplayName("Files$newByteChannel* should read and write on S3") +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class FilesNewByteChannelTest { + + String bucketName; + + @BeforeEach + public void createBucket() { + bucketName = "byte-channel-bucket" + System.currentTimeMillis(); + Containers.createBucket(bucketName); + } + + @Test + @DisplayName("newByteChannel with CREATE and WRITE is supported") + public void newByteChannel_CREATE_WRITE() throws IOException { + var path = Paths.get(URI.create(localStackConnectionEndpoint() + "/" + bucketName + "/bc-create-write-test.txt")); + + String text = "we test Files#newByteChannel"; + try (var channel = Files.newByteChannel(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) { + channel.write(ByteBuffer.wrap(text.getBytes())); + } + + assertThat(path).hasContent(text); + } + + @Test + @DisplayName("newByteChannel with READ and WRITE is supported") + public void newByteChannel_READ_WRITE() throws IOException { + var path = putObject(bucketName, "bc-read-write-test.txt", "xyz"); + + String text = "abcdefhij"; + try (var channel = Files.newByteChannel(path, StandardOpenOption.READ, StandardOpenOption.WRITE)) { + + // write + channel.position(3); + channel.write(ByteBuffer.wrap("def".getBytes())); + channel.position(0); + channel.write(ByteBuffer.wrap("abc".getBytes())); + channel.position(6); + channel.write(ByteBuffer.wrap("hij".getBytes())); + + // read + var dst = ByteBuffer.allocate(text.getBytes().length); + channel.position(0); + channel.read(dst); + + // verify + assertThat(dst.array()).isEqualTo(text.getBytes()); + } + + assertThat(path).hasContent(text); + } + +} diff --git a/src/main/java/software/amazon/nio/spi/s3/S3FileSystemProvider.java b/src/main/java/software/amazon/nio/spi/s3/S3FileSystemProvider.java index 8d8ea23c..3828e21f 100644 --- a/src/main/java/software/amazon/nio/spi/s3/S3FileSystemProvider.java +++ b/src/main/java/software/amazon/nio/spi/s3/S3FileSystemProvider.java @@ -835,8 +835,9 @@ public void setConfiguration(S3NioSpiConfiguration configuration) { public FileChannel newFileChannel(Path path, Set options, FileAttribute... attrs) throws IOException { - S3FileSystem fs = (S3FileSystem) getFileSystem(path.toUri()); - S3SeekableByteChannel s3SeekableByteChannel = new S3SeekableByteChannel((S3Path) path, fs.client(), options); + S3Path p = (S3Path) path; + S3FileSystem fs = p.getFileSystem(); + S3SeekableByteChannel s3SeekableByteChannel = new S3SeekableByteChannel(p, fs.client(), options); return new S3FileChannel(s3SeekableByteChannel); } diff --git a/src/main/java/software/amazon/nio/spi/s3/S3SeekableByteChannel.java b/src/main/java/software/amazon/nio/spi/s3/S3SeekableByteChannel.java index 1f2f9060..e3669dbb 100644 --- a/src/main/java/software/amazon/nio/spi/s3/S3SeekableByteChannel.java +++ b/src/main/java/software/amazon/nio/spi/s3/S3SeekableByteChannel.java @@ -27,6 +27,7 @@ class S3SeekableByteChannel implements SeekableByteChannel { private static final Logger LOGGER = LoggerFactory.getLogger(S3SeekableByteChannel.class); + private final Set options; private long position; private final S3Path path; private final ReadableByteChannel readDelegate; @@ -44,11 +45,9 @@ private S3SeekableByteChannel(S3Path s3Path, S3AsyncClient s3Client, long startA position = startAt; path = s3Path; closed = false; + this.options = options; s3Path.getFileSystem().registerOpenChannel(this); - if (options.contains(StandardOpenOption.WRITE) && options.contains(StandardOpenOption.READ)) { - throw new IOException("This channel does not support read and write access simultaneously"); - } if (options.contains(StandardOpenOption.SYNC) || options.contains(StandardOpenOption.DSYNC)) { throw new IOException("The SYNC/DSYNC options is not supported"); } @@ -86,6 +85,10 @@ private S3SeekableByteChannel(S3Path s3Path, S3AsyncClient s3Client, long startA public int read(ByteBuffer dst) throws IOException { validateOpen(); + if (options.contains(StandardOpenOption.WRITE) && options.contains(StandardOpenOption.READ)) { + return writeDelegate.read(dst); + } + if (readDelegate == null) { throw new NonReadableChannelException(); } @@ -115,9 +118,6 @@ public int write(ByteBuffer src) throws IOException { throw new NonWritableChannelException(); } - var length = src.remaining(); - this.position += length; - return writeDelegate.write(src); } @@ -132,6 +132,10 @@ public int write(ByteBuffer src) throws IOException { public long position() throws IOException { validateOpen(); + if (writeDelegate != null) { + return writeDelegate.position(); + } + synchronized (this) { return position; } @@ -170,7 +174,12 @@ public SeekableByteChannel position(long newPosition) throws IOException { throw new ClosedChannelException(); } - // this is only valid to read channels + if (writeDelegate != null) { + writeDelegate.position(newPosition); + return this; + } + + // this is only valid to read-only channels if (readDelegate == null) { throw new NonReadableChannelException(); } @@ -191,6 +200,10 @@ public SeekableByteChannel position(long newPosition) throws IOException { public long size() throws IOException { validateOpen(); + if (writeDelegate != null) { + return writeDelegate.size(); + } + if (size < 0) { fetchSize(); } diff --git a/src/main/java/software/amazon/nio/spi/s3/S3WritableByteChannel.java b/src/main/java/software/amazon/nio/spi/s3/S3WritableByteChannel.java index fcc9df33..8573d26c 100644 --- a/src/main/java/software/amazon/nio/spi/s3/S3WritableByteChannel.java +++ b/src/main/java/software/amazon/nio/spi/s3/S3WritableByteChannel.java @@ -9,7 +9,6 @@ import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.SeekableByteChannel; -import java.nio.channels.WritableByteChannel; import java.nio.file.FileAlreadyExistsException; import java.nio.file.Files; import java.nio.file.NoSuchFileException; @@ -24,7 +23,7 @@ import org.checkerframework.checker.nullness.qual.NonNull; import software.amazon.awssdk.services.s3.S3AsyncClient; -class S3WritableByteChannel implements WritableByteChannel { +class S3WritableByteChannel implements SeekableByteChannel { private final S3Path path; private final Path tempFile; private final SeekableByteChannel channel; @@ -110,4 +109,30 @@ protected void force() throws IOException { } s3TransferUtil.uploadLocalFile(path, tempFile); } + + @Override + public long position() throws IOException { + return channel.position(); + } + + @Override + public SeekableByteChannel position(long newPosition) throws IOException { + channel.position(newPosition); + return this; + } + + @Override + public int read(ByteBuffer dst) throws IOException { + return channel.read(dst); + } + + @Override + public long size() throws IOException { + return channel.size(); + } + + @Override + public SeekableByteChannel truncate(long size) throws IOException { + throw new UnsupportedOperationException("Currently not supported"); + } } diff --git a/src/test/java/software/amazon/nio/spi/s3/S3SeekableByteChannelTest.java b/src/test/java/software/amazon/nio/spi/s3/S3SeekableByteChannelTest.java index f677d82b..fd7a0c26 100644 --- a/src/test/java/software/amazon/nio/spi/s3/S3SeekableByteChannelTest.java +++ b/src/test/java/software/amazon/nio/spi/s3/S3SeekableByteChannelTest.java @@ -106,7 +106,9 @@ public void write() throws IOException { when(mockClient.putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class))).thenReturn(CompletableFuture.supplyAsync(() -> PutObjectResponse.builder().build())); try(var channel = new S3SeekableByteChannel(path, mockClient, Set.of(CREATE, WRITE))){ - channel.write(ByteBuffer.allocate(1)); + assertEquals(0L, channel.size()); + channel.write(ByteBuffer.allocate(12)); + assertEquals(12L, channel.size()); } } diff --git a/src/test/java/software/amazon/nio/spi/s3/S3WritableByteChannelTest.java b/src/test/java/software/amazon/nio/spi/s3/S3WritableByteChannelTest.java index fe920600..35f98479 100644 --- a/src/test/java/software/amazon/nio/spi/s3/S3WritableByteChannelTest.java +++ b/src/test/java/software/amazon/nio/spi/s3/S3WritableByteChannelTest.java @@ -14,6 +14,7 @@ import org.junit.jupiter.params.provider.MethodSource; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.file.FileAlreadyExistsException; import java.nio.file.Files; import java.nio.file.NoSuchFileException; @@ -25,6 +26,8 @@ import static java.nio.file.StandardOpenOption.CREATE; import static java.nio.file.StandardOpenOption.CREATE_NEW; +import static java.nio.file.StandardOpenOption.READ; +import static java.nio.file.StandardOpenOption.WRITE; import static java.util.Collections.emptySet; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -66,6 +69,31 @@ void whenFileDoesNotExistsAndNoCreateNewShouldThrowNoSuchFileException() throws .isInstanceOf(NoSuchFileException.class); } + @Test + @DisplayName("S3WritableByteChannel is a SeekableByteChannel") + void shouldBeSeekable() throws InterruptedException, TimeoutException, IOException { + S3FileSystemProvider provider = mock(); + when(provider.exists(any(S3AsyncClient.class), any())).thenReturn(true); + + S3FileSystem fs = mock(); + when(fs.provider()).thenReturn(provider); + + var file = S3Path.getPath(fs, "somefile"); + var channel = new S3WritableByteChannel(file, mock(), mock(), Set.of(READ, WRITE)); + assertThat(channel.size()).isZero(); + assertThat(channel.position()).isZero(); + channel.write(ByteBuffer.wrap(new byte[] { 1, 2, 3, 4 })); + assertThat(channel.position()).isEqualTo(4); + assertThat(channel.position(2)).isSameAs(channel); + channel.write(ByteBuffer.wrap(new byte[] { 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 })); + assertThat(channel.size()).isEqualTo(12); + assertThat(channel.position(3)).isSameAs(channel); + ByteBuffer buffer = ByteBuffer.allocate(6); + channel.read(buffer); + assertThat(buffer.array()).contains(4, 5, 6, 7, 8, 9); + assertThatThrownBy(() -> channel.truncate(6)).isInstanceOf(UnsupportedOperationException.class); + } + @ParameterizedTest(name = "can be instantiated when file exists ({0}) and open options are {1}") @MethodSource("acceptedFileExistsAndOpenOptions") @DisplayName("S3WritableByteChannel")