Skip to content

Commit a3aede4

Browse files
authored
Merge pull request #4 from sagacorp/add-retry-request
Add retry request
2 parents c672a40 + 92f330c commit a3aede4

File tree

4 files changed

+158
-79
lines changed

4 files changed

+158
-79
lines changed

src/Queue.php

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,23 @@
1313
*/
1414
class Queue extends \yii\queue\cli\Queue
1515
{
16-
//region Public Properties
16+
// region Public Properties
17+
/**
18+
* use this property to filter job execution on a specific id
19+
* You can use this property when you need to run multiple environments with the same queue at the same time, multiple locals envionnements for example.
20+
*
21+
* @see BrokerProperties::$to
22+
*
23+
* @var string|null
24+
*/
25+
public ?string $id = null;
1726
/**
1827
* @var ServiceBus
1928
*/
2029
public $serviceBus = 'serviceBus';
21-
//endregion Public Properties
30+
// endregion Public Properties
2231

23-
//region Initialization
32+
// region Initialization
2433
/**
2534
* @throws \yii\base\InvalidConfigException
2635
*/
@@ -32,10 +41,9 @@ public function init(): void
3241

3342
$this->serviceBus = Instance::ensure($this->serviceBus, ServiceBus::class);
3443
}
35-
//endregion Initialization
36-
37-
//region Public Methods
44+
// endregion Initialization
3845

