diff --git a/sdks/python/apache_beam/runners/__init__.py b/sdks/python/apache_beam/runners/__init__.py index 863e67ed6e90..ad5c3f626faf 100644 --- a/sdks/python/apache_beam/runners/__init__.py +++ b/sdks/python/apache_beam/runners/__init__.py @@ -20,6 +20,8 @@ This package defines runners, which are used to execute a pipeline. """ +from __future__ import absolute_import + from apache_beam.runners.direct.direct_runner import DirectRunner from apache_beam.runners.runner import PipelineRunner from apache_beam.runners.runner import PipelineState diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index 43bbfcf7e68a..03333a8df727 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -15,6 +15,7 @@ # limitations under the License. # +# cython: language_level=3 # cython: profile=True """Worker operations executor. @@ -22,10 +23,17 @@ For internal use only; no backwards-compatibility guarantees. """ +from __future__ import absolute_import + import sys import traceback +from builtins import next +from builtins import object +from builtins import zip -import six +from future.utils import raise_ +from past.builtins import basestring +from past.builtins import unicode from apache_beam.internal import util from apache_beam.pvalue import TaggedOutput @@ -615,7 +623,7 @@ def _reraise_augmented(self, exn): traceback.format_exception_only(type(exn), exn)[-1].strip() + step_annotation) new_exn._tagged_with_step = True - six.reraise(type(new_exn), new_exn, original_traceback) + raise_(type(new_exn), new_exn, original_traceback) class OutputProcessor(object): @@ -652,7 +660,7 @@ def process_outputs(self, windowed_input_element, results): tag = None if isinstance(result, TaggedOutput): tag = result.tag - if not isinstance(tag, six.string_types): + if not isinstance(tag, basestring): raise TypeError('In %s, tag %s is not a string' % (self, tag)) result = result.value if isinstance(result, WindowedValue): @@ -694,7 +702,7 @@ def finish_bundle_outputs(self, results): tag = None if isinstance(result, TaggedOutput): tag = result.tag - if not isinstance(tag, six.string_types): + if not isinstance(tag, (str, unicode)): raise TypeError('In %s, tag %s is not a string' % (self, tag)) result = result.value diff --git a/sdks/python/apache_beam/runners/common_test.py b/sdks/python/apache_beam/runners/common_test.py index e0f628c71ee1..d4848e48abee 100644 --- a/sdks/python/apache_beam/runners/common_test.py +++ b/sdks/python/apache_beam/runners/common_test.py @@ -15,6 +15,8 @@ # limitations under the License. # +from __future__ import absolute_import + import unittest from apache_beam.runners.common import DoFnSignature diff --git a/sdks/python/apache_beam/runners/dataflow/__init__.py b/sdks/python/apache_beam/runners/dataflow/__init__.py index 6674ba5d9ff9..2148f1691e97 100644 --- a/sdks/python/apache_beam/runners/dataflow/__init__.py +++ b/sdks/python/apache_beam/runners/dataflow/__init__.py @@ -21,5 +21,7 @@ with no backwards-compatibility guarantees. 
""" +from __future__ import absolute_import + from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner from apache_beam.runners.dataflow.test_dataflow_runner import TestDataflowRunner diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py index 3f039f7d014f..89cba3df60bc 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py @@ -21,9 +21,13 @@ service. """ +from __future__ import absolute_import + import numbers from collections import defaultdict +from future.utils import iteritems + from apache_beam.metrics.cells import DistributionData from apache_beam.metrics.cells import DistributionResult from apache_beam.metrics.execution import MetricKey @@ -145,7 +149,7 @@ def _populate_metric_results(self, response): # Now we create the MetricResult elements. result = [] - for metric_key, metric in metrics_by_name.iteritems(): + for metric_key, metric in iteritems(metrics_by_name): attempted = self._get_metric_value(metric['tentative']) committed = self._get_metric_value(metric['committed']) if attempted is None or committed is None: diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py index 67ef923e3d92..1823b59928f6 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py @@ -18,8 +18,12 @@ Tests corresponding to the DataflowRunner implementation of MetricsResult, the DataflowMetrics class. """ + +from __future__ import absolute_import + import types import unittest +from builtins import object import mock @@ -34,7 +38,7 @@ class DictToObject(object): """Translate from a dict(list()) structure to an object structure""" def __init__(self, data): - for name, value in data.iteritems(): + for name, value in data.items(): setattr(self, name, self._wrap(value)) def _wrap(self, value): diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 9c8520250e77..3d5dab9b57b9 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -20,14 +20,19 @@ The runner will create a JSON description of the job graph and then submit it to the Dataflow Service for remote execution by a worker. """ +from __future__ import absolute_import +from __future__ import division import logging import threading import time import traceback -import urllib +from builtins import hex from collections import defaultdict +from future.moves.urllib.parse import quote +from future.moves.urllib.parse import unquote + import apache_beam as beam from apache_beam import coders from apache_beam import error @@ -125,7 +130,7 @@ def rank_error(msg): if duration: start_secs = time.time() - duration_secs = duration / 1000 + duration_secs = duration // 1000 job_id = result.job_id() while True: @@ -642,7 +647,7 @@ def run_ParDo(self, transform_node): if (label_renames and transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn): # Patch PTransform proto. 
- for old, new in label_renames.iteritems(): + for old, new in iteritems(label_renames): transform_proto.inputs[new] = transform_proto.inputs[old] del transform_proto.inputs[old] @@ -650,7 +655,7 @@ def run_ParDo(self, transform_node): proto_type, _ = beam.PTransform._known_urns[transform_proto.spec.urn] proto = proto_utils.parse_Bytes(transform_proto.spec.payload, proto_type) - for old, new in label_renames.iteritems(): + for old, new in iteritems(label_renames): proto.side_inputs[new].CopyFrom(proto.side_inputs[old]) del proto.side_inputs[old] transform_proto.spec.payload = proto.SerializeToString() @@ -965,12 +970,12 @@ def deserialize_windowing_strategy(cls, serialized_data): @staticmethod def byte_array_to_json_string(raw_bytes): """Implements org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString.""" - return urllib.quote(raw_bytes) + return quote(raw_bytes) @staticmethod def json_string_to_byte_array(encoded_string): """Implements org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray.""" - return urllib.unquote(encoded_string) + return unquote(encoded_string) class _DataflowSideInput(beam.pvalue.AsSideInput): diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py index c8790824bed8..dd913584f68d 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py @@ -17,8 +17,12 @@ """Unit tests for the DataflowRunner class.""" +from __future__ import absolute_import + import json import unittest +from builtins import object +from builtins import range from datetime import datetime import mock diff --git a/sdks/python/apache_beam/runners/dataflow/internal/__init__.py b/sdks/python/apache_beam/runners/dataflow/internal/__init__.py index cce3acad34a4..f4f43cbb1236 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/__init__.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/__init__.py @@ -14,3 +14,4 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# +from __future__ import absolute_import diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index 784166cafda4..f6c15a667a4a 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -19,6 +19,9 @@ Dataflow client utility functions.""" +from __future__ import absolute_import + +from builtins import object import codecs import getpass import json @@ -28,12 +31,13 @@ import tempfile import time from datetime import datetime -from StringIO import StringIO +import io + +from past.builtins import unicode import pkg_resources from apitools.base.py import encoding from apitools.base.py import exceptions -import six from apache_beam import version as beam_version from apache_beam.internal.gcp.auth import get_service_credentials @@ -262,7 +266,7 @@ def __init__(self, packages, options, environment_version, pipeline_url): dataflow.Environment.SdkPipelineOptionsValue()) options_dict = {k: v - for k, v in sdk_pipeline_options.iteritems() + for k, v in sdk_pipeline_options.items() if v is not None} options_dict["pipelineUrl"] = pipeline_url self.proto.sdkPipelineOptions.additionalProperties.append( @@ -298,7 +302,7 @@ def encode_shortstrings(input_buffer, errors='strict'): def decode_shortstrings(input_buffer, errors='strict'): """Decoder (to Unicode) that suppresses long base64 strings.""" shortened, length = encode_shortstrings(input_buffer, errors) - return six.text_type(shortened), length + return unicode(shortened), length def shortstrings_registerer(encoding_name): if encoding_name == 'shortstrings': @@ -493,7 +497,7 @@ def create_job(self, job): if job_location: gcs_or_local_path = os.path.dirname(job_location) file_name = os.path.basename(job_location) - self.stage_file(gcs_or_local_path, file_name, StringIO(job.json())) + self.stage_file(gcs_or_local_path, file_name, io.BytesIO(job.json())) if not template_location: return self.submit_job_description(job) @@ -508,7 +512,7 @@ def create_job_description(self, job): # Stage the pipeline for the runner harness self.stage_file(job.google_cloud_options.staging_location, names.STAGED_PIPELINE_FILENAME, - StringIO(job.proto_pipeline.SerializeToString())) + io.BytesIO(job.proto_pipeline.SerializeToString())) # Stage other resources for the SDK harness resources = self._stage_resources(job.options) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py index 2ba4e840cd30..aa0983e6fdb6 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py @@ -15,6 +15,9 @@ # limitations under the License. # """Unit tests for the apiclient module.""" + +from __future__ import absolute_import + import unittest import mock diff --git a/sdks/python/apache_beam/runners/dataflow/internal/clients/__init__.py b/sdks/python/apache_beam/runners/dataflow/internal/clients/__init__.py index cce3acad34a4..f4f43cbb1236 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/clients/__init__.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/clients/__init__.py @@ -14,3 +14,4 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# +from __future__ import absolute_import diff --git a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/__init__.py b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/__init__.py index c0d20c3ec8f9..ce260c515f04 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/__init__.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/__init__.py @@ -18,6 +18,8 @@ """Common imports for generated dataflow client library.""" # pylint:disable=wildcard-import +from __future__ import absolute_import + import pkgutil # Protect against environments where apitools library is not available. diff --git a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_client.py b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_client.py index 61d02730ab54..02b424eb994d 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_client.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_client.py @@ -17,6 +17,9 @@ """Generated client library for dataflow version v1b3.""" # NOTE: This file is autogenerated and should not be edited by hand. + +from __future__ import absolute_import + from apitools.base.py import base_api from apache_beam.runners.dataflow.internal.clients.dataflow import dataflow_v1b3_messages as messages diff --git a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py index fdc1681f33e1..bdb5c6d26620 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py @@ -22,6 +22,8 @@ """ # NOTE: This file is autogenerated and should not be edited by hand. +from __future__ import absolute_import + from apitools.base.protorpclite import messages as _messages from apitools.base.py import encoding from apitools.base.py import extra_types diff --git a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/message_matchers.py b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/message_matchers.py index 805473a8838c..8389e627f405 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/message_matchers.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/message_matchers.py @@ -14,7 +14,9 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# +from __future__ import absolute_import +from future.utils import iteritems from hamcrest.core.base_matcher import BaseMatcher IGNORED = object() @@ -49,7 +51,7 @@ def _matches(self, item): if self.origin != IGNORED and item.origin != self.origin: return False if self.context != IGNORED: - for key, name in self.context.iteritems(): + for key, name in iteritems(self.context): if key not in item.context: return False if name != IGNORED and item.context[key] != name: diff --git a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/message_matchers_test.py b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/message_matchers_test.py index 15bb9eff083f..3e6b6d71b579 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/message_matchers_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/message_matchers_test.py @@ -14,6 +14,8 @@ # See the License for the specific language governing permissions and # limitations under the License. # +from __future__ import absolute_import + import unittest import hamcrest as hc diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py b/sdks/python/apache_beam/runners/dataflow/internal/names.py index 7e0b81e21308..6f2b8c297358 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/names.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py @@ -20,8 +20,12 @@ # All constants are for internal use only; no backwards-compatibility # guarantees. +from __future__ import absolute_import + # TODO (altay): Move shared names to a common location. # Standard file names used for staging files. +from builtins import object + PICKLED_MAIN_SESSION_FILE = 'pickled_main_session' DATAFLOW_SDK_TARBALL_FILE = 'dataflow_python_sdk.tar' STAGED_PIPELINE_FILENAME = "pipeline.pb" diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/__init__.py b/sdks/python/apache_beam/runners/dataflow/native_io/__init__.py index cce3acad34a4..f4f43cbb1236 100644 --- a/sdks/python/apache_beam/runners/dataflow/native_io/__init__.py +++ b/sdks/python/apache_beam/runners/dataflow/native_io/__init__.py @@ -14,3 +14,4 @@ # See the License for the specific language governing permissions and # limitations under the License. # +from __future__ import absolute_import diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py index 2f2316f6f1d0..2096dc99833b 100644 --- a/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py +++ b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py @@ -20,7 +20,10 @@ For internal use only; no backwards-compatibility guarantees. 
""" +from __future__ import absolute_import + import logging +from builtins import object from apache_beam import pvalue from apache_beam.io import iobase @@ -31,7 +34,7 @@ def _dict_printable_fields(dict_object, skip_fields): """Returns a list of strings for the interesting fields of a dict.""" return ['%s=%r' % (name, value) - for name, value in dict_object.iteritems() + for name, value in dict_object.items() # want to output value 0 but not None nor [] if (value or value == 0) and name not in skip_fields] diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/iobase_test.py b/sdks/python/apache_beam/runners/dataflow/native_io/iobase_test.py index 01fd35f9cf95..828455b6d851 100644 --- a/sdks/python/apache_beam/runners/dataflow/native_io/iobase_test.py +++ b/sdks/python/apache_beam/runners/dataflow/native_io/iobase_test.py @@ -17,6 +17,7 @@ """Tests corresponding to Dataflow's iobase module.""" +from __future__ import absolute_import import unittest diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/streaming_create.py b/sdks/python/apache_beam/runners/dataflow/native_io/streaming_create.py index a54ee7767c5e..980ad24c7962 100644 --- a/sdks/python/apache_beam/runners/dataflow/native_io/streaming_create.py +++ b/sdks/python/apache_beam/runners/dataflow/native_io/streaming_create.py @@ -17,6 +17,10 @@ """Create transform for streaming.""" +from __future__ import absolute_import + +from builtins import map + from apache_beam import DoFn from apache_beam import ParDo from apache_beam import PTransform @@ -34,7 +38,7 @@ class StreamingCreate(PTransform): def __init__(self, values, coder): self.coder = coder - self.encoded_values = map(coder.encode, values) + self.encoded_values = list(map(coder.encode, values)) class DecodeAndEmitDoFn(DoFn): """A DoFn which stores encoded versions of elements. diff --git a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py index 0ce212fa31bd..5095e48d802a 100644 --- a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py +++ b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py @@ -17,6 +17,8 @@ """Ptransform overrides for DataflowRunner.""" +from __future__ import absolute_import + from apache_beam.coders import typecoders from apache_beam.pipeline import PTransformOverride diff --git a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py index eedfa60f9fd7..8c61f104e394 100644 --- a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py @@ -16,6 +16,8 @@ # """Wrapper of Beam runners that's built for running and verifying e2e tests.""" + +from __future__ import absolute_import from __future__ import print_function import logging diff --git a/sdks/python/apache_beam/runners/direct/__init__.py b/sdks/python/apache_beam/runners/direct/__init__.py index 0f8275674bb5..0b647fd6f7c4 100644 --- a/sdks/python/apache_beam/runners/direct/__init__.py +++ b/sdks/python/apache_beam/runners/direct/__init__.py @@ -20,4 +20,6 @@ Anything in this package not imported here is an internal implementation detail with no backwards-compatibility guarantees. 
""" +from __future__ import absolute_import + from apache_beam.runners.direct.direct_runner import DirectRunner diff --git a/sdks/python/apache_beam/runners/direct/bundle_factory.py b/sdks/python/apache_beam/runners/direct/bundle_factory.py index 942d2824dbf3..558e925df3e3 100644 --- a/sdks/python/apache_beam/runners/direct/bundle_factory.py +++ b/sdks/python/apache_beam/runners/direct/bundle_factory.py @@ -19,6 +19,8 @@ from __future__ import absolute_import +from builtins import object + from apache_beam import pvalue from apache_beam.runners import common from apache_beam.utils.windowed_value import WindowedValue diff --git a/sdks/python/apache_beam/runners/direct/clock.py b/sdks/python/apache_beam/runners/direct/clock.py index ad079941466f..6dbf8b2a7c05 100644 --- a/sdks/python/apache_beam/runners/direct/clock.py +++ b/sdks/python/apache_beam/runners/direct/clock.py @@ -22,6 +22,7 @@ from __future__ import absolute_import import time +from builtins import object class Clock(object): diff --git a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py index 4efaa27f0958..a1044e7a33bd 100644 --- a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py +++ b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py @@ -16,6 +16,7 @@ # """Tests for consumer_tracking_pipeline_visitor.""" +from __future__ import absolute_import import logging import unittest diff --git a/sdks/python/apache_beam/runners/direct/direct_metrics.py b/sdks/python/apache_beam/runners/direct/direct_metrics.py index 67f5780005fc..4d9f83f3a5f8 100644 --- a/sdks/python/apache_beam/runners/direct/direct_metrics.py +++ b/sdks/python/apache_beam/runners/direct/direct_metrics.py @@ -20,7 +20,10 @@ responding to queries of current metrics, but also of keeping the common state consistent. """ +from __future__ import absolute_import + import threading +from builtins import object from collections import defaultdict from apache_beam.metrics.cells import CounterAggregator diff --git a/sdks/python/apache_beam/runners/direct/direct_metrics_test.py b/sdks/python/apache_beam/runners/direct/direct_metrics_test.py index f36178601ff8..3ce42c1bb018 100644 --- a/sdks/python/apache_beam/runners/direct/direct_metrics_test.py +++ b/sdks/python/apache_beam/runners/direct/direct_metrics_test.py @@ -15,6 +15,8 @@ # limitations under the License. # +from __future__ import absolute_import + import unittest import hamcrest as hc diff --git a/sdks/python/apache_beam/runners/direct/direct_runner_test.py b/sdks/python/apache_beam/runners/direct/direct_runner_test.py index 231cca72476a..f258168dd604 100644 --- a/sdks/python/apache_beam/runners/direct/direct_runner_test.py +++ b/sdks/python/apache_beam/runners/direct/direct_runner_test.py @@ -15,6 +15,8 @@ # limitations under the License. 
# +from __future__ import absolute_import + import threading import unittest diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py index 893e32e33576..01d0631c5ee8 100644 --- a/sdks/python/apache_beam/runners/direct/evaluation_context.py +++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py @@ -21,6 +21,7 @@ import collections import threading +from builtins import object from apache_beam.runners.direct.direct_metrics import DirectMetrics from apache_beam.runners.direct.executor import TransformExecutor @@ -299,7 +300,7 @@ def handle_result( # Commit partial GBK states existing_keyed_state = self._transform_keyed_states[result.transform] - for k, v in result.partial_keyed_state.iteritems(): + for k, v in result.partial_keyed_state.items(): existing_keyed_state[k] = v return committed_bundles diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py index ef6469644c22..6fe3795df4e5 100644 --- a/sdks/python/apache_beam/runners/direct/executor.py +++ b/sdks/python/apache_beam/runners/direct/executor.py @@ -22,13 +22,15 @@ import collections import itertools import logging -import Queue import sys import threading import traceback +from builtins import object +from builtins import range from weakref import WeakValueDictionary -import six +from future.moves import queue +from future.utils import raise_ from apache_beam.metrics.execution import MetricsContainer from apache_beam.runners.worker import statesampler @@ -80,7 +82,7 @@ def _get_task_or_none(self): # shutdown. return self.queue.get( timeout=_ExecutorService._ExecutorServiceWorker.TIMEOUT) - except Queue.Empty: + except queue.Empty: return None def run(self): @@ -101,7 +103,7 @@ def shutdown(self): self.shutdown_requested = True def __init__(self, num_workers): - self.queue = Queue.Queue() + self.queue = queue.Queue() self.workers = [_ExecutorService._ExecutorServiceWorker( self.queue, i) for i in range(num_workers)] self.shutdown_requested = False @@ -126,7 +128,7 @@ def shutdown(self): try: self.queue.get_nowait() self.queue.task_done() - except Queue.Empty: + except queue.Empty: continue # All existing threads will eventually terminate (after they complete their # last task). @@ -441,7 +443,7 @@ def await_completion(self): try: if update.exception: t, v, tb = update.exc_info - six.reraise(t, v, tb) + raise_(t, v, tb) finally: self.executor_service.shutdown() self.executor_service.await_completion() @@ -481,14 +483,14 @@ class _TypedUpdateQueue(object): def __init__(self, item_type): self._item_type = item_type - self._queue = Queue.Queue() + self._queue = queue.Queue() def poll(self): try: item = self._queue.get_nowait() self._queue.task_done() return item - except Queue.Empty: + except queue.Empty: return None def take(self): @@ -501,7 +503,7 @@ def take(self): item = self._queue.get(timeout=1) self._queue.task_done() return item - except Queue.Empty: + except queue.Empty: pass def offer(self, item): diff --git a/sdks/python/apache_beam/runners/direct/helper_transforms.py b/sdks/python/apache_beam/runners/direct/helper_transforms.py index 0c1da0351264..6d894fb4ba01 100644 --- a/sdks/python/apache_beam/runners/direct/helper_transforms.py +++ b/sdks/python/apache_beam/runners/direct/helper_transforms.py @@ -15,6 +15,8 @@ # limitations under the License. 
# +from __future__ import absolute_import + import collections import itertools diff --git a/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py b/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py index 610664be9232..641863243d6a 100644 --- a/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py +++ b/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py @@ -18,6 +18,9 @@ """This module contains Splittable DoFn logic that is specific to DirectRunner. """ +from __future__ import absolute_import + +from builtins import object from threading import Lock from threading import Timer diff --git a/sdks/python/apache_beam/runners/direct/sdf_direct_runner_test.py b/sdks/python/apache_beam/runners/direct/sdf_direct_runner_test.py index e8ef9b6b968c..d5924cb180ba 100644 --- a/sdks/python/apache_beam/runners/direct/sdf_direct_runner_test.py +++ b/sdks/python/apache_beam/runners/direct/sdf_direct_runner_test.py @@ -17,9 +17,13 @@ """Unit tests for SDF implementation for DirectRunner.""" +from __future__ import absolute_import +from __future__ import division + import logging import os import unittest +from builtins import range import apache_beam as beam from apache_beam import Create @@ -192,7 +196,7 @@ def test_sdf_multiple_checkpoints_multiple_element(self): int(self._default_max_num_outputs * 3)) def test_sdf_with_resume_single_element(self): - resume_count = self._default_max_num_outputs / 10 + resume_count = self._default_max_num_outputs // 10 # Makes sure that resume_count is not trivial. assert resume_count > 0 @@ -202,7 +206,7 @@ def test_sdf_with_resume_single_element(self): resume_count) def test_sdf_with_resume_multiple_elements(self): - resume_count = self._default_max_num_outputs / 10 + resume_count = self._default_max_num_outputs // 10 assert resume_count > 0 self.run_sdf_read_pipeline( diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py index 4b392142afb3..38381fa5fd2a 100644 --- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py +++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py @@ -22,6 +22,9 @@ import collections import random import time +from builtins import object + +from future.utils import iteritems import apache_beam.io as io from apache_beam import coders @@ -445,7 +448,7 @@ def _get_element(message): return timestamp, parsed_message return [_get_element(message) - for unused_ack_id, message in results.items()] + for unused_ack_id, message in iteritems(results)] def finish_bundle(self): data = self._read_from_pubsub(self.source.timestamp_attribute) @@ -575,7 +578,7 @@ def process_element(self, element): def finish_bundle(self): self.runner.finish() - bundles = self._tagged_receivers.values() + bundles = list(self._tagged_receivers.values()) result_counters = self._counter_factory.get_counters() return TransformResult( self, bundles, [], result_counters, None) @@ -716,7 +719,7 @@ def process_element(self, element): def finish_bundle(self): bundles = [] bundle = None - for encoded_k, vs in self.gbk_items.iteritems(): + for encoded_k, vs in iteritems(self.gbk_items): if not bundle: bundle = self._evaluation_context.create_bundle( self.output_pcollection) diff --git a/sdks/python/apache_beam/runners/direct/util.py b/sdks/python/apache_beam/runners/direct/util.py index 797a7432644f..407ea39a21ac 100644 --- a/sdks/python/apache_beam/runners/direct/util.py +++ b/sdks/python/apache_beam/runners/direct/util.py @@ -22,6 +22,8 @@ from 
__future__ import absolute_import +from builtins import object + class TransformResult(object): """Result of evaluating an AppliedPTransform with a TransformEvaluator.""" diff --git a/sdks/python/apache_beam/runners/direct/watermark_manager.py b/sdks/python/apache_beam/runners/direct/watermark_manager.py index 3cdc24830a51..8b50919aaf6e 100644 --- a/sdks/python/apache_beam/runners/direct/watermark_manager.py +++ b/sdks/python/apache_beam/runners/direct/watermark_manager.py @@ -20,6 +20,7 @@ from __future__ import absolute_import import threading +from builtins import object from apache_beam import pipeline from apache_beam import pvalue @@ -156,7 +157,7 @@ def extract_all_timers(self): and reports if there are any timers set.""" all_timers = [] has_realtime_timer = False - for applied_ptransform, tw in self._transform_to_watermarks.iteritems(): + for applied_ptransform, tw in self._transform_to_watermarks.items(): fired_timers, had_realtime_timer = tw.extract_transform_timers() if fired_timers: all_timers.append((applied_ptransform, fired_timers)) @@ -203,7 +204,7 @@ def output_watermark(self): def hold(self, keyed_earliest_holds): with self._lock: - for key, hold_value in keyed_earliest_holds.iteritems(): + for key, hold_value in keyed_earliest_holds.items(): self._keyed_earliest_holds[key] = hold_value if (hold_value is None or hold_value == WatermarkManager.WATERMARK_POS_INF): @@ -265,7 +266,7 @@ def extract_transform_timers(self): with self._lock: fired_timers = [] has_realtime_timer = False - for encoded_key, state in self._keyed_states.iteritems(): + for encoded_key, state in self._keyed_states.items(): timers, had_realtime_timer = state.get_timers( watermark=self._input_watermark, processing_time=self._clock.time()) diff --git a/sdks/python/apache_beam/runners/experimental/__init__.py b/sdks/python/apache_beam/runners/experimental/__init__.py index cce3acad34a4..f4f43cbb1236 100644 --- a/sdks/python/apache_beam/runners/experimental/__init__.py +++ b/sdks/python/apache_beam/runners/experimental/__init__.py @@ -14,3 +14,4 @@ # See the License for the specific language governing permissions and # limitations under the License. # +from __future__ import absolute_import diff --git a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/__init__.py b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/__init__.py index 5d1403013932..0b416aa8e52e 100644 --- a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/__init__.py +++ b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/__init__.py @@ -19,4 +19,6 @@ sends a runner API proto over the API and then runs it on the other side. """ +from __future__ import absolute_import + from apache_beam.runners.experimental.python_rpc_direct.python_rpc_direct_runner import PythonRPCDirectRunner diff --git a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py index 84bed4270bc7..57056abf8b69 100644 --- a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py +++ b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py @@ -18,9 +18,12 @@ """A runner implementation that submits a job for remote execution. 
""" +from __future__ import absolute_import + import logging import random import string +from builtins import range import grpc @@ -59,7 +62,7 @@ def run(self, pipeline): # Submit the job to the RPC co-process jobName = ('Job-' + ''.join(random.choice(string.ascii_uppercase) for _ in range(6))) - options = {k: v for k, v in pipeline._options.get_all_options().iteritems() + options = {k: v for k, v in pipeline._options.get_all_options().items() if v is not None} try: diff --git a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py index 4986dc40abc9..a0397c601a2c 100644 --- a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py +++ b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py @@ -17,6 +17,8 @@ """A runner implementation that submits a job for remote execution. """ +from __future__ import absolute_import + import time import uuid from concurrent import futures diff --git a/sdks/python/apache_beam/runners/job/__init__.py b/sdks/python/apache_beam/runners/job/__init__.py index cce3acad34a4..f4f43cbb1236 100644 --- a/sdks/python/apache_beam/runners/job/__init__.py +++ b/sdks/python/apache_beam/runners/job/__init__.py @@ -14,3 +14,4 @@ # See the License for the specific language governing permissions and # limitations under the License. # +from __future__ import absolute_import diff --git a/sdks/python/apache_beam/runners/job/manager.py b/sdks/python/apache_beam/runners/job/manager.py index 4d88a1189f23..579016681275 100644 --- a/sdks/python/apache_beam/runners/job/manager.py +++ b/sdks/python/apache_beam/runners/job/manager.py @@ -18,9 +18,12 @@ """A object to control to the Job API Co-Process """ +from __future__ import absolute_import + import logging import subprocess import time +from builtins import object import grpc diff --git a/sdks/python/apache_beam/runners/job/utils.py b/sdks/python/apache_beam/runners/job/utils.py index 84c727fb4ebc..5a247cd27290 100644 --- a/sdks/python/apache_beam/runners/job/utils.py +++ b/sdks/python/apache_beam/runners/job/utils.py @@ -18,6 +18,8 @@ """Utility functions for efficiently processing with the job API """ +from __future__ import absolute_import + import json from google.protobuf import json_format diff --git a/sdks/python/apache_beam/runners/pipeline_context.py b/sdks/python/apache_beam/runners/pipeline_context.py index c2ea4ccb3340..511dbd915f5a 100644 --- a/sdks/python/apache_beam/runners/pipeline_context.py +++ b/sdks/python/apache_beam/runners/pipeline_context.py @@ -20,6 +20,9 @@ For internal use only; no backwards-compatibility guarantees. 
""" +from __future__ import absolute_import + +from builtins import object from apache_beam import coders from apache_beam import pipeline diff --git a/sdks/python/apache_beam/runners/pipeline_context_test.py b/sdks/python/apache_beam/runners/pipeline_context_test.py index 6091ed8963e7..1e9456a2d3fa 100644 --- a/sdks/python/apache_beam/runners/pipeline_context_test.py +++ b/sdks/python/apache_beam/runners/pipeline_context_test.py @@ -17,6 +17,8 @@ """Unit tests for the windowing classes.""" +from __future__ import absolute_import + import unittest from apache_beam import coders diff --git a/sdks/python/apache_beam/runners/portability/__init__.py b/sdks/python/apache_beam/runners/portability/__init__.py index 7af93ed945fa..d247cadd0dac 100644 --- a/sdks/python/apache_beam/runners/portability/__init__.py +++ b/sdks/python/apache_beam/runners/portability/__init__.py @@ -16,3 +16,5 @@ # """This runner is experimental; no backwards-compatibility guarantees.""" + +from __future__ import absolute_import diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py index b807eae9d1cc..448c8793d66a 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py @@ -17,16 +17,20 @@ """A PipelineRunner using the SDK harness. """ +from __future__ import absolute_import + import collections import contextlib import copy import logging -import Queue as queue +import queue import threading import time +from builtins import object from concurrent import futures import grpc +from future import standard_library import apache_beam as beam # pylint: disable=ungrouped-imports from apache_beam import coders @@ -52,6 +56,7 @@ from apache_beam.transforms.window import GlobalWindows from apache_beam.utils import proto_utils +standard_library.install_aliases() # This module is experimental. No backwards-compatibility guarantees. @@ -297,7 +302,7 @@ def deduplicate_read(self): new_transforms = [] for transform in self.transforms: if transform.spec.urn == bundle_processor.DATA_INPUT_URN: - pcoll = only_element(transform.outputs.items())[1] + pcoll = only_element(list(transform.outputs.items()))[1] if pcoll in seen_pcolls: continue seen_pcolls.add(pcoll) @@ -408,9 +413,9 @@ def windowed_coder_id(coder_id): transform.spec.payload, beam_runner_api_pb2.CombinePayload) input_pcoll = pipeline_components.pcollections[only_element( - transform.inputs.values())] + list(transform.inputs.values()))] output_pcoll = pipeline_components.pcollections[only_element( - transform.outputs.values())] + list(transform.outputs.values()))] windowed_input_coder = pipeline_components.coders[ input_pcoll.coder_id] @@ -579,7 +584,7 @@ def sink_flattens(stages): if transform.spec.urn == common_urns.primitives.FLATTEN.urn: # This is used later to correlate the read and writes. param = str("materialize:%s" % transform.unique_name) - output_pcoll_id, = transform.outputs.values() + output_pcoll_id, = list(transform.outputs.values()) output_coder_id = pcollections[output_pcoll_id].coder_id flatten_writes = [] for local_in, pcoll_in in transform.inputs.items(): @@ -766,7 +771,7 @@ def fuse(producer, consumer): # Everything that was originally a stage or a replacement, but wasn't # replaced, should be in the final graph. 
final_stages = frozenset(stages).union(replacements.values()).difference( - replacements.keys()) + list(replacements.keys())) for stage in final_stages: # Update all references to their final values before throwing @@ -948,8 +953,8 @@ def get_buffer(pcoll_id): original_gbk_transform = pcoll_id.split(':', 1)[1] transform_proto = pipeline_components.transforms[ original_gbk_transform] - input_pcoll = only_element(transform_proto.inputs.values()) - output_pcoll = only_element(transform_proto.outputs.values()) + input_pcoll = only_element(list(transform_proto.inputs.values())) + output_pcoll = only_element(list(transform_proto.outputs.values())) pre_gbk_coder = context.coders[safe_coders[ pipeline_components.pcollections[input_pcoll].coder_id]] post_gbk_coder = context.coders[safe_coders[ diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py index 63af2847b56d..6544c9d477a8 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # +from __future__ import absolute_import from __future__ import print_function import functools @@ -23,6 +24,7 @@ import time import traceback import unittest +from builtins import range import apache_beam as beam from apache_beam.metrics.execution import MetricKey @@ -152,7 +154,7 @@ def cross_product(elem, sides): def test_pardo_windowed_side_inputs(self): with self.create_pipeline() as p: # Now with some windowing. - pcoll = p | beam.Create(range(10)) | beam.Map( + pcoll = p | beam.Create(list(range(10))) | beam.Map( lambda t: window.TimestampedValue(t, t)) # Intentionally choosing non-aligned windows to highlight the transition. main = pcoll | 'WindowMain' >> beam.WindowInto(window.FixedWindows(5)) @@ -163,17 +165,17 @@ def test_pardo_windowed_side_inputs(self): res, equal_to([ # The window [0, 5) maps to the window [0, 7). - (0, range(7)), - (1, range(7)), - (2, range(7)), - (3, range(7)), - (4, range(7)), + (0, list(range(7))), + (1, list(range(7))), + (2, list(range(7))), + (3, list(range(7))), + (4, list(range(7))), # The window [5, 10) maps to the window [7, 14). - (5, range(7, 10)), - (6, range(7, 10)), - (7, range(7, 10)), - (8, range(7, 10)), - (9, range(7, 10))]), + (5, list(range(7, 10))), + (6, list(range(7, 10))), + (7, list(range(7, 10))), + (8, list(range(7, 10))), + (9, list(range(7, 10)))]), label='windowed') def test_flattened_side_input(self): @@ -374,7 +376,7 @@ def test_progress_metrics(self): res.wait_until_finish() try: self.assertEqual(2, len(res._metrics_by_stage)) - pregbk_metrics, postgbk_metrics = res._metrics_by_stage.values() + pregbk_metrics, postgbk_metrics = list(res._metrics_by_stage.values()) if 'Create/Read' not in pregbk_metrics.ptransforms: # The metrics above are actually unordered. Swap. pregbk_metrics, postgbk_metrics = postgbk_metrics, pregbk_metrics @@ -398,7 +400,7 @@ def test_progress_metrics(self): # The actual stage name ends up being something like 'm_out/lamdbda...' 
m_out, = [ - metrics for name, metrics in postgbk_metrics.ptransforms.items() + metrics for name, metrics in list(postgbk_metrics.ptransforms.items()) if name.startswith('m_out')] self.assertEqual( 5, diff --git a/sdks/python/apache_beam/runners/portability/local_job_service.py b/sdks/python/apache_beam/runners/portability/local_job_service.py index 962c2b466e51..7b07768987dc 100644 --- a/sdks/python/apache_beam/runners/portability/local_job_service.py +++ b/sdks/python/apache_beam/runners/portability/local_job_service.py @@ -14,18 +14,22 @@ # See the License for the specific language governing permissions and # limitations under the License. # +from __future__ import absolute_import + import functools import logging import os -import Queue as queue +import queue as queue import subprocess import threading import time import traceback import uuid +from builtins import object from concurrent import futures import grpc +from future import standard_library from google.protobuf import text_format from apache_beam.portability.api import beam_fn_api_pb2_grpc @@ -34,6 +38,8 @@ from apache_beam.portability.api import endpoints_pb2 from apache_beam.runners.portability import fn_api_runner +standard_library.install_aliases() + TERMINAL_STATES = [ beam_job_api_pb2.JobState.DONE, beam_job_api_pb2.JobState.STOPPED, diff --git a/sdks/python/apache_beam/runners/portability/local_job_service_main.py b/sdks/python/apache_beam/runners/portability/local_job_service_main.py index b6f2ef9710c8..dc70f4556883 100644 --- a/sdks/python/apache_beam/runners/portability/local_job_service_main.py +++ b/sdks/python/apache_beam/runners/portability/local_job_service_main.py @@ -15,6 +15,8 @@ # limitations under the License. # +from __future__ import absolute_import + import argparse import logging import sys diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py b/sdks/python/apache_beam/runners/portability/portable_runner.py index 295b985bfe01..a22c6069da33 100644 --- a/sdks/python/apache_beam/runners/portability/portable_runner.py +++ b/sdks/python/apache_beam/runners/portability/portable_runner.py @@ -15,6 +15,8 @@ # limitations under the License. # +from __future__ import absolute_import + import logging import os import threading diff --git a/sdks/python/apache_beam/runners/portability/portable_runner_test.py b/sdks/python/apache_beam/runners/portability/portable_runner_test.py index 3e680a35b01d..88aa0e18805f 100644 --- a/sdks/python/apache_beam/runners/portability/portable_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/portable_runner_test.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # +from __future__ import absolute_import from __future__ import print_function import logging diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py index a6bfa497bd57..4b10de47e22f 100644 --- a/sdks/python/apache_beam/runners/portability/stager.py +++ b/sdks/python/apache_beam/runners/portability/stager.py @@ -44,6 +44,8 @@ TODO(silviuc): Should we allow several setup packages? TODO(silviuc): We should allow customizing the exact command for setup build. 
""" +from __future__ import absolute_import + import glob import logging import os diff --git a/sdks/python/apache_beam/runners/portability/stager_test.py b/sdks/python/apache_beam/runners/portability/stager_test.py index 56b57d167963..17318f504d4b 100644 --- a/sdks/python/apache_beam/runners/portability/stager_test.py +++ b/sdks/python/apache_beam/runners/portability/stager_test.py @@ -16,6 +16,8 @@ # """Unit tests for the stager module.""" +from __future__ import absolute_import + import logging import os import shutil diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py index b3691722540a..15cc1de5779e 100644 --- a/sdks/python/apache_beam/runners/runner.py +++ b/sdks/python/apache_beam/runners/runner.py @@ -24,6 +24,7 @@ import shelve import shutil import tempfile +from builtins import object __all__ = ['PipelineRunner', 'PipelineState', 'PipelineResult'] diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py index e3962f89b488..196a6d91858f 100644 --- a/sdks/python/apache_beam/runners/runner_test.py +++ b/sdks/python/apache_beam/runners/runner_test.py @@ -22,6 +22,8 @@ caching and clearing values that are not tested elsewhere. """ +from __future__ import absolute_import + import unittest import hamcrest as hc diff --git a/sdks/python/apache_beam/runners/sdf_common.py b/sdks/python/apache_beam/runners/sdf_common.py index 5b3554460d26..e0573289f3bb 100644 --- a/sdks/python/apache_beam/runners/sdf_common.py +++ b/sdks/python/apache_beam/runners/sdf_common.py @@ -17,7 +17,10 @@ """This module contains Splittable DoFn logic that's common to all runners.""" +from __future__ import absolute_import + import uuid +from builtins import object import apache_beam as beam from apache_beam import pvalue diff --git a/sdks/python/apache_beam/runners/test/__init__.py b/sdks/python/apache_beam/runners/test/__init__.py index 6cad4d885509..a52ea6e84df2 100644 --- a/sdks/python/apache_beam/runners/test/__init__.py +++ b/sdks/python/apache_beam/runners/test/__init__.py @@ -23,6 +23,8 @@ # Protect against environments where dataflow runner is not available. # pylint: disable=wrong-import-order, wrong-import-position +from __future__ import absolute_import + try: from apache_beam.runners.dataflow.test_dataflow_runner import TestDataflowRunner except ImportError: diff --git a/sdks/python/apache_beam/runners/worker/__init__.py b/sdks/python/apache_beam/runners/worker/__init__.py index 0bce5d68f724..9fbf21557df7 100644 --- a/sdks/python/apache_beam/runners/worker/__init__.py +++ b/sdks/python/apache_beam/runners/worker/__init__.py @@ -16,3 +16,4 @@ # """For internal use only; no backwards-compatibility guarantees.""" +from __future__ import absolute_import diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py index 7727c87c4c70..47eb4e0f9681 100644 --- a/sdks/python/apache_beam/runners/worker/bundle_processor.py +++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py @@ -26,6 +26,10 @@ import json import logging import re +from builtins import next +from builtins import object + +from future.utils import itervalues import apache_beam as beam from apache_beam.coders import WindowedValueCoder @@ -103,7 +107,7 @@ def __init__(self, operation_name, step_name, consumers, counter_factory, # We must do this manually as we don't have a spec or spec.output_coders. 
self.receivers = [ operations.ConsumerSet(self.counter_factory, self.step_name, 0, - next(consumers.itervalues()), + next(itervalues(consumers)), self.windowed_coder)] def process(self, windowed_value): @@ -315,7 +319,7 @@ def _fix_output_tags(self, transform_id, metrics): # However, if there is exactly one output, we can fix up the name here. def fix_only_output_tag(actual_output_tag, mapping): if len(mapping) == 1: - fake_output_tag, count = only_element(mapping.items()) + fake_output_tag, count = only_element(list(mapping.items())) if fake_output_tag != actual_output_tag: del mapping[fake_output_tag] mapping[actual_output_tag] = count @@ -394,7 +398,7 @@ def get_input_coders(self, transform_proto): } def get_only_input_coder(self, transform_proto): - return only_element(self.get_input_coders(transform_proto).values()) + return only_element(list(self.get_input_coders(transform_proto).values())) # TODO(robertwb): Update all operations to take these in the constructor. @staticmethod @@ -411,7 +415,7 @@ def augment_oldstyle_op(op, step_name, consumers, tag_list=None): def create(factory, transform_id, transform_proto, grpc_port, consumers): target = beam_fn_api_pb2.Target( primitive_transform_reference=transform_id, - name=only_element(transform_proto.outputs.keys())) + name=only_element(list(transform_proto.outputs.keys()))) return DataInputOperation( transform_proto.unique_name, transform_proto.unique_name, @@ -428,7 +432,7 @@ def create(factory, transform_id, transform_proto, grpc_port, consumers): def create(factory, transform_id, transform_proto, grpc_port, consumers): target = beam_fn_api_pb2.Target( primitive_transform_reference=transform_id, - name=only_element(transform_proto.inputs.keys())) + name=only_element(list(transform_proto.inputs.keys()))) return DataOutputOperation( transform_proto.unique_name, transform_proto.unique_name, diff --git a/sdks/python/apache_beam/runners/worker/data_plane.py b/sdks/python/apache_beam/runners/worker/data_plane.py index 1ff60aacb9aa..bb3cc2a85bb8 100644 --- a/sdks/python/apache_beam/runners/worker/data_plane.py +++ b/sdks/python/apache_beam/runners/worker/data_plane.py @@ -24,18 +24,24 @@ import abc import collections import logging -import Queue as queue +import queue import sys import threading +from builtins import object +from builtins import range import grpc -import six +from future import standard_library +from future.utils import raise_ +from future.utils import with_metaclass from apache_beam.coders import coder_impl from apache_beam.portability.api import beam_fn_api_pb2 from apache_beam.portability.api import beam_fn_api_pb2_grpc from apache_beam.runners.worker.worker_id_interceptor import WorkerIdInterceptor +standard_library.install_aliases() + # This module is experimental. No backwards-compatibility guarantees. @@ -51,7 +57,7 @@ def close(self): self._close_callback(self.get()) -class DataChannel(object): +class DataChannel(with_metaclass(abc.ABCMeta, object)): """Represents a channel for reading and writing data over the data plane. Read from this channel with the input_elements method:: @@ -70,8 +76,6 @@ class DataChannel(object): data_channel.close() """ - __metaclass__ = abc.ABCMeta - @abc.abstractmethod def input_elements(self, instruction_id, expected_targets): """Returns an iterable of all Element.Data bundles for instruction_id. 
@@ -185,7 +189,7 @@ def input_elements(self, instruction_id, expected_targets): except queue.Empty: if self._exc_info: t, v, tb = self._exc_info - six.reraise(t, v, tb) + raise_(t, v, tb) else: if not data.data and data.target in expected_targets: done_targets.append(data.target) @@ -273,11 +277,9 @@ def Data(self, elements_iterator, context): yield elements -class DataChannelFactory(object): +class DataChannelFactory(with_metaclass(abc.ABCMeta, object)): """An abstract factory for creating ``DataChannel``.""" - __metaclass__ = abc.ABCMeta - @abc.abstractmethod def create_data_channel(self, remote_grpc_port): """Returns a ``DataChannel`` from the given RemoteGrpcPort.""" diff --git a/sdks/python/apache_beam/runners/worker/data_plane_test.py b/sdks/python/apache_beam/runners/worker/data_plane_test.py index b2cefbe1469c..4e9a79ff1ed4 100644 --- a/sdks/python/apache_beam/runners/worker/data_plane_test.py +++ b/sdks/python/apache_beam/runners/worker/data_plane_test.py @@ -28,12 +28,15 @@ from concurrent import futures import grpc -import six +from future import standard_library +from future.utils import raise_ from apache_beam.portability.api import beam_fn_api_pb2 from apache_beam.portability.api import beam_fn_api_pb2_grpc from apache_beam.runners.worker import data_plane +standard_library.install_aliases() + def timeout(timeout_secs): def decorate(fn): @@ -51,7 +54,7 @@ def call_fn(): thread.join(timeout_secs) if exc_info: t, v, tb = exc_info # pylint: disable=unbalanced-tuple-unpacking - six.reraise(t, v, tb) + raise_(t, v, tb) assert not thread.is_alive(), 'timed out after %s seconds' % timeout_secs return wrapper return decorate diff --git a/sdks/python/apache_beam/runners/worker/log_handler.py b/sdks/python/apache_beam/runners/worker/log_handler.py index 152659e0a3fe..02bea3e2cdbc 100644 --- a/sdks/python/apache_beam/runners/worker/log_handler.py +++ b/sdks/python/apache_beam/runners/worker/log_handler.py @@ -16,17 +16,23 @@ # """Beam fn API log handler.""" +from __future__ import absolute_import + import logging import math -import Queue as queue +import queue import threading +from builtins import range import grpc +from future import standard_library from apache_beam.portability.api import beam_fn_api_pb2 from apache_beam.portability.api import beam_fn_api_pb2_grpc from apache_beam.runners.worker.worker_id_interceptor import WorkerIdInterceptor +standard_library.install_aliases() + # This module is experimental. No backwards-compatibility guarantees. diff --git a/sdks/python/apache_beam/runners/worker/log_handler_test.py b/sdks/python/apache_beam/runners/worker/log_handler_test.py index eb5045aeeb58..ab042aa4ba28 100644 --- a/sdks/python/apache_beam/runners/worker/log_handler_test.py +++ b/sdks/python/apache_beam/runners/worker/log_handler_test.py @@ -15,9 +15,11 @@ # limitations under the License. 
# +from __future__ import absolute_import import logging import unittest +from builtins import range from concurrent import futures import grpc @@ -101,7 +103,7 @@ def _create_test(name, num_logs): lambda self: self._verify_fn_log_handler(num_logs)) -for test_name, num_logs_entries in data.iteritems(): +for test_name, num_logs_entries in data.items(): _create_test(test_name, num_logs_entries) diff --git a/sdks/python/apache_beam/runners/worker/logger.py b/sdks/python/apache_beam/runners/worker/logger.py index 043353807a31..07cd320ff822 100644 --- a/sdks/python/apache_beam/runners/worker/logger.py +++ b/sdks/python/apache_beam/runners/worker/logger.py @@ -15,8 +15,12 @@ # limitations under the License. # +# cython: language_level=3 + """Python worker logging.""" +from __future__ import absolute_import + import json import logging import threading diff --git a/sdks/python/apache_beam/runners/worker/logger_test.py b/sdks/python/apache_beam/runners/worker/logger_test.py index cf3f69292826..73ec1aa3ad9c 100644 --- a/sdks/python/apache_beam/runners/worker/logger_test.py +++ b/sdks/python/apache_beam/runners/worker/logger_test.py @@ -17,11 +17,14 @@ """Tests for worker logging utilities.""" +from __future__ import absolute_import + import json import logging import sys import threading import unittest +from builtins import object from apache_beam.runners.worker import logger @@ -83,7 +86,7 @@ def create_log_record(self, **kwargs): class Record(object): def __init__(self, **kwargs): - for k, v in kwargs.iteritems(): + for k, v in kwargs.items(): setattr(self, k, v) return Record(**kwargs) diff --git a/sdks/python/apache_beam/runners/worker/opcounters.py b/sdks/python/apache_beam/runners/worker/opcounters.py index 0e4ee0a05dc8..cdbb27a0a778 100644 --- a/sdks/python/apache_beam/runners/worker/opcounters.py +++ b/sdks/python/apache_beam/runners/worker/opcounters.py @@ -15,14 +15,18 @@ # limitations under the License. # +# cython: language_level=3 # cython: profile=True """Counters collect the progress of the Worker for reporting to the service.""" from __future__ import absolute_import +from __future__ import division import math import random +from builtins import hex +from builtins import object from apache_beam.utils import counters from apache_beam.utils.counters import Counter @@ -229,7 +233,7 @@ def update_collect(self): def _compute_next_sample(self, i): # https://en.wikipedia.org/wiki/Reservoir_sampling#Fast_Approximation - gap = math.log(1.0 - random.random()) / math.log(1.0 - 10.0/i) + gap = math.log(1.0 - random.random()) / math.log(1.0 - (10.0 / i)) return i + math.floor(gap) def _should_sample(self): diff --git a/sdks/python/apache_beam/runners/worker/opcounters_test.py b/sdks/python/apache_beam/runners/worker/opcounters_test.py index 41c80e87c4f7..3987311112cb 100644 --- a/sdks/python/apache_beam/runners/worker/opcounters_test.py +++ b/sdks/python/apache_beam/runners/worker/opcounters_test.py @@ -15,10 +15,15 @@ # limitations under the License. # +from __future__ import absolute_import +from __future__ import division + import logging import math import random import unittest +from builtins import object +from builtins import range from apache_beam import coders from apache_beam.runners.worker import opcounters @@ -32,7 +37,7 @@ # These have to be at top level so the pickler can find them. 
-class OldClassThatDoesNotImplementLen: # pylint: disable=old-style-class +class OldClassThatDoesNotImplementLen(object): # pylint: disable=old-style-class def __init__(self): pass @@ -149,11 +154,11 @@ def test_update_multiple(self): value = GlobalWindows.windowed_value('defghij') opcounts.update_from(value) total_size += coder.estimate_size(value) - self.verify_counters(opcounts, 2, float(total_size) / 2) + self.verify_counters(opcounts, 2, (float(total_size) / 2)) value = GlobalWindows.windowed_value('klmnop') opcounts.update_from(value) total_size += coder.estimate_size(value) - self.verify_counters(opcounts, 3, float(total_size) / 3) + self.verify_counters(opcounts, 3, (float(total_size) / 3)) def test_should_sample(self): # Order of magnitude more buckets than highest constant in code under test. diff --git a/sdks/python/apache_beam/runners/worker/operation_specs.py b/sdks/python/apache_beam/runners/worker/operation_specs.py index d38bc7788fac..62c5bbd0117f 100644 --- a/sdks/python/apache_beam/runners/worker/operation_specs.py +++ b/sdks/python/apache_beam/runners/worker/operation_specs.py @@ -21,6 +21,8 @@ source, write to a sink, parallel do, etc. """ +from __future__ import absolute_import + import collections from apache_beam import coders diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py index 9aa29b87aa04..f34bcb7db7d4 100644 --- a/sdks/python/apache_beam/runners/worker/operations.py +++ b/sdks/python/apache_beam/runners/worker/operations.py @@ -15,13 +15,18 @@ # limitations under the License. # +# cython: language_level=3 # cython: profile=True """Worker operations executor.""" +from __future__ import absolute_import + import collections -import itertools import logging +from builtins import filter +from builtins import object +from builtins import zip from apache_beam import pvalue from apache_beam.internal import pickler @@ -318,7 +323,7 @@ def _read_side_inputs(self, tags_and_types): # while the variable has the value assigned by the current iteration of # the for loop. # pylint: disable=cell-var-from-loop - for si in itertools.ifilter( + for si in filter( lambda o: o.tag == side_tag, self.spec.side_inputs): if not isinstance(si, operation_specs.WorkerSideInputSource): raise NotImplementedError('Unknown side input type: %r' % si) @@ -542,7 +547,7 @@ def process(self, wkv): target = self.key_count * 9 // 10 old_wkeys = [] # TODO(robertwb): Use an LRU cache? - for old_wkey, old_wvalue in self.table.iteritems(): + for old_wkey, old_wvalue in self.table.items(): old_wkeys.append(old_wkey) # Can't mutate while iterating. 
self.output_key(old_wkey, old_wvalue[0]) self.key_count -= 1 @@ -557,7 +562,7 @@ def process(self, wkv): entry[0] = self.combine_fn_add_input(entry[0], value) def finish(self): - for wkey, value in self.table.iteritems(): + for wkey, value in self.table.items(): self.output_key(wkey, value[0]) self.table = {} self.key_count = 0 diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py index b8fa422536b4..8e94e952d681 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py @@ -23,14 +23,18 @@ import abc import contextlib import logging -import Queue as queue +import queue import sys import threading import traceback +from builtins import object +from builtins import range from concurrent import futures import grpc -import six +from future import standard_library +from future.utils import raise_ +from future.utils import with_metaclass from apache_beam.portability.api import beam_fn_api_pb2 from apache_beam.portability.api import beam_fn_api_pb2_grpc @@ -38,6 +42,8 @@ from apache_beam.runners.worker import data_plane from apache_beam.runners.worker.worker_id_interceptor import WorkerIdInterceptor +standard_library.install_aliases() + class SdkHarness(object): REQUEST_METHOD_PREFIX = '_request_' @@ -176,7 +182,7 @@ def _request_process_bundle_progress(self, request): def task(): instruction_reference = getattr( request, request.WhichOneof('request')).instruction_reference - if self._instruction_id_vs_worker.has_key(instruction_reference): + if instruction_reference in self._instruction_id_vs_worker: self._execute( lambda: self._instruction_id_vs_worker[ instruction_reference @@ -245,11 +251,9 @@ def process_bundle_progress(self, request, instruction_id): metrics=processor.metrics() if processor else None)) -class StateHandlerFactory(object): +class StateHandlerFactory(with_metaclass(abc.ABCMeta, object)): """An abstract factory for creating ``DataChannel``.""" - __metaclass__ = abc.ABCMeta - @abc.abstractmethod def create_state_handler(self, api_service_descriptor): """Returns a ``StateHandler`` from the given ApiServiceDescriptor.""" @@ -407,7 +411,7 @@ def _blocking_request(self, request): while not future.wait(timeout=1): if self._exc_info: t, v, tb = self._exc_info - six.reraise(t, v, tb) + raise_(t, v, tb) elif self._done: raise RuntimeError() del self._responses_by_id[request.id] diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py index 46670e88964f..d05217ef9b83 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py @@ -16,7 +16,9 @@ # """SDK Fn Harness entry point.""" -import BaseHTTPServer +from __future__ import absolute_import + +import http.server import json import logging import os @@ -24,7 +26,9 @@ import sys import threading import traceback +from builtins import object +from future import standard_library from google.protobuf import text_format from apache_beam.internal import pickler @@ -33,6 +37,8 @@ from apache_beam.runners.worker.log_handler import FnApiLogRecordHandler from apache_beam.runners.worker.sdk_worker import SdkHarness +standard_library.install_aliases() + # This module is experimental. No backwards-compatibility guarantees. 
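
Three notes on the sdk_worker.py hunks above. First, with_metaclass(abc.ABCMeta, object) is the portable replacement for the __metaclass__ class attribute, which Python 3 silently ignores; the helper injects the metaclass through a temporary base class so both interpreters produce a real ABC. Second, instruction_reference in self._instruction_id_vs_worker replaces dict.has_key, which Python 3 removed outright. Third, and subtlest: the iteritems() -> items() change in PartialGroupByKeyCombiningValues.process stays correct only because the code collects keys into old_wkeys before deleting them, exactly as its "Can't mutate while iterating" comment warns; on Python 3, items() is a live view, and resizing the dict mid-iteration raises RuntimeError. A minimal sketch of that collect-then-mutate pattern, with hypothetical names:

    def evict_down_to(table, target):
        # Deleting from a dict while iterating one of its views raises
        # RuntimeError on Python 3, so gather the victims first ...
        doomed = []
        for key in table:
            doomed.append(key)
            if len(table) - len(doomed) <= target:
                break
        # ... and mutate only after iteration has finished.
        for key in doomed:
            del table[key]

The finish hunk needs no copy: it iterates without mutating and rebinds self.table only after the loop. Incidentally, the StateHandlerFactory docstring still reads "creating ``DataChannel``", an apparent copy-paste from DataChannelFactory that this change leaves as-is.
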
@@ -57,7 +63,7 @@ def start(self, status_http_port=0): Default is 0 which means any free unsecured port """ - class StatusHttpHandler(BaseHTTPServer.BaseHTTPRequestHandler): + class StatusHttpHandler(http.server.BaseHTTPRequestHandler): """HTTP handler for serving stacktraces of all threads.""" def do_GET(self): # pylint: disable=invalid-name @@ -73,7 +79,7 @@ def log_message(self, f, *args): """Do not log any messages.""" pass - self.httpd = httpd = BaseHTTPServer.HTTPServer( + self.httpd = httpd = http.server.HTTPServer( ('localhost', status_http_port), StatusHttpHandler) logging.info('Status HTTP server running at %s:%s', httpd.server_name, httpd.server_port) @@ -157,10 +163,10 @@ def _get_worker_count(pipeline_options): an int containing the worker_threads to use. Default is 1 """ pipeline_options = pipeline_options.get( - 'options') if pipeline_options.has_key('options') else {} + 'options') if 'options' in pipeline_options else {} experiments = pipeline_options.get( 'experiments' - ) if pipeline_options and pipeline_options.has_key('experiments') else [] + ) if pipeline_options and 'experiments' in pipeline_options else [] experiments = experiments if experiments else [] diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py index c229d6450ef5..785064a172c4 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py @@ -22,6 +22,7 @@ import logging import unittest +from builtins import range from concurrent import futures import grpc diff --git a/sdks/python/apache_beam/runners/worker/sideinputs.py b/sdks/python/apache_beam/runners/worker/sideinputs.py index 980a2088c7b5..d806f9e20257 100644 --- a/sdks/python/apache_beam/runners/worker/sideinputs.py +++ b/sdks/python/apache_beam/runners/worker/sideinputs.py @@ -17,11 +17,17 @@ """Utilities for handling side inputs.""" +from __future__ import absolute_import + import collections import logging -import Queue +import queue import threading import traceback +from builtins import object +from builtins import range + +from future import standard_library from apache_beam.coders import observable from apache_beam.io import iobase @@ -29,6 +35,8 @@ from apache_beam.runners.worker import opcounters from apache_beam.transforms import window +standard_library.install_aliases() + # This module is experimental. No backwards-compatibility guarantees. @@ -61,13 +69,13 @@ def __init__(self, self.num_reader_threads = min(max_reader_threads, len(self.sources)) # Queue for sources that are to be read. - self.sources_queue = Queue.Queue() + self.sources_queue = queue.Queue() for source in sources: self.sources_queue.put(source) # Queue for elements that have been read. - self.element_queue = Queue.Queue(ELEMENT_QUEUE_SIZE) + self.element_queue = queue.Queue(ELEMENT_QUEUE_SIZE) # Queue for exceptions encountered in reader threads; to be rethrown. - self.reader_exceptions = Queue.Queue() + self.reader_exceptions = queue.Queue() # Whether we have already iterated; this iterable can only be used once. self.already_iterated = False # Whether an error was encountered in any source reader. 
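
The sideinputs.py hunk above shows the import-rename half of the migration: Python 3 moved the Queue module to queue, and standard_library.install_aliases() from the future package makes the Python 3 spelling importable on Python 2 as well, so a single import line serves both interpreters (BaseHTTPServer -> http.server in sdk_worker_main.py is handled the same way). Only the module name changes; the classes keep their names, and the bounded element_queue still throttles the reader threads so they cannot run arbitrarily far ahead of the consuming iterator. A minimal sketch of that producer/consumer shape under the new spelling, with hypothetical names:

    import queue      # spelled 'Queue' on Python 2; aliased by install_aliases()
    import threading

    def read_source(source, element_queue):
        for value in source:
            element_queue.put(value)  # blocks while the queue is full

    elements = queue.Queue(5)         # bounded: provides backpressure
    reader = threading.Thread(target=read_source, args=(range(20), elements))
    reader.start()
    for _ in range(20):
        print(elements.get())
    reader.join()
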
@@ -134,7 +142,7 @@ def _reader_thread(self): self.element_queue.put(value) else: self.element_queue.put(_globally_windowed(value)) - except Queue.Empty: + except queue.Empty: return except Exception as e: # pylint: disable=broad-except logging.error('Encountered exception in PrefetchingSourceSetIterable ' diff --git a/sdks/python/apache_beam/runners/worker/sideinputs_test.py b/sdks/python/apache_beam/runners/worker/sideinputs_test.py index c4240dd028c9..3a5ff3819502 100644 --- a/sdks/python/apache_beam/runners/worker/sideinputs_test.py +++ b/sdks/python/apache_beam/runners/worker/sideinputs_test.py @@ -17,9 +17,13 @@ """Tests for side input utilities.""" +from __future__ import absolute_import + import logging import time import unittest +from builtins import object +from builtins import range import mock @@ -77,7 +81,7 @@ def test_single_source_iterator_fn(self): ] iterator_fn = sideinputs.get_iterator_fn_for_sources( sources, max_reader_threads=2) - assert list(strip_windows(iterator_fn())) == range(6) + assert list(strip_windows(iterator_fn())) == list(range(6)) def test_bytes_read_behind_experiment(self): mock_read_counter = mock.MagicMock() @@ -115,7 +119,7 @@ def test_multiple_sources_iterator_fn(self): ] iterator_fn = sideinputs.get_iterator_fn_for_sources( sources, max_reader_threads=3) - assert sorted(strip_windows(iterator_fn())) == range(11) + assert sorted(strip_windows(iterator_fn())) == list(range(11)) def test_multiple_sources_single_reader_iterator_fn(self): sources = [ @@ -126,7 +130,7 @@ def test_multiple_sources_single_reader_iterator_fn(self): ] iterator_fn = sideinputs.get_iterator_fn_for_sources( sources, max_reader_threads=1) - assert list(strip_windows(iterator_fn())) == range(11) + assert list(strip_windows(iterator_fn())) == list(range(11)) def test_source_iterator_single_source_exception(self): class MyException(Exception): @@ -172,7 +176,7 @@ def perpetual_generator(value): with self.assertRaises(MyException): for value in iterator_fn(): seen.add(value.value) - self.assertEqual(sorted(seen), range(5)) + self.assertEqual(sorted(seen), list(range(5))) class EmulatedCollectionsTest(unittest.TestCase): diff --git a/sdks/python/apache_beam/runners/worker/statesampler.py b/sdks/python/apache_beam/runners/worker/statesampler.py index f3916a230de4..b0c2b67f9ff7 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler.py +++ b/sdks/python/apache_beam/runners/worker/statesampler.py @@ -16,6 +16,9 @@ # # This module is experimental. No backwards-compatibility guarantees. + +from __future__ import absolute_import + import threading from collections import namedtuple diff --git a/sdks/python/apache_beam/runners/worker/statesampler_slow.py b/sdks/python/apache_beam/runners/worker/statesampler_slow.py index 59f84f7891f7..2f09d0e8bb25 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler_slow.py +++ b/sdks/python/apache_beam/runners/worker/statesampler_slow.py @@ -17,6 +17,10 @@ # This module is experimental. No backwards-compatibility guarantees. 
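
The sideinputs_test.py edits above all follow one rule: on Python 2, range(n) returns a list, so comparing it against another list works; on Python 3 it returns a lazy sequence object that never compares equal to a list, so every such assertion would fail regardless of the elements. Wrapping the expected value in list(...) keeps the tests meaningful on both interpreters:

    # Python 3 semantics that motivate the list(range(...)) changes above:
    assert list(range(3)) == [0, 1, 2]  # materialized: compares contents
    assert [0, 1, 2] != range(3)        # a list never equals a range object
    assert range(3) == range(3)         # ranges do compare among themselves
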
+from __future__ import absolute_import + +from builtins import object + class StateSampler(object): diff --git a/sdks/python/apache_beam/runners/worker/statesampler_test.py b/sdks/python/apache_beam/runners/worker/statesampler_test.py index 8b2216951dd3..dfb4bc10a913 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler_test.py +++ b/sdks/python/apache_beam/runners/worker/statesampler_test.py @@ -17,10 +17,12 @@ """Tests for state sampler.""" from __future__ import absolute_import +from __future__ import division import logging import time import unittest +from builtins import range from apache_beam.runners.worker import statesampler from apache_beam.utils.counters import CounterFactory diff --git a/sdks/python/apache_beam/runners/worker/worker_id_interceptor.py b/sdks/python/apache_beam/runners/worker/worker_id_interceptor.py index 0a71292f773d..f2ca4e79f725 100644 --- a/sdks/python/apache_beam/runners/worker/worker_id_interceptor.py +++ b/sdks/python/apache_beam/runners/worker/worker_id_interceptor.py @@ -39,8 +39,7 @@ class WorkerIdInterceptor(grpc.StreamStreamClientInterceptor): # and throw exception in worker_id_interceptor.py after we have rolled out # the corresponding container changes. # Unique worker Id for this worker. - _worker_id = os.environ['WORKER_ID'] if os.environ.has_key( - 'WORKER_ID') else str(uuid.uuid4()) + _worker_id = os.environ.get('WORKER_ID', str(uuid.uuid4())) def __init__(self): pass diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini index c80cefd24cb0..01d5f03eaa78 100644 --- a/sdks/python/tox.ini +++ b/sdks/python/tox.ini @@ -100,6 +100,7 @@ deps = flake8==3.5.0 modules = apache_beam/coders + apache_beam/runners apache_beam/examples apache_beam/portability apache_beam/internal
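
worker_id_interceptor.py closes out the has_key removals: os.environ.has_key is gone on Python 3, and os.environ.get('WORKER_ID', default) is both portable and shorter. One behavior survives the rewrite unchanged and is easy to miss: because _worker_id is a class attribute, the fallback is computed once at import time, and since get evaluates its default argument eagerly, str(uuid.uuid4()) runs even when WORKER_ID is set. A minimal before/after sketch, with a hypothetical variable name:

    import os
    import uuid

    # Python 2 only; os.environ.has_key raises AttributeError on Python 3:
    #   worker_id = (os.environ['WORKER_ID']
    #                if os.environ.has_key('WORKER_ID') else str(uuid.uuid4()))

    # Portable replacement. The default argument is evaluated before the
    # call, so the uuid4() cost is paid whether or not WORKER_ID is set:
    worker_id = os.environ.get('WORKER_ID', str(uuid.uuid4()))

Finally, the tox.ini hunk adds apache_beam/runners to the module list of the flake8 environment, so the package touched throughout this change is now included in the lint run.
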