diff --git a/.github/workflows/beam_PreCommit_Yaml_Xlang_Direct.yml b/.github/workflows/beam_PreCommit_Yaml_Xlang_Direct.yml index a65970968b2c..92a20e5003e7 100644 --- a/.github/workflows/beam_PreCommit_Yaml_Xlang_Direct.yml +++ b/.github/workflows/beam_PreCommit_Yaml_Xlang_Direct.yml @@ -91,7 +91,7 @@ jobs: - name: run PreCommit Yaml Xlang Direct script uses: ./.github/actions/gradle-command-self-hosted-action with: - gradle-command: :sdks:python:yamlIntegrationTests + gradle-command: :sdks:python:yamlIntegrationTests -PbeamPythonExtra=ml_test - name: Archive Python Test Results uses: actions/upload-artifact@v4 if: failure() diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 6eb45a109d31..472ece04f9e4 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -3021,9 +3021,15 @@ class BeamModulePlugin implements Plugin { dependsOn ':sdks:python:sdist' doLast { def distTarBall = "${pythonRootDir}/build/apache-beam.tar.gz" + def packages = "gcp,test,aws,azure,dataframe" + def extra = project.findProperty('beamPythonExtra') + if (extra) { + packages += ",${extra}" + } + project.exec { executable 'sh' - args '-c', ". ${project.ext.envdir}/bin/activate && pip install --pre --retries 10 ${distTarBall}[gcp,test,aws,azure,dataframe]" + args '-c', ". ${project.ext.envdir}/bin/activate && pip install --pre --retries 10 ${distTarBall}[${packages}]" } } } diff --git a/sdks/python/apache_beam/yaml/tests/assign_timestamps.yaml b/sdks/python/apache_beam/yaml/tests/assign_timestamps.yaml new file mode 100644 index 000000000000..edaa581214ea --- /dev/null +++ b/sdks/python/apache_beam/yaml/tests/assign_timestamps.yaml @@ -0,0 +1,84 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +pipelines: + # Assign timestamp to beam row element + - pipeline: + type: chain + transforms: + - type: Create + name: CreateVisits + config: + elements: + - user: alice + new_time_stamp: 1 + - user: alice + new_time_stamp: 3 + - user: bob + new_time_stamp: 7 + - type: AssignTimestamps + config: + timestamp: new_time_stamp + - type: ExtractWindowingInfo + config: + fields: [timestamp] + - type: MapToFields + config: + language: python + fields: + user: user + timestamp: timestamp + - type: AssertEqual + config: + elements: + - {user: "alice", timestamp: 1} + - {user: "alice", timestamp: 3} + - {user: "bob", timestamp: 7} + + # Assign timestamp to beam row element with error_handling + - pipeline: + type: composite + transforms: + - type: Create + name: CreateVisits + config: + elements: + - {user: alice, timestamp: "not-valid"} + - {user: bob, timestamp: 3} + - type: AssignTimestamps + input: CreateVisits + config: + timestamp: timestamp + error_handling: + output: invalid_rows + - type: MapToFields + input: AssignTimestamps.invalid_rows + config: + language: python + fields: + user: "element.user" + timestamp: "element.timestamp" + - type: AssertEqual + input: MapToFields + config: + elements: + - {user: "alice", timestamp: "not-valid"} + - type: AssertEqual + input: AssignTimestamps + 
config: + elements: + - {user: bob, timestamp: 3} diff --git a/sdks/python/apache_beam/yaml/tests/create.yaml b/sdks/python/apache_beam/yaml/tests/create.yaml new file mode 100644 index 000000000000..bed364c17143 --- /dev/null +++ b/sdks/python/apache_beam/yaml/tests/create.yaml @@ -0,0 +1,66 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +pipelines: + # Simple Create with element list + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: [1,2,3,4,5] + - type: AssertEqual + config: + elements: + - {element: 1} + - {element: 2} + - {element: 3} + - {element: 4} + - {element: 5} + + # Simple Create with more complex beam row + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - {first: 0, second: [1,2,3]} + - {first: 1, second: [4,5,6]} + - type: AssertEqual + config: + elements: + - {first: 0, second: [1,2,3]} + - {first: 1, second: [4,5,6]} + + # Simple Create with reshuffle disabled + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - {first: 0, second: [1,2,3]} + - {first: 1, second: [4,5,6]} + - {first: 2, second: [7,8,9]} + reshuffle: false + - type: AssertEqual + config: + elements: + - {first: 0, second: [1,2,3]} + - {first: 1, second: [4,5,6]} + - {first: 2, second: [7,8,9]} diff --git a/sdks/python/apache_beam/yaml/tests/extract_windowing_Info.yaml b/sdks/python/apache_beam/yaml/tests/extract_windowing_Info.yaml new file mode 100644 index 000000000000..45233e2039e5 --- /dev/null +++ b/sdks/python/apache_beam/yaml/tests/extract_windowing_Info.yaml @@ -0,0 +1,131 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# + +pipelines: + # Simply extract windowing info with no fields resulting in default fields + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - {label: "11a", rank: 0} + - {label: "37a", rank: 1} + - {label: "389a", rank: 2} + - type: ExtractWindowingInfo + - type: MapToFields + config: + language: python + fields: + label: + callable: "lambda x: x.label" + rank: + callable: "lambda x: x.rank" + timestamp: + callable: "lambda x: str(x.timestamp)" + window_start: + callable: "lambda x: str(x.window_start)" + window_end: + callable: "lambda x: str(x.window_end)" + - type: AssertEqual + config: + elements: + - {label: "11a", rank: 0, timestamp: Timestamp(-9223372036854.775000), window_start: Timestamp(-9223372036854.775000), window_end: Timestamp(9223371950454.775000)} + - {label: "37a", rank: 1, timestamp: Timestamp(-9223372036854.775000), window_start: Timestamp(-9223372036854.775000), window_end: Timestamp(9223371950454.775000)} + - {label: "389a", rank: 2, timestamp: Timestamp(-9223372036854.775000), window_start: Timestamp(-9223372036854.775000), window_end: Timestamp(9223371950454.775000)} + + + # Simply extract windowing info with all available fields + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - {label: "11a", rank: 0} + - {label: "37a", rank: 1} + - {label: "389a", rank: 2} + - type: ExtractWindowingInfo + config: + fields: [timestamp, window_start, window_end, window_string, window_type, window_object, pane_info] + - type: MapToFields + config: + language: python + fields: + label: + callable: "lambda x: x.label" + rank: + callable: "lambda x: x.rank" + timestamp: + callable: "lambda x: str(x.timestamp)" + window_start: + callable: "lambda x: str(x.window_start)" + window_end: + callable: "lambda x: str(x.window_end)" + window_string: + callable: "lambda x: str(x.window_string)" + 
window_type: + callable: "lambda x: str(x.window_type)" + window_object: + callable: "lambda x: str(x.window_object)" + pane_info: + callable: "lambda x: str(x.pane_info)" + - type: AssertEqual + config: + elements: + - {label: "11a", rank: 0, timestamp: Timestamp(-9223372036854.775000), window_start: Timestamp(-9223372036854.775000), window_end: Timestamp(9223371950454.775000), window_string: 'GlobalWindow', window_type: 'GlobalWindow', window_object: 'GlobalWindow', pane_info: "PaneInfoTuple(is_first=True, is_last=True, timing='UNKNOWN', index=0, nonspeculative_index=0)"} + - {label: "37a", rank: 1, timestamp: Timestamp(-9223372036854.775000), window_start: Timestamp(-9223372036854.775000), window_end: Timestamp(9223371950454.775000), window_string: 'GlobalWindow', window_type: 'GlobalWindow', window_object: 'GlobalWindow', pane_info: "PaneInfoTuple(is_first=True, is_last=True, timing='UNKNOWN', index=0, nonspeculative_index=0)"} + - {label: "389a", rank: 2, timestamp: Timestamp(-9223372036854.775000), window_start: Timestamp(-9223372036854.775000), window_end: Timestamp(9223371950454.775000), window_string: 'GlobalWindow', window_type: 'GlobalWindow', window_object: 'GlobalWindow', pane_info: "PaneInfoTuple(is_first=True, is_last=True, timing='UNKNOWN', index=0, nonspeculative_index=0)"} + + + # Simply extract windowing info with a few fields renamed + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - {label: "11a", rank: 0} + - {label: "37a", rank: 1} + - {label: "389a", rank: 2} + - type: ExtractWindowingInfo + config: + fields: + ts: timestamp + ws: window_start + pane: pane_info + - type: MapToFields + config: + language: python + fields: + label: + callable: "lambda x: x.label" + rank: + callable: "lambda x: x.rank" + ts: + callable: "lambda x: str(x.ts)" + ws: + callable: "lambda x: str(x.ws)" + pane: + callable: "lambda x: str(x.pane)" + - type: AssertEqual + config: + elements: + - {label: "11a", rank: 0, ts: 
Timestamp(-9223372036854.775000), ws: Timestamp(-9223372036854.775000), pane: "PaneInfoTuple(is_first=True, is_last=True, timing='UNKNOWN', index=0, nonspeculative_index=0)"} + - {label: "37a", rank: 1, ts: Timestamp(-9223372036854.775000), ws: Timestamp(-9223372036854.775000), pane: "PaneInfoTuple(is_first=True, is_last=True, timing='UNKNOWN', index=0, nonspeculative_index=0)"} + - {label: "389a", rank: 2, ts: Timestamp(-9223372036854.775000), ws: Timestamp(-9223372036854.775000), pane: "PaneInfoTuple(is_first=True, is_last=True, timing='UNKNOWN', index=0, nonspeculative_index=0)"} + diff --git a/sdks/python/apache_beam/yaml/tests/ml_transform.yaml b/sdks/python/apache_beam/yaml/tests/ml_transform.yaml new file mode 100644 index 000000000000..c4881096f0d0 --- /dev/null +++ b/sdks/python/apache_beam/yaml/tests/ml_transform.yaml @@ -0,0 +1,92 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +fixtures: + - name: TEMP_DIR + type: "tempfile.TemporaryDirectory" + +pipelines: + # MLTransform with write_artifact_location + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - {num: 0, text: 'To be or not to be'} + - {num: 2, text: 'I think, therefore I am'} + - {num: 5, text: 'The only thing we have to fear is fear itself'} + - {num: 8, text: 'Be the change you wish to see in the world'} + - type: MLTransform + config: + write_artifact_location: "{TEMP_DIR}" + transforms: + - type: ScaleTo01 + config: + columns: [num] + - type: ScaleByMinMax + config: + columns: [num] + min_value: 0 + max_value: 100 + - type: MapToFields + config: + language: python + fields: + num_scaled: + callable: 'lambda x: x.num[0]' + - type: AssertEqual + config: + elements: + - {num_scaled: 0.0} + - {num_scaled: 25.0} + - {num_scaled: 62.5} + - {num_scaled: 100.0} + options: + yaml_experimental_features: ['ML'] + + # MLTransform with read_artifact_location based on previous + # write_artifact_location + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - {num: 0, text: 'To be or not to be'} + - {num: 2, text: 'I think, therefore I am'} + - {num: 5, text: 'The only thing we have to fear is fear itself'} + - {num: 8, text: 'Be the change you wish to see in the world'} + - type: MLTransform + config: + read_artifact_location: "{TEMP_DIR}" + - type: MapToFields + config: + language: python + fields: + num_scaled: + callable: 'lambda x: x.num[0]' + - type: AssertEqual + config: + elements: + - {num_scaled: 0.0} + - {num_scaled: 25.0} + - {num_scaled: 62.5} + - {num_scaled: 100.0} + options: + yaml_experimental_features: ['ML'] + diff --git a/sdks/python/apache_beam/yaml/tests/validate_with_schema.yaml b/sdks/python/apache_beam/yaml/tests/validate_with_schema.yaml new file mode 100644 index 000000000000..d5ae57a3e8c1 --- /dev/null +++ b/sdks/python/apache_beam/yaml/tests/validate_with_schema.yaml @@ -0,0 +1,95 @@ +# +# 
Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +pipelines: + # Validate a Beam Row with a predefined schema with no error handling + - pipeline: + type: chain + transforms: + - type: Create + name: InputData + config: + elements: + - {name: "Alice", age: 30, score: 95.5} + - {name: "Bob", age: 25, score: 88.0} + - type: ValidateWithSchema + config: + schema: + type: object + properties: + name: + type: string + age: + type: integer + score: + type: number + - type: AssertEqual + config: + elements: + - {name: "Alice", age: 30, score: 95.5} + - {name: "Bob", age: 25, score: 88.0} + + # Validate a Beam Row with a predefined schema with error handling + - pipeline: + type: composite + transforms: + - type: Create + config: + elements: + - {name: "Alice", age: 30, score: 95.5} + - {name: "Bob", age: 25, score: 88.0} + - {name: "Charlie", age: 27, score: "apple"} + - {name: "David", age: "twenty", score: 90.0} + - {name: 30, age: 40, score: 100.0} + - type: ValidateWithSchema + input: Create + config: + schema: + type: object + properties: + name: + type: string + age: + type: integer + score: + type: number + required: [name, age, score] + error_handling: + output: invalid_rows + - type: MapToFields + input: ValidateWithSchema.invalid_rows 
+ config: + language: python + fields: + name: "element.name" + age: "element.age" + score: "element.score" + - type: AssertEqual + input: MapToFields + config: + elements: + - {name: "Charlie", age: 27, score: "apple"} + - {name: "David", age: "twenty", score: 90.0} + - {name: 30, age: 40, score: 100.0} + - type: AssertEqual + input: ValidateWithSchema + config: + elements: + - {name: "Alice", age: 30, score: 95.5} + - {name: "Bob", age: 25, score: 88.0} + +