Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add full object checksum when uploading a locally cached file to S3 #636

Merged
merged 1 commit into from
Mar 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package software.amazon.nio.spi.s3;

import static com.github.stefanbirkner.systemlambda.SystemLambda.*;
import static org.assertj.core.api.Assertions.*;
import static software.amazon.nio.spi.s3.Containers.*;

Expand All @@ -20,6 +21,8 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

import software.amazon.nio.spi.s3.config.S3NioSpiConfiguration;

@DisplayName("Files$newByteChannel* should read and write on S3")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class FilesNewByteChannelTest {
Expand Down Expand Up @@ -73,4 +76,34 @@ public void newByteChannel_READ_WRITE() throws IOException {
assertThat(path).hasContent(text);
}

@Test
@DisplayName("newByteChannel with CRC32C integrity check")
public void newByteChannel_withIntegrityCheck_CRC32C() throws Exception {
var path = Paths.get(URI.create(localStackConnectionEndpoint() + "/" + bucketName + "/bc-integrity-check.txt"));

String text = "we test the integrity check when closing the byte channel";
withEnvironmentVariable(S3NioSpiConfiguration.S3_INTEGRITY_CHECK_ALGORITHM_PROPERTY, "CRC32C").execute(() -> {
try (var channel = Files.newByteChannel(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
channel.write(ByteBuffer.wrap(text.getBytes()));
}
});

assertThat(path).hasContent(text);
}

@Test
@DisplayName("newByteChannel with CRC64NVME integrity check")
public void newByteChannel_withIntegrityCheck_CRC64NVME() throws Exception {
var path = Paths.get(URI.create(localStackConnectionEndpoint() + "/" + bucketName + "/bc-integrity-check.txt"));

String text = "we test the integrity check when closing the byte channel";
withEnvironmentVariable(S3NioSpiConfiguration.S3_INTEGRITY_CHECK_ALGORITHM_PROPERTY, "CRC64NVME").execute(() -> {
try (var channel = Files.newByteChannel(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
channel.write(ByteBuffer.wrap(text.getBytes()));
}
});

assertThat(path).hasContent(text);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

package software.amazon.nio.spi.s3;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import software.amazon.awssdk.crt.checksums.CRC32C;
import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.utils.internal.Base16;

class Crc32cFileIntegrityCheck implements S3ObjectIntegrityCheck {
private final byte[] buffer = new byte[16 * 1024];
private final CRC32C checksum = new CRC32C();
private final ByteBuffer checksumBuffer = ByteBuffer.allocate(Integer.BYTES);

@Override
public void addChecksumToRequest(Path file, PutObjectRequest.Builder builder) {
checksum.reset();
checksumBuffer.clear();
try (var in = Files.newInputStream(file)) {
int len;
while ((len = in.read(buffer)) != -1) {
checksum.update(buffer, 0, len);
}
checksumBuffer.putInt((int) checksum.getValue());
builder.checksumAlgorithm(ChecksumAlgorithm.CRC32_C);
builder.checksumCRC32C(Base16.encodeAsString(checksumBuffer.array()));
} catch (IOException cause) {
throw new UncheckedIOException(cause);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

package software.amazon.nio.spi.s3;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import software.amazon.awssdk.crt.checksums.CRC64NVME;
import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.utils.internal.Base16;

class Crc64nvmeFileIntegrityCheck implements S3ObjectIntegrityCheck {
private final byte[] buffer = new byte[16 * 1024];
private final CRC64NVME checksum = new CRC64NVME();
private final ByteBuffer checksumBuffer = ByteBuffer.allocate(Long.BYTES);

@Override
public void addChecksumToRequest(Path file, PutObjectRequest.Builder builder) {
checksum.reset();
checksumBuffer.clear();
try (var in = Files.newInputStream(file)) {
int len;
while ((len = in.read(buffer)) != -1) {
checksum.update(buffer, 0, len);
}
checksumBuffer.putLong(checksum.getValue());
builder.checksumAlgorithm(ChecksumAlgorithm.CRC64_NVME);
builder.checksumCRC64NVME(Base16.encodeAsString(checksumBuffer.array()));
} catch (IOException cause) {
throw new UncheckedIOException(cause);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

package software.amazon.nio.spi.s3;

import java.nio.file.Path;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

enum DisabledFileIntegrityCheck implements S3ObjectIntegrityCheck {
INSTANCE;

@Override
public void addChecksumToRequest(Path file, PutObjectRequest.Builder builder) {
// nothing to do
}
}
16 changes: 16 additions & 0 deletions src/main/java/software/amazon/nio/spi/s3/S3FileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,22 @@ public void close() throws IOException {
provider.closeFileSystem(this);
}

/**
* Returns the implementation for creating a checksum to check the integrity of an object uploaded to S3.
*
* @return integrity check implementation
*/
S3ObjectIntegrityCheck integrityCheck() {
var algorithm = configuration.getIntegrityCheckAlgorithm();
if (algorithm.equalsIgnoreCase("CRC32C")) {
return new Crc32cFileIntegrityCheck();
}
if (algorithm.equalsIgnoreCase("CRC64NVME")) {
return new Crc64nvmeFileIntegrityCheck();
}
return DisabledFileIntegrityCheck.INSTANCE;
}

/**
* Tells whether this file system is open.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ public SeekableByteChannel newByteChannel(

final var s3Path = checkPath(path);
final var fs = s3Path.getFileSystem();
final var channel = new S3SeekableByteChannel(s3Path, fs.client(), options);
final var channel = new S3SeekableByteChannel(s3Path, fs.client(), options, fs.integrityCheck());

fs.registerOpenChannel(channel);

Expand Down Expand Up @@ -837,7 +837,7 @@ public FileChannel newFileChannel(Path path, Set<? extends OpenOption> options,

S3Path p = (S3Path) path;
S3FileSystem fs = p.getFileSystem();
S3SeekableByteChannel s3SeekableByteChannel = new S3SeekableByteChannel(p, fs.client(), options);
S3SeekableByteChannel s3SeekableByteChannel = new S3SeekableByteChannel(p, fs.client(), options, fs.integrityCheck());
return new S3FileChannel(s3SeekableByteChannel);
}

Expand All @@ -864,7 +864,7 @@ public AsynchronousFileChannel newAsynchronousFileChannel(Path path,
FileAttribute<?>... attrs) throws IOException {
S3FileSystem fs = (S3FileSystem) getFileSystem(path.toUri());
S3AsyncClient s3Client = fs.client();
var byteChannel = new S3SeekableByteChannel((S3Path) path, s3Client, options);
var byteChannel = new S3SeekableByteChannel((S3Path) path, s3Client, options, fs.integrityCheck());
return new AsyncS3FileChannel(byteChannel);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

package software.amazon.nio.spi.s3;

import java.nio.file.Path;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

/**
* Defines how to create a checksum to check the integrity of an object uploaded to S3.
*/
public interface S3ObjectIntegrityCheck {

/**
* Calculates the checksum for the specified file and adds it as a header to the PUT object request to be created.
*
* @param file
* the file to be used for creating the checksum
* @param builder
* put object request
*/
void addChecksumToRequest(Path file, PutObjectRequest.Builder builder);

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,24 @@ class S3SeekableByteChannel implements SeekableByteChannel {
private boolean closed;
private long size = -1L;

S3SeekableByteChannel(S3Path s3Path, S3AsyncClient s3Client, Set<? extends OpenOption> options) throws IOException {
this(s3Path, s3Client, 0L, options, null, null);
S3SeekableByteChannel(
S3Path s3Path,
S3AsyncClient s3Client,
Set<? extends OpenOption> options,
S3ObjectIntegrityCheck integrityCheck)
throws IOException {
this(s3Path, s3Client, 0L, options, null, null, integrityCheck);
}

private S3SeekableByteChannel(S3Path s3Path, S3AsyncClient s3Client, long startAt, Set<? extends OpenOption> options,
Long timeout, TimeUnit timeUnit) throws IOException {
private S3SeekableByteChannel(
S3Path s3Path,
S3AsyncClient s3Client,
long startAt,
Set<? extends OpenOption> options,
Long timeout,
TimeUnit timeUnit,
S3ObjectIntegrityCheck integrityCheck)
throws IOException {
position = startAt;
path = s3Path;
closed = false;
Expand All @@ -56,7 +68,7 @@ private S3SeekableByteChannel(S3Path s3Path, S3AsyncClient s3Client, long startA
if (options.contains(StandardOpenOption.WRITE)) {
LOGGER.debug("using S3WritableByteChannel as write delegate for path '{}'", s3Path.toUri());
readDelegate = null;
var transferUtil = new S3TransferUtil(s3Client, timeout, timeUnit);
var transferUtil = new S3TransferUtil(s3Client, timeout, timeUnit, integrityCheck);
writeDelegate = new S3WritableByteChannel(s3Path, s3Client, transferUtil, options);
position = 0L;
} else if (options.contains(StandardOpenOption.READ) || options.isEmpty()) {
Expand Down
15 changes: 9 additions & 6 deletions src/main/java/software/amazon/nio/spi/s3/S3TransferUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;

final class S3TransferUtil {
private final S3ObjectIntegrityCheck integrityCheck;
private final S3AsyncClient client;
private final Long timeout;
private final TimeUnit timeUnit;

S3TransferUtil(S3AsyncClient client, Long timeout, TimeUnit timeUnit) {
S3TransferUtil(S3AsyncClient client, Long timeout, TimeUnit timeUnit, S3ObjectIntegrityCheck integrityCheck) {
this.client = client;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.integrityCheck = integrityCheck;
}

void downloadToLocalFile(S3Path path, Path destination) throws InterruptedException, ExecutionException, TimeoutException {
Expand All @@ -51,13 +53,14 @@ void downloadToLocalFile(S3Path path, Path destination) throws InterruptedExcept

void uploadLocalFile(S3Path path, Path localFile) throws IOException {
try (var s3TransferManager = S3TransferManager.builder().s3Client(client).build()) {
var putObjectRequest = PutObjectRequest.builder()
.bucket(path.bucketName())
.key(path.getKey())
.contentType(Files.probeContentType(localFile));
integrityCheck.addChecksumToRequest(localFile, putObjectRequest);
var uploadCompletableFuture = s3TransferManager.uploadFile(
UploadFileRequest.builder()
.putObjectRequest(PutObjectRequest.builder()
.bucket(path.bucketName())
.key(path.getKey())
.contentType(Files.probeContentType(localFile))
.build())
.putObjectRequest(putObjectRequest.build())
.source(localFile)
.build()
).completionFuture();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,17 @@ public class S3NioSpiConfiguration extends HashMap<String, Object> {
*/
public static final Long S3_SPI_TIMEOUT_HIGH_DEFAULT = TimeOutUtils.TIMEOUT_TIME_LENGTH_5;
/**
* The default value of the credentials property
* The name of the credentials property
*/
public static final String S3_SPI_CREDENTIALS_PROPERTY = "s3.spi.credentials";
/**
* The name of the S3 object integrity check property
*/
public static final String S3_INTEGRITY_CHECK_ALGORITHM_PROPERTY = "s3.integrity-check-algorithm";
/**
* The default value of the S3 object integrity check property
*/
public static final String S3_INTEGRITY_CHECK_ALGORITHM_DEFAULT = "disabled";

private static final Pattern ENDPOINT_REGEXP = Pattern.compile("(\\w[\\w\\-\\.]*)?(:(\\d+))?");

Expand Down Expand Up @@ -131,6 +139,7 @@ public S3NioSpiConfiguration(Map<String, ?> overrides) {
put(S3_SPI_TIMEOUT_LOW_PROPERTY, String.valueOf(S3_SPI_TIMEOUT_LOW_DEFAULT));
put(S3_SPI_TIMEOUT_MEDIUM_PROPERTY, String.valueOf(S3_SPI_TIMEOUT_MEDIUM_DEFAULT));
put(S3_SPI_TIMEOUT_HIGH_PROPERTY, String.valueOf(S3_SPI_TIMEOUT_HIGH_DEFAULT));
put(S3_INTEGRITY_CHECK_ALGORITHM_PROPERTY, S3_INTEGRITY_CHECK_ALGORITHM_DEFAULT);

//
// With the below we pick existing environment variables and system
Expand Down Expand Up @@ -382,6 +391,22 @@ public S3NioSpiConfiguration withTimeoutHigh(Long timeoutHigh) {
return this;
}

/**
* Get the value of the Integrity Check Algorithm
*
* @param algorithm the new value; can be null
* @return this instance
*/
public S3NioSpiConfiguration withIntegrityCheckAlgorithm(String algorithm) {
if (algorithm == null) {
put(S3_INTEGRITY_CHECK_ALGORITHM_PROPERTY, S3_INTEGRITY_CHECK_ALGORITHM_DEFAULT);
} else {
put(S3_INTEGRITY_CHECK_ALGORITHM_PROPERTY, algorithm);
}

return this;
}

/**
* Get the value of the Maximum Fragment Size
*
Expand Down Expand Up @@ -511,6 +536,15 @@ public Long getTimeoutHigh() {
String.valueOf(S3_SPI_TIMEOUT_HIGH_DEFAULT)));
}

/**
* Get the value of the Integrity Check Algorithm
*
* @return the configured value or the default if not overridden
*/
public String getIntegrityCheckAlgorithm() {
return (String) getOrDefault(S3_INTEGRITY_CHECK_ALGORITHM_PROPERTY, S3_INTEGRITY_CHECK_ALGORITHM_DEFAULT);
}

/**
* Generates an environment variable name from a property name. E.g 'some.property' becomes 'SOME_PROPERTY'
*
Expand Down
Loading