From 69b99f078ae42dd5c7eb29d8d5dc44471034963d Mon Sep 17 00:00:00 2001 From: lichang Date: Wed, 30 Dec 2020 17:46:40 +0800 Subject: [PATCH] rocketmq --- config.properties.example | 6 ++ pom.xml | 5 ++ .../com/zendesk/maxwell/MaxwellConfig.java | 28 ++++++++- .../com/zendesk/maxwell/MaxwellContext.java | 31 +++++++-- .../maxwell/producer/RocketmqProducer.java | 63 +++++++++++++++++++ 5 files changed, 126 insertions(+), 7 deletions(-) create mode 100644 src/main/java/com/zendesk/maxwell/producer/RocketmqProducer.java diff --git a/config.properties.example b/config.properties.example index a862236bc..cb0ec246c 100644 --- a/config.properties.example +++ b/config.properties.example @@ -4,6 +4,12 @@ log_level=info producer=kafka kafka.bootstrap.servers=localhost:9092 +producer=rocketmq +rocketmq_namesrv_addr=localhost:9876 +rocketmq_producer_group=rocketmq_producer_group +rocketmq_send_topic=maxwell +rocketmq_tags=* + # mysql login info host=localhost user=maxwell diff --git a/pom.xml b/pom.xml index 0c51d0787..639e8ea43 100644 --- a/pom.xml +++ b/pom.xml @@ -320,6 +320,11 @@ jgroups-raft 1.0.0.Final + + org.apache.rocketmq + rocketmq-client + 4.7.1 + diff --git a/src/main/java/com/zendesk/maxwell/MaxwellConfig.java b/src/main/java/com/zendesk/maxwell/MaxwellConfig.java index 633abfbe3..8a930685d 100644 --- a/src/main/java/com/zendesk/maxwell/MaxwellConfig.java +++ b/src/main/java/com/zendesk/maxwell/MaxwellConfig.java @@ -21,10 +21,13 @@ import org.slf4j.LoggerFactory; import org.threeten.bp.Duration; -import java.util.*; +import java.util.Enumeration; +import java.util.List; +import java.util.Properties; import java.util.regex.Pattern; -public class MaxwellConfig extends AbstractConfig { +public class MaxwellConfig extends AbstractConfig +{ static final Logger LOGGER = LoggerFactory.getLogger(MaxwellConfig.class); public static final String GTID_MODE_ENV = "GTID_MODE"; @@ -131,6 +134,11 @@ public class MaxwellConfig extends AbstractConfig { public boolean rabbitmqMessagePersistent; public boolean rabbitmqDeclareExchange; + public String rocketmqNamesrvAddr; + public String rocketmqProducerGroup; + public String rocketmqSendTopic; + public String rocketmqTags; + public String redisHost; public int redisPort; public String redisAuth; @@ -427,6 +435,14 @@ protected MaxwellOptionParser buildOptionParser() { parser.accepts( "rabbitmq_message_persistent", "Message persistence. Defaults to false" ).withOptionalArg(); parser.accepts( "rabbitmq_declare_exchange", "Should declare the exchange for rabbitmq publisher. Defaults to true" ).withOptionalArg(); + + parser.section( "rocketmq" ); + + parser.accepts( "rocketmq_namesrv_addr", "Rocketmq NameServer" ).withRequiredArg(); + parser.accepts( "rocketmq_producer_group", "Rocketmq ProducerGroup" ).withRequiredArg(); + parser.accepts( "rocketmq_send_topic", "optionally provide a topic name to push to. default: maxwell" ).withRequiredArg(); + parser.accepts( "rocketmq_tags", "Rocketmq send messgae tags. default:tags" ).withRequiredArg(); + parser.section( "redis" ); parser.accepts( "redis_host", "Host of Redis server" ).withRequiredArg(); @@ -557,6 +573,11 @@ private void setup(OptionSet options, Properties properties) { this.rabbitmqMessagePersistent = fetchBooleanOption("rabbitmq_message_persistent", options, properties, false); this.rabbitmqDeclareExchange = fetchBooleanOption("rabbitmq_declare_exchange", options, properties, true); + this.rocketmqNamesrvAddr = fetchStringOption("rocketmq_namesrv_addr", options, properties, "localhost:9876"); + this.rocketmqProducerGroup = fetchStringOption("rocketmq_producer_group", options, properties, "rocketmq_producer_group"); + this.rocketmqSendTopic = fetchStringOption("rocketmq_send_topic", options, properties, "maxwell"); + this.rocketmqTags = fetchStringOption("rocketmq_tags", options, properties, "tags"); + this.redisHost = fetchStringOption("redis_host", options, properties, "localhost"); this.redisPort = fetchIntegerOption("redis_port", options, properties, 6379); this.redisAuth = fetchStringOption("redis_auth", options, properties, null); @@ -966,7 +987,8 @@ public Properties getKafkaProperties() { return this.kafkaProperties; } - public static Pattern compileStringToPattern(String name) throws InvalidFilterException { + public static Pattern compileStringToPattern(String name) throws InvalidFilterException + { name = name.trim(); if ( name.startsWith("/") ) { if ( !name.endsWith("/") ) { diff --git a/src/main/java/com/zendesk/maxwell/MaxwellContext.java b/src/main/java/com/zendesk/maxwell/MaxwellContext.java index 81d03bc19..3385870f0 100644 --- a/src/main/java/com/zendesk/maxwell/MaxwellContext.java +++ b/src/main/java/com/zendesk/maxwell/MaxwellContext.java @@ -5,22 +5,42 @@ import com.zendesk.maxwell.bootstrap.BootstrapController; import com.zendesk.maxwell.bootstrap.SynchronousBootstrapper; import com.zendesk.maxwell.filtering.Filter; -import com.zendesk.maxwell.monitoring.*; -import com.zendesk.maxwell.producer.*; +import com.zendesk.maxwell.monitoring.MaxwellDiagnostic; +import com.zendesk.maxwell.monitoring.MaxwellDiagnosticContext; +import com.zendesk.maxwell.monitoring.MaxwellHTTPServer; +import com.zendesk.maxwell.monitoring.MaxwellMetrics; +import com.zendesk.maxwell.monitoring.Metrics; +import com.zendesk.maxwell.producer.AbstractProducer; +import com.zendesk.maxwell.producer.BufferedProducer; +import com.zendesk.maxwell.producer.FileProducer; +import com.zendesk.maxwell.producer.MaxwellKafkaProducer; +import com.zendesk.maxwell.producer.MaxwellKinesisProducer; +import com.zendesk.maxwell.producer.MaxwellPubsubProducer; +import com.zendesk.maxwell.producer.MaxwellRedisProducer; +import com.zendesk.maxwell.producer.MaxwellSQSProducer; +import com.zendesk.maxwell.producer.NoneProducer; +import com.zendesk.maxwell.producer.ProfilerProducer; +import com.zendesk.maxwell.producer.RabbitmqProducer; +import com.zendesk.maxwell.producer.RocketmqProducer; +import com.zendesk.maxwell.producer.StdoutProducer; import com.zendesk.maxwell.recovery.RecoveryInfo; -import com.zendesk.maxwell.replication.*; +import com.zendesk.maxwell.replication.BinlogConnectorDiagnostic; +import com.zendesk.maxwell.replication.HeartbeatNotifier; +import com.zendesk.maxwell.replication.MysqlVersion; +import com.zendesk.maxwell.replication.Position; +import com.zendesk.maxwell.replication.Replicator; import com.zendesk.maxwell.row.RowMap; import com.zendesk.maxwell.schema.MysqlPositionStore; import com.zendesk.maxwell.schema.MysqlSchemaCompactor; import com.zendesk.maxwell.schema.PositionStoreThread; import com.zendesk.maxwell.schema.ReadOnlyMysqlPositionStore; import com.zendesk.maxwell.util.C3P0ConnectionPool; +import com.zendesk.maxwell.util.ConnectionPool; import com.zendesk.maxwell.util.RunLoopProcess; import com.zendesk.maxwell.util.StoppableTask; import com.zendesk.maxwell.util.TaskManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.zendesk.maxwell.util.ConnectionPool; import java.io.IOException; import java.net.URISyntaxException; @@ -400,6 +420,9 @@ public AbstractProducer getProducer() throws IOException { case "rabbitmq": this.producer = new RabbitmqProducer(this); break; + case "rocketmq": + this.producer = new RocketmqProducer(this); + break; case "redis": this.producer = new MaxwellRedisProducer(this); break; diff --git a/src/main/java/com/zendesk/maxwell/producer/RocketmqProducer.java b/src/main/java/com/zendesk/maxwell/producer/RocketmqProducer.java new file mode 100644 index 000000000..f74a08a9f --- /dev/null +++ b/src/main/java/com/zendesk/maxwell/producer/RocketmqProducer.java @@ -0,0 +1,63 @@ +package com.zendesk.maxwell.producer; + +import com.zendesk.maxwell.MaxwellContext; +import com.zendesk.maxwell.row.RowMap; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RocketmqProducer extends AbstractProducer { + + private static final Logger LOGGER = LoggerFactory.getLogger(RocketmqProducer.class); + // 实例化消息生产者Producer + private DefaultMQProducer producer; + + private String rocketmqSendTopic; + private String rocketmqTags; + + + public RocketmqProducer(MaxwellContext context) { + super(context); + try { + producer = new DefaultMQProducer(context.getConfig().rocketmqProducerGroup); + // 设置NameServer的地址 + producer.setNamesrvAddr(context.getConfig().rocketmqNamesrvAddr); + + rocketmqSendTopic = context.getConfig().rocketmqSendTopic; + rocketmqTags = context.getConfig().rocketmqTags; + // 启动Producer实例 + producer.start(); + } catch (MQClientException e) { + throw new RuntimeException(e); + } + + } + + @Override + public void push(RowMap r) throws Exception { + if ( !r.shouldOutput(outputConfig) ) { + context.setPosition(r.getNextPosition()); + return; + } + String value = r.toJSON(outputConfig); + // 创建消息,并指定Topic,Tag和消息体 + Message msg = new Message(rocketmqSendTopic /* Topic */, + rocketmqTags /* Tag */, + (value).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ + ); + + producer.send(msg); + + if ( r.isTXCommit() ) { + context.setPosition(r.getNextPosition()); + } + if ( LOGGER.isDebugEnabled()) { + LOGGER.debug("-> topic:" + rocketmqSendTopic + ", tags:" + rocketmqTags); + } + } + + +}