From a004471174834aa04ff792876aa53cb0173021ae Mon Sep 17 00:00:00 2001 From: JackWilb Date: Thu, 13 Jan 2022 13:42:36 -0700 Subject: [PATCH 1/2] Add chunking to put_rows and delete_rows --- multinet/api/models/table.py | 41 +++++++++++++++++++++++++----------- setup.py | 1 + 2 files changed, 30 insertions(+), 12 deletions(-) diff --git a/multinet/api/models/table.py b/multinet/api/models/table.py index 945db54..4a122d3 100644 --- a/multinet/api/models/table.py +++ b/multinet/api/models/table.py @@ -10,9 +10,13 @@ from django.db.models.signals import post_delete, pre_save from django.dispatch.dispatcher import receiver from django_extensions.db.models import TimeStampedModel +from more_itertools import chunked from .workspace import Workspace +# The max number of documents that should be sent in bulk requests +DOCUMENT_CHUNK_SIZE = 5000 + @dataclass class RowModifyError: @@ -58,24 +62,37 @@ def get_rows(self, limit: Optional[int] = None, offset: Optional[int] = None) -> def put_rows(self, rows: List[Dict]) -> RowInsertionResponse: """Insert/update rows in the underlying arangodb collection.""" - res = self.get_arango_collection(readonly=False).insert_many(rows, overwrite=True) - errors = [ - RowModifyError(index=i, message=doc.error_message) - for i, doc in enumerate(res) - if isinstance(doc, DocumentInsertError) - ] + errors = [] + + # Limit the amount of rows inserted per request, to prevent timeouts + for chunk in chunked(rows, DOCUMENT_CHUNK_SIZE): + print(len(chunk)) + res = self.get_arango_collection(readonly=False).insert_many(chunk, overwrite=True) + errors.extend( + ( + RowModifyError(index=i, message=doc.error_message) + for i, doc in enumerate(res) + if isinstance(doc, DocumentInsertError) + ) + ) inserted = len(rows) - len(errors) return RowInsertionResponse(inserted=inserted, errors=errors) def delete_rows(self, rows: List[Dict]) -> RowDeletionResponse: """Delete rows in the underlying arangodb collection.""" - res = self.get_arango_collection(readonly=False).delete_many(rows) - errors = [ - RowModifyError(index=i, message=doc.error_message) - for i, doc in enumerate(res) - if isinstance(doc, DocumentDeleteError) - ] + errors = [] + + # Limit the amount of rows deleted per request, to prevent timeouts + for chunk in chunked(rows, DOCUMENT_CHUNK_SIZE): + res = self.get_arango_collection(readonly=False).delete_many(chunk) + errors.extend( + ( + RowModifyError(index=i, message=doc.error_message) + for i, doc in enumerate(res) + if isinstance(doc, DocumentDeleteError) + ) + ) deleted = len(rows) - len(errors) return RowDeletionResponse(deleted=deleted, errors=errors) diff --git a/setup.py b/setup.py index 9f6d560..f7b900a 100644 --- a/setup.py +++ b/setup.py @@ -50,6 +50,7 @@ 'djangorestframework', 'drf-extensions', 'drf-yasg', + 'more-itertools', 'python-arango', # Production-only 'gunicorn', From a32aea44b193b169e5240d6d0b0e6c876f134fe7 Mon Sep 17 00:00:00 2001 From: JackWilb Date: Thu, 13 Jan 2022 16:15:45 -0700 Subject: [PATCH 2/2] Remove print statement --- multinet/api/models/table.py | 1 - 1 file changed, 1 deletion(-) diff --git a/multinet/api/models/table.py b/multinet/api/models/table.py index 4a122d3..121a7c5 100644 --- a/multinet/api/models/table.py +++ b/multinet/api/models/table.py @@ -66,7 +66,6 @@ def put_rows(self, rows: List[Dict]) -> RowInsertionResponse: # Limit the amount of rows inserted per request, to prevent timeouts for chunk in chunked(rows, DOCUMENT_CHUNK_SIZE): - print(len(chunk)) res = self.get_arango_collection(readonly=False).insert_many(chunk, overwrite=True) errors.extend( (