Commit 420c2fa
feat: Add support for setting object metadata Content-Encoding

Users who want to leverage the GCS capability to decompress gzip objects server-side when accessing them through the Storage API requested that the fixed `Content-Encoding` object metadata (default: null) become configurable, so that its value can be set (e.g. to `gzip`) when the connector uploads a new file to the bucket. See https://cloud.google.com/storage/docs/metadata#content-encoding
1 parent 01cf0b9 commit 420c2fa

5 files changed: +129 additions, -3 deletions
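For context, a minimal sink-connector configuration pairing the new setting with gzip file compression might look like the sketch below. The connector class name, topics, and bucket are illustrative placeholders, not part of this commit:

    name=gcs-sink
    connector.class=io.aiven.kafka.connect.gcs.GcsSinkConnector
    topics=topic0,topic1
    gcs.bucket.name=my-bucket
    gcs.credentials.default=true
    # Compress files with gzip and record that in the object metadata so the
    # Storage API can decompress them server-side on download.
    file.compression.type=gzip
    gcs.object.content.encoding=gzip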

README.md (4 additions, 0 deletions)

@@ -589,6 +589,10 @@ gcs.credentials.json={"type":"...", ...}
 gcs.credentials.default=true
 ##

+# The value of object metadata Content-Encoding.
+# This can be used for leveraging storage-side de-compression before download.
+# Optional, the default is null.
+gcs.object.content.encoding=gzip

 # The set of the fields that are to be output, comma separated.
 # Supported values are: `key`, `value`, `offset`, `timestamp`, and `headers`.

src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java (12 additions, 1 deletion)

@@ -57,11 +57,13 @@ public final class GcsSinkConfig extends AivenCommonConfig {
     public static final String GCS_CREDENTIALS_JSON_CONFIG = "gcs.credentials.json";
     public static final String GCS_CREDENTIALS_DEFAULT_CONFIG = "gcs.credentials.default";
     public static final String GCS_BUCKET_NAME_CONFIG = "gcs.bucket.name";
+    public static final String GCS_OBJECT_CONTENT_ENCODING_CONFIG = "gcs.object.content.encoding";
     public static final String GCS_USER_AGENT = "gcs.user.agent";
     private static final String GROUP_FILE = "File";
     public static final String FILE_NAME_PREFIX_CONFIG = "file.name.prefix";
     public static final String FILE_NAME_TEMPLATE_CONFIG = "file.name.template";
     public static final String FILE_COMPRESSION_TYPE_CONFIG = "file.compression.type";
+
     public static final String FILE_MAX_RECORDS = "file.max.records";
     public static final String FILE_NAME_TIMESTAMP_TIMEZONE = "file.name.timestamp.timezone";
     public static final String FILE_NAME_TIMESTAMP_SOURCE = "file.name.timestamp.source";

@@ -135,6 +137,11 @@ private static void addGcsConfigGroup(final ConfigDef configDef) {
                         + GCS_CREDENTIALS_PATH_CONFIG + "\"",
                 GROUP_GCS, gcsGroupCounter++, ConfigDef.Width.NONE, GCS_CREDENTIALS_DEFAULT_CONFIG);

+        configDef.define(GCS_OBJECT_CONTENT_ENCODING_CONFIG, ConfigDef.Type.STRING, null,
+                new ConfigDef.NonEmptyString(), ConfigDef.Importance.LOW,
+                "The GCS object metadata value of Content-Encoding.", GROUP_GCS, gcsGroupCounter++,
+                ConfigDef.Width.NONE, GCS_OBJECT_CONTENT_ENCODING_CONFIG);
+
         configDef.define(GCS_BUCKET_NAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE,
                 new ConfigDef.NonEmptyString(), ConfigDef.Importance.HIGH,
                 "The GCS bucket name to store output files in.", GROUP_GCS, gcsGroupCounter++, ConfigDef.Width.NONE, // NOPMD

@@ -332,7 +339,7 @@ private void validate() {
                 .filter(Objects::nonNull)
                 .count();

-        // only validate non nulls here, since all nulls means falling back to the default "no credential" behavour.
+        // only validate non nulls here, since all nulls means falling back to the default "no credential" behaviour.
         if (nonNulls > MAX_ALLOWED_CREDENTIAL_CONFIGS) {
             throw new ConfigException(String.format("Only one of %s, %s, and %s can be non-null.",
                     GCS_CREDENTIALS_DEFAULT_CONFIG, GCS_CREDENTIALS_JSON_CONFIG, GCS_CREDENTIALS_PATH_CONFIG));

@@ -371,6 +378,10 @@ public String getBucketName() {
         return getString(GCS_BUCKET_NAME_CONFIG);
     }

+    public String getObjectContentEncoding() {
+        return getString(GCS_OBJECT_CONTENT_ENCODING_CONFIG);
+    }
+
     @Override
     public CompressionType getCompressionType() {
         return CompressionType.forName(getString(FILE_COMPRESSION_TYPE_CONFIG));
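For readers unfamiliar with Kafka's ConfigDef, the following standalone sketch (a hypothetical class, not the connector's actual code; it assumes kafka-clients on the classpath) shows the define/get pattern the commit uses: a STRING setting defaulting to null, where ConfigDef.NonEmptyString lets null pass but rejects an explicit empty string.

    import java.util.Map;

    import org.apache.kafka.common.config.AbstractConfig;
    import org.apache.kafka.common.config.ConfigDef;

    public class ContentEncodingConfigSketch extends AbstractConfig {
        public static final String GCS_OBJECT_CONTENT_ENCODING_CONFIG = "gcs.object.content.encoding";

        public static ConfigDef configDef() {
            return new ConfigDef().define(GCS_OBJECT_CONTENT_ENCODING_CONFIG, ConfigDef.Type.STRING,
                    null, // default: no Content-Encoding metadata is set on uploaded objects
                    new ConfigDef.NonEmptyString(), // null passes; an explicit empty string fails validation
                    ConfigDef.Importance.LOW, "The GCS object metadata value of Content-Encoding.");
        }

        public ContentEncodingConfigSketch(final Map<String, String> props) {
            super(configDef(), props);
        }

        public String getObjectContentEncoding() {
            // Returns null when the setting is absent, which the BlobInfo builder accepts as "unset".
            return getString(GCS_OBJECT_CONTENT_ENCODING_CONFIG);
        }

        public static void main(final String[] args) {
            final var config = new ContentEncodingConfigSketch(Map.of(GCS_OBJECT_CONTENT_ENCODING_CONFIG, "gzip"));
            System.out.println(config.getObjectContentEncoding()); // prints "gzip"
        }
    }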

src/main/java/io/aiven/kafka/connect/gcs/GcsSinkTask.java (3 additions, 1 deletion)

@@ -118,7 +118,9 @@ public void flush(final Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
     }

     private void flushFile(final String filename, final List<SinkRecord> records) {
-        final BlobInfo blob = BlobInfo.newBuilder(config.getBucketName(), config.getPrefix() + filename).build();
+        final BlobInfo blob = BlobInfo.newBuilder(config.getBucketName(), config.getPrefix() + filename)
+                .setContentEncoding(config.getObjectContentEncoding())
+                .build();
         try (var out = Channels.newOutputStream(storage.writer(blob));
                 var writer = OutputWriter.builder()
                         .withExternalProperties(config.originalsStrings())
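End to end, the new builder call means "store these bytes as-is, but tell GCS they are gzip-encoded". Below is a minimal sketch of that write path, assuming the google-cloud-storage client and application-default credentials; the bucket and object names are placeholders:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.nio.channels.Channels;
    import java.nio.charset.StandardCharsets;
    import java.util.zip.GZIPOutputStream;

    import com.google.cloud.storage.BlobInfo;
    import com.google.cloud.storage.Storage;
    import com.google.cloud.storage.StorageOptions;

    public class GzipUploadSketch {
        public static void main(final String[] args) throws IOException {
            final Storage storage = StorageOptions.getDefaultInstance().getService();

            // Record "gzip" in the object metadata, mirroring
            // setContentEncoding(config.getObjectContentEncoding()) in the diff above.
            final BlobInfo blob = BlobInfo.newBuilder("my-bucket", "prefix/topic0-0-10.gz")
                    .setContentEncoding("gzip")
                    .build();

            // Gzip the payload client-side, as the connector does with file.compression.type=gzip.
            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
            try (GZIPOutputStream gzip = new GZIPOutputStream(baos)) {
                gzip.write("value0\nvalue5\n".getBytes(StandardCharsets.UTF_8));
            }

            // Same write path as flushFile: a WriteChannel wrapped as an OutputStream.
            try (var out = Channels.newOutputStream(storage.writer(blob))) {
                out.write(baos.toByteArray());
            }
        }
    }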

src/test/java/io/aiven/kafka/connect/gcs/GcsSinkTaskTest.java (45 additions, 0 deletions)

@@ -63,6 +63,7 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.ArgumentCaptor;
 import org.threeten.bp.Duration;

@@ -252,6 +253,45 @@ void compression(final String compression) {
                 readSplittedAndDecodedLinesFromBlob("topic1-1-40" + compressionType.extension(), compression, 0));
     }

+    @ParameterizedTest
+    @CsvSource({ "none,none", "gzip,none", "gzip,gzip" })
+    void contentEncodingAwareDownload(final String compression, final String encoding) {
+        properties.put(GcsSinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression);
+        properties.put(GcsSinkConfig.GCS_OBJECT_CONTENT_ENCODING_CONFIG, encoding);
+        final GcsSinkTask task = new GcsSinkTask(properties, storage);
+
+        task.put(basicRecords);
+        task.flush(null);
+
+        final CompressionType compressionType = CompressionType.forName(compression);
+
+        final List<String> names = Lists.newArrayList("topic0-0-10", "topic0-1-20", "topic0-2-50", "topic1-0-30",
+                "topic1-1-40");
+        final List<String> blobNames = names.stream()
+                .map(n -> n + compressionType.extension())
+                .collect(Collectors.toList());
+
+        assertIterableEquals(blobNames, testBucketAccessor.getBlobNames());
+        // given a blob whose Content-Encoding metadata matches its byte compression,
+        // its downloaded bytes are automatically un-compressed by GCS (gzip support only);
+        // see https://cloud.google.com/storage/docs/metadata#content-encoding
+        assertIterableEquals(
+                Lists.newArrayList(Collections.singletonList("value0"), Collections.singletonList("value5")),
+                readDecodedFieldsFromDownload("topic0-0-10" + compressionType.extension(), compression, 0));
+        assertIterableEquals(
+                Lists.newArrayList(Collections.singletonList("value1"), Collections.singletonList("value6")),
+                readDecodedFieldsFromDownload("topic0-1-20" + compressionType.extension(), compression, 0));
+        assertIterableEquals(
+                Lists.newArrayList(Collections.singletonList("value4"), Collections.singletonList("value9")),
+                readDecodedFieldsFromDownload("topic0-2-50" + compressionType.extension(), compression, 0));
+        assertIterableEquals(
+                Lists.newArrayList(Collections.singletonList("value2"), Collections.singletonList("value7")),
+                readDecodedFieldsFromDownload("topic1-0-30" + compressionType.extension(), compression, 0));
+        assertIterableEquals(
+                Lists.newArrayList(Collections.singletonList("value3"), Collections.singletonList("value8")),
+                readDecodedFieldsFromDownload("topic1-1-40" + compressionType.extension(), compression, 0));
+    }
+
     @ParameterizedTest
     @ValueSource(strings = { "none", "gzip", "snappy", "zstd" })
     void allFields(final String compression) {

@@ -745,6 +785,11 @@ private Collection<List<String>> readSplittedAndDecodedLinesFromBlob(final String
         return testBucketAccessor.readAndDecodeLines(blobName, compression, fieldsToDecode);
     }

+    private Collection<List<String>> readDecodedFieldsFromDownload(final String blobName, final String compression,
+            final int... fieldsToDecode) {
+        return testBucketAccessor.downloadBlobAndDecodeFields(blobName, compression, fieldsToDecode);
+    }
+
     private Map<String, Collection<List<String>>> buildBlobNameValuesMap(final String compression) {
         final CompressionType compressionType = CompressionType.forName(compression);
         final String extension = compressionType.extension();

src/test/java/io/aiven/kafka/connect/gcs/testutils/BucketAccessor.java (65 additions, 1 deletion)

@@ -18,10 +18,14 @@

 import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.Base64;
 import java.util.HashMap;

@@ -41,6 +45,7 @@

 import com.github.luben.zstd.ZstdInputStream;
 import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
 import com.google.cloud.storage.BlobInfo;
 import com.google.cloud.storage.Storage;
 import org.xerial.snappy.SnappyInputStream;

@@ -53,6 +58,7 @@ public final class BucketAccessor {
     private List<String> blobNamesCache;
     private final Map<String, String> stringContentCache = new HashMap<>();
     private final Map<String, List<String>> linesCache = new HashMap<>();
+    private final Map<String, List<String>> downloadedLinesCache = new HashMap<>();
     private final Map<String, List<List<String>>> decodedLinesCache = new HashMap<>();

     public BucketAccessor(final Storage storage, final String bucketName, final boolean cache) {

@@ -121,6 +127,7 @@ public void clear(final String prefix) {
             stringContentCache.clear();
             linesCache.clear();
             decodedLinesCache.clear();
+            downloadedLinesCache.clear();
         }
     }

@@ -165,13 +172,49 @@ private List<String> readLines0(final String blobName, final String compression)
                 InputStream decompressedStream = getDecompressedStream(bais, compression);
                 InputStreamReader reader = new InputStreamReader(decompressedStream, StandardCharsets.UTF_8);
                 BufferedReader bufferedReader = new BufferedReader(reader)) {
-
             return bufferedReader.lines().collect(Collectors.toList());
         } catch (final IOException e) {
             throw new RuntimeException(e); // NOPMD
         }
     }

+    public List<String> downloadBlobAndReadLines(final String blobName, final String compression) {
+        Objects.requireNonNull(blobName, "blobName cannot be null");
+        Objects.requireNonNull(compression, "compression cannot be null");
+        if (cache) {
+            return downloadedLinesCache.computeIfAbsent(blobName,
+                    k -> downloadBlobAndReadLines0(blobName, compression));
+        } else {
+            return downloadBlobAndReadLines0(blobName, compression);
+        }
+    }
+
+    private List<String> downloadBlobAndReadLines0(final String blobName, final String compression) {
+        final String filePath = downloadBlobToTempFile(blobName);
+        try {
+            final byte[] bytes = Files.readAllBytes(Path.of(filePath));
+            try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+                    InputStream decompressedStream = getDecompressedStream(bais, compression);
+                    InputStreamReader reader = new InputStreamReader(decompressedStream, StandardCharsets.UTF_8);
+                    BufferedReader bufferedReader = new BufferedReader(reader)) {
+                return bufferedReader.lines().collect(Collectors.toList());
+            }
+        } catch (IOException exception) {
+            throw new RuntimeException(exception); // NOPMD
+        }
+    }
+
+    private String downloadBlobToTempFile(final String blobName) {
+        try {
+            final File file = File.createTempFile("tmp", null);
+            final String filePath = file.getAbsolutePath();
+            storage.downloadTo(BlobId.fromGsUtilUri("gs://" + bucketName + "/" + blobName), Paths.get(filePath));
+            return filePath;
+        } catch (final IOException e) {
+            throw new RuntimeException(e); // NOPMD
+        }
+    }
+
     private InputStream getDecompressedStream(final InputStream inputStream, final String compression)
             throws IOException {
         Objects.requireNonNull(inputStream, "inputStream cannot be null");

@@ -211,6 +254,27 @@ private List<List<String>> readAndDecodeLines0(final String blobName, final String
                 .collect(Collectors.toList());
     }

+    public List<List<String>> downloadBlobAndDecodeFields(final String blobName, final String compression,
+            final int... fieldsToDecode) {
+        Objects.requireNonNull(blobName, "blobName cannot be null");
+        Objects.requireNonNull(fieldsToDecode, "fieldsToDecode cannot be null");
+
+        if (cache) {
+            return decodedLinesCache.computeIfAbsent(blobName,
+                    k -> downloadBlobAndDecodeFields0(blobName, compression, fieldsToDecode));
+        } else {
+            return downloadBlobAndDecodeFields0(blobName, compression, fieldsToDecode);
+        }
+    }
+
+    private List<List<String>> downloadBlobAndDecodeFields0(final String blobName, final String compression,
+            final int... fieldsToDecode) {
+        return downloadBlobAndReadLines(blobName, compression).stream()
+                .map(l -> l.split(","))
+                .map(fields -> decodeRequiredFields(fields, fieldsToDecode))
+                .collect(Collectors.toList());
+    }
+
     private List<String> decodeRequiredFields(final String[] originalFields, final int[] fieldsToDecode) {
         Objects.requireNonNull(originalFields, "originalFields cannot be null");
         Objects.requireNonNull(fieldsToDecode, "fieldsToDecode cannot be null");
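The temp-file helper relies on Storage#downloadTo, which is the path the new test exercises: for an object whose Content-Encoding metadata is gzip, GCS serves the bytes decompressed (see the transcoding documentation linked in the commit message). A minimal sketch, assuming the google-cloud-storage client, application-default credentials, and a placeholder bucket and object:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;

    import com.google.cloud.storage.BlobId;
    import com.google.cloud.storage.Storage;
    import com.google.cloud.storage.StorageOptions;

    public class TranscodingDownloadSketch {
        public static void main(final String[] args) throws IOException {
            final Storage storage = StorageOptions.getDefaultInstance().getService();
            final Path target = Files.createTempFile("blob", null);

            // Equivalent to BucketAccessor#downloadBlobToTempFile above: an object
            // uploaded with Content-Encoding: gzip arrives already decompressed.
            storage.downloadTo(BlobId.fromGsUtilUri("gs://my-bucket/topic0-0-10.gz"), target);

            Files.readAllLines(target).forEach(System.out::println);
        }
    }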
