From 3bfdef8661d9acc420bb7665501651c395461e92 Mon Sep 17 00:00:00 2001 From: onceicy <64947722+onceicy@users.noreply.github.com> Date: Thu, 23 May 2024 23:22:49 +0800 Subject: [PATCH] [ISSUES #530] fix order message bug (#532) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [ISSUES #530] fix order message bug * [ISSUES #530] add order message compatible with rocketmq 4.x config * [ISSUES #530] add order message compatible with rocketmq 4.x config user guide * [ISSUES #530] Change the configuration item from 'ordering.msg.compatible.v4' to 'ordering.msg.enable' --------- Co-authored-by: 靖愉 --- README.md | 7 ++++--- .../connect/runtime/config/SourceConnectorConfig.java | 1 + .../runtime/connectorwrapper/WorkerSourceTask.java | 11 ++++++++++- .../service/AbstractConfigManagementService.java | 6 ++++++ 4 files changed, 21 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 37ae98ce0..49a874646 100644 --- a/README.md +++ b/README.md @@ -344,9 +344,10 @@ public interface Transform extends Component { ### Source Connector特殊配置 -| key | nullable | default | description | -|-----------------------------|---------|---------|------------------------------------------------------| -| connect.topicname | true | | 指定数据写入的topic,若不配置则直接取position中key为topic的值,若取不到则抛出异常 | +| key | nullable | default | description | +|---------------------|---------|---------|--------------------------------------------------------------| +| connect.topicname | true | | 指定数据写入的topic,若不配置则直接取position中key为topic的值,若取不到则抛出异常 | +| ordering.msg.enable | true | false | 当目标集群不为rocketmq 5.x时,顺序消息会乱序,若设置为true,才能支持顺序,但会降低connector性能 | ### Sink Connector特殊配置 | key | nullable | default | description | diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/SourceConnectorConfig.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/SourceConnectorConfig.java index 120129178..f05abf3cc 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/SourceConnectorConfig.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/SourceConnectorConfig.java @@ -26,6 +26,7 @@ public class SourceConnectorConfig extends ConnectorConfig { public static final String CONNECT_TOPICNAME = "connect.topicname"; + public static final String ORDERING_MSG_ENABLE = "ordering.msg.enable"; public SourceConnectorConfig(ConnectKeyValue config) { super(config); diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java index 79317bdb9..1ac8f928a 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java @@ -276,7 +276,16 @@ public void onException(Throwable throwable) { } else { // Partition message ordering, // At the same time, ensure that the data is pulled in an orderly manner, which needs to be guaranteed by sourceTask in the business - producer.send(sourceMessage, new SelectMessageQueueByHash(), sourceMessage.getKeys(), callback); + if (taskConfig.getProperties().get(SourceConnectorConfig.ORDERING_MSG_ENABLE).equals("true")) { + try { + SendResult result = producer.send(sourceMessage, new SelectMessageQueueByHash(), sourceMessage.getKeys()); + callback.onSuccess(result); + } catch (Exception e) { + callback.onException(e); + } + } else { + producer.send(sourceMessage, new SelectMessageQueueByHash(), sourceMessage.getKeys(), callback); + } } } catch (RetriableException e) { diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractConfigManagementService.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractConfigManagementService.java index 855c08248..2c51c4ff5 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractConfigManagementService.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractConfigManagementService.java @@ -340,6 +340,12 @@ public void recomputeTaskConfigs(String connectorName, ConnectKeyValue configs) if (configs.containsKey(SourceConnectorConfig.CONNECT_TOPICNAME)) { newKeyValue.put(SourceConnectorConfig.CONNECT_TOPICNAME, configs.getString(SourceConnectorConfig.CONNECT_TOPICNAME)); } + // put ordering msg enable config + if (configs.containsKey(SourceConnectorConfig.ORDERING_MSG_ENABLE)) { + newKeyValue.put(SourceConnectorConfig.ORDERING_MSG_ENABLE, configs.getString(SourceConnectorConfig.ORDERING_MSG_ENABLE)); + } else { + newKeyValue.put(SourceConnectorConfig.ORDERING_MSG_ENABLE, "false"); + } // sink consume topic if (configs.containsKey(SinkConnectorConfig.CONNECT_TOPICNAMES)) { newKeyValue.put(SinkConnectorConfig.CONNECT_TOPICNAMES, configs.getString(SinkConnectorConfig.CONNECT_TOPICNAMES));