Skip to content
This repository was archived by the owner on Sep 11, 2024. It is now read-only.

Commit e8917e4

Browse files
committed
feat: Add support for setting object metadata Content-Encoding
Users willing to leverage GCS capability to decompress gzip objects on server-side when accessing them through the Storage API requested the fixed-metadata `Content-Encoding` (default: null) to become configurable so that its value can be set (ie. to `gzip`) when the connector uploads a new file to the bucket. https://cloud.google.com/storage/docs/metadata#content-encoding
1 parent 227fb1d commit e8917e4

File tree

4 files changed

+44
-2
lines changed

4 files changed

+44
-2
lines changed

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -589,6 +589,10 @@ gcs.credentials.json={"type":"...", ...}
589589
gcs.credentials.default=true
590590
##
591591

592+
# The value of object metadata Content-Encoding.
593+
# This can be used for leveraging storage-side de-compression before download.
594+
# Optional, the default is null.
595+
gcs.object.content.encoding=gzip
592596

593597
# The set of the fields that are to be output, comma separated.
594598
# Supported values are: `key`, `value`, `offset`, `timestamp`, and `headers`.

src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,13 @@ public final class GcsSinkConfig extends AivenCommonConfig {
5757
public static final String GCS_CREDENTIALS_JSON_CONFIG = "gcs.credentials.json";
5858
public static final String GCS_CREDENTIALS_DEFAULT_CONFIG = "gcs.credentials.default";
5959
public static final String GCS_BUCKET_NAME_CONFIG = "gcs.bucket.name";
60+
public static final String GCS_OBJECT_CONTENT_ENCODING_CONFIG = "gcs.object.content.encoding";
6061
public static final String GCS_USER_AGENT = "gcs.user.agent";
6162
private static final String GROUP_FILE = "File";
6263
public static final String FILE_NAME_PREFIX_CONFIG = "file.name.prefix";
6364
public static final String FILE_NAME_TEMPLATE_CONFIG = "file.name.template";
6465
public static final String FILE_COMPRESSION_TYPE_CONFIG = "file.compression.type";
66+
6567
public static final String FILE_MAX_RECORDS = "file.max.records";
6668
public static final String FILE_NAME_TIMESTAMP_TIMEZONE = "file.name.timestamp.timezone";
6769
public static final String FILE_NAME_TIMESTAMP_SOURCE = "file.name.timestamp.source";
@@ -135,6 +137,11 @@ private static void addGcsConfigGroup(final ConfigDef configDef) {
135137
+ GCS_CREDENTIALS_PATH_CONFIG + "\"",
136138
GROUP_GCS, gcsGroupCounter++, ConfigDef.Width.NONE, GCS_CREDENTIALS_DEFAULT_CONFIG);
137139

140+
configDef.define(GCS_OBJECT_CONTENT_ENCODING_CONFIG, ConfigDef.Type.STRING, null,
141+
new ConfigDef.NonEmptyString(), ConfigDef.Importance.LOW,
142+
"The GCS object metadata value of Content-Encoding.", GROUP_GCS, gcsGroupCounter++,
143+
ConfigDef.Width.NONE, GCS_OBJECT_CONTENT_ENCODING_CONFIG);
144+
138145
configDef.define(GCS_BUCKET_NAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE,
139146
new ConfigDef.NonEmptyString(), ConfigDef.Importance.HIGH,
140147
"The GCS bucket name to store output files in.", GROUP_GCS, gcsGroupCounter++, ConfigDef.Width.NONE, // NOPMD
@@ -332,7 +339,7 @@ private void validate() {
332339
.filter(Objects::nonNull)
333340
.count();
334341

335-
// only validate non nulls here, since all nulls means falling back to the default "no credential" behavour.
342+
// only validate non nulls here, since all nulls means falling back to the default "no credential" behaviour.
336343
if (nonNulls > MAX_ALLOWED_CREDENTIAL_CONFIGS) {
337344
throw new ConfigException(String.format("Only one of %s, %s, and %s can be non-null.",
338345
GCS_CREDENTIALS_DEFAULT_CONFIG, GCS_CREDENTIALS_JSON_CONFIG, GCS_CREDENTIALS_PATH_CONFIG));
@@ -371,6 +378,10 @@ public String getBucketName() {
371378
return getString(GCS_BUCKET_NAME_CONFIG);
372379
}
373380

381+
public String getObjectContentEncoding() {
382+
return getString(GCS_OBJECT_CONTENT_ENCODING_CONFIG);
383+
}
384+
374385
@Override
375386
public CompressionType getCompressionType() {
376387
return CompressionType.forName(getString(FILE_COMPRESSION_TYPE_CONFIG));

src/main/java/io/aiven/kafka/connect/gcs/GcsSinkTask.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,9 @@ public void flush(final Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
118118
}
119119

120120
private void flushFile(final String filename, final List<SinkRecord> records) {
121-
final BlobInfo blob = BlobInfo.newBuilder(config.getBucketName(), config.getPrefix() + filename).build();
121+
final BlobInfo blob = BlobInfo.newBuilder(config.getBucketName(), config.getPrefix() + filename)
122+
.setContentEncoding(config.getObjectContentEncoding())
123+
.build();
122124
try (var out = Channels.newOutputStream(storage.writer(blob));
123125
var writer = OutputWriter.builder()
124126
.withExternalProperties(config.originalsStrings())

src/test/java/io/aiven/kafka/connect/gcs/GcsSinkTaskTest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,31 @@ void compression(final String compression) {
252252
readSplittedAndDecodedLinesFromBlob("topic1-1-40" + compressionType.extension(), compression, 0));
253253
}
254254

255+
@ParameterizedTest
256+
@ValueSource(strings = { "gzip" })
257+
void contentEncoding(final String compression) {
258+
properties.put(GcsSinkConfig.GCS_OBJECT_CONTENT_ENCODING_CONFIG, compression);
259+
properties.put(GcsSinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression);
260+
final GcsSinkTask task = new GcsSinkTask(properties, storage);
261+
262+
task.put(basicRecords);
263+
task.flush(null);
264+
265+
final CompressionType compressionType = CompressionType.forName(compression);
266+
267+
final List<String> names = Lists.newArrayList("topic0-0-10");
268+
final List<String> blobNames = names.stream()
269+
.map(n -> n + compressionType.extension())
270+
.collect(Collectors.toList());
271+
272+
assertIterableEquals(blobNames, testBucketAccessor.getBlobNames());
273+
// reading a gzip-compressed blob with metadata Content-Encoding=gzip should be the same as reading a
274+
// non-compressed blob
275+
assertIterableEquals(
276+
Lists.newArrayList(Collections.singletonList("value0"), Collections.singletonList("value5")),
277+
readSplittedAndDecodedLinesFromBlob("topic0-0-10" + compressionType.extension(), "none", 0));
278+
}
279+
255280
@ParameterizedTest
256281
@ValueSource(strings = { "none", "gzip", "snappy", "zstd" })
257282
void allFields(final String compression) {

0 commit comments

Comments
 (0)