Skip to content

Commit d839b8a

Browse files
feature: implements newFileChannel and newAsynchronousFileChannel (#488)
* enables creation of FileChannels and AsynchronousFileChannels and provides demos.
1 parent 7123685 commit d839b8a

File tree

7 files changed

+1092
-0
lines changed

7 files changed

+1092
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package software.amazon.nio.spi.examples;
7+
8+
import java.net.URI;
9+
import java.nio.ByteBuffer;
10+
import java.nio.channels.AsynchronousFileChannel;
11+
import java.nio.file.Paths;
12+
import java.nio.file.StandardOpenOption;
13+
import java.util.concurrent.TimeUnit;
14+
15+
public class AsyncFileChannelDemo {
16+
public static void main(String[] args) {
17+
var path = Paths.get(URI.create(args[0]));
18+
try (var channel = AsynchronousFileChannel.open(path, StandardOpenOption.READ)) {
19+
var buffer = ByteBuffer.allocate(1024);
20+
long position = 0;
21+
int bytesRead;
22+
while ((bytesRead = channel.read(buffer, position).get(10, TimeUnit.SECONDS)) != -1) {
23+
position += bytesRead;
24+
buffer.flip();
25+
while (buffer.hasRemaining()) {
26+
System.out.print((char) buffer.get());
27+
}
28+
buffer.clear();
29+
}
30+
} catch (Exception e) {
31+
throw new RuntimeException(e);
32+
}
33+
}
34+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package software.amazon.nio.spi.examples;
7+
8+
import java.io.IOException;
9+
import java.net.URI;
10+
import java.nio.ByteBuffer;
11+
import java.nio.channels.FileChannel;
12+
import java.nio.file.Paths;
13+
import java.nio.file.StandardOpenOption;
14+
15+
public class FileChannelDemo {
16+
17+
public static void main(String[] args) {
18+
var path = Paths.get(URI.create(args[0]));
19+
try (var channel = FileChannel.open(path, StandardOpenOption.READ)) {
20+
var buffer = ByteBuffer.allocate(1024);
21+
while (channel.read(buffer) != -1) {
22+
buffer.flip();
23+
while (buffer.hasRemaining()) {
24+
System.out.print((char) buffer.get());
25+
}
26+
buffer.clear();
27+
}
28+
} catch (IOException e) {
29+
e.printStackTrace();
30+
System.exit(1);
31+
}
32+
}
33+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package software.amazon.nio.spi.s3;
7+
8+
import java.io.IOException;
9+
import java.nio.ByteBuffer;
10+
import java.nio.channels.AsynchronousFileChannel;
11+
import java.nio.channels.CompletionHandler;
12+
import java.nio.channels.FileLock;
13+
import java.util.concurrent.CompletableFuture;
14+
import java.util.concurrent.Future;
15+
import java.util.concurrent.TimeUnit;
16+
import software.amazon.nio.spi.s3.util.TimeOutUtils;
17+
18+
public class AsyncS3FileChannel extends AsynchronousFileChannel {
19+
20+
private final S3SeekableByteChannel byteChannel;
21+
22+
AsyncS3FileChannel(S3SeekableByteChannel byteChannel) {
23+
this.byteChannel = byteChannel;
24+
}
25+
26+
@Override
27+
public long size() throws IOException {
28+
return byteChannel.size();
29+
}
30+
31+
@Override
32+
public AsynchronousFileChannel truncate(long size) throws IOException {
33+
byteChannel.truncate(size);
34+
return this;
35+
}
36+
37+
@Override
38+
public void force(boolean metaData) throws IOException {
39+
if (byteChannel.getWriteDelegate() != null) {
40+
((S3WritableByteChannel) byteChannel.getWriteDelegate()).force();
41+
}
42+
}
43+
44+
@Override
45+
public <A> void lock(long position, long size, boolean shared, A attachment, CompletionHandler<FileLock, ? super A> handler) {
46+
throw new UnsupportedOperationException("S3 does not support file locking");
47+
}
48+
49+
@Override
50+
public Future<FileLock> lock(long position, long size, boolean shared) {
51+
return CompletableFuture.failedFuture(new UnsupportedOperationException("S3 does not support file locking"));
52+
}
53+
54+
@Override
55+
public FileLock tryLock(long position, long size, boolean shared) throws IOException {
56+
throw new IOException(new UnsupportedOperationException("S3 does not support file locking"));
57+
}
58+
59+
@Override
60+
public <A> void read(ByteBuffer dst, long position, A attachment, CompletionHandler<Integer, ? super A> handler) {
61+
Future<Integer> future = read(dst, position);
62+
try {
63+
handler.completed(future.get(TimeOutUtils.TIMEOUT_TIME_LENGTH_5, TimeUnit.MINUTES), attachment);
64+
} catch (Exception e) {
65+
handler.failed(e, attachment);
66+
}
67+
}
68+
69+
@Override
70+
public Future<Integer> read(ByteBuffer dst, long position) {
71+
if (position < 0) {
72+
throw new IllegalArgumentException("position: " + position);
73+
}
74+
75+
return CompletableFuture.supplyAsync(() -> {
76+
try {
77+
byteChannel.position(position);
78+
return byteChannel.read(dst);
79+
} catch (IOException e) {
80+
throw new RuntimeException(e);
81+
}
82+
});
83+
}
84+
85+
@Override
86+
public <A> void write(ByteBuffer src, long position, A attachment, CompletionHandler<Integer, ? super A> handler) {
87+
Future<Integer> future = write(src, position);
88+
try {
89+
handler.completed(future.get(TimeOutUtils.TIMEOUT_TIME_LENGTH_5, TimeUnit.MINUTES), attachment);
90+
} catch (Exception e) {
91+
handler.failed(e, attachment);
92+
}
93+
}
94+
95+
@Override
96+
public Future<Integer> write(ByteBuffer src, long position) {
97+
if (position < 0) {
98+
throw new IllegalArgumentException("position: " + position);
99+
}
100+
101+
return CompletableFuture.supplyAsync(() -> {
102+
try {
103+
byteChannel.position(position);
104+
return byteChannel.write(src);
105+
} catch (IOException e) {
106+
throw new RuntimeException(e);
107+
}
108+
});
109+
}
110+
111+
@Override
112+
public boolean isOpen() {
113+
return byteChannel.isOpen();
114+
}
115+
116+
@Override
117+
public void close() throws IOException {
118+
byteChannel.close();
119+
}
120+
}

0 commit comments

Comments
 (0)