34 changes: 34 additions & 0 deletions CHANGES.md
@@ -45,6 +45,39 @@

* Fixed X (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
-->

# [2.29.0] - Unreleased

## Highlights

* New highly anticipated feature X added to Python SDK ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
* New highly anticipated feature Y added to Java SDK ([BEAM-Y](https://issues.apache.org/jira/browse/BEAM-Y)).

## I/Os

* Support for X source added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).

## New Features / Improvements

* X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).

## Breaking Changes

* Deterministic coding is now enforced for GroupByKey and Stateful DoFns. Previously, non-deterministic coding was allowed, which could result in keys not being grouped correctly in some cases. ([BEAM-11719](https://issues.apache.org/jira/browse/BEAM-11719))
  To restore the old behavior, register `FakeDeterministicFastPrimitivesCoder` with
  `beam.coders.registry.register_fallback_coder(beam.coders.coders.FakeDeterministicFastPrimitivesCoder())`
  or use the `allow_non_deterministic_key_coders` pipeline option (see the sketch after this list).
* X behavior was changed ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
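
A minimal sketch of the two opt-out paths described in the entry above (illustrative only, not part of this changelog entry); the pipeline body is a placeholder:

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Option 1: register the fake deterministic coder as a fallback, restoring the
# pre-2.29.0 (pickling) behavior for key types with no registered coder.
beam.coders.registry.register_fallback_coder(
    beam.coders.coders.FakeDeterministicFastPrimitivesCoder())

# Option 2: allow non-deterministic key coders pipeline-wide via the new option.
options = PipelineOptions(['--allow_non_deterministic_key_coders'])

with beam.Pipeline(options=options) as p:
    (p
     | beam.Create([('a', 1), ('a', 2), ('b', 3)])
     | beam.GroupByKey()
     | beam.Map(print))
```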

## Deprecations

* X behavior is deprecated and will be removed in X versions ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).

## Known Issues

* Fixed X (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).


# [2.28.0] - 2021-02-22

## Highlights
@@ -95,6 +128,7 @@
on removed APIs. If affected, ensure to use an appropriate Guava version via `dependencyManagement` in Maven and
`force` in Gradle.


# [2.27.0] - 2021-01-08

## I/Os
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/coders/coder_impl.py
@@ -300,7 +300,7 @@ def _check_safe(self, value):
self._check_safe(x)
else:
raise TypeError(
"Unable to deterministically code '%s' of type '%s', "
"Unable to deterministically encode '%s' of type '%s', "
"please provide a type hint for the input of '%s'" %
(value, type(value), self._step_label))

@@ -908,7 +908,7 @@ def decode(self, encoded):
class TupleCoderImpl(AbstractComponentCoderImpl):
"""A coder for tuple objects."""
def _extract_components(self, value):
return value
return tuple(value)

def _construct_from_components(self, components):
return tuple(components)
14 changes: 13 additions & 1 deletion sdks/python/apache_beam/coders/coders.py
@@ -165,7 +165,9 @@ def as_deterministic_coder(self, step_label, error_message=None):
if self.is_deterministic():
return self
else:
raise ValueError(error_message or "'%s' cannot be made deterministic.")
raise ValueError(
error_message or
"%s cannot be made deterministic for '%s'." % (self, step_label))

def estimate_size(self, value):
"""Estimates the encoded size of the given value, in bytes.
@@ -852,6 +854,16 @@ def __hash__(self):
return hash(type(self))


class FakeDeterministicFastPrimitivesCoder(FastPrimitivesCoder):
"""A FastPrimitivesCoder that claims to be deterministic.

This can be registered as a fallback coder to go back to the behavior before
deterministic encoding was enforced (BEAM-11719).
"""
def is_deterministic(self):
return True


class Base64PickleCoder(Coder):
"""Coder of objects by Python pickle, then base64 encoding."""

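
To make the new error path concrete, here is a small sketch; `OpaqueCoder` is a hypothetical coder, not part of Beam, and the printed message is paraphrased from the format string added above:

```python
from apache_beam.coders import coders


class OpaqueCoder(coders.Coder):
    """Hypothetical coder that cannot encode deterministically."""
    def is_deterministic(self):
        return False


try:
    OpaqueCoder().as_deterministic_coder(step_label='MyStep/GroupByKey')
except ValueError as e:
    # The message now names both the coder and the step, e.g.
    # "<OpaqueCoder ...> cannot be made deterministic for 'MyStep/GroupByKey'."
    print(e)
```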
6 changes: 6 additions & 0 deletions sdks/python/apache_beam/coders/coders_test_common.py
@@ -195,6 +195,12 @@ def test_fast_primitives_coder_large_int(self):
coder = coders.FastPrimitivesCoder()
self.check_coder(coder, 10**100)

def test_fake_deterministic_fast_primitives_coder(self):
coder = coders.FakeDeterministicFastPrimitivesCoder(coders.PickleCoder())
self.check_coder(coder, *self.test_values)
for v in self.test_values:
self.check_coder(coders.TupleCoder((coder, )), (v, ))

def test_bytes_coder(self):
self.check_coder(coders.BytesCoder(), b'a', b'\0', b'z' * 1000)

3 changes: 3 additions & 0 deletions sdks/python/apache_beam/coders/typecoders.py
@@ -103,6 +103,9 @@ def register_standard_coders(self, fallback_coder):
default_fallback_coders = [coders.ProtoCoder, coders.FastPrimitivesCoder]
self._fallback_coder = fallback_coder or FirstOf(default_fallback_coders)

def register_fallback_coder(self, fallback_coder):
self._fallback_coder = FirstOf([fallback_coder, self._fallback_coder])

def _register_coder_internal(self, typehint_type, typehint_coder_class):
# type: (Any, Type[coders.Coder]) -> None
self._coders[typehint_type] = typehint_coder_class
27 changes: 11 additions & 16 deletions sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
@@ -80,22 +80,17 @@ def test_basics_without_type_check(self):
# therefore any custom coders will not be used. The default coder (pickler)
# will be used instead.
temp_path = self.create_temp_file(self.SAMPLE_RECORDS)
group_with_coder.run([
'--no_pipeline_type_check',
'--input=%s*' % temp_path,
'--output=%s.result' % temp_path
],
save_main_session=False)
# Parse result file and compare.
results = []
with open_shards(temp_path + '.result-*-of-*') as result_file:
for line in result_file:
name, points = line.split(',')
results.append((name, int(points)))
logging.info('result: %s', results)
self.assertEqual(
sorted(results),
sorted([('ann', 15), ('fred', 9), ('joe', 60), ('mary', 8)]))
with self.assertRaises(Exception) as context:
# yapf: disable
group_with_coder.run(
[
'--no_pipeline_type_check',
'--input=%s*' % temp_path,
'--output=%s.result' % temp_path
],
save_main_session=False)
self.assertIn('Unable to deterministically encode', str(context.exception))
self.assertIn('CombinePerKey(sum)/GroupByKey', str(context.exception))


if __name__ == '__main__':
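
For context, a sketch of the remedy the new error message asks for: register a coder for the key type and add an output type hint so the key is no longer pickled. `Player` and `PlayerCoder` come from the `group_with_coder` example; the rest of the pipeline is illustrative and assumes those definitions behave as in that example:

```python
import apache_beam as beam
from apache_beam.examples.cookbook.group_with_coder import Player, PlayerCoder

# Tell the registry how to encode Player keys deterministically.
beam.coders.registry.register_coder(Player, PlayerCoder)

with beam.Pipeline() as p:
    (p
     | beam.Create(['ann,15', 'fred,9'])
     | beam.Map(
         lambda line: (Player(line.split(',')[0]), int(line.split(',')[1])))
       .with_output_types(beam.typehints.KV[Player, int])
     | beam.CombinePerKey(sum)
     | beam.Map(print))
```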
9 changes: 7 additions & 2 deletions sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -32,6 +32,7 @@

import hashlib
import logging
import pickle
import random
import uuid

@@ -858,9 +859,13 @@ def _load_data(
lambda x,
deleting_tables: deleting_tables,
pvalue.AsIter(temp_tables_pc))
| "RemoveTempTables/AddUselessValue" >> beam.Map(lambda x: (x, None))
# TableReference has no deterministic coder, but as this de-duplication
# is best-effort, pickling should be good enough.
| "RemoveTempTables/AddUselessValue" >>
beam.Map(lambda x: (pickle.dumps(x), None))
| "RemoveTempTables/DeduplicateTables" >> beam.GroupByKey()
| "RemoveTempTables/GetTableNames" >> beam.Map(lambda elm: elm[0])
| "RemoveTempTables/GetTableNames" >>
beam.MapTuple(lambda k, nones: pickle.loads(k))
| "RemoveTempTables/Delete" >> beam.ParDo(
DeleteTablesFn(self.test_client)))

Review thread on this change:

Member: From what I can tell, parse_table_reference is used to normalize and populate TableReference instances. They are essentially a "(project, dataset, table) tuple." It should be possible to beam.Map(lambda x: (x.tableId, x.datasetId, x.projectId)) | beam.Distinct() | beam.Map(lambda x: parse_table_reference(*x)). cc: @pabloem

Contributor Author: Assuming exactly these three fields would not be as future-proof.

Member: Ack. The official BigQuery client (which this code doesn't use) has a hashable wrapper for this: https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.table.TableReference.html

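
A standalone sketch of the encode-group-decode pattern adopted above, with toy dict elements standing in for the `TableReference` objects the real code handles:

```python
import pickle

import apache_beam as beam

with beam.Pipeline() as p:
    (p
     | beam.Create([{'table': 't1'}, {'table': 't1'}, {'table': 't2'}])
     # Pickle each element so the GroupByKey key is plain bytes, which has a
     # deterministic coder; identical pickles collapse into a single group.
     | 'AddUselessValue' >> beam.Map(lambda x: (pickle.dumps(x), None))
     | 'DeduplicateTables' >> beam.GroupByKey()
     # Recover the original object from the pickled key.
     | 'GetTableNames' >> beam.MapTuple(lambda k, nones: pickle.loads(k))
     | beam.Map(print))
```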
8 changes: 8 additions & 0 deletions sdks/python/apache_beam/options/pipeline_options.py
@@ -516,6 +516,14 @@ def _add_argparse_args(cls, parser):
help='Enable faster type checking via sampling at pipeline execution '
'time. NOTE: only supported with portable runners '
'(including the DirectRunner)')
parser.add_argument(
'--allow_non_deterministic_key_coders',
default=False,
action='store_true',
help='Use non-deterministic coders (such as pickling) for key-grouping '
'operations such as GroupByKey. This is unsafe, as runners may group '
'keys based on their encoded bytes, but is available for backwards '
'compatibility. See BEAM-11719.')

def validate(self, unused_validator):
errors = []
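
A small sketch of how the new flag is read back; it lives on `TypeOptions`, so runner code checks it via `view_as` (the asserts just illustrate the parsed values):

```python
from apache_beam.options.pipeline_options import PipelineOptions, TypeOptions

options = PipelineOptions(['--allow_non_deterministic_key_coders'])
assert options.view_as(TypeOptions).allow_non_deterministic_key_coders

default_options = PipelineOptions([])
assert not default_options.view_as(TypeOptions).allow_non_deterministic_key_coders
```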
13 changes: 13 additions & 0 deletions sdks/python/apache_beam/pipeline.py
@@ -841,6 +841,10 @@ def to_runner_api(
# general shapes, potential conflicts will have to be resolved.
# We also only handle single-input, and (for fixing the output) single
# output, which is sufficient.
# Also marks such values as requiring deterministic key coders.
deterministic_key_coders = not self._options.view_as(
TypeOptions).allow_non_deterministic_key_coders

class ForceKvInputTypes(PipelineVisitor):
def enter_composite_transform(self, transform_node):
# type: (AppliedPTransform) -> None
@@ -854,18 +858,27 @@ def visit_transform(self, transform_node):
pcoll = transform_node.inputs[0]
pcoll.element_type = typehints.coerce_to_kv_type(
pcoll.element_type, transform_node.full_label)
pcoll.requires_deterministic_key_coder = (
deterministic_key_coders and transform_node.full_label)
if len(transform_node.outputs) == 1:
# The runner often has expectations about the output types as well.
output, = transform_node.outputs.values()
if not output.element_type:
output.element_type = transform_node.transform.infer_output_type(
pcoll.element_type)
if (isinstance(output.element_type,
typehints.TupleHint.TupleConstraint) and
len(output.element_type.tuple_types) == 2):
output.requires_deterministic_key_coder = (
deterministic_key_coders and transform_node.full_label)
for side_input in transform_node.transform.side_inputs:
if side_input.requires_keyed_input():
side_input.pvalue.element_type = typehints.coerce_to_kv_type(
side_input.pvalue.element_type,
transform_node.full_label,
side_input_producer=side_input.pvalue.producer.full_label)
side_input.pvalue.requires_deterministic_key_coder = (
deterministic_key_coders and transform_node.full_label)

self.visit(ForceKvInputTypes())

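
The marking above is done with a `PipelineVisitor`; as a point of reference, a minimal hypothetical visitor that only prints each transform's label and input element type looks like this (names and pipeline contents are placeholders):

```python
import apache_beam as beam
from apache_beam.pipeline import PipelineVisitor


class LogInputs(PipelineVisitor):
    """Hypothetical visitor: prints each transform's label and input type."""
    def visit_transform(self, transform_node):
        if transform_node.inputs:
            print(transform_node.full_label, transform_node.inputs[0].element_type)


p = beam.Pipeline()
_ = (p
     | beam.Create([('a', 1), ('b', 2)])
     | beam.GroupByKey())
p.visit(LogInputs())
```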
4 changes: 3 additions & 1 deletion sdks/python/apache_beam/pvalue.py
@@ -109,6 +109,7 @@ def __init__(self,
self.is_bounded = is_bounded
if windowing:
self._windowing = windowing
self.requires_deterministic_key_coder = None

def __str__(self):
return self._str_internal()
@@ -187,7 +188,8 @@ def to_runner_api(self, context):
# type: (PipelineContext) -> beam_runner_api_pb2.PCollection
return beam_runner_api_pb2.PCollection(
unique_name=self._unique_name(),
coder_id=context.coder_id_from_element_type(self.element_type),
coder_id=context.coder_id_from_element_type(
self.element_type, self.requires_deterministic_key_coder),
is_bounded=beam_runner_api_pb2.IsBounded.BOUNDED
if self.is_bounded else beam_runner_api_pb2.IsBounded.UNBOUNDED,
windowing_strategy_id=context.windowing_strategies.get_id(
23 changes: 19 additions & 4 deletions sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -51,6 +51,7 @@
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import TestOptions
from apache_beam.options.pipeline_options import TypeOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.portability import common_urns
from apache_beam.portability.api import beam_runner_api_pb2
@@ -270,7 +271,7 @@ def _only_element(iterable):
return element

@staticmethod
def group_by_key_input_visitor():
def group_by_key_input_visitor(deterministic_key_coders=True):
# Imported here to avoid circular dependencies.
from apache_beam.pipeline import PipelineVisitor

@@ -292,16 +293,23 @@ def visit_transform(self, transform_node):
pcoll = transform_node.inputs[0]
pcoll.element_type = typehints.coerce_to_kv_type(
pcoll.element_type, transform_node.full_label)
pcoll.requires_deterministic_key_coder = (
deterministic_key_coders and transform_node.full_label)
key_type, value_type = pcoll.element_type.tuple_types
if transform_node.outputs:
key = DataflowRunner._only_element(transform_node.outputs.keys())
transform_node.outputs[key].element_type = typehints.KV[
key_type, typehints.Iterable[value_type]]
transform_node.outputs[key].requires_deterministic_key_coder = (
deterministic_key_coders and transform_node.full_label)

return GroupByKeyInputVisitor()

@staticmethod
def side_input_visitor(use_unified_worker=False, use_fn_api=False):
def side_input_visitor(
use_unified_worker=False,
use_fn_api=False,
deterministic_key_coders=True):
# Imported here to avoid circular dependencies.
# pylint: disable=wrong-import-order, wrong-import-position
from apache_beam.pipeline import PipelineVisitor
@@ -352,6 +360,8 @@ def visit_transform(self, transform_node):
# access pattern to appease Dataflow.
side_input.pvalue.element_type = typehints.coerce_to_kv_type(
side_input.pvalue.element_type, transform_node.full_label)
side_input.pvalue.requires_deterministic_key_coder = (
deterministic_key_coders and transform_node.full_label)
new_side_input = _DataflowMultimapSideInput(side_input)
else:
raise ValueError(
@@ -434,7 +444,10 @@ def _check_for_unsupported_fnapi_features(self, pipeline_proto):
def _adjust_pipeline_for_dataflow_v2(self, pipeline):
# Dataflow runner requires a KV type for GBK inputs, hence we enforce that
# here.
pipeline.visit(self.group_by_key_input_visitor())
pipeline.visit(
self.group_by_key_input_visitor(
not pipeline._options.view_as(
TypeOptions).allow_non_deterministic_key_coders))

def _check_for_unsupported_features_on_non_portable_worker(self, pipeline):
pipeline.visit(self.combinefn_visitor())
@@ -471,7 +484,9 @@ def run_pipeline(self, pipeline, options):
pipeline.visit(
self.side_input_visitor(
apiclient._use_unified_worker(options),
apiclient._use_fnapi(options)))
apiclient._use_fnapi(options),
deterministic_key_coders=not options.view_as(
TypeOptions).allow_non_deterministic_key_coders))

# Performing configured PTransform overrides. Note that this is currently
# done before Runner API serialization, since the new proto needs to contain
14 changes: 11 additions & 3 deletions sdks/python/apache_beam/runners/pipeline_context.py
@@ -233,12 +233,20 @@ def requirements(self):
# rather than an actual coder. The element type is required for some runners,
# as well as performing a round-trip through protos.
# TODO(BEAM-2717): Remove once this is no longer needed.
def coder_id_from_element_type(self, element_type):
# type: (Any) -> str
def coder_id_from_element_type(
self, element_type, requires_deterministic_key_coder=None):
# type: (Any, Optional[str]) -> str
if self.use_fake_coders:
return pickler.dumps(element_type).decode('ascii')
else:
return self.coders.get_id(coders.registry.get_coder(element_type))
coder = coders.registry.get_coder(element_type)
if requires_deterministic_key_coder:
coder = coders.TupleCoder([
coder.key_coder().as_deterministic_coder(
requires_deterministic_key_coder),
coder.value_coder()
])
return self.coders.get_id(coder)

def element_type_from_coder_id(self, coder_id):
# type: (str) -> Any
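
A sketch of the substitution `coder_id_from_element_type` now performs for key/value element types; the step label is a placeholder, and the key component is upgraded with `as_deterministic_coder` while the value component is left untouched:

```python
from apache_beam import coders, typehints

# Coder the registry would normally pick for a KV element type.
kv_coder = coders.registry.get_coder(typehints.KV[bytes, int])

# Same substitution as above: deterministic key coder, unchanged value coder.
deterministic_kv_coder = coders.TupleCoder([
    kv_coder.key_coder().as_deterministic_coder('MyStep/GroupByKey'),
    kv_coder.value_coder(),
])
```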
@@ -155,7 +155,10 @@ def run_pipeline(self,
# are known to be KVs.
from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner
# TODO: Move group_by_key_input_visitor() to a non-dataflow specific file.
pipeline.visit(DataflowRunner.group_by_key_input_visitor())
pipeline.visit(
DataflowRunner.group_by_key_input_visitor(
not options.view_as(pipeline_options.TypeOptions).
allow_non_deterministic_key_coders))
self._bundle_repeat = self._bundle_repeat or options.view_as(
pipeline_options.DirectOptions).direct_runner_bundle_repeat
pipeline_direct_num_workers = options.view_as(