Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 13 additions & 17 deletions sdks/python/apache_beam/io/avroio_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
from apache_beam.io.avroio import avro_atomic_value_to_beam_atomic_value # For testing
from apache_beam.io.avroio import avro_union_type_to_beam_type # For testing
from apache_beam.io.avroio import beam_atomic_value_to_avro_atomic_value # For testing
from apache_beam.io.avroio import avro_dict_to_beam_row # For testing
from apache_beam.io.avroio import beam_row_to_avro_dict # For testing
from apache_beam.io.avroio import _create_avro_sink # For testing
from apache_beam.io.filesystems import FileSystems
from apache_beam.options.pipeline_options import StandardOptions
Expand Down Expand Up @@ -181,23 +183,17 @@ def test_avro_schema_to_beam_schema_with_nullable_atomic_fields(self):
records.append({
'name': 'Bruce', 'favorite_number': None, 'favorite_color': None
})
with tempfile.TemporaryDirectory() as tmp_dirname_input:
input_path = os.path.join(tmp_dirname_input, 'tmp_filename.avro')
parsed_schema = fastavro.parse_schema(json.loads(self.SCHEMA_STRING))
with open(input_path, 'wb') as tmp_avro_file:
fastavro.writer(tmp_avro_file, parsed_schema, records)

with tempfile.TemporaryDirectory() as tmp_dirname_output:

with TestPipeline() as p:
_ = (
p
| avroio.ReadFromAvro(input_path, as_rows=True)
| SqlTransform("SELECT * FROM PCOLLECTION")
| avroio.WriteToAvro(tmp_dirname_output))
with TestPipeline() as p:
readback = (p | avroio.ReadFromAvro(tmp_dirname_output + "*"))
assert_that(readback, equal_to(records))
avro_schema = fastavro.parse_schema(json.loads(self.SCHEMA_STRING))
beam_schema = avro_schema_to_beam_schema(avro_schema)

with TestPipeline() as p:
readback = (
p
| Create(records)
| beam.Map(avro_dict_to_beam_row(avro_schema, beam_schema))
| SqlTransform("SELECT * FROM PCOLLECTION")
| beam.Map(beam_row_to_avro_dict(avro_schema, beam_schema)))
assert_that(readback, equal_to(records))

def test_avro_atomic_value_to_beam_atomic_value(self):
input_outputs = [('int', 1, 1), ('int', -1, 0xffffffff),
Expand Down