Skip to content

Commit

Permalink
emitter latency
Browse files Browse the repository at this point in the history
  • Loading branch information
ozangunalp committed Feb 2, 2024
1 parent 1859f19 commit 51fb2d2
Show file tree
Hide file tree
Showing 5 changed files with 203 additions and 18 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-main-branches.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
'version': '21',
},
{
'version': '11',
'version': '21',
'build_opts': '-Pslow-flaky-tests -Drevapi.skip=true',
'name': 'Slow and Flaky tests',
'continue-on-error': true
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/build-pull.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
'version': '21',
},
{
'version': '11',
'version': '21',
'build_opts': '-Pslow-flaky-tests -Drevapi.skip=true',
'name': 'Slow and Flaky tests',
'continue-on-error': true
Expand Down
7 changes: 4 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,10 @@


<properties>
<maven.compiler.release>11</maven.compiler.release>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<format.skip>true</format.skip>
<maven.compiler.release>21</maven.compiler.release>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>

<vertx.version>4.5.1</vertx.version>
<rxjava.version>2.2.21</rxjava.version>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package io.smallrye.reactive.messaging.kafka.client;

import java.io.IOException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
Expand All @@ -17,15 +22,16 @@
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.OnOverflow;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import eu.rekawek.toxiproxy.model.ToxicDirection;
import io.smallrye.reactive.messaging.kafka.TestTags;
import io.smallrye.reactive.messaging.kafka.base.KafkaCompanionProxyTestBase;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;

import eu.rekawek.toxiproxy.model.ToxicDirection;

@Tag(TestTags.SLOW)
public class EmitterLatencyTest extends KafkaCompanionProxyTestBase {

Expand All @@ -43,7 +49,7 @@ void tearDown() throws IOException {
}

@Test
public void testHighLatencyEmitter() throws IOException {
public void testHighLatencyEmitter() throws IOException, ExecutionException, InterruptedException, TimeoutException {
MapBasedConfig config = kafkaConfig("mp.messaging.outgoing.out")
.with("topic", topic)
.with("value.serializer", IntegerSerializer.class.getName());
Expand All @@ -55,20 +61,32 @@ public void testHighLatencyEmitter() throws IOException {

toxics().latency("latency", ToxicDirection.UPSTREAM, 5000);

List<CompletableFuture<Void>> sendAcks = new CopyOnWriteArrayList<>();

for (int i = 0; i < 10000; i++) {
int j = i;
System.out.println("record :" + i);
app.send(Message.of(j));
CompletableFuture<Void> future = new CompletableFuture<>();
sendAcks.add(future);
KafkaLogging.log.warn("record :" + j);
app.send(Message.of(j, () -> {
KafkaLogging.log.warn("sent " + j);
future.complete(null);
return future;
}));
}

CompletableFuture.allOf(sendAcks.toArray(CompletableFuture[]::new))
.get(2, TimeUnit.MINUTES);

toxics().get("latency").remove();

companion.consumeIntegers().fromTopics(topic, 10000)
.awaitCompletion();
.awaitCompletion(Duration.of(2, ChronoUnit.MINUTES));
}

@Test
public void testHighLatencyEmitterWithBuffer() throws IOException {
public void testHighLatencyEmitterWithBuffer()
throws IOException, ExecutionException, InterruptedException, TimeoutException {
MapBasedConfig config = kafkaConfig("mp.messaging.outgoing.out")
.with("topic", topic)
.with("value.serializer", IntegerSerializer.class.getName());
Expand All @@ -80,20 +98,32 @@ public void testHighLatencyEmitterWithBuffer() throws IOException {

toxics().latency("latency", ToxicDirection.UPSTREAM, 5000);

List<CompletableFuture<Void>> sendAcks = new CopyOnWriteArrayList<>();

for (int i = 0; i < 10000; i++) {
int j = i;
System.out.println("record :" + i);
app.send(Message.of(j));
CompletableFuture<Void> future = new CompletableFuture<>();
sendAcks.add(future);
KafkaLogging.log.warn("record :" + j);
app.send(Message.of(j, () -> {
KafkaLogging.log.warn("sent " + j);
future.complete(null);
return future;
}));
}

CompletableFuture.allOf(sendAcks.toArray(CompletableFuture[]::new))
.get(2, TimeUnit.MINUTES);

toxics().get("latency").remove();

companion.consumeIntegers().fromTopics(topic, 10000)
.awaitCompletion();
}

@Test
public void testHighLatencyEmitterNoWaitWriteCompletion() throws ExecutionException, InterruptedException, TimeoutException, IOException {
public void testHighLatencyEmitterNoWaitWriteCompletion()
throws ExecutionException, InterruptedException, TimeoutException, IOException {

MapBasedConfig config = kafkaConfig("mp.messaging.outgoing.out")
.with("topic", topic)
Expand All @@ -109,11 +139,11 @@ public void testHighLatencyEmitterNoWaitWriteCompletion() throws ExecutionExcept
List<CompletableFuture<Void>> sendAcks = new CopyOnWriteArrayList<>();
for (int i = 0; i < 10000; i++) {
int j = i;
System.out.println("record :" + i);
KafkaLogging.log.warn("record :" + j);
CompletableFuture<Void> future = new CompletableFuture<>();
sendAcks.add(future);
app.send(Message.of(j, () -> {
System.out.println("sent " + j);
KafkaLogging.log.warn("sent " + j);
future.complete(null);
return future;
}));
Expand All @@ -128,6 +158,127 @@ public void testHighLatencyEmitterNoWaitWriteCompletion() throws ExecutionExcept
.awaitCompletion();
}

@Test
public void testHighLatencySynchronizedEmitterVirtualThreaded()
throws IOException, ExecutionException, InterruptedException, TimeoutException {
MapBasedConfig config = kafkaConfig("mp.messaging.outgoing.out")
.with("topic", topic)
.with("value.serializer", IntegerSerializer.class.getName());

SynchronizedEmitterSender app = runApplication(config, SynchronizedEmitterSender.class);

// here not creating the topic upfront causes too much wait
// companion.topics().create(topic, 5);

toxics().latency("latency", ToxicDirection.UPSTREAM, 5000);

ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
List<CompletableFuture<Void>> sendAcks = new CopyOnWriteArrayList<>();
for (int i = 0; i < 10000; i++) {
int j = i;
CompletableFuture<Void> future = new CompletableFuture<>();
sendAcks.add(future);
executor.execute(() -> {
KafkaLogging.log.warn("record :" + j);
app.send(Message.of(j, () -> {
KafkaLogging.log.warn("sent " + j);
future.complete(null);
return future;
}));
});
}

CompletableFuture.allOf(sendAcks.toArray(CompletableFuture[]::new))
.get(2, TimeUnit.MINUTES);

toxics().get("latency").remove();

companion.consumeIntegers().fromTopics(topic, 10000)
.awaitCompletion(Duration.of(2, ChronoUnit.MINUTES));
}

@Test
@Disabled
public void testHighLatencyEmitterVirtualThreaded()
throws IOException, ExecutionException, InterruptedException, TimeoutException {
MapBasedConfig config = kafkaConfig("mp.messaging.outgoing.out")
.with("topic", topic)
.with("value.serializer", IntegerSerializer.class.getName());

EmitterSender app = runApplication(config, EmitterSender.class);

// here not creating the topic upfront causes too much wait
// companion.topics().create(topic, 5);

toxics().latency("latency", ToxicDirection.UPSTREAM, 5000);

// This will fail
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
List<CompletableFuture<Void>> sendAcks = new CopyOnWriteArrayList<>();
for (int i = 0; i < 10000; i++) {
int j = i;
CompletableFuture<Void> future = new CompletableFuture<>();
sendAcks.add(future);
executor.execute(() -> {
KafkaLogging.log.warn("record :" + j);
app.send(Message.of(j, () -> {
KafkaLogging.log.warn("sent " + j);
future.complete(null);
return future;
}));
});
}

CompletableFuture.allOf(sendAcks.toArray(CompletableFuture[]::new))
.get(2, TimeUnit.MINUTES);

toxics().get("latency").remove();

companion.consumeIntegers().fromTopics(topic, 10000)
.awaitCompletion(Duration.of(2, ChronoUnit.MINUTES));
}

@Test
public void testHighLatencyEmitterMultiThreaded()
throws IOException, ExecutionException, InterruptedException, TimeoutException {
MapBasedConfig config = kafkaConfig("mp.messaging.outgoing.out")
.with("topic", topic)
.with("value.serializer", IntegerSerializer.class.getName());

EmitterSender app = runApplication(config, EmitterSender.class);

// here not creating the topic upfront causes too much wait
// companion.topics().create(topic, 5);

toxics().latency("latency", ToxicDirection.UPSTREAM, 5000);

// This will probably pass because ThrowingEmitter.requested doesn't take buffer size into account,
// and will spill into the buffer, without hitting the "Insufficient downstream requests"
ExecutorService executor = Executors.newFixedThreadPool(100);
List<CompletableFuture<Void>> sendAcks = new CopyOnWriteArrayList<>();
for (int i = 0; i < 10000; i++) {
int j = i;
CompletableFuture<Void> future = new CompletableFuture<>();
sendAcks.add(future);
executor.execute(() -> {
KafkaLogging.log.warn("record :" + j);
app.send(Message.of(j, () -> {
KafkaLogging.log.warn("sent " + j);
future.complete(null);
return future;
}));
});
}

CompletableFuture.allOf(sendAcks.toArray(CompletableFuture[]::new))
.get(2, TimeUnit.MINUTES);

toxics().get("latency").remove();

companion.consumeIntegers().fromTopics(topic, 10000)
.awaitCompletion(Duration.of(2, ChronoUnit.MINUTES));
}

@ApplicationScoped
public static class EmitterSender {

Expand All @@ -152,6 +303,39 @@ private void sleep() {
}
}

@ApplicationScoped
public static class SynchronizedEmitterSender {

@Inject
@Channel("out")
Emitter<Integer> emitter;

ReentrantLock lock = new ReentrantLock();

void send(Message<Integer> value) {
lock.lock();
try {
while (!emitter.hasRequests()) {
sleep();
}
emitter.send(value);
} catch (Throwable t) {
throw t;
} finally {
lock.unlock();
}
}

private void sleep() {
try {
Thread.sleep(50);
} catch (InterruptedException ex) {
System.out.println("Thread was interrupted to send messages to the Kafka topic. " + ex.getMessage());
Thread.currentThread().interrupt();
}
}
}

@ApplicationScoped
public static class BufferedEmitterSender {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p [%c] - %m%n
log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %t [%c] - %m%n

#log4j.logger.org.apache.kafka=debug
log4j.logger.org.apache.kafka.clients=WARN
Expand Down

0 comments on commit 51fb2d2

Please sign in to comment.