Skip to content

Commit 4a97c14

Browse files
committed
dequeue
1 parent e393d56 commit 4a97c14

File tree

4 files changed

+208
-35
lines changed

4 files changed

+208
-35
lines changed

README.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,22 @@ class My_Job
134134
}
135135
```
136136

137+
### Dequeueing Jobs ###
138+
139+
This method can be used to conveniently remove a job from a queue.
140+
141+
```php
142+
// Removes 'My_Job' of queue 'default'
143+
Resque::dequeue('default', ['My_Job']);
144+
```
145+
146+
If no jobs are given, this method will dequeue all jobs matching the provided queue.
147+
148+
```php
149+
// Removes all jobs of queue 'default'
150+
Resque::dequeue('default');
151+
```
152+
137153
### Tracking Job Statuses ###
138154

139155
php-resque has the ability to perform basic status tracking of a queued

lib/Resque.php

Lines changed: 117 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -120,39 +120,55 @@ public static function pop($queue)
120120
return json_decode($item, true);
121121
}
122122

123-
/**
124-
* Pop an item off the end of the specified queues, using blocking list pop,
125-
* decode it and return it.
126-
*
127-
* @param array $queues
128-
* @param int $timeout
129-
* @return null|array Decoded item from the queue.
130-
*/
131-
public static function blpop(array $queues, $timeout)
132-
{
133-
$list = array();
134-
foreach($queues AS $queue) {
135-
$list[] = 'queue:' . $queue;
136-
}
137-
138-
$item = self::redis()->blpop($list, (int)$timeout);
139-
140-
if(!$item) {
141-
return;
142-
}
143-
144-
/**
145-
* Normally the Resque_Redis class returns queue names without the prefix
146-
* But the blpop is a bit different. It returns the name as prefix:queue:name
147-
* So we need to strip off the prefix:queue: part
148-
*/
149-
$queue = substr($item[0], strlen(self::redis()->getPrefix() . 'queue:'));
150-
151-
return array(
152-
'queue' => $queue,
153-
'payload' => json_decode($item[1], true)
154-
);
155-
}
123+
/**
124+
* Remove items of the specified queue
125+
*
126+
* @param string $queue The name of the queue to fetch an item from.
127+
* @param array $items
128+
* @return integer number of deleted items
129+
*/
130+
public static function dequeue($queue, $items = Array())
131+
{
132+
if(count($items) > 0) {
133+
return self::removeItems($queue, $items);
134+
} else {
135+
return self::removeList($queue);
136+
}
137+
}
138+
139+
/**
140+
* Pop an item off the end of the specified queues, using blocking list pop,
141+
* decode it and return it.
142+
*
143+
* @param array $queues
144+
* @param int $timeout
145+
* @return null|array Decoded item from the queue.
146+
*/
147+
public static function blpop(array $queues, $timeout)
148+
{
149+
$list = array();
150+
foreach($queues AS $queue) {
151+
$list[] = 'queue:' . $queue;
152+
}
153+
154+
$item = self::redis()->blpop($list, (int)$timeout);
155+
156+
if(!$item) {
157+
return;
158+
}
159+
160+
/**
161+
* Normally the Resque_Redis class returns queue names without the prefix
162+
* But the blpop is a bit different. It returns the name as prefix:queue:name
163+
* So we need to strip off the prefix:queue: part
164+
*/
165+
$queue = substr($item[0], strlen(self::redis()->getPrefix() . 'queue:'));
166+
167+
return array(
168+
'queue' => $queue,
169+
'payload' => json_decode($item[1], true)
170+
);
171+
}
156172

157173
/**
158174
* Return the size (number of pending jobs) of the specified queue.
@@ -215,4 +231,72 @@ public static function queues()
215231
}
216232
return $queues;
217233
}
234+
235+
/**
236+
* Remove Items from the queue
237+
* Safely moving each item to a temporary queue before processing it
238+
* If the Job matches, counts otherwise puts it in a requeue_queue
239+
* which at the end eventually be copied back into the original queue
240+
*
241+
* @private
242+
*
243+
* @param string $queue The name of the queue
244+
* @param array $items
245+
* @return integer number of deleted items
246+
*/
247+
private static function removeItems($queue, $items = Array())
248+
{
249+
$counter = 0;
250+
$originalQueue = 'queue:'. $queue;
251+
$tempQueue = $originalQueue. ':temp:'. time();
252+
$requeueQueue = $tempQueue. ':requeue';
253+
254+
// move each item from original queue to temp queue and process it
255+
$finished = false;
256+
while(!$finished) {
257+
$string = self::redis()->rpoplpush($originalQueue, self::redis()->getPrefix() . $tempQueue);
258+
259+
if(!empty($string)) {
260+
$decoded = json_decode($string, true);
261+
if(in_array($decoded['class'], $items)) {
262+
$counter++;
263+
} else {
264+
self::redis()->rpoplpush($tempQueue, self::redis()->getPrefix() . $requeueQueue);
265+
}
266+
} else {
267+
$finished = true;
268+
}
269+
}
270+
271+
// move back from temp queue to original queue
272+
$finished = false;
273+
while(!$finished) {
274+
$string = self::redis()->rpoplpush($requeueQueue, self::redis()->getPrefix() .$originalQueue);
275+
if (empty($string)) {
276+
$finished = true;
277+
}
278+
}
279+
280+
// remove temp queue and requeue queue
281+
self::redis()->del($requeueQueue);
282+
self::redis()->del($tempQueue);
283+
284+
return $counter;
285+
}
286+
287+
/**
288+
* Remove List
289+
*
290+
* @private
291+
*
292+
* @params string $queue the name of the queue
293+
* @return integer number of deleted items belongs to this list
294+
*/
295+
private static function removeList($queue)
296+
{
297+
$counter = self::size($queue);
298+
$result = self::redis()->del('queue:' . $queue);
299+
return ($result == 1) ? $counter : 0;
300+
}
218301
}
302+

lib/Resque/Redis.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ class Resque_Redis
7878
'zremrangebyscore',
7979
'sort',
8080
'rename'
81+
'rpoplpush'
8182
);
8283
// sinterstore
8384
// sunion
@@ -86,7 +87,6 @@ class Resque_Redis
8687
// sdiffstore
8788
// sinter
8889
// smove
89-
// rpoplpush
9090
// mget
9191
// msetnx
9292
// mset

test/Resque/Tests/JobTest.php

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,4 +180,77 @@ public function testJobWithNamespace()
180180
Resque_Redis::prefix('resque');
181181
$this->assertEquals(Resque::size($queue), 0);
182182
}
183-
}
183+
184+
public function testDequeueAll()
185+
{
186+
$queue = 'jobs';
187+
Resque::enqueue($queue, 'Test_Job_Dequeue');
188+
Resque::enqueue($queue, 'Test_Job_Dequeue');
189+
$this->assertEquals(Resque::size($queue), 2);
190+
$this->assertEquals(Resque::dequeue($queue), 2);
191+
$this->assertEquals(Resque::size($queue), 0);
192+
}
193+
194+
public function testDequeueMakeSureNotDeleteOthers()
195+
{
196+
$queue = 'jobs';
197+
Resque::enqueue($queue, 'Test_Job_Dequeue');
198+
Resque::enqueue($queue, 'Test_Job_Dequeue');
199+
$other_queue = 'other_jobs';
200+
Resque::enqueue($other_queue, 'Test_Job_Dequeue');
201+
Resque::enqueue($other_queue, 'Test_Job_Dequeue');
202+
$this->assertEquals(Resque::size($queue), 2);
203+
$this->assertEquals(Resque::size($other_queue), 2);
204+
$this->assertEquals(Resque::dequeue($queue), 2);
205+
$this->assertEquals(Resque::size($queue), 0);
206+
$this->assertEquals(Resque::size($other_queue), 2);
207+
}
208+
209+
public function testDequeueSpecificItem()
210+
{
211+
$queue = 'jobs';
212+
Resque::enqueue($queue, 'Test_Job_Dequeue1');
213+
Resque::enqueue($queue, 'Test_Job_Dequeue2');
214+
$this->assertEquals(Resque::size($queue), 2);
215+
$test = array('Test_Job_Dequeue2');
216+
$this->assertEquals(Resque::dequeue($queue, $test), 1);
217+
$this->assertEquals(Resque::size($queue), 1);
218+
}
219+
220+
public function testDequeueSpecificMultipleItems()
221+
{
222+
$queue = 'jobs';
223+
Resque::enqueue($queue, 'Test_Job_Dequeue1');
224+
Resque::enqueue($queue, 'Test_Job_Dequeue2');
225+
Resque::enqueue($queue, 'Test_Job_Dequeue3');
226+
$this->assertEquals(Resque::size($queue), 3);
227+
$test = array('Test_Job_Dequeue2', 'Test_Job_Dequeue3');
228+
$this->assertEquals(Resque::dequeue($queue, $test), 2);
229+
$this->assertEquals(Resque::size($queue), 1);
230+
}
231+
232+
public function testDequeueNonExistingItem()
233+
{
234+
$queue = 'jobs';
235+
Resque::enqueue($queue, 'Test_Job_Dequeue1');
236+
Resque::enqueue($queue, 'Test_Job_Dequeue2');
237+
Resque::enqueue($queue, 'Test_Job_Dequeue3');
238+
$this->assertEquals(Resque::size($queue), 3);
239+
$test = array('Test_Job_Dequeue4');
240+
$this->assertEquals(Resque::dequeue($queue, $test), 0);
241+
$this->assertEquals(Resque::size($queue), 3);
242+
}
243+
244+
public function testDequeueNonExistingItem2()
245+
{
246+
$queue = 'jobs';
247+
Resque::enqueue($queue, 'Test_Job_Dequeue1');
248+
Resque::enqueue($queue, 'Test_Job_Dequeue2');
249+
Resque::enqueue($queue, 'Test_Job_Dequeue3');
250+
$this->assertEquals(Resque::size($queue), 3);
251+
$test = array('Test_Job_Dequeue4', 'Test_Job_Dequeue1');
252+
$this->assertEquals(Resque::dequeue($queue, $test), 1);
253+
$this->assertEquals(Resque::size($queue), 2);
254+
}
255+
256+
}

0 commit comments

Comments
 (0)