Skip to content

Commit 1fb09ad

Browse files
committed
Add checksum when uploading file
1 parent cb17d45 commit 1fb09ad

16 files changed

+360
-19
lines changed

src/integrationTest/java/software/amazon/nio/spi/s3/FilesNewByteChannelTest.java

+33
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package software.amazon.nio.spi.s3;
77

8+
import static com.github.stefanbirkner.systemlambda.SystemLambda.*;
89
import static org.assertj.core.api.Assertions.*;
910
import static software.amazon.nio.spi.s3.Containers.*;
1011

@@ -20,6 +21,8 @@
2021
import org.junit.jupiter.api.Test;
2122
import org.junit.jupiter.api.TestInstance;
2223

24+
import software.amazon.nio.spi.s3.config.S3NioSpiConfiguration;
25+
2326
@DisplayName("Files$newByteChannel* should read and write on S3")
2427
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
2528
public class FilesNewByteChannelTest {
@@ -73,4 +76,34 @@ public void newByteChannel_READ_WRITE() throws IOException {
7376
assertThat(path).hasContent(text);
7477
}
7578

79+
@Test
80+
@DisplayName("newByteChannel with CRC32C integrity check")
81+
public void newByteChannel_withIntegrityCheck_CRC32C() throws Exception {
82+
var path = Paths.get(URI.create(localStackConnectionEndpoint() + "/" + bucketName + "/bc-integrity-check.txt"));
83+
84+
String text = "we test the integrity check when closing the byte channel";
85+
withEnvironmentVariable(S3NioSpiConfiguration.S3_INTEGRITY_CHECK_ALGORITHM_PROPERTY, "CRC32C").execute(() -> {
86+
try (var channel = Files.newByteChannel(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
87+
channel.write(ByteBuffer.wrap(text.getBytes()));
88+
}
89+
});
90+
91+
assertThat(path).hasContent(text);
92+
}
93+
94+
@Test
95+
@DisplayName("newByteChannel with CRC64NVME integrity check")
96+
public void newByteChannel_withIntegrityCheck_CRC64NVME() throws Exception {
97+
var path = Paths.get(URI.create(localStackConnectionEndpoint() + "/" + bucketName + "/bc-integrity-check.txt"));
98+
99+
String text = "we test the integrity check when closing the byte channel";
100+
withEnvironmentVariable(S3NioSpiConfiguration.S3_INTEGRITY_CHECK_ALGORITHM_PROPERTY, "CRC64NVME").execute(() -> {
101+
try (var channel = Files.newByteChannel(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
102+
channel.write(ByteBuffer.wrap(text.getBytes()));
103+
}
104+
});
105+
106+
assertThat(path).hasContent(text);
107+
}
108+
76109
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package software.amazon.nio.spi.s3;
7+
8+
import java.io.IOException;
9+
import java.io.UncheckedIOException;
10+
import java.nio.ByteBuffer;
11+
import java.nio.file.Files;
12+
import java.nio.file.Path;
13+
import software.amazon.awssdk.crt.checksums.CRC32C;
14+
import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm;
15+
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
16+
import software.amazon.awssdk.utils.internal.Base16;
17+
18+
class Crc32cFileIntegrityCheck implements S3ObjectIntegrityCheck {
19+
private final byte[] buffer = new byte[16 * 1024];
20+
private final CRC32C checksum = new CRC32C();
21+
private final ByteBuffer checksumBuffer = ByteBuffer.allocate(Integer.BYTES);
22+
23+
@Override
24+
public void addChecksumToRequest(Path file, PutObjectRequest.Builder builder) {
25+
checksum.reset();
26+
checksumBuffer.clear();
27+
try (var in = Files.newInputStream(file)) {
28+
int len;
29+
while ((len = in.read(buffer)) != -1) {
30+
checksum.update(buffer, 0, len);
31+
}
32+
checksumBuffer.putInt((int) checksum.getValue());
33+
builder.checksumAlgorithm(ChecksumAlgorithm.CRC32_C);
34+
builder.checksumCRC32C(Base16.encodeAsString(checksumBuffer.array()));
35+
} catch (IOException cause) {
36+
throw new UncheckedIOException(cause);
37+
}
38+
}
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package software.amazon.nio.spi.s3;
7+
8+
import java.io.IOException;
9+
import java.io.UncheckedIOException;
10+
import java.nio.ByteBuffer;
11+
import java.nio.file.Files;
12+
import java.nio.file.Path;
13+
import software.amazon.awssdk.crt.checksums.CRC64NVME;
14+
import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm;
15+
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
16+
import software.amazon.awssdk.utils.internal.Base16;
17+
18+
class Crc64nvmeFileIntegrityCheck implements S3ObjectIntegrityCheck {
19+
private final byte[] buffer = new byte[16 * 1024];
20+
private final CRC64NVME checksum = new CRC64NVME();
21+
private final ByteBuffer checksumBuffer = ByteBuffer.allocate(Long.BYTES);
22+
23+
@Override
24+
public void addChecksumToRequest(Path file, PutObjectRequest.Builder builder) {
25+
checksum.reset();
26+
checksumBuffer.clear();
27+
try (var in = Files.newInputStream(file)) {
28+
int len;
29+
while ((len = in.read(buffer)) != -1) {
30+
checksum.update(buffer, 0, len);
31+
}
32+
checksumBuffer.putLong(checksum.getValue());
33+
builder.checksumAlgorithm(ChecksumAlgorithm.CRC64_NVME);
34+
builder.checksumCRC64NVME(Base16.encodeAsString(checksumBuffer.array()));
35+
} catch (IOException cause) {
36+
throw new UncheckedIOException(cause);
37+
}
38+
}
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package software.amazon.nio.spi.s3;
7+
8+
import java.nio.file.Path;
9+
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
10+
11+
enum DisabledFileIntegrityCheck implements S3ObjectIntegrityCheck {
12+
INSTANCE;
13+
14+
@Override
15+
public void addChecksumToRequest(Path file, PutObjectRequest.Builder builder) {
16+
// nothing to do
17+
}
18+
}

src/main/java/software/amazon/nio/spi/s3/S3FileSystem.java

+16
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,22 @@ public void close() throws IOException {
120120
provider.closeFileSystem(this);
121121
}
122122

123+
/**
124+
* Returns the implementation for creating a checksum to check the integrity of an object uploaded to S3.
125+
*
126+
* @return integrity check implementation
127+
*/
128+
S3ObjectIntegrityCheck integrityCheck() {
129+
var algorithm = configuration.getIntegrityCheckAlgorithm();
130+
if (algorithm.equalsIgnoreCase("CRC32C")) {
131+
return new Crc32cFileIntegrityCheck();
132+
}
133+
if (algorithm.equalsIgnoreCase("CRC64NVME")) {
134+
return new Crc64nvmeFileIntegrityCheck();
135+
}
136+
return DisabledFileIntegrityCheck.INSTANCE;
137+
}
138+
123139
/**
124140
* Tells whether this file system is open.
125141
*

src/main/java/software/amazon/nio/spi/s3/S3FileSystemProvider.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,7 @@ public SeekableByteChannel newByteChannel(
328328

329329
final var s3Path = checkPath(path);
330330
final var fs = s3Path.getFileSystem();
331-
final var channel = new S3SeekableByteChannel(s3Path, fs.client(), options);
331+
final var channel = new S3SeekableByteChannel(s3Path, fs.client(), options, fs.integrityCheck());
332332

333333
fs.registerOpenChannel(channel);
334334

@@ -837,7 +837,7 @@ public FileChannel newFileChannel(Path path, Set<? extends OpenOption> options,
837837

838838
S3Path p = (S3Path) path;
839839
S3FileSystem fs = p.getFileSystem();
840-
S3SeekableByteChannel s3SeekableByteChannel = new S3SeekableByteChannel(p, fs.client(), options);
840+
S3SeekableByteChannel s3SeekableByteChannel = new S3SeekableByteChannel(p, fs.client(), options, fs.integrityCheck());
841841
return new S3FileChannel(s3SeekableByteChannel);
842842
}
843843

@@ -864,7 +864,7 @@ public AsynchronousFileChannel newAsynchronousFileChannel(Path path,
864864
FileAttribute<?>... attrs) throws IOException {
865865
S3FileSystem fs = (S3FileSystem) getFileSystem(path.toUri());
866866
S3AsyncClient s3Client = fs.client();
867-
var byteChannel = new S3SeekableByteChannel((S3Path) path, s3Client, options);
867+
var byteChannel = new S3SeekableByteChannel((S3Path) path, s3Client, options, fs.integrityCheck());
868868
return new AsyncS3FileChannel(byteChannel);
869869
}
870870

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package software.amazon.nio.spi.s3;
7+
8+
import java.nio.file.Path;
9+
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
10+
11+
/**
12+
* Defines how to create a checksum to check the integrity of an object uploaded to S3.
13+
*/
14+
public interface S3ObjectIntegrityCheck {
15+
16+
/**
17+
* Calculates the checksum for the specified file and adds it as a header to the PUT object request to be created.
18+
*
19+
* @param file
20+
* the file to be used for creating the checksum
21+
* @param builder
22+
* put object request
23+
*/
24+
void addChecksumToRequest(Path file, PutObjectRequest.Builder builder);
25+
26+
}

src/main/java/software/amazon/nio/spi/s3/S3SeekableByteChannel.java

+17-5
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,24 @@ class S3SeekableByteChannel implements SeekableByteChannel {
3636
private boolean closed;
3737
private long size = -1L;
3838

39-
S3SeekableByteChannel(S3Path s3Path, S3AsyncClient s3Client, Set<? extends OpenOption> options) throws IOException {
40-
this(s3Path, s3Client, 0L, options, null, null);
39+
S3SeekableByteChannel(
40+
S3Path s3Path,
41+
S3AsyncClient s3Client,
42+
Set<? extends OpenOption> options,
43+
S3ObjectIntegrityCheck integrityCheck)
44+
throws IOException {
45+
this(s3Path, s3Client, 0L, options, null, null, integrityCheck);
4146
}
4247

43-
private S3SeekableByteChannel(S3Path s3Path, S3AsyncClient s3Client, long startAt, Set<? extends OpenOption> options,
44-
Long timeout, TimeUnit timeUnit) throws IOException {
48+
private S3SeekableByteChannel(
49+
S3Path s3Path,
50+
S3AsyncClient s3Client,
51+
long startAt,
52+
Set<? extends OpenOption> options,
53+
Long timeout,
54+
TimeUnit timeUnit,
55+
S3ObjectIntegrityCheck integrityCheck)
56+
throws IOException {
4557
position = startAt;
4658
path = s3Path;
4759
closed = false;
@@ -56,7 +68,7 @@ private S3SeekableByteChannel(S3Path s3Path, S3AsyncClient s3Client, long startA
5668
if (options.contains(StandardOpenOption.WRITE)) {
5769
LOGGER.debug("using S3WritableByteChannel as write delegate for path '{}'", s3Path.toUri());
5870
readDelegate = null;
59-
var transferUtil = new S3TransferUtil(s3Client, timeout, timeUnit);
71+
var transferUtil = new S3TransferUtil(s3Client, timeout, timeUnit, integrityCheck);
6072
writeDelegate = new S3WritableByteChannel(s3Path, s3Client, transferUtil, options);
6173
position = 0L;
6274
} else if (options.contains(StandardOpenOption.READ) || options.isEmpty()) {

src/main/java/software/amazon/nio/spi/s3/S3TransferUtil.java

+9-6
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,16 @@
1919
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;
2020

2121
final class S3TransferUtil {
22+
private final S3ObjectIntegrityCheck integrityCheck;
2223
private final S3AsyncClient client;
2324
private final Long timeout;
2425
private final TimeUnit timeUnit;
2526

26-
S3TransferUtil(S3AsyncClient client, Long timeout, TimeUnit timeUnit) {
27+
S3TransferUtil(S3AsyncClient client, Long timeout, TimeUnit timeUnit, S3ObjectIntegrityCheck integrityCheck) {
2728
this.client = client;
2829
this.timeout = timeout;
2930
this.timeUnit = timeUnit;
31+
this.integrityCheck = integrityCheck;
3032
}
3133

3234
void downloadToLocalFile(S3Path path, Path destination) throws InterruptedException, ExecutionException, TimeoutException {
@@ -51,13 +53,14 @@ void downloadToLocalFile(S3Path path, Path destination) throws InterruptedExcept
5153

5254
void uploadLocalFile(S3Path path, Path localFile) throws IOException {
5355
try (var s3TransferManager = S3TransferManager.builder().s3Client(client).build()) {
56+
var putObjectRequest = PutObjectRequest.builder()
57+
.bucket(path.bucketName())
58+
.key(path.getKey())
59+
.contentType(Files.probeContentType(localFile));
60+
integrityCheck.addChecksumToRequest(localFile, putObjectRequest);
5461
var uploadCompletableFuture = s3TransferManager.uploadFile(
5562
UploadFileRequest.builder()
56-
.putObjectRequest(PutObjectRequest.builder()
57-
.bucket(path.bucketName())
58-
.key(path.getKey())
59-
.contentType(Files.probeContentType(localFile))
60-
.build())
63+
.putObjectRequest(putObjectRequest.build())
6164
.source(localFile)
6265
.build()
6366
).completionFuture();

src/main/java/software/amazon/nio/spi/s3/config/S3NioSpiConfiguration.java

+35-1
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,17 @@ public class S3NioSpiConfiguration extends HashMap<String, Object> {
9595
*/
9696
public static final Long S3_SPI_TIMEOUT_HIGH_DEFAULT = TimeOutUtils.TIMEOUT_TIME_LENGTH_5;
9797
/**
98-
* The default value of the credentials property
98+
* The name of the credentials property
9999
*/
100100
public static final String S3_SPI_CREDENTIALS_PROPERTY = "s3.spi.credentials";
101+
/**
102+
* The name of the S3 object integrity check property
103+
*/
104+
public static final String S3_INTEGRITY_CHECK_ALGORITHM_PROPERTY = "s3.integrity-check-algorithm";
105+
/**
106+
* The default value of the S3 object integrity check property
107+
*/
108+
public static final String S3_INTEGRITY_CHECK_ALGORITHM_DEFAULT = "disabled";
101109

102110
private static final Pattern ENDPOINT_REGEXP = Pattern.compile("(\\w[\\w\\-\\.]*)?(:(\\d+))?");
103111

@@ -131,6 +139,7 @@ public S3NioSpiConfiguration(Map<String, ?> overrides) {
131139
put(S3_SPI_TIMEOUT_LOW_PROPERTY, String.valueOf(S3_SPI_TIMEOUT_LOW_DEFAULT));
132140
put(S3_SPI_TIMEOUT_MEDIUM_PROPERTY, String.valueOf(S3_SPI_TIMEOUT_MEDIUM_DEFAULT));
133141
put(S3_SPI_TIMEOUT_HIGH_PROPERTY, String.valueOf(S3_SPI_TIMEOUT_HIGH_DEFAULT));
142+
put(S3_INTEGRITY_CHECK_ALGORITHM_PROPERTY, S3_INTEGRITY_CHECK_ALGORITHM_DEFAULT);
134143

135144
//
136145
// With the below we pick existing environment variables and system
@@ -382,6 +391,22 @@ public S3NioSpiConfiguration withTimeoutHigh(Long timeoutHigh) {
382391
return this;
383392
}
384393

394+
/**
395+
* Get the value of the Integrity Check Algorithm
396+
*
397+
* @param algorithm the new value; can be null
398+
* @return this instance
399+
*/
400+
public S3NioSpiConfiguration withIntegrityCheckAlgorithm(String algorithm) {
401+
if (algorithm == null) {
402+
put(S3_INTEGRITY_CHECK_ALGORITHM_PROPERTY, S3_INTEGRITY_CHECK_ALGORITHM_DEFAULT);
403+
} else {
404+
put(S3_INTEGRITY_CHECK_ALGORITHM_PROPERTY, algorithm);
405+
}
406+
407+
return this;
408+
}
409+
385410
/**
386411
* Get the value of the Maximum Fragment Size
387412
*
@@ -511,6 +536,15 @@ public Long getTimeoutHigh() {
511536
String.valueOf(S3_SPI_TIMEOUT_HIGH_DEFAULT)));
512537
}
513538

539+
/**
540+
* Get the value of the Integrity Check Algorithm
541+
*
542+
* @return the configured value or the default if not overridden
543+
*/
544+
public String getIntegrityCheckAlgorithm() {
545+
return (String) getOrDefault(S3_INTEGRITY_CHECK_ALGORITHM_PROPERTY, S3_INTEGRITY_CHECK_ALGORITHM_DEFAULT);
546+
}
547+
514548
/**
515549
* Generates an environment variable name from a property name. E.g 'some.property' becomes 'SOME_PROPERTY'
516550
*

0 commit comments

Comments
 (0)