Skip to content

Commit

Permalink
[ISSUES #530] fix order message bug (#532)
Browse files Browse the repository at this point in the history
* [ISSUES #530] fix order message bug

* [ISSUES #530] add order message compatible with rocketmq 4.x config

* [ISSUES #530] add order message compatible with rocketmq 4.x config user guide

* [ISSUES #530] Change the configuration item from 'ordering.msg.compatible.v4' to 'ordering.msg.enable'

---------

Co-authored-by: 靖愉 <chenyuquan.cyq@antgroup.com>
  • Loading branch information
onceicy and 靖愉 authored May 23, 2024
1 parent 02a168c commit 3bfdef8
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 4 deletions.
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -344,9 +344,10 @@ public interface Transform<R extends ConnectRecord> extends Component {


### Source Connector特殊配置
| key | nullable | default | description |
|-----------------------------|---------|---------|------------------------------------------------------|
| connect.topicname | true | | 指定数据写入的topic,若不配置则直接取position中key为topic的值,若取不到则抛出异常 |
| key | nullable | default | description |
|---------------------|---------|---------|--------------------------------------------------------------|
| connect.topicname | true | | 指定数据写入的topic,若不配置则直接取position中key为topic的值,若取不到则抛出异常 |
| ordering.msg.enable | true | false | 当目标集群不为rocketmq 5.x时,顺序消息会乱序,若设置为true,才能支持顺序,但会降低connector性能 |

### Sink Connector特殊配置
| key | nullable | default | description |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
public class SourceConnectorConfig extends ConnectorConfig {

public static final String CONNECT_TOPICNAME = "connect.topicname";
public static final String ORDERING_MSG_ENABLE = "ordering.msg.enable";

public SourceConnectorConfig(ConnectKeyValue config) {
super(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,16 @@ public void onException(Throwable throwable) {
} else {
// Partition message ordering,
// At the same time, ensure that the data is pulled in an orderly manner, which needs to be guaranteed by sourceTask in the business
producer.send(sourceMessage, new SelectMessageQueueByHash(), sourceMessage.getKeys(), callback);
if (taskConfig.getProperties().get(SourceConnectorConfig.ORDERING_MSG_ENABLE).equals("true")) {
try {
SendResult result = producer.send(sourceMessage, new SelectMessageQueueByHash(), sourceMessage.getKeys());
callback.onSuccess(result);
} catch (Exception e) {
callback.onException(e);
}
} else {
producer.send(sourceMessage, new SelectMessageQueueByHash(), sourceMessage.getKeys(), callback);
}
}

} catch (RetriableException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,12 @@ public void recomputeTaskConfigs(String connectorName, ConnectKeyValue configs)
if (configs.containsKey(SourceConnectorConfig.CONNECT_TOPICNAME)) {
newKeyValue.put(SourceConnectorConfig.CONNECT_TOPICNAME, configs.getString(SourceConnectorConfig.CONNECT_TOPICNAME));
}
// put ordering msg enable config
if (configs.containsKey(SourceConnectorConfig.ORDERING_MSG_ENABLE)) {
newKeyValue.put(SourceConnectorConfig.ORDERING_MSG_ENABLE, configs.getString(SourceConnectorConfig.ORDERING_MSG_ENABLE));
} else {
newKeyValue.put(SourceConnectorConfig.ORDERING_MSG_ENABLE, "false");
}
// sink consume topic
if (configs.containsKey(SinkConnectorConfig.CONNECT_TOPICNAMES)) {
newKeyValue.put(SinkConnectorConfig.CONNECT_TOPICNAMES, configs.getString(SinkConnectorConfig.CONNECT_TOPICNAMES));
Expand Down

0 comments on commit 3bfdef8

Please sign in to comment.