forked from awslabs/aws-java-nio-spi-for-s3
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathS3WritableByteChannel.java
138 lines (117 loc) · 4.36 KB
/
S3WritableByteChannel.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
package software.amazon.nio.spi.s3;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.checkerframework.checker.nullness.qual.NonNull;
import software.amazon.awssdk.services.s3.S3AsyncClient;
class S3WritableByteChannel implements SeekableByteChannel {
private final S3Path path;
private final Path tempFile;
private final SeekableByteChannel channel;
private final S3TransferUtil s3TransferUtil;
private boolean open;
S3WritableByteChannel(
S3Path path,
S3AsyncClient client,
S3TransferUtil s3TransferUtil,
Set<? extends OpenOption> options
) throws IOException {
Objects.requireNonNull(path);
Objects.requireNonNull(client);
this.s3TransferUtil = s3TransferUtil;
this.path = path;
try {
var fileSystemProvider = (S3FileSystemProvider) path.getFileSystem().provider();
var exists = fileSystemProvider.exists(client, path);
if (exists && options.contains(StandardOpenOption.CREATE_NEW)) {
throw new FileAlreadyExistsException("File at path:" + path + " already exists");
}
if (!exists && !options.contains(StandardOpenOption.CREATE_NEW) && !options.contains(StandardOpenOption.CREATE)) {
throw new NoSuchFileException("File at path:" + path + " does not exist yet");
}
tempFile = Files.createTempFile("aws-s3-nio-", ".tmp");
if (exists) {
s3TransferUtil.downloadToLocalFile(path, tempFile);
}
channel = Files.newByteChannel(this.tempFile, removeCreateNew(options));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Could not open the path:" + path, e);
} catch (TimeoutException | ExecutionException e) {
throw new IOException("Could not open the path:" + path, e);
}
this.open = true;
}
private @NonNull Set<? extends OpenOption> removeCreateNew(Set<? extends OpenOption> options) {
var auxOptions = new HashSet<>(options);
auxOptions.remove(StandardOpenOption.CREATE_NEW);
return Set.copyOf(auxOptions);
}
@Override
public int write(ByteBuffer src) throws IOException {
return channel.write(src);
}
@Override
public boolean isOpen() {
return open;
}
@Override
public void close() throws IOException {
channel.close();
if (!open) {
// channel has already been closed -> close() should have no effect
return;
}
s3TransferUtil.uploadLocalFile(path, tempFile);
Files.deleteIfExists(tempFile);
open = false;
}
/**
* Cause the local tmp data to be written to S3 without closing the channel and without deleting the tmp file.
* @throws IOException if an error occurs during the upload
* @throws ClosedChannelException if the channel is closed
*/
protected void force() throws IOException {
if (!open) {
throw new ClosedChannelException();
}
s3TransferUtil.uploadLocalFile(path, tempFile);
}
@Override
public long position() throws IOException {
return channel.position();
}
@Override
public SeekableByteChannel position(long newPosition) throws IOException {
channel.position(newPosition);
return this;
}
@Override
public int read(ByteBuffer dst) throws IOException {
return channel.read(dst);
}
@Override
public long size() throws IOException {
return channel.size();
}
@Override
public SeekableByteChannel truncate(long size) throws IOException {
throw new UnsupportedOperationException("Currently not supported");
}
}