Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion warc/tests/test_warc.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from ..warc import WARCReader, WARCHeader, WARCRecord, WARCFile
from ..warc import WARCReader, WARCHeader, WARCRecord, WARCFile, \
SimpleWARCReader

from StringIO import StringIO


class TestWARCHeader:
def test_attrs(self):
h = WARCHeader({
Expand Down Expand Up @@ -95,6 +97,34 @@ def read_multiple_records(self):
rec = reader.read_record()
assert rec is not None


class TestSimpleWARCReader:
    """Tests for SimpleWARCReader, whose read_record() returns (headers, body)."""

    def test_read_header1(self):
        """Header values of a single record are exposed as plain strings."""
        f = StringIO(SAMPLE_WARC_RECORD_TEXT)
        h, b = SimpleWARCReader(f).read_record()
        assert h['WARC-Date'] == "2012-02-10T16:15:52Z"
        assert h['WARC-Record-ID'] == "<urn:uuid:80fb9262-5402-11e1-8206-545200690126>"
        assert h['WARC-Type'] == "response"
        assert h['Content-Length'] == '10'

    def test_empty(self):
        """An empty input yields no record."""
        # Exercise the class under test; the original built a WARCReader here,
        # so SimpleWARCReader's empty-input path was never covered.
        reader = SimpleWARCReader(StringIO(""))
        assert reader.read_record() is None

    def test_read_record(self):
        """The body is returned without its trailing CRLF separators."""
        f = StringIO(SAMPLE_WARC_RECORD_TEXT)
        reader = SimpleWARCReader(f)
        headers, body = reader.read_record()
        assert body == "Helloworld"

    def test_read_multiple_records(self):
        """Consecutive records in one stream are each returned in turn."""
        # Named with the ``test_`` prefix so that pytest actually collects it.
        f = StringIO(SAMPLE_WARC_RECORD_TEXT * 5)
        reader = SimpleWARCReader(f)
        for i in range(5):
            rec = reader.read_record()
            assert rec is not None
        # Once every record has been consumed the reader reports exhaustion.
        assert reader.read_record() is None


class TestWarcFile:
def test_read(self):
f = WARCFile(fileobj=StringIO(SAMPLE_WARC_RECORD_TEXT))
Expand Down
61 changes: 60 additions & 1 deletion warc/warc.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,6 @@ def tell(self):
else:
return self.fileobj.tell()


class WARCReader:
RE_VERSION = re.compile("WARC/(\d+.\d+)\r\n")
RE_HEADER = re.compile(r"([\w\-\.]+): *(.*)\r\n")
Expand Down Expand Up @@ -395,3 +394,63 @@ def __iter__(self):
while record is not None:
yield record
record = self.read_record()


class SimpleWARCReader(WARCReader):
    """Lightweight WARC reader returning each record as ``(headers, body)``.

    ``headers`` is a plain dict of header name -> stripped string value and
    ``body`` is the record payload as a string.  The end of a body is found
    by scanning for the next ``WARC/1.0`` version line that follows a
    ``\\r\\n\\r\\n`` separator (or end of file); the ``Content-Length``
    header is not consulted.  The wrapped file object must support
    ``readline()``, ``seek()`` and ``tell()``.
    """

    RE_HEADER = re.compile(r"([\w\-]+): *(.*)\r?\n")

    # The only version line this reader recognises as the start of a record.
    VERSION_LINE = 'WARC/1.0\r\n'
    # Separator that must precede a version line for it to end a body.
    RECORD_SEPARATOR = '\r\n\r\n'

    def __init__(self, fileobj):
        """Wrap *fileobj*; the first record is expected at offset 0."""
        self.fileobj = fileobj
        # Offset at which the next record's version line is expected.
        self.pos = 0

    def __iter__(self):
        """Return self; the reader is its own iterator."""
        return self

    def next(self):
        """Return the next ``(headers, body)`` tuple or raise StopIteration."""
        record = self.read_record()
        if record is None:
            raise StopIteration
        return record

    # Python 3 spelling of the iterator protocol; ``next`` is kept for Python 2.
    __next__ = next

    def read_record(self):
        """Read one record starting at ``self.pos``.

        Returns a ``(headers, body)`` tuple, or ``None`` when the line at the
        current position is not a ``WARC/1.0`` version line (which includes
        the end-of-file case).
        """
        try:
            self._read_version()
        except AssertionError:
            return None
        headers = self._read_header()
        body = self._read_body()
        return (headers, body)

    def _read_version(self):
        """Seek to ``self.pos`` and consume the version line.

        Raises AssertionError if the line read is not ``WARC/1.0\\r\\n``.
        """
        self.fileobj.seek(self.pos)
        line = self.fileobj.readline()
        # Raised explicitly instead of via ``assert`` so the check is not
        # stripped under ``python -O``; read_record() relies on it to detect
        # the end of input.
        if line != self.VERSION_LINE:
            raise AssertionError("Expected %r, got %r" % (self.VERSION_LINE, line))

    def _read_header(self):
        """Read header lines up to the blank ``\\r\\n`` line and return them as a dict.

        Lines that do not match ``RE_HEADER`` are logged and skipped.  If the
        file ends before the blank line, the headers collected so far are
        returned.
        """
        headers = {}
        while True:
            line = self.fileobj.readline()
            if not line:
                # EOF inside the header block: readline() keeps returning ''
                # forever, so stop here instead of looping endlessly.
                logging.warning("Unexpected end of file while reading headers")
                break
            if line == "\r\n":  # end of headers
                break
            m = self.RE_HEADER.match(line)
            if not m:
                logging.warning("Bad header line: %r", line)
                continue
            name, value = m.groups()
            headers[name] = value.strip()
        return headers

    def _read_body(self):
        """Read the body of the current record and remember where the next one starts.

        Lines are consumed until a version line preceded by ``\\r\\n\\r\\n`` is
        seen, or until end of file.  ``self.pos`` is set to the offset of that
        version line (or to end of file).  Leading and trailing CR/LF
        characters are stripped from the returned body.
        """
        lines = []
        # Last few characters of the body read so far; enough to test for the
        # record separator without re-joining the whole body on every line.
        tail = ''
        sep_len = len(self.RECORD_SEPARATOR)
        line = ''
        while not (line == self.VERSION_LINE and tail == self.RECORD_SEPARATOR):
            lines.append(line)
            tail = (tail + line)[-sep_len:]
            # Remember where this line starts: if it turns out to be the next
            # record's version line, that is where the next read must resume.
            pos = self.fileobj.tell()
            line = self.fileobj.readline()
            if not line:  # end of file
                break
        self.pos = pos
        return ''.join(lines).strip('\r\n')

    def close(self):
        """Close the underlying file object."""
        self.fileobj.close()