Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable random access when opening a SeekableByteChannel or FileChannel #618

Merged
merged 1 commit into from
Mar 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

package software.amazon.nio.spi.s3;

import static org.assertj.core.api.Assertions.*;
import static software.amazon.nio.spi.s3.Containers.*;

import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

@DisplayName("FileChannel$open* should read and write on S3")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class FileChannelOpenTest {

String bucketName;

@BeforeEach
public void createBucket() {
bucketName = "file-channel-bucket" + System.currentTimeMillis();
Containers.createBucket(bucketName);
}

@Test
@DisplayName("open with CREATE and WRITE is supported")
public void open_CREATE_WRITE() throws IOException {
var path = Paths.get(URI.create(localStackConnectionEndpoint() + "/" + bucketName + "/fc-create-write-test.txt"));

String text = "we test FileChannel#open with CREATE and WRITE options";
try (var channel = FileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
channel.write(ByteBuffer.wrap(text.getBytes()));
}

assertThat(path).hasContent(text);
}

@Test
@DisplayName("open with READ and WRITE is supported")
public void open_READ_WRITE() throws IOException {
var path = putObject(bucketName, "fc-read-write-test.txt");

String text = "abcdefhij";
try (var channel = FileChannel.open(path, StandardOpenOption.READ, StandardOpenOption.WRITE)) {

// write
channel.write(ByteBuffer.wrap("def".getBytes()), 3);
channel.write(ByteBuffer.wrap("abc".getBytes()), 0);
channel.write(ByteBuffer.wrap("hij".getBytes()), 6);

// read
var dst = ByteBuffer.allocate(text.getBytes().length);
channel.read(dst, 0);

// verify
assertThat(dst.array()).isEqualTo(text.getBytes());
}

assertThat(path).hasContent(text);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

package software.amazon.nio.spi.s3;

import static org.assertj.core.api.Assertions.*;
import static software.amazon.nio.spi.s3.Containers.*;

import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

@DisplayName("Files$newByteChannel* should read and write on S3")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class FilesNewByteChannelTest {

String bucketName;

@BeforeEach
public void createBucket() {
bucketName = "byte-channel-bucket" + System.currentTimeMillis();
Containers.createBucket(bucketName);
}

@Test
@DisplayName("newByteChannel with CREATE and WRITE is supported")
public void newByteChannel_CREATE_WRITE() throws IOException {
var path = Paths.get(URI.create(localStackConnectionEndpoint() + "/" + bucketName + "/bc-create-write-test.txt"));

String text = "we test Files#newByteChannel";
try (var channel = Files.newByteChannel(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
channel.write(ByteBuffer.wrap(text.getBytes()));
}

assertThat(path).hasContent(text);
}

@Test
@DisplayName("newByteChannel with READ and WRITE is supported")
public void newByteChannel_READ_WRITE() throws IOException {
var path = putObject(bucketName, "bc-read-write-test.txt", "xyz");

String text = "abcdefhij";
try (var channel = Files.newByteChannel(path, StandardOpenOption.READ, StandardOpenOption.WRITE)) {

// write
channel.position(3);
channel.write(ByteBuffer.wrap("def".getBytes()));
channel.position(0);
channel.write(ByteBuffer.wrap("abc".getBytes()));
channel.position(6);
channel.write(ByteBuffer.wrap("hij".getBytes()));

// read
var dst = ByteBuffer.allocate(text.getBytes().length);
channel.position(0);
channel.read(dst);

// verify
assertThat(dst.array()).isEqualTo(text.getBytes());
}

assertThat(path).hasContent(text);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -835,8 +835,9 @@ public void setConfiguration(S3NioSpiConfiguration configuration) {
public FileChannel newFileChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs)
throws IOException {

S3FileSystem fs = (S3FileSystem) getFileSystem(path.toUri());
S3SeekableByteChannel s3SeekableByteChannel = new S3SeekableByteChannel((S3Path) path, fs.client(), options);
S3Path p = (S3Path) path;
S3FileSystem fs = p.getFileSystem();
S3SeekableByteChannel s3SeekableByteChannel = new S3SeekableByteChannel(p, fs.client(), options);
return new S3FileChannel(s3SeekableByteChannel);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class S3SeekableByteChannel implements SeekableByteChannel {

private static final Logger LOGGER = LoggerFactory.getLogger(S3SeekableByteChannel.class);

private final Set<? extends OpenOption> options;
private long position;
private final S3Path path;
private final ReadableByteChannel readDelegate;
Expand All @@ -44,11 +45,9 @@ private S3SeekableByteChannel(S3Path s3Path, S3AsyncClient s3Client, long startA
position = startAt;
path = s3Path;
closed = false;
this.options = options;
s3Path.getFileSystem().registerOpenChannel(this);

if (options.contains(StandardOpenOption.WRITE) && options.contains(StandardOpenOption.READ)) {
throw new IOException("This channel does not support read and write access simultaneously");
}
if (options.contains(StandardOpenOption.SYNC) || options.contains(StandardOpenOption.DSYNC)) {
throw new IOException("The SYNC/DSYNC options is not supported");
}
Expand Down Expand Up @@ -86,6 +85,10 @@ private S3SeekableByteChannel(S3Path s3Path, S3AsyncClient s3Client, long startA
public int read(ByteBuffer dst) throws IOException {
validateOpen();

if (options.contains(StandardOpenOption.WRITE) && options.contains(StandardOpenOption.READ)) {
return writeDelegate.read(dst);
}

if (readDelegate == null) {
throw new NonReadableChannelException();
}
Expand Down Expand Up @@ -115,9 +118,6 @@ public int write(ByteBuffer src) throws IOException {
throw new NonWritableChannelException();
}

var length = src.remaining();
this.position += length;

return writeDelegate.write(src);
}

Expand All @@ -132,6 +132,10 @@ public int write(ByteBuffer src) throws IOException {
public long position() throws IOException {
validateOpen();

if (writeDelegate != null) {
return writeDelegate.position();
}

synchronized (this) {
return position;
}
Expand Down Expand Up @@ -170,7 +174,12 @@ public SeekableByteChannel position(long newPosition) throws IOException {
throw new ClosedChannelException();
}

// this is only valid to read channels
if (writeDelegate != null) {
writeDelegate.position(newPosition);
return this;
}

// this is only valid to read-only channels
if (readDelegate == null) {
throw new NonReadableChannelException();
}
Expand All @@ -191,6 +200,10 @@ public SeekableByteChannel position(long newPosition) throws IOException {
public long size() throws IOException {
validateOpen();

if (writeDelegate != null) {
return writeDelegate.size();
}

if (size < 0) {
fetchSize();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SeekableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
Expand All @@ -24,7 +23,7 @@
import org.checkerframework.checker.nullness.qual.NonNull;
import software.amazon.awssdk.services.s3.S3AsyncClient;

class S3WritableByteChannel implements WritableByteChannel {
class S3WritableByteChannel implements SeekableByteChannel {
private final S3Path path;
private final Path tempFile;
private final SeekableByteChannel channel;
Expand Down Expand Up @@ -110,4 +109,30 @@ protected void force() throws IOException {
}
s3TransferUtil.uploadLocalFile(path, tempFile);
}

@Override
public long position() throws IOException {
return channel.position();
}

@Override
public SeekableByteChannel position(long newPosition) throws IOException {
channel.position(newPosition);
return this;
}

@Override
public int read(ByteBuffer dst) throws IOException {
return channel.read(dst);
}

@Override
public long size() throws IOException {
return channel.size();
}

@Override
public SeekableByteChannel truncate(long size) throws IOException {
throw new UnsupportedOperationException("Currently not supported");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ public void write() throws IOException {
when(mockClient.putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class))).thenReturn(CompletableFuture.supplyAsync(() ->
PutObjectResponse.builder().build()));
try(var channel = new S3SeekableByteChannel(path, mockClient, Set.<OpenOption>of(CREATE, WRITE))){
channel.write(ByteBuffer.allocate(1));
assertEquals(0L, channel.size());
channel.write(ByteBuffer.allocate(12));
assertEquals(12L, channel.size());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.junit.jupiter.params.provider.MethodSource;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
Expand All @@ -25,6 +26,8 @@

import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.CREATE_NEW;
import static java.nio.file.StandardOpenOption.READ;
import static java.nio.file.StandardOpenOption.WRITE;
import static java.util.Collections.emptySet;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand Down Expand Up @@ -66,6 +69,31 @@ void whenFileDoesNotExistsAndNoCreateNewShouldThrowNoSuchFileException() throws
.isInstanceOf(NoSuchFileException.class);
}

@Test
@DisplayName("S3WritableByteChannel is a SeekableByteChannel")
void shouldBeSeekable() throws InterruptedException, TimeoutException, IOException {
S3FileSystemProvider provider = mock();
when(provider.exists(any(S3AsyncClient.class), any())).thenReturn(true);

S3FileSystem fs = mock();
when(fs.provider()).thenReturn(provider);

var file = S3Path.getPath(fs, "somefile");
var channel = new S3WritableByteChannel(file, mock(), mock(), Set.of(READ, WRITE));
assertThat(channel.size()).isZero();
assertThat(channel.position()).isZero();
channel.write(ByteBuffer.wrap(new byte[] { 1, 2, 3, 4 }));
assertThat(channel.position()).isEqualTo(4);
assertThat(channel.position(2)).isSameAs(channel);
channel.write(ByteBuffer.wrap(new byte[] { 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 }));
assertThat(channel.size()).isEqualTo(12);
assertThat(channel.position(3)).isSameAs(channel);
ByteBuffer buffer = ByteBuffer.allocate(6);
channel.read(buffer);
assertThat(buffer.array()).contains(4, 5, 6, 7, 8, 9);
assertThatThrownBy(() -> channel.truncate(6)).isInstanceOf(UnsupportedOperationException.class);
}

@ParameterizedTest(name = "can be instantiated when file exists ({0}) and open options are {1}")
@MethodSource("acceptedFileExistsAndOpenOptions")
@DisplayName("S3WritableByteChannel")
Expand Down