Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions sdks/python/apache_beam/coders/coder_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@

from types import NoneType

import six

from apache_beam.coders import observable
from apache_beam.utils import windowed_value
from apache_beam.utils.timestamp import MAX_TIMESTAMP
Expand Down Expand Up @@ -197,7 +199,7 @@ def __init__(self, coder, step_label):
self._step_label = step_label

def _check_safe(self, value):
if isinstance(value, (str, unicode, long, int, float)):
if isinstance(value, (str, six.text_type, long, int, float)):
pass
elif value is None:
pass
Expand Down Expand Up @@ -288,7 +290,7 @@ def encode_to_stream(self, value, stream, nested):
elif t is str:
stream.write_byte(STR_TYPE)
stream.write(value, nested)
elif t is unicode:
elif t is six.text_type:
unicode_value = value # for typing
stream.write_byte(UNICODE_TYPE)
stream.write(unicode_value.encode('utf-8'), nested)
Expand Down
9 changes: 5 additions & 4 deletions sdks/python/apache_beam/coders/coders.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,10 +309,11 @@ class ToStringCoder(Coder):
"""A default string coder used if no sink coder is specified."""

def encode(self, value):
  """Encode ``value`` as a string suitable for a text sink.

  Unicode text is encoded to UTF-8 bytes; any other value is converted
  with ``str()``.

  Args:
    value: The value to encode.

  Returns:
    On Python 2, a (byte) ``str``; on Python 3, the ``str()`` of ``value``
    (the ``unicode`` branch below is unreachable there).
  """
  try:  # Python 2: the ``unicode`` builtin exists and needs explicit encoding.
    if isinstance(value, unicode):
      return value.encode('utf-8')
  except NameError:  # Python 3: no ``unicode`` builtin; ``str`` is already text.
    pass
  return str(value)

def decode(self, _):
Expand Down
4 changes: 3 additions & 1 deletion sdks/python/apache_beam/coders/typecoders.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ def MakeXyzs(v):
See apache_beam.typehints.decorators module for more details.
"""

import six

from apache_beam.coders import coders
from apache_beam.typehints import typehints

Expand All @@ -84,7 +86,7 @@ def register_standard_coders(self, fallback_coder):
self._register_coder_internal(float, coders.FloatCoder)
self._register_coder_internal(str, coders.BytesCoder)
self._register_coder_internal(bytes, coders.BytesCoder)
self._register_coder_internal(unicode, coders.StrUtf8Coder)
self._register_coder_internal(six.text_type, coders.StrUtf8Coder)
self._register_coder_internal(typehints.TupleConstraint, coders.TupleCoder)
# Default fallback coders applied in that order until the first matching
# coder found.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
from google.cloud.proto.datastore.v1 import query_pb2
from googledatastore import helper as datastore_helper
from googledatastore import PropertyFilter
import six

import apache_beam as beam
from apache_beam.io import ReadFromText
Expand Down Expand Up @@ -131,7 +132,7 @@ def make_entity(self, content):
datastore_helper.add_key_path(entity.key, self._kind, self._ancestor,
self._kind, str(uuid.uuid4()))

datastore_helper.add_properties(entity, {"content": unicode(content)})
datastore_helper.add_properties(entity, {"content": six.text_type(content)})
return entity


Expand Down Expand Up @@ -186,7 +187,7 @@ def count_ones(word_ones):

counts = (lines
| 'split' >> (beam.ParDo(WordExtractingDoFn())
.with_output_types(unicode))
.with_output_types(six.text_type))
| 'pair_with_one' >> beam.Map(lambda x: (x, 1))
| 'group' >> beam.GroupByKey()
| 'count' >> beam.Map(count_ones))
Expand Down
5 changes: 4 additions & 1 deletion sdks/python/apache_beam/examples/snippets/snippets.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
string. The tags can contain only letters, digits and _.
"""

import six

