Skip to content

Commit

Permalink
Add Java Csv and Json write pipeline examples
Browse files Browse the repository at this point in the history
  • Loading branch information
Amar3tto committed Feb 27, 2024
1 parent ab29225 commit afd9f81
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 0 deletions.
14 changes: 14 additions & 0 deletions Java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,20 @@
<version>${beam.version}</version>
</dependency>

<!-- Adds a dependency on the Beam CSV IO module. -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-csv</artifactId>
<version>${beam.version}</version>
</dependency>

<!-- Adds a dependency on the Beam JSON IO module. -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-json</artifactId>
<version>${beam.version}</version>
</dependency>

<!-- Dependencies below this line are specific dependencies needed by the examples code. -->
<dependency>
<groupId>com.google.api-client</groupId>
Expand Down
92 changes: 92 additions & 0 deletions Java/src/main/java/gcs/WriteCsvIO.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package gcs;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.csv.CsvIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.commons.csv.CSVFormat;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

/**
* Pipeline for writing data to CSV files using the {@code CsvIO.write()} transform.
*/
public class WriteCsvIO {

/** Represents an Example CSV record. */
@DefaultSchema(JavaFieldSchema.class)
public static class ExampleRecord implements Serializable {
public int id;
public String name;

public ExampleRecord() {
}

public ExampleRecord(int id, String name) {
this.id = id;
this.name = name;
}
}

/**
* Pipeline options for write to CSV files.
*/
public interface WriteCsvOptions extends PipelineOptions {

@Description("A file path prefix to write CSV files to")
@Validation.Required
String getFilePathPrefix();

void setFilePathPrefix(String filePathPrefix);
}

public static void main(String[] args) {
WriteCsvOptions options =
PipelineOptionsFactory.fromArgs(args)
.withValidation().as(WriteCsvOptions.class);

Pipeline p = Pipeline.create(options);

List<ExampleRecord> rows =
Arrays.asList(
new ExampleRecord(1, "Charles"),
new ExampleRecord(2, "Alice"),
new ExampleRecord(3, "Bob"),
new ExampleRecord(4, "Amanda"),
new ExampleRecord(5, "Alex"),
new ExampleRecord(6, "Eliza"));

CSVFormat csvFormat =
CSVFormat.DEFAULT.withHeaderComments("example comment 1", "example comment 2")
.withCommentMarker('#');

p.apply("Create", Create.of(rows))
.apply(
"Write to CSV",
CsvIO.<ExampleRecord>write(options.getFilePathPrefix(), csvFormat)
.withNumShards(1));
p.run();
}
}
87 changes: 87 additions & 0 deletions Java/src/main/java/gcs/WriteJsonIO.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package gcs;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.json.JsonIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.transforms.Create;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

/**
* Pipeline for writing data to JSON files using the {@code JsonIO.write()} transform.
*/
public class WriteJsonIO {

/** Represents an Example JSON record. */
@DefaultSchema(JavaFieldSchema.class)
public static class ExampleRecord implements Serializable {
public int id;
public String name;

public ExampleRecord() {
}

public ExampleRecord(int id, String name) {
this.id = id;
this.name = name;
}
}

/**
* Pipeline options for write to JSON files.
*/
public interface WriteJsonOptions extends PipelineOptions {

@Description("A file path prefix to write JSON files to")
@Validation.Required
String getFilePathPrefix();

void setFilePathPrefix(String filePathPrefix);
}

public static void main(String[] args) {
WriteJsonOptions options =
PipelineOptionsFactory.fromArgs(args)
.withValidation().as(WriteJsonOptions.class);

Pipeline p = Pipeline.create(options);

List<ExampleRecord> rows =
Arrays.asList(
new ExampleRecord(1, "Charles"),
new ExampleRecord(2, "Alice"),
new ExampleRecord(3, "Bob"),
new ExampleRecord(4, "Amanda"),
new ExampleRecord(5, "Alex"),
new ExampleRecord(6, "Eliza"));

p.apply("Create", Create.of(rows))
.apply(
"Write to JSON",
JsonIO.<ExampleRecord>write(options.getFilePathPrefix())
.withNumShards(1));
p.run();
}
}

0 comments on commit afd9f81

Please sign in to comment.