From 1eab20b8984797f1536b32eeaa79967ce592991a Mon Sep 17 00:00:00 2001
From: Valentyn Tymofieiev
Date: Tue, 5 Jun 2018 13:30:48 -0700
Subject: [PATCH 1/2] Add coders microbenchmark.

---
 .../tools/coders_microbenchmark.py            | 201 ++++++++++++++++++
 .../apache_beam/tools/microbenchmarks_test.py |  36 ++++
 sdks/python/apache_beam/tools/utils.py        |  92 ++++++++
 sdks/python/setup.py                          |   1 +
 4 files changed, 330 insertions(+)
 create mode 100644 sdks/python/apache_beam/tools/coders_microbenchmark.py
 create mode 100644 sdks/python/apache_beam/tools/microbenchmarks_test.py

diff --git a/sdks/python/apache_beam/tools/coders_microbenchmark.py b/sdks/python/apache_beam/tools/coders_microbenchmark.py
new file mode 100644
index 000000000000..9453d61067c9
--- /dev/null
+++ b/sdks/python/apache_beam/tools/coders_microbenchmark.py
@@ -0,0 +1,201 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""A microbenchmark for measuring the performance of coders.
+
+This runs a sequence of encode-decode operations on random inputs
+to collect performance data for various coders.
+
+To evaluate coder performance we approximate how coders are used
+in PCollections: we encode and decode a list of elements. An element
+can be a string, a list of integers, a windowed value, or any other
+object we want a coder to process.
+
+Run as:
+  python -m apache_beam.tools.coders_microbenchmark
+
+"""
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+import random
+import string
+import sys
+
+from past.builtins import unicode
+
+from apache_beam.coders import coders
+from apache_beam.tools import utils
+from apache_beam.transforms import window
+from apache_beam.utils import windowed_value
+
+
+def coder_benchmark_factory(coder, generate_fn):
+  """Creates a benchmark that encodes and decodes a list of elements.
+
+  Args:
+    coder: coder to use to encode an element.
+    generate_fn: a callable that generates an element.
+  """
+
+  class CoderBenchmark(object):
+    def __init__(self, num_elements_per_benchmark):
+      self._coder = coders.IterableCoder(coder)
+      self._list = [generate_fn()
+                    for _ in range(num_elements_per_benchmark)]
+
+    def __call__(self):
+      # Calling coder operations on a single element at a time may incur
+      # irrelevant overhead. To compensate, we use a list of elements.
+ _ = self._coder.decode(self._coder.encode(self._list)) + + CoderBenchmark.__name__ = "%s, %s" % ( + generate_fn.__name__, str(coder)) + + return CoderBenchmark + + +def small_int(): + return random.randint(0, 127) + + +def large_int(): + return random.randint(sys.maxsize >> 2, sys.maxsize) + + +def random_string(length): + return unicode(''.join(random.choice( + string.ascii_letters + string.digits) for _ in range(length))) + + +def small_string(): + return random_string(4) + + +def large_string(): + return random_string(100) + + +def list_int(size): + return [small_int() for _ in range(size)] + + +def dict_int_int(size): + return {i: i for i in list_int(size)} + + +def small_list(): + return list_int(10) + + +def large_list(): + return list_int(1000) + + +def small_tuple(): + # Benchmark a common case of 2-element tuples. + return tuple(list_int(2)) + + +def large_tuple(): + return tuple(large_list()) + + +def small_dict(): + return {i: i for i in small_list()} + + +def large_dict(): + return {i: i for i in large_list()} + + +def random_windowed_value(num_windows): + return windowed_value.WindowedValue( + value=small_int(), + timestamp=12345678, + windows=tuple( + window.IntervalWindow(i * 10, i * 10 + small_int()) + for i in range(num_windows) + )) + + +def wv_with_one_window(): + return random_windowed_value(num_windows=1) + + +def wv_with_multiple_windows(): + return random_windowed_value(num_windows=32) + + +def run_coder_benchmarks(num_runs, input_size, seed, verbose): + random.seed(seed) + + # TODO(BEAM-4441): Pick coders using type hints, for example: + # tuple_coder = typecoders.registry.get_coder(typehints.Tuple[int, ...]) + benchmarks = [ + coder_benchmark_factory( + coders.FastPrimitivesCoder(), small_int), + coder_benchmark_factory( + coders.FastPrimitivesCoder(), large_int), + coder_benchmark_factory( + coders.FastPrimitivesCoder(), small_string), + coder_benchmark_factory( + coders.FastPrimitivesCoder(), large_string), + coder_benchmark_factory( + coders.FastPrimitivesCoder(), + small_list), + coder_benchmark_factory( + coders.IterableCoder(coders.FastPrimitivesCoder()), + small_list), + coder_benchmark_factory( + coders.FastPrimitivesCoder(), + large_list), + coder_benchmark_factory( + coders.IterableCoder(coders.FastPrimitivesCoder()), + large_list), + coder_benchmark_factory( + coders.FastPrimitivesCoder(), + small_tuple), + coder_benchmark_factory( + coders.FastPrimitivesCoder(), + large_tuple), + coder_benchmark_factory( + coders.FastPrimitivesCoder(), + small_dict), + coder_benchmark_factory( + coders.FastPrimitivesCoder(), + large_dict), + coder_benchmark_factory( + coders.WindowedValueCoder(coders.FastPrimitivesCoder()), + wv_with_one_window), + coder_benchmark_factory( + coders.WindowedValueCoder(coders.FastPrimitivesCoder()), + wv_with_multiple_windows), + ] + + suite = [utils.BenchmarkConfig(b, input_size, num_runs) for b in benchmarks] + utils.run_benchmarks(suite, verbose=verbose) + + +if __name__ == "__main__": + utils.check_compiled("apache_beam.coders.coder_impl") + + num_runs = 20 + num_elements_per_benchmark = 1000 + seed = 42 # Fix the seed for better consistency + + run_coder_benchmarks(num_runs, num_elements_per_benchmark, seed, + verbose=True) diff --git a/sdks/python/apache_beam/tools/microbenchmarks_test.py b/sdks/python/apache_beam/tools/microbenchmarks_test.py new file mode 100644 index 000000000000..d306122d989f --- /dev/null +++ b/sdks/python/apache_beam/tools/microbenchmarks_test.py @@ -0,0 +1,36 @@ +# +# Licensed to the Apache Software 
Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for microbenchmarks code."""
+
+from __future__ import absolute_import
+
+import unittest
+
+from apache_beam.tools import coders_microbenchmark
+
+
+class MicrobenchmarksTest(unittest.TestCase):
+  def test_coders_microbenchmark(self):
+    # Right now we don't evaluate performance impact; we only check that
+    # the microbenchmark code runs successfully.
+    coders_microbenchmark.run_coder_benchmarks(
+        num_runs=1, input_size=10, seed=1, verbose=False)
+
+
+if __name__ == '__main__':
+  unittest.main()
diff --git a/sdks/python/apache_beam/tools/utils.py b/sdks/python/apache_beam/tools/utils.py
index f2a9c214ae4f..203d629c87e0 100644
--- a/sdks/python/apache_beam/tools/utils.py
+++ b/sdks/python/apache_beam/tools/utils.py
@@ -18,8 +18,15 @@
 """Utility functions for all microbenchmarks."""
 
 from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
 
+import collections
+import gc
 import os
+import time
+
+import numpy
 
 
 def check_compiled(module):
@@ -34,3 +41,88 @@ def check_compiled(module):
       "Profiling uncompiled code.\n"
       "To compile beam, run "
       "'pip install Cython; python setup.py build_ext --inplace'")
+
+
+class BenchmarkConfig(
+    collections.namedtuple(
+        "BenchmarkConfig", ["benchmark", "size", "num_runs"])):
+  """
+  Attributes:
+    benchmark: a callable that takes an int argument (the benchmark size)
+      and returns a callable. The returned callable must run the code being
+      benchmarked on an input of the specified size.
+
+      For example, one can implement a benchmark as:
+
+      class MyBenchmark(object):
+        def __init__(self, size):
+          [do necessary initialization]
+        def __call__(self):
+          [run the code in question]
+
+    size: int, the size of the input. Aggregated per-element metrics
+      are computed based on this size.
+    num_runs: int, number of times to run each benchmark.
+  """
+
+  def __str__(self):
+    return "%s, %s element(s)" % (
+        getattr(self.benchmark, '__name__', str(self.benchmark)),
+        str(self.size))
+
+
+def run_benchmarks(benchmark_suite, verbose=True):
+  """Runs benchmarks and collects execution times.
+
+  A simple instrumentation that runs each callable several times, then
+  collects and prints its execution times.
+
+  Args:
+    benchmark_suite: A list of BenchmarkConfig.
+    verbose: bool, whether to print benchmark results to stdout.
+
+  Returns:
+    A dictionary of the form string -> list of floats. Keys of the dictionary
+    are benchmark names, values are execution times in seconds for each run.
+  """
+
+  def run(benchmark_fn, size):
+    # Contain each run of a benchmark inside a function so that any temporary
+    # objects can be garbage-collected after the run.
+ benchmark_instance_callable = benchmark_fn(size) + start = time.time() + _ = benchmark_instance_callable() + return time.time() - start + + cost_series = collections.defaultdict(list) + for benchmark_config in benchmark_suite: + name = str(benchmark_config) + num_runs = benchmark_config.num_runs + size = benchmark_config.size + for run_id in range(num_runs): + # Do a proactive GC before each run to minimize side-effects of different + # runs. + gc.collect() + time_cost = run(benchmark_config.benchmark, size) + cost_series[name].append(time_cost) + if verbose: + per_element_cost = time_cost / size + print("%s: run %d of %d, per element time cost: %g sec" % ( + name, run_id + 1, num_runs, per_element_cost)) + if verbose: + print("") + + if verbose: + pad_length = max([len(str(bc)) for bc in benchmark_suite]) + + for benchmark_config in benchmark_suite: + name = str(benchmark_config) + per_element_median_cost = ( + numpy.median(cost_series[name]) / benchmark_config.size) + std = numpy.std(cost_series[name]) / benchmark_config.size + + print("%s: per element median time cost: %g sec, relative std: %.2f%%" % ( + name.ljust(pad_length, " "), per_element_median_cost, + std * 100 / per_element_median_cost)) + + return cost_series diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 43df52b20ec2..a2219b882f21 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -125,6 +125,7 @@ def get_version(): REQUIRED_TEST_PACKAGES = [ 'nose>=1.3.7', + 'numpy', 'pyhamcrest>=1.9,<2.0', ] From bc6e63f3053e311370bb1dec29f7f9b17ae3b424 Mon Sep 17 00:00:00 2001 From: Valentyn Tymofieiev Date: Tue, 17 Jul 2018 15:16:55 -0700 Subject: [PATCH 2/2] Remove REQUIRED_PERF_TEST_PACKAGES since numpy is included in REQUIRED_TEST_PACKAGES --- sdks/python/setup.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/sdks/python/setup.py b/sdks/python/setup.py index a2219b882f21..f7a809e72034 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -129,10 +129,6 @@ def get_version(): 'pyhamcrest>=1.9,<2.0', ] -REQUIRED_PERF_TEST_PACKAGES = [ - 'numpy>=1.14.3', -] - GCP_REQUIREMENTS = [ # oauth2client >=4 only works with google-apitools>=0.5.18. 'google-apitools>=0.5.18,<=0.5.20', @@ -198,7 +194,6 @@ def run(self): 'docs': ['Sphinx>=1.5.2,<2.0'], 'test': REQUIRED_TEST_PACKAGES, 'gcp': GCP_REQUIREMENTS, - 'perftest': REQUIRED_PERF_TEST_PACKAGES, }, zip_safe=False, # PyPI package information.
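
For reference, below is a minimal sketch of how the harness added by this patch
could be driven directly, outside of coders_microbenchmark. It is illustrative
only and not part of the change: the benchmark name and the generated inputs
are made up, and it assumes the patch above is applied.

# Illustrative use of the benchmark harness added in this patch (not part of
# the change itself). Assumes apache_beam with this patch is importable.
from apache_beam.coders import coders
from apache_beam.tools import utils


def string_list_roundtrip(size):
  # Returns a callable that round-trips `size` strings through a coder,
  # mirroring the coder_benchmark_factory pattern used above.
  coder = coders.IterableCoder(coders.FastPrimitivesCoder())
  data = [u"element-%d" % i for i in range(size)]

  def run():
    coder.decode(coder.encode(data))
  return run


if __name__ == "__main__":
  # One config per benchmark: 5 runs over inputs of 1000 elements each.
  suite = [utils.BenchmarkConfig(string_list_roundtrip, 1000, 5)]
  utils.run_benchmarks(suite, verbose=True)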