Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,11 @@ Jobq metadata fieldset is less detailed than ``job.metadata``, but contains few
Additional fields can be requested using the ``jobmeta`` parameter.
If it is used, then it's up to the user to list all the required fields, so only a few default fields would be added besides the requested ones.

>>> metadata = project.jobq.list().next()
>>> metadata = next(project.jobq.list())
>>> metadata.get('spider', 'missing')
u'foo'
>>> jobs_metadata = project.jobq.list(jobmeta=['scheduled_by', ])
>>> metadata = jobs_metadata.next()
>>> metadata = next(jobs_metadata)
>>> metadata.get('scheduled_by', 'missing')
u'John'
>>> metadata.get('spider', 'missing')
Expand Down
38 changes: 22 additions & 16 deletions hubstorage/batchuploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
import random
import logging
import warnings
import six
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add six to requirements and setup.py

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for catching this.

Added six and rebased against master. #64 allowed to catch 2 more issues.

from six.moves import range
from six.moves.queue import Queue
from io import BytesIO
from gzip import GzipFile
from itertools import count
import requests
from requests.compat import StringIO
from collections import deque
from Queue import Queue
from threading import Thread, Event
from .utils import xauth, iterqueue
from .serialization import jsonencode
Expand Down Expand Up @@ -91,7 +93,7 @@ def _worker(self):
continue

# Delay once all writers are processed
if (ctr.next() % len(self._writers) == 0) and not self.closed:
if (next(ctr) % len(self._writers) == 0) and not self.closed:
self._interruptable_sleep()

# Get next writer to process
Expand Down Expand Up @@ -125,12 +127,12 @@ def _checkpoint(self, w):
'content-encoding': w.content_encoding,
})
w.offset += qiter.count
for _ in xrange(qiter.count):
for _ in range(qiter.count):
q.task_done()
if w.callback is not None:
try:
w.callback(response)
except Exception, e:
except Exception:
logger.exception("Callback for %s failed", w.url)

def _content_encode(self, qiter, w):
Expand All @@ -148,12 +150,12 @@ def _tryupload(self, batch):
Use polynomial backoff with a 10-minute maximum interval that accounts
for ~30 hours of total retry time.

