Skip to content

Commit b6b972c

Browse files
authored
Merge pull request #684 from alcaeus/fix-bulkwrite-pipeline
PHPLIB-481: Add the ability to specify a pipeline to an update within bulk write
2 parents 9a56bda + d44bd16 commit b6b972c

File tree

4 files changed

+202
-68
lines changed

4 files changed

+202
-68
lines changed

src/Operation/BulkWrite.php

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
use function is_object;
3434
use function key;
3535
use function MongoDB\is_first_key_operator;
36+
use function MongoDB\is_pipeline;
3637
use function MongoDB\server_supports_feature;
3738
use function sprintf;
3839

@@ -255,8 +256,8 @@ public function __construct($databaseName, $collectionName, array $operations, a
255256
throw InvalidArgumentException::invalidType(sprintf('$operations[%d]["%s"][1]', $i, $type), $args[1], 'array or object');
256257
}
257258

258-
if (! is_first_key_operator($args[1])) {
259-
throw new InvalidArgumentException(sprintf('First key in $operations[%d]["%s"][1] is not an update operator', $i, $type));
259+
if (! is_first_key_operator($args[1]) && ! is_pipeline($args[1])) {
260+
throw new InvalidArgumentException(sprintf('First key in $operations[%d]["%s"][1] is neither an update operator nor a pipeline', $i, $type));
260261
}
261262

262263
if (! isset($args[2])) {

tests/Operation/BulkWriteFunctionalTest.php

Lines changed: 30 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
use MongoDB\Driver\BulkWrite as Bulk;
99
use MongoDB\Driver\WriteConcern;
1010
use MongoDB\Exception\BadMethodCallException;
11-
use MongoDB\Exception\InvalidArgumentException;
1211
use MongoDB\Model\BSONDocument;
1312
use MongoDB\Operation\BulkWrite;
1413
use MongoDB\Tests\CommandObserver;
@@ -226,67 +225,6 @@ public function testUnacknowledgedWriteConcernAccessesUpsertedIds(BulkWriteResul
226225
$result->getUpsertedIds();
227226
}
228227

229-
public function testUnknownOperation()
230-
{
231-
$this->expectException(InvalidArgumentException::class);
232-
$this->expectExceptionMessage('Unknown operation type "foo" in $operations[0]');
233-
new BulkWrite($this->getDatabaseName(), $this->getCollectionName(), [
234-
['foo' => [['_id' => 1]]],
235-
]);
236-
}
237-
238-
/**
239-
* @dataProvider provideOpsWithMissingArguments
240-
*/
241-
public function testMissingArguments(array $ops)
242-
{
243-
$this->expectException(InvalidArgumentException::class);
244-
$this->expectExceptionMessageRegExp('/Missing (first|second) argument for \$operations\[\d+\]\["\w+\"]/');
245-
new BulkWrite($this->getDatabaseName(), $this->getCollectionName(), $ops);
246-
}
247-
248-
public function provideOpsWithMissingArguments()
249-
{
250-
return [
251-
[[['insertOne' => []]]],
252-
[[['updateOne' => []]]],
253-
[[['updateOne' => [['_id' => 1]]]]],
254-
[[['updateMany' => []]]],
255-
[[['updateMany' => [['_id' => 1]]]]],
256-
[[['replaceOne' => []]]],
257-
[[['replaceOne' => [['_id' => 1]]]]],
258-
[[['deleteOne' => []]]],
259-
[[['deleteMany' => []]]],
260-
];
261-
}
262-
263-
public function testUpdateOneRequiresUpdateOperators()
264-
{
265-
$this->expectException(InvalidArgumentException::class);
266-
$this->expectExceptionMessage('First key in $operations[0]["updateOne"][1] is not an update operator');
267-
new BulkWrite($this->getDatabaseName(), $this->getCollectionName(), [
268-
['updateOne' => [['_id' => 1], ['x' => 1]]],
269-
]);
270-
}
271-
272-
public function testUpdateManyRequiresUpdateOperators()
273-
{
274-
$this->expectException(InvalidArgumentException::class);
275-
$this->expectExceptionMessage('First key in $operations[0]["updateMany"][1] is not an update operator');
276-
new BulkWrite($this->getDatabaseName(), $this->getCollectionName(), [
277-
['updateMany' => [['_id' => ['$gt' => 1]], ['x' => 1]]],
278-
]);
279-
}
280-
281-
public function testReplaceOneRequiresReplacementDocument()
282-
{
283-
$this->expectException(InvalidArgumentException::class);
284-
$this->expectExceptionMessage('First key in $operations[0]["replaceOne"][1] is an update operator');
285-
new BulkWrite($this->getDatabaseName(), $this->getCollectionName(), [
286-
['replaceOne' => [['_id' => 1], ['$inc' => ['x' => 1]]]],
287-
]);
288-
}
289-
290228
public function testSessionOption()
291229
{
292230
if (version_compare($this->getServerVersion(), '3.6.0', '<')) {
@@ -357,6 +295,36 @@ function (array $event) {
357295
);
358296
}
359297

298+
public function testBulkWriteWithPipelineUpdates()
299+
{
300+
if (version_compare($this->getServerVersion(), '4.2.0', '<')) {
301+
$this->markTestSkipped('Pipeline-style updates are not supported');
302+
}
303+
304+
$this->createFixtures(4);
305+
306+
$ops = [
307+
['updateOne' => [['_id' => 2], [['$addFields' => ['y' => 2]]]]],
308+
['updateMany' => [['_id' => ['$gt' => 2]], [['$addFields' => ['y' => '$_id']]]]],
309+
];
310+
311+
$operation = new BulkWrite($this->getDatabaseName(), $this->getCollectionName(), $ops);
312+
$result = $operation->execute($this->getPrimaryServer());
313+
314+
$this->assertInstanceOf(BulkWriteResult::class, $result);
315+
$this->assertSame(3, $result->getMatchedCount());
316+
$this->assertSame(3, $result->getModifiedCount());
317+
318+
$expected = [
319+
['_id' => 1, 'x' => 11],
320+
['_id' => 2, 'x' => 22, 'y' => 2],
321+
['_id' => 3, 'x' => 33, 'y' => 3],
322+
['_id' => 4, 'x' => 44, 'y' => 4],
323+
];
324+
325+
$this->assertSameDocuments($expected, $this->collection->find());
326+
}
327+
360328
/**
361329
* Create data fixtures.
362330
*

tests/Operation/BulkWriteTest.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -258,10 +258,10 @@ public function testUpdateManyUpdateArgumentTypeCheck($update)
258258
]);
259259
}
260260

261-
public function testUpdateManyUpdateArgumentRequiresOperators()
261+
public function testUpdateManyUpdateArgumentRequiresOperatorsOrPipeline()
262262
{
263263
$this->expectException(InvalidArgumentException::class);
264-
$this->expectExceptionMessage('First key in $operations[0]["updateMany"][1] is not an update operator');
264+
$this->expectExceptionMessage('First key in $operations[0]["updateMany"][1] is neither an update operator nor a pipeline');
265265
new BulkWrite($this->getDatabaseName(), $this->getCollectionName(), [
266266
[BulkWrite::UPDATE_MANY => [['_id' => ['$gt' => 1]], ['x' => 1]]],
267267
]);
@@ -345,10 +345,10 @@ public function testUpdateOneUpdateArgumentTypeCheck($update)
345345
]);
346346
}
347347

348-
public function testUpdateOneUpdateArgumentRequiresOperators()
348+
public function testUpdateOneUpdateArgumentRequiresOperatorsOrPipeline()
349349
{
350350
$this->expectException(InvalidArgumentException::class);
351-
$this->expectExceptionMessage('First key in $operations[0]["updateOne"][1] is not an update operator');
351+
$this->expectExceptionMessage('First key in $operations[0]["updateOne"][1] is neither an update operator nor a pipeline');
352352
new BulkWrite($this->getDatabaseName(), $this->getCollectionName(), [
353353
[BulkWrite::UPDATE_ONE => [['_id' => 1], ['x' => 1]]],
354354
]);

tests/SpecTests/crud/updateWithPipelines.json

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,171 @@
238238
]
239239
}
240240
}
241+
},
242+
{
243+
"description": "UpdateOne in bulk write using pipelines",
244+
"operations": [
245+
{
246+
"name": "bulkWrite",
247+
"arguments": {
248+
"requests": [
249+
{
250+
"name": "updateOne",
251+
"arguments": {
252+
"filter": {
253+
"_id": 1
254+
},
255+
"update": [
256+
{
257+
"$replaceRoot": {
258+
"newRoot": "$t"
259+
}
260+
},
261+
{
262+
"$addFields": {
263+
"foo": 1
264+
}
265+
}
266+
]
267+
}
268+
}
269+
]
270+
},
271+
"result": {
272+
"matchedCount": 1,
273+
"modifiedCount": 1,
274+
"upsertedCount": 0
275+
}
276+
}
277+
],
278+
"expectations": [
279+
{
280+
"command_started_event": {
281+
"command": {
282+
"update": "test",
283+
"updates": [
284+
{
285+
"q": {
286+
"_id": 1
287+
},
288+
"u": [
289+
{
290+
"$replaceRoot": {
291+
"newRoot": "$t"
292+
}
293+
},
294+
{
295+
"$addFields": {
296+
"foo": 1
297+
}
298+
}
299+
]
300+
}
301+
]
302+
},
303+
"command_name": "update",
304+
"database_name": "crud-tests"
305+
}
306+
}
307+
],
308+
"outcome": {
309+
"collection": {
310+
"data": [
311+
{
312+
"_id": 1,
313+
"u": {
314+
"v": 1
315+
},
316+
"foo": 1
317+
},
318+
{
319+
"_id": 2,
320+
"x": 2,
321+
"y": 1
322+
}
323+
]
324+
}
325+
}
326+
},
327+
{
328+
"description": "UpdateMany in bulk write using pipelines",
329+
"operations": [
330+
{
331+
"name": "bulkWrite",
332+
"arguments": {
333+
"requests": [
334+
{
335+
"name": "updateMany",
336+
"arguments": {
337+
"filter": {},
338+
"update": [
339+
{
340+
"$project": {
341+
"x": 1
342+
}
343+
},
344+
{
345+
"$addFields": {
346+
"foo": 1
347+
}
348+
}
349+
]
350+
}
351+
}
352+
]
353+
},
354+
"result": {
355+
"matchedCount": 2,
356+
"modifiedCount": 2,
357+
"upsertedCount": 0
358+
}
359+
}
360+
],
361+
"expectations": [
362+
{
363+
"command_started_event": {
364+
"command": {
365+
"update": "test",
366+
"updates": [
367+
{
368+
"q": {},
369+
"u": [
370+
{
371+
"$project": {
372+
"x": 1
373+
}
374+
},
375+
{
376+
"$addFields": {
377+
"foo": 1
378+
}
379+
}
380+
],
381+
"multi": true
382+
}
383+
]
384+
},
385+
"command_name": "update",
386+
"database_name": "crud-tests"
387+
}
388+
}
389+
],
390+
"outcome": {
391+
"collection": {
392+
"data": [
393+
{
394+
"_id": 1,
395+
"x": 1,
396+
"foo": 1
397+
},
398+
{
399+
"_id": 2,
400+
"x": 2,
401+
"foo": 1
402+
}
403+
]
404+
}
405+
}
241406
}
242407
]
243408
}

0 commit comments

Comments
 (0)