46+
// region Public Methods
3947
/**
4048
* Listens queue and runs each job.
4149
*
@@ -54,6 +62,9 @@ function (callable $canContinue) use ($repeat, $timeout) {
5462
$message = $this->serviceBus->receiveMessage(ServiceBus::PEEK_LOCK, $timeout);
5563

5664
if ($message !== null && $message->brokerProperties !== null) {
65+
if ($message->brokerProperties->to && !$message->brokerProperties->isTo($this->id)) {
66+
continue;
67+
}
5768
if ($this->handleMessage($message->brokerProperties->messageId, $message->body, $message->brokerProperties->timeToLive, $message->brokerProperties->deliveryCount)) {
5869
$this->serviceBus->deleteMessage($message);
5970
}
@@ -75,7 +86,7 @@ public function status($id): void
7586
{
7687
throw new NotSupportedException('Status is not supported in the driver.');
7788
}
78-
//endregion Public Methods
89+
// endregion Public Methods
7990

8091
//region Protected Methods
8192
/**
@@ -92,12 +103,13 @@ protected function pushMessage($message, $ttr, $delay, $priority): string
92103
{
93104
$azureMessage = new Message(
94105
[
95-
'body' => $message,
96-
'contentType' => 'application/vnd.microsoft.servicebus.yml',
106+
'body' => $message,
107+
'contentType' => 'application/vnd.microsoft.servicebus.yml',
97108
'brokerProperties' => new BrokerProperties(
98109
[
99110
'timeToLive' => $ttr,
100-
'delay' => $delay,
111+
'delay' => $delay,
112+
'to' => $this->id,
101113
]
102114
),
103115
]
@@ -107,5 +119,5 @@ protected function pushMessage($message, $ttr, $delay, $priority): string
107119

108120
return $azureMessage->brokerProperties->messageId;
109121
}
110-
//endregion Protected Methods
122+
// endregion Protected Methods
111123
}

src/service/BrokerProperties.php

Lines changed: 33 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
namespace sagacorp\queue\azure\service;
44

55
use Carbon\Carbon;
6-
use Carbon\CarbonTimeZone;
76
use yii\base\Model;
87

98
/**
@@ -13,7 +12,7 @@
1312
*/
1413
class BrokerProperties extends Model
1514
{
16-
//region Public Properties
15+
// region Public Properties
1716
/**
1817
* The correlation ID.
1918
*/
@@ -66,24 +65,24 @@ class BrokerProperties extends Model
6665
* The to.
6766
*/
6867
public ?string $to = null;
69-
//endregion Public Properties
68+
// endregion Public Properties
7069

71-
//region Private Properties
70+
// region Private Properties
7271
/**
7372
* The enqueued time.
7473
*/
75-
private ?Carbon $enqueuedTimeUtc;
74+
private ?Carbon $enqueuedTimeUtc = null;
7675
/**
7776
* The locked until time.
7877
*/
79-
private ?Carbon $lockedUntilUtc;
78+
private ?Carbon $lockedUntilUtc = null;
8079
/**
8180
* The scheduled enqueue time.
8281
*/
83-
private ?Carbon $scheduledEnqueueTimeUtc;
84-
//endregion Private Properties
82+
private ?Carbon $scheduledEnqueueTimeUtc = null;
83+
// endregion Private Properties
8584

86-
//region Initialization
85+
// region Initialization
8786
public function init(): void
8887
{
8988
parent::init();
@@ -102,16 +101,16 @@ public function __toString()
102101
$values = [];
103102

104103
$settableProperties = [
105-
'CorrelationId' => 'correlationId',
106-
'SessionId' => 'sessionId',
107-
'MessageId' => 'messageId',
108-
'Label' => 'label',
109-
'ReplyTo' => 'replyTo',
110-
'TimeToLive' => 'timeToLive',
111-
'To' => 'to',
104+
'CorrelationId' => 'correlationId',
105+
'SessionId' => 'sessionId',
106+
'MessageId' => 'messageId',
107+
'Label' => 'label',
108+
'ReplyTo' => 'replyTo',
109+
'TimeToLive' => 'timeToLive',
110+
'To' => 'to',
112111
'ScheduledEnqueueTimeUtc' => 'scheduledEnqueueTimeUtc',
113-
'ReplyToSessionId' => 'replyToSessionId',
114-
'PartitionKey' => 'partitionKey',
112+
'ReplyToSessionId' => 'replyToSessionId',
113+
'PartitionKey' => 'partitionKey',
115114
];
116115

117116
foreach ($settableProperties as $key => $value) {
@@ -122,9 +121,9 @@ public function __toString()
122121

123122
return (string) \json_encode($values, JSON_THROW_ON_ERROR);
124123
}
125-
//endregion Initialization
124+
// endregion Initialization
126125

127-
//region Getters/Setters
126+
// region Getters/Setters
128127
public function getEnqueuedTimeUtc(): ?Carbon
129128
{
130129
return $this->enqueuedTimeUtc;
@@ -135,14 +134,14 @@ public function getLockedUntilUtc(): ?Carbon
135134
return $this->lockedUntilUtc;
136135
}
137136

138-
public function setDelay(int $value): void
137+
public function getScheduledEnqueueTimeUtc(): ?Carbon
139138
{
140-
$this->setScheduledEnqueueTimeUtc(Carbon::now()->addSeconds($value)->setTimezone('UTC'));
139+
return $this->scheduledEnqueueTimeUtc;
141140
}
142141

143-
public function getScheduledEnqueueTimeUtc(): ?Carbon
142+
public function setDelay(int $value): void
144143
{
145-
return $this->scheduledEnqueueTimeUtc;
144+
$this->setScheduledEnqueueTimeUtc(Carbon::now()->addSeconds($value)->setTimezone('UTC'));
146145
}
147146

148147
public function setEnqueuedTimeUtc(Carbon|string $enqueuedTimeUtc): void
@@ -171,9 +170,16 @@ public function setScheduledEnqueueTimeUtc(Carbon|string $scheduledEnqueueTimeUt
171170

172171
$this->scheduledEnqueueTimeUtc = $scheduledEnqueueTimeUtc;
173172
}
174-
//endregion Getters/Setters
173+
// endregion Getters/Setters
174+
175+
// region Public Methods
176+
public function isTo(string $id): bool
177+
{
178+
return $this->to === $id;
179+
}
180+
// endregion Public Methods
175181

176-
//region Protected Methods
182+
// region Protected Methods
177183
protected function azureDateToCarbon(string $date): ?Carbon
178184
{
179185
return Carbon::parse($date, 'UTC') ?: null;
@@ -183,5 +189,5 @@ protected function carbonToAzureDate(Carbon $carbon): string
183189
{
184190
return $carbon->format(\DateTimeInterface::RFC7231);
185191
}
186-
//endregion Protected Methods
192+
// endregion Protected Methods
187193
}

src/service/Request.php

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
<?php
2+
3+
namespace sagacorp\queue\azure\service;
4+
5+
use yii\httpclient\Exception as HttpClientException;
6+
7+
class Request extends \yii\httpclient\Request
8+
{
9+
// region Public Properties
10+
public int $maxRetries;
11+
// endregion Public Properties
12+
13+
// region Protected Properties
14+
protected int $attempts = 0;
15+
// endregion Protected Properties
16+
17+
// region Public Methods
18+
/**
19+
* @throws HttpClientException
20+
*/
21+
public function sendAndRetryOnFailure(array $successStatusCodes): \yii\httpclient\Response
22+
{
23+
try {
24+
$response = $this->send();
25+
26+
if (!in_array($response->statusCode, $successStatusCodes, true)) {
27+
throw new HttpClientException($response->toString());
28+
}
29+
} catch (HttpClientException $e) {
30+
\Yii::error($e);
31+
32+
if (!$this->canContinue($this->attempts)) {
33+
throw $e;
34+
}
35+
36+
$delay = $this->getRetryDelay($this->attempts);
37+
38+
\Yii::error('Retry request in ' . $delay . ' seconds');
39+
sleep($delay);
40+
41+
++$this->attempts;
42+
43+
$response = $this->sendAndRetryOnFailure($successStatusCodes);
44+
}
45+
46+
return $response;
47+
}
48+
49+
// endregion Public Methods
50+
51+
// region Protected Methods
52+
protected function canContinue(int $attempts): bool
53+
{
54+
return $attempts < $this->maxRetries;
55+
}
56+
57+
protected function getRetryDelay(int $attempts): int
58+
{
59+
return 4 ** $attempts;
60+
}
61+
// endregion Protected Methods
62+
}

0 commit comments

Comments
 (0)