Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions hubstorage/collectionsrt.py
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ def iter_json(self, _type, _name, requests_params=None, **apiparams):
return DownloadableResource.iter_json(self, (_type, _name), return DownloadableResource.iter_json(self, (_type, _name),
requests_params=requests_params, **apiparams) requests_params=requests_params, **apiparams)


def iter_msgpack(self, _type, _name, requests_params=None, **apiparams):
return DownloadableResource.iter_msgpack(self, (_type, _name),
requests_params=requests_params, **apiparams)

def create_writer(self, coltype, colname, **writer_kwargs): def create_writer(self, coltype, colname, **writer_kwargs):
self._validate_collection(coltype, colname) self._validate_collection(coltype, colname)
kwargs = dict(writer_kwargs) kwargs = dict(writer_kwargs)
Expand Down
88 changes: 70 additions & 18 deletions hubstorage/resourcetype.py
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
from collections import MutableMapping from collections import MutableMapping
import requests.exceptions as rexc import requests.exceptions as rexc
from .utils import urlpathjoin, xauth from .utils import urlpathjoin, xauth
from .serialization import jlencode, jldecode from .serialization import jlencode, jldecode, mpdecode
from .serialization import MSGPACK_AVAILABLE


logger = logging.getLogger('hubstorage.resourcetype') logger = logging.getLogger('hubstorage.resourcetype')
CHUNK_SIZE = 512




class ResourceType(object): class ResourceType(object):
Expand All @@ -20,6 +22,29 @@ def __init__(self, client, key, auth=None):
self.auth = xauth(auth) or client.auth self.auth = xauth(auth) or client.auth
self.url = urlpathjoin(client.endpoint, self.key) self.url = urlpathjoin(client.endpoint, self.key)


def _allows_mpack(self, path=None):
""" Check if request can be served with msgpack data.

Currently, items, logs, collections and samples endpoints are able to
return msgpack data. However, /stats calls can only return JSON data
for now.
"""
if not MSGPACK_AVAILABLE or path == 'stats':
return False
return self.resource_type in ('items', 'logs',
'collections', 'samples')

@staticmethod
def _enforce_msgpack(**kwargs):
kwargs.setdefault('headers', {})
kwargs['headers']['Accept'] = 'application/x-msgpack'
return kwargs

def _iter_content(self, _path, **kwargs):
kwargs['url'] = urlpathjoin(self.url, _path)
kwargs.setdefault('auth', self.auth)
return self.client.request(**kwargs).iter_content(CHUNK_SIZE)

def _iter_lines(self, _path, **kwargs): def _iter_lines(self, _path, **kwargs):
kwargs['url'] = urlpathjoin(self.url, _path) kwargs['url'] = urlpathjoin(self.url, _path)
kwargs.setdefault('auth', self.auth) kwargs.setdefault('auth', self.auth)
Expand All @@ -34,6 +59,9 @@ def _iter_lines(self, _path, **kwargs):
return lines return lines


def apirequest(self, _path=None, **kwargs): def apirequest(self, _path=None, **kwargs):
if self._allows_mpack(_path) and kwargs.get('method').upper() == 'GET':
kwargs = self._enforce_msgpack(**kwargs)
return mpdecode(self._iter_content(_path=_path, **kwargs))
return jldecode(self._iter_lines(_path, **kwargs)) return jldecode(self._iter_lines(_path, **kwargs))


def apipost(self, _path=None, **kwargs): def apipost(self, _path=None, **kwargs):
Expand All @@ -52,15 +80,18 @@ class DownloadableResource(ResourceType):
MAX_RETRIES = 180 MAX_RETRIES = 180
RETRY_INTERVAL = 60 RETRY_INTERVAL = 60


def _add_resume_param(self, lastline, offset, params): @staticmethod
"""Adds a startafter=LASTKEY parameter if there was def _add_key_meta(params):
a lastvalue. It also adds meta=_key to ensure a key is returned """Adds meta=_key to ensure a key is returned"""
"""
meta = params.get('meta', []) meta = params.get('meta', [])
if '_key' not in meta: if '_key' not in meta:
meta = list(meta) meta = list(meta)
meta.append('_key') meta.append('_key')
params['meta'] = meta params['meta'] = meta
return params

def _add_resume_param(self, lastline, offset, params):
"""Adds a startafter=LASTKEY parameter if there was a lastvalue"""
if lastline is not None: if lastline is not None:
lastvalue = json.loads(lastline) lastvalue = json.loads(lastline)
params['startafter'] = lastvalue['_key'] params['startafter'] = lastvalue['_key']
Expand All @@ -70,24 +101,25 @@ def _add_resume_param(self, lastline, offset, params):
def iter_values(self, *args, **kwargs): def iter_values(self, *args, **kwargs):
"""Reliably iterate through all data as python objects """Reliably iterate through all data as python objects


calls iter_json, decoding the results calls either iter_json or iter_msgpack, decoding the results
""" """
if self._allows_mpack():
return mpdecode(self.iter_msgpack(*args, **kwargs))
return jldecode(self.iter_json(*args, **kwargs)) return jldecode(self.iter_json(*args, **kwargs))


