Skip to content
18 changes: 16 additions & 2 deletions google/resumable_media/_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ def _process_response(self, response):
response, _ACCEPTABLE_STATUS_CODES, self._get_status_code
)

def consume(self, transport):
def consume(self, transport, timeout=None):
"""Consume the resource to be downloaded.

If a ``stream`` is attached to this download, then the downloaded
Expand All @@ -180,6 +180,13 @@ def consume(self, transport):
Args:
transport (object): An object which can make authenticated
requests.
timeout (Optional[Union[float, Tuple[float, float]]]):
The number of seconds to wait for the server response.
Depending on the retry strategy, a request may be repeated
several times using the same timeout each time.

Can also be passed as a tuple (connect_timeout, read_timeout).
See :meth:`requests.Session.request` documentation for details.

Raises:
NotImplementedError: Always, since virtual.
Expand Down Expand Up @@ -398,12 +405,19 @@ def _process_response(self, response):
# Write the response body to the stream.
self._stream.write(response_body)

def consume_next_chunk(self, transport):
def consume_next_chunk(self, transport, timeout=None):
"""Consume the next chunk of the resource to be downloaded.

Args:
transport (object): An object which can make authenticated
requests.
timeout (Optional[Union[float, Tuple[float, float]]]):
The number of seconds to wait for the server response.
Depending on the retry strategy, a request may be repeated
several times using the same timeout each time.

Can also be passed as a tuple (connect_timeout, read_timeout).
See :meth:`requests.Session.request` documentation for details.

Raises:
NotImplementedError: Always, since virtual.
Expand Down
35 changes: 32 additions & 3 deletions google/resumable_media/_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ def _prepare_request(self, data, content_type):
self._headers[_CONTENT_TYPE_HEADER] = content_type
return _POST, self.upload_url, data, self._headers

def transmit(self, transport, data, content_type):
def transmit(self, transport, data, content_type, timeout=None):
"""Transmit the resource to be uploaded.

Args:
Expand All @@ -207,6 +207,13 @@ def transmit(self, transport, data, content_type):
data (bytes): The resource content to be uploaded.
content_type (str): The content type of the resource, e.g. a JPEG
image has content type ``image/jpeg``.
timeout (Optional[Union[float, Tuple[float, float]]]):
The number of seconds to wait for the server response.
Depending on the retry strategy, a request may be repeated
several times using the same timeout each time.

Can also be passed as a tuple (connect_timeout, read_timeout).
See :meth:`requests.Session.request` documentation for details.

Raises:
NotImplementedError: Always, since virtual.
Expand Down Expand Up @@ -274,7 +281,7 @@ def _prepare_request(self, data, metadata, content_type):
self._headers[_CONTENT_TYPE_HEADER] = multipart_content_type
return _POST, self.upload_url, content, self._headers

def transmit(self, transport, data, metadata, content_type):
def transmit(self, transport, data, metadata, content_type, timeout=None):
"""Transmit the resource to be uploaded.

Args:
Expand All @@ -285,6 +292,13 @@ def transmit(self, transport, data, metadata, content_type):
ACL list.
content_type (str): The content type of the resource, e.g. a JPEG
image has content type ``image/jpeg``.
timeout (Optional[Union[float, Tuple[float, float]]]):
The number of seconds to wait for the server response.
Depending on the retry strategy, a request may be repeated
several times using the same timeout each time.

Can also be passed as a tuple (connect_timeout, read_timeout).
See :meth:`requests.Session.request` documentation for details.

Raises:
NotImplementedError: Always, since virtual.
Expand Down Expand Up @@ -468,6 +482,7 @@ def initiate(
content_type,
total_bytes=None,
stream_final=True,
timeout=None,
):
"""Initiate a resumable upload.

Expand Down Expand Up @@ -499,6 +514,13 @@ def initiate(
"final" (i.e. no more bytes will be added to it). In this case
we determine the upload size from the size of the stream. If
``total_bytes`` is passed, this argument will be ignored.
timeout (Optional[Union[float, Tuple[float, float]]]):
The number of seconds to wait for the server response.
Depending on the retry strategy, a request may be repeated
several times using the same timeout each time.

Can also be passed as a tuple (connect_timeout, read_timeout).
See :meth:`requests.Session.request` documentation for details.

Raises:
NotImplementedError: Always, since virtual.
Expand Down Expand Up @@ -626,7 +648,7 @@ def _process_response(self, response, bytes_sent):
)
self._bytes_uploaded = int(match.group(u"end_byte")) + 1

def transmit_next_chunk(self, transport):
def transmit_next_chunk(self, transport, timeout=None):
"""Transmit the next chunk of the resource to be uploaded.

If the current upload was initiated with ``stream_final=False``,
Expand All @@ -637,6 +659,13 @@ def transmit_next_chunk(self, transport):
Args:
transport (object): An object which can make authenticated
requests.
timeout (Optional[Union[float, Tuple[float, float]]]):
The number of seconds to wait for the server response.
Depending on the retry strategy, a request may be repeated
several times using the same timeout each time.

