diff --git a/.github/workflows/build-main-branches.yml b/.github/workflows/build-main-branches.yml index 42628a38d3..b931df4f13 100644 --- a/.github/workflows/build-main-branches.yml +++ b/.github/workflows/build-main-branches.yml @@ -30,7 +30,7 @@ jobs: 'version': '21', }, { - 'version': '11', + 'version': '21', 'build_opts': '-Pslow-flaky-tests -Drevapi.skip=true', 'name': 'Slow and Flaky tests', 'continue-on-error': true diff --git a/.github/workflows/build-pull.yml b/.github/workflows/build-pull.yml index af2ba97a5c..c4d20607c2 100644 --- a/.github/workflows/build-pull.yml +++ b/.github/workflows/build-pull.yml @@ -29,7 +29,7 @@ jobs: 'version': '21', }, { - 'version': '11', + 'version': '21', 'build_opts': '-Pslow-flaky-tests -Drevapi.skip=true', 'name': 'Slow and Flaky tests', 'continue-on-error': true diff --git a/pom.xml b/pom.xml index 897db3e4da..9bf6910842 100644 --- a/pom.xml +++ b/pom.xml @@ -57,9 +57,10 @@ - 11 - 11 - 11 + true + 21 + 21 + 21 4.5.1 2.2.21 diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/EmitterLatencyTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/EmitterLatencyTest.java index bea26c7abe..5a90f837ed 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/EmitterLatencyTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/EmitterLatencyTest.java @@ -1,12 +1,17 @@ package io.smallrye.reactive.messaging.kafka.client; import java.io.IOException; +import java.time.Duration; +import java.time.temporal.ChronoUnit; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.ReentrantLock; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; @@ -17,15 +22,16 @@ import org.eclipse.microprofile.reactive.messaging.Message; import org.eclipse.microprofile.reactive.messaging.OnOverflow; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; +import eu.rekawek.toxiproxy.model.ToxicDirection; import io.smallrye.reactive.messaging.kafka.TestTags; import io.smallrye.reactive.messaging.kafka.base.KafkaCompanionProxyTestBase; +import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging; import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; -import eu.rekawek.toxiproxy.model.ToxicDirection; - @Tag(TestTags.SLOW) public class EmitterLatencyTest extends KafkaCompanionProxyTestBase { @@ -43,7 +49,7 @@ void tearDown() throws IOException { } @Test - public void testHighLatencyEmitter() throws IOException { + public void testHighLatencyEmitter() throws IOException, ExecutionException, InterruptedException, TimeoutException { MapBasedConfig config = kafkaConfig("mp.messaging.outgoing.out") .with("topic", topic) .with("value.serializer", IntegerSerializer.class.getName()); @@ -55,20 +61,32 @@ public void testHighLatencyEmitter() throws IOException { toxics().latency("latency", ToxicDirection.UPSTREAM, 5000); + List> sendAcks = new CopyOnWriteArrayList<>(); + for (int i = 0; i < 10000; i++) { int j = i; - System.out.println("record :" + i); - app.send(Message.of(j)); + CompletableFuture future = new CompletableFuture<>(); + sendAcks.add(future); + KafkaLogging.log.warn("record :" + j); + app.send(Message.of(j, () -> { + KafkaLogging.log.warn("sent " + j); + future.complete(null); + return future; + })); } + CompletableFuture.allOf(sendAcks.toArray(CompletableFuture[]::new)) + .get(2, TimeUnit.MINUTES); + toxics().get("latency").remove(); companion.consumeIntegers().fromTopics(topic, 10000) - .awaitCompletion(); + .awaitCompletion(Duration.of(2, ChronoUnit.MINUTES)); } @Test - public void testHighLatencyEmitterWithBuffer() throws IOException { + public void testHighLatencyEmitterWithBuffer() + throws IOException, ExecutionException, InterruptedException, TimeoutException { MapBasedConfig config = kafkaConfig("mp.messaging.outgoing.out") .with("topic", topic) .with("value.serializer", IntegerSerializer.class.getName()); @@ -80,12 +98,23 @@ public void testHighLatencyEmitterWithBuffer() throws IOException { toxics().latency("latency", ToxicDirection.UPSTREAM, 5000); + List> sendAcks = new CopyOnWriteArrayList<>(); + for (int i = 0; i < 10000; i++) { int j = i; - System.out.println("record :" + i); - app.send(Message.of(j)); + CompletableFuture future = new CompletableFuture<>(); + sendAcks.add(future); + KafkaLogging.log.warn("record :" + j); + app.send(Message.of(j, () -> { + KafkaLogging.log.warn("sent " + j); + future.complete(null); + return future; + })); } + CompletableFuture.allOf(sendAcks.toArray(CompletableFuture[]::new)) + .get(2, TimeUnit.MINUTES); + toxics().get("latency").remove(); companion.consumeIntegers().fromTopics(topic, 10000) @@ -93,7 +122,8 @@ public void testHighLatencyEmitterWithBuffer() throws IOException { } @Test - public void testHighLatencyEmitterNoWaitWriteCompletion() throws ExecutionException, InterruptedException, TimeoutException, IOException { + public void testHighLatencyEmitterNoWaitWriteCompletion() + throws ExecutionException, InterruptedException, TimeoutException, IOException { MapBasedConfig config = kafkaConfig("mp.messaging.outgoing.out") .with("topic", topic) @@ -109,11 +139,11 @@ public void testHighLatencyEmitterNoWaitWriteCompletion() throws ExecutionExcept List> sendAcks = new CopyOnWriteArrayList<>(); for (int i = 0; i < 10000; i++) { int j = i; - System.out.println("record :" + i); + KafkaLogging.log.warn("record :" + j); CompletableFuture future = new CompletableFuture<>(); sendAcks.add(future); app.send(Message.of(j, () -> { - System.out.println("sent " + j); + KafkaLogging.log.warn("sent " + j); future.complete(null); return future; })); @@ -128,6 +158,127 @@ public void testHighLatencyEmitterNoWaitWriteCompletion() throws ExecutionExcept .awaitCompletion(); } + @Test + public void testHighLatencySynchronizedEmitterVirtualThreaded() + throws IOException, ExecutionException, InterruptedException, TimeoutException { + MapBasedConfig config = kafkaConfig("mp.messaging.outgoing.out") + .with("topic", topic) + .with("value.serializer", IntegerSerializer.class.getName()); + + SynchronizedEmitterSender app = runApplication(config, SynchronizedEmitterSender.class); + + // here not creating the topic upfront causes too much wait + // companion.topics().create(topic, 5); + + toxics().latency("latency", ToxicDirection.UPSTREAM, 5000); + + ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); + List> sendAcks = new CopyOnWriteArrayList<>(); + for (int i = 0; i < 10000; i++) { + int j = i; + CompletableFuture future = new CompletableFuture<>(); + sendAcks.add(future); + executor.execute(() -> { + KafkaLogging.log.warn("record :" + j); + app.send(Message.of(j, () -> { + KafkaLogging.log.warn("sent " + j); + future.complete(null); + return future; + })); + }); + } + + CompletableFuture.allOf(sendAcks.toArray(CompletableFuture[]::new)) + .get(2, TimeUnit.MINUTES); + + toxics().get("latency").remove(); + + companion.consumeIntegers().fromTopics(topic, 10000) + .awaitCompletion(Duration.of(2, ChronoUnit.MINUTES)); + } + + @Test + @Disabled + public void testHighLatencyEmitterVirtualThreaded() + throws IOException, ExecutionException, InterruptedException, TimeoutException { + MapBasedConfig config = kafkaConfig("mp.messaging.outgoing.out") + .with("topic", topic) + .with("value.serializer", IntegerSerializer.class.getName()); + + EmitterSender app = runApplication(config, EmitterSender.class); + + // here not creating the topic upfront causes too much wait + // companion.topics().create(topic, 5); + + toxics().latency("latency", ToxicDirection.UPSTREAM, 5000); + + // This will fail + ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); + List> sendAcks = new CopyOnWriteArrayList<>(); + for (int i = 0; i < 10000; i++) { + int j = i; + CompletableFuture future = new CompletableFuture<>(); + sendAcks.add(future); + executor.execute(() -> { + KafkaLogging.log.warn("record :" + j); + app.send(Message.of(j, () -> { + KafkaLogging.log.warn("sent " + j); + future.complete(null); + return future; + })); + }); + } + + CompletableFuture.allOf(sendAcks.toArray(CompletableFuture[]::new)) + .get(2, TimeUnit.MINUTES); + + toxics().get("latency").remove(); + + companion.consumeIntegers().fromTopics(topic, 10000) + .awaitCompletion(Duration.of(2, ChronoUnit.MINUTES)); + } + + @Test + public void testHighLatencyEmitterMultiThreaded() + throws IOException, ExecutionException, InterruptedException, TimeoutException { + MapBasedConfig config = kafkaConfig("mp.messaging.outgoing.out") + .with("topic", topic) + .with("value.serializer", IntegerSerializer.class.getName()); + + EmitterSender app = runApplication(config, EmitterSender.class); + + // here not creating the topic upfront causes too much wait + // companion.topics().create(topic, 5); + + toxics().latency("latency", ToxicDirection.UPSTREAM, 5000); + + // This will probably pass because ThrowingEmitter.requested doesn't take buffer size into account, + // and will spill into the buffer, without hitting the "Insufficient downstream requests" + ExecutorService executor = Executors.newFixedThreadPool(100); + List> sendAcks = new CopyOnWriteArrayList<>(); + for (int i = 0; i < 10000; i++) { + int j = i; + CompletableFuture future = new CompletableFuture<>(); + sendAcks.add(future); + executor.execute(() -> { + KafkaLogging.log.warn("record :" + j); + app.send(Message.of(j, () -> { + KafkaLogging.log.warn("sent " + j); + future.complete(null); + return future; + })); + }); + } + + CompletableFuture.allOf(sendAcks.toArray(CompletableFuture[]::new)) + .get(2, TimeUnit.MINUTES); + + toxics().get("latency").remove(); + + companion.consumeIntegers().fromTopics(topic, 10000) + .awaitCompletion(Duration.of(2, ChronoUnit.MINUTES)); + } + @ApplicationScoped public static class EmitterSender { @@ -152,6 +303,39 @@ private void sleep() { } } + @ApplicationScoped + public static class SynchronizedEmitterSender { + + @Inject + @Channel("out") + Emitter emitter; + + ReentrantLock lock = new ReentrantLock(); + + void send(Message value) { + lock.lock(); + try { + while (!emitter.hasRequests()) { + sleep(); + } + emitter.send(value); + } catch (Throwable t) { + throw t; + } finally { + lock.unlock(); + } + } + + private void sleep() { + try { + Thread.sleep(50); + } catch (InterruptedException ex) { + System.out.println("Thread was interrupted to send messages to the Kafka topic. " + ex.getMessage()); + Thread.currentThread().interrupt(); + } + } + } + @ApplicationScoped public static class BufferedEmitterSender { diff --git a/smallrye-reactive-messaging-kafka/src/test/resources/log4j.properties b/smallrye-reactive-messaging-kafka/src/test/resources/log4j.properties index ff05d7e598..e9308a3289 100644 --- a/smallrye-reactive-messaging-kafka/src/test/resources/log4j.properties +++ b/smallrye-reactive-messaging-kafka/src/test/resources/log4j.properties @@ -2,7 +2,7 @@ log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target=System.out log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p [%c] - %m%n +log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %t [%c] - %m%n #log4j.logger.org.apache.kafka=debug log4j.logger.org.apache.kafka.clients=WARN