From 79e0366c3fa7354eabd2b375a365dc8cb0f55b41 Mon Sep 17 00:00:00 2001 From: omaxx Date: Wed, 29 Sep 2021 22:50:38 -0400 Subject: [PATCH] add ordered argument to insert method fix #2169 --- mongoengine/queryset/base.py | 55 +++++++++++++++++++++++---------- tests/queryset/test_queryset.py | 29 ++++++++++++++++- 2 files changed, 67 insertions(+), 17 deletions(-) diff --git a/mongoengine/queryset/base.py b/mongoengine/queryset/base.py index 5dc47e001..5d3c997f2 100644 --- a/mongoengine/queryset/base.py +++ b/mongoengine/queryset/base.py @@ -293,7 +293,12 @@ def first(self): return result def insert( - self, doc_or_docs, load_bulk=True, write_concern=None, signal_kwargs=None + self, + doc_or_docs, + load_bulk=True, + write_concern=None, + signal_kwargs=None, + ordered=True, ): """bulk insert documents @@ -309,6 +314,11 @@ def insert( each server being written to. :param signal_kwargs: (optional) kwargs dictionary to be passed to the signal calls. + :param ordered (optional): If True (the default) documents will be + inserted on the server serially, in the order provided. If an error + occurs all remaining inserts are aborted. If False, documents will + be inserted on the server in arbitrary order, possibly in parallel, + and all document inserts will be attempted. By default returns document instances, set ``load_bulk`` to False to return just ``ObjectIds`` @@ -341,12 +351,14 @@ def insert( with set_write_concern(self._collection, write_concern) as collection: insert_func = collection.insert_many + insert_func_kwargs = {"ordered": ordered} if return_one: raw = raw[0] insert_func = collection.insert_one + insert_func_kwargs = {} try: - inserted_result = insert_func(raw) + inserted_result = insert_func(raw, **insert_func_kwargs) ids = ( [inserted_result.inserted_id] if return_one @@ -358,6 +370,17 @@ def insert( except pymongo.errors.BulkWriteError as err: # inserting documents that already have an _id field will # give huge performance debt or raise + if ordered: + inserted = err.details["nInserted"] + for doc, raw_doc in zip(docs[:inserted], raw[:inserted]): + doc.pk = raw_doc["_id"] + else: + not_writed_ids = [ + error["op"]["_id"] for error in err.details["writeErrors"] + ] + for doc, raw_doc in zip(docs, raw): + if raw_doc["_id"] not in not_writed_ids: + doc.pk = raw_doc["_id"] message = "Bulk write error: (%s)" raise BulkWriteError(message % err.details) except pymongo.errors.OperationFailure as err: @@ -1715,29 +1738,29 @@ def no_dereference(self): def _item_frequencies_map_reduce(self, field, normalize=False): map_func = """ - function() { - var path = '{{~%(field)s}}'.split('.'); + function() {{ + var path = '{{{{~{field}}}}}'.split('.'); var field = this; - for (p in path) { + for (p in path) {{ if (typeof field != 'undefined') field = field[path[p]]; else break; - } - if (field && field.constructor == Array) { - field.forEach(function(item) { + }} + if (field && field.constructor == Array) {{ + field.forEach(function(item) {{ emit(item, 1); - }); - } else if (typeof field != 'undefined') { + }}); + }} else if (typeof field != 'undefined') {{ emit(field, 1); - } else { + }} else {{ emit(null, 1); - } - } - """ % { - "field": field - } + }} + }} + """.format( + field=field + ) reduce_func = """ function(key, values) { var total = 0; diff --git a/tests/queryset/test_queryset.py b/tests/queryset/test_queryset.py index 1aa4f32a3..6527c1e41 100644 --- a/tests/queryset/test_queryset.py +++ b/tests/queryset/test_queryset.py @@ -12,7 +12,7 @@ from mongoengine import * from mongoengine.connection import get_db from mongoengine.context_managers import query_counter, switch_db -from mongoengine.errors import InvalidQueryError +from mongoengine.errors import BulkWriteError, InvalidQueryError from mongoengine.mongodb_support import ( MONGODB_36, get_mongodb_version, @@ -1067,6 +1067,33 @@ class Comment(Document): com2 = Comment(id=1) Comment.objects.insert([com1, com2]) + def test_bulk_insert_ordered(self): + class Comment(Document): + name = StringField(unique=True) + + Comment.drop_collection() + Comment.objects.insert(Comment(name="b"), ordered=True) + comments = [Comment(name="a"), Comment(name="b"), Comment(name="c")] + with pytest.raises(BulkWriteError): + Comment.objects.insert(comments, ordered=True) + Comment.objects.get(name="a") + with pytest.raises(DoesNotExist): + Comment.objects.get(name="c") + assert comments[0].pk is not None + assert comments[1].pk is None + assert comments[2].pk is None + + Comment.drop_collection() + Comment.objects.insert(Comment(name="b"), ordered=False) + comments = [Comment(name="a"), Comment(name="b"), Comment(name="c")] + with pytest.raises(BulkWriteError): + Comment.objects.insert(comments, ordered=False) + Comment.objects.get(name="a") + Comment.objects.get(name="c") + assert comments[0].pk is not None + assert comments[1].pk is None + assert comments[2].pk is not None + def test_insert_raise_if_duplicate_in_constraint(self): class Comment(Document): id = IntField(primary_key=True)