From 17833860d576b6b02f3de3d32d8f4594e6191d54 Mon Sep 17 00:00:00 2001 From: Fabio Bertinatto Date: Wed, 2 Mar 2016 16:40:38 -0300 Subject: [PATCH 1/2] Download data as msgpack --- hubstorage/collectionsrt.py | 4 ++ hubstorage/resourcetype.py | 87 +++++++++++++++++++++++++++++-------- hubstorage/serialization.py | 11 +++++ requirements-pypy.txt | 3 ++ requirements-test.txt | 1 + requirements.txt | 1 + setup.py | 6 ++- 7 files changed, 94 insertions(+), 19 deletions(-) create mode 100644 requirements-pypy.txt diff --git a/hubstorage/collectionsrt.py b/hubstorage/collectionsrt.py index d134a29..8041863 100644 --- a/hubstorage/collectionsrt.py +++ b/hubstorage/collectionsrt.py @@ -36,6 +36,10 @@ def iter_json(self, _type, _name, requests_params=None, **apiparams): return DownloadableResource.iter_json(self, (_type, _name), requests_params=requests_params, **apiparams) + def iter_msgpack(self, _type, _name, requests_params=None, **apiparams): + return DownloadableResource.iter_msgpack(self, (_type, _name), + requests_params=requests_params, **apiparams) + def create_writer(self, coltype, colname, **writer_kwargs): self._validate_collection(coltype, colname) kwargs = dict(writer_kwargs) diff --git a/hubstorage/resourcetype.py b/hubstorage/resourcetype.py index 5baddc3..4c672e6 100644 --- a/hubstorage/resourcetype.py +++ b/hubstorage/resourcetype.py @@ -4,9 +4,10 @@ from collections import MutableMapping import requests.exceptions as rexc from .utils import urlpathjoin, xauth -from .serialization import jlencode, jldecode +from .serialization import jlencode, jldecode, mpdecode logger = logging.getLogger('hubstorage.resourcetype') +CHUNK_SIZE = 512 class ResourceType(object): @@ -20,6 +21,29 @@ def __init__(self, client, key, auth=None): self.auth = xauth(auth) or client.auth self.url = urlpathjoin(client.endpoint, self.key) + def _allows_mpack(self, path=None): + """ Check if request can be served with msgpack data. + + Currently, items, logs, collections and + samples endpoints are able to return msgpack data. However, /stats + calls can only return JSON data for now. + """ + if path == 'stats': + return False + return self.resource_type in ('items', 'logs', + 'collections', 'samples') + + @staticmethod + def _enforce_msgpack(**kwargs): + kwargs.setdefault('headers', {}) + kwargs['headers']['Accept'] = 'application/x-msgpack' + return kwargs + + def _iter_content(self, _path, **kwargs): + kwargs['url'] = urlpathjoin(self.url, _path) + kwargs.setdefault('auth', self.auth) + return self.client.request(**kwargs).iter_content(CHUNK_SIZE) + def _iter_lines(self, _path, **kwargs): kwargs['url'] = urlpathjoin(self.url, _path) kwargs.setdefault('auth', self.auth) @@ -34,6 +58,9 @@ def _iter_lines(self, _path, **kwargs): return lines def apirequest(self, _path=None, **kwargs): + if self._allows_mpack(_path) and kwargs.get('method').upper() == 'GET': + kwargs = self._enforce_msgpack(**kwargs) + return mpdecode(self._iter_content(_path=_path, **kwargs)) return jldecode(self._iter_lines(_path, **kwargs)) def apipost(self, _path=None, **kwargs): @@ -52,15 +79,18 @@ class DownloadableResource(ResourceType): MAX_RETRIES = 180 RETRY_INTERVAL = 60 - def _add_resume_param(self, lastline, offset, params): - """Adds a startafter=LASTKEY parameter if there was - a lastvalue. It also adds meta=_key to ensure a key is returned - """ + @staticmethod + def _add_key_meta(params): + """Adds meta=_key to ensure a key is returned""" meta = params.get('meta', []) if '_key' not in meta: meta = list(meta) meta.append('_key') params['meta'] = meta + return params + + def _add_resume_param(self, lastline, offset, params): + """Adds a startafter=LASTKEY parameter if there was a lastvalue""" if lastline is not None: lastvalue = json.loads(lastline) params['startafter'] = lastvalue['_key'] @@ -70,24 +100,25 @@ def _add_resume_param(self, lastline, offset, params): def iter_values(self, *args, **kwargs): """Reliably iterate through all data as python objects - calls iter_json, decoding the results + calls either iter_json or iter_msgpack, decoding the results """ + if self._allows_mpack(): + return mpdecode(self.iter_msgpack(*args, **kwargs)) return jldecode(self.iter_json(*args, **kwargs)) - def iter_json(self, _path=None, requests_params=None, **apiparams): - """Reliably iterate through all data as json strings""" - requests_params = dict(requests_params or {}) - requests_params.setdefault('method', 'GET') - requests_params.setdefault('stream', True) + def _retry(self, iter_callback, resume=False, _path=None, requests_params=None, **apiparams): + """Reliable iterate through all data calling iter_callback""" + self._add_key_meta(apiparams) lastexc = None - line = None + chunk = None offset = 0 - for attempt in range(self.MAX_RETRIES): - self._add_resume_param(line, offset, apiparams) + for attempt in xrange(self.MAX_RETRIES): + if resume: + self._add_resume_param(chunk, offset, apiparams) try: - for line in self._iter_lines(_path=_path, params=apiparams, - **requests_params): - yield line + for chunk in iter_callback(_path=_path, params=apiparams, + **requests_params): + yield chunk offset += 1 break except (ValueError, socket.error, rexc.RequestException) as exc: @@ -103,7 +134,27 @@ def iter_json(self, _path=None, requests_params=None, **apiparams): else: url = urlpathjoin(self.url, _path) logger.error("Failed %d times reading items from %s, params %s, " - "last error was: %s", self.MAX_RETRIES, url, apiparams, lastexc) + "last error was: %s", self.MAX_RETRIES, url, + apiparams, lastexc) + + def iter_msgpack(self, _path=None, requests_params=None, **apiparams): + """Reliably iterate through all data as msgpack""" + requests_params = dict(requests_params or {}) + requests_params.setdefault('method', 'GET') + requests_params.setdefault('stream', True) + requests_params = self._enforce_msgpack(**requests_params) + for chunk in self._retry(self._iter_content, False, _path, + requests_params, **apiparams): + yield chunk + + def iter_json(self, _path=None, requests_params=None, **apiparams): + """Reliably iterate through all data as json strings""" + requests_params = dict(requests_params or {}) + requests_params.setdefault('method', 'GET') + requests_params.setdefault('stream', True) + for line in self._retry(self._iter_lines, True, _path, requests_params, + **apiparams): + yield line class ItemsResourceType(ResourceType): diff --git a/hubstorage/serialization.py b/hubstorage/serialization.py index 3e6e6bc..885f075 100644 --- a/hubstorage/serialization.py +++ b/hubstorage/serialization.py @@ -1,5 +1,6 @@ import six from json import dumps, loads +from msgpack import Unpacker from datetime import datetime EPOCH = datetime.utcfromtimestamp(0) @@ -17,6 +18,16 @@ def jldecode(lineiterable): yield loads(line) +def mpdecode(iterable): + unpacker = Unpacker(encoding='utf8') + for chunk in iterable: + unpacker.feed(chunk) + # Each chunk can have none or many objects, + # so here we dispatch any object ready + for obj in unpacker: + yield obj + + def jsonencode(o): return dumps(o, default=jsondefault) diff --git a/requirements-pypy.txt b/requirements-pypy.txt new file mode 100644 index 0000000..7dc4e6a --- /dev/null +++ b/requirements-pypy.txt @@ -0,0 +1,3 @@ +-r requirements.txt + +msgpack-pypy>=0.0.2 diff --git a/requirements-test.txt b/requirements-test.txt index 2aa51ff..5106c20 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,4 +1,5 @@ -r requirements.txt +-r requirements-pypy.txt responses==0.5.0 pytest diff --git a/requirements.txt b/requirements.txt index 8f7f75e..a73f4d6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ requests>=1.0 retrying>=1.3.3 six>=1.10.0 +msgpack-python>=0.4.7 diff --git a/setup.py b/setup.py index 4550a48..f0167cd 100644 --- a/setup.py +++ b/setup.py @@ -1,3 +1,4 @@ +import sys from os.path import join, dirname from setuptools import setup, find_packages @@ -5,6 +6,8 @@ # can not be available yet __version__ = open(join(dirname(__file__), 'hubstorage/VERSION')).read().strip() +is_pypy = '__pypy__' in sys.builtin_module_names +mpack_required = 'msgpack-pypy>=0.0.2' if is_pypy else 'msgpack-python>=0.4.7' setup(name='hubstorage', version=__version__, @@ -16,7 +19,8 @@ platforms=['Any'], packages=find_packages(), package_data={'hubstorage': ['VERSION']}, - install_requires=['requests', 'retrying>=1.3.3', 'six>=1.10.0'], + install_requires=['requests', 'retrying>=1.3.3', + 'six>=1.10.0', mpack_required], classifiers=['Development Status :: 4 - Beta', 'License :: OSI Approved :: BSD License', 'Operating System :: OS Independent', From 4b8ec5be8af4a0df3c5f810059dbdef48c4e364a Mon Sep 17 00:00:00 2001 From: Viktor Shlapakov Date: Tue, 27 Sep 2016 13:24:48 +0300 Subject: [PATCH 2/2] Msgpack as a recommended dependency --- hubstorage/resourcetype.py | 9 +++++---- hubstorage/serialization.py | 8 +++++++- setup.py | 4 ++-- 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/hubstorage/resourcetype.py b/hubstorage/resourcetype.py index 4c672e6..7d94381 100644 --- a/hubstorage/resourcetype.py +++ b/hubstorage/resourcetype.py @@ -5,6 +5,7 @@ import requests.exceptions as rexc from .utils import urlpathjoin, xauth from .serialization import jlencode, jldecode, mpdecode +from .serialization import MSGPACK_AVAILABLE logger = logging.getLogger('hubstorage.resourcetype') CHUNK_SIZE = 512 @@ -24,11 +25,11 @@ def __init__(self, client, key, auth=None): def _allows_mpack(self, path=None): """ Check if request can be served with msgpack data. - Currently, items, logs, collections and - samples endpoints are able to return msgpack data. However, /stats - calls can only return JSON data for now. + Currently, items, logs, collections and samples endpoints are able to + return msgpack data. However, /stats calls can only return JSON data + for now. """ - if path == 'stats': + if not MSGPACK_AVAILABLE or path == 'stats': return False return self.resource_type in ('items', 'logs', 'collections', 'samples') diff --git a/hubstorage/serialization.py b/hubstorage/serialization.py index 885f075..4e80951 100644 --- a/hubstorage/serialization.py +++ b/hubstorage/serialization.py @@ -1,12 +1,18 @@ import six from json import dumps, loads -from msgpack import Unpacker from datetime import datetime EPOCH = datetime.utcfromtimestamp(0) ADAYINSECONDS = 24 * 3600 +try: + from msgpack import Unpacker + MSGPACK_AVAILABLE = True +except ImportError: + MSGPACK_AVAILABLE = False + + def jlencode(iterable): if isinstance(iterable, (dict, six.string_types)): iterable = [iterable] diff --git a/setup.py b/setup.py index f0167cd..fd90468 100644 --- a/setup.py +++ b/setup.py @@ -19,10 +19,10 @@ platforms=['Any'], packages=find_packages(), package_data={'hubstorage': ['VERSION']}, - install_requires=['requests', 'retrying>=1.3.3', - 'six>=1.10.0', mpack_required], + install_requires=['requests', 'retrying>=1.3.3', 'six>=1.10.0'], classifiers=['Development Status :: 4 - Beta', 'License :: OSI Approved :: BSD License', 'Operating System :: OS Independent', 'Programming Language :: Python'], + extras_require = {'msgpack': [mpack_required]}, )