[ISSUE #4897] Implement retry consuming based on pulsar message middleware #4898
…pache#4389) * [ISSUE apache#4388] Improve the rocketmq source connector offset ack * update comments
[ISSUE apache#4371] Move Rabbitmq plugin into Connector from Storage plugin moudle
Update .asf.yaml
…pache#4383) * apache#4195 issue: Update SubscribeProcessor.java * apache#4195 issue: Modified SubscribeProcessor.java * apache#4195 issue:Fixed SubscribeProcessor.java * apache#4195 issue: Changed SubscribeProcessor.java * apache#4195 issue:Resolved Syntax Error in SubscribeProcessor.java * Commiting after removing CI Merge Errors
…apache#4344) * chore: add final to cacheWebHookConfig * fix: write cache only when MODIFY to avoid write empty config when CREATE * chore: correct log grammar mistake * feat: add try-catch and comments * fix: Wait for the notification of file write completion before initializing the cache * fix: use a shared latch * doc: add comment for cacheInit to explain CountDownLatch * feat: add object lock to ensure the atomicity of countDown in case of concurrency * feat: use synchronized instead of CountDownLatch * fix: expand synchronize zone to avoid file deleted before cacheInit * fix checkstyle
…che#4377) * add k8s doc * add k8s doc * k8s doc * k8s doc * k8s doc * k8s doc * k8s doc * eventmesh-operator doc
…in-api (apache#4395) * config: rabbitmq spi without webhook, RESTORE BEFORE MERGE * config: add extensionType in META-INF * fix: resolve extension not found error when startup * feat: add rabbitmq management config * feat: provide limited admin func * Revert "config: rabbitmq spi without webhook, RESTORE BEFORE MERGE" This reverts commit a835cbf. * doc: add PR link to README
apache#4401) * fix: dos2unix shell scripts & remove shell gitignore * fix: use bash instead of sh by default * fix: Verify if the previous process terminated abnormally for stop.sh * fix: Verify if the previous process terminated abnormally for start.sh and optimize output * Optimize: Standardise shell syntax
…e-plugins (apache#4404) * fix: remove knative circulate dependency * feat: realize knative admin-api * feat: realize mongodb admin-api * feat: realize pulsar admin-api * feat: realize pravega admin-api * fix: deliver a config to client instead of null * chore: revert startup plugin config
…rotocol (apache#4333) * refactor: enhance http and tcp server style * fix: create trace-plugin
…lugin moudle (apache#4408) * feat: refactor with pravega plugin. * feat: refactor with pravega plugin. * feat: refactor with pravega plugin. * feat: refactor with pravega plugin. * feat: refactor with pravega plugin. * feat: fix redis admin spi. * feat: refactor with pravega plugin. * feat: refactor with pravega plugin. * feat: refactor with pravega plugin.
* feat: fix redis admin spi. * feat: fix redis admin spi. * feat: fix redis admin spi. * feat: fix redis admin spi. * feat: fix redis admin spi.
…pache#4827) * Sync changes in apache#4719 * minor change * Only keep the artifact name * Run `sed -i 's/-[0-9].*\.jar//g'` * Run `sort known-dependencies.txt | uniq > known-dependencies-unique.txt` * Allow CI to run on branches with namespace in the branch name in forked repos * Correct typo and remove useless command * Use `sort -u -o` instead of `uniq` to remove duplicate artifacts with different version * Enlarge open-pull-requests-limit * minor: polish tips * Test apache/skywalking-eyes/dependency CI result * Fix 'unable to find version `0.6.0`' * See debug log to prove it works * skywalking-eyes/dependency doesn't support gradle, test basic actions/dependency-review-action * Add all denied licenses * Remove redundant check * Remove not included SPDX: ASL, RSAL * Add a useful printAllDependencyTrees task * Exampt safe artifact under multiple licenses * Exempt more safe artifacts (Looks like the last of them) * 'allow-dependencies-licenses' attribute only supports single-line text * Add a TODO comment * Add more file extensions for checkstyle * Resolve some checkstyle header violations * Add back apache/skywalking-eyes * Fix downloaded file didn't have a `.` * Disable Go deps update & Must pass CI before merge * No need to force up-to-date & Auto-approve only * Remove the slash at the end of the homepage url in Repo GitHub desc * Skip patch updates temporarily to reduce PR noise * Logback removed after apache@be06ef7 * Accept patch update * Submit dependency graph * Follow https://github.com/gradle/actions/blob/main/docs/dependency-submission.md#usage-with-pull-requests-from-public-forked-repositories * try to sort dependency graph workflow exec seq * `workflow_run` event will only trigger a workflow run if the workflow file is on the default branch * Grant required permission of CodeQL * Attempt to fix 'No dependency graph files found to submit' * Attempt to fix 'No dependency graph files found to submit' try 2 * Attempt to fix 'No dependency graph files found 
to submit' try 3 * Attempt to fix 'No dependency graph files found to submit' try 4 * Try to check dependency-review * Only check bundled dependencies * Fix 'No snapshots were found for the head SHA' attempt 1 * Test runtimeClasspath dependencies * Revert "Test runtimeClasspath dependencies" This reverts commit 3de89a5. * Try to retry 1 hr wo wait for snapshot update * Test gradle/actions#196 (comment) * Add todo comments * Keep implementation and compileOnly for now * Keep runtimeOnly deps * [Breaking Change] Remove dependency-review-action and wait for its bugfix * Add checkDeniedLicense into CI * minor code optimization
Bumps [actions/checkout](https://github.com/actions/checkout) from 3 to 4. - [Release notes](https://github.com/actions/checkout/releases) - [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md) - [Commits](actions/checkout@v3...v4) --- updated-dependencies: - dependency-name: actions/checkout dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Bumps [docker/login-action](https://github.com/docker/login-action) from 2 to 3. - [Release notes](https://github.com/docker/login-action/releases) - [Commits](docker/login-action@v2...v3) --- updated-dependencies: - dependency-name: docker/login-action dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Bumps [docker/metadata-action](https://github.com/docker/metadata-action) from 4 to 5. - [Release notes](https://github.com/docker/metadata-action/releases) - [Upgrade guide](https://github.com/docker/metadata-action/blob/master/UPGRADE.md) - [Commits](docker/metadata-action@v4...v5) --- updated-dependencies: - dependency-name: docker/metadata-action dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Bumps [com.fasterxml.jackson.core:jackson-annotations](https://github.com/FasterXML/jackson) from 2.13.0 to 2.17.1. - [Commits](https://github.com/FasterXML/jackson/commits) --- updated-dependencies: - dependency-name: com.fasterxml.jackson.core:jackson-annotations dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Bumps [docker/build-push-action](https://github.com/docker/build-push-action) from 4 to 5. - [Release notes](https://github.com/docker/build-push-action/releases) - [Commits](docker/build-push-action@v4...v5) --- updated-dependencies: - dependency-name: docker/build-push-action dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Bumps org.slf4j:slf4j-api from 2.0.9 to 2.0.13. --- updated-dependencies: - dependency-name: org.slf4j:slf4j-api dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Bumps [org.javassist:javassist](https://github.com/jboss-javassist/javassist) from 3.24.0-GA to 3.30.2-GA. - [Release notes](https://github.com/jboss-javassist/javassist/releases) - [Changelog](https://github.com/jboss-javassist/javassist/blob/master/Changes.md) - [Commits](https://github.com/jboss-javassist/javassist/commits) --- updated-dependencies: - dependency-name: org.javassist:javassist dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Bumps [org.apache.pulsar:pulsar-client](https://github.com/apache/pulsar) from 2.10.1 to 2.11.4. - [Release notes](https://github.com/apache/pulsar/releases) - [Commits](apache/pulsar@v2.10.1...v2.11.4) --- updated-dependencies: - dependency-name: org.apache.pulsar:pulsar-client dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Bumps `log4jVersion` from 2.22.1 to 2.23.1. Updates `org.apache.logging.log4j:log4j-api` from 2.22.1 to 2.23.1 Updates `org.apache.logging.log4j:log4j-core` from 2.22.1 to 2.23.1 Updates `org.apache.logging.log4j:log4j-slf4j2-impl` from 2.22.1 to 2.23.1 --- updated-dependencies: - dependency-name: org.apache.logging.log4j:log4j-api dependency-type: direct:production update-type: version-update:semver-minor - dependency-name: org.apache.logging.log4j:log4j-core dependency-type: direct:production update-type: version-update:semver-minor - dependency-name: org.apache.logging.log4j:log4j-slf4j2-impl dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
* Bump org.apache.kafka:kafka-clients from 3.0.0 to 3.7.0 Bumps org.apache.kafka:kafka-clients from 3.0.0 to 3.7.0. --- updated-dependencies: - dependency-name: org.apache.kafka:kafka-clients dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> * Use stable version * Use stable version --------- Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Pil0tXia <xiatian@apache.org>
Bumps [org.mongodb:mongodb-driver](https://github.com/mongodb/mongo-java-driver) from 3.12.11 to 3.12.14. - [Release notes](https://github.com/mongodb/mongo-java-driver/releases) - [Commits](mongodb/mongo-java-driver@r3.12.11...r3.12.14) --- updated-dependencies: - dependency-name: org.mongodb:mongodb-driver dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Bumps [io.cloudevents:cloudevents-kafka](https://github.com/cloudevents/sdk-java) from 2.2.1 to 2.5.0. - [Release notes](https://github.com/cloudevents/sdk-java/releases) - [Commits](cloudevents/sdk-java@2.2.1...2.5.0) --- updated-dependencies: - dependency-name: io.cloudevents:cloudevents-kafka dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
…y-based-on-pulsar-message-middleware
.enableRetry(true)
.deadLetterPolicy(DeadLetterPolicy.builder()
.deadLetterTopic(dlqTopic)
.retryLetterTopic(retryTopic)
You have defined the retry topic here, so why do you still need to manually send the message to the retry topic in PulsarRetryStrategyImpl? Doesn't Pulsar automatically send failed messages to the retry topic?
Fixed.
} catch (Exception e) {
ackConsumer.negativeAcknowledge(msg);
try {
ackConsumer.reconsumeLater(msg, 5, TimeUnit.SECONDS);
For negativeAcknowledge() and reconsumeLater(), the former will automatically re-consume messages from the retry topic retryTopic, while the latter will re-consume messages from the current topic subTopic. Why do you need to use both? Have I misunderstood?
It is the other way around. The negativeAcknowledge() method triggers redelivery of a failed message from the original (main) topic. The reconsumeLater() method, on the other hand, handles retries by moving the failed message to a dedicated retry topic, from which it is reprocessed after a specified delay.
Thanks for the correction. But why use both? There are two retry mechanisms here, in addition to the default retry of EventMesh. And why does Pulsar's retry need to be set to three times instead of just once?
Compared with negative acknowledgment, the retry letter topic is more suitable for messages that require a large number of retries with a configurable retry interval. Messages in the retry letter topic are persisted to BookKeeper, while messages that need to be retried because of a negative acknowledgment are cached on the client side. Negative acks and retry/DLQ do work together.
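To make the "configurable number of retries, then DLQ" idea concrete, here is a minimal sketch in plain Java. It is a toy model, not Pulsar code: the class `RetryRoutingModel`, the enum `Destination`, and both methods are hypothetical names introduced only for illustration, and the exact threshold at which Pulsar switches from the retry letter topic to the dead letter topic may differ from this simplification.

```java
public class RetryRoutingModel {

    // Hypothetical destinations for a message whose processing failed.
    enum Destination { RETRY_TOPIC, DEAD_LETTER_TOPIC }

    // Assumption: a message that has already been retried fewer than
    // maxRetries times goes to the retry topic; otherwise it is dead-lettered.
    static Destination route(int previousRetries, int maxRetries) {
        return previousRetries < maxRetries
                ? Destination.RETRY_TOPIC
                : Destination.DEAD_LETTER_TOPIC;
    }

    // Counts how many times a message that always fails is delivered
    // (the first delivery plus every retry) before it is dead-lettered.
    static int deliveriesBeforeDeadLetter(int maxRetries) {
        int deliveries = 1;      // the original delivery
        int previousRetries = 0;
        while (route(previousRetries, maxRetries) == Destination.RETRY_TOPIC) {
            previousRetries++;
            deliveries++;
        }
        return deliveries;
    }

    public static void main(String[] args) {
        System.out.println("Deliveries with maxRetries=3: " + deliveriesBeforeDeadLetter(3));
    }
}
```

In this model a retry limit of three means a permanently failing message is processed four times in total before it reaches the dead letter topic.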
Negative acks and retry/DLQ do work together
Are you sure it's a collaboration and not duplicate consumption? If you are sure, could you explain this: how can it ensure that only one of the mechanisms will be triggered for retries when there are failed messages, given that both mechanisms exist simultaneously?
Sorry, I misunderstood. Combining negative acknowledgment with the active retry (reconsumeLater()) causes duplicate consumption. I have fixed it.
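The duplicate-consumption concern from this thread can be illustrated with a small stand-alone model. This is only a sketch under stated assumptions: it does not use the Pulsar client, the class `DuplicateRetryModel` and its method are hypothetical, and it simply assumes that a negative acknowledgment schedules one redelivery from the main topic while reconsumeLater() schedules one redelivery from the retry topic.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class DuplicateRetryModel {

    // Returns how many redeliveries are scheduled for one failed message,
    // depending on which retry mechanisms the failure handler invokes.
    static int scheduledRedeliveries(boolean negativeAck, boolean reconsumeLater) {
        Deque<String> mainTopicRedelivery = new ArrayDeque<>();
        Deque<String> retryTopic = new ArrayDeque<>();

        if (negativeAck) {
            // Assumption: a negative ack redelivers from the original topic.
            mainTopicRedelivery.add("msg-1");
        }
        if (reconsumeLater) {
            // Assumption: reconsumeLater republishes the message to the retry topic.
            retryTopic.add("msg-1");
        }
        return mainTopicRedelivery.size() + retryTopic.size();
    }

    public static void main(String[] args) {
        System.out.println("both mechanisms: " + scheduledRedeliveries(true, true));
        System.out.println("reconsumeLater only: " + scheduledRedeliveries(false, true));
    }
}
```

Under these assumptions, calling both methods in the catch block schedules the same message twice, whereas calling only reconsumeLater() schedules it once, which matches the fix described above.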
…tegy-based-on-pulsar-message-middleware' into apache#4556-Implement-retry-strategy-based-on-pulsar-message-middleware
I found that the original design of the retry module seems to have some problems, but I'm not sure yet. I need to close issue #4556 first. However, your PR can serve as the implementation of a separate issue and will not be affected.
Based on my current understanding of Pulsar, I have completed the review to the best of my ability. As I said in your last PR, I am not very familiar with Pulsar, so I can't guarantee the quality of my review this time. Please forgive my lack of technical expertise. Reviewers should be responsible for the first "approved" of each PR, so please wait for further review from more members of the community.
It has been 60 days since the last activity on this pull request. I am reaching out here to gently remind you that the Apache EventMesh community values every pull request, and please feel free to get in touch with the reviewers at any time. They are available to assist you in advancing the progress of your pull request and offering the latest feedback. If you encounter any challenges during development, seeking support within the community is encouraged. We sincerely appreciate your contributions to Apache EventMesh.
Fixes #4897
Motivation
This PR adds retry consumption based on the Pulsar message middleware.
Modifications
Implemented the code that supports retrying failed messages through Pulsar.
Documentation