>>> sum(min(x**2, 600) for x in xrange(200)) / 3600
>>> sum(min(x**2, 600) for x in range(200)) / 3600
30
"""
url = batch['url']
offset = batch['offset']
for retryn in xrange(self.worker_max_retries):
for retryn in range(self.worker_max_retries):
emsg = ''
try:
r = self._upload(batch)
Expand Down Expand Up @@ -229,7 +231,7 @@ def write(self, item):
self.itemsq.put(data)
if self.itemsq.full():
self.uploader.interrupt()
return self._nextid.next()
return next(self._nextid)

def flush(self):
self.flushme = True
Expand All @@ -249,18 +251,22 @@ def __str__(self):
return self.url


def _encode_identity(iter):
data = StringIO()
for item in iter:
def _encode_identity(iterable):
    """Serialize an iterable of items into a newline-delimited byte string.

    Text items are encoded as UTF-8 before writing, so the result is
    always ``bytes`` on both Python 2 and Python 3.
    """
    data = BytesIO()
    for item in iterable:
        # Normalize text to bytes before writing to the binary buffer.
        if isinstance(item, six.text_type):
            item = item.encode('utf8')
        data.write(item)
        data.write(b'\n')
    return data.getvalue()


def _encode_gzip(iterable):
    """Serialize an iterable of items into a gzip-compressed byte string.

    Each item is written followed by a newline; text items are encoded
    as UTF-8 first so the gzip stream only ever receives ``bytes``.
    """
    data = BytesIO()
    # GzipFile used as a context manager guarantees the stream is
    # finalized (trailer written) before we read the buffer back.
    with GzipFile(fileobj=data, mode='w') as gzo:
        for item in iterable:
            # Normalize text to bytes before compressing.
            if isinstance(item, six.text_type):
                item = item.encode('utf8')
            gzo.write(item)
            gzo.write(b'\n')
    return data.getvalue()
5 changes: 2 additions & 3 deletions hubstorage/collectionsrt.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class Collections(DownloadableResource):
def get(self, _type, _name, _key=None, **params):
try:
r = self.apiget((_type, _name, _key), params=params)
return r if _key is None else r.next()
return r if _key is None else next(r)
except HTTPError as exc:
if exc.response.status_code == 404:
raise KeyError(_key)
Expand Down Expand Up @@ -78,8 +78,7 @@ def _batch(self, method, path, total_param, progress=None, **params):
getparams = dict(params)
try:
while True:
r = self.apirequest(path, method=method,
params=getparams).next()
r = next(self.apirequest(path, method=method, params=getparams))
total += r[total_param]
next = r.get('nextstart')
if next is None:
Expand Down
3 changes: 1 addition & 2 deletions hubstorage/frontier.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import json

from .resourcetype import ResourceType
from .utils import urlpathjoin
Expand Down Expand Up @@ -38,7 +37,7 @@ def _get_writer(self, frontier, slot):
return writer

def _writer_callback(self, response):
self.newcount += json.loads(response.content)["newcount"]
self.newcount += response.json()["newcount"]

def close(self, block=True):
for writer in self._writers.values():
Expand Down
2 changes: 1 addition & 1 deletion hubstorage/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class Ids(ResourceType):

def spider(self, spidername, **params):
    """Return the first result of the spider lookup for *spidername*.

    ``apiget`` yields decoded JSON lines; the first one carries the
    answer. ``next()`` (the builtin) works on both Python 2 and 3,
    unlike the removed ``.next()`` method call.
    """
    r = self.apiget(('spider', spidername), params=params)
    return next(r)


class Settings(MappingResourceType):
Expand Down
17 changes: 11 additions & 6 deletions hubstorage/resourcetype.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import six
from six.moves import range
import logging, time, json, socket
from collections import MutableMapping
import requests.exceptions as rexc
Expand Down Expand Up @@ -26,7 +28,10 @@ def _iter_lines(self, _path, **kwargs):

r = self.client.request(**kwargs)

return r.iter_lines()
lines = r.iter_lines()
if six.PY3:
return (l.decode(r.encoding or 'utf8') for l in lines)
return lines

def apirequest(self, _path=None, **kwargs):
return jldecode(self._iter_lines(_path, **kwargs))
Expand Down Expand Up @@ -77,7 +82,7 @@ def iter_json(self, _path=None, requests_params=None, **apiparams):
lastexc = None
line = None
offset = 0
for attempt in xrange(self.MAX_RETRIES):
for attempt in range(self.MAX_RETRIES):
self._add_resume_param(line, offset, apiparams)
try:
for line in self._iter_lines(_path=_path, params=apiparams,
Expand Down Expand Up @@ -153,7 +158,7 @@ def get(self, _key, **params):
return o

def stats(self):
    """Return the first JSON object from the ``stats`` API endpoint.

    Uses the ``next()`` builtin (py2/py3 compatible) instead of the
    Python-2-only ``.next()`` iterator method.
    """
    return next(self.apiget('stats'))


class MappingResourceType(ResourceType, MutableMapping):
Expand All @@ -177,7 +182,7 @@ def _data(self):
if self._cached is None:
r = self.apiget()
try:
self._cached = r.next()
self._cached = next(r)
except StopIteration:
self._cached = {}

Expand All @@ -194,8 +199,8 @@ def save(self):
if not self.ignore_fields:
self.apipost(jl=self._data, is_idempotent=True)
else:
self.apipost(jl=dict((k, v) for k, v in self._data.iteritems()
if k not in self.ignore_fields),
self.apipost(jl={k: v for k, v in six.iteritems(self._data)
if k not in self.ignore_fields},
is_idempotent=True)

def __getitem__(self, key):
Expand Down
9 changes: 4 additions & 5 deletions hubstorage/serialization.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import six
from json import dumps, loads
from datetime import datetime

Expand All @@ -6,7 +7,7 @@


def jlencode(iterable):
if isinstance(iterable, (dict, str, unicode)):
if isinstance(iterable, (dict, six.string_types)):
iterable = [iterable]
return u'\n'.join(jsonencode(o) for o in iterable)

Expand All @@ -26,8 +27,6 @@ def jsondefault(o):
u = delta.microseconds
s = delta.seconds
d = delta.days
millis = (u + (s + d * ADAYINSECONDS) * 1e6) / 1000
return int(millis)
return (u + (s + d * ADAYINSECONDS) * 1e6) // 1000
else:
return str(o)

return six.text_type(o)
17 changes: 8 additions & 9 deletions hubstorage/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import six
import time
from Queue import Empty
from six.moves.queue import Empty


def urlpathjoin(*parts):
Expand Down Expand Up @@ -35,12 +36,10 @@ def urlpathjoin(*parts):
continue
elif isinstance(p, tuple):
p = urlpathjoin(*p)
elif isinstance(p, unicode):
p = p.encode('utf8')
elif not isinstance(p, str):
p = str(p)
elif not isinstance(p, six.text_type):
p = six.text_type(p)

url = p if url is None else '{0}/{1}'.format(url.rstrip('/'), p)
url = p if url is None else u'{0}/{1}'.format(url.rstrip(u'/'), p)

return url

Expand Down Expand Up @@ -81,17 +80,17 @@ class iterqueue(object):

it exposes an attribute "count" with the number of messages read

>>> from Queue import Queue
>>> from six.moves.queue import Queue
>>> q = Queue()
>>> for x in xrange(10):
>>> for x in range(10):
... q.put(x)
>>> qiter = iterqueue(q)
>>> list(qiter)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> qiter.count
10

>>> for x in xrange(10):
>>> for x in range(10):
... q.put(x)
>>> qiter = iterqueue(q, maxcount=4)
>>> list(qiter)
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
requests>=1.0
retrying>=1.3.3
retrying>=1.3.3
six>=1.10.0
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
platforms=['Any'],
packages=find_packages(),
package_data={'hubstorage': ['VERSION']},
install_requires=['requests', 'retrying>=1.3.3'],
install_requires=['requests', 'retrying>=1.3.3', 'six>=1.10.0'],
classifiers=['Development Status :: 4 - Beta',
'License :: OSI Approved :: BSD License',
'Operating System :: OS Independent',
Expand Down
2 changes: 1 addition & 1 deletion tests/hstestcase.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def tearDownClass(cls):
@classmethod
def _remove_all_jobs(cls):
    """Clear every project setting except ``botgroups`` and persist.

    Iterates over a snapshot of the keys (``list(...)``) because
    deleting entries while iterating the live ``keys()`` view raises
    RuntimeError on Python 3.
    """
    project = cls.project
    for k in list(project.settings.keys()):
        if k != 'botgroups':
            del project.settings[k]
    project.settings.save()
Expand Down
9 changes: 5 additions & 4 deletions tests/test_activity.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
"""
Test Activity
"""
from hstestcase import HSTestCase
from .hstestcase import HSTestCase
from six.moves import range


class ActivityTest(HSTestCase):

def test_post_and_reverse_get(self):
# make some sample data
orig_data = [{u'foo': 42, u'counter': i} for i in xrange(20)]
orig_data = [{u'foo': 42, u'counter': i} for i in range(20)]
data1 = orig_data[:10]
data2 = orig_data[10:]

Expand All @@ -22,12 +23,12 @@ def test_post_and_reverse_get(self):
self.assertEqual(orig_data[::-1], result)

def test_filters(self):
self.project.activity.post({'c': i} for i in xrange(10))
self.project.activity.post({'c': i} for i in range(10))
r = list(self.project.activity.list(filter='["c", ">", [5]]', count=2))
self.assertEqual(r, [{'c': 9}, {'c': 8}])

def test_timestamp(self):
self.project.activity.add({'foo': 'bar'}, baz='qux')
entry = self.project.activity.list(count=1, meta='_ts').next()
entry = next(self.project.activity.list(count=1, meta='_ts'))
self.assertTrue(entry.pop('_ts', None))
self.assertEqual(entry, {'foo': 'bar', 'baz': 'qux'})
11 changes: 6 additions & 5 deletions tests/test_batchuploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
Test Project
"""
import time
from six.moves import range
from collections import defaultdict
from hstestcase import HSTestCase
from .hstestcase import HSTestCase
from hubstorage import ValueTooLarge


