Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions sdks/python/apache_beam/io/gcp/pubsub_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
from __future__ import absolute_import

import logging
import os
import sys
import unittest
from builtins import object

Expand Down Expand Up @@ -109,6 +111,10 @@ def test_repr(self):
@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
class TestReadFromPubSubOverride(unittest.TestCase):

@unittest.skipIf(sys.version_info >= (3, 6, 0) and
os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
'This test still needs to be fixed on Python 3.6.'
'See BEAM-6877')
def test_expand_with_topic(self):
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
Expand All @@ -133,6 +139,10 @@ def test_expand_with_topic(self):
self.assertEqual('a_topic', source.topic_name)
self.assertEqual('a_label', source.id_label)

@unittest.skipIf(sys.version_info >= (3, 6, 0) and
os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
'This test still needs to be fixed on Python 3.6.'
'See BEAM-6877')
def test_expand_with_subscription(self):
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
Expand Down Expand Up @@ -169,6 +179,10 @@ def test_expand_with_both_topic_and_subscription(self):
ReadFromPubSub('a_topic', 'a_subscription', 'a_label',
with_attributes=False, timestamp_attribute=None)

@unittest.skipIf(sys.version_info >= (3, 6, 0) and
os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
'This test still needs to be fixed on Python 3.6.'
'See BEAM-6877')
def test_expand_with_other_options(self):
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,8 @@ def test_gbk_side_input(self):

@unittest.skipIf(sys.version_info >= (3, 6, 0) and
os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
'This test still needs to be fixed on Python 3.6.')
'This test still needs to be fixed on Python 3.6.'
'See BEAM-6878')
def test_multimap_side_input(self):
with self.create_pipeline() as p:
main = p | 'main' >> beam.Create(['a', 'b'])
Expand Down
24 changes: 16 additions & 8 deletions sdks/python/apache_beam/transforms/ptransform_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1474,7 +1474,8 @@ def test_pipeline_runtime_checking_violation_with_side_inputs_via_method(self):

@unittest.skipIf(sys.version_info >= (3, 6, 0) and
os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
'This test still needs to be fixed on Python 3.6.')
'This test still needs to be fixed on Python 3.6.'
'See BEAM-6877')
def test_combine_properly_pipeline_type_checks_using_decorator(self):
@with_output_types(int)
@with_input_types(ints=typehints.Iterable[int])
Expand Down Expand Up @@ -1637,7 +1638,8 @@ def test_combine_insufficient_type_hint_information(self):

@unittest.skipIf(sys.version_info >= (3, 6, 0) and
os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
'This test still needs to be fixed on Python 3.6.')
'This test still needs to be fixed on Python 3.6.'
'See BEAM-6877')
def test_mean_globally_pipeline_checking_satisfied(self):
d = (self.p
| 'C' >> beam.Create(range(5)).with_output_types(int)
Expand Down Expand Up @@ -1668,7 +1670,8 @@ def test_mean_globally_pipeline_checking_violated(self):

@unittest.skipIf(sys.version_info >= (3, 6, 0) and
os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
'This test still needs to be fixed on Python 3.6.')
'This test still needs to be fixed on Python 3.6.'
'See BEAM-6877')
def test_mean_globally_runtime_checking_satisfied(self):
self.p._options.view_as(TypeOptions).runtime_type_check = True

Expand Down Expand Up @@ -1779,7 +1782,8 @@ def test_mean_per_key_runtime_checking_violated(self):

@unittest.skipIf(sys.version_info >= (3, 6, 0) and
os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
'This test still needs to be fixed on Python 3.6.')
'This test still needs to be fixed on Python 3.6.'
'See BEAM-6877')
def test_count_globally_pipeline_type_checking_satisfied(self):
d = (self.p
| 'P' >> beam.Create(range(5)).with_output_types(int)
Expand All @@ -1791,7 +1795,8 @@ def test_count_globally_pipeline_type_checking_satisfied(self):

@unittest.skipIf(sys.version_info >= (3, 6, 0) and
os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
'This test still needs to be fixed on Python 3.6.')
'This test still needs to be fixed on Python 3.6.'
'See BEAM-6877')
def test_count_globally_runtime_type_checking_satisfied(self):
self.p._options.view_as(TypeOptions).runtime_type_check = True

