diff --git a/CHANGES.md b/CHANGES.md index 037cffbed563..83bd2ed0ba78 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -45,6 +45,39 @@ * Fixed X (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)). --> + +# [2.29.0] - Unreleased + +## Highlights + +* New highly anticipated feature X added to Python SDK ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)). +* New highly anticipated feature Y added to Java SDK ([BEAM-Y](https://issues.apache.org/jira/browse/BEAM-Y)). + +## I/Os + +* Support for X source added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)). + +## New Features / Improvements + +* X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)). + +## Breaking Changes + +* Deterministic coding enforced for GroupByKey and Stateful DoFns. Previously non-deterministic coding was allowed, resulting in keys not properly being grouped in some cases. ([BEAM-11719](https://issues.apache.org/jira/browse/BEAM-11719)) + To restore the old behavior, one can register `FakeDeterministicFastPrimitivesCoder` with + `beam.coders.registry.register_fallback_coder(beam.coders.coders.FakeDeterministicFastPrimitivesCoder())` + or use the `allow_non_deterministic_key_coders` pipeline option. +* X behavior was changed ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)). + +## Deprecations + +* X behavior is deprecated and will be removed in X versions ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)). + +## Known Issues + +* Fixed X (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)). + + # [2.28.0] - 2021-02-22 ## Highlights @@ -95,6 +128,7 @@ on removed APIs. If affected, ensure to use an appropriate Guava version via `dependencyManagement` in Maven and `force` in Gradle. + # [2.27.0] - 2021-01-08 ## I/Os diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index 5cd3685afb62..9ef22c53a8c0 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -300,7 +300,7 @@ def _check_safe(self, value): self._check_safe(x) else: raise TypeError( - "Unable to deterministically code '%s' of type '%s', " + "Unable to deterministically encode '%s' of type '%s', " "please provide a type hint for the input of '%s'" % (value, type(value), self._step_label)) @@ -908,7 +908,7 @@ def decode(self, encoded): class TupleCoderImpl(AbstractComponentCoderImpl): """A coder for tuple objects.""" def _extract_components(self, value): - return value + return tuple(value) def _construct_from_components(self, components): return tuple(components) diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index 1bf0fa8c5efd..6c570b060137 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -165,7 +165,9 @@ def as_deterministic_coder(self, step_label, error_message=None): if self.is_deterministic(): return self else: - raise ValueError(error_message or "'%s' cannot be made deterministic.") + raise ValueError( + error_message or + "%s cannot be made deterministic for '%s'." % (self, step_label)) def estimate_size(self, value): """Estimates the encoded size of the given value, in bytes. @@ -852,6 +854,16 @@ def __hash__(self): return hash(type(self)) +class FakeDeterministicFastPrimitivesCoder(FastPrimitivesCoder): + """A FastPrimitivesCoder that claims to be deterministic. 
+ + This can be registered as a fallback coder to go back to the behavior before + deterministic encoding was enforced (BEAM-11719). + """ + def is_deterministic(self): + return True + + class Base64PickleCoder(Coder): """Coder of objects by Python pickle, then base64 encoding.""" diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index 38e67fc6480a..3250f6b2e1f4 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -195,6 +195,12 @@ def test_fast_primitives_coder_large_int(self): coder = coders.FastPrimitivesCoder() self.check_coder(coder, 10**100) + def test_fake_deterministic_fast_primitives_coder(self): + coder = coders.FakeDeterministicFastPrimitivesCoder(coders.PickleCoder()) + self.check_coder(coder, *self.test_values) + for v in self.test_values: + self.check_coder(coders.TupleCoder((coder, )), (v, )) + def test_bytes_coder(self): self.check_coder(coders.BytesCoder(), b'a', b'\0', b'z' * 1000) diff --git a/sdks/python/apache_beam/coders/typecoders.py b/sdks/python/apache_beam/coders/typecoders.py index 958511e2ccaf..b0d49835ebb4 100644 --- a/sdks/python/apache_beam/coders/typecoders.py +++ b/sdks/python/apache_beam/coders/typecoders.py @@ -103,6 +103,9 @@ def register_standard_coders(self, fallback_coder): default_fallback_coders = [coders.ProtoCoder, coders.FastPrimitivesCoder] self._fallback_coder = fallback_coder or FirstOf(default_fallback_coders) + def register_fallback_coder(self, fallback_coder): + self._fallback_coder = FirstOf([fallback_coder, self._fallback_coder]) + def _register_coder_internal(self, typehint_type, typehint_coder_class): # type: (Any, Type[coders.Coder]) -> None self._coders[typehint_type] = typehint_coder_class diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py index d896f48a9935..8bac55bb930d 100644 --- a/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py +++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py @@ -80,22 +80,17 @@ def test_basics_without_type_check(self): # therefore any custom coders will not be used. The default coder (pickler) # will be used instead. temp_path = self.create_temp_file(self.SAMPLE_RECORDS) - group_with_coder.run([ - '--no_pipeline_type_check', - '--input=%s*' % temp_path, - '--output=%s.result' % temp_path - ], - save_main_session=False) - # Parse result file and compare. 
-    results = []
-    with open_shards(temp_path + '.result-*-of-*') as result_file:
-      for line in result_file:
-        name, points = line.split(',')
-        results.append((name, int(points)))
-    logging.info('result: %s', results)
-    self.assertEqual(
-        sorted(results),
-        sorted([('ann', 15), ('fred', 9), ('joe', 60), ('mary', 8)]))
+    with self.assertRaises(Exception) as context:
+      # yapf: disable
+      group_with_coder.run(
+          [
+              '--no_pipeline_type_check',
+              '--input=%s*' % temp_path,
+              '--output=%s.result' % temp_path
+          ],
+          save_main_session=False)
+    self.assertIn('Unable to deterministically encode', str(context.exception))
+    self.assertIn('CombinePerKey(sum)/GroupByKey', str(context.exception))


 if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index ae5ebcc03205..ab85024076fb 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -32,6 +32,7 @@

 import hashlib
 import logging
+import pickle
 import random
 import uuid

@@ -858,9 +859,13 @@ def _load_data(
             lambda x,
             deleting_tables: deleting_tables,
             pvalue.AsIter(temp_tables_pc))
-        | "RemoveTempTables/AddUselessValue" >> beam.Map(lambda x: (x, None))
+        # TableReference has no deterministic coder, but as this de-duplication
+        # is best-effort, pickling should be good enough.
+        | "RemoveTempTables/AddUselessValue" >>
+        beam.Map(lambda x: (pickle.dumps(x), None))
         | "RemoveTempTables/DeduplicateTables" >> beam.GroupByKey()
-        | "RemoveTempTables/GetTableNames" >> beam.Map(lambda elm: elm[0])
+        | "RemoveTempTables/GetTableNames" >>
+        beam.MapTuple(lambda k, nones: pickle.loads(k))
         | "RemoveTempTables/Delete" >> beam.ParDo(
             DeleteTablesFn(self.test_client)))

diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index bda9df188d6a..31832e4951cd 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -516,6 +516,14 @@ def _add_argparse_args(cls, parser):
         help='Enable faster type checking via sampling at pipeline execution '
         'time. NOTE: only supported with portable runners '
         '(including the DirectRunner)')
+    parser.add_argument(
+        '--allow_non_deterministic_key_coders',
+        default=False,
+        action='store_true',
+        help='Use non-deterministic coders (such as pickling) for key-grouping '
+        'operations such as GroupByKey. This is unsafe, as runners may group '
+        'keys based on their encoded bytes, but is available for backwards '
+        'compatibility. See BEAM-11719.')

   def validate(self, unused_validator):
     errors = []
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 7f3162fc000f..372bde86f35d 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -841,6 +841,10 @@ def to_runner_api(
     # general shapes, potential conflicts will have to be resolved.
     # We also only handle single-input, and (for fixing the output) single
     # output, which is sufficient.
+    # Also marks such values as requiring deterministic key coders.
+ deterministic_key_coders = not self._options.view_as( + TypeOptions).allow_non_deterministic_key_coders + class ForceKvInputTypes(PipelineVisitor): def enter_composite_transform(self, transform_node): # type: (AppliedPTransform) -> None @@ -854,18 +858,27 @@ def visit_transform(self, transform_node): pcoll = transform_node.inputs[0] pcoll.element_type = typehints.coerce_to_kv_type( pcoll.element_type, transform_node.full_label) + pcoll.requires_deterministic_key_coder = ( + deterministic_key_coders and transform_node.full_label) if len(transform_node.outputs) == 1: # The runner often has expectations about the output types as well. output, = transform_node.outputs.values() if not output.element_type: output.element_type = transform_node.transform.infer_output_type( pcoll.element_type) + if (isinstance(output.element_type, + typehints.TupleHint.TupleConstraint) and + len(output.element_type.tuple_types) == 2): + output.requires_deterministic_key_coder = ( + deterministic_key_coders and transform_node.full_label) for side_input in transform_node.transform.side_inputs: if side_input.requires_keyed_input(): side_input.pvalue.element_type = typehints.coerce_to_kv_type( side_input.pvalue.element_type, transform_node.full_label, side_input_producer=side_input.pvalue.producer.full_label) + side_input.pvalue.requires_deterministic_key_coder = ( + deterministic_key_coders and transform_node.full_label) self.visit(ForceKvInputTypes()) diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py index 5196525a643c..0744e981fad5 100644 --- a/sdks/python/apache_beam/pvalue.py +++ b/sdks/python/apache_beam/pvalue.py @@ -109,6 +109,7 @@ def __init__(self, self.is_bounded = is_bounded if windowing: self._windowing = windowing + self.requires_deterministic_key_coder = None def __str__(self): return self._str_internal() @@ -187,7 +188,8 @@ def to_runner_api(self, context): # type: (PipelineContext) -> beam_runner_api_pb2.PCollection return beam_runner_api_pb2.PCollection( unique_name=self._unique_name(), - coder_id=context.coder_id_from_element_type(self.element_type), + coder_id=context.coder_id_from_element_type( + self.element_type, self.requires_deterministic_key_coder), is_bounded=beam_runner_api_pb2.IsBounded.BOUNDED if self.is_bounded else beam_runner_api_pb2.IsBounded.UNBOUNDED, windowing_strategy_id=context.windowing_strategies.get_id( diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index e837d833f8fe..892284f583ea 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -51,6 +51,7 @@ from apache_beam.options.pipeline_options import SetupOptions from apache_beam.options.pipeline_options import StandardOptions from apache_beam.options.pipeline_options import TestOptions +from apache_beam.options.pipeline_options import TypeOptions from apache_beam.options.pipeline_options import WorkerOptions from apache_beam.portability import common_urns from apache_beam.portability.api import beam_runner_api_pb2 @@ -270,7 +271,7 @@ def _only_element(iterable): return element @staticmethod - def group_by_key_input_visitor(): + def group_by_key_input_visitor(deterministic_key_coders=True): # Imported here to avoid circular dependencies. 
from apache_beam.pipeline import PipelineVisitor @@ -292,16 +293,23 @@ def visit_transform(self, transform_node): pcoll = transform_node.inputs[0] pcoll.element_type = typehints.coerce_to_kv_type( pcoll.element_type, transform_node.full_label) + pcoll.requires_deterministic_key_coder = ( + deterministic_key_coders and transform_node.full_label) key_type, value_type = pcoll.element_type.tuple_types if transform_node.outputs: key = DataflowRunner._only_element(transform_node.outputs.keys()) transform_node.outputs[key].element_type = typehints.KV[ key_type, typehints.Iterable[value_type]] + transform_node.outputs[key].requires_deterministic_key_coder = ( + deterministic_key_coders and transform_node.full_label) return GroupByKeyInputVisitor() @staticmethod - def side_input_visitor(use_unified_worker=False, use_fn_api=False): + def side_input_visitor( + use_unified_worker=False, + use_fn_api=False, + deterministic_key_coders=True): # Imported here to avoid circular dependencies. # pylint: disable=wrong-import-order, wrong-import-position from apache_beam.pipeline import PipelineVisitor @@ -352,6 +360,8 @@ def visit_transform(self, transform_node): # access pattern to appease Dataflow. side_input.pvalue.element_type = typehints.coerce_to_kv_type( side_input.pvalue.element_type, transform_node.full_label) + side_input.pvalue.requires_deterministic_key_coder = ( + deterministic_key_coders and transform_node.full_label) new_side_input = _DataflowMultimapSideInput(side_input) else: raise ValueError( @@ -434,7 +444,10 @@ def _check_for_unsupported_fnapi_features(self, pipeline_proto): def _adjust_pipeline_for_dataflow_v2(self, pipeline): # Dataflow runner requires a KV type for GBK inputs, hence we enforce that # here. - pipeline.visit(self.group_by_key_input_visitor()) + pipeline.visit( + self.group_by_key_input_visitor( + not pipeline._options.view_as( + TypeOptions).allow_non_deterministic_key_coders)) def _check_for_unsupported_features_on_non_portable_worker(self, pipeline): pipeline.visit(self.combinefn_visitor()) @@ -471,7 +484,9 @@ def run_pipeline(self, pipeline, options): pipeline.visit( self.side_input_visitor( apiclient._use_unified_worker(options), - apiclient._use_fnapi(options))) + apiclient._use_fnapi(options), + deterministic_key_coders=not options.view_as( + TypeOptions).allow_non_deterministic_key_coders)) # Performing configured PTransform overrides. Note that this is currently # done before Runner API serialization, since the new proto needs to contain diff --git a/sdks/python/apache_beam/runners/pipeline_context.py b/sdks/python/apache_beam/runners/pipeline_context.py index aafa257b3dd5..09950f5fbe00 100644 --- a/sdks/python/apache_beam/runners/pipeline_context.py +++ b/sdks/python/apache_beam/runners/pipeline_context.py @@ -233,12 +233,20 @@ def requirements(self): # rather than an actual coder. The element type is required for some runners, # as well as performing a round-trip through protos. # TODO(BEAM-2717): Remove once this is no longer needed. 
- def coder_id_from_element_type(self, element_type): - # type: (Any) -> str + def coder_id_from_element_type( + self, element_type, requires_deterministic_key_coder=None): + # type: (Any, Optional[str]) -> str if self.use_fake_coders: return pickler.dumps(element_type).decode('ascii') else: - return self.coders.get_id(coders.registry.get_coder(element_type)) + coder = coders.registry.get_coder(element_type) + if requires_deterministic_key_coder: + coder = coders.TupleCoder([ + coder.key_coder().as_deterministic_coder( + requires_deterministic_key_coder), + coder.value_coder() + ]) + return self.coders.get_id(coder) def element_type_from_coder_id(self, coder_id): # type: (str) -> Any diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py index 0c88299266a1..4b47b5e76efc 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py @@ -155,7 +155,10 @@ def run_pipeline(self, # are known to be KVs. from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner # TODO: Move group_by_key_input_visitor() to a non-dataflow specific file. - pipeline.visit(DataflowRunner.group_by_key_input_visitor()) + pipeline.visit( + DataflowRunner.group_by_key_input_visitor( + not options.view_as(pipeline_options.TypeOptions). + allow_non_deterministic_key_coders)) self._bundle_repeat = self._bundle_repeat or options.view_as( pipeline_options.DirectOptions).direct_runner_bundle_repeat pipeline_direct_num_workers = options.view_as( diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index d4f8424c32a5..2f2bb98f8705 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -25,6 +25,8 @@ import collections import operator +import pickle +import random import re import sys import typing @@ -34,6 +36,7 @@ from builtins import zip from functools import reduce from typing import Optional +from unittest.mock import patch # patches unittest.TestCase to be python3 compatible import future.tests.base # pylint: disable=unused-import @@ -500,6 +503,94 @@ def process(self, gbk_result): | 'Reiteration-Sum' >> beam.ParDo(MyDoFn())) assert_that(result, equal_to([(1, 170)])) + def test_group_by_key_deterministic_coder(self): + # pylint: disable=global-variable-not-assigned + global MyObject # for pickling of the class instance + + class MyObject: + def __init__(self, value): + self.value = value + + def __eq__(self, other): + return self.value == other.value + + def __hash__(self): + return hash(self.value) + + class MyObjectCoder(beam.coders.Coder): + def encode(self, o): + return pickle.dumps((o.value, random.random())) + + def decode(self, encoded): + return MyObject(pickle.loads(encoded)[0]) + + def as_deterministic_coder(self, *args): + return MydeterministicObjectCoder() + + def to_type_hint(self): + return MyObject + + class MydeterministicObjectCoder(beam.coders.Coder): + def encode(self, o): + return pickle.dumps(o.value) + + def decode(self, encoded): + return MyObject(pickle.loads(encoded)) + + def is_deterministic(self): + return True + + beam.coders.registry.register_coder(MyObject, MyObjectCoder) + + with TestPipeline() as pipeline: + pcoll = pipeline | beam.Create([(MyObject(k % 2), k) for k in range(10)]) + grouped = pcoll | beam.GroupByKey() | beam.MapTuple( + 
lambda k, vs: (k.value, sorted(vs))) + combined = pcoll | beam.CombinePerKey(sum) | beam.MapTuple( + lambda k, v: (k.value, v)) + assert_that( + grouped, + equal_to([(0, [0, 2, 4, 6, 8]), (1, [1, 3, 5, 7, 9])]), + 'CheckGrouped') + assert_that(combined, equal_to([(0, 20), (1, 25)]), 'CheckCombined') + + def test_group_by_key_non_deterministic_coder(self): + with self.assertRaisesRegex(Exception, r'deterministic'): + with TestPipeline() as pipeline: + _ = ( + pipeline + | beam.Create([(PickledObject(10), None)]) + | beam.GroupByKey() + | beam.MapTuple(lambda k, v: list(v))) + + def test_group_by_key_allow_non_deterministic_coder(self): + with TestPipeline() as pipeline: + # The GroupByKey below would fail without this option. + pipeline._options.view_as( + TypeOptions).allow_non_deterministic_key_coders = True + grouped = ( + pipeline + | beam.Create([(PickledObject(10), None)]) + | beam.GroupByKey() + | beam.MapTuple(lambda k, v: list(v))) + assert_that(grouped, equal_to([[None]])) + + def test_group_by_key_fake_deterministic_coder(self): + fresh_registry = beam.coders.typecoders.CoderRegistry() + with patch.object( + beam.coders, 'registry', fresh_registry), patch.object( + beam.coders.typecoders, 'registry', fresh_registry): + with TestPipeline() as pipeline: + # The GroupByKey below would fail without this registration. + beam.coders.registry.register_fallback_coder( + beam.coders.coders.FakeDeterministicFastPrimitivesCoder()) + grouped = ( + pipeline + | beam.Create([(PickledObject(10), None)]) + | beam.GroupByKey() + | beam.MapTuple(lambda k, v: list(v))) + assert_that(grouped, equal_to([[None]])) + def test_partition_with_partition_fn(self): class SomePartitionFn(beam.PartitionFn): def partition_for(self, element, num_partitions, offset): @@ -1368,7 +1459,6 @@ def more_than_half(a): def test_filter_type_checks_using_type_hints_decorator(self): @with_input_types(b=int) def half(b): - import random return bool(random.choice([0, 1])) # Filter should deduce that it returns the same type that it takes. @@ -2508,5 +2598,11 @@ def _sort_lists(result): _SortLists = beam.Map(_sort_lists) + +class PickledObject(object): + def __init__(self, value): + self.value = value + + if __name__ == '__main__': unittest.main()
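
The breaking-change entry in CHANGES.md above names two escape hatches for pipelines whose keys cannot be encoded deterministically. The sketch below is illustrative only and not part of the patch: `MyKey` is a placeholder class with no registered coder (so the registry falls back to pickling), and either opt-out alone is sufficient.

```python
# Minimal sketch, assuming a Beam release that contains this change.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class MyKey(object):
  """Placeholder key type; only the pickle-based fallback coder can encode it."""
  def __init__(self, value):
    self.value = value


# Escape hatch 1: register the fake-deterministic fallback coder globally,
# exactly as shown in the CHANGES.md entry above.
beam.coders.registry.register_fallback_coder(
    beam.coders.coders.FakeDeterministicFastPrimitivesCoder())

# Escape hatch 2: allow non-deterministic key coders for this pipeline only.
options = PipelineOptions(['--allow_non_deterministic_key_coders'])

# Without one of the opt-outs above, this GroupByKey would now fail at run time
# with "Unable to deterministically encode ... please provide a type hint".
with beam.Pipeline(options=options) as pipeline:
  _ = (
      pipeline
      | beam.Create([(MyKey(1), 'a'), (MyKey(2), 'b')])
      | beam.GroupByKey()
      | beam.Map(print))
```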
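For keys with a custom coder, the new test in ptransform_test.py above points at the preferred fix: have the coder report `is_deterministic()` (or implement `as_deterministic_coder`) so the registry can hand GroupByKey a deterministic key coder without any opt-out. A minimal sketch of that pattern, with illustrative names (`UserId`, `UserIdCoder`) that are not part of the patch:

```python
# Minimal sketch of a deterministic custom key coder; names are placeholders.
import pickle

import apache_beam as beam


class UserId(object):
  def __init__(self, value):
    self.value = value


class UserIdCoder(beam.coders.Coder):
  def encode(self, o):
    return pickle.dumps(o.value)

  def decode(self, encoded):
    return UserId(pickle.loads(encoded))

  def is_deterministic(self):
    # Pickling a plain int is stable here, so this coder may claim determinism;
    # the base Coder.as_deterministic_coder then returns this coder unchanged.
    return True

  def to_type_hint(self):
    return UserId


beam.coders.registry.register_coder(UserId, UserIdCoder)

with beam.Pipeline() as pipeline:
  _ = (
      pipeline
      | beam.Create([(UserId(k % 2), k) for k in range(10)])
      | beam.GroupByKey()
      | beam.MapTuple(lambda k, vs: (k.value, sorted(vs)))
      | beam.Map(print))
```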