import apache_beam as beam
from apache_beam.io import iobase
from apache_beam.io.range_trackers import OffsetRangeTracker
Expand Down Expand Up @@ -983,7 +985,8 @@ def model_datastoreio():
def to_entity(content):
  """Build a Datastore ``Entity`` carrying ``content`` as a property.

  A fresh random UUID is used for the entity's key path so that distinct
  calls never collide. ``content`` is coerced to text via
  ``six.text_type`` so the same code works on Python 2 and 3
  (presumably the Datastore property expects unicode — confirm against
  ``datastore_helper``).

  Args:
    content: The value stored under the ``'content'`` property.

  Returns:
    A populated ``entity_pb2.Entity``.
  """
  entity = entity_pb2.Entity()
  googledatastore.helper.add_key_path(entity.key, kind, str(uuid.uuid4()))
  googledatastore.helper.add_properties(
      entity, {'content': six.text_type(content)})
  return entity

entities = musicians | 'To Entity' >> beam.Map(to_entity)
Expand Down
4 changes: 3 additions & 1 deletion sdks/python/apache_beam/examples/streaming_wordcount.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import argparse
import logging

import six

import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.options.pipeline_options import PipelineOptions
Expand Down Expand Up @@ -65,7 +67,7 @@ def count_ones(word_ones):
transformed = (lines
# Use a pre-defined function that imports the re package.
| 'Split' >> (
beam.FlatMap(split_fn).with_output_types(unicode))
beam.FlatMap(split_fn).with_output_types(six.text_type))
| 'PairWithOne' >> beam.Map(lambda x: (x, 1))
| beam.WindowInto(window.FixedWindows(15, 0))
| 'Group' >> beam.GroupByKey()
Expand Down
4 changes: 3 additions & 1 deletion sdks/python/apache_beam/examples/windowed_wordcount.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import argparse
import logging

import six

import apache_beam as beam
import apache_beam.transforms.window as window

Expand Down Expand Up @@ -75,7 +77,7 @@ def count_ones(word_ones):

transformed = (lines
| 'Split' >> (beam.FlatMap(find_words)
.with_output_types(unicode))
.with_output_types(six.text_type))
| 'PairWithOne' >> beam.Map(lambda x: (x, 1))
| beam.WindowInto(window.FixedWindows(2*60, 0))
| 'Group' >> beam.GroupByKey()
Expand Down
4 changes: 3 additions & 1 deletion sdks/python/apache_beam/examples/wordcount.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import logging
import re

import six

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
Expand Down Expand Up @@ -94,7 +96,7 @@ def count_ones(word_ones):

counts = (lines
| 'split' >> (beam.ParDo(WordExtractingDoFn())
.with_output_types(unicode))
.with_output_types(six.text_type))
| 'pair_with_one' >> beam.Map(lambda x: (x, 1))
| 'group' >> beam.GroupByKey()
| 'count' >> beam.Map(count_ones))
Expand Down
4 changes: 3 additions & 1 deletion sdks/python/apache_beam/examples/wordcount_debugging.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import logging
import re

import six

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
Expand Down Expand Up @@ -99,7 +101,7 @@ def count_ones(word_ones):

return (pcoll
| 'split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
.with_output_types(unicode))
.with_output_types(six.text_type))
| 'pair_with_one' >> beam.Map(lambda x: (x, 1))
| 'group' >> beam.GroupByKey()
| 'count' >> beam.Map(count_ones))
Expand Down
4 changes: 3 additions & 1 deletion sdks/python/apache_beam/examples/wordcount_fnapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import logging
import re

import six

import apache_beam as beam
from apache_beam.io import ReadFromText
# TODO(BEAM-2887): Enable after the issue is fixed.
Expand Down Expand Up @@ -102,7 +104,7 @@ def run(argv=None):

counts = (lines
| 'split' >> (beam.ParDo(WordExtractingDoFn())
.with_output_types(unicode))
.with_output_types(six.text_type))
| 'pair_with_one' >> beam.Map(lambda x: (x, 1))
| 'group_and_sum' >> beam.CombinePerKey(sum))

