diff --git a/gridfs/__init__.py b/gridfs/__init__.py index 08c7e1d2cd..6ab843a85e 100644 --- a/gridfs/__init__.py +++ b/gridfs/__init__.py @@ -33,7 +33,7 @@ _clear_entity_type_registry, _disallow_transactions, ) -from pymongo import ASCENDING, DESCENDING +from pymongo import ASCENDING, DESCENDING, _csot from pymongo.client_session import ClientSession from pymongo.collection import Collection from pymongo.common import validate_string @@ -514,6 +514,7 @@ def __init__( ) self._chunk_size_bytes = chunk_size_bytes + self._timeout = db.client.options.timeout def open_upload_stream( self, @@ -631,6 +632,7 @@ def open_upload_stream_with_id( return GridIn(self._collection, session=session, **opts) + @_csot.apply def upload_from_stream( self, filename: str, @@ -679,6 +681,7 @@ def upload_from_stream( return cast(ObjectId, gin._id) + @_csot.apply def upload_from_stream_with_id( self, file_id: Any, @@ -762,6 +765,7 @@ def open_download_stream( gout._ensure_file() return gout + @_csot.apply def download_to_stream( self, file_id: Any, destination: Any, session: Optional[ClientSession] = None ) -> None: @@ -795,6 +799,7 @@ def download_to_stream( for chunk in gout: destination.write(chunk) + @_csot.apply def delete(self, file_id: Any, session: Optional[ClientSession] = None) -> None: """Given an file_id, delete this stored file's files collection document and associated chunks from a GridFS bucket. @@ -926,6 +931,7 @@ def open_download_stream_by_name( except StopIteration: raise NoFile("no version %d for filename %r" % (revision, filename)) + @_csot.apply def download_to_stream_by_name( self, filename: str, diff --git a/pymongo/_csot.py b/pymongo/_csot.py index e25bba108f..5170c0d8ca 100644 --- a/pymongo/_csot.py +++ b/pymongo/_csot.py @@ -17,7 +17,9 @@ import functools import time from contextvars import ContextVar, Token -from typing import Any, Callable, Optional, Tuple, TypeVar, cast +from typing import Any, Callable, MutableMapping, Optional, Tuple, TypeVar, cast + +from pymongo.write_concern import WriteConcern TIMEOUT: ContextVar[Optional[float]] = ContextVar("TIMEOUT", default=None) RTT: ContextVar[float] = ContextVar("RTT", default=0.0) @@ -103,3 +105,14 @@ def csot_wrapper(self, *args, **kwargs): return func(self, *args, **kwargs) return cast(F, csot_wrapper) + + +def apply_write_concern(cmd: MutableMapping, write_concern: Optional[WriteConcern]) -> None: + """Apply the given write concern to a command.""" + if not write_concern or write_concern.is_server_default: + return + wc = write_concern.document + if get_timeout() is not None: + wc.pop("wtimeout", None) + if wc: + cmd["writeConcern"] = wc diff --git a/pymongo/bulk.py b/pymongo/bulk.py index 7992383f67..b21b576aa5 100644 --- a/pymongo/bulk.py +++ b/pymongo/bulk.py @@ -23,7 +23,7 @@ from bson.objectid import ObjectId from bson.raw_bson import RawBSONDocument from bson.son import SON -from pymongo import common +from pymongo import _csot, common from pymongo.client_session import _validate_session_write_concern from pymongo.collation import validate_collation_or_none from pymongo.common import ( @@ -315,8 +315,7 @@ def _execute_command( cmd = SON([(cmd_name, self.collection.name), ("ordered", self.ordered)]) if self.comment: cmd["comment"] = self.comment - if not write_concern.is_server_default: - cmd["writeConcern"] = write_concern.document + _csot.apply_write_concern(cmd, write_concern) if self.bypass_doc_val: cmd["bypassDocumentValidation"] = True if self.let is not None and run.op_type in (_DELETE, _UPDATE): diff --git a/pymongo/collection.py b/pymongo/collection.py index 22af5a6426..9a9ba56618 100644 --- a/pymongo/collection.py +++ b/pymongo/collection.py @@ -542,8 +542,6 @@ def _insert_one( command = SON([("insert", self.name), ("ordered", ordered), ("documents", [doc])]) if comment is not None: command["comment"] = comment - if not write_concern.is_server_default: - command["writeConcern"] = write_concern.document def _insert_command(session, sock_info, retryable_write): if bypass_doc_val: @@ -756,8 +754,6 @@ def _update( if let is not None: common.validate_is_mapping("let", let) command["let"] = let - if not write_concern.is_server_default: - command["writeConcern"] = write_concern.document if comment is not None: command["comment"] = comment @@ -1232,8 +1228,6 @@ def _delete( hint = helpers._index_document(hint) delete_doc["hint"] = hint command = SON([("delete", self.name), ("ordered", ordered), ("deletes", [delete_doc])]) - if not write_concern.is_server_default: - command["writeConcern"] = write_concern.document if let is not None: common.validate_is_document_type("let", let) @@ -2820,8 +2814,6 @@ def _find_and_modify(session, sock_info, retryable_write): "Must be connected to MongoDB 4.4+ to use hint on unacknowledged find and modify commands." ) cmd["hint"] = hint - if not write_concern.is_server_default: - cmd["writeConcern"] = write_concern.document out = self._command( sock_info, cmd, diff --git a/pymongo/network.py b/pymongo/network.py index 3eac0d02d3..a5c5459e14 100644 --- a/pymongo/network.py +++ b/pymongo/network.py @@ -118,9 +118,8 @@ def command( # Support CSOT if client: - sock_info.apply_timeout(client, spec, write_concern) - elif write_concern and not write_concern.is_server_default: - spec["writeConcern"] = write_concern.document + sock_info.apply_timeout(client, spec) + _csot.apply_write_concern(spec, write_concern) if use_op_msg: flags = _OpMsg.MORE_TO_COME if unacknowledged else 0 diff --git a/pymongo/pool.py b/pymongo/pool.py index ed9feac918..1fab98209f 100644 --- a/pymongo/pool.py +++ b/pymongo/pool.py @@ -569,16 +569,13 @@ def set_socket_timeout(self, timeout): self.last_timeout = timeout self.sock.settimeout(timeout) - def apply_timeout(self, client, cmd, write_concern=None): + def apply_timeout(self, client, cmd): # CSOT: use remaining timeout when set. timeout = _csot.remaining() if timeout is None: # Reset the socket timeout unless we're performing a streaming monitor check. if not self.more_to_come: self.set_socket_timeout(self.opts.socket_timeout) - - if cmd and write_concern and not write_concern.is_server_default: - cmd["writeConcern"] = write_concern.document return None # RTT validation. rtt = _csot.get_rtt() @@ -593,10 +590,6 @@ def apply_timeout(self, client, cmd, write_concern=None): ) if cmd is not None: cmd["maxTimeMS"] = int(max_time_ms * 1000) - wc = write_concern.document if write_concern else {} - wc.pop("wtimeout", None) - if wc: - cmd["writeConcern"] = wc self.set_socket_timeout(timeout) return timeout diff --git a/test/csot/gridfs-advanced.json b/test/csot/gridfs-advanced.json index 668b93f37a..0b09684fc7 100644 --- a/test/csot/gridfs-advanced.json +++ b/test/csot/gridfs-advanced.json @@ -17,7 +17,7 @@ "client": { "id": "client", "uriOptions": { - "timeoutMS": 50 + "timeoutMS": 75 }, "useMultipleMongoses": false, "observeEvents": [ @@ -62,13 +62,12 @@ "_id": { "$oid": "000000000000000000000005" }, - "length": 10, + "length": 8, "chunkSize": 4, "uploadDate": { "$date": "1970-01-01T00:00:00.000Z" }, - "md5": "57d83cd477bfb1ccd975ab33d827a92b", - "filename": "length-10", + "filename": "length-8", "contentType": "application/octet-stream", "aliases": [], "metadata": {} @@ -93,6 +92,21 @@ "subType": "00" } } + }, + { + "_id": { + "$oid": "000000000000000000000006" + }, + "files_id": { + "$oid": "000000000000000000000005" + }, + "n": 1, + "data": { + "$binary": { + "base64": "ESIzRA==", + "subType": "00" + } + } } ] } @@ -116,7 +130,7 @@ "update" ], "blockConnection": true, - "blockTimeMS": 55 + "blockTimeMS": 100 } } } @@ -129,7 +143,7 @@ "$oid": "000000000000000000000005" }, "newFilename": "foo", - "timeoutMS": 100 + "timeoutMS": 2000 } } ], @@ -174,7 +188,7 @@ "update" ], "blockConnection": true, - "blockTimeMS": 55 + "blockTimeMS": 100 } } } @@ -234,7 +248,7 @@ "drop" ], "blockConnection": true, - "blockTimeMS": 55 + "blockTimeMS": 100 } } } @@ -243,7 +257,7 @@ "name": "drop", "object": "bucket", "arguments": { - "timeoutMS": 100 + "timeoutMS": 2000 } } ] @@ -266,7 +280,7 @@ "drop" ], "blockConnection": true, - "blockTimeMS": 55 + "blockTimeMS": 100 } } } @@ -320,7 +334,7 @@ "drop" ], "blockConnection": true, - "blockTimeMS": 55 + "blockTimeMS": 100 } } } diff --git a/test/csot/gridfs-delete.json b/test/csot/gridfs-delete.json index f458fa827c..8701929ff3 100644 --- a/test/csot/gridfs-delete.json +++ b/test/csot/gridfs-delete.json @@ -17,7 +17,7 @@ "client": { "id": "client", "uriOptions": { - "timeoutMS": 50 + "timeoutMS": 75 }, "useMultipleMongoses": false, "observeEvents": [ @@ -62,13 +62,12 @@ "_id": { "$oid": "000000000000000000000005" }, - "length": 10, + "length": 8, "chunkSize": 4, "uploadDate": { "$date": "1970-01-01T00:00:00.000Z" }, - "md5": "57d83cd477bfb1ccd975ab33d827a92b", - "filename": "length-10", + "filename": "length-8", "contentType": "application/octet-stream", "aliases": [], "metadata": {} @@ -93,6 +92,21 @@ "subType": "00" } } + }, + { + "_id": { + "$oid": "000000000000000000000006" + }, + "files_id": { + "$oid": "000000000000000000000005" + }, + "n": 1, + "data": { + "$binary": { + "base64": "ESIzRA==", + "subType": "00" + } + } } ] } @@ -116,7 +130,7 @@ "delete" ], "blockConnection": true, - "blockTimeMS": 55 + "blockTimeMS": 100 } } } @@ -128,7 +142,7 @@ "id": { "$oid": "000000000000000000000005" }, - "timeoutMS": 100 + "timeoutMS": 1000 } } ] @@ -151,7 +165,7 @@ "delete" ], "blockConnection": true, - "blockTimeMS": 55 + "blockTimeMS": 100 } } } @@ -210,7 +224,7 @@ "delete" ], "blockConnection": true, - "blockTimeMS": 55 + "blockTimeMS": 100 } } } @@ -247,7 +261,7 @@ "delete" ], "blockConnection": true, - "blockTimeMS": 30 + "blockTimeMS": 50 } } } diff --git a/test/csot/gridfs-download.json b/test/csot/gridfs-download.json index a3044a6d81..2ab64010f8 100644 --- a/test/csot/gridfs-download.json +++ b/test/csot/gridfs-download.json @@ -17,7 +17,7 @@ "client": { "id": "client", "uriOptions": { - "timeoutMS": 50 + "timeoutMS": 75 }, "useMultipleMongoses": false, "observeEvents": [ @@ -62,13 +62,12 @@ "_id": { "$oid": "000000000000000000000005" }, - "length": 10, + "length": 8, "chunkSize": 4, "uploadDate": { "$date": "1970-01-01T00:00:00.000Z" }, - "md5": "57d83cd477bfb1ccd975ab33d827a92b", - "filename": "length-10", + "filename": "length-8", "contentType": "application/octet-stream", "aliases": [], "metadata": {} @@ -93,6 +92,21 @@ "subType": "00" } } + }, + { + "_id": { + "$oid": "000000000000000000000006" + }, + "files_id": { + "$oid": "000000000000000000000005" + }, + "n": 1, + "data": { + "$binary": { + "base64": "ESIzRA==", + "subType": "00" + } + } } ] } @@ -116,7 +130,7 @@ "find" ], "blockConnection": true, - "blockTimeMS": 55 + "blockTimeMS": 100 } } } @@ -128,7 +142,7 @@ "id": { "$oid": "000000000000000000000005" }, - "timeoutMS": 100 + "timeoutMS": 1000 } } ] @@ -151,7 +165,7 @@ "find" ], "blockConnection": true, - "blockTimeMS": 55 + "blockTimeMS": 100 } } } @@ -210,7 +224,7 @@ "find" ], "blockConnection": true, - "blockTimeMS": 55 + "blockTimeMS": 100 } } } @@ -284,7 +298,7 @@ "find" ], "blockConnection": true, - "blockTimeMS": 30 + "blockTimeMS": 50 } } } diff --git a/test/csot/gridfs-find.json b/test/csot/gridfs-find.json index f75a279c01..45bb7066d6 100644 --- a/test/csot/gridfs-find.json +++ b/test/csot/gridfs-find.json @@ -17,7 +17,7 @@ "client": { "id": "client", "uriOptions": { - "timeoutMS": 50 + "timeoutMS": 75 }, "useMultipleMongoses": false, "observeEvents": [ @@ -84,7 +84,7 @@ "find" ], "blockConnection": true, - "blockTimeMS": 55 + "blockTimeMS": 100 } } } @@ -94,7 +94,7 @@ "object": "bucket", "arguments": { "filter": {}, - "timeoutMS": 100 + "timeoutMS": 1000 } } ], @@ -139,7 +139,7 @@ "find" ], "blockConnection": true, - "blockTimeMS": 55 + "blockTimeMS": 100 } } } diff --git a/test/csot/gridfs-upload.json b/test/csot/gridfs-upload.json index b0daeb2e42..690fdda77f 100644 --- a/test/csot/gridfs-upload.json +++ b/test/csot/gridfs-upload.json @@ -17,7 +17,7 @@ "client": { "id": "client", "uriOptions": { - "timeoutMS": 50 + "timeoutMS": 75 }, "useMultipleMongoses": false } @@ -81,7 +81,7 @@ "find" ], "blockConnection": true, - "blockTimeMS": 55 + "blockTimeMS": 100 } } } @@ -117,7 +117,7 @@ "find" ], "blockConnection": true, - "blockTimeMS": 55 + "blockTimeMS": 100 } } } @@ -155,7 +155,7 @@ "listIndexes" ], "blockConnection": true, - "blockTimeMS": 55 + "blockTimeMS": 100 } } } @@ -193,7 +193,7 @@ "createIndexes" ], "blockConnection": true, - "blockTimeMS": 55 + "blockTimeMS": 100 } } } @@ -231,7 +231,7 @@ "listIndexes" ], "blockConnection": true, - "blockTimeMS": 55 + "blockTimeMS": 100 } } } @@ -269,7 +269,7 @@ "createIndexes" ], "blockConnection": true, - "blockTimeMS": 55 + "blockTimeMS": 100 } } } @@ -307,7 +307,7 @@ "insert" ], "blockConnection": true, - "blockTimeMS": 55 + "blockTimeMS": 100 } } } @@ -345,7 +345,7 @@ "insert" ], "blockConnection": true, - "blockTimeMS": 55 + "blockTimeMS": 100 } } } @@ -384,7 +384,7 @@ "listIndexes" ], "blockConnection": true, - "blockTimeMS": 30 + "blockTimeMS": 50 } } } diff --git a/test/unified_format.py b/test/unified_format.py index d36b5d0a48..d81238c0ee 100644 --- a/test/unified_format.py +++ b/test/unified_format.py @@ -52,7 +52,7 @@ snake_to_camel, ) from test.version import Version -from typing import Any +from typing import Any, List import pymongo from bson import SON, Code, DBRef, Decimal128, Int64, MaxKey, MinKey, json_util @@ -60,8 +60,8 @@ from bson.codec_options import DEFAULT_CODEC_OPTIONS from bson.objectid import ObjectId from bson.regex import RE_TYPE, Regex -from gridfs import GridFSBucket -from pymongo import ASCENDING, MongoClient +from gridfs import GridFSBucket, GridOut +from pymongo import ASCENDING, MongoClient, _csot from pymongo.change_stream import ChangeStream from pymongo.client_session import ClientSession, TransactionOptions, _TxnState from pymongo.collection import Collection @@ -460,7 +460,17 @@ def _create_entity(self, entity_spec, uri=None): elif entity_type == "bucket": db = self[spec["database"]] kwargs = parse_spec_options(spec.get("bucketOptions", {}).copy()) - self[spec["id"]] = GridFSBucket(db, **kwargs) + bucket = GridFSBucket(db, **kwargs) + + # PyMongo does not support GridFSBucket.drop(), emulate it. + @_csot.apply + def drop(self: GridFSBucket, *args: Any, **kwargs: Any) -> None: + self._files.drop(*args, **kwargs) + self._chunks.drop(*args, **kwargs) + + if not hasattr(bucket, "drop"): + bucket.drop = drop.__get__(bucket) + self[spec["id"]] = bucket return elif entity_type == "clientEncryption": opts = camel_to_snake_args(spec["clientEncryptionOpts"].copy()) @@ -871,8 +881,11 @@ def maybe_skip_test(self, spec): or "Dirty implicit session is discarded" in spec["description"] ): self.skipTest("MMAPv1 does not support retryWrites=True") - elif "Client side error in command starting transaction" in spec["description"]: + if "Client side error in command starting transaction" in spec["description"]: self.skipTest("Implement PYTHON-1894") + if "timeoutMS applied to entire download" in spec["description"]: + self.skipTest("PyMongo's open_download_stream does not cap the stream's lifetime") + class_name = self.__class__.__name__.lower() description = spec["description"].lower() if "csot" in class_name: @@ -914,17 +927,6 @@ def maybe_skip_test(self, spec): self.skipTest("PyMongo does not support modifyCollection") if "timeoutMode" in op.get("arguments", {}): self.skipTest("PyMongo does not support timeoutMode") - if "csot" in class_name: - if "bucket" in op["object"]: - self.skipTest("CSOT not implemented for GridFS") - if name == "createEntities": - self.maybe_skip_entity(op.get("arguments", {}).get("entities", [])) - - def maybe_skip_entity(self, entities): - for entity in entities: - entity_type = next(iter(entity)) - if entity_type == "bucket": - self.skipTest("GridFS is not currently supported (PYTHON-2459)") def process_error(self, exception, spec): is_error = spec.get("isError") @@ -1145,10 +1147,10 @@ def _bucketOperation_uploadWithId(self, target: GridFSBucket, *args: Any, **kwar kwargs.setdefault("metadata", {})["contentType"] = kwargs.pop("content_type") return target.upload_from_stream_with_id(*args, **kwargs) - def _bucketOperation_drop(self, target: GridFSBucket, *args: Any, **kwargs: Any) -> None: - # PyMongo does not support GridFSBucket.drop(), emulate it. - target._files.drop(*args, **kwargs) - target._chunks.drop(*args, **kwargs) + def _bucketOperation_find( + self, target: GridFSBucket, *args: Any, **kwargs: Any + ) -> List[GridOut]: + return list(target.find(*args, **kwargs)) def run_entity_operation(self, spec): target = self.entity_map[spec["object"]]