Merged
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/runners/__init__.py
@@ -20,6 +20,8 @@
This package defines runners, which are used to execute a pipeline.
"""

from __future__ import absolute_import

from apache_beam.runners.direct.direct_runner import DirectRunner
from apache_beam.runners.runner import PipelineRunner
from apache_beam.runners.runner import PipelineState
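A note on the recurring from __future__ import absolute_import additions throughout this PR: they switch Python 2 to Python 3's absolute-import semantics, so a bare import can no longer silently resolve to a sibling module in the same package. A minimal sketch, assuming a hypothetical package that contains its own logging.py next to the importing module:

# Hypothetical layout: pkg/__init__.py, pkg/logging.py, and this file pkg/worker.py.
# Without the future-import, Python 2 resolves `import logging` to the sibling
# pkg/logging.py (an implicit relative import); with it, both Python 2 and
# Python 3 resolve to the standard-library logging module.
from __future__ import absolute_import

import logging

logging.getLogger(__name__).info('resolved to the stdlib logging module')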
16 changes: 12 additions & 4 deletions sdks/python/apache_beam/runners/common.py
@@ -15,17 +15,25 @@
# limitations under the License.
#

# cython: language_level=3
# cython: profile=True

"""Worker operations executor.

For internal use only; no backwards-compatibility guarantees.
Contributor:
Since this file will be cythonized, let's tell Cython to use Python 3 semantics and add:

# cython: language_level=3

See also:

'apache_beam/**/*.pyx',

Contributor (author):
This has some interesting consequences for typechecks on strings.

if not isinstance(tag, six.string_types):

cannot be replaced by

if not isinstance(tag, (str, unicode)):

because string objects defined in other modules (in Python 2) are seen as bytes in Cython code with Python 3 semantics.

Therefore I replaced it with

try:
    basestring
except NameError:
    basestring = str

and

if not isinstance(tag, basestring):

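A self-contained sketch of the pattern described above, runnable on both Python 2 and 3 (the tag value is a hypothetical example):

# On Python 2, basestring covers both str and unicode; on Python 3 the
# NameError fallback binds the name to str.
try:
    basestring
except NameError:
    basestring = str

def check_tag(tag):
    if not isinstance(tag, basestring):
        raise TypeError('In %s, tag %s is not a string' % ('sketch', tag))
    return tag

check_tag('side_output')  # passes on both Python 2 and Python 3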
"""

from __future__ import absolute_import

import sys
import traceback
from builtins import next
from builtins import object
from builtins import zip

import six
from future.utils import raise_
from past.builtins import basestring
from past.builtins import unicode

from apache_beam.internal import util
from apache_beam.pvalue import TaggedOutput
@@ -615,7 +623,7 @@ def _reraise_augmented(self, exn):
traceback.format_exception_only(type(exn), exn)[-1].strip()
+ step_annotation)
new_exn._tagged_with_step = True
-six.reraise(type(new_exn), new_exn, original_traceback)
+raise_(type(new_exn), new_exn, original_traceback)


class OutputProcessor(object):
@@ -652,7 +660,7 @@ def process_outputs(self, windowed_input_element, results):
tag = None
if isinstance(result, TaggedOutput):
tag = result.tag
-if not isinstance(tag, six.string_types):
+if not isinstance(tag, basestring):
raise TypeError('In %s, tag %s is not a string' % (self, tag))
result = result.value
if isinstance(result, WindowedValue):
@@ -694,7 +702,7 @@ def finish_bundle_outputs(self, results):
tag = None
if isinstance(result, TaggedOutput):
tag = result.tag
-if not isinstance(tag, six.string_types):
+if not isinstance(tag, (str, unicode)):
raise TypeError('In %s, tag %s is not a string' % (self, tag))
result = result.value

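On the six.reraise to raise_ change above: future.utils.raise_ takes the same three arguments (type, value, traceback) and re-raises with the original traceback on both Python 2 and 3. A minimal sketch under hypothetical names:

import sys

from future.utils import raise_

def reraise_annotated(note):
    # Hypothetical stand-in for _reraise_augmented: annotate an exception
    # and re-raise it with the traceback of the original failure.
    try:
        raise ValueError('boom')
    except Exception as exn:
        original_traceback = sys.exc_info()[2]
        new_exn = type(exn)(str(exn) + note)
        raise_(type(new_exn), new_exn, original_traceback)

try:
    reraise_annotated(' [while running step s1]')
except ValueError as exn:
    print(exn)  # boom [while running step s1]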
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/runners/common_test.py
@@ -15,6 +15,8 @@
# limitations under the License.
#

from __future__ import absolute_import

import unittest

from apache_beam.runners.common import DoFnSignature
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/runners/dataflow/__init__.py
@@ -21,5 +21,7 @@
with no backwards-compatibility guarantees.
"""

from __future__ import absolute_import

from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner
from apache_beam.runners.dataflow.test_dataflow_runner import TestDataflowRunner
6 changes: 5 additions & 1 deletion sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
@@ -21,9 +21,13 @@
service.
"""

from __future__ import absolute_import

import numbers
from collections import defaultdict

from future.utils import iteritems

from apache_beam.metrics.cells import DistributionData
from apache_beam.metrics.cells import DistributionResult
from apache_beam.metrics.execution import MetricKey
@@ -145,7 +149,7 @@ def _populate_metric_results(self, response):

# Now we create the MetricResult elements.
result = []
-for metric_key, metric in metrics_by_name.iteritems():
+for metric_key, metric in iteritems(metrics_by_name):
attempted = self._get_metric_value(metric['tentative'])
committed = self._get_metric_value(metric['committed'])
if attempted is None or committed is None:
@@ -18,8 +18,12 @@
Tests corresponding to the DataflowRunner implementation of MetricsResult,
the DataflowMetrics class.
"""

from __future__ import absolute_import

import types
import unittest
from builtins import object

import mock

@@ -34,7 +38,7 @@
class DictToObject(object):
"""Translate from a dict(list()) structure to an object structure"""
def __init__(self, data):
-for name, value in data.iteritems():
+for name, value in data.items():
setattr(self, name, self._wrap(value))

def _wrap(self, value):
17 changes: 11 additions & 6 deletions sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -20,14 +20,19 @@
The runner will create a JSON description of the job graph and then submit it
to the Dataflow Service for remote execution by a worker.
"""
from __future__ import absolute_import
from __future__ import division

import logging
import threading
import time
import traceback
import urllib
from builtins import hex
from collections import defaultdict

from future.moves.urllib.parse import quote
from future.moves.urllib.parse import unquote

import apache_beam as beam
from apache_beam import coders
from apache_beam import error
@@ -125,7 +130,7 @@ def rank_error(msg):

if duration:
start_secs = time.time()
-duration_secs = duration / 1000
+duration_secs = duration // 1000

job_id = result.job_id()
while True:
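A note on the duration change in this hunk: with from __future__ import division (added at the top of this file), the / operator performs true division even on Python 2, so converting milliseconds to whole seconds needs the floor-division operator. For illustration:

from __future__ import division

duration = 1500  # milliseconds, as a hypothetical example

assert duration / 1000 == 1.5  # true division yields a float on Python 2 too
assert duration // 1000 == 1   # floor division keeps the old integer behavior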
@@ -642,15 +647,15 @@ def run_ParDo(self, transform_node):
if (label_renames and
transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn):
# Patch PTransform proto.
-for old, new in label_renames.iteritems():
+for old, new in iteritems(label_renames):
transform_proto.inputs[new] = transform_proto.inputs[old]
del transform_proto.inputs[old]

# Patch ParDo proto.
proto_type, _ = beam.PTransform._known_urns[transform_proto.spec.urn]
proto = proto_utils.parse_Bytes(transform_proto.spec.payload,
proto_type)
-for old, new in label_renames.iteritems():
+for old, new in iteritems(label_renames):
proto.side_inputs[new].CopyFrom(proto.side_inputs[old])
del proto.side_inputs[old]
transform_proto.spec.payload = proto.SerializeToString()
@@ -965,12 +970,12 @@ def deserialize_windowing_strategy(cls, serialized_data):
@staticmethod
def byte_array_to_json_string(raw_bytes):
"""Implements org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString."""
-return urllib.quote(raw_bytes)
+return quote(raw_bytes)

@staticmethod
def json_string_to_byte_array(encoded_string):
"""Implements org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray."""
-return urllib.unquote(encoded_string)
+return unquote(encoded_string)


class _DataflowSideInput(beam.pvalue.AsSideInput):
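On the urllib changes above: future.moves.urllib.parse re-exports quote and unquote from urllib on Python 2 and from urllib.parse on Python 3, so the call sites stay identical across both. A small sketch with an illustrative byte string:

from future.moves.urllib.parse import quote
from future.moves.urllib.parse import unquote

encoded = quote(b'\x00\x01abc')  # percent-encodes the non-printable bytes
assert encoded == '%00%01abc'
assert unquote(encoded) == '\x00\x01abc'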
@@ -17,8 +17,12 @@

"""Unit tests for the DataflowRunner class."""

from __future__ import absolute_import

import json
import unittest
from builtins import object
from builtins import range
from datetime import datetime

import mock
@@ -14,3 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import absolute_import
16 changes: 10 additions & 6 deletions sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -19,6 +19,9 @@

Dataflow client utility functions."""

from __future__ import absolute_import

from builtins import object
import codecs
import getpass
import json
@@ -28,12 +31,13 @@
import tempfile
import time
from datetime import datetime
-from StringIO import StringIO
+import io

from past.builtins import unicode

import pkg_resources
from apitools.base.py import encoding
from apitools.base.py import exceptions
import six

from apache_beam import version as beam_version
from apache_beam.internal.gcp.auth import get_service_credentials
@@ -262,7 +266,7 @@ def __init__(self, packages, options, environment_version, pipeline_url):
dataflow.Environment.SdkPipelineOptionsValue())

options_dict = {k: v
-for k, v in sdk_pipeline_options.iteritems()
+for k, v in sdk_pipeline_options.items()
if v is not None}
options_dict["pipelineUrl"] = pipeline_url
self.proto.sdkPipelineOptions.additionalProperties.append(
@@ -298,7 +302,7 @@ def encode_shortstrings(input_buffer, errors='strict'):
def decode_shortstrings(input_buffer, errors='strict'):
"""Decoder (to Unicode) that suppresses long base64 strings."""
shortened, length = encode_shortstrings(input_buffer, errors)
-return six.text_type(shortened), length
+return unicode(shortened), length

def shortstrings_registerer(encoding_name):
if encoding_name == 'shortstrings':
@@ -493,7 +497,7 @@ def create_job(self, job):
if job_location:
gcs_or_local_path = os.path.dirname(job_location)
file_name = os.path.basename(job_location)
-self.stage_file(gcs_or_local_path, file_name, StringIO(job.json()))
+self.stage_file(gcs_or_local_path, file_name, io.BytesIO(job.json()))

if not template_location:
return self.submit_job_description(job)
@@ -508,7 +512,7 @@ def create_job_description(self, job):
# Stage the pipeline for the runner harness
self.stage_file(job.google_cloud_options.staging_location,
names.STAGED_PIPELINE_FILENAME,
-StringIO(job.proto_pipeline.SerializeToString()))
+io.BytesIO(job.proto_pipeline.SerializeToString()))

# Stage other resources for the SDK harness
resources = self._stage_resources(job.options)
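On the StringIO to io.BytesIO changes above: the staged payloads are byte strings (SerializeToString() returns bytes), and Python 3's io.StringIO accepts only text, so a bytes buffer is the portable choice. A minimal illustration with a stand-in payload:

import io

payload = b'serialized pipeline proto'  # stand-in for SerializeToString() output

buf = io.BytesIO(payload)  # accepts bytes on both Python 2 and Python 3
assert buf.read() == payload

# io.StringIO(payload) would raise TypeError on Python 3, where it only
# accepts text (str).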
@@ -15,6 +15,9 @@
# limitations under the License.
#
"""Unit tests for the apiclient module."""

from __future__ import absolute_import

import unittest

import mock
@@ -14,3 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import absolute_import
@@ -18,6 +18,8 @@
"""Common imports for generated dataflow client library."""
# pylint:disable=wildcard-import

from __future__ import absolute_import

import pkgutil

# Protect against environments where apitools library is not available.
@@ -17,6 +17,9 @@

"""Generated client library for dataflow version v1b3."""
# NOTE: This file is autogenerated and should not be edited by hand.

from __future__ import absolute_import

from apitools.base.py import base_api

from apache_beam.runners.dataflow.internal.clients.dataflow import dataflow_v1b3_messages as messages
@@ -22,6 +22,8 @@
"""
# NOTE: This file is autogenerated and should not be edited by hand.

from __future__ import absolute_import

from apitools.base.protorpclite import messages as _messages
from apitools.base.py import encoding
from apitools.base.py import extra_types
@@ -14,7 +14,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import absolute_import

from future.utils import iteritems
from hamcrest.core.base_matcher import BaseMatcher

IGNORED = object()
@@ -49,7 +51,7 @@ def _matches(self, item):
if self.origin != IGNORED and item.origin != self.origin:
return False
if self.context != IGNORED:
-for key, name in self.context.iteritems():
+for key, name in iteritems(self.context):
Contributor:

Why iteritems here vs .items()?

Contributor (author):
To avoid efficiency losses in Python 2; see: #5373 (comment)
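A sketch of the trade-off: future.utils.iteritems dispatches to dict.iteritems on Python 2 and to dict.items on Python 3, so neither interpreter materializes the intermediate list that Python 2's dict.items() would build. The context dict here is a hypothetical example:

from future.utils import iteritems

context = {'step': 's1', 'output': 'out0'}  # hypothetical matcher context

# Lazily iterates on both interpreters: dict.iteritems on Python 2,
# the dict.items view on Python 3.
for key, name in iteritems(context):
    print(key, name)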

if key not in item.context:
return False
if name != IGNORED and item.context[key] != name:
@@ -14,6 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import absolute_import

import unittest

import hamcrest as hc
4 changes: 4 additions & 0 deletions sdks/python/apache_beam/runners/dataflow/internal/names.py
@@ -20,8 +20,12 @@
# All constants are for internal use only; no backwards-compatibility
# guarantees.

from __future__ import absolute_import

# TODO (altay): Move shared names to a common location.
# Standard file names used for staging files.
from builtins import object

PICKLED_MAIN_SESSION_FILE = 'pickled_main_session'
DATAFLOW_SDK_TARBALL_FILE = 'dataflow_python_sdk.tar'
STAGED_PIPELINE_FILENAME = "pipeline.pb"
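A note on the from builtins import object additions: on Python 2 this imports the future package's object backport, so classes inheriting from it are new-style and get Python 3-compatible dunder behavior; on Python 3 it is simply the built-in object, so nothing changes there. A brief sketch with a hypothetical class:

from builtins import object

class StagedName(object):  # hypothetical class for illustration
    def __str__(self):
        # On Python 2, future's object routes unicode()/str() calls through
        # this single Python 3-style __str__; on Python 3 it is a no-op shim.
        return 'pipeline.pb'

print(StagedName())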
@@ -14,3 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import absolute_import
5 changes: 4 additions & 1 deletion sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
@@ -20,7 +20,10 @@
For internal use only; no backwards-compatibility guarantees.
"""

from __future__ import absolute_import

import logging
from builtins import object

from apache_beam import pvalue
from apache_beam.io import iobase
@@ -31,7 +34,7 @@
def _dict_printable_fields(dict_object, skip_fields):
"""Returns a list of strings for the interesting fields of a dict."""
return ['%s=%r' % (name, value)
-for name, value in dict_object.iteritems()
+for name, value in dict_object.items()
# want to output value 0 but not None nor []
if (value or value == 0)
and name not in skip_fields]
@@ -17,6 +17,7 @@

"""Tests corresponding to Dataflow's iobase module."""

from __future__ import absolute_import

import unittest
