Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/transforms/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ def _display_data(num_quantiles, key, reverse, weighted, input_batched):
}

@typehints.with_input_types(
typehints.Union[typing.Sequence[T], Tuple[T, float]])
typing.Union[T, typing.Sequence[T], Tuple[T, float]])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a good catch, not sure why the typehints type was passed through here

@typehints.with_output_types(List[T])
class Globally(PTransform):
"""
Expand Down
11 changes: 11 additions & 0 deletions sdks/python/apache_beam/typehints/native_type_compatibility.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
collections.abc.Set,
collections.abc.MutableSet,
collections.abc.Collection,
collections.abc.Sequence,
]


Expand Down Expand Up @@ -144,6 +145,10 @@ def _match_is_exactly_collection(user_type):
return getattr(user_type, '__origin__', None) is collections.abc.Collection


def _match_is_exactly_sequence(user_type):
return getattr(user_type, '__origin__', None) is collections.abc.Sequence


def match_is_named_tuple(user_type):
return (
_safe_issubclass(user_type, typing.Tuple) and
Expand Down Expand Up @@ -405,6 +410,10 @@ def convert_to_beam_type(typ):
match=_match_issubclass(TypedWindowedValue),
arity=1,
beam_type=typehints.WindowedValue),
_TypeMapEntry(
match=_match_is_exactly_sequence,
arity=1,
beam_type=typehints.Sequence),
]

# Find the first matching entry.
Expand Down Expand Up @@ -521,6 +530,8 @@ def convert_to_python_type(typ):
return tuple[tuple(convert_to_python_types(typ.tuple_types))]
if isinstance(typ, typehints.TupleSequenceConstraint):
return tuple[convert_to_python_type(typ.inner_type), ...]
if isinstance(typ, typehints.ABCSequenceTypeConstraint):
return collections.abc.Sequence[convert_to_python_type(typ.inner_type)]
if isinstance(typ, typehints.IteratorTypeConstraint):
return collections.abc.Iterator[convert_to_python_type(typ.yielded_type)]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,14 @@ def test_convert_to_beam_type_with_collections_types(self):
'collection of tuples',
collections.abc.Collection[tuple[str, int]],
typehints.Collection[typehints.Tuple[str, int]]),
(
'nested sequence',
tuple[collections.abc.Sequence[str], int],
typehints.Tuple[typehints.Sequence[str], int]),
(
'sequence of tuples',
collections.abc.Sequence[tuple[str, int]],
typehints.Sequence[typehints.Tuple[str, int]]),
]

for test_case in test_cases:
Expand Down
40 changes: 40 additions & 0 deletions sdks/python/apache_beam/typehints/typehints.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
'Set',
'FrozenSet',
'Collection',
'Sequence',
'Iterable',
'Iterator',
'Generator',
Expand Down Expand Up @@ -1035,6 +1036,7 @@ def _is_subclass_constraint(sub):
sub,
(
CollectionTypeConstraint,
ABCSequenceTypeConstraint,
FrozenSetTypeConstraint,
SetTypeConstraint,
ListConstraint))
Expand Down Expand Up @@ -1072,6 +1074,43 @@ def __getitem__(self, type_param):
CollectionTypeConstraint = CollectionHint.CollectionTypeConstraint


class SequenceHint(CompositeTypeHint):
"""A Sequence type-hint.

Sequence[X] defines a type-hint for a sequence of homogeneous types. 'X' may
be either a built-in Python type or another nested TypeConstraint.

This represents collections.abc.Sequence type, which implements __getitem__,
__len__, and __contains__. This is more specific than Iterable but less
restrictive than List, providing a good middle ground for sequence-like types.
"""
class ABCSequenceTypeConstraint(SequenceTypeConstraint):
def __init__(self, type_param):
super().__init__(type_param, abc.Sequence)

def __repr__(self):
return 'Sequence[%s]' % repr(self.inner_type)

def _consistent_with_check_(self, sub):
if isinstance(sub, (ListConstraint, TupleConstraint)):
# Lists and Tuples are Sequences
if isinstance(sub, TupleConstraint):
# For tuples, all elements must be consistent with the sequence type
return all(
is_consistent_with(elem, self.inner_type)
for elem in sub.tuple_types)
return is_consistent_with(sub.inner_type, self.inner_type)
return super()._consistent_with_check_(sub)

def __getitem__(self, type_param):
validate_composite_type_param(
type_param, error_msg_prefix='Parameter to a Sequence hint')
return self.ABCSequenceTypeConstraint(type_param)


ABCSequenceTypeConstraint = SequenceHint.ABCSequenceTypeConstraint


class IterableHint(CompositeTypeHint):
"""An Iterable type-hint.

Expand Down Expand Up @@ -1252,6 +1291,7 @@ def __getitem__(self, type_params):
Set = SetHint()
FrozenSet = FrozenSetHint()
Collection = CollectionHint()
Sequence = SequenceHint()
Iterable = IterableHint()
Iterator = IteratorHint()
Generator = GeneratorHint()
Expand Down
25 changes: 25 additions & 0 deletions sdks/python/apache_beam/typehints/typehints_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,30 @@ def test_getitem_invalid_composite_type_param(self):
e.exception.args[0])


class SequenceHintTestCase(TypeHintTestCase):
def test_type_constraint_compatibility(self):
self.assertCompatible(typehints.Sequence[str], typehints.List[str])
self.assertCompatible(typehints.Sequence[str], typehints.Tuple[str])
self.assertCompatible(
typehints.Sequence[typehints.Any], typehints.Sequence[str])
self.assertCompatible(
typehints.Sequence[str], typehints.Sequence[typehints.Any])
self.assertCompatible(typehints.Any, typehints.Sequence[str])

def test_one_way_compatibility(self):
self.assertNotCompatible(typehints.List[str], typehints.Sequence[str])
self.assertNotCompatible(typehints.Tuple[str], typehints.Sequence[str])

def test_getitem_invalid_composite_type_param(self):
with self.assertRaises(TypeError) as e:
typehints.Sequence[5]
self.assertEqual(
'Parameter to a Sequence hint must be a '
'non-sequence, a type, or a TypeConstraint. 5 is '
'an instance of int.',
e.exception.args[0])


class IterableHintTestCase(TypeHintTestCase):
def test_getitem_invalid_composite_type_param(self):
with self.assertRaises(TypeError) as e:
Expand All @@ -891,6 +915,7 @@ def test_compatibility(self):
self.assertCompatible(
typehints.Iterable[typehints.Any],
typehints.List[typehints.Tuple[int, bool]])
self.assertCompatible(typehints.Iterable[str], typehints.Sequence[str])

self.assertCompatible(typehints.Iterable[int], typehints.Iterable[int])
self.assertCompatible(
Expand Down
Loading