From 27ea44c0dc3453659564a492c89479683f422db9 Mon Sep 17 00:00:00 2001 From: Paul Hoang Date: Thu, 28 Nov 2019 15:59:07 +0000 Subject: [PATCH 1/4] add the ability to have the type in the routing key for rabbitmq producer --- .../com/zendesk/maxwell/producer/RabbitmqProducer.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/zendesk/maxwell/producer/RabbitmqProducer.java b/src/main/java/com/zendesk/maxwell/producer/RabbitmqProducer.java index cebcfeed8..bcd7adbc8 100644 --- a/src/main/java/com/zendesk/maxwell/producer/RabbitmqProducer.java +++ b/src/main/java/com/zendesk/maxwell/producer/RabbitmqProducer.java @@ -6,6 +6,7 @@ import com.rabbitmq.client.MessageProperties; import com.zendesk.maxwell.MaxwellContext; import com.zendesk.maxwell.row.RowMap; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,15 +61,14 @@ public void push(RowMap r) throws Exception { } private String getRoutingKeyFromTemplate(RowMap r) { - String table = r.getTable(); - - if ( table == null ) - table = ""; + String table = StringUtils.defaultString(r.getTable(), StringUtils.EMPTY); + String type = StringUtils.defaultString(r.getRowType(), StringUtils.EMPTY); return context .getConfig() .rabbitmqRoutingKeyTemplate .replace("%db%", r.getDatabase()) - .replace("%table%", table); + .replace("%table%", table) + .replace("%type%", type); } } From 6c083273e5a7cd818aa779b78b41d0bd588e6f3c Mon Sep 17 00:00:00 2001 From: Paul Hoang Date: Sat, 30 Nov 2019 23:55:34 +0000 Subject: [PATCH 2/4] changes to the kafka and redis producers to use the "type" when generating the topic/channel --- .../com/zendesk/maxwell/producer/MaxwellKafkaProducer.java | 5 ++++- .../com/zendesk/maxwell/producer/MaxwellRedisProducer.java | 6 +++++- src/main/java/com/zendesk/maxwell/row/RowIdentity.java | 4 ++++ 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java b/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java index b86d3f864..f9e91fe0b 100644 --- a/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java +++ b/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java @@ -237,7 +237,10 @@ private Integer getNumPartitions(String topic) { private String generateTopic(String topic, RowIdentity pk){ if ( interpolateTopic ) - return topic.replaceAll("%\\{database\\}", pk.getDatabase()).replaceAll("%\\{table\\}", pk.getTable()); + return topic + .replaceAll("%\\{database\\}", pk.getDatabase()) + .replaceAll("%\\{table\\}", pk.getTable()) + .replaceAll("%\\{type\\}", pk.getRowType()); else return topic; } diff --git a/src/main/java/com/zendesk/maxwell/producer/MaxwellRedisProducer.java b/src/main/java/com/zendesk/maxwell/producer/MaxwellRedisProducer.java index 29f1110c9..fda0a1bc0 100644 --- a/src/main/java/com/zendesk/maxwell/producer/MaxwellRedisProducer.java +++ b/src/main/java/com/zendesk/maxwell/producer/MaxwellRedisProducer.java @@ -45,7 +45,11 @@ public MaxwellRedisProducer(MaxwellContext context) { private String generateChannel(RowIdentity pk){ if (interpolateChannel) { - return channel.replaceAll("%\\{database}", pk.getDatabase()).replaceAll("%\\{table}", pk.getTable()); + return channel + .replaceAll("%\\{database}", pk.getDatabase()) + .replaceAll("%\\{table}", pk.getTable()) + .replaceAll("%\\{type}", pk.getRowType()) + ; } return channel; diff --git a/src/main/java/com/zendesk/maxwell/row/RowIdentity.java b/src/main/java/com/zendesk/maxwell/row/RowIdentity.java index 4c44e0c38..c977dd476 100644 --- a/src/main/java/com/zendesk/maxwell/row/RowIdentity.java +++ b/src/main/java/com/zendesk/maxwell/row/RowIdentity.java @@ -28,6 +28,10 @@ public String getTable() { return table; } + public String getRowType() { + return rowType; + } + public String toKeyJson(RowMap.KeyFormat keyFormat) throws IOException { MaxwellJson json = MaxwellJson.getInstance(); JsonGenerator g = json.reset(); From 9072a27eef6f08295a7a244cdf223eed05628237 Mon Sep 17 00:00:00 2001 From: Paul Hoang Date: Sun, 1 Dec 2019 20:00:01 +0000 Subject: [PATCH 3/4] introduced new interface for the kafka and redis producers to implement. Gives a default implementation so that procuders can builder topic/channel(destination) strings --- .../maxwell/producer/DestinationBuilder.java | 17 +++++++++++++++++ .../maxwell/producer/MaxwellKafkaProducer.java | 10 ++-------- .../maxwell/producer/MaxwellRedisProducer.java | 12 ++---------- 3 files changed, 21 insertions(+), 18 deletions(-) create mode 100644 src/main/java/com/zendesk/maxwell/producer/DestinationBuilder.java diff --git a/src/main/java/com/zendesk/maxwell/producer/DestinationBuilder.java b/src/main/java/com/zendesk/maxwell/producer/DestinationBuilder.java new file mode 100644 index 000000000..5ecd5abe2 --- /dev/null +++ b/src/main/java/com/zendesk/maxwell/producer/DestinationBuilder.java @@ -0,0 +1,17 @@ +package com.zendesk.maxwell.producer; + +import com.zendesk.maxwell.row.RowIdentity; + +public interface DestinationBuilder { + + default String buildDestinationString(boolean interpolateChannel, String channel, RowIdentity pk) { + if (interpolateChannel) { + return channel + .replaceAll("%\\{database}", pk.getDatabase()) + .replaceAll("%\\{table}", pk.getTable()) + .replaceAll("%\\{type}", pk.getRowType()) + ; + } + return channel; + } +} diff --git a/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java b/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java index f9e91fe0b..33721a7fc 100644 --- a/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java +++ b/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java @@ -134,7 +134,7 @@ public KafkaProducerDiagnostic getDiagnostic() { } } -class MaxwellKafkaProducerWorker extends AbstractAsyncProducer implements Runnable, StoppableTask { +class MaxwellKafkaProducerWorker extends AbstractAsyncProducer implements Runnable, StoppableTask, DestinationBuilder { static final Logger LOGGER = LoggerFactory.getLogger(MaxwellKafkaProducer.class); private final Producer kafka; @@ -236,13 +236,7 @@ private Integer getNumPartitions(String topic) { } private String generateTopic(String topic, RowIdentity pk){ - if ( interpolateTopic ) - return topic - .replaceAll("%\\{database\\}", pk.getDatabase()) - .replaceAll("%\\{table\\}", pk.getTable()) - .replaceAll("%\\{type\\}", pk.getRowType()); - else - return topic; + return this.buildDestinationString(interpolateTopic, topic, pk); } @Override diff --git a/src/main/java/com/zendesk/maxwell/producer/MaxwellRedisProducer.java b/src/main/java/com/zendesk/maxwell/producer/MaxwellRedisProducer.java index fda0a1bc0..edeb6c614 100644 --- a/src/main/java/com/zendesk/maxwell/producer/MaxwellRedisProducer.java +++ b/src/main/java/com/zendesk/maxwell/producer/MaxwellRedisProducer.java @@ -12,7 +12,7 @@ import java.util.HashMap; import java.util.Map; -public class MaxwellRedisProducer extends AbstractProducer implements StoppableTask { +public class MaxwellRedisProducer extends AbstractProducer implements StoppableTask, DestinationBuilder { private static final Logger logger = LoggerFactory.getLogger(MaxwellRedisProducer.class); private final String channel; private final boolean interpolateChannel; @@ -44,15 +44,7 @@ public MaxwellRedisProducer(MaxwellContext context) { } private String generateChannel(RowIdentity pk){ - if (interpolateChannel) { - return channel - .replaceAll("%\\{database}", pk.getDatabase()) - .replaceAll("%\\{table}", pk.getTable()) - .replaceAll("%\\{type}", pk.getRowType()) - ; - } - - return channel; + return this.buildDestinationString(interpolateChannel, channel, pk); } private void sendToRedis(RowMap msg) throws Exception { From e05e9d3d16e79c09a2b295d50a11206e3c70ece2 Mon Sep 17 00:00:00 2001 From: Paul Hoang Date: Sat, 14 Dec 2019 23:48:58 +0000 Subject: [PATCH 4/4] removed previsously created Destination builder for the kafka,redis and rabbit producers and moved the default method to build the destination String into the RowMap class --- .../maxwell/producer/DestinationBuilder.java | 17 ----------------- .../maxwell/producer/MaxwellKafkaProducer.java | 15 +++++++++------ .../maxwell/producer/MaxwellRedisProducer.java | 12 +++++++----- .../maxwell/producer/RabbitmqProducer.java | 12 +++--------- .../java/com/zendesk/maxwell/row/RowMap.java | 17 +++++++++++++++++ 5 files changed, 36 insertions(+), 37 deletions(-) delete mode 100644 src/main/java/com/zendesk/maxwell/producer/DestinationBuilder.java diff --git a/src/main/java/com/zendesk/maxwell/producer/DestinationBuilder.java b/src/main/java/com/zendesk/maxwell/producer/DestinationBuilder.java deleted file mode 100644 index 5ecd5abe2..000000000 --- a/src/main/java/com/zendesk/maxwell/producer/DestinationBuilder.java +++ /dev/null @@ -1,17 +0,0 @@ -package com.zendesk.maxwell.producer; - -import com.zendesk.maxwell.row.RowIdentity; - -public interface DestinationBuilder { - - default String buildDestinationString(boolean interpolateChannel, String channel, RowIdentity pk) { - if (interpolateChannel) { - return channel - .replaceAll("%\\{database}", pk.getDatabase()) - .replaceAll("%\\{table}", pk.getTable()) - .replaceAll("%\\{type}", pk.getRowType()) - ; - } - return channel; - } -} diff --git a/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java b/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java index 33721a7fc..7da3f1133 100644 --- a/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java +++ b/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java @@ -134,7 +134,7 @@ public KafkaProducerDiagnostic getDiagnostic() { } } -class MaxwellKafkaProducerWorker extends AbstractAsyncProducer implements Runnable, StoppableTask, DestinationBuilder { +class MaxwellKafkaProducerWorker extends AbstractAsyncProducer implements Runnable, StoppableTask { static final Logger LOGGER = LoggerFactory.getLogger(MaxwellKafkaProducer.class); private final Producer kafka; @@ -235,8 +235,12 @@ private Integer getNumPartitions(String topic) { } } - private String generateTopic(String topic, RowIdentity pk){ - return this.buildDestinationString(interpolateTopic, topic, pk); + private String generateTopic(String topic, RowMap rowMap){ + if(rowMap == null || !interpolateTopic){ + return topic; + } + return rowMap.buildDestinationString(topic, "%\\{database}", + "%\\{table}", "%\\{type}"); } @Override @@ -270,7 +274,6 @@ void sendAsync(ProducerRecord record, Callback callback) { } ProducerRecord makeProducerRecord(final RowMap r) throws Exception { - RowIdentity pk = r.getRowIdentity(); String key = r.pkToJson(keyFormat); String value = r.toJSON(outputConfig); ProducerRecord record; @@ -282,7 +285,7 @@ record = new ProducerRecord<>(this.ddlTopic, this.ddlPartitioner.kafkaPartition( // javascript topic override topic = r.getKafkaTopic(); if ( topic == null ) { - topic = generateTopic(this.topic, pk); + topic = generateTopic(this.topic, r); } record = new ProducerRecord<>(topic, this.partitioner.kafkaPartition(r, getNumPartitions(topic)), key, value); @@ -293,7 +296,7 @@ record = new ProducerRecord<>(topic, this.partitioner.kafkaPartition(r, getNumPa ProducerRecord makeFallbackRecord(String fallbackTopic, final RowIdentity pk, Exception reason) throws Exception { String key = pk.toKeyJson(keyFormat); String value = pk.toFallbackValueWithReason(reason.getClass().getSimpleName()); - String topic = generateTopic(fallbackTopic, pk); + String topic = generateTopic(fallbackTopic, queue.peek()); return new ProducerRecord<>(topic, key, value); } diff --git a/src/main/java/com/zendesk/maxwell/producer/MaxwellRedisProducer.java b/src/main/java/com/zendesk/maxwell/producer/MaxwellRedisProducer.java index edeb6c614..af3ae57f2 100644 --- a/src/main/java/com/zendesk/maxwell/producer/MaxwellRedisProducer.java +++ b/src/main/java/com/zendesk/maxwell/producer/MaxwellRedisProducer.java @@ -1,7 +1,6 @@ package com.zendesk.maxwell.producer; import com.zendesk.maxwell.MaxwellContext; -import com.zendesk.maxwell.row.RowIdentity; import com.zendesk.maxwell.row.RowMap; import com.zendesk.maxwell.util.StoppableTask; import org.slf4j.Logger; @@ -12,7 +11,7 @@ import java.util.HashMap; import java.util.Map; -public class MaxwellRedisProducer extends AbstractProducer implements StoppableTask, DestinationBuilder { +public class MaxwellRedisProducer extends AbstractProducer implements StoppableTask { private static final Logger logger = LoggerFactory.getLogger(MaxwellRedisProducer.class); private final String channel; private final boolean interpolateChannel; @@ -43,14 +42,17 @@ public MaxwellRedisProducer(MaxwellContext context) { } } - private String generateChannel(RowIdentity pk){ - return this.buildDestinationString(interpolateChannel, channel, pk); + private String generateChannel(RowMap rowMap){ + if(interpolateChannel) { + return rowMap.buildDestinationString(channel, "%\\{database}", "%\\{table}", "%\\{type}"); + } + return channel; } private void sendToRedis(RowMap msg) throws Exception { String messageStr = msg.toJSON(outputConfig); - String channel = this.generateChannel(msg.getRowIdentity()); + String channel = this.generateChannel(msg); switch (redisType) { case "lpush": diff --git a/src/main/java/com/zendesk/maxwell/producer/RabbitmqProducer.java b/src/main/java/com/zendesk/maxwell/producer/RabbitmqProducer.java index bcd7adbc8..4262d2534 100644 --- a/src/main/java/com/zendesk/maxwell/producer/RabbitmqProducer.java +++ b/src/main/java/com/zendesk/maxwell/producer/RabbitmqProducer.java @@ -61,14 +61,8 @@ public void push(RowMap r) throws Exception { } private String getRoutingKeyFromTemplate(RowMap r) { - String table = StringUtils.defaultString(r.getTable(), StringUtils.EMPTY); - String type = StringUtils.defaultString(r.getRowType(), StringUtils.EMPTY); - - return context - .getConfig() - .rabbitmqRoutingKeyTemplate - .replace("%db%", r.getDatabase()) - .replace("%table%", table) - .replace("%type%", type); + String destination = context.getConfig().rabbitmqRoutingKeyTemplate; + return r.buildDestinationString(destination, + "%db%", "%table%", "%type%"); } } diff --git a/src/main/java/com/zendesk/maxwell/row/RowMap.java b/src/main/java/com/zendesk/maxwell/row/RowMap.java index 95c310123..12e5c4975 100644 --- a/src/main/java/com/zendesk/maxwell/row/RowMap.java +++ b/src/main/java/com/zendesk/maxwell/row/RowMap.java @@ -5,6 +5,7 @@ import com.zendesk.maxwell.replication.BinlogPosition; import com.zendesk.maxwell.producer.MaxwellOutputConfig; import com.zendesk.maxwell.replication.Position; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -413,4 +414,20 @@ public void setPartitionString(String partitionString) { this.partitionString = partitionString; } + /** + * Used by Producers to build the destination String (Topic, Channel, RoutingKey) + * + * @param destination The default destination String + * @param databaseRegex database regex to be replaced by the database name + * @param tableRegex The table regex to be replaced by the table name + * @param typeRegex The type regex to be replaced by the type + * @return The destination string with all the regex's replaced by their corresponding types + */ + public String buildDestinationString(String destination, String databaseRegex, String tableRegex, + String typeRegex) { + return destination + .replaceAll(databaseRegex, StringUtils.defaultString(this.database, StringUtils.EMPTY)) + .replaceAll(tableRegex, StringUtils.defaultString(this.table, StringUtils.EMPTY)) + .replaceAll(typeRegex, StringUtils.defaultString(this.rowType, StringUtils.EMPTY)); + } }