
Commit

Added test to simulate Kafka producer latency handling with emitters.
ozangunalp committed Feb 1, 2024
1 parent f2bc998 commit 1859f19
Showing 1 changed file with 179 additions and 0 deletions.
@@ -0,0 +1,179 @@
package io.smallrye.reactive.messaging.kafka.client;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.apache.kafka.common.serialization.IntegerSerializer;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.OnOverflow;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import io.smallrye.reactive.messaging.kafka.TestTags;
import io.smallrye.reactive.messaging.kafka.base.KafkaCompanionProxyTestBase;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;

import eu.rekawek.toxiproxy.model.ToxicDirection;

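/**
 * Exercises emitter back-pressure when the Kafka producer is slowed down: a Toxiproxy latency
 * toxic adds 5000 ms to the upstream (client -> broker) connection while 10000 records are emitted.
 */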
@Tag(TestTags.SLOW)
public class EmitterLatencyTest extends KafkaCompanionProxyTestBase {

    @AfterEach
    void tearDown() throws IOException {
        toxics().getAll().stream().filter(toxic -> toxic.getName().equals("latency"))
                .findFirst()
                .ifPresent(toxic -> {
                    try {
                        toxic.remove();
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                });
    }

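    // Plain emitter with no explicit @OnOverflow annotation: the sender busy-waits on
    // hasRequests(), so records are only emitted while the channel has outstanding requests.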
    @Test
    public void testHighLatencyEmitter() throws IOException {
        MapBasedConfig config = kafkaConfig("mp.messaging.outgoing.out")
                .with("topic", topic)
                .with("value.serializer", IntegerSerializer.class.getName());

        EmitterSender app = runApplication(config, EmitterSender.class);

        // create the topic upfront; otherwise the test spends too long waiting for it to be created
        companion.topics().create(topic, 5);

        toxics().latency("latency", ToxicDirection.UPSTREAM, 5000);

        for (int i = 0; i < 10000; i++) {
            int j = i;
            System.out.println("record :" + i);
            app.send(Message.of(j));
        }

        toxics().get("latency").remove();

        companion.consumeIntegers().fromTopics(topic, 10000)
                .awaitCompletion();
    }

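    // Same scenario, but the emitter buffers up to 1024 messages (OnOverflow.Strategy.BUFFER)
    // and the topic is intentionally not created upfront.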
    @Test
    public void testHighLatencyEmitterWithBuffer() throws IOException {
        MapBasedConfig config = kafkaConfig("mp.messaging.outgoing.out")
                .with("topic", topic)
                .with("value.serializer", IntegerSerializer.class.getName());

        BufferedEmitterSender app = runApplication(config, BufferedEmitterSender.class);

        // don't create the topic upfront
        // companion.topics().create(topic, 5);

        toxics().latency("latency", ToxicDirection.UPSTREAM, 5000);

        for (int i = 0; i < 10000; i++) {
            int j = i;
            System.out.println("record :" + i);
            app.send(Message.of(j));
        }

        toxics().get("latency").remove();

        companion.consumeIntegers().fromTopics(topic, 10000)
                .awaitCompletion();
    }

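    // With waitForWriteCompletion=false the outgoing channel acknowledges each message without
    // waiting for Kafka to confirm the write, so the test tracks per-message acks itself and
    // waits up to one minute for all 10000 of them before consuming the records back.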
    @Test
    public void testHighLatencyEmitterNoWaitWriteCompletion()
            throws ExecutionException, InterruptedException, TimeoutException, IOException {

        MapBasedConfig config = kafkaConfig("mp.messaging.outgoing.out")
                .with("topic", topic)
                .with("value.serializer", IntegerSerializer.class.getName())
                .with("waitForWriteCompletion", false);

        EmitterSender app = runApplication(config, EmitterSender.class);

        companion.topics().create(topic, 5);

        toxics().latency("latency", ToxicDirection.UPSTREAM, 5000);

        List<CompletableFuture<Void>> sendAcks = new CopyOnWriteArrayList<>();
        for (int i = 0; i < 10000; i++) {
            int j = i;
            System.out.println("record :" + i);
            CompletableFuture<Void> future = new CompletableFuture<>();
            sendAcks.add(future);
            app.send(Message.of(j, () -> {
                System.out.println("sent " + j);
                future.complete(null);
                return future;
            }));
        }

        CompletableFuture.allOf(sendAcks.stream().toArray(CompletableFuture[]::new))
                .get(1, TimeUnit.MINUTES);

        toxics().get("latency").remove();

        companion.consumeIntegers().fromTopics(topic, 10000)
                .awaitCompletion();
    }

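    // Sender bean that polls Emitter#hasRequests() every 50 ms and only emits once the
    // downstream has requested more messages.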
    @ApplicationScoped
    public static class EmitterSender {

        @Inject
        @Channel("out")
        Emitter<Integer> emitter;

        void send(Message<Integer> value) {
            while (!emitter.hasRequests()) {
                sleep();
            }
            emitter.send(value);
        }

        private void sleep() {
            try {
                Thread.sleep(50);
            } catch (InterruptedException ex) {
                System.out.println("Thread was interrupted while waiting to send messages to the Kafka topic. " + ex.getMessage());
                Thread.currentThread().interrupt();
            }
        }
    }

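    // Same sender, but the emitter is configured to buffer up to 1024 messages on overflow.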
    @ApplicationScoped
    public static class BufferedEmitterSender {

        @Inject
        @Channel("out")
        @OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 1024)
        Emitter<Integer> emitter;

        void send(Message<Integer> value) {
            while (!emitter.hasRequests()) {
                sleep();
            }
            emitter.send(value);
        }

        private void sleep() {
            try {
                Thread.sleep(50);
            } catch (InterruptedException ex) {
                System.out.println("Thread was interrupted while waiting to send messages to the Kafka topic. " + ex.getMessage());
                Thread.currentThread().interrupt();
            }
        }
    }
}
