Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions sdks/python/apache_beam/coders/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from apache_beam.coders.coders import *
from apache_beam.coders.typecoders import registry
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/coders/coder_impl.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ cdef char STR_TYPE, UNICODE_TYPE, LIST_TYPE, TUPLE_TYPE, DICT_TYPE, SET_TYPE

cdef class FastPrimitivesCoderImpl(StreamCoderImpl):
cdef CoderImpl fallback_coder_impl
@cython.locals(unicode_value=unicode, dict_value=dict)
@cython.locals(dict_value = dict)
cpdef encode_to_stream(self, value, OutputStream stream, bint nested)


Expand Down
61 changes: 35 additions & 26 deletions sdks/python/apache_beam/coders/coder_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,33 @@
coder_impl.pxd file for type hints.

For internal use only; no backwards-compatibility guarantees.

isort:skip_file
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

native_int = int

from types import NoneType
# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
from builtins import bytes
from builtins import chr
from builtins import int
from builtins import object
from builtins import range
from builtins import str

import six
from past.builtins import str as old_str
from past.builtins import long
from past.builtins import unicode

from apache_beam.coders import observable
from apache_beam.utils import windowed_value
from apache_beam.utils.timestamp import MAX_TIMESTAMP
from apache_beam.utils.timestamp import MIN_TIMESTAMP
from apache_beam.utils.timestamp import Timestamp

# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
try:
from .stream import InputStream as create_InputStream
from .stream import OutputStream as create_OutputStream
Expand All @@ -54,11 +67,6 @@
from .slow_stream import get_varint_size
# pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports

try:
long # Python 2
except NameError:
long = int # Python 3
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to keep long for Python 2 compatibility.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The int from the future.builtins is a subclass of python 2's long.



class CoderImpl(object):
"""For internal use only; no backwards-compatibility guarantees."""
Expand Down Expand Up @@ -199,7 +207,7 @@ def __init__(self, coder, step_label):
self._step_label = step_label

def _check_safe(self, value):
if isinstance(value, (str, six.text_type, long, int, float)):
if isinstance(value, (str, bytes, int, float)):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to keep long for Python 2 compatibility.

pass
elif value is None:
pass
Expand Down Expand Up @@ -279,18 +287,21 @@ def get_estimated_size_and_observables(self, value, nested=False):

def encode_to_stream(self, value, stream, nested):
t = type(value)
if t is NoneType:
if value is None:
stream.write_byte(NONE_TYPE)
elif t is int:
elif t is bool:
stream.write_byte(BOOL_TYPE)
stream.write_byte(value)
elif t is int or t is native_int or t is long:
stream.write_byte(INT_TYPE)
stream.write_var_int64(value)
elif t is float:
stream.write_byte(FLOAT_TYPE)
stream.write_bigendian_double(value)
elif t is str:
elif t is bytes or t is old_str:
stream.write_byte(STR_TYPE)
stream.write(value, nested)
elif t is six.text_type:
elif t is str or t is unicode:
unicode_value = value # for typing
stream.write_byte(UNICODE_TYPE)
stream.write(unicode_value.encode('utf-8'), nested)
Expand All @@ -304,12 +315,9 @@ def encode_to_stream(self, value, stream, nested):
dict_value = value # for typing
stream.write_byte(DICT_TYPE)
stream.write_var_int64(len(dict_value))
for k, v in dict_value.iteritems():
for k, v in dict_value.items():
self.encode_to_stream(k, stream, True)
self.encode_to_stream(v, stream, True)
elif t is bool:
stream.write_byte(BOOL_TYPE)
stream.write_byte(value)
else:
stream.write_byte(UNKNOWN_TYPE)
self.fallback_coder_impl.encode_to_stream(value, stream, nested)
Expand All @@ -318,6 +326,8 @@ def decode_from_stream(self, stream, nested):
t = stream.read_byte()
if t == NONE_TYPE:
return None
elif t == BOOL_TYPE:
return not not stream.read_byte()
elif t == INT_TYPE:
return stream.read_var_int64()
elif t == FLOAT_TYPE:
Expand All @@ -341,8 +351,6 @@ def decode_from_stream(self, stream, nested):
k = self.decode_from_stream(stream, True)
v[k] = self.decode_from_stream(stream, True)
return v
elif t == BOOL_TYPE:
return not not stream.read_byte()

