KafkaToBigQueryFlex template: Add support to persist Kafka keys for json format

Fixes GoogleCloudPlatform#2088
jayakumarc committed Dec 30, 2024
1 parent 569a68c commit 9c480fe
Showing 3 changed files with 91 additions and 15 deletions.
@@ -523,6 +523,7 @@ public static Pipeline runJsonPipeline(
.setFunctionName(options.getJavascriptTextTransformFunctionName())
.setReloadIntervalMinutes(
options.getJavascriptTextTransformReloadIntervalMinutes())
.setPersistKafkaKey(options.getPersistKafkaKey())
.setSuccessTag(TRANSFORM_OUT)
.setFailureTag(TRANSFORM_DEADLETTER_OUT)
.build());
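
(For context: the line added above reads a new persistKafkaKey template option. A minimal sketch of what such an option declaration could look like is shown below; only the getter name comes from this diff, while the interface name, description text, and default value are assumptions rather than the template's actual options file.)

import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

// Hypothetical interface name; the real option is declared in the template's options class.
public interface PersistKafkaKeyOptions extends PipelineOptions {

  @Description(
      "If set to true, the Kafka record key is written to the output BigQuery table "
          + "alongside the message payload.")
  @Default.Boolean(false)
  Boolean getPersistKafkaKey();

  void setPersistKafkaKey(Boolean value);
}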
@@ -15,11 +15,13 @@
*/
package com.google.cloud.teleport.v2.transforms;

import static com.google.cloud.teleport.v2.transforms.BigQueryConverters.convertJsonToTableRow;

import com.google.api.services.bigquery.model.TableRow;
import com.google.auto.value.AutoValue;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.transforms.BigQueryConverters.FailsafeJsonToTableRow;
import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer.FailsafeJavascriptUdf;
import com.google.cloud.teleport.v2.utils.BigQueryConstants;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Strings;
import javax.annotation.Nullable;
@@ -32,10 +32,8 @@
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.*;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables;

/**
* The {@link StringMessageToTableRow} class is a {@link PTransform} which transforms incoming Kafka
@@ -60,6 +60,8 @@ public abstract class StringMessageToTableRow

public abstract @Nullable String fileSystemPath();

public abstract @Nullable Boolean persistKafkaKey();

public abstract @Nullable String functionName();

public abstract @Nullable Integer reloadIntervalMinutes();
@@ -76,6 +78,8 @@ public static Builder newBuilder() {
public abstract static class Builder {
public abstract Builder setFileSystemPath(@Nullable String fileSystemPath);

public abstract Builder setPersistKafkaKey(@Nullable Boolean persistKafkaKey);

public abstract Builder setFunctionName(@Nullable String functionName);

public abstract Builder setReloadIntervalMinutes(@Nullable Integer value);
@@ -138,10 +142,7 @@ public PCollectionTuple expand(PCollection<KafkaRecord<String, String>> input) {
PCollectionTuple tableRowOut =
failsafeElements.apply(
"JsonToTableRow",
FailsafeJsonToTableRow.<KafkaRecord<String, String>>newBuilder()
.setSuccessTag(TABLE_ROW_OUT)
.setFailureTag(TABLE_ROW_DEADLETTER_OUT)
.build());
new JsonStringToTableRow(persistKafkaKey(), TABLE_ROW_OUT, TABLE_ROW_DEADLETTER_OUT));

PCollection<FailsafeElement<KafkaRecord<String, String>, String>> badRecords =
tableRowOut.get(TABLE_ROW_DEADLETTER_OUT).setCoder(FAILSAFE_CODER);
@@ -176,4 +177,56 @@ public void processElement(ProcessContext context) {
context.output(FailsafeElement.of(message, message.getKV().getValue()));
}
}

static class JsonStringToTableRow
extends PTransform<
PCollection<FailsafeElement<KafkaRecord<String, String>, String>>, PCollectionTuple> {

private final Boolean persistKafkaKey;

private final TupleTag<TableRow> successTag;

private final TupleTag<FailsafeElement<KafkaRecord<String, String>, String>> failureTag;

JsonStringToTableRow(
Boolean persistKafkaKey,
TupleTag<TableRow> successTag,
TupleTag<FailsafeElement<KafkaRecord<String, String>, String>> failureTag) {
this.persistKafkaKey = persistKafkaKey;
this.successTag = successTag;
this.failureTag = failureTag;
}

@Override
public PCollectionTuple expand(
PCollection<FailsafeElement<KafkaRecord<String, String>, String>> failsafeElements) {
return failsafeElements.apply(
"JsonStringToTableRow",
ParDo.of(
new DoFn<FailsafeElement<KafkaRecord<String, String>, String>, TableRow>() {
@ProcessElement
public void processElement(ProcessContext context) {
FailsafeElement<KafkaRecord<String, String>, String> element =
context.element();
String json = element.getPayload();

try {
TableRow row = convertJsonToTableRow(json);
if (persistKafkaKey) {
String key = element.getOriginalPayload().getKV().getKey();
row.set(BigQueryConstants.KAFKA_KEY_FIELD, key);
}
context.output(row);
} catch (Exception e) {
context.output(
failureTag,
FailsafeElement.of(element)
.setErrorMessage(e.getMessage())
.setStacktrace(Throwables.getStackTraceAsString(e)));
}
}
})
.withOutputTags(successTag, TupleTagList.of(failureTag)));
}
}
}
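
(The new DoFn above writes the key into BigQueryConstants.KAFKA_KEY_FIELD, which this diff does not show. Judging from the "_key" column added to the integration-test schema below, a minimal sketch of that constant might look as follows; the class shape and the literal value are inferred from the test, not copied from the repository.)

package com.google.cloud.teleport.v2.utils;

/** Sketch of the constants holder referenced above; the field name "_key" is inferred from the test schema. */
public final class BigQueryConstants {

  /** Name of the BigQuery column used to persist the Kafka record key. */
  public static final String KAFKA_KEY_FIELD = "_key";

  private BigQueryConstants() {}
}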
@@ -90,6 +90,17 @@ public void testKafkaToBigQuery() throws IOException {
.addParameter("kafkaReadAuthenticationMode", "NONE"));
}

@Test
public void testKafkaToBigQueryWithKey() throws IOException {
baseKafkaToBigQuery(
b ->
b.addParameter("messageFormat", "JSON")
.addParameter("writeMode", "SINGLE_TABLE_NAME")
.addParameter("useBigQueryDLQ", "false")
.addParameter("kafkaReadAuthenticationMode", "NONE")
.addParameter("persistKafkaKey", "true"));
}

@Test
public void testKafkaToBigQueryWithUdfFunction() throws RestClientException, IOException {
String udfFileName = "input/transform.js";
@@ -175,7 +186,8 @@ public void baseKafkaToBigQuery(
Schema bqSchema =
Schema.of(
Field.of("id", StandardSQLTypeName.INT64),
Field.of("name", StandardSQLTypeName.STRING));
Field.of("name", StandardSQLTypeName.STRING),
Field.of("_key", StandardSQLTypeName.STRING));

TableId tableId = bigQueryClient.createTable(bqTable, bqSchema);
TableId deadletterTableId = TableId.of(tableId.getDataset(), tableId.getTable() + "_dlq");
@@ -226,11 +238,21 @@ public void baseKafkaToBigQuery(
TableResult tableRows = bigQueryClient.readTable(bqTable);
TableResult dlqRows = bigQueryClient.readTable(deadletterTableId);

assertThatBigQueryRecords(tableRows)
.hasRecordsUnordered(
List.of(
Map.of("id", 11, "name", namePostProcessor.apply("Dataflow")),
Map.of("id", 12, "name", namePostProcessor.apply("Pub/Sub"))));
if (options.getParameter("persistKafkaKey") != null
&& options.getParameter("persistKafkaKey").equals("true")) {
assertThatBigQueryRecords(tableRows)
.hasRecordsUnordered(
List.of(
Map.of("id", 11, "name", namePostProcessor.apply("Dataflow"), "_key", "11"),
Map.of("id", 12, "name", namePostProcessor.apply("Pub/Sub"), "_key", "12")));
} else {
assertThatBigQueryRecords(tableRows)
.hasRecordsUnordered(
List.of(
Map.of("id", 11, "name", namePostProcessor.apply("Dataflow"), "_key", "null"),
Map.of("id", 12, "name", namePostProcessor.apply("Pub/Sub"), "_key", "null")));
}

assertThatBigQueryRecords(dlqRows).hasRecordsWithStrings(List.of("bad json string"));
}
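
(The asserted "_key" values "11" and "12" line up with the ids of the records the base test publishes. A hedged reconstruction of what that publishing step presumably looks like is below; only the keys and JSON payloads are implied by the assertions above, while the producer wiring, the method name, and the key of the malformed record are assumptions.)

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical helper; the real test publishes through the framework's Kafka resource manager.
static void publishTestRecords(KafkaProducer<String, String> producer, String topic) {
  producer.send(new ProducerRecord<>(topic, "11", "{\"id\": 11, \"name\": \"Dataflow\"}"));
  producer.send(new ProducerRecord<>(topic, "12", "{\"id\": 12, \"name\": \"Pub/Sub\"}"));
  // A malformed payload that should land in the dead-letter table.
  producer.send(new ProducerRecord<>(topic, "13", "bad json string"));
  producer.flush();
}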
