From 3196f854adf83c5bfe6f8609e2fb16487fb8b9c0 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Thu, 17 Apr 2025 02:01:40 +0000 Subject: [PATCH 1/7] add explode test --- .../apache_beam/yaml/tests/explode.yaml | 79 +++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 sdks/python/apache_beam/yaml/tests/explode.yaml diff --git a/sdks/python/apache_beam/yaml/tests/explode.yaml b/sdks/python/apache_beam/yaml/tests/explode.yaml new file mode 100644 index 000000000000..c1e07e519052 --- /dev/null +++ b/sdks/python/apache_beam/yaml/tests/explode.yaml @@ -0,0 +1,79 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +pipelines: + # Simple Explode with single field + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - produce: ['Strawberry', 'Corn'] + season: ['spring', 'summer'] + - type: Explode + name: Flatten lists + config: + fields: [produce] + - type: AssertEqual + config: + elements: + - {produce: 'Strawberry', season: ['spring','summer']} + - {produce: 'Corn', season: ['spring', 'summer']} + + # Simple Explode with multiple fields and cross_product=False + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - produce: ['Strawberry', 'Corn'] + season: ['spring', 'summer'] + - type: Explode + name: Flatten lists + config: + fields: [produce, season] + cross_product: false + - type: AssertEqual + config: + elements: + - {produce: 'Strawberry', season: 'spring'} + - {produce: 'Corn', season: 'summer'} + + # Simple Explode with multiple fields and cross_product=True + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - produce: ['Strawberry', 'Corn'] + season: ['spring', 'summer'] + - type: Explode + name: Flatten lists + config: + fields: [produce, season] + cross_product: true + - type: AssertEqual + config: + elements: + - {produce: 'Strawberry', season: 'spring'} + - {produce: 'Strawberry', season: 'summer'} + - {produce: 'Corn', season: 'spring'} + - {produce: 'Corn', season: 'summer'} + From 5b9dac5548973d6d364e12d58b69c5444fc3b63b Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Thu, 17 Apr 2025 02:01:54 +0000 Subject: [PATCH 2/7] add filter test --- .../python/apache_beam/yaml/tests/filter.yaml | 147 ++++++++++++++++++ 1 file changed, 147 insertions(+) create mode 100644 sdks/python/apache_beam/yaml/tests/filter.yaml diff --git a/sdks/python/apache_beam/yaml/tests/filter.yaml b/sdks/python/apache_beam/yaml/tests/filter.yaml new file mode 100644 index 000000000000..40f0307770d5 --- /dev/null +++ b/sdks/python/apache_beam/yaml/tests/filter.yaml @@ -0,0 +1,147 @@ +# +# 
Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +fixtures: + - name: TEMP_DIR + type: "tempfile.TemporaryDirectory" + +pipelines: + # Simple Filter using Python + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - {transaction_id: "T0012", product_name: "Headphones", category: "Electronics", price: 59.99} + - {transaction_id: "T5034", product_name: "Leather Jacket", category: "Apparel", price: 109.99} + - {transaction_id: "T0024", product_name: "Aluminum Mug", category: "Kitchen", price: 29.9} + - {transaction_id: "T0104", product_name: "Headphones", category: "Electronics", price: 59.99} + - {transaction_id: "T0302", product_name: "Monitor", category: "Electronics", price: 249.99} + - type: Filter + config: + language: python + keep: category == "Electronics" and price > 100 + - type: AssertEqual + config: + elements: + - {transaction_id: "T0302", product_name: "Monitor", category: "Electronics", price: 249.99} + + # Simple Filter using Python callable + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - {transaction_id: "T0012", product_name: "Headphones", category: "Electronics", price: 59.99} + - {transaction_id: "T5034", product_name: "Leather Jacket", category: 
"Apparel", price: 109.99} + - {transaction_id: "T0024", product_name: "Aluminum Mug", category: "Kitchen", price: 29.9} + - {transaction_id: "T0104", product_name: "Headphones", category: "Electronics", price: 59.99} + - {transaction_id: "T0302", product_name: "Monitor", category: "Electronics", price: 249.99} + - type: Filter + config: + language: python + keep: + callable: | + def my_filter(row): + return row.category == "Electronics" and row.price > 100 + - type: AssertEqual + config: + elements: + - {transaction_id: "T0302", product_name: "Monitor", category: "Electronics", price: 249.99} + + # Simple Filter using Java + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - {transaction_id: "T0012", product_name: "Headphones", category: "Electronics", price: 59.99} + - {transaction_id: "T5034", product_name: "Leather Jacket", category: "Apparel", price: 109.99} + - {transaction_id: "T0024", product_name: "Aluminum Mug", category: "Kitchen", price: 29.9} + - {transaction_id: "T0104", product_name: "Headphones", category: "Electronics", price: 59.99} + - {transaction_id: "T0302", product_name: "Monitor", category: "Electronics", price: 249.99} + - type: Filter + config: + language: java + keep: category.equals("Electronics") && price > 100 + - type: AssertEqual + config: + elements: + - {transaction_id: "T0302", product_name: "Monitor", category: "Electronics", price: 249.99} + + # Simple Filter using Java callable + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - {transaction_id: "T0012", product_name: "Headphones", category: "Electronics", price: 59.99} + - {transaction_id: "T5034", product_name: "Leather Jacket", category: "Apparel", price: 109.99} + - {transaction_id: "T0024", product_name: "Aluminum Mug", category: "Kitchen", price: 29.9} + - {transaction_id: "T0104", product_name: "Headphones", category: "Electronics", price: 59.99} + - {transaction_id: "T0302", product_name: "Monitor", 
category: "Electronics", price: 249.99} + - type: Filter + config: + language: java + keep: + callable: | + import org.apache.beam.sdk.values.Row; + import java.util.function.Function; + public class MyFunction implements Function<Row, Boolean> { + public Boolean apply(Row row) { + return row.getString("category").equals("Electronics") && row.getDouble("price") > 100; + } + } + - type: AssertEqual + config: + elements: + - {transaction_id: "T0302", product_name: "Monitor", category: "Electronics", price: 249.99} + + + # Simple Filter with error handling + - pipeline: + type: composite + transforms: + - type: Create + config: + elements: + - {category: "Electronics", price: 59.99, product_name: "Headphones", transaction_id: "T0012"} + - type: Filter + name: FilterWithCategory + input: Create + config: + language: python + keep: category == Electronics + error_handling: + output: error_output + - type: WriteToJson + name: WriteErrorsToJson + input: FilterWithCategory.error_output + config: + path: "{TEMP_DIR}/error.json" + + # Read errors from previous pipeline + - pipeline: + type: chain + transforms: + - type: ReadFromJson + config: + path: "{TEMP_DIR}/error.json*" From d756d1380f0227d366550513a40d887e0c98216e Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Thu, 17 Apr 2025 02:02:12 +0000 Subject: [PATCH 3/7] add flatten test --- .../apache_beam/yaml/tests/flatten.yaml | 125 ++++++++++++++++++ 1 file changed, 125 insertions(+) create mode 100644 sdks/python/apache_beam/yaml/tests/flatten.yaml diff --git a/sdks/python/apache_beam/yaml/tests/flatten.yaml b/sdks/python/apache_beam/yaml/tests/flatten.yaml new file mode 100644 index 000000000000..1c375570ca9e --- /dev/null +++ b/sdks/python/apache_beam/yaml/tests/flatten.yaml @@ -0,0 +1,125 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +pipelines: + # Simple Flatten with three inputs + - pipeline: + type: composite + transforms: + - type: Create + name: CreateA + config: + elements: + - {transaction_id: "T0012", product_name: "Headphones", category: "Electronics", price: 59.99} + - {transaction_id: "T5034", product_name: "Leather Jacket", category: "Apparel", price: 109.99} + - type: Create + name: CreateB + config: + elements: + - {transaction_id: "T0024", product_name: "Aluminum Mug", category: "Kitchen", price: 29.9} + - type: Create + name: CreateC + config: + elements: + - {transaction_id: "T0104", product_name: "Headphones", category: "Electronics", price: 59.99} + - {transaction_id: "T0302", product_name: "Monitor", category: "Electronics", price: 249.99} + - type: Flatten + input: [CreateA, CreateB, CreateC] + - type: AssertEqual + input: Flatten + config: + elements: + - {transaction_id: "T0012", product_name: "Headphones", category: "Electronics", price: 59.99} + - {transaction_id: "T5034", product_name: "Leather Jacket", category: "Apparel", price: 109.99} + - {transaction_id: "T0024", product_name: "Aluminum Mug", category: "Kitchen", price: 29.9} + - {transaction_id: "T0104", product_name: "Headphones", category: "Electronics", price: 59.99} + - {transaction_id: "T0302", product_name: "Monitor", category: "Electronics", price: 249.99} + + # Simple Flatten with one input + - pipeline: + type: composite + 
transforms: + - type: Create + name: CreateA + config: + elements: + - {transaction_id: "T0012", product_name: "Headphones", category: "Electronics", price: 59.99} + - {transaction_id: "T5034", product_name: "Leather Jacket", category: "Apparel", price: 109.99} + - {transaction_id: "T0024", product_name: "Aluminum Mug", category: "Kitchen", price: 29.9} + - type: Flatten + input: CreateA + - type: AssertEqual + input: Flatten + config: + elements: + - {transaction_id: "T0012", product_name: "Headphones", category: "Electronics", price: 59.99} + - {transaction_id: "T5034", product_name: "Leather Jacket", category: "Apparel", price: 109.99} + - {transaction_id: "T0024", product_name: "Aluminum Mug", category: "Kitchen", price: 29.9} + + # Simple Flatten with duplicates in each input + - pipeline: + type: composite + transforms: + - type: Create + name: CreateA + config: + elements: + - {transaction_id: "T0012", product_name: "Headphones", category: "Electronics", price: 59.99} + - {transaction_id: "T0012", product_name: "Headphones", category: "Electronics", price: 59.99} + - type: Create + name: CreateB + config: + elements: + - {transaction_id: "T5034", product_name: "Leather Jacket", category: "Apparel", price: 109.99} + - {transaction_id: "T5034", product_name: "Leather Jacket", category: "Apparel", price: 109.99} + - type: Flatten + input: [CreateA, CreateB] + - type: AssertEqual + input: Flatten + config: + elements: + - {transaction_id: "T0012", product_name: "Headphones", category: "Electronics", price: 59.99} + - {transaction_id: "T0012", product_name: "Headphones", category: "Electronics", price: 59.99} + - {transaction_id: "T5034", product_name: "Leather Jacket", category: "Apparel", price: 109.99} + - {transaction_id: "T5034", product_name: "Leather Jacket", category: "Apparel", price: 109.99} + + # Simple Flatten with duplicates across inputs + - pipeline: + type: composite + transforms: + - type: Create + name: CreateA + config: + elements: + - 
{transaction_id: "T0012", product_name: "Headphones", category: "Electronics", price: 59.99} + - {transaction_id: "T5034", product_name: "Leather Jacket", category: "Apparel", price: 109.99} + - type: Create + name: CreateB + config: + elements: + - {transaction_id: "T5034", product_name: "Leather Jacket", category: "Apparel", price: 109.99} + - {transaction_id: "T0012", product_name: "Headphones", category: "Electronics", price: 59.99} + - type: Flatten + input: [CreateA, CreateB] + - type: AssertEqual + input: Flatten + config: + elements: + - {transaction_id: "T0012", product_name: "Headphones", category: "Electronics", price: 59.99} + - {transaction_id: "T0012", product_name: "Headphones", category: "Electronics", price: 59.99} + - {transaction_id: "T5034", product_name: "Leather Jacket", category: "Apparel", price: 109.99} + - {transaction_id: "T5034", product_name: "Leather Jacket", category: "Apparel", price: 109.99} From fad31e00dbc7ef18d819bf32644afc5d641bc590 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Thu, 17 Apr 2025 02:03:32 +0000 Subject: [PATCH 4/7] add partition test --- .../apache_beam/yaml/tests/partition.yaml | 268 ++++++++++++++++++ 1 file changed, 268 insertions(+) create mode 100644 sdks/python/apache_beam/yaml/tests/partition.yaml diff --git a/sdks/python/apache_beam/yaml/tests/partition.yaml b/sdks/python/apache_beam/yaml/tests/partition.yaml new file mode 100644 index 000000000000..6cba67425a23 --- /dev/null +++ b/sdks/python/apache_beam/yaml/tests/partition.yaml @@ -0,0 +1,268 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +fixtures: + - name: TEMP_DIR + type: "tempfile.TemporaryDirectory" + +pipelines: + # Simple Partition into one group with individual values + - pipeline: + type: composite + transforms: + - type: Create + config: + elements: + - produce: 'Apple' + - produce: 'Apricot' + - produce: 'Blueberry' + - produce: 'Date' + - produce: 'Fig' + - produce: 'Mango' + - produce: 'Orange' + - produce: 'Strawberry' + - type: Partition + name: PartitionFruits + input: Create + config: + by: "'fruits'" + language: python + outputs: + - "fruits" + - type: AssertEqual + input: PartitionFruits.fruits + config: + elements: + - produce: 'Apple' + - produce: 'Apricot' + - produce: 'Blueberry' + - produce: 'Date' + - produce: 'Fig' + - produce: 'Mango' + - produce: 'Orange' + - produce: 'Strawberry' + + # Simple Partition into two groups with individual values + - pipeline: + type: composite + transforms: + - type: Create + config: + elements: + - produce: 'Apple' + - produce: 'Apricot' + - produce: 'Blueberry' + - produce: 'Date' + - produce: 'Fig' + - produce: 'Mango' + - produce: 'Orange' + - produce: 'Strawberry' + - type: Partition + name: PartitionFruits + input: Create + config: + by: "'vowel_fruits' if produce[0].upper() in 'AEIOU' else 'consonant_fruits'" + language: python + outputs: + - "vowel_fruits" + - "consonant_fruits" + - type: AssertEqual + input: PartitionFruits.vowel_fruits + config: + elements: + - produce: 'Apple' + - produce: 'Apricot' + - produce: 'Orange' + - type: AssertEqual + input: PartitionFruits.consonant_fruits + config: + elements: + - produce: 
'Blueberry' + - produce: 'Date' + - produce: 'Fig' + - produce: 'Mango' + - produce: 'Strawberry' + + # Simple Partition into two groups with lists of values + - pipeline: + type: composite + transforms: + - type: Create + config: + elements: + - produce: ['Grapefruit', 'Cantaloupe', 'Fig', 'Raspberry', 'Lemon', 'Peach', 'Watermelon'] + - produce: ['Strawberry', 'Kiwi', 'Lime', 'Pear', 'Plum', 'Orange', 'Pomegranate', 'Cranberry'] + - produce: ['Banana'] + - produce: ['Apricot', 'Blackberry', 'Mango', 'Apple', 'Blueberry'] + - produce: ['Cherry', 'Grape'] + - type: Partition + name: PartitionFruits + input: Create + config: + by: "'too_much_fruits' if len(produce) > 4 else 'just_right_fruits'" + language: python + outputs: + - "too_much_fruits" + - "just_right_fruits" + - type: AssertEqual + input: PartitionFruits.too_much_fruits + config: + elements: + - produce: ['Grapefruit', 'Cantaloupe', 'Fig', 'Raspberry', 'Lemon', 'Peach', 'Watermelon'] + - produce: ['Strawberry', 'Kiwi', 'Lime', 'Pear', 'Plum', 'Orange', 'Pomegranate', 'Cranberry'] + - produce: ['Apricot', 'Blackberry', 'Mango', 'Apple', 'Blueberry'] + - type: AssertEqual + input: PartitionFruits.just_right_fruits + config: + elements: + - produce: ['Banana'] + - produce: ['Cherry', 'Grape'] + + # Sequentially Partition into two groups each with lists of values + - pipeline: + type: composite + transforms: + - type: Create + config: + elements: + - produce: ['Grapefruit', 'Cantaloupe', 'Fig', 'Raspberry', 'Lemon', 'Peach', 'Watermelon'] + - produce: ['Strawberry', 'Kiwi', 'Lime', 'Pear', 'Plum', 'Orange', 'Pomegranate', 'Cranberry'] + - produce: ['Banana'] + - produce: ['Apricot', 'Blackberry', 'Mango', 'Apple', 'Blueberry'] + - produce: ['Cherry', 'Grape'] + - type: Partition + name: PartitionFruitsSize + input: Create + config: + by: "'too_much_fruits' if len(produce) > 4 else 'just_right_fruits'" + language: python + outputs: + - "too_much_fruits" + - "just_right_fruits" + - type: Partition + name: 
PartitionFruitsYummyness + input: PartitionFruitsSize.too_much_fruits + config: + by: + callable: | + def partition_by_yummyness(row): + produce_list = row.produce + if any(fruit[0].upper() in 'AEIOU' for fruit in produce_list): + return 'yummy' + else: + return 'not_yummy' + language: python + outputs: + - "yummy" + - "not_yummy" + - type: AssertEqual + input: PartitionFruitsYummyness.yummy + config: + elements: + - produce: ['Strawberry', 'Kiwi', 'Lime', 'Pear', 'Plum', 'Orange', 'Pomegranate', 'Cranberry'] + - produce: ['Apricot', 'Blackberry', 'Mango', 'Apple', 'Blueberry'] + - type: AssertEqual + input: PartitionFruitsYummyness.not_yummy + config: + elements: + - produce: ['Grapefruit', 'Cantaloupe', 'Fig', 'Raspberry', 'Lemon', 'Peach', 'Watermelon'] + + + # Simple Partition into two groups with individual values with unknown output + - pipeline: + type: composite + transforms: + - type: Create + config: + elements: + - produce: 'Apple' + - produce: 'Apricot' + - produce: 'Blueberry' + - produce: 'Date' + - produce: 'Fig' + - produce: 'Mango' + - produce: 'Orange' + - produce: 'Strawberry' + - type: Partition + name: PartitionFruits + input: Create + config: + by: + callable: | + def partition(row): + if row.produce[0].upper() in 'AEIOU': + return 'vowel_fruits' + elif row.produce in ['Date', 'Mango']: + return 'yummy_fruits' + language: python + outputs: + - "vowel_fruits" + - "yummy_fruits" + unknown_output: "unknown_fruits" + - type: AssertEqual + input: PartitionFruits.vowel_fruits + config: + elements: + - produce: 'Apple' + - produce: 'Apricot' + - produce: 'Orange' + - type: AssertEqual + input: PartitionFruits.yummy_fruits + config: + elements: + - produce: 'Date' + - produce: 'Mango' + - type: AssertEqual + input: PartitionFruits.unknown_fruits + config: + elements: + - produce: 'Blueberry' + - produce: 'Fig' + - produce: 'Strawberry' + + + # Simple Partition with error + - pipeline: + type: composite + transforms: + - type: Create + config: + 
elements: + - produce: 'Apple' + - produce: 'Apricot' + - produce: 'Blueberry' + - produce: 'Date' + - produce: 'Fig' + - produce: 'Mango' + - produce: 'Orange' + - produce: 'Strawberry' + - type: Partition + name: PartitionFruits + input: Create + config: + by: "'vowel_fruits' if produce[100] in 'AEIOU' else 'consonant_fruits'" + language: python + outputs: + - "vowel_fruits" + - "consonant_fruits" + error_handling: + output: error_output + - type: WriteToJson + name: WriteErrorsToJson + input: PartitionFruits.error_output + config: + path: "{TEMP_DIR}/error.json" From 6583874cb291be110881318c7170cbdccacb3c17 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Thu, 17 Apr 2025 02:03:49 +0000 Subject: [PATCH 5/7] add pytransform test --- .../apache_beam/yaml/tests/pytransform.yaml | 159 ++++++++++++++++++ 1 file changed, 159 insertions(+) create mode 100644 sdks/python/apache_beam/yaml/tests/pytransform.yaml diff --git a/sdks/python/apache_beam/yaml/tests/pytransform.yaml b/sdks/python/apache_beam/yaml/tests/pytransform.yaml new file mode 100644 index 000000000000..241a16fd7c60 --- /dev/null +++ b/sdks/python/apache_beam/yaml/tests/pytransform.yaml @@ -0,0 +1,159 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +fixtures: + - name: TEMP_DIR + type: "tempfile.TemporaryDirectory" + +pipelines: + # Simple PyTransform using __constructor__ + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - {price: 1.29, produce: 'Apple'} + - {price: 0.29, produce: 'Apricot'} + - {price: 0.02, produce: 'Blueberry'} + - {price: 0.19, produce: 'Date'} + - type: PyTransform + config: + constructor: __constructor__ + kwargs: + source: | + def increase(inc): + return beam.Map(lambda x: beam.Row(price=x.price + inc, produce=x.produce)) + inc: 0.30 + - type: AssertEqual + config: + elements: + - {price: 1.59, produce: 'Apple'} + - {price: 0.59, produce: 'Apricot'} + - {price: 0.32, produce: 'Blueberry'} + - {price: 0.49, produce: 'Date'} + + # Simple PyTransform using __constructor__ beam.PTransform + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - {price: 1.29, produce: 'Apple'} + - {price: 0.29, produce: 'Apricot'} + - {price: 0.02, produce: 'Blueberry'} + - {price: 0.19, produce: 'Date'} + - type: PyTransform + config: + constructor: __constructor__ + kwargs: + source: | + class MyPTransform(beam.PTransform): + def __init__(self, inc): + self._inc = inc + def expand(self, pcoll): + return pcoll | beam.Map(lambda x: beam.Row(price=x.price + self._inc, produce=x.produce)) + inc: 0.30 + - type: AssertEqual + config: + elements: + - {price: 1.59, produce: 'Apple'} + - {price: 0.59, produce: 'Apricot'} + - {price: 0.32, produce: 'Blueberry'} + - {price: 0.49, produce: 'Date'} + + # Simple PyTransform using __callable__ + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - {price: 1.29, produce: 'Apple'} + - {price: 0.29, produce: 'Apricot'} + - {price: 0.02, produce: 'Blueberry'} + - {price: 0.19, produce: 'Date'} + - type: PyTransform + config: + constructor: __callable__ + kwargs: + source: | + def increase(pcoll, inc): + return pcoll | beam.Map(lambda x: beam.Row(price=x.price + inc, 
produce=x.produce)) + inc: 0.30 + - type: AssertEqual + config: + elements: + - {price: 1.59, produce: 'Apple'} + - {price: 0.59, produce: 'Apricot'} + - {price: 0.32, produce: 'Blueberry'} + - {price: 0.49, produce: 'Date'} + + # Create Csv for pytransform read in next pipeline + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - {price: 1.29, produce: 'Apple'} + - {price: 0.29, produce: 'Apricot'} + - {price: 0.02, produce: 'Blueberry'} + - {price: 0.19, produce: 'Date'} + - type: WriteToCsv + config: + path: "{TEMP_DIR}/out.csv" + + # Simple PyTransform using ReadFromCsv with args + - pipeline: + type: composite + transforms: + - type: PyTransform + name: PyTransformReadFromCsv + input: {} + config: + constructor: apache_beam.io.ReadFromCsv + args: ['{TEMP_DIR}/out.csv*'] + - type: AssertEqual + input: PyTransformReadFromCsv + config: + elements: + - {price: 1.29, produce: 'Apple'} + - {price: 0.29, produce: 'Apricot'} + - {price: 0.02, produce: 'Blueberry'} + - {price: 0.19, produce: 'Date'} + + # Simple PyTransform using ReadFromCsv with kwargs + - pipeline: + type: composite + transforms: + - type: PyTransform + name: PyTransformReadFromCsv + input: {} + config: + constructor: apache_beam.io.ReadFromCsv + kwargs: + path: '{TEMP_DIR}/out.csv*' + - type: AssertEqual + input: PyTransformReadFromCsv + config: + elements: + - {price: 1.29, produce: 'Apple'} + - {price: 0.29, produce: 'Apricot'} + - {price: 0.02, produce: 'Blueberry'} + - {price: 0.19, produce: 'Date'} From 92a05c6162a5a39a36e2598f7304c891db4d970e Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Thu, 17 Apr 2025 02:04:21 +0000 Subject: [PATCH 6/7] update yaml_mapping docs --- sdks/python/apache_beam/yaml/yaml_mapping.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py index 2d4d9763e75b..5a2105135674 100644 --- 
a/sdks/python/apache_beam/yaml/yaml_mapping.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping.py @@ -544,7 +544,6 @@ class _Explode(beam.PTransform): `cross_product` is set to `true` but only the two rows `('a', 1)` and `('b', 2)` when it is set to `false`. Only meaningful (and required) if multiple rows are specified. - error_handling: Whether and how to handle errors during iteration. """ # pylint: disable=line-too-long def __init__( @@ -750,7 +749,7 @@ def _Partition( outputs: list[str], unknown_output: Optional[str] = None, error_handling: Optional[Mapping[str, Any]] = None, - language: Optional[str] = 'generic'): + language: str = 'generic'): """Splits an input into several distinct outputs. Each input element will go to a distinct output based on the field or @@ -767,7 +766,7 @@ def _Partition( parameter. error_handling: (Optional) Whether and how to handle errors during partitioning. - language: (Optional) The language of the `by` expression. + language: The language of the `by` expression. """ split_fn = _as_callable_for_pcoll(pcoll, by, 'by', language) try: From 692d5e77040cc313025d972ec1193ff9ba397b3d Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Thu, 17 Apr 2025 20:31:39 +0000 Subject: [PATCH 7/7] yapf fix --- sdks/python/apache_beam/yaml/yaml_mapping.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py index 5a2105135674..c4aeba1d8ae2 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping.py @@ -544,6 +544,7 @@ class _Explode(beam.PTransform): `cross_product` is set to `true` but only the two rows `('a', 1)` and `('b', 2)` when it is set to `false`. Only meaningful (and required) if multiple rows are specified. + error_handling: Whether and how to handle errors during iteration. 
""" # pylint: disable=line-too-long def __init__( @@ -563,7 +564,10 @@ def __init__( cross_product = True self._fields = fields self._cross_product = cross_product - # TODO(yaml): Support standard error handling argument. + # TODO(yaml): + # 1. Support standard error handling argument. + # 2. Supposedly error_handling parameter is not an accepted parameter when + # executing. Needs further investigation. self._exception_handling_args = exception_handling_args(error_handling) @maybe_with_exception_handling