return self.fallback_coder_impl.decode_from_stream(stream, nested)

Expand Down Expand Up @@ -394,8 +402,9 @@ def _from_normal_time(self, value):

def encode_to_stream(self, value, out, nested):
span_micros = value.end.micros - value.start.micros
out.write_bigendian_uint64(self._from_normal_time(value.end.micros / 1000))
out.write_var_int64(span_micros / 1000)
out.write_bigendian_uint64(
self._from_normal_time(value.end.micros // 1000))
out.write_var_int64(span_micros // 1000)

def decode_from_stream(self, in_, nested):
end_millis = self._to_normal_time(in_.read_bigendian_uint64())
Expand All @@ -409,7 +418,7 @@ def estimate_size(self, value, nested=False):
# An IntervalWindow is context-insensitive, with a timestamp (8 bytes)
# and a varint timespam.
span = value.end.micros - value.start.micros
return 8 + get_varint_size(span / 1000)
return 8 + get_varint_size(span // 1000)


class TimestampCoderImpl(StreamCoderImpl):
Expand All @@ -427,7 +436,7 @@ def estimate_size(self, unused_value, nested=False):
return 8


small_ints = [chr(_) for _ in range(128)]
small_ints = [chr(_).encode('latin-1') for _ in range(128)]


class VarIntCoderImpl(StreamCoderImpl):
Expand Down Expand Up @@ -783,7 +792,7 @@ def encode_to_stream(self, value, out, nested):
# TODO(BEAM-1524): Clean this up once we have a BEAM wide consensus on
# precision of timestamps.
self._from_normal_time(
restore_sign * (abs(wv.timestamp_micros) / 1000)))
restore_sign * (abs(wv.timestamp_micros) // 1000)))
self._windows_coder.encode_to_stream(wv.windows, out, True)
# Default PaneInfo encoded byte representing NO_FIRING.
self._pane_info_coder.encode_to_stream(wv.pane_info, out, True)
Expand All @@ -797,9 +806,9 @@ def decode_from_stream(self, in_stream, nested):
# were indeed MIN/MAX timestamps.
# TODO(BEAM-1524): Clean this up once we have a BEAM wide consensus on
# precision of timestamps.
if timestamp == -(abs(MIN_TIMESTAMP.micros) / 1000):
if timestamp == -(abs(MIN_TIMESTAMP.micros) // 1000):
timestamp = MIN_TIMESTAMP.micros
elif timestamp == (MAX_TIMESTAMP.micros / 1000):
elif timestamp == (MAX_TIMESTAMP.micros // 1000):
timestamp = MAX_TIMESTAMP.micros
else:
timestamp *= 1000
Expand Down
19 changes: 13 additions & 6 deletions sdks/python/apache_beam/coders/coders.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@
Only those coders listed in __all__ are part of the public API of this module.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import base64
import cPickle as pickle
from builtins import object
from builtins import str

import google.protobuf
from google.protobuf import wrappers_pb2
Expand All @@ -33,6 +36,12 @@
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.utils import proto_utils

# This is for py2/3 compatibility. cPickle was renamed pickle in python 3.
try:
import cPickle as pickle # Python 2
except ImportError:
import pickle # Python 3

# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
try:
from .stream import get_varint_size
Expand Down Expand Up @@ -216,6 +225,9 @@ def __eq__(self, other):
and self._dict_without_impl() == other._dict_without_impl())
# pylint: enable=protected-access

def __hash__(self):
return hash(type(self))

_known_urns = {}

@classmethod
Expand Down Expand Up @@ -309,11 +321,6 @@ class ToStringCoder(Coder):
"""A default string coder used if no sink coder is specified."""

def encode(self, value):
try: # Python 2
if isinstance(value, unicode):
return value.encode('utf-8')
except NameError: # Python 3
pass
return str(value)

def decode(self, _):
Expand Down
8 changes: 7 additions & 1 deletion sdks/python/apache_beam/coders/coders_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which of these is really necessary? Do we use division and print in this file?


import base64
import logging
import unittest
from builtins import object

from apache_beam.coders import proto2_coder_test_messages_pb2 as test_message
from apache_beam.coders import coders
Expand Down Expand Up @@ -99,6 +102,9 @@ def __eq__(self, other):
return True
return False

def __hash__(self):
return hash(type(self))


class FallbackCoderTest(unittest.TestCase):

Expand Down
Loading