From f1d9e93938881ddc0cbda5363272767da3a9a59d Mon Sep 17 00:00:00 2001 From: DKPHUONG Date: Tue, 24 Sep 2024 19:31:05 +0700 Subject: [PATCH 01/16] Log Warning if process function return None --- sdks/python/apache_beam/transforms/core.py | 15 +++++++- .../apache_beam/transforms/core_test.py | 38 +++++++++++++++++++ 2 files changed, 51 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index d7415e8d8135..30afd40d76d9 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1497,15 +1497,26 @@ def _check_fn_use_yield_and_return(fn): source_code = _get_function_body_without_inners(fn) has_yield = False has_return = False + return_none_warning = ( + "No iterator is returned by the process method in %s.", + fn.__self__.__class__) for line in source_code.split("\n"): - if line.lstrip().startswith("yield ") or line.lstrip().startswith( + lstripped_line = line.lstrip() + if lstripped_line.startswith("yield ") or lstripped_line.startswith( "yield("): has_yield = True - if line.lstrip().startswith("return ") or line.lstrip().startswith( + if lstripped_line.startswith("return ") or lstripped_line.startswith( "return("): has_return = True + if lstripped_line.startswith( + "return None") or lstripped_line.rstrip() == "return": + _LOGGER.warning(return_none_warning) if has_yield and has_return: return True + + if not has_yield and not has_return: + _LOGGER.warning(return_none_warning) + return False except Exception as e: _LOGGER.debug(str(e)) diff --git a/sdks/python/apache_beam/transforms/core_test.py b/sdks/python/apache_beam/transforms/core_test.py index b492ab0938cc..54afb365d2d8 100644 --- a/sdks/python/apache_beam/transforms/core_test.py +++ b/sdks/python/apache_beam/transforms/core_test.py @@ -30,6 +30,8 @@ from apache_beam.testing.util import equal_to from apache_beam.transforms.window import FixedWindows +RETURN_NONE_PARTIAL_WARNING = "No iterator is returned" + class TestDoFn1(beam.DoFn): def process(self, element): @@ -96,6 +98,24 @@ def process(self, element): yield element +class TestDoFn10(beam.DoFn): + """test process returning None explicitly""" + def process(self, element): + return None + + +class TestDoFn11(beam.DoFn): + """test process returning None (no return and no yield)""" + def process(self, element): + pass + + +class TestDoFn12(beam.DoFn): + """test process returning None (return statement without a value)""" + def process(self, element): + return + + class CreateTest(unittest.TestCase): @pytest.fixture(autouse=True) def inject_fixtures(self, caplog): @@ -119,6 +139,24 @@ def test_dofn_with_yield_and_return(self): beam.ParDo(TestDoFn3()) assert warning_text in self._caplog.text + def test_dofn_with_explicit_return_none(self): + with self._caplog.at_level(logging.WARNING): + beam.ParDo(TestDoFn10()) + assert RETURN_NONE_PARTIAL_WARNING in self._caplog.text + assert str(TestDoFn10) in self._caplog.text + + def test_dofn_with_implicit_return_none_missing_return_and_yield(self): + with self._caplog.at_level(logging.WARNING): + beam.ParDo(TestDoFn11()) + assert RETURN_NONE_PARTIAL_WARNING in self._caplog.text + assert str(TestDoFn11) in self._caplog.text + + def test_dofn_with_implicit_return_none_return_without_value(self): + with self._caplog.at_level(logging.WARNING): + beam.ParDo(TestDoFn12()) + assert RETURN_NONE_PARTIAL_WARNING in self._caplog.text + assert str(TestDoFn12) in self._caplog.text + class PartitionTest(unittest.TestCase): def test_partition_boundedness(self): From 3cdb62910a62557b7208d5e8a2fb517c42664fe3 Mon Sep 17 00:00:00 2001 From: DKPHUONG Date: Wed, 25 Sep 2024 09:20:23 +0700 Subject: [PATCH 02/16] Fix get function without inner --- sdks/python/apache_beam/transforms/core.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 30afd40d76d9..42523dae9d55 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1463,7 +1463,10 @@ def _get_function_body_without_inners(func): source_lines = inspect.getsourcelines(func)[0] source_lines = dropwhile(lambda x: x.startswith("@"), source_lines) def_line = next(source_lines).strip() - if def_line.startswith("def ") and def_line.endswith(":"): + if def_line.startswith("def "): + while next(source_lines).split("#")[0].split("\"\"\"")[0].endswith(":"): + continue + first_line = next(source_lines) indentation = len(first_line) - len(first_line.lstrip()) final_lines = [first_line[indentation:]] From 9170808d4313ac2e5a81d7393231ce791a68e67f Mon Sep 17 00:00:00 2001 From: DKPHUONG Date: Wed, 25 Sep 2024 09:34:03 +0700 Subject: [PATCH 03/16] check the first def_line also --- sdks/python/apache_beam/transforms/core.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 42523dae9d55..f1001076b2ef 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1464,8 +1464,8 @@ def _get_function_body_without_inners(func): source_lines = dropwhile(lambda x: x.startswith("@"), source_lines) def_line = next(source_lines).strip() if def_line.startswith("def "): - while next(source_lines).split("#")[0].split("\"\"\"")[0].endswith(":"): - continue + while not def_line.split("#")[0].split("\"\"\"")[0].endswith(":"): + def_line = next(source_lines) first_line = next(source_lines) indentation = len(first_line) - len(first_line.lstrip()) From 9370ed24aa4edcde077a4586088eb8af7fe621a4 Mon Sep 17 00:00:00 2001 From: DKPHUONG Date: Wed, 25 Sep 2024 09:37:00 +0700 Subject: [PATCH 04/16] rename variable --- sdks/python/apache_beam/transforms/core.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index f1001076b2ef..2d142e3e628a 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1462,10 +1462,11 @@ def partition_for(self, element, num_partitions, *args, **kwargs): def _get_function_body_without_inners(func): source_lines = inspect.getsourcelines(func)[0] source_lines = dropwhile(lambda x: x.startswith("@"), source_lines) - def_line = next(source_lines).strip() - if def_line.startswith("def "): - while not def_line.split("#")[0].split("\"\"\"")[0].endswith(":"): - def_line = next(source_lines) + first_def_line = next(source_lines).strip() + if first_def_line.startswith("def "): + last_def_line = first_def_line + while not last_def_line.split("#")[0].split("\"\"\"")[0].endswith(":"): + last_def_line = next(source_lines) first_line = next(source_lines) indentation = len(first_line) - len(first_line.lstrip()) @@ -1490,7 +1491,7 @@ def _get_function_body_without_inners(func): return "".join(final_lines) else: - return def_line.rsplit(":")[-1].strip() + return first_def_line.rsplit(":")[-1].strip() def _check_fn_use_yield_and_return(fn): From 62196640b6c6f1200975589b4ade96e86cb9c0d8 Mon Sep 17 00:00:00 2001 From: DKPHUONG Date: Wed, 25 Sep 2024 18:12:30 +0700 Subject: [PATCH 05/16] add strip function --- sdks/python/apache_beam/transforms/core.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 2d142e3e628a..1d596536d2be 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1465,7 +1465,8 @@ def _get_function_body_without_inners(func): first_def_line = next(source_lines).strip() if first_def_line.startswith("def "): last_def_line = first_def_line - while not last_def_line.split("#")[0].split("\"\"\"")[0].endswith(":"): + while not last_def_line.split("#")[0].split("\"\"\"")[0].strip().endswith( + ":"): last_def_line = next(source_lines) first_line = next(source_lines) From 5a3c1a6c7e9d581fdc4a3850c297570c6226c706 Mon Sep 17 00:00:00 2001 From: DKPHUONG Date: Wed, 25 Sep 2024 18:14:12 +0700 Subject: [PATCH 06/16] reformat function --- sdks/python/apache_beam/transforms/core.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 1d596536d2be..46ad948bb0a9 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1465,8 +1465,8 @@ def _get_function_body_without_inners(func): first_def_line = next(source_lines).strip() if first_def_line.startswith("def "): last_def_line = first_def_line - while not last_def_line.split("#")[0].split("\"\"\"")[0].strip().endswith( - ":"): + while not ( + last_def_line.split("#")[0].split("\"\"\"")[0].strip().endswith(":")): last_def_line = next(source_lines) first_line = next(source_lines) From 3bfc43d77f16961f2883a51679ed1bc83db4f178 Mon Sep 17 00:00:00 2001 From: DKPHUONG Date: Wed, 25 Sep 2024 18:20:01 +0700 Subject: [PATCH 07/16] refactor code --- sdks/python/apache_beam/transforms/core.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 46ad948bb0a9..a3762adac0cb 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1464,10 +1464,11 @@ def _get_function_body_without_inners(func): source_lines = dropwhile(lambda x: x.startswith("@"), source_lines) first_def_line = next(source_lines).strip() if first_def_line.startswith("def "): - last_def_line = first_def_line - while not ( - last_def_line.split("#")[0].split("\"\"\"")[0].strip().endswith(":")): - last_def_line = next(source_lines) + last_def_line_without_comment = first_def_line.split("#")[0] \ + .split("\"\"\"")[0] + while not last_def_line_without_comment.strip().endswith(":"): + last_def_line_without_comment = next(source_lines).split("#")[0] \ + .split("\"\"\"")[0] first_line = next(source_lines) indentation = len(first_line) - len(first_line.lstrip()) From 278d1863e3d861824dc514ebcbb8057f95edc06f Mon Sep 17 00:00:00 2001 From: DKPHUONG Date: Sun, 29 Sep 2024 17:17:09 +0700 Subject: [PATCH 08/16] fix bug in get function body --- sdks/python/apache_beam/transforms/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index a3762adac0cb..3fec653a705b 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1461,7 +1461,7 @@ def partition_for(self, element, num_partitions, *args, **kwargs): def _get_function_body_without_inners(func): source_lines = inspect.getsourcelines(func)[0] - source_lines = dropwhile(lambda x: x.startswith("@"), source_lines) + source_lines = dropwhile(lambda x: x.strip().startswith("@"), source_lines) first_def_line = next(source_lines).strip() if first_def_line.startswith("def "): last_def_line_without_comment = first_def_line.split("#")[0] \ From e8193668022fdba8efb2b6fe9413d990e3494c87 Mon Sep 17 00:00:00 2001 From: DKPHUONG Date: Sun, 29 Sep 2024 18:04:56 +0700 Subject: [PATCH 09/16] retrigger test --- sdks/python/apache_beam/transforms/core.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 3fec653a705b..a1af96c865d1 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1504,8 +1504,8 @@ def _check_fn_use_yield_and_return(fn): has_yield = False has_return = False return_none_warning = ( - "No iterator is returned by the process method in %s.", - fn.__self__.__class__) + f"No iterator is returned by the " + f"process method in {fn.__self__.__class__}.") for line in source_code.split("\n"): lstripped_line = line.lstrip() if lstripped_line.startswith("yield ") or lstripped_line.startswith( From 25d5431232b6134713a5b396b9a4330280e1d9bb Mon Sep 17 00:00:00 2001 From: DKPHUONG Date: Sun, 29 Sep 2024 18:41:06 +0700 Subject: [PATCH 10/16] retrigger test --- sdks/python/apache_beam/transforms/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index a1af96c865d1..256dbd87ddba 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1461,7 +1461,7 @@ def partition_for(self, element, num_partitions, *args, **kwargs): def _get_function_body_without_inners(func): source_lines = inspect.getsourcelines(func)[0] - source_lines = dropwhile(lambda x: x.strip().startswith("@"), source_lines) + source_lines = dropwhile(lambda x: x.startswith("@"), source_lines) first_def_line = next(source_lines).strip() if first_def_line.startswith("def "): last_def_line_without_comment = first_def_line.split("#")[0] \ From d912aa305e7f416f5253e83d28f59342ed7bee2c Mon Sep 17 00:00:00 2001 From: DKER2 Date: Fri, 4 Apr 2025 14:35:09 +0800 Subject: [PATCH 11/16] fix(avro_to_beam): parse negative int failed --- .../core/src/main/java/org/apache/beam/sdk/util/VarInt.java | 2 +- sdks/python/apache_beam/io/avroio.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/VarInt.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/VarInt.java index 5432383f5ad4..139a40fdbc2c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/VarInt.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/VarInt.java @@ -102,7 +102,7 @@ public static void encode(long v, OutputStream stream) throws IOException { /** Decodes an integer value from the given stream. */ public static int decodeInt(InputStream stream) throws IOException { long r = decodeLong(stream); - if (r < 0 || r >= 1L << 32) { + if (r >= 1L << 32) { throw new IOException("varint overflow " + r); } return (int) r; diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py index 8b7958a00b80..16665fd224ae 100644 --- a/sdks/python/apache_beam/io/avroio.py +++ b/sdks/python/apache_beam/io/avroio.py @@ -646,9 +646,9 @@ def avro_atomic_value_to_beam_atomic_value(avro_type: str, value): if value is None: return value elif avro_type == "int": - return ctypes.c_uint32(value).value + return ctypes.c_int32(value).value elif avro_type == "long": - return ctypes.c_uint64(value).value + return ctypes.c_int64(value).value else: return value From 23cf8b6c047c7624106d8d24b4f15da4b84583b4 Mon Sep 17 00:00:00 2001 From: DKER2 Date: Fri, 4 Apr 2025 14:38:36 +0800 Subject: [PATCH 12/16] refactor: revert redundant change --- sdks/python/apache_beam/transforms/core.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 256dbd87ddba..a3762adac0cb 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1504,8 +1504,8 @@ def _check_fn_use_yield_and_return(fn): has_yield = False has_return = False return_none_warning = ( - f"No iterator is returned by the " - f"process method in {fn.__self__.__class__}.") + "No iterator is returned by the process method in %s.", + fn.__self__.__class__) for line in source_code.split("\n"): lstripped_line = line.lstrip() if lstripped_line.startswith("yield ") or lstripped_line.startswith( From 2f01a268b51c22cd0421a0205163495684ee2b2f Mon Sep 17 00:00:00 2001 From: DKER2 Date: Fri, 4 Apr 2025 14:57:02 +0800 Subject: [PATCH 13/16] fix(java.test): remove unrelevant test --- .../test/java/org/apache/beam/sdk/util/VarIntTest.java | 8 -------- 1 file changed, 8 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/VarIntTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/VarIntTest.java index cf15adebdf37..eb3c25abf74b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/VarIntTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/VarIntTest.java @@ -253,14 +253,6 @@ public void decodeThrowsExceptionForIntOverflow() throws IOException { decodeInt(encoded); } - @Test - public void decodeThrowsExceptionForIntUnderflow() throws IOException { - byte[] encoded = encodeLong(-1); - - thrown.expect(IOException.class); - decodeInt(encoded); - } - @Test public void decodeThrowsExceptionForNonterminated() throws IOException { final byte[] nonTerminatedNumber = {(byte) 0xff, (byte) 0xff}; From c59ba993122414af645734f4e3518cdda25a7da4 Mon Sep 17 00:00:00 2001 From: DKER2 Date: Fri, 4 Apr 2025 16:13:03 +0800 Subject: [PATCH 14/16] fix(avroio sdk.test): change expected behavior --- sdks/python/apache_beam/io/avroio_test.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py index 77b20117e702..205f1d3da007 100644 --- a/sdks/python/apache_beam/io/avroio_test.py +++ b/sdks/python/apache_beam/io/avroio_test.py @@ -196,9 +196,8 @@ def test_avro_schema_to_beam_schema_with_nullable_atomic_fields(self): assert_that(readback, equal_to(records)) def test_avro_atomic_value_to_beam_atomic_value(self): - input_outputs = [('int', 1, 1), ('int', -1, 0xffffffff), - ('int', None, None), ('long', 1, 1), - ('long', -1, 0xffffffffffffffff), ('long', None, None), + input_outputs = [('int', 1, 1), ('int', -1, -1), ('int', None, None), + ('long', 1, 1), ('long', -1, -1), ('long', None, None), ('string', 'foo', 'foo')] for test_avro_type, test_value, expected_value in input_outputs: actual_value = avro_atomic_value_to_beam_atomic_value( From 7db748a1b5f4555e7bf2ba6d2456d245f30e16ac Mon Sep 17 00:00:00 2001 From: DKER2 Date: Wed, 9 Apr 2025 15:51:16 +0800 Subject: [PATCH 15/16] Revert "fix(java.test): remove unrelevant test" This reverts commit 2f01a268b51c22cd0421a0205163495684ee2b2f. --- .../test/java/org/apache/beam/sdk/util/VarIntTest.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/VarIntTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/VarIntTest.java index eb3c25abf74b..cf15adebdf37 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/VarIntTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/VarIntTest.java @@ -253,6 +253,14 @@ public void decodeThrowsExceptionForIntOverflow() throws IOException { decodeInt(encoded); } + @Test + public void decodeThrowsExceptionForIntUnderflow() throws IOException { + byte[] encoded = encodeLong(-1); + + thrown.expect(IOException.class); + decodeInt(encoded); + } + @Test public void decodeThrowsExceptionForNonterminated() throws IOException { final byte[] nonTerminatedNumber = {(byte) 0xff, (byte) 0xff}; From f662db76db3bf1d1ed4f05156132dcbeccc259eb Mon Sep 17 00:00:00 2001 From: DKER2 Date: Wed, 9 Apr 2025 15:58:35 +0800 Subject: [PATCH 16/16] fix: remove workaround --- .../java/org/apache/beam/sdk/util/VarInt.java | 2 +- sdks/python/apache_beam/io/avroio.py | 57 +------------------ sdks/python/apache_beam/io/avroio_test.py | 21 ------- 3 files changed, 3 insertions(+), 77 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/VarInt.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/VarInt.java index 139a40fdbc2c..5432383f5ad4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/VarInt.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/VarInt.java @@ -102,7 +102,7 @@ public static void encode(long v, OutputStream stream) throws IOException { /** Decodes an integer value from the given stream. */ public static int decodeInt(InputStream stream) throws IOException { long r = decodeLong(stream); - if (r >= 1L << 32) { + if (r < 0 || r >= 1L << 32) { throw new IOException("varint overflow " + r); } return (int) r; diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py index 16665fd224ae..d22ac84fea36 100644 --- a/sdks/python/apache_beam/io/avroio.py +++ b/sdks/python/apache_beam/io/avroio.py @@ -43,7 +43,6 @@ Avro file. """ # pytype: skip-file -import ctypes import os from functools import partial from typing import Any @@ -629,37 +628,11 @@ def to_row(record): to_row) -def avro_atomic_value_to_beam_atomic_value(avro_type: str, value): - """convert an avro atomic value to a beam atomic value - - if the avro type is an int or long, convert the value into from signed to - unsigned because VarInt.java expects the number to be unsigned when - decoding the number. - - Args: - avro_type: the avro type of the corresponding value. - value: the avro atomic value. - - Returns: - the converted beam atomic value. - """ - if value is None: - return value - elif avro_type == "int": - return ctypes.c_int32(value).value - elif avro_type == "long": - return ctypes.c_int64(value).value - else: - return value - - def avro_value_to_beam_value( beam_type: schema_pb2.FieldType) -> Callable[[Any], Any]: type_info = beam_type.WhichOneof("type_info") if type_info == "atomic_type": - avro_type = BEAM_PRIMITIVES_TO_AVRO_PRIMITIVES[beam_type.atomic_type] - return lambda value: avro_atomic_value_to_beam_atomic_value( - avro_type, value) + return lambda value: value elif type_info == "array_type": element_converter = avro_value_to_beam_value( beam_type.array_type.element_type) @@ -767,37 +740,11 @@ def beam_row_to_avro_dict( return lambda row: convert(row[0]) -def beam_atomic_value_to_avro_atomic_value(avro_type: str, value): - """convert a beam atomic value to an avro atomic value - - since numeric values are converted to unsigned in - avro_atomic_value_to_beam_atomic_value we need to convert - back to a signed number. - - Args: - avro_type: avro type of the corresponding value. - value: the beam atomic value. - - Returns: - the converted avro atomic value. - """ - if value is None: - return value - elif avro_type == "int": - return ctypes.c_int32(value).value - elif avro_type == "long": - return ctypes.c_int64(value).value - else: - return value - - def beam_value_to_avro_value( beam_type: schema_pb2.FieldType) -> Callable[[Any], Any]: type_info = beam_type.WhichOneof("type_info") if type_info == "atomic_type": - avro_type = BEAM_PRIMITIVES_TO_AVRO_PRIMITIVES[beam_type.atomic_type] - return lambda value: beam_atomic_value_to_avro_atomic_value( - avro_type, value) + return lambda value: value elif type_info == "array_type": element_converter = beam_value_to_avro_value( beam_type.array_type.element_type) diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py index 205f1d3da007..d4fc3259594a 100644 --- a/sdks/python/apache_beam/io/avroio_test.py +++ b/sdks/python/apache_beam/io/avroio_test.py @@ -40,9 +40,7 @@ from apache_beam.io.avroio import _FastAvroSource # For testing from apache_beam.io.avroio import avro_schema_to_beam_schema # For testing from apache_beam.io.avroio import beam_schema_to_avro_schema # For testing -from apache_beam.io.avroio import avro_atomic_value_to_beam_atomic_value # For testing from apache_beam.io.avroio import avro_union_type_to_beam_type # For testing -from apache_beam.io.avroio import beam_atomic_value_to_avro_atomic_value # For testing from apache_beam.io.avroio import avro_dict_to_beam_row # For testing from apache_beam.io.avroio import beam_row_to_avro_dict # For testing from apache_beam.io.avroio import _create_avro_sink # For testing @@ -195,25 +193,6 @@ def test_avro_schema_to_beam_schema_with_nullable_atomic_fields(self): | beam.Map(beam_row_to_avro_dict(avro_schema, beam_schema))) assert_that(readback, equal_to(records)) - def test_avro_atomic_value_to_beam_atomic_value(self): - input_outputs = [('int', 1, 1), ('int', -1, -1), ('int', None, None), - ('long', 1, 1), ('long', -1, -1), ('long', None, None), - ('string', 'foo', 'foo')] - for test_avro_type, test_value, expected_value in input_outputs: - actual_value = avro_atomic_value_to_beam_atomic_value( - test_avro_type, test_value) - hc.assert_that(actual_value, hc.equal_to(expected_value)) - - def test_beam_atomic_value_to_avro_atomic_value(self): - input_outputs = [('int', 1, 1), ('int', 0xffffffff, -1), - ('int', None, None), ('long', 1, 1), - ('long', 0xffffffffffffffff, -1), ('long', None, None), - ('string', 'foo', 'foo')] - for test_avro_type, test_value, expected_value in input_outputs: - actual_value = beam_atomic_value_to_avro_atomic_value( - test_avro_type, test_value) - hc.assert_that(actual_value, hc.equal_to(expected_value)) - def test_avro_union_type_to_beam_type_with_nullable_long(self): union_type = ['null', 'long'] beam_type = avro_union_type_to_beam_type(union_type)