diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_ULR.json b/.github/trigger_files/beam_PostCommit_XVR_Direct.json similarity index 100% rename from .github/trigger_files/beam_PostCommit_Java_ValidatesRunner_ULR.json rename to .github/trigger_files/beam_PostCommit_XVR_Direct.json diff --git a/.github/trigger_files/beam_PostCommit_TransformService_Direct.json b/.github/trigger_files/beam_PostCommit_XVR_Samza.json similarity index 100% rename from .github/trigger_files/beam_PostCommit_TransformService_Direct.json rename to .github/trigger_files/beam_PostCommit_XVR_Samza.json diff --git a/.github/trigger_files/beam_PostCommit_XVR_Flink.json b/.github/trigger_files/beam_PostCommit_XVR_Spark3.json similarity index 100% rename from .github/trigger_files/beam_PostCommit_XVR_Flink.json rename to .github/trigger_files/beam_PostCommit_XVR_Spark3.json diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py index c95fbb612592..77b20117e702 100644 --- a/sdks/python/apache_beam/io/avroio_test.py +++ b/sdks/python/apache_beam/io/avroio_test.py @@ -43,6 +43,8 @@ from apache_beam.io.avroio import avro_atomic_value_to_beam_atomic_value # For testing from apache_beam.io.avroio import avro_union_type_to_beam_type # For testing from apache_beam.io.avroio import beam_atomic_value_to_avro_atomic_value # For testing +from apache_beam.io.avroio import avro_dict_to_beam_row # For testing +from apache_beam.io.avroio import beam_row_to_avro_dict # For testing from apache_beam.io.avroio import _create_avro_sink # For testing from apache_beam.io.filesystems import FileSystems from apache_beam.options.pipeline_options import StandardOptions @@ -181,23 +183,17 @@ def test_avro_schema_to_beam_schema_with_nullable_atomic_fields(self): records.append({ 'name': 'Bruce', 'favorite_number': None, 'favorite_color': None }) - with tempfile.TemporaryDirectory() as tmp_dirname_input: - input_path = os.path.join(tmp_dirname_input, 'tmp_filename.avro') - parsed_schema = fastavro.parse_schema(json.loads(self.SCHEMA_STRING)) - with open(input_path, 'wb') as tmp_avro_file: - fastavro.writer(tmp_avro_file, parsed_schema, records) - - with tempfile.TemporaryDirectory() as tmp_dirname_output: - - with TestPipeline() as p: - _ = ( - p - | avroio.ReadFromAvro(input_path, as_rows=True) - | SqlTransform("SELECT * FROM PCOLLECTION") - | avroio.WriteToAvro(tmp_dirname_output)) - with TestPipeline() as p: - readback = (p | avroio.ReadFromAvro(tmp_dirname_output + "*")) - assert_that(readback, equal_to(records)) + avro_schema = fastavro.parse_schema(json.loads(self.SCHEMA_STRING)) + beam_schema = avro_schema_to_beam_schema(avro_schema) + + with TestPipeline() as p: + readback = ( + p + | Create(records) + | beam.Map(avro_dict_to_beam_row(avro_schema, beam_schema)) + | SqlTransform("SELECT * FROM PCOLLECTION") + | beam.Map(beam_row_to_avro_dict(avro_schema, beam_schema))) + assert_that(readback, equal_to(records)) def test_avro_atomic_value_to_beam_atomic_value(self): input_outputs = [('int', 1, 1), ('int', -1, 0xffffffff),