Skip to content

Commit

Permalink
Add Java MongoDB read/write pipelines
Browse files Browse the repository at this point in the history
  • Loading branch information
Amar3tto committed Feb 16, 2024
1 parent 64118dc commit e89285e
Show file tree
Hide file tree
Showing 2 changed files with 176 additions and 0 deletions.
91 changes: 91 additions & 0 deletions Java/src/main/java/mongodb/ReadMongoDB.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package mongodb;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.mongodb.MongoDbIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReadMongoDB {

private static final Logger LOG =
LoggerFactory.getLogger(ReadMongoDB.class);

private static final String ID_COLUMN = "id";
private static final String NAME_COLUMN = "name";

/**
* Pipeline options for read from MongoDB.
*/
public interface ReadMongoDbOptions extends PipelineOptions {
@Description("The MongoDB connection string following the URI format")
@Default.String("mongodb://localhost:27017")
String getUri();

void setUri(String uri);

@Description("The MongoDB database name")
@Validation.Required
String getDbName();

void setDbName(String dbName);

@Description("The MongoDB collection name")
@Validation.Required
String getCollection();

void setCollection(String collection);
}

public static void main(String[] args) {
ReadMongoDbOptions options =
PipelineOptionsFactory.fromArgs(args)
.withValidation().as(ReadMongoDbOptions.class);

Pipeline p = Pipeline.create(options);

p.apply(
"Read from MongoDB",
MongoDbIO.read()
.withUri(options.getUri())
.withDatabase(options.getDbName())
.withCollection(options.getCollection()))
.apply(
"Log Data",
ParDo.of(
new DoFn<Document, Document>() {
@DoFn.ProcessElement
public void processElement(ProcessContext c) {
LOG.info(
"Id = {}, Name = {}",
c.element().get(ID_COLUMN),
c.element().get(NAME_COLUMN));
c.output(c.element());
}
}));

p.run();
}
}
85 changes: 85 additions & 0 deletions Java/src/main/java/mongodb/WriteMongoDB.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package mongodb;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.mongodb.MongoDbIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Create;
import org.bson.Document;

import java.util.Arrays;
import java.util.List;

public class WriteMongoDB {

private static final String ID_COLUMN = "id";
private static final String NAME_COLUMN = "name";

/**
* Pipeline options for write to MongoDB.
*/
public interface WriteMongoDbOptions extends PipelineOptions {
@Description("The MongoDB connection string following the URI format")
@Default.String("mongodb://localhost:27017")
String getUri();

void setUri(String uri);

@Description("The MongoDB database name")
@Validation.Required
String getDbName();

void setDbName(String dbName);

@Description("The MongoDB collection name")
@Validation.Required
String getCollection();

void setCollection(String collection);
}

public static void main(String[] args) {
WriteMongoDbOptions options =
PipelineOptionsFactory.fromArgs(args)
.withValidation().as(WriteMongoDbOptions.class);

Pipeline p = Pipeline.create(options);

List<Document> rows = Arrays.asList(
new Document(ID_COLUMN, 1).append(NAME_COLUMN, "Charles"),
new Document(ID_COLUMN, 2).append(NAME_COLUMN, "Alice"),
new Document(ID_COLUMN, 3).append(NAME_COLUMN, "Bob"),
new Document(ID_COLUMN, 4).append(NAME_COLUMN, "Amanda"),
new Document(ID_COLUMN, 5).append(NAME_COLUMN, "Alex"),
new Document(ID_COLUMN, 6).append(NAME_COLUMN, "Eliza")
);

p.apply("Create", Create.of(rows))
.apply(
"Write to MongoDB",
MongoDbIO.write()
.withUri(options.getUri())
.withDatabase(options.getDbName())
.withCollection(options.getCollection()));

p.run();
}
}

0 comments on commit e89285e

Please sign in to comment.