Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion gridfs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
_clear_entity_type_registry,
_disallow_transactions,
)
from pymongo import ASCENDING, DESCENDING
from pymongo import ASCENDING, DESCENDING, _csot
from pymongo.client_session import ClientSession
from pymongo.collection import Collection
from pymongo.common import validate_string
Expand Down Expand Up @@ -514,6 +514,7 @@ def __init__(
)

self._chunk_size_bytes = chunk_size_bytes
self._timeout = db.client.options.timeout

def open_upload_stream(
self,
Expand Down Expand Up @@ -631,6 +632,7 @@ def open_upload_stream_with_id(

return GridIn(self._collection, session=session, **opts)

@_csot.apply
def upload_from_stream(
self,
filename: str,
Expand Down Expand Up @@ -679,6 +681,7 @@ def upload_from_stream(

return cast(ObjectId, gin._id)

@_csot.apply
def upload_from_stream_with_id(
self,
file_id: Any,
Expand Down Expand Up @@ -762,6 +765,7 @@ def open_download_stream(
gout._ensure_file()
return gout

@_csot.apply
def download_to_stream(
self, file_id: Any, destination: Any, session: Optional[ClientSession] = None
) -> None:
Expand Down Expand Up @@ -795,6 +799,7 @@ def download_to_stream(
for chunk in gout:
destination.write(chunk)

@_csot.apply
def delete(self, file_id: Any, session: Optional[ClientSession] = None) -> None:
"""Given an file_id, delete this stored file's files collection document
and associated chunks from a GridFS bucket.
Expand Down Expand Up @@ -926,6 +931,7 @@ def open_download_stream_by_name(
except StopIteration:
raise NoFile("no version %d for filename %r" % (revision, filename))

@_csot.apply
def download_to_stream_by_name(
self,
filename: str,
Expand Down
15 changes: 14 additions & 1 deletion pymongo/_csot.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
import functools
import time
from contextvars import ContextVar, Token
from typing import Any, Callable, Optional, Tuple, TypeVar, cast
from typing import Any, Callable, MutableMapping, Optional, Tuple, TypeVar, cast

from pymongo.write_concern import WriteConcern

TIMEOUT: ContextVar[Optional[float]] = ContextVar("TIMEOUT", default=None)
RTT: ContextVar[float] = ContextVar("RTT", default=0.0)
Expand Down Expand Up @@ -103,3 +105,14 @@ def csot_wrapper(self, *args, **kwargs):
return func(self, *args, **kwargs)

return cast(F, csot_wrapper)


def apply_write_concern(cmd: MutableMapping, write_concern: Optional[WriteConcern]) -> None:
"""Apply the given write concern to a command."""
if not write_concern or write_concern.is_server_default:
return
wc = write_concern.document
if get_timeout() is not None:
wc.pop("wtimeout", None)
if wc:
cmd["writeConcern"] = wc
5 changes: 2 additions & 3 deletions pymongo/bulk.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from bson.objectid import ObjectId
from bson.raw_bson import RawBSONDocument
from bson.son import SON
from pymongo import common
from pymongo import _csot, common
from pymongo.client_session import _validate_session_write_concern
from pymongo.collation import validate_collation_or_none
from pymongo.common import (
Expand Down Expand Up @@ -315,8 +315,7 @@ def _execute_command(
cmd = SON([(cmd_name, self.collection.name), ("ordered", self.ordered)])
if self.comment:
cmd["comment"] = self.comment
if not write_concern.is_server_default:
cmd["writeConcern"] = write_concern.document
_csot.apply_write_concern(cmd, write_concern)
if self.bypass_doc_val:
cmd["bypassDocumentValidation"] = True
if self.let is not None and run.op_type in (_DELETE, _UPDATE):
Expand Down
8 changes: 0 additions & 8 deletions pymongo/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -542,8 +542,6 @@ def _insert_one(
command = SON([("insert", self.name), ("ordered", ordered), ("documents", [doc])])
if comment is not None:
command["comment"] = comment
if not write_concern.is_server_default:
command["writeConcern"] = write_concern.document

def _insert_command(session, sock_info, retryable_write):
if bypass_doc_val:
Expand Down Expand Up @@ -756,8 +754,6 @@ def _update(
if let is not None:
common.validate_is_mapping("let", let)
command["let"] = let
if not write_concern.is_server_default:
command["writeConcern"] = write_concern.document

if comment is not None:
command["comment"] = comment
Expand Down Expand Up @@ -1232,8 +1228,6 @@ def _delete(
hint = helpers._index_document(hint)
delete_doc["hint"] = hint
command = SON([("delete", self.name), ("ordered", ordered), ("deletes", [delete_doc])])
if not write_concern.is_server_default:
command["writeConcern"] = write_concern.document

if let is not None:
common.validate_is_document_type("let", let)
Expand Down Expand Up @@ -2820,8 +2814,6 @@ def _find_and_modify(session, sock_info, retryable_write):
"Must be connected to MongoDB 4.4+ to use hint on unacknowledged find and modify commands."
)
cmd["hint"] = hint
if not write_concern.is_server_default:
cmd["writeConcern"] = write_concern.document
out = self._command(
sock_info,
cmd,
Expand Down
5 changes: 2 additions & 3 deletions pymongo/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,8 @@ def command(

# Support CSOT
if client:
sock_info.apply_timeout(client, spec, write_concern)
elif write_concern and not write_concern.is_server_default:
spec["writeConcern"] = write_concern.document
sock_info.apply_timeout(client, spec)
_csot.apply_write_concern(spec, write_concern)

if use_op_msg:
flags = _OpMsg.MORE_TO_COME if unacknowledged else 0
Expand Down
9 changes: 1 addition & 8 deletions pymongo/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -569,16 +569,13 @@ def set_socket_timeout(self, timeout):
self.last_timeout = timeout
self.sock.settimeout(timeout)

def apply_timeout(self, client, cmd, write_concern=None):
def apply_timeout(self, client, cmd):
# CSOT: use remaining timeout when set.
timeout = _csot.remaining()
if timeout is None:
# Reset the socket timeout unless we're performing a streaming monitor check.
if not self.more_to_come:
self.set_socket_timeout(self.opts.socket_timeout)

if cmd and write_concern and not write_concern.is_server_default:
cmd["writeConcern"] = write_concern.document
return None
# RTT validation.
rtt = _csot.get_rtt()
Expand All @@ -593,10 +590,6 @@ def apply_timeout(self, client, cmd, write_concern=None):
)
if cmd is not None:
cmd["maxTimeMS"] = int(max_time_ms * 1000)
wc = write_concern.document if write_concern else {}
wc.pop("wtimeout", None)
if wc:
cmd["writeConcern"] = wc
self.set_socket_timeout(timeout)
return timeout

Expand Down
36 changes: 25 additions & 11 deletions test/csot/gridfs-advanced.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"client": {
"id": "client",
"uriOptions": {
"timeoutMS": 50
"timeoutMS": 75
},
"useMultipleMongoses": false,
"observeEvents": [
Expand Down Expand Up @@ -62,13 +62,12 @@
"_id": {
"$oid": "000000000000000000000005"
},
"length": 10,
"length": 8,
"chunkSize": 4,
"uploadDate": {
"$date": "1970-01-01T00:00:00.000Z"
},
"md5": "57d83cd477bfb1ccd975ab33d827a92b",
"filename": "length-10",
"filename": "length-8",
"contentType": "application/octet-stream",
"aliases": [],
"metadata": {}
Expand All @@ -93,6 +92,21 @@
"subType": "00"
}
}
},
{
"_id": {
"$oid": "000000000000000000000006"
},
"files_id": {
"$oid": "000000000000000000000005"
},
"n": 1,
"data": {
"$binary": {
"base64": "ESIzRA==",
"subType": "00"
}
}
}
]
}
Expand All @@ -116,7 +130,7 @@
"update"
],
"blockConnection": true,
"blockTimeMS": 55
"blockTimeMS": 100
}
}
}
Expand All @@ -129,7 +143,7 @@
"$oid": "000000000000000000000005"
},
"newFilename": "foo",
"timeoutMS": 100
"timeoutMS": 2000
}
}
],
Expand Down Expand Up @@ -174,7 +188,7 @@
"update"
],
"blockConnection": true,
"blockTimeMS": 55
"blockTimeMS": 100
}
}
}
Expand Down Expand Up @@ -234,7 +248,7 @@
"drop"
],
"blockConnection": true,
"blockTimeMS": 55
"blockTimeMS": 100
}
}
}
Expand All @@ -243,7 +257,7 @@
"name": "drop",
"object": "bucket",
"arguments": {
"timeoutMS": 100
"timeoutMS": 2000
}
}
]
Expand All @@ -266,7 +280,7 @@
"drop"
],
"blockConnection": true,
"blockTimeMS": 55
"blockTimeMS": 100
}
}
}
Expand Down Expand Up @@ -320,7 +334,7 @@
"drop"
],
"blockConnection": true,
"blockTimeMS": 55
"blockTimeMS": 100
}
}
}
Expand Down
32 changes: 23 additions & 9 deletions test/csot/gridfs-delete.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"client": {
"id": "client",
"uriOptions": {
"timeoutMS": 50
"timeoutMS": 75
},
"useMultipleMongoses": false,
"observeEvents": [
Expand Down Expand Up @@ -62,13 +62,12 @@
"_id": {
"$oid": "000000000000000000000005"
},
"length": 10,
"length": 8,
"chunkSize": 4,
"uploadDate": {
"$date": "1970-01-01T00:00:00.000Z"
},
"md5": "57d83cd477bfb1ccd975ab33d827a92b",
"filename": "length-10",
"filename": "length-8",
"contentType": "application/octet-stream",
"aliases": [],
"metadata": {}
Expand All @@ -93,6 +92,21 @@
"subType": "00"
}
}
},
{
"_id": {
"$oid": "000000000000000000000006"
},
"files_id": {
"$oid": "000000000000000000000005"
},
"n": 1,
"data": {
"$binary": {
"base64": "ESIzRA==",
"subType": "00"
}
}
}
]
}
Expand All @@ -116,7 +130,7 @@
"delete"
],
"blockConnection": true,
"blockTimeMS": 55
"blockTimeMS": 100
}
}
}
Expand All @@ -128,7 +142,7 @@
"id": {
"$oid": "000000000000000000000005"
},
"timeoutMS": 100
"timeoutMS": 1000
}
}
]
Expand All @@ -151,7 +165,7 @@
"delete"
],
"blockConnection": true,
"blockTimeMS": 55
"blockTimeMS": 100
}
}
}
Expand Down Expand Up @@ -210,7 +224,7 @@
"delete"
],
"blockConnection": true,
"blockTimeMS": 55
"blockTimeMS": 100
}
}
}
Expand Down Expand Up @@ -247,7 +261,7 @@
"delete"
],
"blockConnection": true,
"blockTimeMS": 30
"blockTimeMS": 50
}
}
}
Expand Down
Loading