def iter_json(self, _path=None, requests_params=None, **apiparams): def _retry(self, iter_callback, resume=False, _path=None, requests_params=None, **apiparams):
"""Reliably iterate through all data as json strings""" """Reliable iterate through all data calling iter_callback"""
requests_params = dict(requests_params or {}) self._add_key_meta(apiparams)
requests_params.setdefault('method', 'GET')
requests_params.setdefault('stream', True)
lastexc = None lastexc = None
line = None chunk = None
offset = 0 offset = 0
for attempt in range(self.MAX_RETRIES): for attempt in xrange(self.MAX_RETRIES):
self._add_resume_param(line, offset, apiparams) if resume:
self._add_resume_param(chunk, offset, apiparams)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems to me that you better moving _add_key_meta above this condition instead of keeping it inside _add_resume_param and duplicating it in https://github.com/scrapinghub/python-hubstorage/pull/59/files?diff=split#diff-5ced7809b9453346a782751d6b9a698cR134

try: try:
for line in self._iter_lines(_path=_path, params=apiparams, for chunk in iter_callback(_path=_path, params=apiparams,
**requests_params): **requests_params):
yield line yield chunk
offset += 1 offset += 1
break break
except (ValueError, socket.error, rexc.RequestException) as exc: except (ValueError, socket.error, rexc.RequestException) as exc:
Expand All @@ -103,7 +135,27 @@ def iter_json(self, _path=None, requests_params=None, **apiparams):
else: else:
url = urlpathjoin(self.url, _path) url = urlpathjoin(self.url, _path)
logger.error("Failed %d times reading items from %s, params %s, " logger.error("Failed %d times reading items from %s, params %s, "
"last error was: %s", self.MAX_RETRIES, url, apiparams, lastexc) "last error was: %s", self.MAX_RETRIES, url,
apiparams, lastexc)

def iter_msgpack(self, _path=None, requests_params=None, **apiparams):
"""Reliably iterate through all data as msgpack"""
requests_params = dict(requests_params or {})
requests_params.setdefault('method', 'GET')
requests_params.setdefault('stream', True)
requests_params = self._enforce_msgpack(**requests_params)
for chunk in self._retry(self._iter_content, False, _path,
requests_params, **apiparams):
yield chunk

def iter_json(self, _path=None, requests_params=None, **apiparams):
"""Reliably iterate through all data as json strings"""
requests_params = dict(requests_params or {})
requests_params.setdefault('method', 'GET')
requests_params.setdefault('stream', True)
for line in self._retry(self._iter_lines, True, _path, requests_params,
**apiparams):
yield line




class ItemsResourceType(ResourceType): class ItemsResourceType(ResourceType):
Expand Down
17 changes: 17 additions & 0 deletions hubstorage/serialization.py
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@
ADAYINSECONDS = 24 * 3600 ADAYINSECONDS = 24 * 3600




try:
from msgpack import Unpacker
MSGPACK_AVAILABLE = True
except ImportError:
MSGPACK_AVAILABLE = False


def jlencode(iterable): def jlencode(iterable):
if isinstance(iterable, (dict, six.string_types)): if isinstance(iterable, (dict, six.string_types)):
iterable = [iterable] iterable = [iterable]
Expand All @@ -17,6 +24,16 @@ def jldecode(lineiterable):
yield loads(line) yield loads(line)




def mpdecode(iterable):
unpacker = Unpacker(encoding='utf8')
for chunk in iterable:
unpacker.feed(chunk)
# Each chunk can have none or many objects,
# so here we dispatch any object ready
for obj in unpacker:
yield obj


def jsonencode(o): def jsonencode(o):
return dumps(o, default=jsondefault) return dumps(o, default=jsondefault)


Expand Down
3 changes: 3 additions & 0 deletions requirements-pypy.txt
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,3 @@
-r requirements.txt

msgpack-pypy>=0.0.2
1 change: 1 addition & 0 deletions requirements-test.txt
Original file line number Original file line Diff line number Diff line change
@@ -1,4 +1,5 @@
-r requirements.txt -r requirements.txt
-r requirements-pypy.txt


responses==0.5.0 responses==0.5.0
pytest pytest
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Original file line Diff line number Diff line change
@@ -1,3 +1,4 @@
requests>=1.0 requests>=1.0
retrying>=1.3.3 retrying>=1.3.3
six>=1.10.0 six>=1.10.0
msgpack-python>=0.4.7
4 changes: 4 additions & 0 deletions setup.py
Original file line number Original file line Diff line number Diff line change
@@ -1,10 +1,13 @@
import sys
from os.path import join, dirname from os.path import join, dirname
from setuptools import setup, find_packages from setuptools import setup, find_packages


# We can't import hubstorage.__version__ because it imports "requests" and it # We can't import hubstorage.__version__ because it imports "requests" and it
# can not be available yet # can not be available yet
__version__ = open(join(dirname(__file__), 'hubstorage/VERSION')).read().strip() __version__ = open(join(dirname(__file__), 'hubstorage/VERSION')).read().strip()


is_pypy = '__pypy__' in sys.builtin_module_names
mpack_required = 'msgpack-pypy>=0.0.2' if is_pypy else 'msgpack-python>=0.4.7'


setup(name='hubstorage', setup(name='hubstorage',
version=__version__, version=__version__,
Expand All @@ -21,4 +24,5 @@
'License :: OSI Approved :: BSD License', 'License :: OSI Approved :: BSD License',
'Operating System :: OS Independent', 'Operating System :: OS Independent',
'Programming Language :: Python'], 'Programming Language :: Python'],
extras_require = {'msgpack': [mpack_required]},
) )