Skip to content

Commit 33e4046

Browse files
authored
feat: Implement SeekableByteChannel in S3WritableByteChannel (#618)
- Get size from write channel instead of fetching - This is a workaround for the case where the file does not exist remotely but the caller is working with the open option `CREATE`. - Add unit tests
1 parent 8acaf28 commit 33e4046

File tree

7 files changed

+229
-12
lines changed

7 files changed

+229
-12
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package software.amazon.nio.spi.s3;
7+
8+
import static org.assertj.core.api.Assertions.*;
9+
import static software.amazon.nio.spi.s3.Containers.*;
10+
11+
import java.io.IOException;
12+
import java.net.URI;
13+
import java.nio.ByteBuffer;
14+
import java.nio.channels.FileChannel;
15+
import java.nio.file.Paths;
16+
import java.nio.file.StandardOpenOption;
17+
18+
import org.junit.jupiter.api.BeforeEach;
19+
import org.junit.jupiter.api.DisplayName;
20+
import org.junit.jupiter.api.Test;
21+
import org.junit.jupiter.api.TestInstance;
22+
23+
@DisplayName("FileChannel$open* should read and write on S3")
24+
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
25+
public class FileChannelOpenTest {
26+
27+
String bucketName;
28+
29+
@BeforeEach
30+
public void createBucket() {
31+
bucketName = "file-channel-bucket" + System.currentTimeMillis();
32+
Containers.createBucket(bucketName);
33+
}
34+
35+
@Test
36+
@DisplayName("open with CREATE and WRITE is supported")
37+
public void open_CREATE_WRITE() throws IOException {
38+
var path = Paths.get(URI.create(localStackConnectionEndpoint() + "/" + bucketName + "/fc-create-write-test.txt"));
39+
40+
String text = "we test FileChannel#open with CREATE and WRITE options";
41+
try (var channel = FileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
42+
channel.write(ByteBuffer.wrap(text.getBytes()));
43+
}
44+
45+
assertThat(path).hasContent(text);
46+
}
47+
48+
@Test
49+
@DisplayName("open with READ and WRITE is supported")
50+
public void open_READ_WRITE() throws IOException {
51+
var path = putObject(bucketName, "fc-read-write-test.txt");
52+
53+
String text = "abcdefhij";
54+
try (var channel = FileChannel.open(path, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
55+
56+
// write
57+
channel.write(ByteBuffer.wrap("def".getBytes()), 3);
58+
channel.write(ByteBuffer.wrap("abc".getBytes()), 0);
59+
channel.write(ByteBuffer.wrap("hij".getBytes()), 6);
60+
61+
// read
62+
var dst = ByteBuffer.allocate(text.getBytes().length);
63+
channel.read(dst, 0);
64+
65+
// verify
66+
assertThat(dst.array()).isEqualTo(text.getBytes());
67+
}
68+
69+
assertThat(path).hasContent(text);
70+
}
71+
72+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package software.amazon.nio.spi.s3;
7+
8+
import static org.assertj.core.api.Assertions.*;
9+
import static software.amazon.nio.spi.s3.Containers.*;
10+
11+
import java.io.IOException;
12+
import java.net.URI;
13+
import java.nio.ByteBuffer;
14+
import java.nio.file.Files;
15+
import java.nio.file.Paths;
16+
import java.nio.file.StandardOpenOption;
17+
18+
import org.junit.jupiter.api.BeforeEach;
19+
import org.junit.jupiter.api.DisplayName;
20+
import org.junit.jupiter.api.Test;
21+
import org.junit.jupiter.api.TestInstance;
22+
23+
@DisplayName("Files$newByteChannel* should read and write on S3")
24+
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
25+
public class FilesNewByteChannelTest {
26+
27+
String bucketName;
28+
29+
@BeforeEach
30+
public void createBucket() {
31+
bucketName = "byte-channel-bucket" + System.currentTimeMillis();
32+
Containers.createBucket(bucketName);
33+
}
34+
35+
@Test
36+
@DisplayName("newByteChannel with CREATE and WRITE is supported")
37+
public void newByteChannel_CREATE_WRITE() throws IOException {
38+
var path = Paths.get(URI.create(localStackConnectionEndpoint() + "/" + bucketName + "/bc-create-write-test.txt"));
39+
40+
String text = "we test Files#newByteChannel";
41+
try (var channel = Files.newByteChannel(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
42+
channel.write(ByteBuffer.wrap(text.getBytes()));
43+
}
44+
45+
assertThat(path).hasContent(text);
46+
}
47+
48+
@Test
49+
@DisplayName("newByteChannel with READ and WRITE is supported")
50+
public void newByteChannel_READ_WRITE() throws IOException {
51+
var path = putObject(bucketName, "bc-read-write-test.txt", "xyz");
52+
53+
String text = "abcdefhij";
54+
try (var channel = Files.newByteChannel(path, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
55+
56+
// write
57+
channel.position(3);
58+
channel.write(ByteBuffer.wrap("def".getBytes()));
59+
channel.position(0);
60+
channel.write(ByteBuffer.wrap("abc".getBytes()));
61+
channel.position(6);
62+
channel.write(ByteBuffer.wrap("hij".getBytes()));
63+
64+
// read
65+
var dst = ByteBuffer.allocate(text.getBytes().length);
66+
channel.position(0);
67+
channel.read(dst);
68+
69+
// verify
70+
assertThat(dst.array()).isEqualTo(text.getBytes());
71+
}
72+
73+
assertThat(path).hasContent(text);
74+
}
75+
76+
}

src/main/java/software/amazon/nio/spi/s3/S3FileSystemProvider.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -835,8 +835,9 @@ public void setConfiguration(S3NioSpiConfiguration configuration) {
835835
public FileChannel newFileChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs)
836836
throws IOException {
837837

838-
S3FileSystem fs = (S3FileSystem) getFileSystem(path.toUri());
839-
S3SeekableByteChannel s3SeekableByteChannel = new S3SeekableByteChannel((S3Path) path, fs.client(), options);
838+
S3Path p = (S3Path) path;
839+
S3FileSystem fs = p.getFileSystem();
840+
S3SeekableByteChannel s3SeekableByteChannel = new S3SeekableByteChannel(p, fs.client(), options);
840841
return new S3FileChannel(s3SeekableByteChannel);
841842
}
842843

src/main/java/software/amazon/nio/spi/s3/S3SeekableByteChannel.java

+20-7
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ class S3SeekableByteChannel implements SeekableByteChannel {
2727

2828
private static final Logger LOGGER = LoggerFactory.getLogger(S3SeekableByteChannel.class);
2929

30+
private final Set<? extends OpenOption> options;
3031
private long position;
3132
private final S3Path path;
3233
private final ReadableByteChannel readDelegate;
@@ -44,11 +45,9 @@ private S3SeekableByteChannel(S3Path s3Path, S3AsyncClient s3Client, long startA
4445
position = startAt;
4546
path = s3Path;
4647
closed = false;
48+
this.options = options;
4749
s3Path.getFileSystem().registerOpenChannel(this);
4850

49-
if (options.contains(StandardOpenOption.WRITE) && options.contains(StandardOpenOption.READ)) {
50-
throw new IOException("This channel does not support read and write access simultaneously");
51-
}
5251
if (options.contains(StandardOpenOption.SYNC) || options.contains(StandardOpenOption.DSYNC)) {
5352
throw new IOException("The SYNC/DSYNC options is not supported");
5453
}
@@ -86,6 +85,10 @@ private S3SeekableByteChannel(S3Path s3Path, S3AsyncClient s3Client, long startA
8685
public int read(ByteBuffer dst) throws IOException {
8786
validateOpen();
8887

88+
if (options.contains(StandardOpenOption.WRITE) && options.contains(StandardOpenOption.READ)) {
89+
return writeDelegate.read(dst);
90+
}
91+
8992
if (readDelegate == null) {
9093
throw new NonReadableChannelException();
9194
}
@@ -115,9 +118,6 @@ public int write(ByteBuffer src) throws IOException {
115118
throw new NonWritableChannelException();
116119
}
117120

118-
var length = src.remaining();
119-
this.position += length;
120-
121121
return writeDelegate.write(src);
122122
}
123123

@@ -132,6 +132,10 @@ public int write(ByteBuffer src) throws IOException {
132132
public long position() throws IOException {
133133
validateOpen();
134134

135+
if (writeDelegate != null) {
136+
return writeDelegate.position();
137+
}
138+
135139
synchronized (this) {
136140
return position;
137141
}
@@ -170,7 +174,12 @@ public SeekableByteChannel position(long newPosition) throws IOException {
170174
throw new ClosedChannelException();
171175
}
172176

173-
// this is only valid to read channels
177+
if (writeDelegate != null) {
178+
writeDelegate.position(newPosition);
179+
return this;
180+
}
181+
182+
// this is only valid to read-only channels
174183
if (readDelegate == null) {
175184
throw new NonReadableChannelException();
176185
}
@@ -191,6 +200,10 @@ public SeekableByteChannel position(long newPosition) throws IOException {
191200
public long size() throws IOException {
192201
validateOpen();
193202

203+
if (writeDelegate != null) {
204+
return writeDelegate.size();
205+
}
206+
194207
if (size < 0) {
195208
fetchSize();
196209
}

src/main/java/software/amazon/nio/spi/s3/S3WritableByteChannel.java

+27-2
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import java.nio.ByteBuffer;
1010
import java.nio.channels.ClosedChannelException;
1111
import java.nio.channels.SeekableByteChannel;
12-
import java.nio.channels.WritableByteChannel;
1312
import java.nio.file.FileAlreadyExistsException;
1413
import java.nio.file.Files;
1514
import java.nio.file.NoSuchFileException;
@@ -24,7 +23,7 @@
2423
import org.checkerframework.checker.nullness.qual.NonNull;
2524
import software.amazon.awssdk.services.s3.S3AsyncClient;
2625

27-
class S3WritableByteChannel implements WritableByteChannel {
26+
class S3WritableByteChannel implements SeekableByteChannel {
2827
private final S3Path path;
2928
private final Path tempFile;
3029
private final SeekableByteChannel channel;
@@ -110,4 +109,30 @@ protected void force() throws IOException {
110109
}
111110
s3TransferUtil.uploadLocalFile(path, tempFile);
112111
}
112+
113+
@Override
114+
public long position() throws IOException {
115+
return channel.position();
116+
}
117+
118+
@Override
119+
public SeekableByteChannel position(long newPosition) throws IOException {
120+
channel.position(newPosition);
121+
return this;
122+
}
123+
124+
@Override
125+
public int read(ByteBuffer dst) throws IOException {
126+
return channel.read(dst);
127+
}
128+
129+
@Override
130+
public long size() throws IOException {
131+
return channel.size();
132+
}
133+
134+
@Override
135+
public SeekableByteChannel truncate(long size) throws IOException {
136+
throw new UnsupportedOperationException("Currently not supported");
137+
}
113138
}

src/test/java/software/amazon/nio/spi/s3/S3SeekableByteChannelTest.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,9 @@ public void write() throws IOException {
106106
when(mockClient.putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class))).thenReturn(CompletableFuture.supplyAsync(() ->
107107
PutObjectResponse.builder().build()));
108108
try(var channel = new S3SeekableByteChannel(path, mockClient, Set.<OpenOption>of(CREATE, WRITE))){
109-
channel.write(ByteBuffer.allocate(1));
109+
assertEquals(0L, channel.size());
110+
channel.write(ByteBuffer.allocate(12));
111+
assertEquals(12L, channel.size());
110112
}
111113
}
112114

src/test/java/software/amazon/nio/spi/s3/S3WritableByteChannelTest.java

+28
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.junit.jupiter.params.provider.MethodSource;
1515

1616
import java.io.IOException;
17+
import java.nio.ByteBuffer;
1718
import java.nio.file.FileAlreadyExistsException;
1819
import java.nio.file.Files;
1920
import java.nio.file.NoSuchFileException;
@@ -25,6 +26,8 @@
2526

2627
import static java.nio.file.StandardOpenOption.CREATE;
2728
import static java.nio.file.StandardOpenOption.CREATE_NEW;
29+
import static java.nio.file.StandardOpenOption.READ;
30+
import static java.nio.file.StandardOpenOption.WRITE;
2831
import static java.util.Collections.emptySet;
2932
import static org.assertj.core.api.Assertions.assertThat;
3033
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -66,6 +69,31 @@ void whenFileDoesNotExistsAndNoCreateNewShouldThrowNoSuchFileException() throws
6669
.isInstanceOf(NoSuchFileException.class);
6770
}
6871

72+
@Test
73+
@DisplayName("S3WritableByteChannel is a SeekableByteChannel")
74+
void shouldBeSeekable() throws InterruptedException, TimeoutException, IOException {
75+
S3FileSystemProvider provider = mock();
76+
when(provider.exists(any(S3AsyncClient.class), any())).thenReturn(true);
77+
78+
S3FileSystem fs = mock();
79+
when(fs.provider()).thenReturn(provider);
80+
81+
var file = S3Path.getPath(fs, "somefile");
82+
var channel = new S3WritableByteChannel(file, mock(), mock(), Set.of(READ, WRITE));
83+
assertThat(channel.size()).isZero();
84+
assertThat(channel.position()).isZero();
85+
channel.write(ByteBuffer.wrap(new byte[] { 1, 2, 3, 4 }));
86+
assertThat(channel.position()).isEqualTo(4);
87+
assertThat(channel.position(2)).isSameAs(channel);
88+
channel.write(ByteBuffer.wrap(new byte[] { 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 }));
89+
assertThat(channel.size()).isEqualTo(12);
90+
assertThat(channel.position(3)).isSameAs(channel);
91+
ByteBuffer buffer = ByteBuffer.allocate(6);
92+
channel.read(buffer);
93+
assertThat(buffer.array()).contains(4, 5, 6, 7, 8, 9);
94+
assertThatThrownBy(() -> channel.truncate(6)).isInstanceOf(UnsupportedOperationException.class);
95+
}
96+
6997
@ParameterizedTest(name = "can be instantiated when file exists ({0}) and open options are {1}")
7098
@MethodSource("acceptedFileExistsAndOpenOptions")
7199
@DisplayName("S3WritableByteChannel")

0 commit comments

Comments
 (0)