Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdks/python/apache_beam/tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@

For internal use only; no backwards-compatibility guarantees.
"""
from __future__ import absolute_import
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@
python -m apache_beam.tools.distribution_counter_microbenchmark
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import random
import sys
import time
from builtins import range

from apache_beam.tools import utils

Expand All @@ -44,7 +47,7 @@ def run_benchmark(num_runs=100, num_input=10000, seed=time.time()):
total_time = 0
random.seed(seed)
lower_bound = 0
upper_bound = sys.maxint
upper_bound = sys.maxsize
inputs = generate_input_values(num_input, lower_bound, upper_bound)
from apache_beam.transforms import DataflowDistributionCounter
print("Number of runs:", num_runs)
Expand All @@ -57,8 +60,8 @@ def run_benchmark(num_runs=100, num_input=10000, seed=time.time()):
counter.add_inputs_for_test(inputs)
time_cost = time.time() - start
print("Run %d: Total time cost %g sec" % (i+1, time_cost))
total_time += time_cost/num_input
print("Per element update time cost:", total_time/num_runs)
total_time += time_cost // num_input
print("Per element update time cost:", total_time // num_runs)


if __name__ == '__main__':
Expand Down
10 changes: 7 additions & 3 deletions sdks/python/apache_beam/tools/map_fn_microbenchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,13 @@
python -m apache_beam.tools.map_fn_microbenchmark
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import time
from builtins import range
from builtins import zip

import apache_beam as beam
from apache_beam.tools import utils
Expand All @@ -45,7 +49,7 @@ def run_benchmark(num_maps=100, num_runs=10, num_elements_step=1000):
num_elements = num_elements_step * run + 1
start = time.time()
with beam.Pipeline() as p:
pc = p | beam.Create(range(num_elements))
pc = p | beam.Create(list(range(num_elements)))
for ix in range(num_maps):
pc = pc | 'Map%d' % ix >> beam.FlatMap(lambda x: (None,))
timings[num_elements] = time.time() - start
Expand All @@ -55,9 +59,9 @@ def run_benchmark(num_maps=100, num_runs=10, num_elements_step=1000):
print
# pylint: disable=unused-variable
gradient, intercept, r_value, p_value, std_err = stats.linregress(
*zip(*timings.items()))
*list(zip(*list(timings.items()))))
print("Fixed cost ", intercept)
print("Per-element ", gradient / num_maps)
print("Per-element ", gradient // num_maps)
print("R^2 ", r_value**2)


Expand Down
8 changes: 6 additions & 2 deletions sdks/python/apache_beam/tools/sideinput_microbenchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@
python -m apache_beam.tools.sideinput_microbenchmark
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import time
from builtins import range

from apache_beam.runners.worker import opcounters
from apache_beam.runners.worker import sideinputs
Expand Down Expand Up @@ -64,9 +67,10 @@ def run_benchmark(num_runs=50, input_per_source=4000, num_sources=4):

print("Runtimes:", times)

avg_runtime = sum(times)/len(times)
avg_runtime = sum(times) // len(times)
print("Average runtime:", avg_runtime)
print("Time per element:", avg_runtime/(input_per_source * num_sources))
print("Time per element:", avg_runtime // (input_per_source *
num_sources))


if __name__ == '__main__':
Expand Down
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/tools/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

"""Utility functions for all microbenchmarks."""

from __future__ import absolute_import

import os


Expand Down
1 change: 1 addition & 0 deletions sdks/python/tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ deps =
flake8==3.5.0
modules =
apache_beam/coders
apache_beam/tools
commands =
python --version
pip --version
Expand Down