Can also be passed as a tuple (connect_timeout, read_timeout).
See :meth:`requests.Session.request` documentation for details.

Raises:
NotImplementedError: Always, since virtual.
Expand Down
56 changes: 52 additions & 4 deletions google/resumable_media/requests/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,11 @@ def _write_to_stream(self, response):
)
raise common.DataCorruption(response, msg)

def consume(self, transport):
def consume(
self,
transport,
timeout=(_helpers._DEFAULT_CONNECT_TIMEOUT, _helpers._DEFAULT_READ_TIMEOUT),
):
"""Consume the resource to be downloaded.

If a ``stream`` is attached to this download, then the downloaded
Expand All @@ -128,6 +132,13 @@ def consume(self, transport):
Args:
transport (~requests.Session): A ``requests`` object which can
make authenticated requests.
timeout (Optional[Union[float, Tuple[float, float]]]):
The number of seconds to wait for the server response.
Depending on the retry strategy, a request may be repeated
several times using the same timeout each time.

Can also be passed as a tuple (connect_timeout, read_timeout).
See :meth:`requests.Session.request` documentation for details.

Returns:
~requests.Response: The HTTP response returned by ``transport``.
Expand All @@ -144,6 +155,7 @@ def consume(self, transport):
u"data": payload,
u"headers": headers,
u"retry_strategy": self._retry_strategy,
u"timeout": timeout,
}
if self._stream is not None:
request_kwargs[u"stream"] = True
Expand Down Expand Up @@ -231,7 +243,11 @@ def _write_to_stream(self, response):
)
raise common.DataCorruption(response, msg)

def consume(self, transport):
def consume(
self,
transport,
timeout=(_helpers._DEFAULT_CONNECT_TIMEOUT, _helpers._DEFAULT_READ_TIMEOUT),
):
"""Consume the resource to be downloaded.

If a ``stream`` is attached to this download, then the downloaded
Expand All @@ -240,6 +256,13 @@ def consume(self, transport):
Args:
transport (~requests.Session): A ``requests`` object which can
make authenticated requests.
timeout (Optional[Union[float, Tuple[float, float]]]):
The number of seconds to wait for the server response.
Depending on the retry strategy, a request may be repeated
several times using the same timeout each time.

Can also be passed as a tuple (connect_timeout, read_timeout).
See :meth:`requests.Session.request` documentation for details.

Returns:
~requests.Response: The HTTP response returned by ``transport``.
Expand All @@ -260,6 +283,7 @@ def consume(self, transport):
headers=headers,
retry_strategy=self._retry_strategy,
stream=True,
timeout=timeout,
)

self._process_response(result)
Expand Down Expand Up @@ -298,12 +322,23 @@ class ChunkedDownload(_helpers.RequestsMixin, _download.ChunkedDownload):
ValueError: If ``start`` is negative.
"""

def consume_next_chunk(self, transport):
def consume_next_chunk(
self,
transport,
timeout=(_helpers._DEFAULT_CONNECT_TIMEOUT, _helpers._DEFAULT_READ_TIMEOUT),
):
"""Consume the next chunk of the resource to be downloaded.

Args:
transport (~requests.Session): A ``requests`` object which can
make authenticated requests.
timeout (Optional[Union[float, Tuple[float, float]]]):
The number of seconds to wait for the server response.
Depending on the retry strategy, a request may be repeated
several times using the same timeout each time.

Can also be passed as a tuple (connect_timeout, read_timeout).
See :meth:`requests.Session.request` documentation for details.

Returns:
~requests.Response: The HTTP response returned by ``transport``.
Expand All @@ -320,6 +355,7 @@ def consume_next_chunk(self, transport):
data=payload,
headers=headers,
retry_strategy=self._retry_strategy,
timeout=timeout,
)
self._process_response(result)
return result
Expand Down Expand Up @@ -353,12 +389,23 @@ class RawChunkedDownload(_helpers.RawRequestsMixin, _download.ChunkedDownload):
ValueError: If ``start`` is negative.
"""

def consume_next_chunk(self, transport):
def consume_next_chunk(
self,
transport,
timeout=(_helpers._DEFAULT_CONNECT_TIMEOUT, _helpers._DEFAULT_READ_TIMEOUT),
):
"""Consume the next chunk of the resource to be downloaded.

Args:
transport (~requests.Session): A ``requests`` object which can
make authenticated requests.
timeout (Optional[Union[float, Tuple[float, float]]]):
The number of seconds to wait for the server response.
Depending on the retry strategy, a request may be repeated
several times using the same timeout each time.

Can also be passed as a tuple (connect_timeout, read_timeout).
See :meth:`requests.Session.request` documentation for details.

Returns:
~requests.Response: The HTTP response returned by ``transport``.
Expand All @@ -376,6 +423,7 @@ def consume_next_chunk(self, transport):
headers=headers,
stream=True,
retry_strategy=self._retry_strategy,
timeout=timeout,
)
self._process_response(result)
return result
Expand Down
Loading