diff --git a/multinet/api/models/table.py b/multinet/api/models/table.py index 945db54..121a7c5 100644 --- a/multinet/api/models/table.py +++ b/multinet/api/models/table.py @@ -10,9 +10,13 @@ from django.db.models.signals import post_delete, pre_save from django.dispatch.dispatcher import receiver from django_extensions.db.models import TimeStampedModel +from more_itertools import chunked from .workspace import Workspace +# The max number of documents that should be sent in bulk requests +DOCUMENT_CHUNK_SIZE = 5000 + @dataclass class RowModifyError: @@ -58,24 +62,36 @@ def get_rows(self, limit: Optional[int] = None, offset: Optional[int] = None) -> def put_rows(self, rows: List[Dict]) -> RowInsertionResponse: """Insert/update rows in the underlying arangodb collection.""" - res = self.get_arango_collection(readonly=False).insert_many(rows, overwrite=True) - errors = [ - RowModifyError(index=i, message=doc.error_message) - for i, doc in enumerate(res) - if isinstance(doc, DocumentInsertError) - ] + errors = [] + + # Limit the amount of rows inserted per request, to prevent timeouts + for chunk in chunked(rows, DOCUMENT_CHUNK_SIZE): + res = self.get_arango_collection(readonly=False).insert_many(chunk, overwrite=True) + errors.extend( + ( + RowModifyError(index=i, message=doc.error_message) + for i, doc in enumerate(res) + if isinstance(doc, DocumentInsertError) + ) + ) inserted = len(rows) - len(errors) return RowInsertionResponse(inserted=inserted, errors=errors) def delete_rows(self, rows: List[Dict]) -> RowDeletionResponse: """Delete rows in the underlying arangodb collection.""" - res = self.get_arango_collection(readonly=False).delete_many(rows) - errors = [ - RowModifyError(index=i, message=doc.error_message) - for i, doc in enumerate(res) - if isinstance(doc, DocumentDeleteError) - ] + errors = [] + + # Limit the amount of rows deleted per request, to prevent timeouts + for chunk in chunked(rows, DOCUMENT_CHUNK_SIZE): + res = self.get_arango_collection(readonly=False).delete_many(chunk) + errors.extend( + ( + RowModifyError(index=i, message=doc.error_message) + for i, doc in enumerate(res) + if isinstance(doc, DocumentDeleteError) + ) + ) deleted = len(rows) - len(errors) return RowDeletionResponse(deleted=deleted, errors=errors) diff --git a/setup.py b/setup.py index 9f6d560..f7b900a 100644 --- a/setup.py +++ b/setup.py @@ -50,6 +50,7 @@ 'djangorestframework', 'drf-extensions', 'drf-yasg', + 'more-itertools', 'python-arango', # Production-only 'gunicorn',