Skip to content
This repository was archived by the owner on May 25, 2023. It is now read-only.

Commit 34215c6

Browse files
author
Anton Dorozhkin
committed
0.0.1 version
1 parent 71aafcd commit 34215c6

File tree

8 files changed

+363
-1
lines changed

8 files changed

+363
-1
lines changed

.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
/vendor/
2+
/composer.lock
3+
/.php_cs.cache
4+
/tests/_support/_generated/

.php_cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
<?php
2+
3+
$finder = PhpCsFixer\Finder::create()
4+
->exclude('vendor')
5+
->in(__DIR__);
6+
7+
return PhpCsFixer\Config::create()
8+
->setRules([
9+
'@Symfony' => true,
10+
'concat_space' => ['spacing' => 'one'],
11+
'phpdoc_align' => false,
12+
'phpdoc_to_comment' => false,
13+
'header_comment' => false,
14+
])
15+
->setFinder($finder);

Makefile

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
WORKING_DIR=$(CURDIR)
2+
3+
php-cs-check:
4+
$(WORKING_DIR)/vendor/bin/php-cs-fixer fix --dry-run --format=junit --diff
5+
6+
php-cs-fix:
7+
$(WORKING_DIR)/vendor/bin/php-cs-fixer fix
8+
9+
test-unit:
10+
./vendor/bin/codecept run unit

README.md

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,73 @@
1-
# codeception-kafka
1+
# Codeception Kafka Extension
2+
3+
## THIS MODULE IS NOT PRODUCTION READY
4+
5+
This extension supports working with Apache Kafka.
6+
7+
## Installation
8+
9+
1. Install library
10+
```bash
11+
composer require lamoda/codeception-kafka
12+
```
13+
14+
2. Create message serializer for your data transfer object
15+
16+
```
17+
namespace Tests\KafkaModule;
18+
19+
use App\EventBus\DtoInterface;
20+
use Lamoda\Codeception\Extension\MessageSerializer\MessageSerializerInterface;
21+
22+
class AcmeMessageSerializer implements MessageSerializerInterface
23+
{
24+
public function serialize($dto): string
25+
{
26+
if (!$dto instanceif DtoInterface) {
27+
throw new \RuntimeException('This value must be an ' . DtoInterface::class);
28+
}
29+
30+
$message = json_encode($dto->toArray());
31+
32+
if (!is_string($message)) {
33+
throw new \RuntimeException(json_last_error(), json_last_error_msg());
34+
}
35+
36+
return $message;
37+
}
38+
}
39+
```
40+
41+
The default message serializer is Lamoda\Codeception\Extension\MessageSerializer\ArrayMessageSerializer.
42+
43+
2. Include to suite and configure
44+
```yaml
45+
modules:
46+
enabled:
47+
- \Lamoda\Codeception\Extension\KafkaModule
48+
serializer: 'Tests\KafkaModule\AcmeMessageSerializer'
49+
config:
50+
metadata.broker.list: '192.168.99.100:9092'
51+
group.id: 'group_for_tests'
52+
topic_config:
53+
offset.store.sync.interval.ms: '0'
54+
auto.commit.interval.ms: '500'
55+
auto.offset.reset: 'smallest'
56+
```
57+
58+
## Development
59+
60+
### PHP Coding Standards Fixer
61+
62+
```bash
63+
make php-cs-check
64+
make php-cs-fix
65+
```
66+
67+
### Tests
68+
69+
Unit
70+
71+
```bash
72+
make test-unit
73+
```

composer.json

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
{
2+
"name": "lamoda/codeception-kafka",
3+
"description": "Kafka helper for codeception tests",
4+
"type": "library",
5+
"license": "MIT",
6+
"minimum-stability": "stable",
7+
"authors": [
8+
{
9+
"name": "Lamoda developers",
10+
"homepage": "https://tech.lamoda.ru/"
11+
}
12+
],
13+
"require": {
14+
"php": ">=7.1",
15+
"codeception/codeception": "~2.5"
16+
},
17+
"autoload": {
18+
"psr-4": {
19+
"Lamoda\\Codeception\\Extension\\": "src/Extension/"
20+
}
21+
},
22+
"require-dev": {
23+
"friendsofphp/php-cs-fixer": "^2.13",
24+
"kwn/php-rdkafka-stubs": "^1.1.0"
25+
}
26+
}