Expand All @@ -18,7 +19,7 @@ def _job_and_writer(self, **writerargs):

def test_writer_batchsize(self):
job, w = self._job_and_writer(size=10)
for x in xrange(111):
for x in range(111):
w.write({'x': x})
w.close()
# this works only for small batches (previous size=10 and small data)
Expand Down Expand Up @@ -47,19 +48,19 @@ def test_writer_maxitemsize(self):
ValueTooLarge,
'Value exceeds max encoded size of 1048576 bytes:'
' \'{"b+\\.\\.\\.\'',
w.write, {'b'*(m/2): 'x'*(m/2)})
w.write, {'b'*(m//2): 'x'*(m//2)})

def test_writer_contentencoding(self):
for ce in ('identity', 'gzip'):
job, w = self._job_and_writer(content_encoding=ce)
for x in xrange(111):
for x in range(111):
w.write({'x': x})
w.close()
self.assertEqual(job.items.stats()['totals']['input_values'], 111)

def test_writer_interval(self):
job, w = self._job_and_writer(size=1000, interval=1)
for x in xrange(111):
for x in range(111):
w.write({'x': x})
if x == 50:
time.sleep(2)
Expand Down
2 changes: 1 addition & 1 deletion tests/test_client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""
Test Client
"""
from hstestcase import HSTestCase
from .hstestcase import HSTestCase
from hubstorage.utils import millitime, apipoll

class ClientTest(HSTestCase):
Expand Down
Loading