Skip to content

Commit

Permalink
pubsub to text with attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
niallscc committed Sep 25, 2024
1 parent 1430d47 commit 8730838
Showing 1 changed file with 51 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,25 @@
import com.google.cloud.teleport.v2.templates.pubsubtotext.PubsubToText.Options;
import com.google.cloud.teleport.v2.utils.DurationUtils;
import com.google.common.base.Strings;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

/**
* This pipeline ingests incoming data from a Cloud Pub/Sub topic and outputs the raw data into
Expand Down Expand Up @@ -65,13 +71,15 @@
},
streaming = true,
supportsAtLeastOnce = true)

public class PubsubToText {

/**
* Options supported by the pipeline.
*
* <p>Inherits standard configuration options.
*/

public interface Options
extends PipelineOptions, StreamingOptions, WindowedFilenamePolicyOptions {

Expand Down Expand Up @@ -151,6 +159,18 @@ public interface Options
String getOutputFilenameSuffix();

void setOutputFilenameSuffix(String value);

/**
* Optional flag requesting that message attributes be emitted together with the payload.
*
* <p>The pipeline only checks whether this value is non-empty, so any non-empty string
* (including {@code "False"}) turns attribute output on.
*/
@TemplateParameter.Text(
order = 7,
groupName = "Target",
optional = true,
description = "Include attributes in pull",
helpText = "If specified, pull the message and the attributes from the topic or subscription",
example = "True,False"
)
String getAttributeFlag();

/** Sets the attribute flag; leave it unset or empty to read payloads only. */
void setAttributeFlag(String value);
}

/**
Expand All @@ -176,6 +196,8 @@ public static void main(String[] args) {
*/
public static PipelineResult run(Options options) {
boolean useInputSubscription = !Strings.isNullOrEmpty(options.getInputSubscription());
boolean pullAttributes = !Strings.isNullOrEmpty(options.getAttributeFlag());

boolean useInputTopic = !Strings.isNullOrEmpty(options.getInputTopic());
if (useInputSubscription == useInputTopic) {
throw new IllegalArgumentException(
Expand All @@ -187,17 +209,40 @@ public static PipelineResult run(Options options) {

PCollection<String> messages = null;

/*
* Steps:
* 1) Read messages from PubSub (plain strings, or payload plus attributes when the
*    attribute flag is set)
* 2) Window the messages into minute intervals specified by the executor.
* 3) Output the windowed files to GCS
*/

if (useInputSubscription) {
messages =
pipeline.apply(
"Read PubSub Events",
PubsubIO.readStrings().fromSubscription(options.getInputSubscription()));
if (pullAttributes){
PCollection<PubsubMessage> messagesAttr = pipeline.apply(
"Read PubSub Events",
PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription()));
messages = messagesAttr.apply(
"ExtractPayloadAndAttributesToString", MapElements.into(TypeDescriptor.of(String.class))
.via((PubsubMessage message) -> {
// Get the message payload as a string
String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
// Render the message attributes as a JSON-like object string. Keys and values are
// concatenated as-is (no escaping), so quotes or backslashes in them yield invalid JSON.
Map<String, String> attributes = message.getAttributeMap();
String attributesString = attributes.entrySet()
.stream()
.map(entry -> "\"" + entry.getKey() + "\": \"" + entry.getValue() + "\"")
.collect(Collectors.joining(", ", "{", "}"));

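// Return one string holding both the payload and the attributes. The payload is inserted
// between quotes without escaping, so the result is only JSON-like, not guaranteed JSON.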
return "{ \"payload\": \"" + payload + "\", \"attributes\": " + attributesString + " }";
})
);
}else{
messages =
pipeline.apply(
"Read PubSub Events",
PubsubIO.readStrings().fromSubscription(options.getInputSubscription()));
}
} else {
messages =
pipeline.apply(
Expand Down Expand Up @@ -245,4 +290,4 @@ public static PipelineResult run(Options options) {
private static String maybeUseUserTempLocation(String userTempLocation, String outputLocation) {
  // No user-supplied temp location (null or empty): fall back to the output location.
  if (Strings.isNullOrEmpty(userTempLocation)) {
    return outputLocation;
  }
  // Otherwise the user's choice wins.
  return userTempLocation;
}
}
}

0 comments on commit 8730838

Please sign in to comment.