From bcf292e8d79f604f652fdc7cc6ec346915d80934 Mon Sep 17 00:00:00 2001 From: Michael Gasvoda Date: Tue, 10 Apr 2018 15:19:39 -0400 Subject: [PATCH 1/8] initial working commit for s3 driver and database driver --- quantgov/corpora/__init__.py | 4 +- quantgov/corpora/structures.py | 91 ++++++++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+), 1 deletion(-) diff --git a/quantgov/corpora/__init__.py b/quantgov/corpora/__init__.py index be0ec36..f095957 100644 --- a/quantgov/corpora/__init__.py +++ b/quantgov/corpora/__init__.py @@ -5,5 +5,7 @@ FlatFileCorpusDriver, RecursiveDirectoryCorpusDriver, NamePatternCorpusDriver, - IndexDriver + IndexDriver, + S3Driver, + S3DatabaseDriver ) diff --git a/quantgov/corpora/structures.py b/quantgov/corpora/structures.py index 7030c2e..d95a684 100644 --- a/quantgov/corpora/structures.py +++ b/quantgov/corpora/structures.py @@ -9,16 +9,40 @@ import csv import logging +from decorator import decorator from collections import namedtuple from pathlib import Path from .. import utils as qgutils +try: + import boto3 +except ImportError: + boto3 = None +try: + import sqlalchemy +except ImportError: + sqlalchemy = None + log = logging.getLogger(__name__) Document = namedtuple('Document', ['index', 'text']) +@decorator +def check_boto(func, *args, **kwargs): + if boto3 is None: + raise RuntimeError('Must install boto3 to use {}'.format(func)) + return func(*args, **kwargs) + + +@decorator +def check_sqlalchemy(func, *args, **kwargs): + if sqlalchemy is None: + raise RuntimeError('Must install sqlalchemy to use {}'.format(func)) + return func(*args, **kwargs) + + class CorpusStreamer(object): """ A knowledgable wrapper for a CorpusDriver stream @@ -243,3 +267,70 @@ def gen_indices_and_paths(self): next(reader) for row in reader: yield tuple(row[:-1]), Path(row[-1]) + + +class S3Driver(IndexDriver): + """ + Serve a whole or partial corpus from a remote file location in s3. + Filtering can be done using the values provided in the index file. + """ + + @check_boto + def __init__(self, index, bucket, encoding='utf-8', cache=True): + self.index = Path(index) + self.bucket = bucket + self.client = boto3.client('s3') + with self.index.open(encoding=encoding) as inf: + index_labels = next(csv.reader(inf))[:-1] + super(IndexDriver, self).__init__( + index_labels=index_labels, encoding=encoding, cache=cache) + + def read(self, docinfo): + idx, path = docinfo + body = self.client.get_object(Bucket=self.bucket, + Key=str(path))['Body'] + return Document(idx, body.read()) + + def filter(self, pattern): + """ Filter paths based on index values. """ + raise NotImplementedError + + def gen_indces_and_paths(self): + """Yield paths to s3 objects. """ + with self.index.open() as inf: + reader = csv.reader(inf) + next(reader) + for row in reader: + yield tuple(row[:-1]), row[-1] + + def stream(self): + """Yield text from an object stored in s3. """ + return qgutils.lazy_parallel(self.read, self.gen_indices_and_paths()) + + +class S3DatabaseDriver(S3Driver): + """ + Retrieves an index table from a database with an arbitrary, user-provided + query and serves documents like a normal S3Driver. + """ + + @check_boto + @check_sqlalchemy + def __init__(self, protocol, user, password, host, db, port, query, + bucket, cache=True, encoding='utf-8'): + self.bucket = bucket + self.client = boto3.client('s3') + self.index = [] + engine = sqlalchemy.create_engine(f'{protocol}://{user}:{password}' + f'@{host}:{port}/{db}') + conn = engine.connect() + result = conn.execute(query) + for doc in result: + self.index.append(doc) + index_labels = doc.keys() + super(IndexDriver, self).__init__( + index_labels=index_labels, encoding=encoding, cache=cache) + + def gen_indices_and_paths(self): + for row in self.index: + yield tuple(row[:-1]), row[-1] From 11b0e2f3874ad461065e88c61eed3542391e8983 Mon Sep 17 00:00:00 2001 From: Michael Gasvoda Date: Tue, 10 Apr 2018 15:51:59 -0400 Subject: [PATCH 2/8] removing 3.6 formatting --- quantgov/corpora/structures.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/quantgov/corpora/structures.py b/quantgov/corpora/structures.py index d95a684..54faa10 100644 --- a/quantgov/corpora/structures.py +++ b/quantgov/corpora/structures.py @@ -321,8 +321,9 @@ def __init__(self, protocol, user, password, host, db, port, query, self.bucket = bucket self.client = boto3.client('s3') self.index = [] - engine = sqlalchemy.create_engine(f'{protocol}://{user}:{password}' - f'@{host}:{port}/{db}') + engine = sqlalchemy.create_engine('{}://{}:{}@{}:{}/{}' + .format(protocol, user, password, + host, port, db)) conn = engine.connect() result = conn.execute(query) for doc in result: From 7da72f339b09e6f2784447ff2c3ab7d2251fb8c2 Mon Sep 17 00:00:00 2001 From: Michael Gasvoda Date: Tue, 10 Apr 2018 16:00:26 -0400 Subject: [PATCH 3/8] adding extra requirements list --- .travis.yml | 1 + setup.py | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/.travis.yml b/.travis.yml index 75cc0ab..a7d71f0 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,6 +5,7 @@ python: install: - pip install ".[testing]" - pip install ".[nlp]" +- pip install ".[s3driver]" - python -m nltk.downloader punkt stopwords wordnet script: pytest deploy: diff --git a/setup.py b/setup.py index 3d424b1..eb1c880 100644 --- a/setup.py +++ b/setup.py @@ -65,6 +65,10 @@ def find_version(*file_paths): 'nlp': [ 'textblob', 'nltk', + ], + 's3driver': [ + 'sqlalchemy', + 'boto3' ] }, entry_points={ From f6918898b3068f1998a4d46d2607e94c3f5c1f0e Mon Sep 17 00:00:00 2001 From: Michael Gasvoda Date: Wed, 11 Apr 2018 11:46:23 -0400 Subject: [PATCH 4/8] adding basic s3 driver test --- quantgov/corpora/structures.py | 3 ++- tests/test_corpora.py | 16 ++++++++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/quantgov/corpora/structures.py b/quantgov/corpora/structures.py index 54faa10..277b003 100644 --- a/quantgov/corpora/structures.py +++ b/quantgov/corpora/structures.py @@ -280,6 +280,7 @@ def __init__(self, index, bucket, encoding='utf-8', cache=True): self.index = Path(index) self.bucket = bucket self.client = boto3.client('s3') + self.encoding = encoding with self.index.open(encoding=encoding) as inf: index_labels = next(csv.reader(inf))[:-1] super(IndexDriver, self).__init__( @@ -289,7 +290,7 @@ def read(self, docinfo): idx, path = docinfo body = self.client.get_object(Bucket=self.bucket, Key=str(path))['Body'] - return Document(idx, body.read()) + return Document(idx, body.read().decode(self.encoding)) def filter(self, pattern): """ Filter paths based on index values. """ diff --git a/tests/test_corpora.py b/tests/test_corpora.py index 4eba0fc..90acdd4 100644 --- a/tests/test_corpora.py +++ b/tests/test_corpora.py @@ -38,10 +38,26 @@ def build_index_corpus(directory): return quantgov.corpora.IndexDriver(str(index_path)) +def build_s3_corpus(directory): + rows = [] + for letter, number, path in ( + ('a', '1', 'quantgov_tests/first.txt'), + ('b', '2', 'quantgov_tests/second.txt') + ): + rows.append((letter, number, path)) + index_path = directory.join('index.csv') + with index_path.open('w', encoding='utf-8') as outf: + outf.write(u'letter,number,path\n') + outf.write(u'\n'.join(','.join(row) for row in rows)) + return quantgov.corpora.S3Driver(str(index_path), + bucket='quantgov-databanks') + + BUILDERS = { 'RecursiveDirectoryCorpusDriver': build_recursive_directory_corpus, 'NamePatternCorpusDriver': build_name_pattern_corpus, 'IndexDriver': build_index_corpus, + 'S3Driver': build_s3_corpus } From 2fdc8039adaa0222e63660a8f9d918ba599a1263 Mon Sep 17 00:00:00 2001 From: Michael Gasvoda Date: Fri, 13 Apr 2018 12:10:23 -0400 Subject: [PATCH 5/8] Removing unnecessary function --- quantgov/corpora/structures.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/quantgov/corpora/structures.py b/quantgov/corpora/structures.py index 277b003..46cdd15 100644 --- a/quantgov/corpora/structures.py +++ b/quantgov/corpora/structures.py @@ -296,14 +296,6 @@ def filter(self, pattern): """ Filter paths based on index values. """ raise NotImplementedError - def gen_indces_and_paths(self): - """Yield paths to s3 objects. """ - with self.index.open() as inf: - reader = csv.reader(inf) - next(reader) - for row in reader: - yield tuple(row[:-1]), row[-1] - def stream(self): """Yield text from an object stored in s3. """ return qgutils.lazy_parallel(self.read, self.gen_indices_and_paths()) From c4641f26ae9afa092d89e84f0cdbedbad2d32075 Mon Sep 17 00:00:00 2001 From: Michael Gasvoda Date: Fri, 13 Apr 2018 12:19:13 -0400 Subject: [PATCH 6/8] This ain't 2007 --- tests/test_corpora.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/test_corpora.py b/tests/test_corpora.py index 90acdd4..080854c 100644 --- a/tests/test_corpora.py +++ b/tests/test_corpora.py @@ -6,14 +6,14 @@ def build_recursive_directory_corpus(directory): - for path, text in (('a/1.txt', u'foo'), ('b/2.txt', u'bar')): + for path, text in (('a/1.txt', 'foo'), ('b/2.txt', 'bar')): directory.join(path).write_text(text, encoding='utf-8', ensure=True) return quantgov.corpora.RecursiveDirectoryCorpusDriver( directory=str(directory), index_labels=('letter', 'number')) def build_name_pattern_corpus(directory): - for path, text in (('a_1.txt', u'foo'), ('b_2.txt', u'bar')): + for path, text in (('a_1.txt', 'foo'), ('b_2.txt', 'bar')): path = directory.join(path).write_text( text, encoding='utf-8', ensure=True) return quantgov.corpora.NamePatternCorpusDriver( @@ -25,16 +25,16 @@ def build_name_pattern_corpus(directory): def build_index_corpus(directory): rows = [] for letter, number, path, text in ( - ('a', '1', 'first.txt', u'foo'), - ('b', '2', 'second.txt', u'bar') + ('a', '1', 'first.txt', 'foo'), + ('b', '2', 'second.txt', 'bar') ): outpath = directory.join(path, abs=1) outpath.write_text(text, encoding='utf-8') rows.append((letter, number, str(outpath))) index_path = directory.join('index.csv') with index_path.open('w', encoding='utf-8') as outf: - outf.write(u'letter,number,path\n') - outf.write(u'\n'.join(','.join(row) for row in rows)) + outf.write('letter,number,path\n') + outf.write('\n'.join(','.join(row) for row in rows)) return quantgov.corpora.IndexDriver(str(index_path)) @@ -47,8 +47,8 @@ def build_s3_corpus(directory): rows.append((letter, number, path)) index_path = directory.join('index.csv') with index_path.open('w', encoding='utf-8') as outf: - outf.write(u'letter,number,path\n') - outf.write(u'\n'.join(','.join(row) for row in rows)) + outf.write('letter,number,path\n') + outf.write('\n'.join(','.join(row) for row in rows)) return quantgov.corpora.S3Driver(str(index_path), bucket='quantgov-databanks') From a46a22dabb34e0b9b53fd88b502f16e76453f4e8 Mon Sep 17 00:00:00 2001 From: Michael Gasvoda Date: Fri, 13 Apr 2018 12:23:51 -0400 Subject: [PATCH 7/8] test updates --- tests/test_corpora.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/tests/test_corpora.py b/tests/test_corpora.py index 17a7500..488fca9 100644 --- a/tests/test_corpora.py +++ b/tests/test_corpora.py @@ -33,15 +33,9 @@ def build_index_corpus(directory): rows.append((letter, number, str(outpath))) index_path = directory.join('index.csv') with index_path.open('w', encoding='utf-8') as outf: -<<<<<<< HEAD outf.write('letter,number,path\n') outf.write('\n'.join(','.join(row) for row in rows)) return quantgov.corpora.IndexDriver(str(index_path)) -======= - outf.write(u'letter,number,path\n') - outf.write(u'\n'.join(','.join(row) for row in rows)) - return quantgov.corpus.IndexDriver(str(index_path)) ->>>>>>> 251328340d8557ed2f098d58c419464d3969cfb2 def build_s3_corpus(directory): From 6ec87329702af3e1c311fd3baf862c7887f557d5 Mon Sep 17 00:00:00 2001 From: Michael Gasvoda Date: Fri, 13 Apr 2018 12:32:18 -0400 Subject: [PATCH 8/8] adding s3driver to new corpus structure --- quantgov/corpus/__init__.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/quantgov/corpus/__init__.py b/quantgov/corpus/__init__.py index be0ec36..f095957 100644 --- a/quantgov/corpus/__init__.py +++ b/quantgov/corpus/__init__.py @@ -5,5 +5,7 @@ FlatFileCorpusDriver, RecursiveDirectoryCorpusDriver, NamePatternCorpusDriver, - IndexDriver + IndexDriver, + S3Driver, + S3DatabaseDriver )