Expand Down Expand Up @@ -2082,7 +2087,8 @@ def test_runtime_type_check_python_type_error(self):

@unittest.skipIf(sys.version_info >= (3, 6, 0) and
os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
'This test still needs to be fixed on Python 3.6.')
'This test still needs to be fixed on Python 3.6.'
'See BEAM-6877')
def test_pardo_type_inference(self):
self.assertEqual(int,
beam.Filter(lambda x: False).infer_output_type(int))
Expand All @@ -2096,7 +2102,8 @@ def test_gbk_type_inference(self):

@unittest.skipIf(sys.version_info >= (3, 6, 0) and
os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
'This test still needs to be fixed on Python 3.6.')
'This test still needs to be fixed on Python 3.6.'
'See BEAM-6877')
def test_pipeline_inference(self):
created = self.p | beam.Create(['a', 'b', 'c'])
mapped = created | 'pair with 1' >> beam.Map(lambda x: (x, 1))
Expand All @@ -2108,7 +2115,8 @@ def test_pipeline_inference(self):

@unittest.skipIf(sys.version_info >= (3, 6, 0) and
os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
'This test still needs to be fixed on Python 3.6.')
'This test still needs to be fixed on Python 3.6.'
'See BEAM-6877')
def test_inferred_bad_kv_type(self):
with self.assertRaises(typehints.TypeCheckError) as e:
_ = (self.p
Expand Down
64 changes: 6 additions & 58 deletions sdks/python/apache_beam/typehints/trivial_inference_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,37 +29,29 @@
global_int = 1


@unittest.skipIf(sys.version_info >= (3, 6, 0) and
os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
'This test still needs to be fixed on Python 3.6. '
'See BEAM-6877')
class TrivialInferenceTest(unittest.TestCase):

def assertReturnType(self, expected, f, inputs=()):
self.assertEquals(expected, trivial_inference.infer_return_type(f, inputs))

@unittest.skipIf(sys.version_info >= (3, 6, 0) and
os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
'This test still needs to be fixed on Python 3.6.')
def testIdentity(self):
self.assertReturnType(int, lambda x: x, [int])

@unittest.skipIf(sys.version_info >= (3, 6, 0) and
os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
'This test still needs to be fixed on Python 3.6.')
def testIndexing(self):
self.assertReturnType(int, lambda x: x[0], [typehints.Tuple[int, str]])
self.assertReturnType(str, lambda x: x[1], [typehints.Tuple[int, str]])
self.assertReturnType(str, lambda x: x[1], [typehints.List[str]])

@unittest.skipIf(sys.version_info >= (3, 6, 0) and
os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
'This test still needs to be fixed on Python 3.6.')
def testTuples(self):
self.assertReturnType(
typehints.Tuple[typehints.Tuple[()], int], lambda x: ((), x), [int])
self.assertReturnType(
typehints.Tuple[str, int, float], lambda x: (x, 0, 1.0), [str])

@unittest.skipIf(sys.version_info >= (3, 6, 0) and
os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
'This test still needs to be fixed on Python 3.6.')
def testGetItem(self):
def reverse(ab):
return ab[-1], ab[0]
Expand All @@ -74,9 +66,6 @@ def reverse(ab):
self.assertReturnType(
typehints.List[int], lambda v: v[::-1], [typehints.List[int]])

@unittest.skipIf(sys.version_info >= (3, 6, 0) and
os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
'This test still needs to be fixed on Python 3.6.')
def testUnpack(self):
def reverse(a_b):
(a, b) = a_b
Expand All @@ -99,19 +88,13 @@ def reverse(a_b):
self.assertReturnType(any_tuple,
reverse, [trivial_inference.Const((1, 2, 3))])

@unittest.skipIf(sys.version_info >= (3, 6, 0) and
os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
'This test still needs to be fixed on Python 3.6.')
def testNoneReturn(self):
def func(a):
if a == 5:
return a
return None
self.assertReturnType(typehints.Union[int, type(None)], func, [int])

@unittest.skipIf(sys.version_info >= (3, 6, 0) and
os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
'This test still needs to be fixed on Python 3.6.')
def testSimpleList(self):
self.assertReturnType(
typehints.List[int],
Expand All @@ -123,9 +106,6 @@ def testSimpleList(self):
lambda xs: list(xs), # List is a disallowed builtin
[typehints.Tuple[int, ...]])

@unittest.skipIf(sys.version_info >= (3, 6, 0) and
os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
'This test still needs to be fixed on Python 3.6.')
def testListComprehension(self):
self.assertReturnType(
typehints.List[int],
Expand All @@ -134,7 +114,8 @@ def testListComprehension(self):

@unittest.skipIf(sys.version_info[0] == 3 and
os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
'This test still needs to be fixed on Python 3.')
'This test still needs to be fixed on Python 3. '
'See BEAM-6877')
def testTupleListComprehension(self):
self.assertReturnType(
typehints.List[int],
Expand All @@ -154,9 +135,6 @@ def testTupleListComprehension(self):
lambda L: [(a, a or b, b) for a, b in L],
[typehints.Iterable[typehints.Tuple[str, int]]])

@unittest.skipIf(sys.version_info >= (3, 6, 0) and
os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
'This test still needs to be fixed on Python 3.6.')
def testGenerator(self):

def foo(x, y):
Expand All @@ -167,18 +145,12 @@ def foo(x, y):
self.assertReturnType(
typehints.Iterable[typehints.Union[int, float]], foo, [int, float])

@unittest.skipIf(sys.version_info >= (3, 6, 0) and
os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
'This test still needs to be fixed on Python 3.6.')
def testGeneratorComprehension(self):
self.assertReturnType(
typehints.Iterable[int],
lambda xs: (x for x in xs),
[typehints.Tuple[int, ...]])

@unittest.skipIf(sys.version_info >= (3, 6, 0) and
os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
'This test still needs to be fixed on Python 3.6.')
def testBinOp(self):
self.assertReturnType(int, lambda a, b: a + b, [int, int])
self.assertReturnType(
Expand All @@ -187,9 +159,6 @@ def testBinOp(self):
typehints.List[typehints.Union[int, str]], lambda a, b: a + b,
[typehints.List[int], typehints.List[str]])

@unittest.skipIf(sys.version_info >= (3, 6, 0) and
os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
'This test still needs to be fixed on Python 3.6.')
def testCall(self):
f = lambda x, *args: x
self.assertReturnType(
Expand All @@ -198,37 +167,22 @@ def testCall(self):
self.assertReturnType(
typehints.Tuple[int, typehints.Any], lambda: (1, f(x=1.0)))

@unittest.skipIf(sys.version_info >= (3, 6, 0) and
os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
'This test still needs to be fixed on Python 3.6.')
def testClosure(self):
x = 1
y = 1.0
self.assertReturnType(typehints.Tuple[int, float], lambda: (x, y))

@unittest.skipIf(sys.version_info >= (3, 6, 0) and
os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
'This test still needs to be fixed on Python 3.6.')
def testGlobals(self):
self.assertReturnType(int, lambda: global_int)

@unittest.skipIf(sys.version_info >= (3, 6, 0) and
os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
'This test still needs to be fixed on Python 3.6.')
def testBuiltins(self):
self.assertReturnType(int, lambda x: len(x), [typehints.Any])

@unittest.skipIf(sys.version_info >= (3, 6, 0) and
os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
'This test still needs to be fixed on Python 3.6.')
def testGetAttr(self):
self.assertReturnType(
typehints.Tuple[str, typehints.Any],
lambda: (typehints.__doc__, typehints.fake))

@unittest.skipIf(sys.version_info >= (3, 6, 0) and
os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
'This test still needs to be fixed on Python 3.6.')
def testMethod(self):

class A(object):
Expand All @@ -238,9 +192,6 @@ def m(self, x):
self.assertReturnType(int, lambda: A().m(3))
self.assertReturnType(float, lambda: A.m(A(), 3.0))

@unittest.skipIf(sys.version_info >= (3, 6, 0) and
os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
'This test still needs to be fixed on Python 3.6.')
def testAlwaysReturnsEarly(self):

def some_fn(v):
Expand All @@ -250,9 +201,6 @@ def some_fn(v):

self.assertReturnType(int, some_fn)

@unittest.skipIf(sys.version_info >= (3, 6, 0) and
os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
'This test still needs to be fixed on Python 3.6.')
def testDict(self):
self.assertReturnType(
typehints.Dict[typehints.Any, typehints.Any], lambda: {})
Expand Down