Skip to content
This repository was archived by the owner on May 25, 2023. It is now read-only.

Commit b656167

Browse files
author
Pavel Batanov
authoredMar 2, 2021
Merge pull request #2 from lamoda/feature/rdkafka-4
Support RDKafka 4
2 parents 34215c6 + 005b531 commit b656167

File tree

2 files changed

+8
-1
lines changed

2 files changed

+8
-1
lines changed
 

‎composer.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@
1212
],
1313
"require": {
1414
"php": ">=7.1",
15-
"codeception/codeception": "~2.5"
15+
"ext-rdkafka": "^3 || ^4",
16+
"codeception/codeception": "^2.5 | ^3.0 | ^4.0"
1617
},
1718
"autoload": {
1819
"psr-4": {

‎src/Extension/KafkaModule.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
use Codeception\Module;
88
use Exception;
9+
use Lamoda\Codeception\Extension\MessageSerializer\ArrayMessageSerializer;
910
use Lamoda\Codeception\Extension\MessageSerializer\MessageSerializerInterface;
1011
use RdKafka\Conf;
1112
use RdKafka\Consumer;
@@ -17,6 +18,7 @@
1718
class KafkaModule extends Module
1819
{
1920
protected const DEFAULT_PARTITION = 0;
21+
protected const FLUSH_TIMEOUT_MS = 3000;
2022

2123
/**
2224
* @var MessageSerializerInterface
@@ -83,6 +85,10 @@ public function putMessageInTopic(string $topicName, string $message, ?int $part
8385
$topic = $producer->newTopic($topicName, $this->topicConf);
8486

8587
$topic->produce($partition ?? static::DEFAULT_PARTITION, 0, $message);
88+
89+
if (method_exists($producer, 'flush')) {
90+
$producer->flush(self::FLUSH_TIMEOUT_MS);
91+
}
8692
}
8793

8894
public function putMessageListInTopic(string $topicName, array $messages, ?int $partition = null): void

0 commit comments

Comments
 (0)