Skip to content

Spanner support #1315

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,5 @@ config/*.yml
.classpath
.project
.settings/
.vscode/
.factorypath
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,11 @@
<artifactId>simpleclient_servlet</artifactId>
<version>0.5.0</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-spanner</artifactId>
<version>1.30.0</version>
</dependency>
</dependencies>

<build>
Expand Down
24 changes: 23 additions & 1 deletion src/main/java/com/zendesk/maxwell/MaxwellConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ public class MaxwellConfig extends AbstractConfig {
public boolean rabbitmqMessagePersistent;
public boolean rabbitmqDeclareExchange;

public String spannerProject;
public String spannerInstance;
public String spannerDatabase;
public String spannerSourceDatabase;

public String redisHost;
public int redisPort;
public String redisAuth;
Expand Down Expand Up @@ -288,6 +293,17 @@ protected OptionParser buildOptionParser() {

parser.accepts( "__separator_10" );

parser.accepts("spanner_project_id",
"provide a google cloud platform project id associated with the Spanner instance").withRequiredArg();
parser.accepts("spanner_instance", "provide a spanner instance")
.withRequiredArg();
parser.accepts("spanner_database",
"provide a spanner database")
.withRequiredArg();
parser.accepts("spanner_source_database", "provide a source database from which replication is setup").withRequiredArg();

parser.accepts( "__separator_11" );

parser.accepts( "metrics_prefix", "the prefix maxwell will apply to all metrics" ).withRequiredArg();
parser.accepts( "metrics_type", "how maxwell metrics will be reported, at least one of slf4j|jmx|http|datadog" ).withRequiredArg();
parser.accepts( "metrics_slf4j_interval", "the frequency metrics are emitted to the log, in seconds, when slf4j reporting is configured" ).withRequiredArg();
Expand All @@ -305,7 +321,7 @@ protected OptionParser buildOptionParser() {
parser.accepts( "http_diagnostic_timeout", "the http diagnostic response timeout in ms when http_diagnostic=true. default: 10000" ).withRequiredArg();
parser.accepts( "metrics_jvm", "enable jvm metrics: true|false. default: false" ).withRequiredArg();

parser.accepts( "__separator_11" );
parser.accepts( "__separator_12" );

parser.accepts( "help", "display help" ).forHelp();

Expand Down Expand Up @@ -410,6 +426,10 @@ private void setup(OptionSet options, Properties properties) {
this.redisPubChannel = fetchOption("redis_pub_channel", options, properties, "maxwell");
this.redisListKey = fetchOption("redis_list_key", options, properties, "maxwell");
this.redisType = fetchOption("redis_type", options, properties, "pubsub");
this.spannerProject = fetchOption("spanner_project_id", options, properties, null);
this.spannerInstance = fetchOption("spanner_instance", options, properties, null);
this.spannerDatabase = fetchOption("spanner_database", options, properties, null);
this.spannerSourceDatabase = fetchOption("spanner_source_database", options, properties, null);

String kafkaBootstrapServers = fetchOption("kafka.bootstrap.servers", options, properties, null);
if ( kafkaBootstrapServers != null )
Expand Down Expand Up @@ -647,6 +667,8 @@ public void validate() {
usageForOptions("please specify a stream name for kinesis", "kinesis_stream");
} else if (this.producerType.equals("sqs") && this.sqsQueueUri == null) {
usageForOptions("please specify a queue uri for sqs", "sqs_queue_uri");
} else if (this.producerType.equals("spanner") && (this.spannerProject == null || this.spannerInstance == null || this.spannerDatabase == null || this.spannerSourceDatabase == null)) {
usageForOptions("please specify a spanner project, instance, database and source_database", "spanner_project_id", "spanner_instance", "spanner_database", "spanner_source_database");
}

if ( !this.bootstrapperType.equals("async")
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/com/zendesk/maxwell/MaxwellContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,9 @@ public AbstractProducer getProducer() throws IOException {
case "redis":
this.producer = new MaxwellRedisProducer(this, this.config.redisPubChannel, this.config.redisListKey, this.config.redisType);
break;
case "spanner":
this.producer = new SpannerProducer(this, this.config.spannerProject, this.config.spannerInstance, this.config.spannerDatabase, this.config.spannerSourceDatabase);
break;
case "none":
this.producer = new NoneProducer(this);
break;
Expand Down
211 changes: 211 additions & 0 deletions src/main/java/com/zendesk/maxwell/producer/SpannerProducer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
package com.zendesk.maxwell.producer;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;

import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.TransactionContext;
import com.google.cloud.spanner.TransactionRunner.TransactionCallable;
import com.zendesk.maxwell.MaxwellContext;
import com.zendesk.maxwell.row.RowIdentity;
import com.zendesk.maxwell.row.RowMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SpannerProducer extends AbstractProducer {
public static final Logger LOGGER = LoggerFactory.getLogger(SpannerProducer.class);

private final DatabaseClient dbClient;

private String sourceDatabase;

public SpannerProducer(MaxwellContext context, String project, String instance, String database, String sourceDatabase) throws IOException {
super(context);
SpannerOptions options = SpannerOptions.newBuilder().build();
Spanner spanner = options.getService();
DatabaseId db = DatabaseId.of(project, instance, database);
this.dbClient = spanner.getDatabaseClient(db);
this.sourceDatabase = sourceDatabase;
LOGGER.info("Spanner Producer initialized with client" + this.dbClient.toString() + ", Project: "+ project);
}

public void manipulate(DatabaseClient dbClient, String sql) {
dbClient
.readWriteTransaction()
.run(new TransactionCallable<Void>() {
public Void run(TransactionContext tx) throws Exception {
long startTime = System.nanoTime();
List<Statement> stmts = new ArrayList<Statement>();
stmts.add(Statement.newBuilder(sql).build());
tx.batchUpdate(stmts);
long endTime = System.nanoTime() - startTime;
LOGGER.info("Took " + endTime + "ns");
return null;
}
});
}

@Override
public void push(RowMap r) throws Exception {
try {
String db = r.getDatabase();
if (!db.equalsIgnoreCase(sourceDatabase)) {
LOGGER.info("Replication not whitelisted for DB: " + db);
return;
}
String table = r.getTable();
String type = r.getRowType();
LOGGER.info("Table:" + table);
LinkedHashMap<String, Object> data = r.getData();
String sql = "";
if (type == "insert" || type == "replace") {
sql = this.getInsertSql(r);
} else if (type == "update") {
sql = this.getUpdateSql(r);
} else if (type == "delete") {
sql = this.getDeleteSql(r);
} else {
sql = r.getRowQuery();
}

LOGGER.info("SQL:" + sql);
this.manipulate(dbClient, sql);

this.context.setPosition(r);
} catch (Exception e) {
LOGGER.info("Error: " + e.getMessage() + ", Data: " + r.toJSON(outputConfig));
}
}

private String getInsertSql(RowMap r) {
String table = r.getTable();
LinkedHashMap<String, Object> data = r.getData();
String sql = "";
sql = "INSERT INTO " + table + "(";
Iterator<String> keys = data.keySet().iterator();
while (keys.hasNext()) {
sql += keys.next() + ",";
}
if (sql.endsWith(",")) {
sql = sql.substring(0, sql.length() - 1);
}
sql += ") VALUES (";
Iterator<Object> values = data.values().iterator();
while (values.hasNext()) {
Object nextObject = values.next();
if (nextObject instanceof Long) {
sql += (Long) nextObject + ",";
} else if (nextObject instanceof Integer) {
sql += (String) nextObject + ",";
} else if (nextObject instanceof String) {
sql += "'" + this.escapeString((String) nextObject) + "'" + ",";
} else {
sql += (String) nextObject + ",";
}
}
if (sql.endsWith(",")) {
sql = sql.substring(0, sql.length() - 1);
}
sql += ")";
return sql;
}

private String getUpdateSql(RowMap r) {
String table = r.getTable();
LinkedHashMap<String, Object> data = r.getData();
LinkedHashMap<String, Object> oldData = r.getOldData();
String sql = "";
sql = "UPDATE " + table + " SET ";
Iterator<String> oldKeys = oldData.keySet().iterator();
while (oldKeys.hasNext()) {
String oldKey = oldKeys.next();
sql += oldKey + " = ";
Object valueObject = data.get(oldKey);
if (valueObject instanceof Long) {
sql += (Long) valueObject + ",";
} else if (valueObject instanceof Integer) {
sql += (String) valueObject + ",";
} else if (valueObject instanceof String) {
sql += "'" + this.escapeString((String) valueObject) + "'" + ",";
} else {
sql += (String) valueObject + ",";
}
}
if (sql.endsWith(",")) {
sql = sql.substring(0, sql.length() - 1);
}
sql += " WHERE ";
for (String pkColumn : r.getPKColumns()) {
sql += pkColumn + " = ";
Object valueObject = data.get(pkColumn);
if (valueObject instanceof Long) {
sql += (Long) valueObject + " AND ";
} else if (valueObject instanceof Integer) {
sql += (String) valueObject + " AND ";
} else if (valueObject instanceof String) {
sql += "'" + this.escapeString((String) valueObject) + "'" + " AND ";
} else {
sql += (String) valueObject + " AND ";
}
}
if (sql.endsWith(" AND ")) {
sql = sql.substring(0, sql.length() - 5);
}
return sql;
}

private String getDeleteSql(RowMap r) {
String table = r.getTable();
LinkedHashMap<String, Object> data = r.getData();
LinkedHashMap<String, Object> oldData = r.getOldData();
String sql = "";
sql = "DELETE FROM " + table + " WHERE ";
for (String pkColumn : r.getPKColumns()) {
sql += pkColumn + " = ";
Object valueObject = data.get(pkColumn);
if (valueObject instanceof Long) {
sql += (Long) valueObject + " AND ";
} else if (valueObject instanceof Integer) {
sql += (String) valueObject + " AND ";
} else if (valueObject instanceof String) {
sql += "'" + this.escapeString((String) valueObject) + "'" + " AND ";
} else {
sql += (String) valueObject + " AND ";
}
}
if (sql.endsWith(" AND ")) {
sql = sql.substring(0, sql.length() - 5);
}
return sql;
}

private String escapeString(String in) {
String out = "";
out = in.replaceAll("'", "\''");
return out;
}

private Boolean checkBase64(String in) {
if (in.endsWith("=")) {
Base64.Decoder decoder = Base64.getDecoder();
try {
decoder.decode(in);
return true;
} catch (Exception e) {
return false;
}
} else {
return false;
}
}
}
4 changes: 4 additions & 0 deletions src/main/java/com/zendesk/maxwell/row/RowMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,10 @@ public String getRowType() {
return this.rowType;
}

public List<String> getPKColumns() {
return this.pkColumns;
}

// determines whether there is anything for the producer to output
// override this for extended classes that don't output a value
// return false when there is a heartbeat row or other row with suppressed output
Expand Down