src/Extension/KafkaModule.php

Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Lamoda\Codeception\Extension;
6+
7+
use Codeception\Module;
8+
use Exception;
9+
use Lamoda\Codeception\Extension\MessageSerializer\MessageSerializerInterface;
10+
use RdKafka\Conf;
11+
use RdKafka\Consumer;
12+
use RdKafka\Message;
13+
use RdKafka\Producer;
14+
use RdKafka\Queue;
15+
use RdKafka\TopicConf;
16+
17+
class KafkaModule extends Module
18+
{
19+
protected const DEFAULT_PARTITION = 0;
20+
21+
/**
22+
* @var MessageSerializerInterface
23+
*/
24+
protected $messageSerializer;
25+
26+
/**
27+
* @var Conf
28+
*/
29+
protected $conf;
30+
31+
/**
32+
* @var TopicConf
33+
*/
34+
protected $topicConf;
35+
36+
/**
37+
* @var Consumer
38+
*/
39+
protected $consumer;
40+
41+
/**
42+
* @var Queue
43+
*/
44+
protected $queue;
45+
46+
/**
47+
* @param array $settings
48+
*/
49+
public function _beforeSuite($settings = []): void
50+
{
51+
parent::_beforeSuite();
52+
53+
if (isset($this->config['serializer']) && class_exists($this->config['serializer'])) {
54+
$this->messageSerializer = new $this->config['serializer']();
55+
} else {
56+
$this->messageSerializer = new ArrayMessageSerializer();
57+
}
58+
59+
$this->conf = new Conf();
60+
61+
if (isset($this->config['config']) && is_array($this->config['config'])) {
62+
foreach ($this->config['config'] as $key => $value) {
63+
$this->conf->set($key, $value);
64+
}
65+
}
66+
67+
$this->topicConf = new TopicConf();
68+
69+
if (isset($this->config['topic_config']) && is_array($this->config['topic_config'])) {
70+
foreach ($this->config['topic_config'] as $key => $value) {
71+
$this->topicConf->set($key, $value);
72+
}
73+
}
74+
75+
$this->consumer = new Consumer($this->conf);
76+
$this->queue = $this->consumer->newQueue();
77+
}
78+
79+
public function putMessageInTopic(string $topicName, string $message, ?int $partition = null): void
80+
{
81+
$producer = new Producer($this->conf);
82+
83+
$topic = $producer->newTopic($topicName, $this->topicConf);
84+
85+
$topic->produce($partition ?? static::DEFAULT_PARTITION, 0, $message);
86+
}
87+
88+
public function putMessageListInTopic(string $topicName, array $messages, ?int $partition = null): void
89+
{
90+
foreach ($messages as $message) {
91+
$this->putMessageInTopic($topicName, $message, $partition);
92+
}
93+
}
94+
95+
/**
96+
* @throws Exception
97+
*/
98+
public function seeMessageInTopic(string $topicName, string $message, ?int $partition = null): void
99+
{
100+
$topMessage = $this->readOneMessageByCurrentOffset($topicName, $partition ?? static::DEFAULT_PARTITION);
101+
102+
$this->assertNotNull($topMessage);
103+
$this->assertEquals($message, $topMessage->payload);
104+
}
105+
106+
/**
107+
* @throws Exception
108+
*/
109+
public function readAllMessagesFromTopic(string $topicName, ?int $partition = null, ?string $groupId = null): void
110+
{
111+
$topMessage = true;
112+
113+
while (null !== $topMessage) {
114+
$topMessage = $this->readOneMessageByCurrentOffset(
115+
$topicName,
116+
$partition ?? static::DEFAULT_PARTITION,
117+
$groupId
118+
);
119+
}
120+
}
121+
122+
/**
123+
* @param mixed $dto
124+
*/
125+
public function putDtoInTopic(string $topicName, $dto, ?int $partition = null): void
126+
{
127+
$message = $this->messageSerializer->serialize($dto);
128+
$this->putMessageInTopic($topicName, $message, $partition);
129+
}
130+
131+
/**
132+
* @param mixed $dto
133+
*
134+
* @throws Exception
135+
*/
136+
public function seeDtoInTopic(string $topicName, $dto, ?int $partition = null): void
137+
{
138+
$message = $this->messageSerializer->serialize($dto);
139+
$this->seeMessageInTopic($topicName, $message, $partition);
140+
}
141+
142+
/**
143+
* @throws Exception
144+
*/
145+
public function assertTopicNotContainsUnreadMessages(string $topicName, ?int $partition = null): void
146+
{
147+
$this->assertNull($this->readOneMessageByCurrentOffset($topicName, $partition ?? static::DEFAULT_PARTITION));
148+
}
149+
150+
/**
151+
* @throws Exception
152+
*/
153+
private function readOneMessageByCurrentOffset(string $topicName, int $partition, ?string $groupId = null): ?Message
154+
{
155+
if (null === $groupId) {
156+
$consumer = $this->consumer;
157+
$queue = $this->queue;
158+
} else {
159+
$conf = new Conf();
160+
foreach ($this->config['config'] as $key => $value) {
161+
$conf->set($key, $value);
162+
}
163+
$conf->set('group.id', $groupId);
164+
$consumer = new Consumer($conf);
165+
$queue = $consumer->newQueue();
166+
}
167+
168+
$topic = $consumer->newTopic($topicName, $this->topicConf);
169+
$topic->consumeQueueStart($partition, RD_KAFKA_OFFSET_STORED, $queue);
170+
171+
$message = $queue->consume(2000);
172+
173+
$topic->consumeStop($partition);
174+
175+
return $this->decideUponMessage($message);
176+
}
177+
178+
/**
179+
* @throws Exception
180+
*/
181+
private function decideUponMessage(?Message $message = null): ?Message
182+
{
183+
if (!($message instanceof Message)) {
184+
return null;
185+
}
186+
187+
switch ($message->err) {
188+
case RD_KAFKA_RESP_ERR_NO_ERROR:
189+
return $message;
190+
break;
191+
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
192+
return null;
193+
break;
194+
case RD_KAFKA_RESP_ERR__TIMED_OUT:
195+
throw new Exception('Timed out');
196+
break;
197+
default:
198+
throw new Exception($message->errstr(), $message->err);
199+
break;
200+
}
201+
}
202+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Lamoda\Codeception\Extension\MessageSerializer;
6+
7+
class ArrayMessageSerializer implements MessageSerializerInterface
8+
{
9+
public function serialize($dto): string
10+
{
11+
if (!is_array($dto)) {
12+
throw new \RuntimeException('This value must be an array');
13+
}
14+
15+
$message = json_encode($dto);
16+
17+
if (!is_string($message)) {
18+
throw new \RuntimeException(json_last_error(), json_last_error_msg());
19+
}
20+
21+
return $message;
22+
}
23+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Lamoda\Codeception\Extension\MessageSerializer;
6+
7+
interface MessageSerializerInterface
8+
{
9+
public function serialize($dto);
10+
}

0 commit comments

Comments
 (0)