From 32b1956e16f41ffc4fa08452390d3748b6c5988e Mon Sep 17 00:00:00 2001
From: Antoine Pitrou
Date: Mon, 26 Feb 2018 18:03:52 +0100
Subject: [PATCH] ARROW-1035: [Python] Add streaming dataframe reconstruction
 benchmark

---
 python/benchmarks/common.py    | 31 ++++++++++++----
 python/benchmarks/streaming.py | 67 ++++++++++++++++++++++++++++++++++
 2 files changed, 90 insertions(+), 8 deletions(-)
 create mode 100644 python/benchmarks/streaming.py

diff --git a/python/benchmarks/common.py b/python/benchmarks/common.py
index 7dd42fde5ab..b205ba5817d 100644
--- a/python/benchmarks/common.py
+++ b/python/benchmarks/common.py
@@ -23,13 +23,21 @@
 import numpy as np


+KILOBYTE = 1 << 10
+MEGABYTE = KILOBYTE * KILOBYTE
+
+
 def _multiplicate_sequence(base, target_size):
     q, r = divmod(target_size, len(base))
     return [base] * q + [base[:r]]


-def get_random_bytes(n):
-    rnd = np.random.RandomState(42)
+def get_random_bytes(n, *, seed=42):
+    """
+    Generate a random bytes object of size *n*.
+    Note the result might be compressible.
+    """
+    rnd = np.random.RandomState(seed)
     # Computing a huge random bytestring can be costly, so we get at most
     # 100KB and duplicate the result as needed
     base_size = 100003
@@ -43,22 +51,25 @@ def get_random_bytes(n):
     return result


-def get_random_ascii(n):
-    arr = np.frombuffer(get_random_bytes(n), dtype=np.int8) & 0x7f
+def get_random_ascii(n, *, seed=42):
+    """
+    Get a random ASCII-only unicode string of size *n*.
+    """
+    arr = np.frombuffer(get_random_bytes(n, seed=seed), dtype=np.int8) & 0x7f
     result, _ = codecs.ascii_decode(arr)
     assert isinstance(result, str)
     assert len(result) == n
     return result


-def _random_unicode_letters(n):
+def _random_unicode_letters(n, *, seed=42):
     """
     Generate a string of random unicode letters (slow).
     """
     def _get_more_candidates():
         return rnd.randint(0, sys.maxunicode, size=n).tolist()

-    rnd = np.random.RandomState(42)
+    rnd = np.random.RandomState(seed)
     out = []
     candidates = []

@@ -75,8 +86,12 @@ def _get_more_candidates():
 _1024_random_unicode_letters = _random_unicode_letters(1024)


-def get_random_unicode(n):
-    indices = np.frombuffer(get_random_bytes(n * 2), dtype=np.int16) & 1023
+def get_random_unicode(n, *, seed=42):
+    """
+    Get a random non-ASCII unicode string of size *n*.
+    """
+    indices = np.frombuffer(get_random_bytes(n * 2, seed=seed),
+                            dtype=np.int16) & 1023
     unicode_arr = np.array(_1024_random_unicode_letters)[indices]

     result = ''.join(unicode_arr.tolist())
diff --git a/python/benchmarks/streaming.py b/python/benchmarks/streaming.py
new file mode 100644
index 00000000000..be7fda42c67
--- /dev/null
+++ b/python/benchmarks/streaming.py
@@ -0,0 +1,67 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import numpy as np
+import pandas as pd
+import pyarrow as pa
+
+from . import common
+from .common import KILOBYTE, MEGABYTE
+
+
+def generate_chunks(total_size, nchunks, ncols, dtype=np.dtype('int64')):
+    rowsize = total_size // nchunks // ncols
+    assert rowsize % dtype.itemsize == 0
+    return [pd.DataFrame({
+        'c' + str(col): np.frombuffer(
+            common.get_random_bytes(rowsize, seed=col + 997 * chunk)).view(dtype)
+        for col in range(ncols)
+    })
+    for chunk in range(nchunks)]
+
+
+class StreamReader(object):
+    """
+    Benchmark in-memory streaming to a Pandas dataframe.
+    """
+    total_size = 64 * MEGABYTE
+    ncols = 8
+    chunk_sizes = [16 * KILOBYTE, 256 * KILOBYTE, 8 * MEGABYTE]
+
+    param_names = ['chunk_size']
+    params = [chunk_sizes]
+
+    def setup(self, chunk_size):
+        # Note we're careful to stream different chunks instead of
+        # streaming N times the same chunk, so that we avoid operating
+        # entirely out of L1/L2.
+        chunks = generate_chunks(self.total_size,
+                                 nchunks=self.total_size // chunk_size,
+                                 ncols=self.ncols)
+        batches = [pa.RecordBatch.from_pandas(df)
+                   for df in chunks]
+        schema = batches[0].schema
+        sink = pa.BufferOutputStream()
+        stream_writer = pa.RecordBatchStreamWriter(sink, schema)
+        for batch in batches:
+            stream_writer.write_batch(batch)
+        self.source = sink.get_result()
+
+    def time_read_to_dataframe(self, *args):
+        reader = pa.RecordBatchStreamReader(self.source)
+        table = reader.read_all()
+        df = table.to_pandas()
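
For readers who want to try the pattern being measured, the write/read round-trip from setup() and time_read_to_dataframe() above can be exercised on its own, outside the asv harness. The sketch below is only illustrative: the helper name stream_roundtrip and the tiny sizes are made up for the example, and sink.get_result() mirrors the API used in this patch (more recent pyarrow releases expose the same buffer via getvalue()).

    import numpy as np
    import pandas as pd
    import pyarrow as pa


    def stream_roundtrip(nrows=1024, nbatches=4):
        # Build a few small record batches (stand-ins for the benchmark's
        # 64 MB worth of distinct chunks).
        dfs = [pd.DataFrame({'c0': np.arange(nrows, dtype='int64') + i,
                             'c1': np.arange(nrows, dtype='int64') * 2})
               for i in range(nbatches)]
        batches = [pa.RecordBatch.from_pandas(df) for df in dfs]

        # Write the batches to an in-memory IPC stream, as in setup().
        sink = pa.BufferOutputStream()
        writer = pa.RecordBatchStreamWriter(sink, batches[0].schema)
        for batch in batches:
            writer.write_batch(batch)
        writer.close()           # finalize the stream (end-of-stream marker)
        buf = sink.get_result()  # patch-era API; newer pyarrow: sink.getvalue()

        # Read the stream back and rebuild a single dataframe, which is the
        # operation timed by time_read_to_dataframe().
        reader = pa.RecordBatchStreamReader(buf)
        return reader.read_all().to_pandas()


    if __name__ == '__main__':
        df = stream_roundtrip()
        print(df.shape)  # (4096, 2) with the defaults above

Under asv, the StreamReader class is picked up automatically instead; with the configuration already present in python/benchmarks, an invocation along the lines of `asv dev --bench StreamReader` (or a plain `asv run`) should exercise just this benchmark, though the exact command depends on the local asv setup.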