Expand Down
4 changes: 3 additions & 1 deletion sdks/python/apache_beam/examples/wordcount_minimal.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import logging
import re

import six

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
Expand Down Expand Up @@ -101,7 +103,7 @@ def run(argv=None):
counts = (
lines
| 'Split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
.with_output_types(unicode))
.with_output_types(six.text_type))
| 'PairWithOne' >> beam.Map(lambda x: (x, 1))
| 'GroupAndSum' >> beam.CombinePerKey(sum))

Expand Down
5 changes: 4 additions & 1 deletion sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import time
from socket import error as SocketError

import six

# pylint: disable=ungrouped-imports
from apache_beam.internal.gcp import auth
from apache_beam.utils import retry
Expand Down Expand Up @@ -252,7 +254,8 @@ def make_kind_stats_query(namespace, kind, latest_timestamp):
kind_stat_query.kind.add().name = '__Stat_Ns_Kind__'

kind_filter = datastore_helper.set_property_filter(
query_pb2.Filter(), 'kind_name', PropertyFilter.EQUAL, unicode(kind))
query_pb2.Filter(), 'kind_name', PropertyFilter.EQUAL,
six.text_type(kind))
timestamp_filter = datastore_helper.set_property_filter(
query_pb2.Filter(), 'timestamp', PropertyFilter.EQUAL,
latest_timestamp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

from apitools.base.py import encoding
from apitools.base.py import exceptions
import six

from apache_beam.internal.gcp.auth import get_service_credentials
from apache_beam.internal.gcp.json_value import to_json_value
Expand Down Expand Up @@ -287,7 +288,7 @@ def encode_shortstrings(input_buffer, errors='strict'):
def decode_shortstrings(input_buffer, errors='strict'):
  """Decoder (to Unicode) that suppresses long base64 strings.

  Delegates the actual shortening to ``encode_shortstrings`` and then
  presents the result as text via ``six.text_type`` so the behavior is
  the same on Python 2 and 3.

  Args:
    input_buffer: The buffer to decode.
    errors: Error-handling scheme, passed through to
      ``encode_shortstrings``.

  Returns:
    A ``(text, length)`` tuple, mirroring the codec-decoder protocol.
  """
  shortened, length = encode_shortstrings(input_buffer, errors)
  return six.text_type(shortened), length

def shortstrings_registerer(encoding_name):
if encoding_name == 'shortstrings':
Expand Down
4 changes: 3 additions & 1 deletion sdks/python/apache_beam/transforms/display.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
from datetime import datetime
from datetime import timedelta

import six

__all__ = ['HasDisplayData', 'DisplayDataItem', 'DisplayData']


Expand Down Expand Up @@ -167,7 +169,7 @@ class DisplayDataItem(object):
display item belongs to.
"""
typeDict = {str:'STRING',
unicode:'STRING',
six.text_type:'STRING',
int:'INTEGER',
float:'FLOAT',
bool: 'BOOLEAN',
Expand Down
6 changes: 5 additions & 1 deletion sdks/python/apache_beam/transforms/display_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import unittest
from datetime import datetime

# pylint: disable=ungrouped-imports
import hamcrest as hc
import six
from hamcrest.core.base_matcher import BaseMatcher

import apache_beam as beam
Expand All @@ -31,6 +33,8 @@
from apache_beam.transforms.display import DisplayDataItem
from apache_beam.transforms.display import HasDisplayData

# pylint: enable=ungrouped-imports


class DisplayDataItemMatcher(BaseMatcher):
""" Matcher class for DisplayDataItems in unit tests.
Expand Down Expand Up @@ -161,7 +165,7 @@ def test_create_list_display_data(self):
def test_unicode_type_display_data(self):
class MyDoFn(beam.DoFn):
def display_data(self):
return {'unicode_string': unicode('my string'),
return {'unicode_string': six.text_type('my string'),
'unicode_literal_string': u'my literal string'}

fn = MyDoFn()
Expand Down