From 873083862a2396e84599869d33ea89f8e9917f20 Mon Sep 17 00:00:00 2001 From: Nialls Chavez Date: Wed, 25 Sep 2024 11:22:05 -0700 Subject: [PATCH] pubsub to text with attributes --- .../templates/pubsubtotext/PubsubToText.java | 57 +++++++++++++++++-- 1 file changed, 51 insertions(+), 6 deletions(-) diff --git a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/pubsubtotext/PubsubToText.java b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/pubsubtotext/PubsubToText.java index 0b66afd011..19a42bce80 100644 --- a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/pubsubtotext/PubsubToText.java +++ b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/pubsubtotext/PubsubToText.java @@ -24,19 +24,25 @@ import com.google.cloud.teleport.v2.templates.pubsubtotext.PubsubToText.Options; import com.google.cloud.teleport.v2.utils.DurationUtils; import com.google.common.base.Strings; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.stream.Collectors; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.FileBasedSink; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.options.Validation.Required; +import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; /** * This pipeline ingests incoming data from a Cloud Pub/Sub topic and outputs the raw data into @@ -65,6 +71,7 @@ }, streaming = true, supportsAtLeastOnce = true) + public class PubsubToText { /** @@ -72,6 +79,7 @@ public class PubsubToText { * *

Inherits standard configuration options. */ + public interface Options extends PipelineOptions, StreamingOptions, WindowedFilenamePolicyOptions { @@ -151,6 +159,18 @@ public interface Options String getOutputFilenameSuffix(); void setOutputFilenameSuffix(String value); + + @TemplateParameter.Text( + order = 7, + groupName = "Target", + optional = true, + description = "Include attributes in pull", + helpText = "If specified, pull the message and the attributes from the topic or subscription", + example = "True,False" + ) + String getAttributeFlag(); + + void setAttributeFlag(String value); } /** @@ -176,6 +196,8 @@ public static void main(String[] args) { */ public static PipelineResult run(Options options) { boolean useInputSubscription = !Strings.isNullOrEmpty(options.getInputSubscription()); + boolean pullAttributes = !Strings.isNullOrEmpty(options.getAttributeFlag()); + boolean useInputTopic = !Strings.isNullOrEmpty(options.getInputTopic()); if (useInputSubscription == useInputTopic) { throw new IllegalArgumentException( @@ -187,17 +209,40 @@ public static PipelineResult run(Options options) { PCollection messages = null; - /* + /* * Steps: * 1) Read string messages from PubSub * 2) Window the messages into minute intervals specified by the executor. * 3) Output the windowed files to GCS */ + if (useInputSubscription) { - messages = - pipeline.apply( - "Read PubSub Events", - PubsubIO.readStrings().fromSubscription(options.getInputSubscription())); + if (pullAttributes){ + PCollection messagesAttr = pipeline.apply( + "Read PubSub Events", + PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription())); + messages = messagesAttr.apply( + "ExtractPayloadAndAttributesToString", MapElements.into(TypeDescriptor.of(String.class)) + .via((PubsubMessage message) -> { + // Get the message payload as a string + String payload = new String(message.getPayload(), StandardCharsets.UTF_8); + // Get the message attributes and convert them to a JSON-like string format + Map attributes = message.getAttributeMap(); + String attributesString = attributes.entrySet() + .stream() + .map(entry -> "\"" + entry.getKey() + "\": \"" + entry.getValue() + "\"") + .collect(Collectors.joining(", ", "{", "}")); + + // Return the concatenated string with both the payload and attributes + return "{ \"payload\": \"" + payload + "\", \"attributes\": " + attributesString + " }"; + }) + ); + }else{ + messages = + pipeline.apply( + "Read PubSub Events", + PubsubIO.readStrings().fromSubscription(options.getInputSubscription())); + } } else { messages = pipeline.apply( @@ -245,4 +290,4 @@ public static PipelineResult run(Options options) { private static String maybeUseUserTempLocation(String userTempLocation, String outputLocation) { return !Strings.isNullOrEmpty(userTempLocation) ? userTempLocation : outputLocation; } -} +} \ No newline at end of file